diff --git a/.env.example b/.env.example
index b8bce20..2c1990e 100644
--- a/.env.example
+++ b/.env.example
@@ -56,6 +56,10 @@ LLM_MODEL_CLOUD_PROCESSOR=
# A small model (e.g. gpt-4o-mini) is sufficient.
# LLM_MODEL_BRIEF_AGENT=
+# Task-brief-agent — per-task deep research (Stage 1 executive assistant).
+# Needs tool-use + reasoning; a capable model recommended (e.g. gpt-4o, gemini-2.5-flash).
+# LLM_MODEL_TASK_BRIEF_AGENT=
+
# Setup-agent — guided journey to build an AgentConfig via WebSocket chat.
LLM_MODEL_SETUP_AGENT=
diff --git a/app/agents/client_agent.py b/app/agents/client_agent.py
new file mode 100644
index 0000000..df1e945
--- /dev/null
+++ b/app/agents/client_agent.py
@@ -0,0 +1,52 @@
+"""Client agent — read-only tools for the clients table."""
+
+from __future__ import annotations
+
+import json
+from typing import Any
+
+from langchain_core.tools import tool
+
+from app.core.ws_context import execute_on_client
+
+
+@tool
+async def list_clients(search: str = "", limit: int = 20) -> str:
+    """List clients, optionally filtered by a name/email substring search.
+
+    search: optional substring to match against client name or email.
+    limit: max rows to return (default 20).
+    """
+    filters: dict[str, Any] = {"limit": limit}
+    if search:
+        filters["search"] = search
+
+    result = await execute_on_client(action="select", table="clients", filters=filters)
+    # Tolerate a None/empty result or a null "rows" value (get_client already
+    # guards with `if result`) instead of raising AttributeError on `.get`.
+    rows = (result or {}).get("rows") or []
+    if not rows:
+        return "No clients found."
+    # One bullet per client; a missing name renders as "?", missing email and
+    # company render as empty strings.
+    lines = [
+        f"- {r.get('name', '?')} (id: {r.get('id')}, email: {r.get('email', '')}, "
+        f"company: {r.get('company', '')})"
+        for r in rows
+    ]
+    return f"Found {len(rows)} client(s):\n" + "\n".join(lines)
+
+
+@tool
+async def get_client(id: str) -> str:
+    """Get full details for one client by UUID.
+
+    id: the client's UUID.
+    """
+    if not id:
+        return "Client id is required."
+
+    result = await execute_on_client(action="get", table="clients", data={"id": id})
+    # Accept either a single "row" or the first entry of a "rows" list.
+    # An empty or null "rows" must fall through to the not-found message
+    # rather than raise IndexError on `[0]`.
+    row = None
+    if result:
+        row = result.get("row") or next(iter(result.get("rows") or []), None)
+    if not row:
+        return f"Client '{id}' not found."
+    return f"Client details:\n{json.dumps(row, ensure_ascii=False, indent=2)}"
+
+
+# Read-only client tools (select/get on the clients table), exported so agent
+# tool palettes can include them.
+CLIENT_TOOLS: list[Any] = [list_clients, get_client]
diff --git a/app/agents/relations_agent.py b/app/agents/relations_agent.py
new file mode 100644
index 0000000..5e98ab7
--- /dev/null
+++ b/app/agents/relations_agent.py
@@ -0,0 +1,63 @@
+"""Relations agent — read-only tool wrapping MemoryMiddleware.query_relations."""
+
+from __future__ import annotations
+
+from typing import Any
+
+from langchain_core.tools import tool
+
+from app.core.memory_middleware import MemoryMiddleware
+from app.db import async_session
+
+# user_id and trace_id are bound when make_query_relations_tool() is called
+# (e.g. by _brief_research_tools() in app/core/deep_agent.py), not stored in a
+# module-level global. Each returned tool closure captures those values.
+
+
+def make_query_relations_tool(user_id: str, trace_id: str | None = None) -> Any:
+    """Return a query_relations tool bound to *user_id*.
+
+    user_id: owner of the relational memory; captured by the returned tool's closure.
+    trace_id: optional trace identifier written to the tool's log line ("-" when absent).
+
+    Returns the decorated async ``query_relations`` tool.
+    """
+    # Resolve the logger once per factory call instead of re-importing
+    # ``logging`` and looking the logger up on every tool invocation.
+    import logging
+
+    logger = logging.getLogger(__name__)
+
+    @tool
+    async def query_relations(
+        subject_label: str = "",
+        predicate: str = "",
+        object_label: str = "",
+        limit: int = 10,
+    ) -> str:
+        """Query the relational memory graph for entity relationships.
+
+        Returns rows where subject ↔ predicate ↔ object match the given filters.
+        All parameters are optional — omit to retrieve all relations up to limit.
+
+        subject_label: entity label on the left side (e.g. a client name, "Acme Corp").
+        predicate: relationship type (e.g. "mentioned_in", "works_at", "related_to").
+        object_label: entity label on the right side (e.g. a project name, "Website Redesign").
+        limit: max rows to return (default 10).
+        """
+        logger.info(
+            "relations_agent: query_relations trace=%s user=%s subject=%r predicate=%r object=%r",
+            trace_id or "-", user_id, subject_label, predicate, object_label,
+        )
+
+        # Empty-string filters are passed as None.
+        async with async_session() as db:
+            memory = MemoryMiddleware(db)
+            rows = await memory.query_relations(
+                user_id=user_id,
+                subject=subject_label or None,
+                predicate=predicate or None,
+                object_=object_label or None,
+                limit=limit,
+            )
+
+        if not rows:
+            return "No relational memory entries found for the given filters."
+
+        # The confidence suffix is only rendered when the row carries a value.
+        lines = [
+            f"- {r.subject_label} —[{r.predicate}]→ {r.object_label}"
+            + (f" (confidence: {r.confidence:.2f})" if r.confidence is not None else "")
+            for r in rows
+        ]
+        return f"Found {len(rows)} relation(s):\n" + "\n".join(lines)
+
+    return query_relations
diff --git a/app/api/routes/device_ws.py b/app/api/routes/device_ws.py
index e66304a..91de0f4 100644
--- a/app/api/routes/device_ws.py
+++ b/app/api/routes/device_ws.py
@@ -43,7 +43,8 @@ from app.api.routes.agent_setup import handle_journey_message, handle_journey_st
from app.config.settings import settings
from app.core.agent_runner import trigger_pending_runs
from app.core.brief_agent import run_home_brief, run_project_brief
-from app.core.deep_agent import run_floating_stream, run_home_stream
+from app.core.deep_agent import run_floating_stream, run_home_stream, run_task_brief_research_stream
+from app.core.output_formatter import extract_canvas_block
from app.core.device_manager import device_manager
from app.core.memory_middleware import MemoryMiddleware
from app.core.output_formatter import StreamFormatter
@@ -164,6 +165,11 @@ async def _message_loop(websocket: WebSocket, user_id: str) -> None:
_handle_brief_request(websocket, user_id, frame)
)
+ elif frame_type == WsFrameType.task_brief_request:
+ asyncio.create_task(
+ _handle_task_brief_request(websocket, user_id, frame)
+ )
+
elif frame_type == WsFrameType.journey_start:
asyncio.create_task(
_handle_journey_start(websocket, user_id, frame)
@@ -415,6 +421,97 @@ async def _handle_brief_request(
)
+# ── v6 Task Brief Handler ────────────────────────────────────────────
+
+
+async def _handle_task_brief_request(
+    websocket: WebSocket,
+    user_id: str,
+    frame: dict,
+) -> None:
+    """Handle a task_brief_request frame — Stage-1 executive assistant deep research.
+
+    Streams the briefing markdown back to the client.
+    On stream_end, emits a ``canvas_draft`` mutation if the agent produced one.
+
+    websocket: connection the response frames are written to.
+    user_id: user the research runs for.
+    frame: incoming frame; reads ``request_id``, ``session_id``,
+        ``task_id``/``taskId`` and ``format_prefs``. Missing request/session
+        ids are replaced by fresh UUIDs.
+
+    A missing task id, or any exception raised while streaming, is reported to
+    the client as a ``stream_end`` frame carrying an error string.
+    """
+    request_id = frame.get("request_id") or str(uuid4())
+    session_id = frame.get("session_id") or str(uuid4())
+    task_id: str = frame.get("task_id") or frame.get("taskId") or ""
+
+    logger.info(
+        "device_ws: task_brief_request_start user=%s req=%s task=%s [cache_miss]",
+        user_id, request_id, task_id,
+    )
+
+    if not task_id:
+        await websocket.send_text(
+            WsStreamEnd(request_id=request_id, error="task_id is required").model_dump_json()
+        )
+        return
+
+    # Memory enrichment uses its own short-lived DB session, closed before streaming starts.
+    async with async_session() as db:
+        memory = MemoryMiddleware(db)
+        memory_context = await memory.enrich_context(
+            user_id,
+            f"task brief: {task_id}",
+            trace_id=request_id,
+            session_id=session_id,
+        )
+
+    context: dict = {
+        "_debug": {"request_id": request_id, "session_id": session_id, "user_id": user_id},
+        "format_prefs": frame.get("format_prefs"),
+        **memory_context,
+    }
+
+    executor = await _make_ws_executor(websocket, user_id)
+    set_client_executor(executor)
+    response_chunks: list[str] = []
+
+    try:
+        event_stream = run_task_brief_research_stream(user_id, task_id, context)
+        formatter = StreamFormatter(request_id=request_id)
+        async for ws_frame in formatter.format(event_stream):
+            if ws_frame.type == "stream_text":  # type: ignore[union-attr]
+                # Forward the chunk and keep a copy for canvas extraction after the stream.
+                response_chunks.append(ws_frame.chunk)  # type: ignore[union-attr]
+                await websocket.send_text(ws_frame.model_dump_json())
+            elif ws_frame.type == "stream_start":
+                await websocket.send_text(ws_frame.model_dump_json())
+            # stream_end is emitted below with mutations — skip formatter's version
+    except Exception as exc:
+        # logger.exception keeps the traceback, which logger.error dropped.
+        logger.exception(
+            "device_ws: task_brief_request failed user=%s req=%s task=%s: %s",
+            user_id, request_id, task_id, exc,
+        )
+        await websocket.send_text(
+            WsStreamEnd(request_id=request_id, error=str(exc)).model_dump_json()
+        )
+        return
+    finally:
+        # Always release the executor installed above, on success and on failure.
+        clear_client_executor()
+
+    # Extract canvas block then emit stream_end with optional mutations.
+    # NOTE(review): chunks were already streamed verbatim, so any canvas block text
+    # has reached the client; `_visible` is unused here — confirm the client strips it.
+    full_response = "".join(response_chunks)
+    _visible, canvas_content, canvas_kind = extract_canvas_block(full_response)
+
+    mutations: list[dict] = []
+    if canvas_content:
+        mutations.append({
+            "type": "canvas_draft",
+            "content": canvas_content,
+            "kind": canvas_kind,
+        })
+
+    await websocket.send_text(
+        WsStreamEnd(request_id=request_id, mutations=mutations or None).model_dump_json()
+    )
+
+    logger.info(
+        "device_ws: task_brief_request_end user=%s req=%s task=%s response_chars=%d canvas=%s",
+        user_id, request_id, task_id, len(full_response), canvas_kind or "none",
+    )
+
+
# ── v4 Journey Handlers ─────────────────────────────────────────────
diff --git a/app/config/settings.py b/app/config/settings.py
index 582c46c..a8bf029 100644
--- a/app/config/settings.py
+++ b/app/config/settings.py
@@ -28,8 +28,9 @@ class Settings(BaseSettings):
LLM_MODEL_FLOATING_AGENT: str = "" # floating-agent (contextual chat)
LLM_MODEL_UNIFIED_PROCESSOR: str = "" # unified-processor (agent_runner)
LLM_MODEL_CLOUD_PROCESSOR: str = "" # cloud-processor (agent_runner)
- LLM_MODEL_BRIEF_AGENT: str = "" # brief-agent (home + project text briefs)
- LLM_MODEL_SETUP_AGENT: str = "" # agent-setup journey
+ LLM_MODEL_BRIEF_AGENT: str = "" # brief-agent (home + project text briefs)
+ LLM_MODEL_TASK_BRIEF_AGENT: str = "" # task-brief-agent (per-task deep research)
+ LLM_MODEL_SETUP_AGENT: str = "" # agent-setup journey
LLM_MODEL_MEMORY_EXTRACTOR: str = "" # memory-extractor (Phase 2 extract/decide)
LLM_MODEL_MEMORY_MINER: str = "" # memory-miner (Phase 5 proactive mining)
LLM_MODEL_MEMORY_AUDITOR: str = "" # memory-auditor (Phase 7 weekly audit)
diff --git a/app/core/deep_agent.py b/app/core/deep_agent.py
index a3a7c7d..4141f47 100644
--- a/app/core/deep_agent.py
+++ b/app/core/deep_agent.py
@@ -12,8 +12,10 @@ from typing import Any, Literal
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
from langchain_core.tools import tool
+from app.agents.client_agent import CLIENT_TOOLS
from app.agents.note_agent import NOTE_TOOLS
from app.agents.project_agent import PROJECT_TOOLS
+from app.agents.relations_agent import make_query_relations_tool
from app.agents.task_agent import TASK_TOOLS
from app.agents.timeline_agent import TIMELINE_TOOLS
from app.core.agent_session_buffer import session_buffer
@@ -303,6 +305,80 @@ For specific dates not listed, compute local-midnight in the user timezone and c
{request_context}\
"""
+# System prompt for Stage-1 task brief research. The brace placeholders
+# ({user_identity}, {date_context}, {language_instruction}, {relational_memory},
+# {request_context}) are template variables — presumably filled by the prompt
+# compiler; confirm against the caller.
+# NOTE(review): the example canvas block announced by "append a canvas block at
+# the very end:" appears to be missing from the text — confirm the template is intact.
+_TASK_BRIEF_RESEARCH_SYSTEM_PROMPT = """\
+You are an executive assistant preparing a briefing dossier for your principal before they act on a specific task.
+Your job: gather all relevant context, synthesize it into a tight actionable dossier, and — if the task requires writing (email, message, document) — produce a ready-to-use draft.{user_identity}
+
+# Research workflow
+Follow these steps in order, using tools:
+1. Read the task fully (title, description, due date, priority, status, project, comments).
+2. Fetch the parent project (`get_project`) to understand scope, aiSummary, and any linked client.
+3. If the project has a clientId: call `get_client(id)` to retrieve full client details.
+4. Call `query_relations` (subject_label=client_name or task subject) to find cross-project connections — e.g. the same client appearing in multiple projects.
+5. Search associative memory (`search_associative`) and archival memory (`archival_memory_search`) using the task title + client name as query phrases to surface relevant past interactions.
+6. Read core memory blocks for tone preference, language, and user style: `memory_get("tone_preference")`, `memory_get("language")`.
+7. Determine task kind: is this a writing task (email reply, message, follow-up, proposal)? If yes, draft a ready-to-send piece.
+
+# Output structure
+Write the briefing in the user's language. Use this exact structure:
+
+**What needs to be done**
+(1–2 sentences, concrete and specific — what action the user must take)
+
+**Context you should know**
+(bullet points covering: client background, related projects, prior interactions, tone/style notes, any relevant deadlines or dependencies)
+
+**Suggested first step**
+(one specific, immediately actionable instruction)
+
+If this is a writing task, append a canvas block at the very end:
+
+
+Do NOT include the canvas block for non-writing tasks.
+Do NOT repeat verbatim task fields the user already sees in the UI.
+Be concrete — no vague advice. Every bullet should be a fact that changes what the user does.
+
+# Date context
+{date_context}
+
+# Language
+{language_instruction}
+
+# Known people & projects
+{relational_memory}
+
+# Request context
+{request_context}\
+"""
+
+# System prompt for Stage 2 (follow-up chat after a briefing was delivered).
+# run_floating_stream selects it when the context has brief_mode set and a
+# non-empty briefing_context; {briefing_context} receives that briefing text.
+_TASK_BRIEF_FOLLOWUP_SYSTEM_PROMPT = """\
+You are an executive assistant continuing a conversation with your principal.
+You have already prepared and delivered a research briefing for the active task. The user has read it.{user_identity}
+
+Your briefing:
+---
+{briefing_context}
+---
+
+Continue from here. Do NOT repeat the briefing. Refer to it when relevant.
+Help the user execute: edit drafts, refine wording, look up additional details, plan next steps.
+Stay terse — your principal is a busy executive.
+
+# Date context
+{date_context}
+
+# Language
+{language_instruction}
+
+# Known people & projects
+{relational_memory}
+
+# Request context
+{request_context}\
+"""
+
_FLOATING_DOMAIN_CLASSIFIER_PROMPT = (
"You are a strict domain classifier for websocket floating requests. "
"Return ONLY a JSON object with keys: type, id, section. "
@@ -679,6 +755,25 @@ def _memory_tools(user_id: str, trace_id: str | None) -> list[Any]:
lines = [f"- {item}" for item in results]
return "Recall memory results:\n" + "\n".join(lines)
+ # Read-only search tool: delegates to MemoryMiddleware.search_archival with top_k=limit.
+ # NOTE(review): presumably hits the same archival store as archival_memory_search — confirm
+ # the two tools are not redundant.
+ @tool
+ async def search_associative(query: str, limit: int = 5) -> str:
+ """Semantic search across associative (archival) memory for a given query.
+
+ Use this to surface long-term memories related to a topic, client, or task
+ that may not appear in recent episodes.
+
+ query: natural-language search phrase.
+ limit: max results (default 5).
+ """
+ # Only the first 80 characters of the query are logged.
+ logger.info("deep_agent: search_associative trace=%s user=%s query=%s", trace_id or "-", user_id, query[:80])
+ async with async_session() as db:
+ memory = MemoryMiddleware(db)
+ results = await memory.search_archival(user_id, query, top_k=limit)
+ if not results:
+ return "No associative memory results found."
+ lines = [f"- {item}" for item in results]
+ return "Associative memory results:\n" + "\n".join(lines)
+
return [
memory_list_blocks,
memory_get,
@@ -689,16 +784,33 @@ def _memory_tools(user_id: str, trace_id: str | None) -> list[Any]:
archival_memory_insert,
archival_memory_search,
conversation_search,
+ search_associative,
]
def _read_only_memory_tools(user_id: str, trace_id: str | None) -> list[Any]:
"""Return memory tools that only read — safe for the read-only brief-agent subset."""
+ # Builds the full memory tool list, then keeps only tools whose name is in _read_names.
all_mem = _memory_tools(user_id, trace_id)
- _read_names = {"memory_list_blocks", "memory_get", "archival_memory_search", "conversation_search"}
+ _read_names = {
+ "memory_list_blocks", "memory_get", "archival_memory_search",
+ "conversation_search", "search_associative",
+ }
return [t for t in all_mem if t.name in _read_names]
+def _brief_research_tools(user_id: str, trace_id: str | None) -> list[Any]:
+    """Assemble the tool palette used for Stage-1 task brief research (read-only).
+
+    user_id: bound into the memory tools and the query_relations tool.
+    trace_id: forwarded to those per-user tool factories.
+
+    Returns the domain tool lists, then the read-only memory tools, then the
+    per-user query_relations tool, in that order.
+    """
+    # NOTE(review): the TASK/PROJECT/NOTE/TIMELINE tool lists are included
+    # unfiltered; if any of them contain mutating tools the "read-only" claim
+    # does not hold — confirm.
+    palette: list[Any] = []
+    for group in (TASK_TOOLS, PROJECT_TOOLS, NOTE_TOOLS, TIMELINE_TOOLS, CLIENT_TOOLS):
+        palette.extend(group)
+    palette.extend(_read_only_memory_tools(user_id, trace_id))
+    palette.append(make_query_relations_tool(user_id, trace_id))
+    return palette
+
+
def _all_tools_for_user(user_id: str, trace_id: str | None) -> list[Any]:
return [*_all_tools(), *_memory_tools(user_id, trace_id)]
@@ -1249,7 +1361,29 @@ async def run_floating_stream(
domain = await _infer_floating_domain(message, prepared_context)
yield "floating_domain", domain
- system_prompt, langfuse_prompt = _build_system_prompt("floating_system", _FLOATING_SYSTEM_PROMPT, prepared_context)
+ brief_mode: bool = bool(context.get("brief_mode"))
+ briefing_context_text: str = str(context.get("briefing_context") or "").strip()
+
+ if brief_mode and briefing_context_text:
+ # Stage 2: inject briefing as ground truth context.
+ # {briefing_context} is passed to compile_prompt together with the standard variables below;
+ # there is no separate pre-substitution step (compile_prompt is expected to handle both
+ # Langfuse {{}} and fallback {} placeholders).
+ template, langfuse_prompt = get_prompt_or_fallback(
+ "task_brief_followup_system",
+ _TASK_BRIEF_FOLLOWUP_SYSTEM_PROMPT,
+ )
+ system_prompt = compile_prompt(
+ template, langfuse_prompt,
+ date_context=_datetime_context_injection(prepared_context).strip(),
+ language_instruction=_language_instruction(prepared_context).strip(),
+ user_identity=_user_identity_injection(prepared_context).strip(),
+ relational_memory=_relational_memory_injection(prepared_context).strip(),
+ proactive_hints=_proactive_hints_injection(prepared_context).strip(),
+ request_context=_request_context_block(prepared_context),
+ briefing_context=briefing_context_text,
+ )
+ else:
+ system_prompt, langfuse_prompt = _build_system_prompt("floating_system", _FLOATING_SYSTEM_PROMPT, prepared_context)
sanitizer = _FloatingStreamSanitizer()
emitted_sanitized = False
raw_chunks: list[str] = []
@@ -1283,6 +1417,49 @@ async def run_floating_stream(
yield "token", _fallback_from_raw_floating_text("".join(raw_chunks))
+async def run_task_brief_research_stream(
+ user_id: str,
+ task_id: str,
+ context: dict[str, Any],
+) -> AsyncGenerator[tuple[str, Any], None]:
+ """Stage-1 executive assistant: deep research for one task.
+
+ Yields ``("token", chunk)`` events like other stream runners.
+ The final concatenated text may contain a ```` block
+ which the WS handler strips and emits as a ``canvas_draft`` mutation.
+ """
+ prepared_context = await _prepare_context(f"task:{task_id}", context)
+ tools = _brief_research_tools(user_id, _trace_id_from_context(prepared_context))
+
+ # Inject task_id so the agent knows what to look up first.
+ research_message = (
+ f"Prepare a briefing dossier for task ID: {task_id}\n"
+ "Follow the research workflow: read the task, then project, then client, "
+ "then cross-project relations, then relevant memory. "
+ "End with a concrete suggested first step. "
+ "If this is a writing task, include a