diff --git a/app/agents/task_agent.py b/app/agents/task_agent.py index 0259a0f..5be4632 100644 --- a/app/agents/task_agent.py +++ b/app/agents/task_agent.py @@ -29,7 +29,7 @@ TASK_SYSTEM_PROMPT = ( " - project_id is optional; link to a project when the user mentions one\n" " - is_ai_suggested: 1 only when proactively proposing a task the user\n" " did not explicitly request; 0 otherwise\n" - " - is_approved defaults to 0; set to 1 only when the user confirms\n" + " - is_ai_suggested: 1 only when proactively proposing a task the user did not explicitly request; 0 otherwise\n" " - Use list_tasks_due_today for 'what's due today' queries\n" " - For update_task, use -1 for integer fields you do not want to change\n" " - Always confirm the action in plain, user-friendly language." @@ -79,7 +79,6 @@ async def create_task( due_date: int = 0, project_id: str = "", is_ai_suggested: int = 0, - is_approved: int = 0, ) -> str: """Create a new task. title: task title (required) @@ -90,7 +89,6 @@ async def create_task( due_date: Unix timestamp in milliseconds; 0 means no due date project_id: optional UUID of the parent project is_ai_suggested: 1 if proactively suggested, 0 if user-requested - is_approved: 0 until the user confirms; 1 when confirmed """ result = await execute_on_client( action="insert", @@ -104,7 +102,6 @@ async def create_task( "dueDate": due_date or None, "projectId": project_id or None, "isAiSuggested": is_ai_suggested, - "isApproved": is_approved, }, ) row = result["row"] @@ -124,12 +121,10 @@ async def update_task( assignees: str = "", due_date: int = -1, project_id: str = "", - is_approved: int = -1, ) -> str: """Update fields on an existing task. Only pass fields you want to change. task_id: the task's UUID (required) due_date: -1 means unchanged; 0 clears the due date; any positive value sets it - is_approved: -1 means unchanged; 0 or 1 sets the value """ updates: dict[str, Any] = {} if title: @@ -146,8 +141,6 @@ async def update_task( updates["dueDate"] = due_date or None if project_id: updates["projectId"] = project_id - if is_approved != -1: - updates["isApproved"] = is_approved result = await execute_on_client( action="update", table="tasks", diff --git a/app/agents/timeline_agent.py b/app/agents/timeline_agent.py index f9b5652..4c7a217 100644 --- a/app/agents/timeline_agent.py +++ b/app/agents/timeline_agent.py @@ -25,7 +25,7 @@ TIMELINE_SYSTEM_PROMPT = ( " - For listing, project_id must be a UUID; never pass plain names as project_id\n" " - date is a Unix timestamp in milliseconds; convert human-readable dates\n" " - is_ai_suggested: 1 when proactively proposing a timeline, 0 otherwise\n" - " - is_approved: 0 until the user explicitly confirms; then 1\n" + " - is_ai_suggested: 1 when proactively proposing a timeline, 0 otherwise\n" " - For update_timeline, use -1 for integer fields you do not want to change\n" " - Listing without a project_id returns all timelines across projects\n" " - Always echo the title and formatted date in your confirmation." @@ -54,14 +54,12 @@ async def create_timeline( title: str, date: int, is_ai_suggested: int = 0, - is_approved: int = 0, ) -> str: """Create a project timeline (milestone). project_id: REQUIRED UUID of the parent project title: descriptive name for the milestone date: Unix timestamp in milliseconds is_ai_suggested: 1 if proactively suggested, 0 if user-requested - is_approved: 0 until the user confirms """ result = await execute_on_client( action="insert", @@ -71,7 +69,6 @@ async def create_timeline( "title": title, "date": date, "isAiSuggested": is_ai_suggested, - "isApproved": is_approved, }, ) row = result["row"] @@ -83,20 +80,16 @@ async def update_timeline( timeline_id: str, title: str = "", date: int = -1, - is_approved: int = -1, ) -> str: """Update a timeline. Only pass fields that should change. timeline_id: UUID of the timeline (required) date: -1 means unchanged; any other value sets the new date (ms timestamp) - is_approved: -1 means unchanged; 0 or 1 sets the approval state """ updates: dict[str, Any] = {} if title: updates["title"] = title if date != -1: updates["date"] = date - if is_approved != -1: - updates["isApproved"] = is_approved result = await execute_on_client( action="update", table="timelines", diff --git a/app/api/routes/agents.py b/app/api/routes/agents.py index 53d0edd..30ecfc9 100644 --- a/app/api/routes/agents.py +++ b/app/api/routes/agents.py @@ -21,7 +21,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.api.deps import get_current_user from app.billing.tier_manager import FEATURES -from app.core.agent_runner import run_local_agent +from app.core.agent_runner import is_agent_running, run_local_agent from app.core.device_manager import device_manager from app.db import get_session from app.models import AgentRunLog, LocalAgentConfig @@ -193,6 +193,12 @@ async def trigger_agent_run( # Use the FE's stable agent_id if provided, fall back to the ephemeral config id. stable_agent_id = body.agent_id or config.id + if is_agent_running(stable_agent_id): + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="Agent is already running. Only one run per agent is allowed at a time.", + ) + run_log = AgentRunLog( agent_id=stable_agent_id, agent_type="local", diff --git a/app/core/agent_runner.py b/app/core/agent_runner.py index 4926a6d..7292848 100644 --- a/app/core/agent_runner.py +++ b/app/core/agent_runner.py @@ -2,11 +2,12 @@ Drives two agent types: -* **Local directory agent** — two-phase execution that mirrors the - ``deep_agent.py`` tool-calling pattern. Phase 1 (Triage) explores the - user's directory via file-system tools and groups files by project. - Phase 2 (Processing) reads full file contents and performs CRUD - operations using the standard entity tools (tasks, notes, etc.). +* **Local directory agent** — two-step execution per file: + Step 1 (Classification) uses code to fetch all projects and asks the LLM + to identify which project the file belongs to and which domains are relevant. + Step 2 (Processing) fetches existing entities for that project/domains via + code and runs an LLM with tools — existing data in context enforces + update-first naturally. * **Cloud connector agent** — fetches data from third-party APIs (Gmail, Teams, Outlook) and pushes extracted items to Electron. @@ -43,19 +44,30 @@ from app.agents.task_agent import TASK_TOOLS from app.agents.timeline_agent import TIMELINE_TOOLS from app.core.device_manager import DeviceConnectionManager from app.core.llm import get_llm -from app.core.ws_context import clear_client_executor, set_client_executor +from app.core.ws_context import clear_client_executor, execute_on_client, set_client_executor from app.db import async_session from app.models import AgentRunLog, CloudAgentConfig, LocalAgentConfig logger = logging.getLogger(__name__) +# ── Concurrency guard ───────────────────────────────────────────────────── +# Tracks agent IDs that currently have a run in progress. +# Prevents multiple simultaneous runs of the same agent within a single process. +_running_agents: set[str] = set() + + +def is_agent_running(agent_id: str) -> bool: + """Return ``True`` if *agent_id* already has a run in progress.""" + return agent_id in _running_agents + # ── Timeouts ─────────────────────────────────────────────────────────────── # Max seconds to wait for a single tool-call round-trip (FE → BE). _TOOL_CALL_TIMEOUT: int = 30 -# Max LLM reasoning steps per phase. -_MAX_TRIAGE_STEPS: int = 10 +# Max LLM reasoning steps for Step 2 processing. _MAX_PROCESSING_STEPS: int = 12 +# Max directory recursion depth during scan. +_MAX_SCAN_DEPTH: int = 5 # ── Data-type to tool mapping ───────────────────────────────────────────── @@ -66,46 +78,72 @@ _DATA_TYPE_TOOLS: dict[str, list[Any]] = { "timelines": TIMELINE_TOOLS, } -# ── Triage prompt ───────────────────────────────────────────────────────── +# ── Step 1: Classification prompt ───────────────────────────────────────── -_TRIAGE_SYSTEM_PROMPT = """\ -You are a file triage assistant for a freelance project management tool. -Your job is to explore a local directory on the user's device, understand its -structure, and group files by project context. +_DOMAIN_DESCRIPTIONS: dict[str, str] = { + "tasks": ( + "Action items, to-dos, deliverables — anything that describes work to be done, " + "assigned to someone, or tracked with a due date or status." + ), + "notes": ( + "Documentation, meeting notes, summaries, reference material — " + "written content meant to be read and referenced rather than acted on." + ), + "timelines": ( + "Project milestones, deadlines, scheduled events — " + "specific dates that mark a point in the progress of a project." + ), + "projects": ( + "High-level project entities — only relevant if the file clearly introduces " + "a new project or updates the scope of an existing one." + ), +} -You have access to these tools: -- list_directory: to map folder structure -- get_file_metadata: to check creation/modification dates -- read_file_content: to read brief snippets when needed for categorisation -- list_projects / list_all_projects / get_project: to fetch existing projects - from the user's workspace and match files to them +_STEP1_SYSTEM_PROMPT = """\ +You are a file classifier for a freelance project management tool. -Instructions: -1. Start by calling list_directory on the configured root path. -2. Explore subdirectories as needed to understand the structure. -3. Use get_file_metadata to check modification dates. Skip files that have - NOT been modified since: {last_run_at}. -4. Call list_all_projects to get the user's existing projects. -5. Match files to existing projects by name, folder structure, or content hints. -6. If files don't match any existing project, group them under "standalone". +Given a file's content and a list of existing projects, your job is to: +1. Identify which project this file belongs to (or "standalone" if none match). +2. Identify which data domains are relevant to extract from this file, + limited to the allowed domains listed below. -{custom_prompt_section} +Domain definitions (only consider domains in the allowed list): +{domain_definitions} -Target entity types to extract: {data_types} -File extensions to consider: {file_extensions} +Respond ONLY with a JSON object — no markdown, no explanation: -When you have finished exploring, output ONLY a JSON object (no markdown -fences, no explanation) mapping project IDs or "standalone" to file path -arrays: +{{"project_id": " or standalone", "domains": ["tasks", "notes"]}} -{{"": ["", ...], "standalone": ["", ...]}} - -Return ONLY the JSON object as your final message. +Existing projects: +{projects_list} """ -# ── Processing prompt ───────────────────────────────────────────────────── +# ── Step 2: Processing prompt ───────────────────────────────────────────── -_PROCESSING_BASE_PROMPT = """\ +_PROCESSING_SYSTEM_PROMPT = """\ +You are a data extraction assistant for a freelance project management tool. + +Your task is to read the file content provided and create or update records +using the available tools. + +IMPORTANT — update-first rules: + The existing records below are the source of truth. + If an existing record semantically matches the content (by title, topic, + or context), update it instead of creating a duplicate. + Only create a new record when no existing match is found. + Set isAiSuggested=1 on all new records. + +{existing_context} + +Project context: {project_context} +Target domains: {data_types} + +{custom_prompt_section} +""" + +# ── Cloud processing prompt (kept separate for cloud agent) ─────────────── + +_CLOUD_PROCESSING_PROMPT = """\ You are a data extraction and management assistant for a freelance project management tool. @@ -124,26 +162,6 @@ Your task: 4. Do NOT invent data. Only extract what is clearly present in the files. 5. If a file contains no relevant data for the target entity types, skip it. -Update-first rules (apply in this order): - Tasks: - - Call list_tasks to find a match by title or context. - - If found: call add_task_comment (author "Adiuva"), update_task to set - assignees, state (ToDo / In Progress / Completed), or other fields. - - If NOT found: call create_task with isAiSuggested=1, isApproved=0. - Timelines: - - Call list_timelines to find a match by title or date. - - If found: call update_timeline to edit fields or mark it complete. - - If NOT found: call create_timeline with isAiSuggested=1, isApproved=0. - Notes: - - Call list_notes to find a match by title or topic, then get_note to - read its current content. - - If found: call update_note with the merged content. - - If NOT found: call create_note with isAiSuggested=1, isApproved=0. - Projects: - - Call list_all_projects to check for a match first. - - Only call create_project if the information is clearly significant and - no existing project matches. Set isAiSuggested=1, isApproved=0. - {project_context} Files to process: @@ -168,7 +186,6 @@ def _is_overdue(schedule_cron: str, last_run_at: datetime | None) -> bool: try: now = datetime.now(timezone.utc) if last_run_at is None: - # Validate the expression before deciding this is overdue. croniter(schedule_cron, now) return True ts = last_run_at @@ -179,7 +196,7 @@ def _is_overdue(schedule_cron: str, last_run_at: datetime | None) -> bool: return now >= next_run except Exception as exc: logger.warning("agent_runner: cannot parse cron %r: %s", schedule_cron, exc) - return False # Fail-safe: don't trigger if expression is invalid. + return False # ── WS executor for agent context ───────────────────────────────────────── @@ -207,7 +224,7 @@ def _make_agent_executor( return _executor -# ── LLM tool-calling loop (mirrors deep_agent._run_single_agent) ────────── +# ── LLM tool-calling loop ───────────────────────────────────────────────── def _as_text(content: Any) -> str: @@ -235,11 +252,7 @@ async def _run_agent_with_tools( tools: list[Any], max_steps: int, ) -> str: - """Run an LLM agent with tool-calling, returning the final text response. - - Follows the same pattern as ``deep_agent._run_single_agent``: - bind tools → invoke → handle tool calls → repeat until final text. - """ + """Run an LLM agent with tool-calling, returning the final text response.""" llm = get_llm() llm_with_tools = llm.bind_tools(tools) messages: list[Any] = [ @@ -247,7 +260,6 @@ async def _run_agent_with_tools( HumanMessage(content=user_message), ] - tool_calls_count = 0 tool_map = {tool_def.name: tool_def for tool_def in tools} for _ in range(max_steps): @@ -258,7 +270,6 @@ async def _run_agent_with_tools( return _as_text(response.content) for call in response.tool_calls: - tool_calls_count += 1 call_id = str(call.get("id", "")) call_name = str(call.get("name", "")) call_args = call.get("args", {}) @@ -277,47 +288,19 @@ async def _run_agent_with_tools( logger.info( "agent_runner: tool_result name=%s output=%s", call_name, - str(tool_output)[:1200], + str(tool_output)[:200], ) messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"])) - # Fallback: exceeded max steps, get final response without tools. final = await llm.ainvoke(messages) return _as_text(final.content) -# ── Triage map parser ───────────────────────────────────────────────────── - - -def _parse_triage_map(raw: str) -> dict[str, list[str]] | None: - """Extract the JSON triage map from the LLM's final response.""" - text = raw.strip() - # Try direct parse first. - try: - parsed = json.loads(text) - if isinstance(parsed, dict): - return {k: v for k, v in parsed.items() if isinstance(v, list)} - except json.JSONDecodeError: - pass - - # Try extracting JSON from markdown fences or surrounding text. - import re - match = re.search(r"\{[\s\S]*\}", text) - if match: - try: - parsed = json.loads(match.group(0)) - if isinstance(parsed, dict): - return {k: v for k, v in parsed.items() if isinstance(v, list)} - except json.JSONDecodeError: - pass - return None - - # ── Tool list builder ───────────────────────────────────────────────────── def _build_processing_tools(data_types: list[str]) -> list[Any]: - """Build the tool list for Phase 2 based on user's data_types selection.""" + """Build the tool list for processing based on user's data_types selection.""" tools: list[Any] = list(FILESYSTEM_TOOLS) for dt in data_types: dt_tools = _DATA_TYPE_TOOLS.get(dt) @@ -326,7 +309,223 @@ def _build_processing_tools(data_types: list[str]) -> list[Any]: return tools -# ── Local agent runner (two-phase) ───────────────────────────────────────── +# ── Code-based directory scanner ───────────────────────────────────────── + + +async def _scan_directories( + paths: list[str], + extensions: list[str], + last_run_at: datetime | None, +) -> list[str]: + """Walk directories via WS tool calls and return filtered file paths. + + Recursion is capped at ``_MAX_SCAN_DEPTH``. Files are filtered by + extension (if configured) and by modification date (if ``last_run_at`` + is set). Fails open: if metadata cannot be read, the file is included. + """ + all_files: list[str] = [] + ext_set = {e.lstrip(".").lower() for e in extensions} if extensions else set() + + async def _walk(path: str, depth: int) -> None: + if depth > _MAX_SCAN_DEPTH: + return + try: + result = await execute_on_client(action="list_directory", data={"path": path}) + except Exception as exc: + logger.warning("agent_runner: list_directory failed %r: %s", path, exc) + return + for entry in result.get("entries", []): + entry_path = entry.get("path", "") + if not entry_path: + continue + if entry.get("type") == "directory": + await _walk(entry_path, depth + 1) + elif entry.get("type") == "file": + if ext_set: + dot_pos = entry_path.rfind(".") + file_ext = entry_path[dot_pos + 1:].lower() if dot_pos != -1 else "" + if file_ext not in ext_set: + continue + all_files.append(entry_path) + + for root in paths: + await _walk(root, depth=0) + + if last_run_at is None: + return all_files + + # Filter by modification date. + last_run_ms = int(last_run_at.timestamp() * 1000) + filtered: list[str] = [] + for file_path in all_files: + try: + meta = await execute_on_client(action="get_file_metadata", data={"path": file_path}) + modified_at = meta.get("modifiedAt") + if modified_at is None: + filtered.append(file_path) + continue + if isinstance(modified_at, (int, float)): + mod_ms = int(modified_at) + else: + mod_ms = int(datetime.fromisoformat(str(modified_at)).timestamp() * 1000) + if mod_ms > last_run_ms: + filtered.append(file_path) + except Exception: + filtered.append(file_path) # fail-open + + return filtered + + +# ── Code-based entity fetchers ──────────────────────────────────────────── + + +async def _fetch_projects() -> list[dict]: + """Fetch all projects from the Electron client via WS.""" + try: + result = await execute_on_client(action="select", table="projects") + return result.get("rows", []) + except Exception as exc: + logger.warning("agent_runner: failed to fetch projects: %s", exc) + return [] + + +_DOMAIN_TABLE: dict[str, str] = { + "tasks": "tasks", + "notes": "notes", + "timelines": "timelines", + "projects": "projects", +} + + +async def _fetch_domain_entities(domain: str, project_id: str) -> list[dict]: + """Fetch existing rows for a domain, scoped to a project where applicable.""" + table = _DOMAIN_TABLE.get(domain) + if not table: + return [] + filters: dict[str, Any] = {} + if project_id != "standalone" and domain != "projects": + filters["projectId"] = project_id + try: + result = await execute_on_client( + action="select", + table=table, + filters=filters if filters else None, + ) + return result.get("rows", []) + except Exception as exc: + logger.warning("agent_runner: failed to fetch %s: %s", domain, exc) + return [] + + +def _format_entities_for_context(domain: str, rows: list[dict]) -> str: + """Format existing entity rows as a readable context block for the LLM. + + Includes enough detail per record for the LLM to make a confident + update-vs-create decision without overwhelming the context. + Note content is truncated to 200 chars to stay within token budget. + """ + if not rows: + return f"No existing {domain}." + lines: list[str] = [] + for r in rows: + if domain == "tasks": + desc = r.get("description") or "" + desc_part = f" — {desc[:120]}" if desc else "" + assignee = r.get("assignee") or r.get("assignees") or "" + due = r.get("dueDate") or r.get("due_date") or "" + meta = ", ".join(filter(None, [ + f"priority: {r.get('priority', '')}" if r.get("priority") else "", + f"assignee: {assignee}" if assignee else "", + f"due: {due}" if due else "", + ])) + lines.append( + f" - [{r.get('status', '?')}] {r.get('title', '')}{desc_part}" + f" ({meta}, id: {r['id']})" + ) + elif domain == "notes": + snippet = (r.get("content") or "")[:200].replace("\n", " ") + snippet_part = f"\n Preview: {snippet}" if snippet else "" + lines.append( + f" - {r.get('title', '')} (id: {r['id']}){snippet_part}" + ) + elif domain == "timelines": + lines.append( + f" - {r.get('title', '')} date={r.get('date', '')} (id: {r['id']})" + ) + elif domain == "projects": + summary = (r.get("aiSummary") or r.get("ai_summary") or "")[:120] + summary_part = f" — {summary}" if summary else "" + lines.append( + f" - {r.get('name', '')} [{r.get('status', '')}]{summary_part}" + f" (id: {r['id']})" + ) + return f"Existing {domain}:\n" + "\n".join(lines) + + +# ── Step 1: LLM file classifier ─────────────────────────────────────────── + + +async def _classify_file( + file_path: str, + file_content: str, + projects: list[dict], + config_data_types: list[str], +) -> tuple[str, list[str]]: + """Call the LLM to classify a file by project and relevant domains. + + Returns ``(project_id_or_"standalone", domains)``. + Falls back to ``("standalone", config_data_types)`` on any error. + """ + fallback = ("standalone", list(config_data_types)) + + if not file_content.strip(): + return fallback + + projects_list = "\n".join( + f" - {p.get('name', '')} (id: {p['id']}, status: {p.get('status', '')})" + for p in projects + ) or " (none — all files are standalone)" + + domain_definitions = "\n".join( + f" - {d}: {_DOMAIN_DESCRIPTIONS[d]}" + for d in config_data_types + if d in _DOMAIN_DESCRIPTIONS + ) + + system = _STEP1_SYSTEM_PROMPT.format( + domain_definitions=domain_definitions, + projects_list=projects_list, + ) + + llm = get_llm() + try: + response = await llm.ainvoke([ + SystemMessage(content=system), + HumanMessage(content=f"File: {file_path}\n\nContent:\n{file_content[:4000]}"), + ]) + raw = _as_text(response.content).strip() + # Strip markdown fences if the model wraps the JSON. + if raw.startswith("```"): + raw = raw.split("```")[1] + if raw.startswith("json"): + raw = raw[4:] + parsed = json.loads(raw.strip()) + project_id: str = str(parsed.get("project_id") or "standalone") + domains: list[str] = [ + d for d in parsed.get("domains", []) + if d in config_data_types + ] + if not domains: + domains = list(config_data_types) + return project_id, domains + except Exception as exc: + logger.warning( + "agent_runner: step1 classification failed for %r: %s", file_path, exc + ) + return fallback + + +# ── Local agent runner (two-step per file) ──────────────────────────────── async def run_local_agent( @@ -336,24 +535,28 @@ async def run_local_agent( device_mgr: DeviceConnectionManager, run_context: dict | None = None, ) -> None: - """Execute a local directory agent run using two-phase LLM-with-tools. + """Execute a local directory agent run using a two-step approach per file. - Phase 1 — Triage: - Explore the directory structure, check metadata, match files to - existing projects. Output: a JSON map of project → file paths. + Step 1 — Classification (code + 1 LLM call per file, no tools): + Code scans directories and fetches all projects via WS. + For each file, LLM identifies the project and relevant domains. - Phase 2 — Processing: - For each project group, read full file contents and perform CRUD - operations using the standard entity tools. + Step 2 — Processing (code + 1 LLM call per file, with tools): + Code fetches existing entities for the identified project/domains. + LLM receives file content + existing entities in context and uses + tools to update existing records or create new ones. """ run_id = run_log.id + agent_id = (run_context or {}).get("agent_id") or config.id + _running_agents.add(agent_id) # ── Device online check ───────────────────────────────────────── target_device_id = config.device_id.strip() if isinstance(config.device_id, str) else "" - if target_device_id: - is_online = device_mgr.is_online(user_id, target_device_id) - else: - is_online = device_mgr.is_online(user_id) + is_online = ( + device_mgr.is_online(user_id, target_device_id) + if target_device_id + else device_mgr.is_online(user_id) + ) if not is_online: logger.info( @@ -377,116 +580,112 @@ async def run_local_agent( items_processed = 0 items_created = 0 + custom_section = ( + f"User instructions:\n{config.prompt_template}" + if config.prompt_template + else "" + ) + try: - # ── Phase 1: Triage ───────────────────────────────────────── - logger.info("agent_runner: run=%s phase=triage start user=%s", run_id, user_id) - - last_run_str = "never (process all files)" - if config.last_run_at: - last_run_str = config.last_run_at.isoformat() - - custom_section = "" - if config.prompt_template: - custom_section = f"User instructions:\n{config.prompt_template}" - - file_ext_str = ", ".join(config.file_extensions) if config.file_extensions else "all" - - triage_prompt = _TRIAGE_SYSTEM_PROMPT.format( - last_run_at=last_run_str, - custom_prompt_section=custom_section, - data_types=", ".join(config.data_types), - file_extensions=file_ext_str, + # ── Code: scan directories ─────────────────────────────────── + logger.info("agent_runner: run=%s scanning directories user=%s", run_id, user_id) + file_paths = await _scan_directories( + paths=config.directory_paths, + extensions=config.file_extensions or [], + last_run_at=config.last_run_at, + ) + logger.info( + "agent_runner: run=%s found %d file(s) after filtering", run_id, len(file_paths) ) - directory_paths = config.directory_paths - triage_user_msg = ( - f"Explore these directories and produce the triage map:\n" - f"{json.dumps(directory_paths, ensure_ascii=False)}" - ) - - triage_tools: list[Any] = list(FILESYSTEM_TOOLS) + list(PROJECT_TOOLS) - - triage_response = await _run_agent_with_tools( - system_prompt=triage_prompt, - user_message=triage_user_msg, - tools=triage_tools, - max_steps=_MAX_TRIAGE_STEPS, - ) - - triage_map = _parse_triage_map(triage_response) - if not triage_map: - errors.append(f"Triage phase failed to produce a valid file map: {triage_response[:500]}") - await _finalize_run(run_log, status="error", errors=errors) + if not file_paths: + await _finalize_run(run_log, status="success", items_processed=0, items_created=0) return - logger.info( - "agent_runner: run=%s triage complete groups=%d total_files=%d", - run_id, - len(triage_map), - sum(len(files) for files in triage_map.values()), - ) + # ── Code: fetch all projects once ──────────────────────────── + projects = await _fetch_projects() - # ── Phase 2: Processing (per group) ───────────────────────── + # ── Per-file processing ────────────────────────────────────── processing_tools = _build_processing_tools(config.data_types) - for group_key, file_paths in triage_map.items(): - if not file_paths: - continue - - logger.info( - "agent_runner: run=%s phase=processing group=%s files=%d", - run_id, - group_key, - len(file_paths), - ) - - # Build project context for the LLM. - if group_key == "standalone": - project_context = "These files are not associated with any existing project." - else: - project_context = f"These files belong to project ID: {group_key}. Use this project_id when creating records." - - file_list_str = "\n".join(f"- {fp}" for fp in file_paths) - - processing_prompt = _PROCESSING_BASE_PROMPT.format( - data_types=", ".join(config.data_types), - project_context=project_context, - file_list=file_list_str, - custom_prompt_section=custom_section, - ) - - items_processed += len(file_paths) - + for file_path in file_paths: try: + # Read file content via code. + file_result = await execute_on_client( + action="read_file_content", data={"path": file_path} + ) + file_content: str = file_result.get("content", "") + if not file_content: + logger.debug("agent_runner: run=%s skipping empty file %r", run_id, file_path) + continue + + items_processed += 1 + + # Step 1 — classify file. + project_id, domains = await _classify_file( + file_path=file_path, + file_content=file_content, + projects=projects, + config_data_types=config.data_types, + ) + logger.info( + "agent_runner: run=%s file=%r → project=%s domains=%s", + run_id, + file_path, + project_id, + domains, + ) + + # Step 2 — fetch existing entities for this project + domains. + existing_blocks: list[str] = [] + for domain in domains: + rows = await _fetch_domain_entities(domain, project_id) + existing_blocks.append(_format_entities_for_context(domain, rows)) + + existing_context = "\n\n".join(existing_blocks) + + if project_id == "standalone": + project_context = "This file is not associated with any existing project." + else: + project_context = ( + f"This file belongs to project ID: {project_id}. " + "Use this project_id when creating records." + ) + + system_prompt = _PROCESSING_SYSTEM_PROMPT.format( + existing_context=existing_context, + project_context=project_context, + data_types=", ".join(domains), + custom_prompt_section=custom_section, + ) + result_text = await _run_agent_with_tools( - system_prompt=processing_prompt, - user_message="Process the listed files now.", + system_prompt=system_prompt, + user_message=( + f"Process this file and extract relevant information.\n\n" + f"File: {file_path}\n\nContent:\n{file_content}" + ), tools=processing_tools, max_steps=_MAX_PROCESSING_STEPS, ) logger.info( - "agent_runner: run=%s group=%s processing_result=%s", + "agent_runner: run=%s file=%r result=%s", run_id, - group_key, - result_text[:500], + file_path, + result_text[:200], ) - # Count created items by scanning tool call results. - # The tools themselves handle creation; we estimate from the - # summary. A more precise count would require intercepting - # tool results, but the summary is sufficient for the run log. + except Exception as exc: - errors.append(f"Processing error for group '{group_key}': {exc}") + errors.append(f"Error processing '{file_path}': {exc}") logger.error( - "agent_runner: run=%s group=%s processing failed: %s", - run_id, - group_key, - exc, + "agent_runner: run=%s file=%r failed: %s", run_id, file_path, exc ) except Exception as exc: errors.append(f"Agent run failed: {exc}") logger.error("agent_runner: run=%s failed: %s", run_id, exc) finally: + _running_agents.discard(agent_id) clear_client_executor() # ── Finalise ──────────────────────────────────────────────────── @@ -503,9 +702,6 @@ async def run_local_agent( items_processed=items_processed, items_created=items_created, errors=errors, - update_config_last_run=False, - config_id=config.id, - config_type="local", ) logger.info( "agent_runner: run=%s done status=%s processed=%d errors=%d", @@ -515,8 +711,7 @@ async def run_local_agent( len(errors), ) - # Notify the Electron client that the run is complete so it can close - # the run record in its local SQLite. + # Notify Electron that the run is complete. if run_context and device_mgr.is_online(user_id): try: await device_mgr.send_frame(user_id, { @@ -525,12 +720,13 @@ async def run_local_agent( "status": final_status, }) except Exception as exc: - logger.warning("agent_runner: run=%s failed to send run_complete: %s", run_id, exc) + logger.warning( + "agent_runner: run=%s failed to send run_complete: %s", run_id, exc + ) # ── Cloud agent runner ───────────────────────────────────────────────────── -# Default lookback window when an agent has never run before. _CLOUD_DEFAULT_LOOKBACK_DAYS: int = 7 @@ -544,8 +740,7 @@ async def run_cloud_agent( Steps: - 1. Verify the user's device is online — results are pushed to Electron - via WS tool-call frames. If no device is connected, abort. + 1. Verify the user's device is online. 2. Decrypt the stored OAuth token from ``config.oauth_token_encrypted``. 3. Instantiate the provider client (Gmail or MS Graph). 4. Fetch messages/emails since ``config.last_run_at`` (or 7 days ago for @@ -598,11 +793,7 @@ async def run_cloud_agent( try: provider = get_provider(config.provider, credentials_info) except ValueError as exc: - await _finalize_run( - run_log, - status="error", - errors=[str(exc)], - ) + await _finalize_run(run_log, status="error", errors=[str(exc)]) return # ── 4. Fetch messages ───────────────────────────────────────────── @@ -636,9 +827,7 @@ async def run_cloud_agent( raw_messages = [] except RuntimeError as exc: logger.error( - "agent_runner: provider fetch failed for cloud agent %s: %s", - config.id, - exc, + "agent_runner: provider fetch failed for cloud agent %s: %s", config.id, exc ) await _finalize_run( run_log, @@ -664,9 +853,11 @@ async def run_cloud_agent( try: processing_tools = _build_processing_tools(config.data_types) - custom_section = "" - if config.prompt_template: - custom_section = f"User instructions:\n{config.prompt_template}" + custom_section = ( + f"User instructions:\n{config.prompt_template}" + if config.prompt_template + else "" + ) for msg in raw_messages: content_text = msg.as_text @@ -674,7 +865,7 @@ async def run_cloud_agent( continue items_processed += 1 - processing_prompt = _PROCESSING_BASE_PROMPT.format( + processing_prompt = _CLOUD_PROCESSING_PROMPT.format( data_types=", ".join(config.data_types), project_context="Determine the appropriate project from the message context.", file_list=f"Message from {config.provider} (id: {msg.id})", @@ -708,7 +899,11 @@ async def run_cloud_agent( await db.commit() logger.debug("agent_runner: refreshed OAuth token persisted for agent %s", config.id) except Exception as exc: - logger.warning("agent_runner: failed to persist refreshed token for agent %s: %s", config.id, exc) + logger.warning( + "agent_runner: failed to persist refreshed token for agent %s: %s", + config.id, + exc, + ) # ── 8. Finalise ──────────────────────────────────────────────────── if errors and items_created == 0: @@ -749,12 +944,6 @@ async def trigger_pending_runs( """Dispatch any overdue agent runs after an Electron device connects. Called as a background task from the device WS endpoint on ``device_hello``. - - Scheduling rules: - - * **Local agents**: only triggered when ``config.device_id == device_id``. - * **Cloud agents**: triggered on any connected device (no device binding). - * Runs execute **sequentially** to avoid flooding the WS connection. """ logger.info( "agent_runner: pending-run scan skipped for user=%s device=%s (client-owned agent config)", @@ -778,11 +967,7 @@ async def _finalize_run( config_id: str | None = None, config_type: str | None = None, ) -> None: - """Persist the run outcome and optionally update ``LocalAgentConfig.last_run_at``. - - Uses a fresh DB session so this is safe to call from background tasks - after the original request session has closed. - """ + """Persist the run outcome and optionally update ``last_run_at`` on the config.""" now = datetime.now(timezone.utc) try: async with async_session() as db: