"""Deep Agent — ``create_deep_agent`` supervisors for home and floating modes. Two supervisor graphs (via ``deepagents.create_deep_agent``): * **HomeSupervisor** — gathers data from multiple domains, presents structured overview with entity/chart tags. * **FloatingSupervisor** — focused, scoped assistant for a single entity/domain. Each supervisor delegates to four sub-agents (task, project, note, timeline) via the built-in ``task`` tool provided by ``SubAgentMiddleware``. The sub-agents talk to Electron via ``execute_on_client``. Built-in middleware provides: todo-list tracking, virtual filesystem, automatic context summarisation, prompt-caching, and tool-call patching. Streaming uses ``astream(stream_mode=["messages", "updates"])`` so that callers can sniff: * ``("messages", (token, metadata))`` — text tokens for streaming * ``("updates", ...)`` — tool call results for mutations An ``update_core_memory`` tool is available to both supervisors for persisting user preferences mid-conversation (MemGPT-style). """ from __future__ import annotations import json import logging from typing import Any, AsyncGenerator from deepagents import create_deep_agent from langchain_core.messages import AIMessage, AIMessageChunk, HumanMessage from langchain_core.tools import tool from app.core.llm import get_llm from app.core.ws_context import ( clear_tool_result_collector, set_tool_result_collector, ) logger = logging.getLogger(__name__) # ── Sub-agent tool imports ──────────────────────────────────────────── from app.agents.task_agent import ( # noqa: E402 add_task_comment, create_task, delete_task, delete_task_comment, list_task_comments, list_tasks, list_tasks_due_today, update_task, ) from app.agents.note_agent import ( # noqa: E402 create_note, delete_note, get_note, list_notes, update_note, ) from app.agents.project_agent import ( # noqa: E402 create_project, delete_project, get_project, list_all_projects, list_projects, update_project, ) from app.agents.timeline_agent import ( # noqa: E402 create_timeline, delete_timeline, list_timelines, update_timeline, ) # ── Sub-agent definitions ───────────────────────────────────────────── _TASK_TOOLS = [ list_tasks, create_task, update_task, delete_task, list_tasks_due_today, list_task_comments, add_task_comment, delete_task_comment, ] _NOTE_TOOLS = [list_notes, get_note, create_note, update_note, delete_note] _PROJECT_TOOLS = [ list_projects, list_all_projects, get_project, create_project, update_project, delete_project, ] _TIMELINE_TOOLS = [list_timelines, create_timeline, update_timeline, delete_timeline] def _make_subagent_specs() -> list[dict[str, Any]]: """Return SubAgent dicts for the four workspace domains. Each dict follows the ``deepagents`` ``SubAgent`` TypedDict: name, description, system_prompt, tools, model The model and middleware are filled in by ``create_deep_agent`` automatically. """ llm = get_llm() return [ { "name": "task_agent", "description": ( "Manages tasks and comments: list, create, update, delete, " "due-today, comments. Delegate task-related queries here." ), "system_prompt": ( "You are a task management assistant. You create, update, list, " "and track tasks and their comments.\n\n" "Rules:\n" " - status must be one of: todo, in_progress, done\n" " - priority must be one of: high, medium, low\n" " - due_date is a Unix timestamp in milliseconds\n" " - assignees is a JSON-encoded array of strings\n" " - is_approved defaults to 0; set to 1 only when the user confirms\n" " - For update_task, use -1 for integer fields you do not want to change\n" " - Always confirm the action in plain, user-friendly language." ), "tools": _TASK_TOOLS, "model": llm, }, { "name": "note_agent", "description": ( "Manages notes: list, get, create, update, delete. " "Delegate note-related queries here." ), "system_prompt": ( "You are a note-taking assistant. You help users create, retrieve, " "update, and delete Markdown notes in their workspace.\n\n" "Rules:\n" " - content is always Markdown; preserve formatting when updating\n" " - When updating, call get_note first if you need to read existing " "content before appending or replacing sections\n" " - Do not fabricate note content." ), "tools": _NOTE_TOOLS, "model": llm, }, { "name": "project_agent", "description": ( "Manages projects: list, get, create, update, archive, delete. " "Delegate project-related queries here." ), "system_prompt": ( "You are a project management assistant. You help users create, " "find, update, and archive projects.\n\n" "Rules:\n" " - status must be one of: active, archived\n" " - Prefer archiving over deletion\n" " - ai_summary is populated only when the user asks for a summary." ), "tools": _PROJECT_TOOLS, "model": llm, }, { "name": "timeline_agent", "description": ( "Manages project timelines (milestones): list, create, update, " "delete. Delegate timeline/milestone queries here." ), "system_prompt": ( "You are a project timeline assistant. Timelines are milestone " "dates that track progress on a project.\n\n" "Rules:\n" " - project_id is REQUIRED for every create\n" " - date is a Unix timestamp in milliseconds\n" " - For update_timeline, use -1 for integer fields you do not " "want to change." ), "tools": _TIMELINE_TOOLS, "model": llm, }, ] # ── Update core memory tool ────────────────────────────────────────── def _make_update_core_memory_tool(user_id: str, db_session_factory): """Create a tool that persists a key/value preference in core memory.""" @tool async def update_core_memory(key: str, value: str) -> str: """Save a user preference or fact to long-term core memory. key: short label for the memory (e.g. 'preferred_language', 'timezone') value: the value to remember Use this when the user states a preference or fact worth remembering. """ from app.core.memory_middleware import MemoryMiddleware async with db_session_factory() as db: memory = MemoryMiddleware(db) await memory.update_core(user_id, key, value) return f"Remembered: {key} = {value}" return update_core_memory # ── System prompts ──────────────────────────────────────────────────── _HOME_SYSTEM = ( "You are Adiuva, a smart workspace assistant on the Home dashboard.\n" "Your job is to help the user by gathering data from their workspace and " "presenting a comprehensive overview.\n\n" "You have sub-agents (task_agent, note_agent, project_agent, " "timeline_agent) accessible via the `task` tool. Delegate to " "the appropriate sub-agent(s) based on the user's request. You can call " "multiple sub-agents in parallel if needed.\n\n" "You also have an update_core_memory tool — use it when the user states " "a preference or important fact worth remembering long-term.\n\n" "## Entity References\n" "When your response mentions specific workspace entities, embed them " "inline using entity tags so the UI can render interactive components.\n" "Format: [comma-separated UUIDs]\n" "Supported types: task, project, note, timeline\n\n" "Example response:\n" " Here is your project:\n" " [abc-123-def]\n" " It has these pending tasks:\n" " [def-456,ghi-789]\n\n" "IMPORTANT: Only include IDs of entities that are directly relevant to " "the user's question. Do NOT dump all entity IDs returned by a tool — " "filter to only the ones the user asked about or that matter for the answer.\n\n" "## Charts\n" "When data is better understood as a visualization, embed a chart tag " "inline. The frontend renders it using shadcn/ui Recharts components.\n" "Format: {{JSON}}\n\n" "JSON shape:\n" ' {{"chartType":"","title":"...","data":[...],"config":{{...}}}}\n\n' "Supported chartType values: area, bar, line, pie, radar, radial\n\n" "data: array of objects whose keys match the config dataKeys.\n" "config: {{ dataKey: {{ label, color }} }} — follows shadcn ChartConfig.\n\n" "Example:\n" " Here is your task breakdown:\n" ' {{"chartType":"bar","title":"Tasks by Status",' '"data":[{{"status":"done","count":12}},{{"status":"pending","count":5}}],' '"config":{{"count":{{"label":"Tasks","color":"#2563eb"}}}}}}\n\n' "Only include a chart when the user asks for a summary, overview, or " "analytics — not for simple lookups.\n\n" "Memory context:\n{memory_context}" ) _FLOATING_SYSTEM = ( "You are Adiuva, a focused workspace assistant in the floating panel.\n" "The user is currently working in the '{scope_type}' section" "{scope_detail}.\n\n" "You have sub-agents (task_agent, note_agent, project_agent, " "timeline_agent) accessible via the `task` tool. Focus your " "help on the user's current scope, but you can use other sub-agents " "if the request requires it.\n\n" "You also have an update_core_memory tool — use it when the user states " "a preference or important fact worth remembering long-term.\n\n" "Provide direct, conversational responses.\n\n" "Memory context:\n{memory_context}" ) def _format_memory_context(memory: dict[str, Any]) -> str: """Format the memory dict into a readable string for the system prompt.""" if not memory: return "(no memory available)" parts = [] if memory.get("core_memory"): parts.append("Preferences: " + json.dumps(memory["core_memory"])) if memory.get("associative_memory"): parts.append("Related memories: " + "; ".join(memory["associative_memory"][:3])) if memory.get("episodic_memory"): parts.append("Recent sessions: " + "; ".join(memory["episodic_memory"][:3])) if memory.get("proactive_hints"): parts.append("Patterns: " + "; ".join(memory["proactive_hints"][:3])) return "\n".join(parts) if parts else "(no memory available)" # ── Graph builders ──────────────────────────────────────────────────── def build_home_graph( user_id: str, memory_context: dict[str, Any], db_session_factory, ): """Build the Home supervisor graph.""" subagent_specs = _make_subagent_specs() memory_tool = _make_update_core_memory_tool(user_id, db_session_factory) prompt = _HOME_SYSTEM.format( memory_context=_format_memory_context(memory_context), ) return create_deep_agent( model=get_llm(), tools=[memory_tool], system_prompt=prompt, subagents=subagent_specs, name="home_supervisor", ) def build_floating_graph( user_id: str, memory_context: dict[str, Any], scope: dict[str, Any], db_session_factory, ): """Build the Floating supervisor graph.""" subagent_specs = _make_subagent_specs() memory_tool = _make_update_core_memory_tool(user_id, db_session_factory) scope_type = scope.get("type", "general") scope_id = scope.get("id") scope_detail = f" (id: {scope_id})" if scope_id else "" prompt = _FLOATING_SYSTEM.format( scope_type=scope_type, scope_detail=scope_detail, memory_context=_format_memory_context(memory_context), ) return create_deep_agent( model=get_llm(), tools=[memory_tool], system_prompt=prompt, subagents=subagent_specs, name="floating_supervisor", ) # ── Stream event type ──────────────────────────────────────────────── # Events yielded by run_*_stream: # ("token", str) — text token for streaming # ("tool_start", dict) — {"name": "task_agent", "args": {...}} # ("tool_end", dict) — {"name": "task_agent", "result": "..."} # ── Stream runners ──────────────────────────────────────────────────── async def _run_graph_stream( graph, message: str, ) -> AsyncGenerator[tuple[str, Any], None]: """Run a supervisor graph with streaming, yielding event tuples. Uses ``stream_mode=["messages", "updates"]`` to get both token-level streaming and update events for tool calls. """ inputs = {"messages": [HumanMessage(content=message)]} collector: list[dict] = [] set_tool_result_collector(collector) try: async for stream_mode, chunk in graph.astream( inputs, stream_mode=["messages", "updates"], ): if stream_mode == "messages": msg, metadata = chunk # Only yield tokens from the supervisor's final response # (not from sub-agent internal LLM calls). # Accept both AIMessageChunk (streamed tokens) and AIMessage # (full response from non-streaming providers). # create_deep_agent names the LLM node "model". if ( isinstance(msg, (AIMessage, AIMessageChunk)) and msg.content and not msg.tool_calls and isinstance(metadata, dict) and metadata.get("langgraph_node") == "model" ): yield ("token", str(msg.content)) elif stream_mode == "updates": # Updates is a dict of {node_name: state_update} if not isinstance(chunk, dict): continue for node_name, state_update in chunk.items(): if node_name != "tools": continue # Tool node executed — extract tool call results tool_messages = state_update.get("messages", []) for tool_msg in tool_messages: if hasattr(tool_msg, "name") and hasattr(tool_msg, "content"): yield ( "tool_end", {"name": tool_msg.name, "result": str(tool_msg.content)}, ) finally: clear_tool_result_collector() # Yield the collected mutations so callers can attach them to stream_end yield ("mutations", collector) async def run_home_stream( user_id: str, message: str, context: dict[str, Any], db_session_factory, ) -> AsyncGenerator[tuple[str, Any], None]: """Run the Home supervisor and yield streaming events.""" graph = build_home_graph(user_id, context, db_session_factory) async for event in _run_graph_stream(graph, message): yield event async def run_floating_stream( user_id: str, message: str, context: dict[str, Any], scope: dict[str, Any], db_session_factory, ) -> AsyncGenerator[tuple[str, Any], None]: """Run the Floating supervisor and yield streaming events.""" graph = build_floating_graph(user_id, context, scope, db_session_factory) async for event in _run_graph_stream(graph, message): yield event async def run_home( user_id: str, message: str, context: dict[str, Any], db_session_factory, ) -> str: """Run the Home supervisor (non-streaming) and return full response text.""" graph = build_home_graph(user_id, context, db_session_factory) result = await graph.ainvoke( {"messages": [HumanMessage(content=message)]} ) messages = result["messages"] for msg in reversed(messages): if hasattr(msg, "content") and msg.content and not getattr(msg, "tool_calls", None): return str(msg.content) return ""