Files
api/app/core/agent_runner.py
Roberto Musso 826f64d6bb refactor local directory agent to two-phase LLM-with-tools architecture
Replace the single-pass FE-driven agent_run/agent_data flow with a
BE-orchestrated two-phase execution using LangChain tool-calling:
- Phase 1 (Triage): explores directory via new filesystem tools, matches
  files to existing projects using PROJECT_TOOLS
- Phase 2 (Processing): reads files and performs CRUD per project group
  with clean LLM context windows

Key changes:
- Add filesystem_agent.py with list_directory, read_file_content,
  get_file_metadata tools using execute_on_client()
- Move setup journey from REST to WebSocket (journey_start/message frames)
- Add batch_runs_per_day billing limit and enforce in /trigger
- Remove deprecated agent_data/agent_complete frame handlers and queues

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-17 08:50:46 +01:00

773 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Agent run orchestrator.
Drives two agent types:
* **Local directory agent** — two-phase execution that mirrors the
``deep_agent.py`` tool-calling pattern. Phase 1 (Triage) explores the
user's directory via file-system tools and groups files by project.
Phase 2 (Processing) reads full file contents and performs CRUD
operations using the standard entity tools (tasks, notes, etc.).
* **Cloud connector agent** — fetches data from third-party APIs (Gmail,
Teams, Outlook) and pushes extracted items to Electron.
Usage
-----
Background tasks are spawned with ``asyncio.create_task()``::
asyncio.create_task(run_local_agent(user_id, config, run_log, device_manager))
asyncio.create_task(trigger_pending_runs(user_id, device_id, device_manager))
The ``trigger_pending_runs`` function is called by the device WS endpoint
when Electron sends ``device_hello``, so any overdue runs fire immediately
when the device reconnects.
"""
from __future__ import annotations
import asyncio
import json
import logging
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any
from croniter import croniter
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
from sqlalchemy import select
from app.agents.filesystem_agent import FILESYSTEM_TOOLS
from app.agents.note_agent import NOTE_TOOLS
from app.agents.project_agent import PROJECT_TOOLS
from app.agents.task_agent import TASK_TOOLS
from app.agents.timeline_agent import TIMELINE_TOOLS
from app.core.device_manager import DeviceConnectionManager
from app.core.llm import get_llm
from app.core.ws_context import clear_client_executor, set_client_executor
from app.db import async_session
from app.models import AgentRunLog, CloudAgentConfig, LocalAgentConfig
logger = logging.getLogger(__name__)
# ── Timeouts ───────────────────────────────────────────────────────────────
# Max seconds to wait for a single tool-call round-trip (FE → BE).
_TOOL_CALL_TIMEOUT: int = 30
# Max LLM reasoning steps per phase.
_MAX_TRIAGE_STEPS: int = 10
_MAX_PROCESSING_STEPS: int = 12
# ── Data-type to tool mapping ─────────────────────────────────────────────
_DATA_TYPE_TOOLS: dict[str, list[Any]] = {
"tasks": TASK_TOOLS,
"projects": PROJECT_TOOLS,
"notes": NOTE_TOOLS,
"timelines": TIMELINE_TOOLS,
}
# ── Triage prompt ─────────────────────────────────────────────────────────
_TRIAGE_SYSTEM_PROMPT = """\
You are a file triage assistant for a freelance project management tool.
Your job is to explore a local directory on the user's device, understand its
structure, and group files by project context.
You have access to these tools:
- list_directory: to map folder structure
- get_file_metadata: to check creation/modification dates
- read_file_content: to read brief snippets when needed for categorisation
- list_projects / list_all_projects / get_project: to fetch existing projects
from the user's workspace and match files to them
Instructions:
1. Start by calling list_directory on the configured root path.
2. Explore subdirectories as needed to understand the structure.
3. Use get_file_metadata to check modification dates. Skip files that have
NOT been modified since: {last_run_at}.
4. Call list_all_projects to get the user's existing projects.
5. Match files to existing projects by name, folder structure, or content hints.
6. If files don't match any existing project, group them under "standalone".
{custom_prompt_section}
Target entity types to extract: {data_types}
File extensions to consider: {file_extensions}
When you have finished exploring, output ONLY a JSON object (no markdown
fences, no explanation) mapping project IDs or "standalone" to file path
arrays:
{{"<project_id>": ["<file_path>", ...], "standalone": ["<file_path>", ...]}}
Return ONLY the JSON object as your final message.
"""
# ── Processing prompt ─────────────────────────────────────────────────────
_PROCESSING_BASE_PROMPT = """\
You are a data extraction and management assistant for a freelance project
management tool. You have access to tools for reading files and performing
CRUD operations on the user's workspace.
Your task:
1. Read the full content of each file listed below using read_file_content.
2. Based on the content and the user's instructions, create the appropriate
records using the CRUD tools available to you (create_task, create_note,
create_timeline, create_project, etc.).
3. ONLY create records of these entity types: {data_types}.
4. For every record you create, set isAiSuggested=1 and isApproved=0.
5. Do NOT invent data. Only extract what is clearly present in the files.
6. If a file contains no relevant data for the target entity types, skip it.
{project_context}
Files to process:
{file_list}
{custom_prompt_section}
After processing all files, respond with a brief summary of what you created.
"""
# ── Cron helper ────────────────────────────────────────────────────────────
def _is_overdue(schedule_cron: str, last_run_at: datetime | None) -> bool:
"""Return ``True`` if the next scheduled run time has already passed.
Always validates the cron expression first — an invalid expression returns
``False`` (fail-safe: never trigger an unparseable schedule).
"""
try:
now = datetime.now(timezone.utc)
if last_run_at is None:
# Validate the expression before deciding this is overdue.
croniter(schedule_cron, now)
return True
ts = last_run_at
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
cron = croniter(schedule_cron, ts)
next_run: datetime = cron.get_next(datetime)
return now >= next_run
except Exception as exc:
logger.warning("agent_runner: cannot parse cron %r: %s", schedule_cron, exc)
return False # Fail-safe: don't trigger if expression is invalid.
# ── WS executor for agent context ─────────────────────────────────────────
def _make_agent_executor(
user_id: str,
device_mgr: DeviceConnectionManager,
) -> Any:
"""Create a WS callback for ``set_client_executor()`` so that all tools
can use ``execute_on_client()`` during an agent run.
"""
async def _executor(payload: dict) -> dict:
payload["type"] = "tool_call"
call_id = payload["id"]
fut = device_mgr.create_pending_call(user_id, call_id)
await device_mgr.send_frame(user_id, payload)
return await asyncio.wait_for(fut, timeout=_TOOL_CALL_TIMEOUT)
return _executor
# ── LLM tool-calling loop (mirrors deep_agent._run_single_agent) ──────────
def _as_text(content: Any) -> str:
if content is None:
return ""
if isinstance(content, str):
return content
if isinstance(content, list):
parts: list[str] = []
for item in content:
if isinstance(item, str):
parts.append(item)
elif isinstance(item, dict):
text = item.get("text")
if isinstance(text, str):
parts.append(text)
return "".join(parts)
return str(content)
async def _run_agent_with_tools(
*,
system_prompt: str,
user_message: str,
tools: list[Any],
max_steps: int,
) -> str:
"""Run an LLM agent with tool-calling, returning the final text response.
Follows the same pattern as ``deep_agent._run_single_agent``:
bind tools → invoke → handle tool calls → repeat until final text.
"""
llm = get_llm()
llm_with_tools = llm.bind_tools(tools)
messages: list[Any] = [
SystemMessage(content=system_prompt),
HumanMessage(content=user_message),
]
tool_calls_count = 0
tool_map = {tool_def.name: tool_def for tool_def in tools}
for _ in range(max_steps):
response: AIMessage = await llm_with_tools.ainvoke(messages)
messages.append(response)
if not response.tool_calls:
return _as_text(response.content)
for call in response.tool_calls:
tool_calls_count += 1
call_id = str(call.get("id", ""))
call_name = str(call.get("name", ""))
call_args = call.get("args", {})
logger.info(
"agent_runner: tool_call name=%s args=%s",
call_name,
json.dumps(call_args, ensure_ascii=True)[:800],
)
tool_fn = tool_map.get(call_name)
if tool_fn is None:
tool_output = f"Unknown tool: {call_name}"
else:
tool_output = await tool_fn.ainvoke(call_args)
logger.info(
"agent_runner: tool_result name=%s output=%s",
call_name,
str(tool_output)[:1200],
)
messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
# Fallback: exceeded max steps, get final response without tools.
final = await llm.ainvoke(messages)
return _as_text(final.content)
# ── Triage map parser ─────────────────────────────────────────────────────
def _parse_triage_map(raw: str) -> dict[str, list[str]] | None:
"""Extract the JSON triage map from the LLM's final response."""
text = raw.strip()
# Try direct parse first.
try:
parsed = json.loads(text)
if isinstance(parsed, dict):
return {k: v for k, v in parsed.items() if isinstance(v, list)}
except json.JSONDecodeError:
pass
# Try extracting JSON from markdown fences or surrounding text.
import re
match = re.search(r"\{[\s\S]*\}", text)
if match:
try:
parsed = json.loads(match.group(0))
if isinstance(parsed, dict):
return {k: v for k, v in parsed.items() if isinstance(v, list)}
except json.JSONDecodeError:
pass
return None
# ── Tool list builder ─────────────────────────────────────────────────────
def _build_processing_tools(data_types: list[str]) -> list[Any]:
"""Build the tool list for Phase 2 based on user's data_types selection."""
tools: list[Any] = list(FILESYSTEM_TOOLS)
for dt in data_types:
dt_tools = _DATA_TYPE_TOOLS.get(dt)
if dt_tools:
tools.extend(dt_tools)
return tools
# ── Local agent runner (two-phase) ─────────────────────────────────────────
async def run_local_agent(
user_id: str,
config: LocalAgentConfig,
run_log: AgentRunLog,
device_mgr: DeviceConnectionManager,
) -> None:
"""Execute a local directory agent run using two-phase LLM-with-tools.
Phase 1 — Triage:
Explore the directory structure, check metadata, match files to
existing projects. Output: a JSON map of project → file paths.
Phase 2 — Processing:
For each project group, read full file contents and perform CRUD
operations using the standard entity tools.
"""
run_id = run_log.id
# ── Device online check ─────────────────────────────────────────
target_device_id = config.device_id.strip() if isinstance(config.device_id, str) else ""
if target_device_id:
is_online = device_mgr.is_online(user_id, target_device_id)
else:
is_online = device_mgr.is_online(user_id)
if not is_online:
logger.info(
"agent_runner: skip run=%s — device %r offline for user=%s",
run_id,
target_device_id or "<any>",
user_id,
)
await _finalize_run(
run_log,
status="error",
errors=[f"Device {target_device_id or '<any>'!r} is not connected"],
)
return
# ── Set up WS executor for tools ────────────────────────────────
executor = _make_agent_executor(user_id, device_mgr)
set_client_executor(executor)
errors: list[str] = []
items_processed = 0
items_created = 0
try:
# ── Phase 1: Triage ─────────────────────────────────────────
logger.info("agent_runner: run=%s phase=triage start user=%s", run_id, user_id)
last_run_str = "never (process all files)"
if config.last_run_at:
last_run_str = config.last_run_at.isoformat()
custom_section = ""
if config.prompt_template:
custom_section = f"User instructions:\n{config.prompt_template}"
file_ext_str = ", ".join(config.file_extensions) if config.file_extensions else "all"
triage_prompt = _TRIAGE_SYSTEM_PROMPT.format(
last_run_at=last_run_str,
custom_prompt_section=custom_section,
data_types=", ".join(config.data_types),
file_extensions=file_ext_str,
)
directory_paths = config.directory_paths
triage_user_msg = (
f"Explore these directories and produce the triage map:\n"
f"{json.dumps(directory_paths, ensure_ascii=False)}"
)
triage_tools: list[Any] = list(FILESYSTEM_TOOLS) + list(PROJECT_TOOLS)
triage_response = await _run_agent_with_tools(
system_prompt=triage_prompt,
user_message=triage_user_msg,
tools=triage_tools,
max_steps=_MAX_TRIAGE_STEPS,
)
triage_map = _parse_triage_map(triage_response)
if not triage_map:
errors.append(f"Triage phase failed to produce a valid file map: {triage_response[:500]}")
await _finalize_run(run_log, status="error", errors=errors)
return
logger.info(
"agent_runner: run=%s triage complete groups=%d total_files=%d",
run_id,
len(triage_map),
sum(len(files) for files in triage_map.values()),
)
# ── Phase 2: Processing (per group) ─────────────────────────
processing_tools = _build_processing_tools(config.data_types)
for group_key, file_paths in triage_map.items():
if not file_paths:
continue
logger.info(
"agent_runner: run=%s phase=processing group=%s files=%d",
run_id,
group_key,
len(file_paths),
)
# Build project context for the LLM.
if group_key == "standalone":
project_context = "These files are not associated with any existing project."
else:
project_context = f"These files belong to project ID: {group_key}. Use this project_id when creating records."
file_list_str = "\n".join(f"- {fp}" for fp in file_paths)
processing_prompt = _PROCESSING_BASE_PROMPT.format(
data_types=", ".join(config.data_types),
project_context=project_context,
file_list=file_list_str,
custom_prompt_section=custom_section,
)
items_processed += len(file_paths)
try:
result_text = await _run_agent_with_tools(
system_prompt=processing_prompt,
user_message="Process the listed files now.",
tools=processing_tools,
max_steps=_MAX_PROCESSING_STEPS,
)
logger.info(
"agent_runner: run=%s group=%s processing_result=%s",
run_id,
group_key,
result_text[:500],
)
# Count created items by scanning tool call results.
# The tools themselves handle creation; we estimate from the
# summary. A more precise count would require intercepting
# tool results, but the summary is sufficient for the run log.
except Exception as exc:
errors.append(f"Processing error for group '{group_key}': {exc}")
logger.error(
"agent_runner: run=%s group=%s processing failed: %s",
run_id,
group_key,
exc,
)
except Exception as exc:
errors.append(f"Agent run failed: {exc}")
logger.error("agent_runner: run=%s failed: %s", run_id, exc)
finally:
clear_client_executor()
# ── Finalise ────────────────────────────────────────────────────
if errors and items_processed == 0:
final_status = "error"
elif errors:
final_status = "partial"
else:
final_status = "success"
await _finalize_run(
run_log,
status=final_status,
items_processed=items_processed,
items_created=items_created,
errors=errors,
update_config_last_run=False,
config_id=config.id,
config_type="local",
)
logger.info(
"agent_runner: run=%s done status=%s processed=%d errors=%d",
run_id,
final_status,
items_processed,
len(errors),
)
# ── Cloud agent runner ─────────────────────────────────────────────────────
# Default lookback window when an agent has never run before.
_CLOUD_DEFAULT_LOOKBACK_DAYS: int = 7
async def run_cloud_agent(
user_id: str,
config: CloudAgentConfig,
run_log: AgentRunLog,
device_mgr: DeviceConnectionManager,
) -> None:
"""Execute a cloud connector agent run end-to-end.
Steps:
1. Verify the user's device is online — results are pushed to Electron
via WS tool-call frames. If no device is connected, abort.
2. Decrypt the stored OAuth token from ``config.oauth_token_encrypted``.
3. Instantiate the provider client (Gmail or MS Graph).
4. Fetch messages/emails since ``config.last_run_at`` (or 7 days ago for
the first run) applying ``config.filter_config`` filters.
5. For each message/email call the LLM to extract structured items.
6. Push each item to Electron as an ``insert`` tool-call.
7. If the provider refreshed its access token, re-encrypt and write it
back to ``config.oauth_token_encrypted``.
8. Persist the run outcome via ``_finalize_run``.
"""
run_id = run_log.id
# ── 1. Device online check ─────────────────────────────────────────
if not device_mgr.is_online(user_id):
logger.info(
"agent_runner: skip cloud run=%s — no device online for user=%s",
run_id,
user_id,
)
await _finalize_run(
run_log,
status="error",
errors=["No connected device — cloud agent results cannot be delivered"],
)
return
# ── 2. Decrypt OAuth token ─────────────────────────────────────────
from app.integrations import decrypt_token, encrypt_token, get_provider
if not config.oauth_token_encrypted:
await _finalize_run(
run_log,
status="error",
errors=[f"No OAuth token stored for cloud agent '{config.name}'"],
)
return
try:
credentials_info = decrypt_token(config.oauth_token_encrypted)
except ValueError as exc:
logger.error("agent_runner: failed to decrypt OAuth token for agent %s: %s", config.id, exc)
await _finalize_run(
run_log,
status="error",
errors=[f"Failed to decrypt OAuth token: {exc}"],
)
return
# ── 3. Instantiate provider client ────────────────────────────────
try:
provider = get_provider(config.provider, credentials_info)
except ValueError as exc:
await _finalize_run(
run_log,
status="error",
errors=[str(exc)],
)
return
# ── 4. Fetch messages ─────────────────────────────────────────────
since: datetime | None = config.last_run_at
if since is None:
since = datetime.now(timezone.utc) - timedelta(days=_CLOUD_DEFAULT_LOOKBACK_DAYS)
if since.tzinfo is None:
since = since.replace(tzinfo=timezone.utc)
errors: list[str] = []
items_processed = 0
items_created = 0
try:
if config.provider == "gmail":
raw_messages = await provider.fetch_messages( # type: ignore[union-attr]
filter_config=config.filter_config,
since=since,
)
elif config.provider == "outlook":
raw_messages = await provider.fetch_emails( # type: ignore[union-attr]
filter_config=config.filter_config,
since=since,
)
elif config.provider == "teams":
raw_messages = await provider.fetch_messages( # type: ignore[union-attr]
filter_config=config.filter_config,
since=since,
)
else:
raw_messages = []
except RuntimeError as exc:
logger.error(
"agent_runner: provider fetch failed for cloud agent %s: %s",
config.id,
exc,
)
await _finalize_run(
run_log,
status="error",
errors=[f"Provider fetch failed: {exc}"],
update_config_last_run=True,
config_id=config.id,
config_type="cloud",
)
return
logger.info(
"agent_runner: cloud agent %s fetched %d item(s) from %s for user=%s",
config.id,
len(raw_messages),
config.provider,
user_id,
)
# ── 56. Extract + insert via LLM with tools ─────────────────────
executor = _make_agent_executor(user_id, device_mgr)
set_client_executor(executor)
try:
processing_tools = _build_processing_tools(config.data_types)
custom_section = ""
if config.prompt_template:
custom_section = f"User instructions:\n{config.prompt_template}"
for msg in raw_messages:
content_text = msg.as_text
if not content_text:
continue
items_processed += 1
processing_prompt = _PROCESSING_BASE_PROMPT.format(
data_types=", ".join(config.data_types),
project_context="Determine the appropriate project from the message context.",
file_list=f"Message from {config.provider} (id: {msg.id})",
custom_prompt_section=custom_section,
)
try:
await _run_agent_with_tools(
system_prompt=processing_prompt,
user_message=f"Process this message content:\n\n{content_text[:8000]}",
tools=processing_tools,
max_steps=_MAX_PROCESSING_STEPS,
)
except Exception as exc:
errors.append(f"LLM processing error for message {msg.id!r}: {exc}")
finally:
clear_client_executor()
# ── 7. Persist refreshed token (if any) ───────────────────────────
refreshed = getattr(provider, "refreshed_credentials", None)
if refreshed:
try:
new_encrypted = encrypt_token(refreshed)
async with async_session() as db:
cfg_result = await db.execute(
select(CloudAgentConfig).where(CloudAgentConfig.id == config.id)
)
cfg_row = cfg_result.scalar_one_or_none()
if cfg_row:
cfg_row.oauth_token_encrypted = new_encrypted
await db.commit()
logger.debug("agent_runner: refreshed OAuth token persisted for agent %s", config.id)
except Exception as exc:
logger.warning("agent_runner: failed to persist refreshed token for agent %s: %s", config.id, exc)
# ── 8. Finalise ────────────────────────────────────────────────────
if errors and items_created == 0:
final_status = "error"
elif errors:
final_status = "partial"
else:
final_status = "success"
await _finalize_run(
run_log,
status=final_status,
items_processed=items_processed,
items_created=items_created,
errors=errors,
update_config_last_run=True,
config_id=config.id,
config_type="cloud",
)
logger.info(
"agent_runner: cloud run=%s done status=%s processed=%d created=%d errors=%d",
run_id,
final_status,
items_processed,
items_created,
len(errors),
)
# ── Pending-run trigger ─────────────────────────────────────────────────────
async def trigger_pending_runs(
user_id: str,
device_id: str,
device_mgr: DeviceConnectionManager,
) -> None:
"""Dispatch any overdue agent runs after an Electron device connects.
Called as a background task from the device WS endpoint on ``device_hello``.
Scheduling rules:
* **Local agents**: only triggered when ``config.device_id == device_id``.
* **Cloud agents**: triggered on any connected device (no device binding).
* Runs execute **sequentially** to avoid flooding the WS connection.
"""
logger.info(
"agent_runner: pending-run scan skipped for user=%s device=%s (client-owned agent config)",
user_id,
device_id,
)
return
# ── Internal helper ─────────────────────────────────────────────────────────
async def _finalize_run(
run_log: AgentRunLog,
*,
status: str,
items_processed: int = 0,
items_created: int = 0,
errors: list[str] | None = None,
update_config_last_run: bool = False,
config_id: str | None = None,
config_type: str | None = None,
) -> None:
"""Persist the run outcome and optionally update ``LocalAgentConfig.last_run_at``.
Uses a fresh DB session so this is safe to call from background tasks
after the original request session has closed.
"""
now = datetime.now(timezone.utc)
try:
async with async_session() as db:
managed = await db.merge(run_log)
managed.status = status
managed.items_processed = items_processed
managed.items_created = items_created
managed.errors = errors or []
managed.completed_at = now
if update_config_last_run and config_id:
if config_type == "local":
cfg_result = await db.execute(
select(LocalAgentConfig).where(LocalAgentConfig.id == config_id)
)
cfg = cfg_result.scalar_one_or_none()
if cfg:
cfg.last_run_at = now
elif config_type == "cloud":
cfg_result = await db.execute(
select(CloudAgentConfig).where(CloudAgentConfig.id == config_id)
)
cfg = cfg_result.scalar_one_or_none()
if cfg:
cfg.last_run_at = now
await db.commit()
except Exception as exc:
logger.error(
"agent_runner: failed to finalize run_log=%s: %s", run_log.id, exc
)