Files
api/app/core/agent_runner.py
Roberto Musso 725cece5c1 Add run_context to agent tool calls for FE run logging
- AgentTriggerRequest accepts optional agent_id (FE's stable electron-store UUID)
- _make_agent_executor injects run_context into every tool_call frame
  so Electron can attribute actions to the correct agent run
- run_local_agent accepts run_context and sends a run_complete WS frame
  when the run finishes so the FE can close the run record
- trigger_agent_run builds run_context with run_id=run_log.id and the
  stable agent_id, passes it through to run_local_agent

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-20 09:46:17 +01:00

817 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Agent run orchestrator.
Drives two agent types:
* **Local directory agent** — two-phase execution that mirrors the
``deep_agent.py`` tool-calling pattern. Phase 1 (Triage) explores the
user's directory via file-system tools and groups files by project.
Phase 2 (Processing) reads full file contents and performs CRUD
operations using the standard entity tools (tasks, notes, etc.).
* **Cloud connector agent** — fetches data from third-party APIs (Gmail,
Teams, Outlook) and pushes extracted items to Electron.
Usage
-----
Background tasks are spawned with ``asyncio.create_task()``::
asyncio.create_task(run_local_agent(user_id, config, run_log, device_manager))
asyncio.create_task(trigger_pending_runs(user_id, device_id, device_manager))
The ``trigger_pending_runs`` function is called by the device WS endpoint
when Electron sends ``device_hello``, so any overdue runs fire immediately
when the device reconnects.
"""
from __future__ import annotations
import asyncio
import json
import logging
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any
from croniter import croniter
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
from sqlalchemy import select
from app.agents.filesystem_agent import FILESYSTEM_TOOLS
from app.agents.note_agent import NOTE_TOOLS
from app.agents.project_agent import PROJECT_TOOLS
from app.agents.task_agent import TASK_TOOLS
from app.agents.timeline_agent import TIMELINE_TOOLS
from app.core.device_manager import DeviceConnectionManager
from app.core.llm import get_llm
from app.core.ws_context import clear_client_executor, set_client_executor
from app.db import async_session
from app.models import AgentRunLog, CloudAgentConfig, LocalAgentConfig
logger = logging.getLogger(__name__)
# ── Timeouts ───────────────────────────────────────────────────────────────
# Max seconds to wait for a single tool-call round-trip (FE → BE).
_TOOL_CALL_TIMEOUT: int = 30
# Max LLM reasoning steps per phase.
_MAX_TRIAGE_STEPS: int = 10
_MAX_PROCESSING_STEPS: int = 12
# ── Data-type to tool mapping ─────────────────────────────────────────────
_DATA_TYPE_TOOLS: dict[str, list[Any]] = {
"tasks": TASK_TOOLS,
"projects": PROJECT_TOOLS,
"notes": NOTE_TOOLS,
"timelines": TIMELINE_TOOLS,
}
# ── Triage prompt ─────────────────────────────────────────────────────────
_TRIAGE_SYSTEM_PROMPT = """\
You are a file triage assistant for a freelance project management tool.
Your job is to explore a local directory on the user's device, understand its
structure, and group files by project context.
You have access to these tools:
- list_directory: to map folder structure
- get_file_metadata: to check creation/modification dates
- read_file_content: to read brief snippets when needed for categorisation
- list_projects / list_all_projects / get_project: to fetch existing projects
from the user's workspace and match files to them
Instructions:
1. Start by calling list_directory on the configured root path.
2. Explore subdirectories as needed to understand the structure.
3. Use get_file_metadata to check modification dates. Skip files that have
NOT been modified since: {last_run_at}.
4. Call list_all_projects to get the user's existing projects.
5. Match files to existing projects by name, folder structure, or content hints.
6. If files don't match any existing project, group them under "standalone".
{custom_prompt_section}
Target entity types to extract: {data_types}
File extensions to consider: {file_extensions}
When you have finished exploring, output ONLY a JSON object (no markdown
fences, no explanation) mapping project IDs or "standalone" to file path
arrays:
{{"<project_id>": ["<file_path>", ...], "standalone": ["<file_path>", ...]}}
Return ONLY the JSON object as your final message.
"""
# ── Processing prompt ─────────────────────────────────────────────────────
_PROCESSING_BASE_PROMPT = """\
You are a data extraction and management assistant for a freelance project
management tool.
Available tools:
Filesystem : read_file_content, list_directory, get_file_metadata
Tasks : list_tasks, create_task, update_task, add_task_comment
Notes : list_notes, get_note, create_note, update_note
Timelines : list_timelines, create_timeline, update_timeline
Projects : list_all_projects, get_project, create_project, update_project
Your task:
1. Read the full content of each file below using read_file_content.
2. For each piece of information found, ALWAYS try to match and update an
existing record before creating a new one.
3. ONLY act on these entity types: {data_types}.
4. Do NOT invent data. Only extract what is clearly present in the files.
5. If a file contains no relevant data for the target entity types, skip it.
Update-first rules (apply in this order):
Tasks:
- Call list_tasks to find a match by title or context.
- If found: call add_task_comment (author "Adiuva"), update_task to set
assignees, state (ToDo / In Progress / Completed), or other fields.
- If NOT found: call create_task with isAiSuggested=1, isApproved=0.
Timelines:
- Call list_timelines to find a match by title or date.
- If found: call update_timeline to edit fields or mark it complete.
- If NOT found: call create_timeline with isAiSuggested=1, isApproved=0.
Notes:
- Call list_notes to find a match by title or topic, then get_note to
read its current content.
- If found: call update_note with the merged content.
- If NOT found: call create_note with isAiSuggested=1, isApproved=0.
Projects:
- Call list_all_projects to check for a match first.
- Only call create_project if the information is clearly significant and
no existing project matches. Set isAiSuggested=1, isApproved=0.
{project_context}
Files to process:
{file_list}
{custom_prompt_section}
After processing all files, respond with a brief summary of what you updated
and what you created.
"""
# ── Cron helper ────────────────────────────────────────────────────────────
def _is_overdue(schedule_cron: str, last_run_at: datetime | None) -> bool:
"""Return ``True`` if the next scheduled run time has already passed.
Always validates the cron expression first — an invalid expression returns
``False`` (fail-safe: never trigger an unparseable schedule).
"""
try:
now = datetime.now(timezone.utc)
if last_run_at is None:
# Validate the expression before deciding this is overdue.
croniter(schedule_cron, now)
return True
ts = last_run_at
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
cron = croniter(schedule_cron, ts)
next_run: datetime = cron.get_next(datetime)
return now >= next_run
except Exception as exc:
logger.warning("agent_runner: cannot parse cron %r: %s", schedule_cron, exc)
return False # Fail-safe: don't trigger if expression is invalid.
# ── WS executor for agent context ─────────────────────────────────────────
def _make_agent_executor(
user_id: str,
device_mgr: DeviceConnectionManager,
run_context: dict | None = None,
) -> Any:
"""Create a WS callback for ``set_client_executor()`` so that all tools
can use ``execute_on_client()`` during an agent run.
If *run_context* is provided it is attached to every ``tool_call`` frame
so the Electron client can attribute actions to the correct agent run.
"""
async def _executor(payload: dict) -> dict:
payload["type"] = "tool_call"
if run_context:
payload["run_context"] = run_context
call_id = payload["id"]
fut = device_mgr.create_pending_call(user_id, call_id)
await device_mgr.send_frame(user_id, payload)
return await asyncio.wait_for(fut, timeout=_TOOL_CALL_TIMEOUT)
return _executor
# ── LLM tool-calling loop (mirrors deep_agent._run_single_agent) ──────────
def _as_text(content: Any) -> str:
if content is None:
return ""
if isinstance(content, str):
return content
if isinstance(content, list):
parts: list[str] = []
for item in content:
if isinstance(item, str):
parts.append(item)
elif isinstance(item, dict):
text = item.get("text")
if isinstance(text, str):
parts.append(text)
return "".join(parts)
return str(content)
async def _run_agent_with_tools(
*,
system_prompt: str,
user_message: str,
tools: list[Any],
max_steps: int,
) -> str:
"""Run an LLM agent with tool-calling, returning the final text response.
Follows the same pattern as ``deep_agent._run_single_agent``:
bind tools → invoke → handle tool calls → repeat until final text.
"""
llm = get_llm()
llm_with_tools = llm.bind_tools(tools)
messages: list[Any] = [
SystemMessage(content=system_prompt),
HumanMessage(content=user_message),
]
tool_calls_count = 0
tool_map = {tool_def.name: tool_def for tool_def in tools}
for _ in range(max_steps):
response: AIMessage = await llm_with_tools.ainvoke(messages)
messages.append(response)
if not response.tool_calls:
return _as_text(response.content)
for call in response.tool_calls:
tool_calls_count += 1
call_id = str(call.get("id", ""))
call_name = str(call.get("name", ""))
call_args = call.get("args", {})
logger.info(
"agent_runner: tool_call name=%s args=%s",
call_name,
json.dumps(call_args, ensure_ascii=True)[:800],
)
tool_fn = tool_map.get(call_name)
if tool_fn is None:
tool_output = f"Unknown tool: {call_name}"
else:
tool_output = await tool_fn.ainvoke(call_args)
logger.info(
"agent_runner: tool_result name=%s output=%s",
call_name,
str(tool_output)[:1200],
)
messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
# Fallback: exceeded max steps, get final response without tools.
final = await llm.ainvoke(messages)
return _as_text(final.content)
# ── Triage map parser ─────────────────────────────────────────────────────
def _parse_triage_map(raw: str) -> dict[str, list[str]] | None:
"""Extract the JSON triage map from the LLM's final response."""
text = raw.strip()
# Try direct parse first.
try:
parsed = json.loads(text)
if isinstance(parsed, dict):
return {k: v for k, v in parsed.items() if isinstance(v, list)}
except json.JSONDecodeError:
pass
# Try extracting JSON from markdown fences or surrounding text.
import re
match = re.search(r"\{[\s\S]*\}", text)
if match:
try:
parsed = json.loads(match.group(0))
if isinstance(parsed, dict):
return {k: v for k, v in parsed.items() if isinstance(v, list)}
except json.JSONDecodeError:
pass
return None
# ── Tool list builder ─────────────────────────────────────────────────────
def _build_processing_tools(data_types: list[str]) -> list[Any]:
"""Build the tool list for Phase 2 based on user's data_types selection."""
tools: list[Any] = list(FILESYSTEM_TOOLS)
for dt in data_types:
dt_tools = _DATA_TYPE_TOOLS.get(dt)
if dt_tools:
tools.extend(dt_tools)
return tools
# ── Local agent runner (two-phase) ─────────────────────────────────────────
async def run_local_agent(
user_id: str,
config: LocalAgentConfig,
run_log: AgentRunLog,
device_mgr: DeviceConnectionManager,
run_context: dict | None = None,
) -> None:
"""Execute a local directory agent run using two-phase LLM-with-tools.
Phase 1 — Triage:
Explore the directory structure, check metadata, match files to
existing projects. Output: a JSON map of project → file paths.
Phase 2 — Processing:
For each project group, read full file contents and perform CRUD
operations using the standard entity tools.
"""
run_id = run_log.id
# ── Device online check ─────────────────────────────────────────
target_device_id = config.device_id.strip() if isinstance(config.device_id, str) else ""
if target_device_id:
is_online = device_mgr.is_online(user_id, target_device_id)
else:
is_online = device_mgr.is_online(user_id)
if not is_online:
logger.info(
"agent_runner: skip run=%s — device %r offline for user=%s",
run_id,
target_device_id or "<any>",
user_id,
)
await _finalize_run(
run_log,
status="error",
errors=[f"Device {target_device_id or '<any>'!r} is not connected"],
)
return
# ── Set up WS executor for tools ────────────────────────────────
executor = _make_agent_executor(user_id, device_mgr, run_context)
set_client_executor(executor)
errors: list[str] = []
items_processed = 0
items_created = 0
try:
# ── Phase 1: Triage ─────────────────────────────────────────
logger.info("agent_runner: run=%s phase=triage start user=%s", run_id, user_id)
last_run_str = "never (process all files)"
if config.last_run_at:
last_run_str = config.last_run_at.isoformat()
custom_section = ""
if config.prompt_template:
custom_section = f"User instructions:\n{config.prompt_template}"
file_ext_str = ", ".join(config.file_extensions) if config.file_extensions else "all"
triage_prompt = _TRIAGE_SYSTEM_PROMPT.format(
last_run_at=last_run_str,
custom_prompt_section=custom_section,
data_types=", ".join(config.data_types),
file_extensions=file_ext_str,
)
directory_paths = config.directory_paths
triage_user_msg = (
f"Explore these directories and produce the triage map:\n"
f"{json.dumps(directory_paths, ensure_ascii=False)}"
)
triage_tools: list[Any] = list(FILESYSTEM_TOOLS) + list(PROJECT_TOOLS)
triage_response = await _run_agent_with_tools(
system_prompt=triage_prompt,
user_message=triage_user_msg,
tools=triage_tools,
max_steps=_MAX_TRIAGE_STEPS,
)
triage_map = _parse_triage_map(triage_response)
if not triage_map:
errors.append(f"Triage phase failed to produce a valid file map: {triage_response[:500]}")
await _finalize_run(run_log, status="error", errors=errors)
return
logger.info(
"agent_runner: run=%s triage complete groups=%d total_files=%d",
run_id,
len(triage_map),
sum(len(files) for files in triage_map.values()),
)
# ── Phase 2: Processing (per group) ─────────────────────────
processing_tools = _build_processing_tools(config.data_types)
for group_key, file_paths in triage_map.items():
if not file_paths:
continue
logger.info(
"agent_runner: run=%s phase=processing group=%s files=%d",
run_id,
group_key,
len(file_paths),
)
# Build project context for the LLM.
if group_key == "standalone":
project_context = "These files are not associated with any existing project."
else:
project_context = f"These files belong to project ID: {group_key}. Use this project_id when creating records."
file_list_str = "\n".join(f"- {fp}" for fp in file_paths)
processing_prompt = _PROCESSING_BASE_PROMPT.format(
data_types=", ".join(config.data_types),
project_context=project_context,
file_list=file_list_str,
custom_prompt_section=custom_section,
)
items_processed += len(file_paths)
try:
result_text = await _run_agent_with_tools(
system_prompt=processing_prompt,
user_message="Process the listed files now.",
tools=processing_tools,
max_steps=_MAX_PROCESSING_STEPS,
)
logger.info(
"agent_runner: run=%s group=%s processing_result=%s",
run_id,
group_key,
result_text[:500],
)
# Count created items by scanning tool call results.
# The tools themselves handle creation; we estimate from the
# summary. A more precise count would require intercepting
# tool results, but the summary is sufficient for the run log.
except Exception as exc:
errors.append(f"Processing error for group '{group_key}': {exc}")
logger.error(
"agent_runner: run=%s group=%s processing failed: %s",
run_id,
group_key,
exc,
)
except Exception as exc:
errors.append(f"Agent run failed: {exc}")
logger.error("agent_runner: run=%s failed: %s", run_id, exc)
finally:
clear_client_executor()
# ── Finalise ────────────────────────────────────────────────────
if errors and items_processed == 0:
final_status = "error"
elif errors:
final_status = "partial"
else:
final_status = "success"
await _finalize_run(
run_log,
status=final_status,
items_processed=items_processed,
items_created=items_created,
errors=errors,
update_config_last_run=False,
config_id=config.id,
config_type="local",
)
logger.info(
"agent_runner: run=%s done status=%s processed=%d errors=%d",
run_id,
final_status,
items_processed,
len(errors),
)
# Notify the Electron client that the run is complete so it can close
# the run record in its local SQLite.
if run_context and device_mgr.is_online(user_id):
try:
await device_mgr.send_frame(user_id, {
"type": "run_complete",
"run_context": run_context,
"status": final_status,
})
except Exception as exc:
logger.warning("agent_runner: run=%s failed to send run_complete: %s", run_id, exc)
# ── Cloud agent runner ─────────────────────────────────────────────────────
# Default lookback window when an agent has never run before.
_CLOUD_DEFAULT_LOOKBACK_DAYS: int = 7
async def run_cloud_agent(
user_id: str,
config: CloudAgentConfig,
run_log: AgentRunLog,
device_mgr: DeviceConnectionManager,
) -> None:
"""Execute a cloud connector agent run end-to-end.
Steps:
1. Verify the user's device is online — results are pushed to Electron
via WS tool-call frames. If no device is connected, abort.
2. Decrypt the stored OAuth token from ``config.oauth_token_encrypted``.
3. Instantiate the provider client (Gmail or MS Graph).
4. Fetch messages/emails since ``config.last_run_at`` (or 7 days ago for
the first run) applying ``config.filter_config`` filters.
5. For each message/email call the LLM to extract structured items.
6. Push each item to Electron as an ``insert`` tool-call.
7. If the provider refreshed its access token, re-encrypt and write it
back to ``config.oauth_token_encrypted``.
8. Persist the run outcome via ``_finalize_run``.
"""
run_id = run_log.id
# ── 1. Device online check ─────────────────────────────────────────
if not device_mgr.is_online(user_id):
logger.info(
"agent_runner: skip cloud run=%s — no device online for user=%s",
run_id,
user_id,
)
await _finalize_run(
run_log,
status="error",
errors=["No connected device — cloud agent results cannot be delivered"],
)
return
# ── 2. Decrypt OAuth token ─────────────────────────────────────────
from app.integrations import decrypt_token, encrypt_token, get_provider
if not config.oauth_token_encrypted:
await _finalize_run(
run_log,
status="error",
errors=[f"No OAuth token stored for cloud agent '{config.name}'"],
)
return
try:
credentials_info = decrypt_token(config.oauth_token_encrypted)
except ValueError as exc:
logger.error("agent_runner: failed to decrypt OAuth token for agent %s: %s", config.id, exc)
await _finalize_run(
run_log,
status="error",
errors=[f"Failed to decrypt OAuth token: {exc}"],
)
return
# ── 3. Instantiate provider client ────────────────────────────────
try:
provider = get_provider(config.provider, credentials_info)
except ValueError as exc:
await _finalize_run(
run_log,
status="error",
errors=[str(exc)],
)
return
# ── 4. Fetch messages ─────────────────────────────────────────────
since: datetime | None = config.last_run_at
if since is None:
since = datetime.now(timezone.utc) - timedelta(days=_CLOUD_DEFAULT_LOOKBACK_DAYS)
if since.tzinfo is None:
since = since.replace(tzinfo=timezone.utc)
errors: list[str] = []
items_processed = 0
items_created = 0
try:
if config.provider == "gmail":
raw_messages = await provider.fetch_messages( # type: ignore[union-attr]
filter_config=config.filter_config,
since=since,
)
elif config.provider == "outlook":
raw_messages = await provider.fetch_emails( # type: ignore[union-attr]
filter_config=config.filter_config,
since=since,
)
elif config.provider == "teams":
raw_messages = await provider.fetch_messages( # type: ignore[union-attr]
filter_config=config.filter_config,
since=since,
)
else:
raw_messages = []
except RuntimeError as exc:
logger.error(
"agent_runner: provider fetch failed for cloud agent %s: %s",
config.id,
exc,
)
await _finalize_run(
run_log,
status="error",
errors=[f"Provider fetch failed: {exc}"],
update_config_last_run=True,
config_id=config.id,
config_type="cloud",
)
return
logger.info(
"agent_runner: cloud agent %s fetched %d item(s) from %s for user=%s",
config.id,
len(raw_messages),
config.provider,
user_id,
)
# ── 56. Extract + insert via LLM with tools ─────────────────────
executor = _make_agent_executor(user_id, device_mgr)
set_client_executor(executor)
try:
processing_tools = _build_processing_tools(config.data_types)
custom_section = ""
if config.prompt_template:
custom_section = f"User instructions:\n{config.prompt_template}"
for msg in raw_messages:
content_text = msg.as_text
if not content_text:
continue
items_processed += 1
processing_prompt = _PROCESSING_BASE_PROMPT.format(
data_types=", ".join(config.data_types),
project_context="Determine the appropriate project from the message context.",
file_list=f"Message from {config.provider} (id: {msg.id})",
custom_prompt_section=custom_section,
)
try:
await _run_agent_with_tools(
system_prompt=processing_prompt,
user_message=f"Process this message content:\n\n{content_text[:8000]}",
tools=processing_tools,
max_steps=_MAX_PROCESSING_STEPS,
)
except Exception as exc:
errors.append(f"LLM processing error for message {msg.id!r}: {exc}")
finally:
clear_client_executor()
# ── 7. Persist refreshed token (if any) ───────────────────────────
refreshed = getattr(provider, "refreshed_credentials", None)
if refreshed:
try:
new_encrypted = encrypt_token(refreshed)
async with async_session() as db:
cfg_result = await db.execute(
select(CloudAgentConfig).where(CloudAgentConfig.id == config.id)
)
cfg_row = cfg_result.scalar_one_or_none()
if cfg_row:
cfg_row.oauth_token_encrypted = new_encrypted
await db.commit()
logger.debug("agent_runner: refreshed OAuth token persisted for agent %s", config.id)
except Exception as exc:
logger.warning("agent_runner: failed to persist refreshed token for agent %s: %s", config.id, exc)
# ── 8. Finalise ────────────────────────────────────────────────────
if errors and items_created == 0:
final_status = "error"
elif errors:
final_status = "partial"
else:
final_status = "success"
await _finalize_run(
run_log,
status=final_status,
items_processed=items_processed,
items_created=items_created,
errors=errors,
update_config_last_run=True,
config_id=config.id,
config_type="cloud",
)
logger.info(
"agent_runner: cloud run=%s done status=%s processed=%d created=%d errors=%d",
run_id,
final_status,
items_processed,
items_created,
len(errors),
)
# ── Pending-run trigger ─────────────────────────────────────────────────────
async def trigger_pending_runs(
user_id: str,
device_id: str,
device_mgr: DeviceConnectionManager,
) -> None:
"""Dispatch any overdue agent runs after an Electron device connects.
Called as a background task from the device WS endpoint on ``device_hello``.
Scheduling rules:
* **Local agents**: only triggered when ``config.device_id == device_id``.
* **Cloud agents**: triggered on any connected device (no device binding).
* Runs execute **sequentially** to avoid flooding the WS connection.
"""
logger.info(
"agent_runner: pending-run scan skipped for user=%s device=%s (client-owned agent config)",
user_id,
device_id,
)
return
# ── Internal helper ─────────────────────────────────────────────────────────
async def _finalize_run(
run_log: AgentRunLog,
*,
status: str,
items_processed: int = 0,
items_created: int = 0,
errors: list[str] | None = None,
update_config_last_run: bool = False,
config_id: str | None = None,
config_type: str | None = None,
) -> None:
"""Persist the run outcome and optionally update ``LocalAgentConfig.last_run_at``.
Uses a fresh DB session so this is safe to call from background tasks
after the original request session has closed.
"""
now = datetime.now(timezone.utc)
try:
async with async_session() as db:
managed = await db.merge(run_log)
managed.status = status
managed.items_processed = items_processed
managed.items_created = items_created
managed.errors = errors or []
managed.completed_at = now
if update_config_last_run and config_id:
if config_type == "local":
cfg_result = await db.execute(
select(LocalAgentConfig).where(LocalAgentConfig.id == config_id)
)
cfg = cfg_result.scalar_one_or_none()
if cfg:
cfg.last_run_at = now
elif config_type == "cloud":
cfg_result = await db.execute(
select(CloudAgentConfig).where(CloudAgentConfig.id == config_id)
)
cfg = cfg_result.scalar_one_or_none()
if cfg:
cfg.last_run_at = now
await db.commit()
except Exception as exc:
logger.error(
"agent_runner: failed to finalize run_log=%s: %s", run_log.id, exc
)