"""Agent run orchestrator. Drives two agent types: * **Local directory agent** — two-phase execution that mirrors the ``deep_agent.py`` tool-calling pattern. Phase 1 (Triage) explores the user's directory via file-system tools and groups files by project. Phase 2 (Processing) reads full file contents and performs CRUD operations using the standard entity tools (tasks, notes, etc.). * **Cloud connector agent** — fetches data from third-party APIs (Gmail, Teams, Outlook) and pushes extracted items to Electron. Usage ----- Background tasks are spawned with ``asyncio.create_task()``:: asyncio.create_task(run_local_agent(user_id, config, run_log, device_manager)) asyncio.create_task(trigger_pending_runs(user_id, device_id, device_manager)) The ``trigger_pending_runs`` function is called by the device WS endpoint when Electron sends ``device_hello``, so any overdue runs fire immediately when the device reconnects. """ from __future__ import annotations import asyncio import json import logging import uuid from datetime import datetime, timedelta, timezone from typing import Any from croniter import croniter from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage from sqlalchemy import select from app.agents.filesystem_agent import FILESYSTEM_TOOLS from app.agents.note_agent import NOTE_TOOLS from app.agents.project_agent import PROJECT_TOOLS from app.agents.task_agent import TASK_TOOLS from app.agents.timeline_agent import TIMELINE_TOOLS from app.core.device_manager import DeviceConnectionManager from app.core.llm import get_llm from app.core.ws_context import clear_client_executor, set_client_executor from app.db import async_session from app.models import AgentRunLog, CloudAgentConfig, LocalAgentConfig logger = logging.getLogger(__name__) # ── Timeouts ─────────────────────────────────────────────────────────────── # Max seconds to wait for a single tool-call round-trip (FE → BE). _TOOL_CALL_TIMEOUT: int = 30 # Max LLM reasoning steps per phase. _MAX_TRIAGE_STEPS: int = 10 _MAX_PROCESSING_STEPS: int = 12 # ── Data-type to tool mapping ───────────────────────────────────────────── _DATA_TYPE_TOOLS: dict[str, list[Any]] = { "tasks": TASK_TOOLS, "projects": PROJECT_TOOLS, "notes": NOTE_TOOLS, "timelines": TIMELINE_TOOLS, } # ── Triage prompt ───────────────────────────────────────────────────────── _TRIAGE_SYSTEM_PROMPT = """\ You are a file triage assistant for a freelance project management tool. Your job is to explore a local directory on the user's device, understand its structure, and group files by project context. You have access to these tools: - list_directory: to map folder structure - get_file_metadata: to check creation/modification dates - read_file_content: to read brief snippets when needed for categorisation - list_projects / list_all_projects / get_project: to fetch existing projects from the user's workspace and match files to them Instructions: 1. Start by calling list_directory on the configured root path. 2. Explore subdirectories as needed to understand the structure. 3. Use get_file_metadata to check modification dates. Skip files that have NOT been modified since: {last_run_at}. 4. Call list_all_projects to get the user's existing projects. 5. Match files to existing projects by name, folder structure, or content hints. 6. If files don't match any existing project, group them under "standalone". {custom_prompt_section} Target entity types to extract: {data_types} File extensions to consider: {file_extensions} When you have finished exploring, output ONLY a JSON object (no markdown fences, no explanation) mapping project IDs or "standalone" to file path arrays: {{"": ["", ...], "standalone": ["", ...]}} Return ONLY the JSON object as your final message. """ # ── Processing prompt ───────────────────────────────────────────────────── _PROCESSING_BASE_PROMPT = """\ You are a data extraction and management assistant for a freelance project management tool. Available tools: Filesystem : read_file_content, list_directory, get_file_metadata Tasks : list_tasks, create_task, update_task, add_task_comment Notes : list_notes, get_note, create_note, update_note Timelines : list_timelines, create_timeline, update_timeline Projects : list_all_projects, get_project, create_project, update_project Your task: 1. Read the full content of each file below using read_file_content. 2. For each piece of information found, ALWAYS try to match and update an existing record before creating a new one. 3. ONLY act on these entity types: {data_types}. 4. Do NOT invent data. Only extract what is clearly present in the files. 5. If a file contains no relevant data for the target entity types, skip it. Update-first rules (apply in this order): Tasks: - Call list_tasks to find a match by title or context. - If found: call add_task_comment (author "Adiuva"), update_task to set assignees, state (ToDo / In Progress / Completed), or other fields. - If NOT found: call create_task with isAiSuggested=1, isApproved=0. Timelines: - Call list_timelines to find a match by title or date. - If found: call update_timeline to edit fields or mark it complete. - If NOT found: call create_timeline with isAiSuggested=1, isApproved=0. Notes: - Call list_notes to find a match by title or topic, then get_note to read its current content. - If found: call update_note with the merged content. - If NOT found: call create_note with isAiSuggested=1, isApproved=0. Projects: - Call list_all_projects to check for a match first. - Only call create_project if the information is clearly significant and no existing project matches. Set isAiSuggested=1, isApproved=0. {project_context} Files to process: {file_list} {custom_prompt_section} After processing all files, respond with a brief summary of what you updated and what you created. """ # ── Cron helper ──────────────────────────────────────────────────────────── def _is_overdue(schedule_cron: str, last_run_at: datetime | None) -> bool: """Return ``True`` if the next scheduled run time has already passed. Always validates the cron expression first — an invalid expression returns ``False`` (fail-safe: never trigger an unparseable schedule). """ try: now = datetime.now(timezone.utc) if last_run_at is None: # Validate the expression before deciding this is overdue. croniter(schedule_cron, now) return True ts = last_run_at if ts.tzinfo is None: ts = ts.replace(tzinfo=timezone.utc) cron = croniter(schedule_cron, ts) next_run: datetime = cron.get_next(datetime) return now >= next_run except Exception as exc: logger.warning("agent_runner: cannot parse cron %r: %s", schedule_cron, exc) return False # Fail-safe: don't trigger if expression is invalid. # ── WS executor for agent context ───────────────────────────────────────── def _make_agent_executor( user_id: str, device_mgr: DeviceConnectionManager, run_context: dict | None = None, ) -> Any: """Create a WS callback for ``set_client_executor()`` so that all tools can use ``execute_on_client()`` during an agent run. If *run_context* is provided it is attached to every ``tool_call`` frame so the Electron client can attribute actions to the correct agent run. """ async def _executor(payload: dict) -> dict: payload["type"] = "tool_call" if run_context: payload["run_context"] = run_context call_id = payload["id"] fut = device_mgr.create_pending_call(user_id, call_id) await device_mgr.send_frame(user_id, payload) return await asyncio.wait_for(fut, timeout=_TOOL_CALL_TIMEOUT) return _executor # ── LLM tool-calling loop (mirrors deep_agent._run_single_agent) ────────── def _as_text(content: Any) -> str: if content is None: return "" if isinstance(content, str): return content if isinstance(content, list): parts: list[str] = [] for item in content: if isinstance(item, str): parts.append(item) elif isinstance(item, dict): text = item.get("text") if isinstance(text, str): parts.append(text) return "".join(parts) return str(content) async def _run_agent_with_tools( *, system_prompt: str, user_message: str, tools: list[Any], max_steps: int, ) -> str: """Run an LLM agent with tool-calling, returning the final text response. Follows the same pattern as ``deep_agent._run_single_agent``: bind tools → invoke → handle tool calls → repeat until final text. """ llm = get_llm() llm_with_tools = llm.bind_tools(tools) messages: list[Any] = [ SystemMessage(content=system_prompt), HumanMessage(content=user_message), ] tool_calls_count = 0 tool_map = {tool_def.name: tool_def for tool_def in tools} for _ in range(max_steps): response: AIMessage = await llm_with_tools.ainvoke(messages) messages.append(response) if not response.tool_calls: return _as_text(response.content) for call in response.tool_calls: tool_calls_count += 1 call_id = str(call.get("id", "")) call_name = str(call.get("name", "")) call_args = call.get("args", {}) logger.info( "agent_runner: tool_call name=%s args=%s", call_name, json.dumps(call_args, ensure_ascii=True)[:800], ) tool_fn = tool_map.get(call_name) if tool_fn is None: tool_output = f"Unknown tool: {call_name}" else: tool_output = await tool_fn.ainvoke(call_args) logger.info( "agent_runner: tool_result name=%s output=%s", call_name, str(tool_output)[:1200], ) messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"])) # Fallback: exceeded max steps, get final response without tools. final = await llm.ainvoke(messages) return _as_text(final.content) # ── Triage map parser ───────────────────────────────────────────────────── def _parse_triage_map(raw: str) -> dict[str, list[str]] | None: """Extract the JSON triage map from the LLM's final response.""" text = raw.strip() # Try direct parse first. try: parsed = json.loads(text) if isinstance(parsed, dict): return {k: v for k, v in parsed.items() if isinstance(v, list)} except json.JSONDecodeError: pass # Try extracting JSON from markdown fences or surrounding text. import re match = re.search(r"\{[\s\S]*\}", text) if match: try: parsed = json.loads(match.group(0)) if isinstance(parsed, dict): return {k: v for k, v in parsed.items() if isinstance(v, list)} except json.JSONDecodeError: pass return None # ── Tool list builder ───────────────────────────────────────────────────── def _build_processing_tools(data_types: list[str]) -> list[Any]: """Build the tool list for Phase 2 based on user's data_types selection.""" tools: list[Any] = list(FILESYSTEM_TOOLS) for dt in data_types: dt_tools = _DATA_TYPE_TOOLS.get(dt) if dt_tools: tools.extend(dt_tools) return tools # ── Local agent runner (two-phase) ───────────────────────────────────────── async def run_local_agent( user_id: str, config: LocalAgentConfig, run_log: AgentRunLog, device_mgr: DeviceConnectionManager, run_context: dict | None = None, ) -> None: """Execute a local directory agent run using two-phase LLM-with-tools. Phase 1 — Triage: Explore the directory structure, check metadata, match files to existing projects. Output: a JSON map of project → file paths. Phase 2 — Processing: For each project group, read full file contents and perform CRUD operations using the standard entity tools. """ run_id = run_log.id # ── Device online check ───────────────────────────────────────── target_device_id = config.device_id.strip() if isinstance(config.device_id, str) else "" if target_device_id: is_online = device_mgr.is_online(user_id, target_device_id) else: is_online = device_mgr.is_online(user_id) if not is_online: logger.info( "agent_runner: skip run=%s — device %r offline for user=%s", run_id, target_device_id or "", user_id, ) await _finalize_run( run_log, status="error", errors=[f"Device {target_device_id or ''!r} is not connected"], ) return # ── Set up WS executor for tools ──────────────────────────────── executor = _make_agent_executor(user_id, device_mgr, run_context) set_client_executor(executor) errors: list[str] = [] items_processed = 0 items_created = 0 try: # ── Phase 1: Triage ───────────────────────────────────────── logger.info("agent_runner: run=%s phase=triage start user=%s", run_id, user_id) last_run_str = "never (process all files)" if config.last_run_at: last_run_str = config.last_run_at.isoformat() custom_section = "" if config.prompt_template: custom_section = f"User instructions:\n{config.prompt_template}" file_ext_str = ", ".join(config.file_extensions) if config.file_extensions else "all" triage_prompt = _TRIAGE_SYSTEM_PROMPT.format( last_run_at=last_run_str, custom_prompt_section=custom_section, data_types=", ".join(config.data_types), file_extensions=file_ext_str, ) directory_paths = config.directory_paths triage_user_msg = ( f"Explore these directories and produce the triage map:\n" f"{json.dumps(directory_paths, ensure_ascii=False)}" ) triage_tools: list[Any] = list(FILESYSTEM_TOOLS) + list(PROJECT_TOOLS) triage_response = await _run_agent_with_tools( system_prompt=triage_prompt, user_message=triage_user_msg, tools=triage_tools, max_steps=_MAX_TRIAGE_STEPS, ) triage_map = _parse_triage_map(triage_response) if not triage_map: errors.append(f"Triage phase failed to produce a valid file map: {triage_response[:500]}") await _finalize_run(run_log, status="error", errors=errors) return logger.info( "agent_runner: run=%s triage complete groups=%d total_files=%d", run_id, len(triage_map), sum(len(files) for files in triage_map.values()), ) # ── Phase 2: Processing (per group) ───────────────────────── processing_tools = _build_processing_tools(config.data_types) for group_key, file_paths in triage_map.items(): if not file_paths: continue logger.info( "agent_runner: run=%s phase=processing group=%s files=%d", run_id, group_key, len(file_paths), ) # Build project context for the LLM. if group_key == "standalone": project_context = "These files are not associated with any existing project." else: project_context = f"These files belong to project ID: {group_key}. Use this project_id when creating records." file_list_str = "\n".join(f"- {fp}" for fp in file_paths) processing_prompt = _PROCESSING_BASE_PROMPT.format( data_types=", ".join(config.data_types), project_context=project_context, file_list=file_list_str, custom_prompt_section=custom_section, ) items_processed += len(file_paths) try: result_text = await _run_agent_with_tools( system_prompt=processing_prompt, user_message="Process the listed files now.", tools=processing_tools, max_steps=_MAX_PROCESSING_STEPS, ) logger.info( "agent_runner: run=%s group=%s processing_result=%s", run_id, group_key, result_text[:500], ) # Count created items by scanning tool call results. # The tools themselves handle creation; we estimate from the # summary. A more precise count would require intercepting # tool results, but the summary is sufficient for the run log. except Exception as exc: errors.append(f"Processing error for group '{group_key}': {exc}") logger.error( "agent_runner: run=%s group=%s processing failed: %s", run_id, group_key, exc, ) except Exception as exc: errors.append(f"Agent run failed: {exc}") logger.error("agent_runner: run=%s failed: %s", run_id, exc) finally: clear_client_executor() # ── Finalise ──────────────────────────────────────────────────── if errors and items_processed == 0: final_status = "error" elif errors: final_status = "partial" else: final_status = "success" await _finalize_run( run_log, status=final_status, items_processed=items_processed, items_created=items_created, errors=errors, update_config_last_run=False, config_id=config.id, config_type="local", ) logger.info( "agent_runner: run=%s done status=%s processed=%d errors=%d", run_id, final_status, items_processed, len(errors), ) # Notify the Electron client that the run is complete so it can close # the run record in its local SQLite. if run_context and device_mgr.is_online(user_id): try: await device_mgr.send_frame(user_id, { "type": "run_complete", "run_context": run_context, "status": final_status, }) except Exception as exc: logger.warning("agent_runner: run=%s failed to send run_complete: %s", run_id, exc) # ── Cloud agent runner ───────────────────────────────────────────────────── # Default lookback window when an agent has never run before. _CLOUD_DEFAULT_LOOKBACK_DAYS: int = 7 async def run_cloud_agent( user_id: str, config: CloudAgentConfig, run_log: AgentRunLog, device_mgr: DeviceConnectionManager, ) -> None: """Execute a cloud connector agent run end-to-end. Steps: 1. Verify the user's device is online — results are pushed to Electron via WS tool-call frames. If no device is connected, abort. 2. Decrypt the stored OAuth token from ``config.oauth_token_encrypted``. 3. Instantiate the provider client (Gmail or MS Graph). 4. Fetch messages/emails since ``config.last_run_at`` (or 7 days ago for the first run) applying ``config.filter_config`` filters. 5. For each message/email call the LLM to extract structured items. 6. Push each item to Electron as an ``insert`` tool-call. 7. If the provider refreshed its access token, re-encrypt and write it back to ``config.oauth_token_encrypted``. 8. Persist the run outcome via ``_finalize_run``. """ run_id = run_log.id # ── 1. Device online check ───────────────────────────────────────── if not device_mgr.is_online(user_id): logger.info( "agent_runner: skip cloud run=%s — no device online for user=%s", run_id, user_id, ) await _finalize_run( run_log, status="error", errors=["No connected device — cloud agent results cannot be delivered"], ) return # ── 2. Decrypt OAuth token ───────────────────────────────────────── from app.integrations import decrypt_token, encrypt_token, get_provider if not config.oauth_token_encrypted: await _finalize_run( run_log, status="error", errors=[f"No OAuth token stored for cloud agent '{config.name}'"], ) return try: credentials_info = decrypt_token(config.oauth_token_encrypted) except ValueError as exc: logger.error("agent_runner: failed to decrypt OAuth token for agent %s: %s", config.id, exc) await _finalize_run( run_log, status="error", errors=[f"Failed to decrypt OAuth token: {exc}"], ) return # ── 3. Instantiate provider client ──────────────────────────────── try: provider = get_provider(config.provider, credentials_info) except ValueError as exc: await _finalize_run( run_log, status="error", errors=[str(exc)], ) return # ── 4. Fetch messages ───────────────────────────────────────────── since: datetime | None = config.last_run_at if since is None: since = datetime.now(timezone.utc) - timedelta(days=_CLOUD_DEFAULT_LOOKBACK_DAYS) if since.tzinfo is None: since = since.replace(tzinfo=timezone.utc) errors: list[str] = [] items_processed = 0 items_created = 0 try: if config.provider == "gmail": raw_messages = await provider.fetch_messages( # type: ignore[union-attr] filter_config=config.filter_config, since=since, ) elif config.provider == "outlook": raw_messages = await provider.fetch_emails( # type: ignore[union-attr] filter_config=config.filter_config, since=since, ) elif config.provider == "teams": raw_messages = await provider.fetch_messages( # type: ignore[union-attr] filter_config=config.filter_config, since=since, ) else: raw_messages = [] except RuntimeError as exc: logger.error( "agent_runner: provider fetch failed for cloud agent %s: %s", config.id, exc, ) await _finalize_run( run_log, status="error", errors=[f"Provider fetch failed: {exc}"], update_config_last_run=True, config_id=config.id, config_type="cloud", ) return logger.info( "agent_runner: cloud agent %s fetched %d item(s) from %s for user=%s", config.id, len(raw_messages), config.provider, user_id, ) # ── 5–6. Extract + insert via LLM with tools ───────────────────── executor = _make_agent_executor(user_id, device_mgr) set_client_executor(executor) try: processing_tools = _build_processing_tools(config.data_types) custom_section = "" if config.prompt_template: custom_section = f"User instructions:\n{config.prompt_template}" for msg in raw_messages: content_text = msg.as_text if not content_text: continue items_processed += 1 processing_prompt = _PROCESSING_BASE_PROMPT.format( data_types=", ".join(config.data_types), project_context="Determine the appropriate project from the message context.", file_list=f"Message from {config.provider} (id: {msg.id})", custom_prompt_section=custom_section, ) try: await _run_agent_with_tools( system_prompt=processing_prompt, user_message=f"Process this message content:\n\n{content_text[:8000]}", tools=processing_tools, max_steps=_MAX_PROCESSING_STEPS, ) except Exception as exc: errors.append(f"LLM processing error for message {msg.id!r}: {exc}") finally: clear_client_executor() # ── 7. Persist refreshed token (if any) ─────────────────────────── refreshed = getattr(provider, "refreshed_credentials", None) if refreshed: try: new_encrypted = encrypt_token(refreshed) async with async_session() as db: cfg_result = await db.execute( select(CloudAgentConfig).where(CloudAgentConfig.id == config.id) ) cfg_row = cfg_result.scalar_one_or_none() if cfg_row: cfg_row.oauth_token_encrypted = new_encrypted await db.commit() logger.debug("agent_runner: refreshed OAuth token persisted for agent %s", config.id) except Exception as exc: logger.warning("agent_runner: failed to persist refreshed token for agent %s: %s", config.id, exc) # ── 8. Finalise ──────────────────────────────────────────────────── if errors and items_created == 0: final_status = "error" elif errors: final_status = "partial" else: final_status = "success" await _finalize_run( run_log, status=final_status, items_processed=items_processed, items_created=items_created, errors=errors, update_config_last_run=True, config_id=config.id, config_type="cloud", ) logger.info( "agent_runner: cloud run=%s done status=%s processed=%d created=%d errors=%d", run_id, final_status, items_processed, items_created, len(errors), ) # ── Pending-run trigger ───────────────────────────────────────────────────── async def trigger_pending_runs( user_id: str, device_id: str, device_mgr: DeviceConnectionManager, ) -> None: """Dispatch any overdue agent runs after an Electron device connects. Called as a background task from the device WS endpoint on ``device_hello``. Scheduling rules: * **Local agents**: only triggered when ``config.device_id == device_id``. * **Cloud agents**: triggered on any connected device (no device binding). * Runs execute **sequentially** to avoid flooding the WS connection. """ logger.info( "agent_runner: pending-run scan skipped for user=%s device=%s (client-owned agent config)", user_id, device_id, ) return # ── Internal helper ───────────────────────────────────────────────────────── async def _finalize_run( run_log: AgentRunLog, *, status: str, items_processed: int = 0, items_created: int = 0, errors: list[str] | None = None, update_config_last_run: bool = False, config_id: str | None = None, config_type: str | None = None, ) -> None: """Persist the run outcome and optionally update ``LocalAgentConfig.last_run_at``. Uses a fresh DB session so this is safe to call from background tasks after the original request session has closed. """ now = datetime.now(timezone.utc) try: async with async_session() as db: managed = await db.merge(run_log) managed.status = status managed.items_processed = items_processed managed.items_created = items_created managed.errors = errors or [] managed.completed_at = now if update_config_last_run and config_id: if config_type == "local": cfg_result = await db.execute( select(LocalAgentConfig).where(LocalAgentConfig.id == config_id) ) cfg = cfg_result.scalar_one_or_none() if cfg: cfg.last_run_at = now elif config_type == "cloud": cfg_result = await db.execute( select(CloudAgentConfig).where(CloudAgentConfig.id == config_id) ) cfg = cfg_result.scalar_one_or_none() if cfg: cfg.last_run_at = now await db.commit() except Exception as exc: logger.error( "agent_runner: failed to finalize run_log=%s: %s", run_log.id, exc )