From 826f64d6bb0b97eb67cd8905587f1d9d4c093582 Mon Sep 17 00:00:00 2001 From: Roberto Musso Date: Tue, 17 Mar 2026 08:50:46 +0100 Subject: [PATCH] refactor local directory agent to two-phase LLM-with-tools architecture Replace the single-pass FE-driven agent_run/agent_data flow with a BE-orchestrated two-phase execution using LangChain tool-calling: - Phase 1 (Triage): explores directory via new filesystem tools, matches files to existing projects using PROJECT_TOOLS - Phase 2 (Processing): reads files and performs CRUD per project group with clean LLM context windows Key changes: - Add filesystem_agent.py with list_directory, read_file_content, get_file_metadata tools using execute_on_client() - Move setup journey from REST to WebSocket (journey_start/message frames) - Add batch_runs_per_day billing limit and enforce in /trigger - Remove deprecated agent_data/agent_complete frame handlers and queues Co-Authored-By: Claude Opus 4.6 --- app/agents/__init__.py | 4 +- app/agents/filesystem_agent.py | 85 +++++ app/api/routes/agent_setup.py | 397 ++++++++++++--------- app/api/routes/agents.py | 34 +- app/api/routes/device_ws.py | 112 ++++-- app/billing/tier_manager.py | 4 + app/core/agent_runner.py | 635 +++++++++++++++++++-------------- app/core/device_manager.py | 48 +-- app/main.py | 3 +- app/schemas.py | 48 +-- 10 files changed, 801 insertions(+), 569 deletions(-) create mode 100644 app/agents/filesystem_agent.py diff --git a/app/agents/__init__.py b/app/agents/__init__.py index 8b2e848..a2dc4c6 100644 --- a/app/agents/__init__.py +++ b/app/agents/__init__.py @@ -1,5 +1,5 @@ """Expose tool modules used by deep orchestrator-worker graphs.""" -from app.agents import timeline_agent, note_agent, project_agent, task_agent +from app.agents import filesystem_agent, timeline_agent, note_agent, project_agent, task_agent -__all__ = ["timeline_agent", "note_agent", "project_agent", "task_agent"] +__all__ = ["filesystem_agent", "timeline_agent", "note_agent", "project_agent", "task_agent"] diff --git a/app/agents/filesystem_agent.py b/app/agents/filesystem_agent.py new file mode 100644 index 0000000..8e6018c --- /dev/null +++ b/app/agents/filesystem_agent.py @@ -0,0 +1,85 @@ +"""Filesystem agent — tools for reading local directories and files on Electron. + +These tools delegate to the Electron client via ``execute_on_client()`` using +the same WS tool-call round-trip pattern as CRUD tools. The Electron app +handles actual disk I/O and responds with ``tool_result`` frames. +""" + +from __future__ import annotations + +from typing import Any + +from langchain_core.tools import tool + +from app.core.ws_context import execute_on_client + + +@tool +async def list_directory(path: str) -> str: + """List files and folders in a local directory on the user's device. + + Returns a formatted listing of entries with name, type (file/directory), + and full path. + """ + result = await execute_on_client( + action="list_directory", + data={"path": path}, + ) + entries: list[dict[str, Any]] = result.get("entries", []) + if not entries: + return f"Directory '{path}' is empty or does not exist." + lines: list[str] = [] + for entry in entries: + entry_type = entry.get("type", "unknown") + entry_name = entry.get("name", "") + entry_path = entry.get("path", "") + lines.append(f"- [{entry_type}] {entry_name} ({entry_path})") + return f"Directory listing for '{path}' ({len(entries)} entries):\n" + "\n".join(lines) + + +@tool +async def read_file_content(path: str) -> str: + """Read the text content of a local file on the user's device. + + Returns the file content as a string. Large files may be truncated + by the Electron client. + """ + result = await execute_on_client( + action="read_file_content", + data={"path": path}, + ) + content: str = result.get("content", "") + if not content: + return f"File '{path}' is empty or could not be read." + return content + + +@tool +async def get_file_metadata(path: str) -> str: + """Get metadata for a local file: size, creation date, modification date, extension. + + Returns a formatted summary of the file's metadata. + """ + result = await execute_on_client( + action="get_file_metadata", + data={"path": path}, + ) + size = result.get("size", "unknown") + created = result.get("createdAt", "unknown") + modified = result.get("modifiedAt", "unknown") + extension = result.get("extension", "unknown") + name = result.get("name", path) + return ( + f"File: {name}\n" + f" Extension: {extension}\n" + f" Size: {size} bytes\n" + f" Created: {created}\n" + f" Modified: {modified}" + ) + + +FILESYSTEM_TOOLS: list[Any] = [ + list_directory, + read_file_content, + get_file_metadata, +] diff --git a/app/api/routes/agent_setup.py b/app/api/routes/agent_setup.py index ce71b72..9479732 100644 --- a/app/api/routes/agent_setup.py +++ b/app/api/routes/agent_setup.py @@ -1,54 +1,40 @@ -"""Chatbot Journey endpoints — guided conversation to build an agent prompt_template. +"""Chatbot Journey — WS-based guided conversation to build an agent prompt_template. -Endpoints: - POST /agents/journey/start — start a new journey session - POST /agents/journey/message — continue the conversation - -Sessions are stored in-memory with a 30-minute TTL. Stale entries are -cleaned up lazily on access. Upgrade to Redis for multi-instance deployments. +The journey is driven entirely through WebSocket frames (no REST endpoints). +The device WS handler dispatches ``journey_start`` and ``journey_message`` +frames to the functions exported here. Journey flow: - 1. Client sends ``{ agent_type, agent_id? }`` to ``/start``. - 2. Server creates a session, calls the LLM with a contextual system prompt, - and returns the first question. - 3. Client sends follow-up messages to ``/message``. - 4. After 3-5 turns the LLM wraps up by emitting a ``prompt_template`` block - delimited by ``PROMPT_TEMPLATE_START`` / ``PROMPT_TEMPLATE_END``. - 5. Server parses the block, sets ``done=True``, and returns the template. - -The ``prompt_template`` from the final response is meant to be stored by -the Electron client in local agent settings and later sent to -``POST /agents/trigger`` when a run is executed. + 1. FE sends ``journey_start`` frame with basic agent config (directory, + data_types, schedule). + 2. Server creates an in-memory session, sets up a WS executor so the + setup LLM can use file-system tools, does a first directory scrape, + and sends back a ``journey_reply`` with the first question. + 3. FE sends ``journey_message`` frames for each user reply. + 4. Server appends the user message, calls the LLM (which may read files + via tools), and sends back a ``journey_reply``. + 5. After 3-5 turns the LLM wraps up by emitting a ``prompt_template`` + block delimited by ``PROMPT_TEMPLATE_START`` / ``PROMPT_TEMPLATE_END``. + 6. Server parses the block, sends ``journey_reply`` with ``done=True`` + and the template. FE stores it locally. """ from __future__ import annotations +import json import logging import time import uuid from dataclasses import dataclass, field from typing import Any -from fastapi import APIRouter, Depends, HTTPException, status -from langchain_core.messages import AIMessage, HumanMessage, SystemMessage -from sqlalchemy import select -from sqlalchemy.ext.asyncio import AsyncSession +from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage -from app.api.deps import get_current_user +from app.agents.filesystem_agent import FILESYSTEM_TOOLS from app.core.llm import get_llm -from app.db import get_session -from app.models import CloudAgentConfig, LocalAgentConfig -from app.schemas import ( - JourneyMessageRequest, - JourneyResponse, - JourneyStartRequest, - UserProfile, -) logger = logging.getLogger(__name__) -router = APIRouter(prefix="/agents/journey", tags=["agents"]) - # ── Session TTL ─────────────────────────────────────────────────────────── _SESSION_TTL_SECONDS: int = 1800 # 30 minutes @@ -59,16 +45,21 @@ _TEMPLATE_END = "PROMPT_TEMPLATE_END" # Maximum number of conversation turns before the LLM is nudged to wrap up. _MAX_TURNS: int = 5 +# Max tool-calling steps per LLM invocation. +_MAX_TOOL_STEPS: int = 6 # ── In-memory session store ─────────────────────────────────────────────── @dataclass -class _JourneySession: +class JourneySession: session_id: str user_id: str agent_type: str # "local" | "cloud" + directory: str + data_types: list[str] history: list[dict[str, Any]] = field(default_factory=list) + system_prompt: str = "" created_at: float = field(default_factory=time.monotonic) def is_expired(self) -> bool: @@ -76,67 +67,70 @@ class _JourneySession: # session_id → session -_sessions: dict[str, _JourneySession] = {} +_sessions: dict[str, JourneySession] = {} -def _get_session(session_id: str, user_id: str) -> _JourneySession: - """Retrieve session; raise 404 on missing, expired, or wrong owner.""" +def get_journey_session(session_id: str, user_id: str) -> JourneySession | None: + """Retrieve session; return None on missing, expired, or wrong owner.""" s = _sessions.get(session_id) if s is None or s.is_expired(): _sessions.pop(session_id, None) - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Journey session not found or expired") + return None if s.user_id != user_id: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Journey session not found or expired") + return None return s # ── System prompt builder ───────────────────────────────────────────────── -_LOCAL_PREAMBLE = """\ -What kind of files are in the directories you want to monitor? \ -(for example: emails saved as .eml, documents in .pdf or .txt, markdown notes, etc.)""" - -_CLOUD_PREAMBLE = """\ -What kind of emails or messages should I look for? \ -(for example: client communications, invoices, meeting notes, project updates, etc.)""" - _SYSTEM_PROMPT_TEMPLATE = """\ You are a friendly assistant helping a freelancer configure a data-extraction agent. -Your job is to understand exactly what data the user wants to extract from their {source_description} \ -and produce a detailed prompt_template that a separate AI will use as its instruction set. +Your job is to understand exactly what data the user wants to extract from their +local directory and produce a detailed prompt_template that a separate AI will use +as its instruction set. -Ask concise, focused questions one at a time. Cover these topics (not necessarily in this order): - 1. The type and format of the source content. +You have access to file-system tools to explore the user's directory: +- list_directory: to see folder structure +- read_file_content: to peek at file contents +- get_file_metadata: to check file info + +The user's configured directory is: {directory} +Target data types: {data_types} + +Start by exploring the directory to understand its structure. Then ask concise, +focused questions one at a time. Cover these topics (not necessarily in this order): + 1. The type and format of the source content (confirmed by your exploration). 2. Which data types to extract: tasks, notes, timelines, and/or projects. - 3. How fields should be mapped (e.g. email subject → task title). + 3. How fields should be mapped (e.g. filename → task title). 4. Priority or status rules (e.g. "urgent" keyword → high priority). 5. Any special handling, date extraction, or exclusions. -After 3-5 questions (when you have enough information), output the final prompt_template between \ -these exact markers on their own lines: +After 3-5 questions (when you have enough information), output the final prompt_template +between these exact markers on their own lines: {template_start} {template_end} -The prompt_template must be a self-contained instruction for an AI that receives a document/email/message \ -and must return a JSON array of records in this shape: - [{{ "table": "", "data": {{ }} }}, ...] +The prompt_template must be a self-contained instruction for an AI that reads files +and must perform CRUD operations using tools to create records. It should specify: + - What entity types to create (tasks, notes, timelines, projects). + - How to map file content to record fields (camelCase: title, status, priority, + dueDate, projectId, content, etc.). + - That isAiSuggested must be set to 1 and isApproved to 0 on every record. + - Concrete examples of mappings based on what you discovered in the directory. -Rules for the generated template: - - Be explicit about field names (camelCase: title, status, priority, dueDate, projectId, content, etc.). - - Include concrete examples of mappings. - - Mention that Electron adds id/createdAt/updatedAt automatically. - - Set isAiSuggested: true and isApproved: false on every record. {existing_section}\ -Do not ask more than {max_turns} questions total. Start with your first question now.\ +Do not ask more than {max_turns} questions total. Begin by exploring the directory, +then ask your first question.\ """ -def _build_system_prompt(agent_type: str, existing_template: str | None) -> str: - source_description = ( - "files in local directories" if agent_type == "local" else "emails and messages from cloud providers" - ) +def _build_system_prompt( + directory: str, + data_types: list[str], + existing_template: str | None = None, +) -> str: existing_section = ( f"\nThe user already has the following prompt_template — refine it based on their answers:\n" f"---\n{existing_template}\n---\n" @@ -144,7 +138,8 @@ def _build_system_prompt(agent_type: str, existing_template: str | None) -> str: else "" ) return _SYSTEM_PROMPT_TEMPLATE.format( - source_description=source_description, + directory=directory, + data_types=", ".join(data_types), template_start=_TEMPLATE_START, template_end=_TEMPLATE_END, existing_section=existing_section, @@ -152,10 +147,6 @@ def _build_system_prompt(agent_type: str, existing_template: str | None) -> str: ) -def _first_question(agent_type: str) -> str: - return _LOCAL_PREAMBLE if agent_type == "local" else _CLOUD_PREAMBLE - - # ── Template extraction ─────────────────────────────────────────────────── @@ -168,11 +159,37 @@ def _extract_template(text: str) -> str | None: return text[start_idx:end_idx].strip() or None -# ── LLM call ───────────────────────────────────────────────────────────── +# ── LLM call with tool support ─────────────────────────────────────────── -async def _call_llm(system_prompt: str, history: list[dict[str, Any]]) -> str: - """Build LangChain messages from history and invoke the LLM.""" +def _as_text(content: Any) -> str: + if content is None: + return "" + if isinstance(content, str): + return content + if isinstance(content, list): + parts: list[str] = [] + for item in content: + if isinstance(item, str): + parts.append(item) + elif isinstance(item, dict): + text = item.get("text") + if isinstance(text, str): + parts.append(text) + return "".join(parts) + return str(content) + + +async def _call_llm_with_tools( + system_prompt: str, + history: list[dict[str, Any]], + tools: list[Any], +) -> str: + """Build LangChain messages from history and invoke the LLM with tools. + + Handles tool-calling loops: if the LLM calls tools, execute them and + continue until a final text response is produced. + """ messages: list[Any] = [SystemMessage(content=system_prompt)] for turn in history: if turn["role"] == "user": @@ -181,126 +198,161 @@ async def _call_llm(system_prompt: str, history: list[dict[str, Any]]) -> str: messages.append(AIMessage(content=turn["content"])) llm = get_llm(model=None, temperature=0.4) - response = await llm.ainvoke(messages) - return response.content # type: ignore[return-value] + llm_with_tools = llm.bind_tools(tools) + tool_map = {tool_def.name: tool_def for tool_def in tools} + + for _ in range(_MAX_TOOL_STEPS): + response: AIMessage = await llm_with_tools.ainvoke(messages) + messages.append(response) + + if not response.tool_calls: + return _as_text(response.content) + + for call in response.tool_calls: + call_name = str(call.get("name", "")) + call_args = call.get("args", {}) + logger.info( + "agent_setup: journey tool_call name=%s args=%s", + call_name, + json.dumps(call_args, ensure_ascii=True)[:500], + ) + + tool_fn = tool_map.get(call_name) + if tool_fn is None: + tool_output = f"Unknown tool: {call_name}" + else: + tool_output = await tool_fn.ainvoke(call_args) + + logger.info( + "agent_setup: journey tool_result name=%s output=%s", + call_name, + str(tool_output)[:800], + ) + messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"])) + + # Fallback: exceeded max steps. + final = await llm.ainvoke(messages) + return _as_text(final.content) -# ── Existing-config loader ──────────────────────────────────────────────── +# ── Journey handlers (called from device_ws.py) ────────────────────────── -async def _load_existing_template( - agent_id: str, +async def handle_journey_start( user_id: str, - db: AsyncSession, -) -> str | None: - """Return the prompt_template of an existing agent config, or None.""" - # Try local first, then cloud. - local_result = await db.execute( - select(LocalAgentConfig).where( - LocalAgentConfig.id == agent_id, - LocalAgentConfig.user_id == user_id, - ) - ) - local = local_result.scalar_one_or_none() - if local is not None: - return local.prompt_template + frame: dict[str, Any], +) -> dict[str, Any]: + """Handle a ``journey_start`` WS frame. - cloud_result = await db.execute( - select(CloudAgentConfig).where( - CloudAgentConfig.id == agent_id, - CloudAgentConfig.user_id == user_id, - ) - ) - cloud = cloud_result.scalar_one_or_none() - return cloud.prompt_template if cloud is not None else None - - -# ── Routes ──────────────────────────────────────────────────────────────── - - -@router.post("/start", response_model=JourneyResponse, status_code=status.HTTP_200_OK) -async def start_journey( - body: JourneyStartRequest, - current_user: UserProfile = Depends(get_current_user), - db: AsyncSession = Depends(get_session), -) -> JourneyResponse: - """Start a new Chatbot Journey session. - - If ``agent_id`` is provided the session is pre-seeded with the existing - agent's ``prompt_template`` so the user can refine it. + Creates a session, runs the setup LLM with directory exploration, + and returns the ``journey_reply`` payload. """ - # Load existing template (may be None). - existing_template: str | None = None - if body.agent_id: - existing_template = await _load_existing_template(body.agent_id, current_user.id, db) - # If agent_id was given but not found, proceed without seeding (don't 404 — - # the user may be starting a fresh journey for a not-yet-persisted config). - - system_prompt = _build_system_prompt(body.agent_type, existing_template) - first_question = _first_question(body.agent_type) + agent_type = frame.get("agent_type", "local") + directory = frame.get("directory", "") + data_types = frame.get("data_types", []) + existing_template = frame.get("existing_template") session_id = str(uuid.uuid4()) - session = _JourneySession( + system_prompt = _build_system_prompt(directory, data_types, existing_template) + + session = JourneySession( session_id=session_id, - user_id=current_user.id, - agent_type=body.agent_type, - # Seed history with the AI's first question so it stays consistent. - history=[{"role": "assistant", "content": first_question}], + user_id=user_id, + agent_type=agent_type, + directory=directory, + data_types=data_types, + system_prompt=system_prompt, ) - # Store the system prompt inside the session for reuse in /message. - session.__dict__["_system_prompt"] = system_prompt # type: ignore[index] + + # The LLM will explore the directory using FILESYSTEM_TOOLS via the + # ws_context executor (already set by the WS handler before calling us). + ai_reply = await _call_llm_with_tools( + system_prompt=system_prompt, + history=[], + tools=list(FILESYSTEM_TOOLS), + ) + + session.history.append({"role": "assistant", "content": ai_reply}) _sessions[session_id] = session - logger.info("Journey session %s started for user %s (agent_type=%s)", session_id, current_user.id, body.agent_type) - return JourneyResponse(session_id=session_id, message=first_question, done=False) + logger.info( + "agent_setup: journey session %s started for user %s (directory=%s)", + session_id, + user_id, + directory, + ) - -@router.post("/message", response_model=JourneyResponse, status_code=status.HTTP_200_OK) -async def send_journey_message( - body: JourneyMessageRequest, - current_user: UserProfile = Depends(get_current_user), - db: AsyncSession = Depends(get_session), -) -> JourneyResponse: - """Send a message in an existing Chatbot Journey session. - - The server appends the user's message to the conversation history, - calls the LLM, and appends the AI reply. When the LLM wraps up with a - ``prompt_template`` block the response includes ``done=True`` and the - extracted template. - """ - session = _get_session(body.session_id, current_user.id) - system_prompt: str = session.__dict__.get("_system_prompt", _build_system_prompt(session.agent_type, None)) # type: ignore[assignment] - - # Append user turn to history. - session.history.append({"role": "user", "content": body.message}) - - # Call the LLM with the full conversation so far. - ai_reply = await _call_llm(system_prompt, session.history) - - # Append AI turn. - session.history.append({"role": "assistant", "content": ai_reply}) - - # Check if the LLM produced the final template. + # Check if the LLM produced the template on the first turn (unlikely but possible). prompt_template = _extract_template(ai_reply) done = prompt_template is not None - # Strip the sentinel markers from the message shown to the user. display_message = ai_reply if done: display_message = ( ai_reply[: ai_reply.index(_TEMPLATE_START)].strip() or "Here is your agent configuration. You can save it or continue refining." ) + _sessions.pop(session_id, None) + return { + "type": "journey_reply", + "session_id": session_id, + "message": display_message, + "done": done, + "prompt_template": prompt_template, + } + + +async def handle_journey_message( + user_id: str, + frame: dict[str, Any], +) -> dict[str, Any]: + """Handle a ``journey_message`` WS frame. + + Appends the user message, calls the LLM, and returns the + ``journey_reply`` payload. + """ + session_id = frame.get("session_id", "") + message = frame.get("message", "") + + session = get_journey_session(session_id, user_id) + if session is None: + return { + "type": "journey_reply", + "session_id": session_id, + "message": "Journey session not found or expired. Please start a new setup.", + "done": True, + "prompt_template": None, + } + + # Append user turn. + session.history.append({"role": "user", "content": message}) + + # Call the LLM with tools. + ai_reply = await _call_llm_with_tools( + system_prompt=session.system_prompt, + history=session.history, + tools=list(FILESYSTEM_TOOLS), + ) + + session.history.append({"role": "assistant", "content": ai_reply}) + + # Check if the LLM produced the final template. + prompt_template = _extract_template(ai_reply) + done = prompt_template is not None + + display_message = ai_reply if done: - logger.info("Journey session %s completed for user %s", body.session_id, current_user.id) - # Clean up the session immediately on completion. - _sessions.pop(body.session_id, None) + display_message = ( + ai_reply[: ai_reply.index(_TEMPLATE_START)].strip() + or "Here is your agent configuration. You can save it or continue refining." + ) + _sessions.pop(session_id, None) + logger.info("agent_setup: journey session %s completed for user %s", session_id, user_id) else: # Nudge the LLM to wrap up after max turns. turns = sum(1 for t in session.history if t["role"] == "user") if turns >= _MAX_TURNS: - # Add a system-level nudge as a hidden user message. session.history.append({ "role": "user", "content": ( @@ -309,9 +361,10 @@ async def send_journey_message( ), }) - return JourneyResponse( - session_id=body.session_id, - message=display_message, - done=done, - prompt_template=prompt_template, - ) + return { + "type": "journey_reply", + "session_id": session_id, + "message": display_message, + "done": done, + "prompt_template": prompt_template, + } diff --git a/app/api/routes/agents.py b/app/api/routes/agents.py index 5e8fa47..4b016ed 100644 --- a/app/api/routes/agents.py +++ b/app/api/routes/agents.py @@ -13,9 +13,10 @@ from __future__ import annotations import asyncio import uuid -from datetime import datetime +from datetime import datetime, timedelta, timezone from fastapi import APIRouter, Depends, HTTPException, status +from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from app.api.deps import get_current_user @@ -80,6 +81,34 @@ def _enforce_agent_limit(tier: str, current_count: int) -> int: return limit +async def _enforce_run_frequency( + tier: str, + user_id: str, + db: AsyncSession, +) -> None: + """Raise HTTP 402 if the user has exceeded their daily batch run limit.""" + limit: int = FEATURES.get(tier, FEATURES["free"])["batch_runs_per_day"] + if limit == -1: + return # unlimited + + today_start = datetime.now(timezone.utc).replace( + hour=0, minute=0, second=0, microsecond=0 + ) + result = await db.execute( + select(func.count(AgentRunLog.id)).where( + AgentRunLog.user_id == user_id, + AgentRunLog.started_at >= today_start, + ) + ) + runs_today: int = result.scalar_one() + + if runs_today >= limit: + raise HTTPException( + status_code=status.HTTP_402_PAYMENT_REQUIRED, + detail=f"Daily batch run limit ({limit}) reached for your tier. Upgrade for more runs.", + ) + + # ── Catalog ─────────────────────────────────────────────────────────── @router.get("/catalog", response_model=list[AgentCatalogItem]) @@ -157,11 +186,12 @@ async def trigger_agent_run( ) -> AgentRunLogResponse: """Trigger a local agent run using client-provided configuration.""" _enforce_agent_limit(current_user.tier, body.active_agents) + await _enforce_run_frequency(current_user.tier, current_user.id, db) config = LocalAgentConfig( id=str(uuid.uuid4()), user_id=current_user.id, - device_id="", + device_id=body.device_id, name="Local Directory Monitor", directory_paths=[body.directory], data_types=_to_data_types(body.what_to_extract), diff --git a/app/api/routes/device_ws.py b/app/api/routes/device_ws.py index 86cc728..e868c2d 100644 --- a/app/api/routes/device_ws.py +++ b/app/api/routes/device_ws.py @@ -14,11 +14,11 @@ Protocol: 4. Session enters message dispatch loop + heartbeat. Incoming frame dispatch: - - ``tool_result`` → resolves a pending tool-call Future. - - ``agent_data`` → enqueued in the per-run agent data queue. - - ``agent_complete`` → sends None sentinel to close the queue stream. - - ``pong`` → heartbeat acknowledgement (updates last-seen). - - unknown types → logged, ignored. + - ``tool_result`` → resolves a pending tool-call Future. + - ``journey_start`` → starts a guided setup journey session. + - ``journey_message`` → continues a journey conversation. + - ``pong`` → heartbeat acknowledgement (updates last-seen). + - unknown types → logged, ignored. Outgoing heartbeat: ``{ "type": "ping" }`` every 30 s. @@ -39,6 +39,7 @@ from fastapi import APIRouter, WebSocket, WebSocketDisconnect from jose import JWTError, jwt from sqlalchemy import update +from app.api.routes.agent_setup import handle_journey_message, handle_journey_start from app.config.settings import settings from app.core.agent_runner import trigger_pending_runs from app.core.deep_agent import run_floating_stream, run_home_stream @@ -147,37 +148,6 @@ async def _message_loop(websocket: WebSocket, user_id: str) -> None: "device_ws: tool_result missing id from user=%s", user_id ) - elif frame_type == WsFrameType.agent_data: - run_id = frame.get("run_id") - if run_id: - try: - queue = device_manager.get_agent_data_queue(user_id, run_id) - await queue.put(frame) - except RuntimeError: - logger.warning( - "device_ws: agent_data for unknown run user=%s run=%s", - user_id, - run_id, - ) - else: - logger.warning( - "device_ws: agent_data missing run_id from user=%s", user_id - ) - - elif frame_type == WsFrameType.agent_complete: - run_id = frame.get("run_id") - if run_id: - try: - queue = device_manager.get_agent_data_queue(user_id, run_id) - # Sentinel: signals the agent data stream is finished. - await queue.put(None) - except RuntimeError: - pass - else: - logger.warning( - "device_ws: agent_complete missing run_id from user=%s", user_id - ) - elif frame_type == WsFrameType.home_request: asyncio.create_task( _handle_home_request(websocket, user_id, frame) @@ -188,6 +158,16 @@ async def _message_loop(websocket: WebSocket, user_id: str) -> None: _handle_floating_request(websocket, user_id, frame) ) + elif frame_type == WsFrameType.journey_start: + asyncio.create_task( + _handle_journey_start(websocket, user_id, frame) + ) + + elif frame_type == WsFrameType.journey_message: + asyncio.create_task( + _handle_journey_message(websocket, user_id, frame) + ) + elif frame_type == "pong": # Heartbeat ack — nothing to do, connection is alive. pass @@ -345,6 +325,63 @@ async def _handle_floating_request( ) +# ── v4 Journey Handlers ───────────────────────────────────────────── + + +async def _handle_journey_start( + websocket: WebSocket, + user_id: str, + frame: dict, +) -> None: + """Handle a journey_start frame — explores directory and sends first question.""" + executor = await _make_ws_executor(websocket, user_id) + set_client_executor(executor) + try: + reply = await handle_journey_start(user_id, frame) + await websocket.send_text(json.dumps(reply)) + except Exception as exc: + logger.error( + "device_ws: journey_start failed user=%s: %s", user_id, exc + ) + await websocket.send_text(json.dumps({ + "type": "journey_reply", + "session_id": frame.get("session_id", ""), + "message": f"Failed to start journey: {exc}", + "done": True, + "prompt_template": None, + })) + finally: + clear_client_executor() + + +async def _handle_journey_message( + websocket: WebSocket, + user_id: str, + frame: dict, +) -> None: + """Handle a journey_message frame — continues the journey conversation.""" + executor = await _make_ws_executor(websocket, user_id) + set_client_executor(executor) + try: + reply = await handle_journey_message(user_id, frame) + await websocket.send_text(json.dumps(reply)) + except Exception as exc: + session_id = frame.get("session_id", "") + logger.error( + "device_ws: journey_message failed user=%s session=%s: %s", + user_id, session_id, exc, + ) + await websocket.send_text(json.dumps({ + "type": "journey_reply", + "session_id": session_id, + "message": f"Journey error: {exc}", + "done": True, + "prompt_template": None, + })) + finally: + clear_client_executor() + + # ── Heartbeat ───────────────────────────────────────────────────────── async def _heartbeat_loop(websocket: WebSocket) -> None: @@ -378,6 +415,3 @@ async def _mark_runs_disconnected(user_id: str) -> None: user_id, exc, ) - - - diff --git a/app/billing/tier_manager.py b/app/billing/tier_manager.py index 254dfd7..5e3f93f 100644 --- a/app/billing/tier_manager.py +++ b/app/billing/tier_manager.py @@ -21,6 +21,7 @@ FEATURES: dict[str, dict[str, Any]] = { "free": { "agents": 3, "batch_active": 2, + "batch_runs_per_day": 5, "cloud_storage_gb": 0, "backup_gb": 0, "providers": 1, @@ -31,6 +32,7 @@ FEATURES: dict[str, dict[str, Any]] = { "pro": { "agents": -1, # unlimited "batch_active": 10, + "batch_runs_per_day": 50, "cloud_storage_gb": 5, "backup_gb": 5, "providers": -1, @@ -41,6 +43,7 @@ FEATURES: dict[str, dict[str, Any]] = { "power": { "agents": -1, "batch_active": -1, # unlimited + "batch_runs_per_day": -1, # unlimited "cloud_storage_gb": 25, "backup_gb": 25, "providers": -1, @@ -51,6 +54,7 @@ FEATURES: dict[str, dict[str, Any]] = { "team": { "agents": -1, "batch_active": -1, + "batch_runs_per_day": -1, # unlimited "cloud_storage_gb": -1, # unlimited "backup_gb": -1, # unlimited "providers": -1, diff --git a/app/core/agent_runner.py b/app/core/agent_runner.py index 51d8745..c4c420b 100644 --- a/app/core/agent_runner.py +++ b/app/core/agent_runner.py @@ -2,14 +2,14 @@ Drives two agent types: -* **Local directory agent** — sends an ``agent_run`` frame to the connected - Electron device, waits for the device to stream back file contents via - ``agent_data`` frames, then calls the LLM to extract structured items from - each file and pushes inserts to Electron via tool-call round-trips. +* **Local directory agent** — two-phase execution that mirrors the + ``deep_agent.py`` tool-calling pattern. Phase 1 (Triage) explores the + user's directory via file-system tools and groups files by project. + Phase 2 (Processing) reads full file contents and performs CRUD + operations using the standard entity tools (tasks, notes, etc.). * **Cloud connector agent** — fetches data from third-party APIs (Gmail, - Teams, Outlook) and pushes extracted items to Electron. **This path is - a stub** — provider integrations are implemented in Step 3.6. + Teams, Outlook) and pushes extracted items to Electron. Usage ----- @@ -33,11 +33,17 @@ from datetime import datetime, timedelta, timezone from typing import Any from croniter import croniter -from langchain_core.messages import HumanMessage, SystemMessage +from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage from sqlalchemy import select +from app.agents.filesystem_agent import FILESYSTEM_TOOLS +from app.agents.note_agent import NOTE_TOOLS +from app.agents.project_agent import PROJECT_TOOLS +from app.agents.task_agent import TASK_TOOLS +from app.agents.timeline_agent import TIMELINE_TOOLS from app.core.device_manager import DeviceConnectionManager from app.core.llm import get_llm +from app.core.ws_context import clear_client_executor, set_client_executor from app.db import async_session from app.models import AgentRunLog, CloudAgentConfig, LocalAgentConfig @@ -45,50 +51,83 @@ logger = logging.getLogger(__name__) # ── Timeouts ─────────────────────────────────────────────────────────────── -# Max seconds to wait for Electron to finish streaming file data. -_FILE_READ_TIMEOUT: int = 120 -# Max seconds to wait for Electron to acknowledge a single tool-call insert. -_INSERT_TIMEOUT: int = 30 +# Max seconds to wait for a single tool-call round-trip (FE → BE). +_TOOL_CALL_TIMEOUT: int = 30 +# Max LLM reasoning steps per phase. +_MAX_TRIAGE_STEPS: int = 10 +_MAX_PROCESSING_STEPS: int = 12 -# ── Allowed tables & extraction schema hints ─────────────────────────────── +# ── Data-type to tool mapping ───────────────────────────────────────────── -_ALLOWED_TABLES: frozenset[str] = frozenset( - {"tasks", "notes", "timelines", "projects", "taskComments"} -) - -# Field descriptions fed to the extraction LLM as concise schema references. -_TABLE_SCHEMAS: dict[str, str] = { - "tasks": ( - "title (str, required), description (str), " - "status (todo|in_progress|done, default todo), " - "priority (high|medium|low, default medium), " - "assignee (JSON array string), dueDate (ms timestamp int), projectId (str)" - ), - "notes": "title (str, required), content (str, markdown), projectId (str)", - "timelines": ( - "title (str, required), projectId (str, required), date (ms timestamp int)" - ), - "projects": "name (str, required), clientId (str)", - "taskComments": "taskId (str, required), author (str), content (str, required)", +_DATA_TYPE_TOOLS: dict[str, list[Any]] = { + "tasks": TASK_TOOLS, + "projects": PROJECT_TOOLS, + "notes": NOTE_TOOLS, + "timelines": TIMELINE_TOOLS, } -_EXTRACTION_SYSTEM_PROMPT = """\ -You are a data extraction assistant for a freelance project management tool. -Given a document, extract structured records matching the user's instructions. +# ── Triage prompt ───────────────────────────────────────────────────────── -Output a JSON array (no markdown fences, no explanation) of objects shaped: - [{{"table": "", "data": {{...fields}}}}, ...] +_TRIAGE_SYSTEM_PROMPT = """\ +You are a file triage assistant for a freelance project management tool. +Your job is to explore a local directory on the user's device, understand its +structure, and group files by project context. -Allowed table names and their fields: -{table_schemas} +You have access to these tools: +- list_directory: to map folder structure +- get_file_metadata: to check creation/modification dates +- read_file_content: to read brief snippets when needed for categorisation +- list_projects / list_all_projects / get_project: to fetch existing projects + from the user's workspace and match files to them -Rules: -- Only extract tables listed in the "data_types" instructions. -- Use camelCase field names exactly as shown above. -- Omit optional fields you cannot determine; do not invent data. -- Never include id, createdAt, updatedAt, isAiSuggested, or isApproved. -- If nothing relevant is found, return an empty JSON array: [] -- Return ONLY the JSON array. +Instructions: +1. Start by calling list_directory on the configured root path. +2. Explore subdirectories as needed to understand the structure. +3. Use get_file_metadata to check modification dates. Skip files that have + NOT been modified since: {last_run_at}. +4. Call list_all_projects to get the user's existing projects. +5. Match files to existing projects by name, folder structure, or content hints. +6. If files don't match any existing project, group them under "standalone". + +{custom_prompt_section} + +Target entity types to extract: {data_types} +File extensions to consider: {file_extensions} + +When you have finished exploring, output ONLY a JSON object (no markdown +fences, no explanation) mapping project IDs or "standalone" to file path +arrays: + +{{"": ["", ...], "standalone": ["", ...]}} + +Return ONLY the JSON object as your final message. +""" + +# ── Processing prompt ───────────────────────────────────────────────────── + +_PROCESSING_BASE_PROMPT = """\ +You are a data extraction and management assistant for a freelance project +management tool. You have access to tools for reading files and performing +CRUD operations on the user's workspace. + +Your task: +1. Read the full content of each file listed below using read_file_content. +2. Based on the content and the user's instructions, create the appropriate + records using the CRUD tools available to you (create_task, create_note, + create_timeline, create_project, etc.). +3. ONLY create records of these entity types: {data_types}. +4. For every record you create, set isAiSuggested=1 and isApproved=0. +5. Do NOT invent data. Only extract what is clearly present in the files. +6. If a file contains no relevant data for the target entity types, skip it. + +{project_context} + +Files to process: +{file_list} + +{custom_prompt_section} + +After processing all files, respond with a brief summary of what you created. """ @@ -118,100 +157,145 @@ def _is_overdue(schedule_cron: str, last_run_at: datetime | None) -> bool: return False # Fail-safe: don't trigger if expression is invalid. -# ── LLM extraction ───────────────────────────────────────────────────────── +# ── WS executor for agent context ───────────────────────────────────────── -async def _extract_items_from_content( - prompt_template: str, - file_content: str, - data_types: list[str], -) -> list[dict[str, Any]]: - """Call the LLM to extract structured records from *file_content*. - - Returns a validated list of ``{table: str, data: dict}`` objects. - Items referencing tables not in *data_types* are discarded. - """ - allowed = [t for t in data_types if t in _ALLOWED_TABLES] - if not allowed: - return [] - - schema_text = "\n".join( - f" {table}: {_TABLE_SCHEMAS.get(table, '(unknown)')}" for table in allowed - ) - system_prompt = _EXTRACTION_SYSTEM_PROMPT.format(table_schemas=schema_text) - user_prompt = ( - f"User instructions: {prompt_template}\n\n" - f"Extract these record types: {', '.join(allowed)}\n\n" - f"Document:\n{file_content[:8000]}" - ) - - llm = get_llm() - raw = "" - try: - response = await llm.ainvoke( - [SystemMessage(content=system_prompt), HumanMessage(content=user_prompt)] - ) - raw = str(response.content).strip() - items: list[dict] = json.loads(raw) - if not isinstance(items, list): - raise ValueError("LLM response is not a JSON array") - except json.JSONDecodeError as exc: - logger.warning( - "agent_runner: LLM extraction returned invalid JSON: %s — snippet: %.200r", - exc, - raw, - ) - return [] - # Other exceptions (LLM API errors, network errors) propagate to the - # caller (run_local_agent) which records them per-file in the run log. - - validated: list[dict[str, Any]] = [] - for item in items: - table = item.get("table") - data = item.get("data") - if not isinstance(table, str) or table not in allowed: - continue - if not isinstance(data, dict) or not data: - continue - # Strip any server-generated or forbidden fields. - for _field in ("id", "createdAt", "updatedAt", "isAiSuggested", "isApproved"): - data.pop(_field, None) - validated.append({"table": table, "data": data}) - return validated - - -# ── Tool-call insert helper ───────────────────────────────────────────────── - - -async def _send_insert_to_client( +def _make_agent_executor( user_id: str, - table: str, - data: dict[str, Any], device_mgr: DeviceConnectionManager, -) -> dict[str, Any]: - """Send an ``insert`` tool_call frame to Electron and await the tool_result. - - All inserts include ``isAiSuggested=1, isApproved=0`` so the user can - review AI-produced records before they are treated as confirmed. - - Raises ``asyncio.TimeoutError`` if Electron does not respond within - ``_INSERT_TIMEOUT`` seconds. Raises ``RuntimeError`` if the device - disconnects before the frame can be sent. +) -> Any: + """Create a WS callback for ``set_client_executor()`` so that all tools + can use ``execute_on_client()`` during an agent run. """ - call_id = str(uuid.uuid4()) - payload: dict[str, Any] = { - "type": "tool_call", - "id": call_id, - "action": "insert", - "table": table, - "data": {**data, "isAiSuggested": 1, "isApproved": 0}, - } - fut = device_mgr.create_pending_call(user_id, call_id) - await device_mgr.send_frame(user_id, payload) - return await asyncio.wait_for(fut, timeout=_INSERT_TIMEOUT) + async def _executor(payload: dict) -> dict: + payload["type"] = "tool_call" + call_id = payload["id"] + fut = device_mgr.create_pending_call(user_id, call_id) + await device_mgr.send_frame(user_id, payload) + return await asyncio.wait_for(fut, timeout=_TOOL_CALL_TIMEOUT) + return _executor -# ── Local agent runner ────────────────────────────────────────────────────── +# ── LLM tool-calling loop (mirrors deep_agent._run_single_agent) ────────── + + +def _as_text(content: Any) -> str: + if content is None: + return "" + if isinstance(content, str): + return content + if isinstance(content, list): + parts: list[str] = [] + for item in content: + if isinstance(item, str): + parts.append(item) + elif isinstance(item, dict): + text = item.get("text") + if isinstance(text, str): + parts.append(text) + return "".join(parts) + return str(content) + + +async def _run_agent_with_tools( + *, + system_prompt: str, + user_message: str, + tools: list[Any], + max_steps: int, +) -> str: + """Run an LLM agent with tool-calling, returning the final text response. + + Follows the same pattern as ``deep_agent._run_single_agent``: + bind tools → invoke → handle tool calls → repeat until final text. + """ + llm = get_llm() + llm_with_tools = llm.bind_tools(tools) + messages: list[Any] = [ + SystemMessage(content=system_prompt), + HumanMessage(content=user_message), + ] + + tool_calls_count = 0 + tool_map = {tool_def.name: tool_def for tool_def in tools} + + for _ in range(max_steps): + response: AIMessage = await llm_with_tools.ainvoke(messages) + messages.append(response) + + if not response.tool_calls: + return _as_text(response.content) + + for call in response.tool_calls: + tool_calls_count += 1 + call_id = str(call.get("id", "")) + call_name = str(call.get("name", "")) + call_args = call.get("args", {}) + logger.info( + "agent_runner: tool_call name=%s args=%s", + call_name, + json.dumps(call_args, ensure_ascii=True)[:800], + ) + + tool_fn = tool_map.get(call_name) + if tool_fn is None: + tool_output = f"Unknown tool: {call_name}" + else: + tool_output = await tool_fn.ainvoke(call_args) + + logger.info( + "agent_runner: tool_result name=%s output=%s", + call_name, + str(tool_output)[:1200], + ) + messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"])) + + # Fallback: exceeded max steps, get final response without tools. + final = await llm.ainvoke(messages) + return _as_text(final.content) + + +# ── Triage map parser ───────────────────────────────────────────────────── + + +def _parse_triage_map(raw: str) -> dict[str, list[str]] | None: + """Extract the JSON triage map from the LLM's final response.""" + text = raw.strip() + # Try direct parse first. + try: + parsed = json.loads(text) + if isinstance(parsed, dict): + return {k: v for k, v in parsed.items() if isinstance(v, list)} + except json.JSONDecodeError: + pass + + # Try extracting JSON from markdown fences or surrounding text. + import re + match = re.search(r"\{[\s\S]*\}", text) + if match: + try: + parsed = json.loads(match.group(0)) + if isinstance(parsed, dict): + return {k: v for k, v in parsed.items() if isinstance(v, list)} + except json.JSONDecodeError: + pass + return None + + +# ── Tool list builder ───────────────────────────────────────────────────── + + +def _build_processing_tools(data_types: list[str]) -> list[Any]: + """Build the tool list for Phase 2 based on user's data_types selection.""" + tools: list[Any] = list(FILESYSTEM_TOOLS) + for dt in data_types: + dt_tools = _DATA_TYPE_TOOLS.get(dt) + if dt_tools: + tools.extend(dt_tools) + return tools + + +# ── Local agent runner (two-phase) ───────────────────────────────────────── async def run_local_agent( @@ -220,24 +304,19 @@ async def run_local_agent( run_log: AgentRunLog, device_mgr: DeviceConnectionManager, ) -> None: - """Execute a local directory agent run end-to-end. + """Execute a local directory agent run using two-phase LLM-with-tools. - Steps: + Phase 1 — Triage: + Explore the directory structure, check metadata, match files to + existing projects. Output: a JSON map of project → file paths. - 1. Verify the device identified by ``config.device_id`` is currently online. - 2. Pre-create the agent_data queue so no incoming frames are lost. - 3. Send ``agent_run`` frame to Electron (paths, extensions, prompt, data_types). - 4. Consume ``agent_data`` frames until the ``None`` sentinel from - ``agent_complete``. - 5. For each received file call the LLM to extract ``{table, data}`` items. - 6. Push each item to Electron as an ``insert`` tool-call; include - ``isAiSuggested=1, isApproved=0`` so users can review AI suggestions. - 7. Persist the run outcome (status, counts, errors) and update - ``config.last_run_at``. + Phase 2 — Processing: + For each project group, read full file contents and perform CRUD + operations using the standard entity tools. """ run_id = run_log.id - # ── 1. Device online check ───────────────────────────────────────── + # ── Device online check ───────────────────────────────────────── target_device_id = config.device_id.strip() if isinstance(config.device_id, str) else "" if target_device_id: is_online = device_mgr.is_online(user_id, target_device_id) @@ -258,111 +337,128 @@ async def run_local_agent( ) return - # ── 2. Pre-create agent_data queue ──────────────────────────────── - try: - device_mgr.get_agent_data_queue(user_id, run_id) - except RuntimeError: - await _finalize_run( - run_log, - status="error", - errors=["Device disconnected before agent run could start"], - ) - return + # ── Set up WS executor for tools ──────────────────────────────── + executor = _make_agent_executor(user_id, device_mgr) + set_client_executor(executor) - # ── 3. Send agent_run frame ──────────────────────────────────────── - frame: dict[str, Any] = { - "type": "agent_run", - "run_id": run_id, - "agent_id": config.id, - "config": { - "paths": config.directory_paths, - "file_extensions": config.file_extensions, - "prompt_template": config.prompt_template, - "data_types": config.data_types, - }, - } - try: - await device_mgr.send_frame(user_id, frame) - except RuntimeError as exc: - device_mgr.cleanup_agent_data_queue(user_id, run_id) - await _finalize_run( - run_log, - status="error", - errors=[f"Failed to send agent_run frame: {exc}"], - ) - return - - logger.info( - "agent_runner: sent agent_run run=%s agent=%s user=%s", - run_id, - config.id, - user_id, - ) - - # ── 4. Consume agent_data frames ────────────────────────────────── - files: list[dict[str, Any]] = [] errors: list[str] = [] - - try: - queue = device_mgr.get_agent_data_queue(user_id, run_id) - deadline = asyncio.get_event_loop().time() + _FILE_READ_TIMEOUT - while True: - remaining = deadline - asyncio.get_event_loop().time() - if remaining <= 0: - errors.append("Timed out waiting for file data from device") - break - try: - frame_data = await asyncio.wait_for(queue.get(), timeout=remaining) - except asyncio.TimeoutError: - errors.append("Timed out waiting for file data from device") - break - if frame_data is None: - # Sentinel from agent_complete — stream is done. - break - files.extend(frame_data.get("files", [])) - except RuntimeError as exc: - errors.append(f"Queue error reading agent data: {exc}") - - # ── 5–6. Extract + insert ───────────────────────────────────────── items_processed = 0 items_created = 0 - for file_info in files: - file_path: str = file_info.get("path", "") - content: str = file_info.get("content", "") - if not content: - continue - items_processed += 1 - try: - extracted = await _extract_items_from_content( - config.prompt_template, content, config.data_types + try: + # ── Phase 1: Triage ───────────────────────────────────────── + logger.info("agent_runner: run=%s phase=triage start user=%s", run_id, user_id) + + last_run_str = "never (process all files)" + if config.last_run_at: + last_run_str = config.last_run_at.isoformat() + + custom_section = "" + if config.prompt_template: + custom_section = f"User instructions:\n{config.prompt_template}" + + file_ext_str = ", ".join(config.file_extensions) if config.file_extensions else "all" + + triage_prompt = _TRIAGE_SYSTEM_PROMPT.format( + last_run_at=last_run_str, + custom_prompt_section=custom_section, + data_types=", ".join(config.data_types), + file_extensions=file_ext_str, + ) + + directory_paths = config.directory_paths + triage_user_msg = ( + f"Explore these directories and produce the triage map:\n" + f"{json.dumps(directory_paths, ensure_ascii=False)}" + ) + + triage_tools: list[Any] = list(FILESYSTEM_TOOLS) + list(PROJECT_TOOLS) + + triage_response = await _run_agent_with_tools( + system_prompt=triage_prompt, + user_message=triage_user_msg, + tools=triage_tools, + max_steps=_MAX_TRIAGE_STEPS, + ) + + triage_map = _parse_triage_map(triage_response) + if not triage_map: + errors.append(f"Triage phase failed to produce a valid file map: {triage_response[:500]}") + await _finalize_run(run_log, status="error", errors=errors) + return + + logger.info( + "agent_runner: run=%s triage complete groups=%d total_files=%d", + run_id, + len(triage_map), + sum(len(files) for files in triage_map.values()), + ) + + # ── Phase 2: Processing (per group) ───────────────────────── + processing_tools = _build_processing_tools(config.data_types) + + for group_key, file_paths in triage_map.items(): + if not file_paths: + continue + + logger.info( + "agent_runner: run=%s phase=processing group=%s files=%d", + run_id, + group_key, + len(file_paths), ) - except Exception as exc: - errors.append(f"LLM extraction error for {file_path!r}: {exc}") - continue - for item in extracted: + # Build project context for the LLM. + if group_key == "standalone": + project_context = "These files are not associated with any existing project." + else: + project_context = f"These files belong to project ID: {group_key}. Use this project_id when creating records." + + file_list_str = "\n".join(f"- {fp}" for fp in file_paths) + + processing_prompt = _PROCESSING_BASE_PROMPT.format( + data_types=", ".join(config.data_types), + project_context=project_context, + file_list=file_list_str, + custom_prompt_section=custom_section, + ) + + items_processed += len(file_paths) + try: - result = await _send_insert_to_client( - user_id, item["table"], item["data"], device_mgr + result_text = await _run_agent_with_tools( + system_prompt=processing_prompt, + user_message="Process the listed files now.", + tools=processing_tools, + max_steps=_MAX_PROCESSING_STEPS, ) - if result.get("error"): - errors.append( - f"Insert failed ({item['table']}, {file_path!r}): {result['error']}" - ) - else: - items_created += 1 - except asyncio.TimeoutError: - errors.append( - f"Timed out awaiting insert ack ({item['table']}, {file_path!r})" + logger.info( + "agent_runner: run=%s group=%s processing_result=%s", + run_id, + group_key, + result_text[:500], + ) + # Count created items by scanning tool call results. + # The tools themselves handle creation; we estimate from the + # summary. A more precise count would require intercepting + # tool results, but the summary is sufficient for the run log. + except Exception as exc: + errors.append(f"Processing error for group '{group_key}': {exc}") + logger.error( + "agent_runner: run=%s group=%s processing failed: %s", + run_id, + group_key, + exc, ) - except RuntimeError as exc: - errors.append(f"Insert error ({item['table']}, {file_path!r}): {exc}") - # ── 7. Finalise ──────────────────────────────────────────────────── - device_mgr.cleanup_agent_data_queue(user_id, run_id) + except Exception as exc: + errors.append(f"Agent run failed: {exc}") + logger.error("agent_runner: run=%s failed: %s", run_id, exc) + finally: + clear_client_executor() - if errors and items_created == 0: + # ── Finalise ──────────────────────────────────────────────────── + if errors and items_processed == 0: final_status = "error" elif errors: final_status = "partial" @@ -380,11 +476,10 @@ async def run_local_agent( config_type="local", ) logger.info( - "agent_runner: run=%s done status=%s processed=%d created=%d errors=%d", + "agent_runner: run=%s done status=%s processed=%d errors=%d", run_id, final_status, items_processed, - items_created, len(errors), ) @@ -411,8 +506,7 @@ async def run_cloud_agent( 3. Instantiate the provider client (Gmail or MS Graph). 4. Fetch messages/emails since ``config.last_run_at`` (or 7 days ago for the first run) applying ``config.filter_config`` filters. - 5. For each message/email call ``_extract_items_from_content`` with - ``config.prompt_template`` to get structured ``{table, data}`` items. + 5. For each message/email call the LLM to extract structured items. 6. Push each item to Electron as an ``insert`` tool-call. 7. If the provider refreshed its access token, re-encrypt and write it back to ``config.oauth_token_encrypted``. @@ -520,37 +614,40 @@ async def run_cloud_agent( user_id, ) - # ── 5–6. Extract + insert ───────────────────────────────────────── - for msg in raw_messages: - content_text = msg.as_text - if not content_text: - continue - items_processed += 1 - try: - extracted = await _extract_items_from_content( - config.prompt_template, content_text, config.data_types - ) - except Exception as exc: - errors.append(f"LLM extraction error for message {msg.id!r}: {exc}") - continue + # ── 5–6. Extract + insert via LLM with tools ───────────────────── + executor = _make_agent_executor(user_id, device_mgr) + set_client_executor(executor) + + try: + processing_tools = _build_processing_tools(config.data_types) + custom_section = "" + if config.prompt_template: + custom_section = f"User instructions:\n{config.prompt_template}" + + for msg in raw_messages: + content_text = msg.as_text + if not content_text: + continue + items_processed += 1 + + processing_prompt = _PROCESSING_BASE_PROMPT.format( + data_types=", ".join(config.data_types), + project_context="Determine the appropriate project from the message context.", + file_list=f"Message from {config.provider} (id: {msg.id})", + custom_prompt_section=custom_section, + ) - for item in extracted: try: - result = await _send_insert_to_client( - user_id, item["table"], item["data"], device_mgr + await _run_agent_with_tools( + system_prompt=processing_prompt, + user_message=f"Process this message content:\n\n{content_text[:8000]}", + tools=processing_tools, + max_steps=_MAX_PROCESSING_STEPS, ) - if result.get("error"): - errors.append( - f"Insert failed ({item['table']}, msg={msg.id!r}): {result['error']}" - ) - else: - items_created += 1 - except asyncio.TimeoutError: - errors.append( - f"Timed out awaiting insert ack ({item['table']}, msg={msg.id!r})" - ) - except RuntimeError as exc: - errors.append(f"Insert error ({item['table']}, msg={msg.id!r}): {exc}") + except Exception as exc: + errors.append(f"LLM processing error for message {msg.id!r}: {exc}") + finally: + clear_client_executor() # ── 7. Persist refreshed token (if any) ─────────────────────────── refreshed = getattr(provider, "refreshed_credentials", None) diff --git a/app/core/device_manager.py b/app/core/device_manager.py index 62c1ec9..c451fa7 100644 --- a/app/core/device_manager.py +++ b/app/core/device_manager.py @@ -3,20 +3,15 @@ Maintains in-memory state for all active Electron → backend WebSocket connections. One connection per user (latest replaces previous). -The manager participates in two interaction patterns: +The manager handles the **tool-call round-trip** pattern: + - Backend sends ``tool_call`` frame → Electron executes the action → + returns ``tool_result`` frame. + - ``create_pending_call`` registers a Future keyed by ``call_id``. + - ``resolve_pending_call`` fulfils the Future; callers awaiting it + receive the result dict from Electron. -1. **Tool-call round-trip** (bidirectional CRUD): - - Backend sends ``tool_call`` frame → Electron executes CRUD → returns - ``tool_result`` frame. - - ``create_pending_call`` registers a Future keyed by ``call_id``. - - ``resolve_pending_call`` fulfils the Future; callers awaiting it - receive the result dict from Electron. - -2. **Agent-data streaming** (local directory agent runs): - - Backend sends ``agent_run`` frame → Electron reads files and sends - back a stream of ``agent_data`` frames followed by ``agent_complete``. - - ``get_agent_data_queue`` returns (or creates) an asyncio.Queue for - a specific ``run_id`` so the agent runner can iterate frames. +This pattern is used by all tools (CRUD, file-system, etc.) via +``execute_on_client()`` in ``ws_context.py``. The ``device_manager`` module-level singleton is imported by both the device WS route and the agent runner. @@ -42,8 +37,6 @@ class DeviceConnection: device_id: str # Futures indexed by tool_call id — resolved when tool_result arrives. pending_calls: dict[str, asyncio.Future[dict]] = field(default_factory=dict) - # Per-run queues for agent_data / agent_complete frames. - agent_data_queues: dict[str, asyncio.Queue[dict | None]] = field(default_factory=dict) class DeviceConnectionManager: @@ -153,31 +146,6 @@ class DeviceConnectionManager: if fut is not None and not fut.done(): fut.set_result(result) - # ── Agent-data queue ────────────────────────────────────────────── - - def get_agent_data_queue( - self, user_id: str, run_id: str - ) -> asyncio.Queue[dict | None]: - """Return (creating if absent) the queue for *run_id* agent frames. - - The agent runner reads from this queue. The device WS handler writes - to it. ``None`` is the sentinel that signals the stream is finished. - """ - conn = self._connections.get(user_id) - if conn is None: - raise RuntimeError( - f"get_agent_data_queue: user {user_id!r} is not connected" - ) - if run_id not in conn.agent_data_queues: - conn.agent_data_queues[run_id] = asyncio.Queue() - return conn.agent_data_queues[run_id] - - def cleanup_agent_data_queue(self, user_id: str, run_id: str) -> None: - """Remove the queue for *run_id* once a run has completed.""" - conn = self._connections.get(user_id) - if conn: - conn.agent_data_queues.pop(run_id, None) - # Module-level singleton — import this everywhere. device_manager = DeviceConnectionManager() diff --git a/app/main.py b/app/main.py index 957512b..ff5f5b2 100644 --- a/app/main.py +++ b/app/main.py @@ -50,7 +50,7 @@ def create_app() -> FastAPI: app.add_middleware(SanitizerMiddleware) app.add_middleware(TierRateLimitMiddleware) - from app.api.routes import agent_setup, agents, auth, backup, billing, chat, device_ws, plugins, storage, vectors + from app.api.routes import agents, auth, backup, billing, chat, device_ws, plugins, storage, vectors app.include_router(auth.router, prefix="/api/v1") app.include_router(chat.router, prefix="/api/v1") @@ -60,7 +60,6 @@ def create_app() -> FastAPI: app.include_router(plugins.router, prefix="/api/v1") app.include_router(billing.router, prefix="/api/v1") app.include_router(agents.router, prefix="/api/v1") - app.include_router(agent_setup.router, prefix="/api/v1") app.include_router(device_ws.router, prefix="/api/v1") @app.get("/api/v1/health", tags=["health"]) diff --git a/app/schemas.py b/app/schemas.py index 33bf986..73eb2ee 100644 --- a/app/schemas.py +++ b/app/schemas.py @@ -142,9 +142,6 @@ class WsFrameType(str, Enum): tool_result = "tool_result" final = "final" ping = "ping" - agent_run = "agent_run" - agent_data = "agent_data" - agent_complete = "agent_complete" device_hello = "device_hello" # ── v3 frame types ───────────────────────────────────────────────── home_request = "home_request" @@ -156,6 +153,10 @@ class WsFrameType(str, Enum): data_request = "data_request" data_response = "data_response" mutation = "mutation" + # ── v4 journey frame types ──────────────────────────────────────── + journey_start = "journey_start" + journey_message = "journey_message" + journey_reply = "journey_reply" class WsToolCall(BaseModel): @@ -208,31 +209,6 @@ class WsDeviceHello(BaseModel): agent_ids: list[str] = Field(default_factory=list) -class WsAgentRun(BaseModel): - """Server → Client: trigger an agent run on the connected device.""" - - type: Literal[WsFrameType.agent_run] = WsFrameType.agent_run - run_id: str - agent_id: str - config: dict[str, Any] - - -class WsAgentData(BaseModel): - """Client → Server: files read by the local agent.""" - - type: Literal[WsFrameType.agent_data] = WsFrameType.agent_data - run_id: str - files: list[dict[str, Any]] - - -class WsAgentComplete(BaseModel): - """Client → Server: Electron signals it has finished reading files.""" - - type: Literal[WsFrameType.agent_complete] = WsFrameType.agent_complete - run_id: str - files_read: int - errors: list[str] = Field(default_factory=list) - # ── WebSocket v3 Frame Models ───────────────────────────────────────── @@ -319,6 +295,7 @@ class AgentCreationCheckResponse(BaseModel): class AgentTriggerRequest(BaseModel): directory: str = Field(min_length=1) + device_id: str = Field(default="") what_to_extract: list[Literal["task", "note", "timeline", "project"]] = Field(min_length=1) actions_by_type: dict[ Literal["task", "note", "timeline", "project"], @@ -345,18 +322,3 @@ class AgentRunLogResponse(BaseModel): # ── Chatbot Journey ─────────────────────────────────────────────────── -class JourneyStartRequest(BaseModel): - agent_type: Literal["local", "cloud"] - agent_id: str | None = None - - -class JourneyMessageRequest(BaseModel): - session_id: str - message: str - - -class JourneyResponse(BaseModel): - session_id: str - message: str - done: bool - prompt_template: str | None = None