From 47bf1881e5b2dab36a61488d69d7605c2c93abf6 Mon Sep 17 00:00:00 2001 From: roberto Date: Thu, 12 Mar 2026 18:03:27 +0100 Subject: [PATCH] deep agent --- app/api/routes/device_ws.py | 16 ++++++++- app/core/deep_agent.py | 69 ++++++++++++++++++++++++++++++------- 2 files changed, 71 insertions(+), 14 deletions(-) diff --git a/app/api/routes/device_ws.py b/app/api/routes/device_ws.py index fa3611c..7885abc 100644 --- a/app/api/routes/device_ws.py +++ b/app/api/routes/device_ws.py @@ -200,6 +200,9 @@ async def _message_loop(websocket: WebSocket, user_id: str) -> None: # ── v3 Chat Handlers ────────────────────────────────────────────────── +_WS_TOOL_CALL_TIMEOUT = 30 # seconds to wait for Electron tool_result + + async def _make_ws_executor(websocket: WebSocket, user_id: str): """Return a callback that sends tool_call frames and awaits tool_result.""" async def _executor(payload: dict) -> dict: @@ -208,7 +211,18 @@ async def _make_ws_executor(websocket: WebSocket, user_id: str): logger.info("ws_executor: sending tool_call id=%s action=%s", call_id, payload.get("action")) await websocket.send_text(json.dumps(payload)) future = device_manager.create_pending_call(user_id, call_id) - result = await future + try: + result = await asyncio.wait_for(future, timeout=_WS_TOOL_CALL_TIMEOUT) + except asyncio.TimeoutError: + logger.error( + "ws_executor: timeout waiting for tool_result id=%s action=%s user=%s", + call_id, payload.get("action"), user_id, + ) + # Clean up the pending future so it doesn't leak + conn = device_manager._connections.get(user_id) + if conn: + conn.pending_calls.pop(call_id, None) + return {"error": f"Tool call timed out after {_WS_TOOL_CALL_TIMEOUT}s", "rows": []} logger.info("ws_executor: tool_result id=%s result_type=%s result_keys=%s", call_id, type(result).__name__, list(result.keys()) if isinstance(result, dict) else "N/A") diff --git a/app/core/deep_agent.py b/app/core/deep_agent.py index c7c4086..6250dff 100644 --- a/app/core/deep_agent.py +++ b/app/core/deep_agent.py @@ -114,7 +114,8 @@ def _make_subagent_specs() -> list[dict[str, Any]]: "name": "task_agent", "description": ( "Manages tasks and comments: list, create, update, delete, " - "due-today, comments. Delegate task-related queries here." + "due-today, and comments. Use when the user asks about tasks, " + "to-dos, assignments, deadlines, or anything task-related." ), "system_prompt": ( "You are a task management assistant. You create, update, list, " @@ -128,14 +129,13 @@ def _make_subagent_specs() -> list[dict[str, Any]]: " - For update_task, use -1 for integer fields you do not want to change\n" " - Always confirm the action in plain, user-friendly language." ), - "tools": _TASK_TOOLS, - "model": llm, + "tools": _TASK_TOOLS }, { "name": "note_agent", "description": ( "Manages notes: list, get, create, update, delete. " - "Delegate note-related queries here." + "Use when the user asks about notes, documents, or written content." ), "system_prompt": ( "You are a note-taking assistant. You help users create, retrieve, " @@ -146,14 +146,13 @@ def _make_subagent_specs() -> list[dict[str, Any]]: "content before appending or replacing sections\n" " - Do not fabricate note content." ), - "tools": _NOTE_TOOLS, - "model": llm, + "tools": _NOTE_TOOLS }, { "name": "project_agent", "description": ( "Manages projects: list, get, create, update, archive, delete. " - "Delegate project-related queries here." + "Use when the user asks about projects, workspaces, or project status." ), "system_prompt": ( "You are a project management assistant. You help users create, " @@ -163,14 +162,14 @@ def _make_subagent_specs() -> list[dict[str, Any]]: " - Prefer archiving over deletion\n" " - ai_summary is populated only when the user asks for a summary." ), - "tools": _PROJECT_TOOLS, - "model": llm, + "tools": _PROJECT_TOOLS }, { "name": "timeline_agent", "description": ( - "Manages project timelines (milestones): list, create, update, " - "delete. Delegate timeline/milestone queries here." + "Manages project timelines and milestones: list, create, update, " + "delete. Use when the user asks about timelines, milestones, " + "deadlines, or project scheduling." ), "system_prompt": ( "You are a project timeline assistant. Timelines are milestone " @@ -181,8 +180,7 @@ def _make_subagent_specs() -> list[dict[str, Any]]: " - For update_timeline, use -1 for integer fields you do not " "want to change." ), - "tools": _TIMELINE_TOOLS, - "model": llm, + "tools": _TIMELINE_TOOLS }, ] @@ -221,6 +219,11 @@ _HOME_SYSTEM = ( "multiple sub-agents in parallel if needed.\n\n" "You also have an update_core_memory tool — use it when the user states " "a preference or important fact worth remembering long-term.\n\n" + "IMPORTANT: You do NOT have direct access to workspace data. Always " + "delegate to your subagents using the task() tool. Do not attempt to " + "answer workspace queries yourself — the subagents have the tools to " + "fetch and modify data. You can call multiple subagents in parallel " + "when the request spans multiple domains.\n\n" "## Entity References\n" "When your response mentions specific workspace entities, embed them " "inline using entity tags so the UI can render interactive components.\n" @@ -263,6 +266,10 @@ _FLOATING_SYSTEM = ( "if the request requires it.\n\n" "You also have an update_core_memory tool — use it when the user states " "a preference or important fact worth remembering long-term.\n\n" + "IMPORTANT: You do NOT have direct access to workspace data. Always " + "delegate to your subagents using the task() tool. Do not attempt to " + "answer workspace queries yourself — the subagents have the tools to " + "fetch and modify data.\n\n" "Provide direct, conversational responses.\n\n" "Memory context:\n{memory_context}" ) @@ -367,6 +374,42 @@ async def _run_graph_stream( ): if stream_mode == "messages": msg, metadata = chunk + agent_name = ( + metadata.get("lc_agent_name", "?") + if isinstance(metadata, dict) else "?" + ) + node = ( + metadata.get("langgraph_node", "?") + if isinstance(metadata, dict) else "?" + ) + + # Log every message event with agent attribution + if isinstance(msg, (AIMessage, AIMessageChunk)) and msg.content: + logger.info( + "[%s] %s node=%s content=%s", + agent_name, + type(msg).__name__, + node, + str(msg.content), + ) + elif isinstance(msg, (AIMessage, AIMessageChunk)) and msg.tool_calls: + tool_names = [tc["name"] for tc in msg.tool_calls] + logger.info( + "[%s] %s node=%s tool_calls=%s", + agent_name, + type(msg).__name__, + node, + tool_names, + ) + elif hasattr(msg, "name") and hasattr(msg, "content") and msg.content: + # ToolMessage — log tool result + logger.info( + "[%s] ToolMessage tool=%s node=%s result=%s", + agent_name, + getattr(msg, "name", "?"), + node, + str(msg.content), + ) # Only yield tokens from the supervisor's final response # (not from sub-agent internal LLM calls). # Accept both AIMessageChunk (streamed tokens) and AIMessage