From 67562b8092d2fdca50d0406ec88d37fd307191b0 Mon Sep 17 00:00:00 2001 From: Roberto Date: Mon, 4 May 2026 15:09:58 +0200 Subject: [PATCH] Add task brief research agent: Stage 1 deep-research + canvas draft emission - run_task_brief_research() runner with brief-specific tool set and max_steps=12 - New agents: client_agent (list_clients, get_client) and relations_agent (query_relations) - search_associative tool wrapping MemoryMiddleware semantic search - BRIEF_RESEARCH_TOOLS constant: read-only task/project/note/timeline + memory + client/relations - canvas block extraction in output_formatter (splits visible text from draft) - device_ws.py: task_brief_research request type; emits canvas_draft mutation on stream_end - Stage 2 briefMode: briefing_context injected into floating system prompt when present - briefingContext kwarg wired through compile_prompt call chain Co-Authored-By: Claude Sonnet 4.6 --- .env.example | 4 + app/agents/client_agent.py | 52 ++++++++++ app/agents/relations_agent.py | 63 ++++++++++++ app/api/routes/device_ws.py | 99 ++++++++++++++++++- app/config/settings.py | 5 +- app/core/deep_agent.py | 181 +++++++++++++++++++++++++++++++++- app/core/llm.py | 1 + app/core/output_formatter.py | 24 +++++ app/schemas.py | 3 + 9 files changed, 427 insertions(+), 5 deletions(-) create mode 100644 app/agents/client_agent.py create mode 100644 app/agents/relations_agent.py diff --git a/.env.example b/.env.example index b8bce20..2c1990e 100644 --- a/.env.example +++ b/.env.example @@ -56,6 +56,10 @@ LLM_MODEL_CLOUD_PROCESSOR= # A small model (e.g. gpt-4o-mini) is sufficient. # LLM_MODEL_BRIEF_AGENT= +# Task-brief-agent — per-task deep research (Stage 1 executive assistant). +# Needs tool-use + reasoning; a capable model recommended (e.g. gpt-4o, gemini-2.5-flash). +# LLM_MODEL_TASK_BRIEF_AGENT= + # Setup-agent — guided journey to build an AgentConfig via WebSocket chat. LLM_MODEL_SETUP_AGENT= diff --git a/app/agents/client_agent.py b/app/agents/client_agent.py new file mode 100644 index 0000000..df1e945 --- /dev/null +++ b/app/agents/client_agent.py @@ -0,0 +1,52 @@ +"""Client agent — read-only tools for the clients table.""" + +from __future__ import annotations + +import json +from typing import Any + +from langchain_core.tools import tool + +from app.core.ws_context import execute_on_client + + +@tool +async def list_clients(search: str = "", limit: int = 20) -> str: + """List clients, optionally filtered by a name/email substring search. + + search: optional substring to match against client name or email. + limit: max rows to return (default 20). + """ + filters: dict[str, Any] = {"limit": limit} + if search: + filters["search"] = search + + result = await execute_on_client(action="select", table="clients", filters=filters) + rows = result.get("rows", []) + if not rows: + return "No clients found." + lines = [ + f"- {r.get('name', '?')} (id: {r.get('id')}, email: {r.get('email', '')}, " + f"company: {r.get('company', '')})" + for r in rows + ] + return f"Found {len(rows)} client(s):\n" + "\n".join(lines) + + +@tool +async def get_client(id: str) -> str: + """Get full details for one client by UUID. + + id: the client's UUID. + """ + if not id: + return "Client id is required." + + result = await execute_on_client(action="get", table="clients", data={"id": id}) + row = result.get("row") or result.get("rows", [None])[0] if result else None + if not row: + return f"Client '{id}' not found." + return f"Client details:\n{json.dumps(row, ensure_ascii=False, indent=2)}" + + +CLIENT_TOOLS: list[Any] = [list_clients, get_client] diff --git a/app/agents/relations_agent.py b/app/agents/relations_agent.py new file mode 100644 index 0000000..5e98ab7 --- /dev/null +++ b/app/agents/relations_agent.py @@ -0,0 +1,63 @@ +"""Relations agent — read-only tool wrapping MemoryMiddleware.query_relations.""" + +from __future__ import annotations + +from typing import Any + +from langchain_core.tools import tool + +from app.core.memory_middleware import MemoryMiddleware +from app.db import async_session + +# Injected at tool-factory time by _brief_research_tools(); not a module-level global. +# Each tool closure captures the user_id bound at factory time. + + +def make_query_relations_tool(user_id: str, trace_id: str | None = None) -> Any: + """Return a query_relations tool bound to *user_id*.""" + + @tool + async def query_relations( + subject_label: str = "", + predicate: str = "", + object_label: str = "", + limit: int = 10, + ) -> str: + """Query the relational memory graph for entity relationships. + + Returns rows where subject ↔ predicate ↔ object match the given filters. + All parameters are optional — omit to retrieve all relations up to limit. + + subject_label: entity label on the left side (e.g. a client name, "Acme Corp"). + predicate: relationship type (e.g. "mentioned_in", "works_at", "related_to"). + object_label: entity label on the right side (e.g. a project name, "Website Redesign"). + limit: max rows to return (default 10). + """ + import logging + logger = logging.getLogger(__name__) + logger.info( + "relations_agent: query_relations trace=%s user=%s subject=%r predicate=%r object=%r", + trace_id or "-", user_id, subject_label, predicate, object_label, + ) + + async with async_session() as db: + memory = MemoryMiddleware(db) + rows = await memory.query_relations( + user_id=user_id, + subject=subject_label or None, + predicate=predicate or None, + object_=object_label or None, + limit=limit, + ) + + if not rows: + return "No relational memory entries found for the given filters." + + lines = [ + f"- {r.subject_label} —[{r.predicate}]→ {r.object_label}" + + (f" (confidence: {r.confidence:.2f})" if r.confidence is not None else "") + for r in rows + ] + return f"Found {len(rows)} relation(s):\n" + "\n".join(lines) + + return query_relations diff --git a/app/api/routes/device_ws.py b/app/api/routes/device_ws.py index e66304a..91de0f4 100644 --- a/app/api/routes/device_ws.py +++ b/app/api/routes/device_ws.py @@ -43,7 +43,8 @@ from app.api.routes.agent_setup import handle_journey_message, handle_journey_st from app.config.settings import settings from app.core.agent_runner import trigger_pending_runs from app.core.brief_agent import run_home_brief, run_project_brief -from app.core.deep_agent import run_floating_stream, run_home_stream +from app.core.deep_agent import run_floating_stream, run_home_stream, run_task_brief_research_stream +from app.core.output_formatter import extract_canvas_block from app.core.device_manager import device_manager from app.core.memory_middleware import MemoryMiddleware from app.core.output_formatter import StreamFormatter @@ -164,6 +165,11 @@ async def _message_loop(websocket: WebSocket, user_id: str) -> None: _handle_brief_request(websocket, user_id, frame) ) + elif frame_type == WsFrameType.task_brief_request: + asyncio.create_task( + _handle_task_brief_request(websocket, user_id, frame) + ) + elif frame_type == WsFrameType.journey_start: asyncio.create_task( _handle_journey_start(websocket, user_id, frame) @@ -415,6 +421,97 @@ async def _handle_brief_request( ) +# ── v6 Task Brief Handler ──────────────────────────────────────────── + + +async def _handle_task_brief_request( + websocket: WebSocket, + user_id: str, + frame: dict, +) -> None: + """Handle a task_brief_request frame — Stage-1 executive assistant deep research. + + Streams the briefing markdown back to the client. + On stream_end, emits a ``canvas_draft`` mutation if the agent produced one. + """ + request_id = frame.get("request_id") or str(uuid4()) + session_id = frame.get("session_id") or str(uuid4()) + task_id: str = frame.get("task_id") or frame.get("taskId") or "" + + logger.info( + "device_ws: task_brief_request_start user=%s req=%s task=%s [cache_miss]", + user_id, request_id, task_id, + ) + + if not task_id: + await websocket.send_text( + WsStreamEnd(request_id=request_id, error="task_id is required").model_dump_json() + ) + return + + async with async_session() as db: + memory = MemoryMiddleware(db) + memory_context = await memory.enrich_context( + user_id, + f"task brief: {task_id}", + trace_id=request_id, + session_id=session_id, + ) + + context: dict = { + "_debug": {"request_id": request_id, "session_id": session_id, "user_id": user_id}, + "format_prefs": frame.get("format_prefs"), + **memory_context, + } + + executor = await _make_ws_executor(websocket, user_id) + set_client_executor(executor) + response_chunks: list[str] = [] + + try: + event_stream = run_task_brief_research_stream(user_id, task_id, context) + formatter = StreamFormatter(request_id=request_id) + async for ws_frame in formatter.format(event_stream): + if ws_frame.type == "stream_text": # type: ignore[union-attr] + response_chunks.append(ws_frame.chunk) # type: ignore[union-attr] + await websocket.send_text(ws_frame.model_dump_json()) + elif ws_frame.type == "stream_start": + await websocket.send_text(ws_frame.model_dump_json()) + # stream_end is emitted below with mutations — skip formatter's version + except Exception as exc: + logger.error( + "device_ws: task_brief_request failed user=%s req=%s task=%s: %s", + user_id, request_id, task_id, exc, + ) + await websocket.send_text( + WsStreamEnd(request_id=request_id, error=str(exc)).model_dump_json() + ) + return + finally: + clear_client_executor() + + # Extract canvas block then emit stream_end with optional mutations. + full_response = "".join(response_chunks) + _visible, canvas_content, canvas_kind = extract_canvas_block(full_response) + + mutations: list[dict] = [] + if canvas_content: + mutations.append({ + "type": "canvas_draft", + "content": canvas_content, + "kind": canvas_kind, + }) + + await websocket.send_text( + WsStreamEnd(request_id=request_id, mutations=mutations or None).model_dump_json() + ) + + logger.info( + "device_ws: task_brief_request_end user=%s req=%s task=%s response_chars=%d canvas=%s", + user_id, request_id, task_id, len(full_response), canvas_kind or "none", + ) + + # ── v4 Journey Handlers ───────────────────────────────────────────── diff --git a/app/config/settings.py b/app/config/settings.py index 582c46c..a8bf029 100644 --- a/app/config/settings.py +++ b/app/config/settings.py @@ -28,8 +28,9 @@ class Settings(BaseSettings): LLM_MODEL_FLOATING_AGENT: str = "" # floating-agent (contextual chat) LLM_MODEL_UNIFIED_PROCESSOR: str = "" # unified-processor (agent_runner) LLM_MODEL_CLOUD_PROCESSOR: str = "" # cloud-processor (agent_runner) - LLM_MODEL_BRIEF_AGENT: str = "" # brief-agent (home + project text briefs) - LLM_MODEL_SETUP_AGENT: str = "" # agent-setup journey + LLM_MODEL_BRIEF_AGENT: str = "" # brief-agent (home + project text briefs) + LLM_MODEL_TASK_BRIEF_AGENT: str = "" # task-brief-agent (per-task deep research) + LLM_MODEL_SETUP_AGENT: str = "" # agent-setup journey LLM_MODEL_MEMORY_EXTRACTOR: str = "" # memory-extractor (Phase 2 extract/decide) LLM_MODEL_MEMORY_MINER: str = "" # memory-miner (Phase 5 proactive mining) LLM_MODEL_MEMORY_AUDITOR: str = "" # memory-auditor (Phase 7 weekly audit) diff --git a/app/core/deep_agent.py b/app/core/deep_agent.py index a3a7c7d..4141f47 100644 --- a/app/core/deep_agent.py +++ b/app/core/deep_agent.py @@ -12,8 +12,10 @@ from typing import Any, Literal from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage from langchain_core.tools import tool +from app.agents.client_agent import CLIENT_TOOLS from app.agents.note_agent import NOTE_TOOLS from app.agents.project_agent import PROJECT_TOOLS +from app.agents.relations_agent import make_query_relations_tool from app.agents.task_agent import TASK_TOOLS from app.agents.timeline_agent import TIMELINE_TOOLS from app.core.agent_session_buffer import session_buffer @@ -303,6 +305,80 @@ For specific dates not listed, compute local-midnight in the user timezone and c {request_context}\ """ +_TASK_BRIEF_RESEARCH_SYSTEM_PROMPT = """\ +You are an executive assistant preparing a briefing dossier for your principal before they act on a specific task. +Your job: gather all relevant context, synthesize it into a tight actionable dossier, and — if the task requires writing (email, message, document) — produce a ready-to-use draft.{user_identity} + +# Research workflow +Follow these steps in order, using tools: +1. Read the task fully (title, description, due date, priority, status, project, comments). +2. Fetch the parent project (`get_project`) to understand scope, aiSummary, and any linked client. +3. If the project has a clientId: call `get_client(id)` to retrieve full client details. +4. Call `query_relations` (subject_label=client_name or task subject) to find cross-project connections — e.g. the same client appearing in multiple projects. +5. Search associative memory (`search_associative`) and archival memory (`archival_memory_search`) using the task title + client name as query phrases to surface relevant past interactions. +6. Read core memory blocks for tone preference, language, and user style: `memory_get("tone_preference")`, `memory_get("language")`. +7. Determine task kind: is this a writing task (email reply, message, follow-up, proposal)? If yes, draft a ready-to-send piece. + +# Output structure +Write the briefing in the user's language. Use this exact structure: + +**What needs to be done** +(1–2 sentences, concrete and specific — what action the user must take) + +**Context you should know** +(bullet points covering: client background, related projects, prior interactions, tone/style notes, any relevant deadlines or dependencies) + +**Suggested first step** +(one specific, immediately actionable instruction) + +If this is a writing task, append a canvas block at the very end: + +...ready-to-use draft here... + + +Do NOT include the canvas block for non-writing tasks. +Do NOT repeat verbatim task fields the user already sees in the UI. +Be concrete — no vague advice. Every bullet should be a fact that changes what the user does. + +# Date context +{date_context} + +# Language +{language_instruction} + +# Known people & projects +{relational_memory} + +# Request context +{request_context}\ +""" + +_TASK_BRIEF_FOLLOWUP_SYSTEM_PROMPT = """\ +You are an executive assistant continuing a conversation with your principal. +You have already prepared and delivered a research briefing for the active task. The user has read it.{user_identity} + +Your briefing: +--- +{briefing_context} +--- + +Continue from here. Do NOT repeat the briefing. Refer to it when relevant. +Help the user execute: edit drafts, refine wording, look up additional details, plan next steps. +Stay terse — your principal is a busy executive. + +# Date context +{date_context} + +# Language +{language_instruction} + +# Known people & projects +{relational_memory} + +# Request context +{request_context}\ +""" + _FLOATING_DOMAIN_CLASSIFIER_PROMPT = ( "You are a strict domain classifier for websocket floating requests. " "Return ONLY a JSON object with keys: type, id, section. " @@ -679,6 +755,25 @@ def _memory_tools(user_id: str, trace_id: str | None) -> list[Any]: lines = [f"- {item}" for item in results] return "Recall memory results:\n" + "\n".join(lines) + @tool + async def search_associative(query: str, limit: int = 5) -> str: + """Semantic search across associative (archival) memory for a given query. + + Use this to surface long-term memories related to a topic, client, or task + that may not appear in recent episodes. + + query: natural-language search phrase. + limit: max results (default 5). + """ + logger.info("deep_agent: search_associative trace=%s user=%s query=%s", trace_id or "-", user_id, query[:80]) + async with async_session() as db: + memory = MemoryMiddleware(db) + results = await memory.search_archival(user_id, query, top_k=limit) + if not results: + return "No associative memory results found." + lines = [f"- {item}" for item in results] + return "Associative memory results:\n" + "\n".join(lines) + return [ memory_list_blocks, memory_get, @@ -689,16 +784,33 @@ def _memory_tools(user_id: str, trace_id: str | None) -> list[Any]: archival_memory_insert, archival_memory_search, conversation_search, + search_associative, ] def _read_only_memory_tools(user_id: str, trace_id: str | None) -> list[Any]: """Return memory tools that only read — safe for the read-only brief-agent subset.""" all_mem = _memory_tools(user_id, trace_id) - _read_names = {"memory_list_blocks", "memory_get", "archival_memory_search", "conversation_search"} + _read_names = { + "memory_list_blocks", "memory_get", "archival_memory_search", + "conversation_search", "search_associative", + } return [t for t in all_mem if t.name in _read_names] +def _brief_research_tools(user_id: str, trace_id: str | None) -> list[Any]: + """Return the full tool palette for Stage-1 task brief research (read-only).""" + return [ + *TASK_TOOLS, + *PROJECT_TOOLS, + *NOTE_TOOLS, + *TIMELINE_TOOLS, + *CLIENT_TOOLS, + *_read_only_memory_tools(user_id, trace_id), + make_query_relations_tool(user_id, trace_id), + ] + + def _all_tools_for_user(user_id: str, trace_id: str | None) -> list[Any]: return [*_all_tools(), *_memory_tools(user_id, trace_id)] @@ -1249,7 +1361,29 @@ async def run_floating_stream( domain = await _infer_floating_domain(message, prepared_context) yield "floating_domain", domain - system_prompt, langfuse_prompt = _build_system_prompt("floating_system", _FLOATING_SYSTEM_PROMPT, prepared_context) + brief_mode: bool = bool(context.get("brief_mode")) + briefing_context_text: str = str(context.get("briefing_context") or "").strip() + + if brief_mode and briefing_context_text: + # Stage 2: inject briefing as ground truth context. + # Pre-substitute {briefing_context} in the template (handles both Langfuse {{}} and fallback {}) + # before compile_prompt sees the remaining standard variables. + template, langfuse_prompt = get_prompt_or_fallback( + "task_brief_followup_system", + _TASK_BRIEF_FOLLOWUP_SYSTEM_PROMPT, + ) + system_prompt = compile_prompt( + template, langfuse_prompt, + date_context=_datetime_context_injection(prepared_context).strip(), + language_instruction=_language_instruction(prepared_context).strip(), + user_identity=_user_identity_injection(prepared_context).strip(), + relational_memory=_relational_memory_injection(prepared_context).strip(), + proactive_hints=_proactive_hints_injection(prepared_context).strip(), + request_context=_request_context_block(prepared_context), + briefing_context=briefing_context_text, + ) + else: + system_prompt, langfuse_prompt = _build_system_prompt("floating_system", _FLOATING_SYSTEM_PROMPT, prepared_context) sanitizer = _FloatingStreamSanitizer() emitted_sanitized = False raw_chunks: list[str] = [] @@ -1283,6 +1417,49 @@ async def run_floating_stream( yield "token", _fallback_from_raw_floating_text("".join(raw_chunks)) +async def run_task_brief_research_stream( + user_id: str, + task_id: str, + context: dict[str, Any], +) -> AsyncGenerator[tuple[str, Any], None]: + """Stage-1 executive assistant: deep research for one task. + + Yields ``("token", chunk)`` events like other stream runners. + The final concatenated text may contain a ``...`` block + which the WS handler strips and emits as a ``canvas_draft`` mutation. + """ + prepared_context = await _prepare_context(f"task:{task_id}", context) + tools = _brief_research_tools(user_id, _trace_id_from_context(prepared_context)) + + # Inject task_id so the agent knows what to look up first. + research_message = ( + f"Prepare a briefing dossier for task ID: {task_id}\n" + "Follow the research workflow: read the task, then project, then client, " + "then cross-project relations, then relevant memory. " + "End with a concrete suggested first step. " + "If this is a writing task, include a draft." + ) + + system_prompt, langfuse_prompt = _build_system_prompt( + "task_brief_research_system", + _TASK_BRIEF_RESEARCH_SYSTEM_PROMPT, + prepared_context, + ) + + async for event in _run_single_agent_stream( + user_id=user_id, + system_prompt=system_prompt, + message=research_message, + context=prepared_context, + max_steps=12, + langfuse_prompt=langfuse_prompt, + agent_name="task-brief-agent", + tools=tools, + conversation_history=None, + ): + yield event + + async def update_core_memory(user_id: str, key: str, value: str) -> None: """Compatibility helper kept for callers that expect explicit memory update API.""" async with async_session() as db: diff --git a/app/core/llm.py b/app/core/llm.py index b74bc34..586d25b 100644 --- a/app/core/llm.py +++ b/app/core/llm.py @@ -107,6 +107,7 @@ _AGENT_MODEL_SETTINGS: dict[str, Callable[[], str]] = { "unified-processor": lambda: settings.LLM_MODEL_UNIFIED_PROCESSOR or settings.LLM_MODEL, "cloud-processor": lambda: settings.LLM_MODEL_CLOUD_PROCESSOR or settings.LLM_MODEL, "brief-agent": lambda: settings.LLM_MODEL_BRIEF_AGENT or settings.LLM_MODEL, + "task-brief-agent": lambda: settings.LLM_MODEL_TASK_BRIEF_AGENT or settings.LLM_MODEL, "setup": lambda: settings.LLM_MODEL_SETUP_AGENT or settings.LLM_MODEL, "memory-extractor": lambda: settings.LLM_MODEL_MEMORY_EXTRACTOR or "gpt-4o-mini", "memory-miner": lambda: settings.LLM_MODEL_MEMORY_MINER or "gpt-4o-mini", diff --git a/app/core/output_formatter.py b/app/core/output_formatter.py index 3c6f6df..03026e1 100644 --- a/app/core/output_formatter.py +++ b/app/core/output_formatter.py @@ -2,11 +2,35 @@ from __future__ import annotations +import re from collections.abc import AsyncGenerator from typing import Any from app.schemas import WsFloatingDomain, WsStreamEnd, WsStreamStart, WsStreamText +# Matches ... blocks (single-line or multiline). +_CANVAS_BLOCK_RE = re.compile( + r'(.*?)', + re.DOTALL | re.IGNORECASE, +) + + +def extract_canvas_block(text: str) -> tuple[str, str | None, str | None]: + """Strip the first ... block from *text*. + + Returns ``(visible_text, canvas_content, canvas_kind)``. + ``canvas_content`` and ``canvas_kind`` are ``None`` when no block is found. + """ + match = _CANVAS_BLOCK_RE.search(text) + if not match: + return text, None, None + + canvas_kind = match.group(1).strip() + canvas_content = match.group(2).strip() + visible = text[: match.start()] + text[match.end() :] + visible = visible.strip() + return visible, canvas_content, canvas_kind + WsFrame = WsStreamStart | WsStreamText | WsStreamEnd | WsFloatingDomain diff --git a/app/schemas.py b/app/schemas.py index 4c33386..6bf1db5 100644 --- a/app/schemas.py +++ b/app/schemas.py @@ -87,6 +87,8 @@ class WsFrameType(str, Enum): journey_reply = "journey_reply" # ── v5 brief frame types ────────────────────────────────────────── brief_request = "brief_request" + # ── v6 task brief frame types ───────────────────────────────────── + task_brief_request = "task_brief_request" class WsToolCall(BaseModel): @@ -209,6 +211,7 @@ class WsStreamEnd(BaseModel): type: Literal[WsFrameType.stream_end] = WsFrameType.stream_end request_id: str error: str | None = None + mutations: list[dict[str, Any]] | None = None class WsDomain(BaseModel):