Files
api/app/core/agent_runner.py
roberto cfc9d7a942 refactor: replace orchestrator with LangGraph deep-agent supervisors
- Add app/core/deep_agent.py with Home and Floating supervisor graphs
  using LangGraph create_react_agent (hierarchical pattern)
- Strip ChatAgent classes from all 4 agent files, keep @tool functions
- Rewrite output_formatter.py for event-based (token/tool_end/mutations) stream
- Update device_ws.py to use run_home_stream/run_floating_stream
- Rewrite chat.py REST route to use run_home
- Add update_core_memory tool to both supervisors
- Add langgraph>=0.3.0 to requirements.txt
- Remove orchestrator.py, execution_plan.py, agent_registry.py, plans.py
- Remove PlanAction, PlanStep, ExecutionPlan, execution_mode from schemas
- Update all affected tests to match new API
- Remove 6 deprecated test files for deleted modules
- Clean up stale docstrings referencing removed orchestrator
2026-03-11 17:50:22 +01:00

719 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Agent run manager.
Drives two agent types:
* **Local directory agent** — sends an ``agent_run`` frame to the connected
Electron device, waits for the device to stream back file contents via
``agent_data`` frames, then calls the LLM to extract structured items from
each file and pushes inserts to Electron via tool-call round-trips.
* **Cloud connector agent** — fetches data from third-party APIs (Gmail,
Teams, Outlook) and pushes extracted items to Electron. **This path is
a stub** — provider integrations are implemented in Step 3.6.
Usage
-----
Background tasks are spawned with ``asyncio.create_task()``::
asyncio.create_task(run_local_agent(user_id, config, run_log, device_manager))
asyncio.create_task(trigger_pending_runs(user_id, device_id, device_manager))
The ``trigger_pending_runs`` function is called by the device WS endpoint
when Electron sends ``device_hello``, so any overdue runs fire immediately
when the device reconnects.
"""
from __future__ import annotations
import asyncio
import json
import logging
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any
from croniter import croniter
from langchain_core.messages import HumanMessage, SystemMessage
from sqlalchemy import select
from app.core.device_manager import DeviceConnectionManager
from app.core.llm import get_llm
from app.db import async_session
from app.models import AgentRunLog, CloudAgentConfig, LocalAgentConfig
logger = logging.getLogger(__name__)
# ── Timeouts ───────────────────────────────────────────────────────────────
# Max seconds to wait for Electron to finish streaming file data.
_FILE_READ_TIMEOUT: int = 120
# Max seconds to wait for Electron to acknowledge a single tool-call insert.
_INSERT_TIMEOUT: int = 30
# ── Allowed tables & extraction schema hints ───────────────────────────────
_ALLOWED_TABLES: frozenset[str] = frozenset(
{"tasks", "notes", "timelines", "projects", "taskComments"}
)
# Field descriptions fed to the extraction LLM as concise schema references.
_TABLE_SCHEMAS: dict[str, str] = {
"tasks": (
"title (str, required), description (str), "
"status (todo|in_progress|done, default todo), "
"priority (high|medium|low, default medium), "
"assignee (JSON array string), dueDate (ms timestamp int), projectId (str)"
),
"notes": "title (str, required), content (str, markdown), projectId (str)",
"timelines": (
"title (str, required), projectId (str, required), date (ms timestamp int)"
),
"projects": "name (str, required), clientId (str)",
"taskComments": "taskId (str, required), author (str), content (str, required)",
}
_EXTRACTION_SYSTEM_PROMPT = """\
You are a data extraction assistant for a freelance project management tool.
Given a document, extract structured records matching the user's instructions.
Output a JSON array (no markdown fences, no explanation) of objects shaped:
[{{"table": "<table_name>", "data": {{...fields}}}}, ...]
Allowed table names and their fields:
{table_schemas}
Rules:
- Only extract tables listed in the "data_types" instructions.
- Use camelCase field names exactly as shown above.
- Omit optional fields you cannot determine; do not invent data.
- Never include id, createdAt, updatedAt, isAiSuggested, or isApproved.
- If nothing relevant is found, return an empty JSON array: []
- Return ONLY the JSON array.
"""
# ── Cron helper ────────────────────────────────────────────────────────────
def _is_overdue(schedule_cron: str, last_run_at: datetime | None) -> bool:
"""Return ``True`` if the next scheduled run time has already passed.
Always validates the cron expression first — an invalid expression returns
``False`` (fail-safe: never trigger an unparseable schedule).
"""
try:
now = datetime.now(timezone.utc)
if last_run_at is None:
# Validate the expression before deciding this is overdue.
croniter(schedule_cron, now)
return True
ts = last_run_at
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
cron = croniter(schedule_cron, ts)
next_run: datetime = cron.get_next(datetime)
return now >= next_run
except Exception as exc:
logger.warning("agent_runner: cannot parse cron %r: %s", schedule_cron, exc)
return False # Fail-safe: don't trigger if expression is invalid.
# ── LLM extraction ─────────────────────────────────────────────────────────
async def _extract_items_from_content(
prompt_template: str,
file_content: str,
data_types: list[str],
) -> list[dict[str, Any]]:
"""Call the LLM to extract structured records from *file_content*.
Returns a validated list of ``{table: str, data: dict}`` objects.
Items referencing tables not in *data_types* are discarded.
"""
allowed = [t for t in data_types if t in _ALLOWED_TABLES]
if not allowed:
return []
schema_text = "\n".join(
f" {table}: {_TABLE_SCHEMAS.get(table, '(unknown)')}" for table in allowed
)
system_prompt = _EXTRACTION_SYSTEM_PROMPT.format(table_schemas=schema_text)
user_prompt = (
f"User instructions: {prompt_template}\n\n"
f"Extract these record types: {', '.join(allowed)}\n\n"
f"Document:\n{file_content[:8000]}"
)
llm = get_llm()
raw = ""
try:
response = await llm.ainvoke(
[SystemMessage(content=system_prompt), HumanMessage(content=user_prompt)]
)
raw = str(response.content).strip()
items: list[dict] = json.loads(raw)
if not isinstance(items, list):
raise ValueError("LLM response is not a JSON array")
except json.JSONDecodeError as exc:
logger.warning(
"agent_runner: LLM extraction returned invalid JSON: %s — snippet: %.200r",
exc,
raw,
)
return []
# Other exceptions (LLM API errors, network errors) propagate to the
# caller (run_local_agent) which records them per-file in the run log.
validated: list[dict[str, Any]] = []
for item in items:
table = item.get("table")
data = item.get("data")
if not isinstance(table, str) or table not in allowed:
continue
if not isinstance(data, dict) or not data:
continue
# Strip any server-generated or forbidden fields.
for _field in ("id", "createdAt", "updatedAt", "isAiSuggested", "isApproved"):
data.pop(_field, None)
validated.append({"table": table, "data": data})
return validated
# ── Tool-call insert helper ─────────────────────────────────────────────────
async def _send_insert_to_client(
user_id: str,
table: str,
data: dict[str, Any],
device_mgr: DeviceConnectionManager,
) -> dict[str, Any]:
"""Send an ``insert`` tool_call frame to Electron and await the tool_result.
All inserts include ``isAiSuggested=1, isApproved=0`` so the user can
review AI-produced records before they are treated as confirmed.
Raises ``asyncio.TimeoutError`` if Electron does not respond within
``_INSERT_TIMEOUT`` seconds. Raises ``RuntimeError`` if the device
disconnects before the frame can be sent.
"""
call_id = str(uuid.uuid4())
payload: dict[str, Any] = {
"type": "tool_call",
"id": call_id,
"action": "insert",
"table": table,
"data": {**data, "isAiSuggested": 1, "isApproved": 0},
}
fut = device_mgr.create_pending_call(user_id, call_id)
await device_mgr.send_frame(user_id, payload)
return await asyncio.wait_for(fut, timeout=_INSERT_TIMEOUT)
# ── Local agent runner ──────────────────────────────────────────────────────
async def run_local_agent(
user_id: str,
config: LocalAgentConfig,
run_log: AgentRunLog,
device_mgr: DeviceConnectionManager,
) -> None:
"""Execute a local directory agent run end-to-end.
Steps:
1. Verify the device identified by ``config.device_id`` is currently online.
2. Pre-create the agent_data queue so no incoming frames are lost.
3. Send ``agent_run`` frame to Electron (paths, extensions, prompt, data_types).
4. Consume ``agent_data`` frames until the ``None`` sentinel from
``agent_complete``.
5. For each received file call the LLM to extract ``{table, data}`` items.
6. Push each item to Electron as an ``insert`` tool-call; include
``isAiSuggested=1, isApproved=0`` so users can review AI suggestions.
7. Persist the run outcome (status, counts, errors) and update
``config.last_run_at``.
"""
run_id = run_log.id
# ── 1. Device online check ─────────────────────────────────────────
if not device_mgr.is_online(user_id, config.device_id):
logger.info(
"agent_runner: skip run=%s — device %r offline for user=%s",
run_id,
config.device_id,
user_id,
)
await _finalize_run(
run_log,
status="error",
errors=[f"Device {config.device_id!r} is not connected"],
)
return
# ── 2. Pre-create agent_data queue ────────────────────────────────
try:
device_mgr.get_agent_data_queue(user_id, run_id)
except RuntimeError:
await _finalize_run(
run_log,
status="error",
errors=["Device disconnected before agent run could start"],
)
return
# ── 3. Send agent_run frame ────────────────────────────────────────
frame: dict[str, Any] = {
"type": "agent_run",
"run_id": run_id,
"agent_id": config.id,
"config": {
"paths": config.directory_paths,
"file_extensions": config.file_extensions,
"prompt_template": config.prompt_template,
"data_types": config.data_types,
},
}
try:
await device_mgr.send_frame(user_id, frame)
except RuntimeError as exc:
device_mgr.cleanup_agent_data_queue(user_id, run_id)
await _finalize_run(
run_log,
status="error",
errors=[f"Failed to send agent_run frame: {exc}"],
)
return
logger.info(
"agent_runner: sent agent_run run=%s agent=%s user=%s",
run_id,
config.id,
user_id,
)
# ── 4. Consume agent_data frames ──────────────────────────────────
files: list[dict[str, Any]] = []
errors: list[str] = []
try:
queue = device_mgr.get_agent_data_queue(user_id, run_id)
deadline = asyncio.get_event_loop().time() + _FILE_READ_TIMEOUT
while True:
remaining = deadline - asyncio.get_event_loop().time()
if remaining <= 0:
errors.append("Timed out waiting for file data from device")
break
try:
frame_data = await asyncio.wait_for(queue.get(), timeout=remaining)
except asyncio.TimeoutError:
errors.append("Timed out waiting for file data from device")
break
if frame_data is None:
# Sentinel from agent_complete — stream is done.
break
files.extend(frame_data.get("files", []))
except RuntimeError as exc:
errors.append(f"Queue error reading agent data: {exc}")
# ── 56. Extract + insert ─────────────────────────────────────────
items_processed = 0
items_created = 0
for file_info in files:
file_path: str = file_info.get("path", "<unknown>")
content: str = file_info.get("content", "")
if not content:
continue
items_processed += 1
try:
extracted = await _extract_items_from_content(
config.prompt_template, content, config.data_types
)
except Exception as exc:
errors.append(f"LLM extraction error for {file_path!r}: {exc}")
continue
for item in extracted:
try:
result = await _send_insert_to_client(
user_id, item["table"], item["data"], device_mgr
)
if result.get("error"):
errors.append(
f"Insert failed ({item['table']}, {file_path!r}): {result['error']}"
)
else:
items_created += 1
except asyncio.TimeoutError:
errors.append(
f"Timed out awaiting insert ack ({item['table']}, {file_path!r})"
)
except RuntimeError as exc:
errors.append(f"Insert error ({item['table']}, {file_path!r}): {exc}")
# ── 7. Finalise ────────────────────────────────────────────────────
device_mgr.cleanup_agent_data_queue(user_id, run_id)
if errors and items_created == 0:
final_status = "error"
elif errors:
final_status = "partial"
else:
final_status = "success"
await _finalize_run(
run_log,
status=final_status,
items_processed=items_processed,
items_created=items_created,
errors=errors,
update_config_last_run=True,
config_id=config.id,
config_type="local",
)
logger.info(
"agent_runner: run=%s done status=%s processed=%d created=%d errors=%d",
run_id,
final_status,
items_processed,
items_created,
len(errors),
)
# ── Cloud agent runner ─────────────────────────────────────────────────────
# Default lookback window when an agent has never run before.
_CLOUD_DEFAULT_LOOKBACK_DAYS: int = 7
async def run_cloud_agent(
user_id: str,
config: CloudAgentConfig,
run_log: AgentRunLog,
device_mgr: DeviceConnectionManager,
) -> None:
"""Execute a cloud connector agent run end-to-end.
Steps:
1. Verify the user's device is online — results are pushed to Electron
via WS tool-call frames. If no device is connected, abort.
2. Decrypt the stored OAuth token from ``config.oauth_token_encrypted``.
3. Instantiate the provider client (Gmail or MS Graph).
4. Fetch messages/emails since ``config.last_run_at`` (or 7 days ago for
the first run) applying ``config.filter_config`` filters.
5. For each message/email call ``_extract_items_from_content`` with
``config.prompt_template`` to get structured ``{table, data}`` items.
6. Push each item to Electron as an ``insert`` tool-call.
7. If the provider refreshed its access token, re-encrypt and write it
back to ``config.oauth_token_encrypted``.
8. Persist the run outcome via ``_finalize_run``.
"""
run_id = run_log.id
# ── 1. Device online check ─────────────────────────────────────────
if not device_mgr.is_online(user_id):
logger.info(
"agent_runner: skip cloud run=%s — no device online for user=%s",
run_id,
user_id,
)
await _finalize_run(
run_log,
status="error",
errors=["No connected device — cloud agent results cannot be delivered"],
)
return
# ── 2. Decrypt OAuth token ─────────────────────────────────────────
from app.integrations import decrypt_token, encrypt_token, get_provider
if not config.oauth_token_encrypted:
await _finalize_run(
run_log,
status="error",
errors=[f"No OAuth token stored for cloud agent '{config.name}'"],
)
return
try:
credentials_info = decrypt_token(config.oauth_token_encrypted)
except ValueError as exc:
logger.error("agent_runner: failed to decrypt OAuth token for agent %s: %s", config.id, exc)
await _finalize_run(
run_log,
status="error",
errors=[f"Failed to decrypt OAuth token: {exc}"],
)
return
# ── 3. Instantiate provider client ────────────────────────────────
try:
provider = get_provider(config.provider, credentials_info)
except ValueError as exc:
await _finalize_run(
run_log,
status="error",
errors=[str(exc)],
)
return
# ── 4. Fetch messages ─────────────────────────────────────────────
since: datetime | None = config.last_run_at
if since is None:
since = datetime.now(timezone.utc) - timedelta(days=_CLOUD_DEFAULT_LOOKBACK_DAYS)
if since.tzinfo is None:
since = since.replace(tzinfo=timezone.utc)
errors: list[str] = []
items_processed = 0
items_created = 0
try:
if config.provider == "gmail":
raw_messages = await provider.fetch_messages( # type: ignore[union-attr]
filter_config=config.filter_config,
since=since,
)
elif config.provider == "outlook":
raw_messages = await provider.fetch_emails( # type: ignore[union-attr]
filter_config=config.filter_config,
since=since,
)
elif config.provider == "teams":
raw_messages = await provider.fetch_messages( # type: ignore[union-attr]
filter_config=config.filter_config,
since=since,
)
else:
raw_messages = []
except RuntimeError as exc:
logger.error(
"agent_runner: provider fetch failed for cloud agent %s: %s",
config.id,
exc,
)
await _finalize_run(
run_log,
status="error",
errors=[f"Provider fetch failed: {exc}"],
update_config_last_run=True,
config_id=config.id,
config_type="cloud",
)
return
logger.info(
"agent_runner: cloud agent %s fetched %d item(s) from %s for user=%s",
config.id,
len(raw_messages),
config.provider,
user_id,
)
# ── 56. Extract + insert ─────────────────────────────────────────
for msg in raw_messages:
content_text = msg.as_text
if not content_text:
continue
items_processed += 1
try:
extracted = await _extract_items_from_content(
config.prompt_template, content_text, config.data_types
)
except Exception as exc:
errors.append(f"LLM extraction error for message {msg.id!r}: {exc}")
continue
for item in extracted:
try:
result = await _send_insert_to_client(
user_id, item["table"], item["data"], device_mgr
)
if result.get("error"):
errors.append(
f"Insert failed ({item['table']}, msg={msg.id!r}): {result['error']}"
)
else:
items_created += 1
except asyncio.TimeoutError:
errors.append(
f"Timed out awaiting insert ack ({item['table']}, msg={msg.id!r})"
)
except RuntimeError as exc:
errors.append(f"Insert error ({item['table']}, msg={msg.id!r}): {exc}")
# ── 7. Persist refreshed token (if any) ───────────────────────────
refreshed = getattr(provider, "refreshed_credentials", None)
if refreshed:
try:
new_encrypted = encrypt_token(refreshed)
async with async_session() as db:
cfg_result = await db.execute(
select(CloudAgentConfig).where(CloudAgentConfig.id == config.id)
)
cfg_row = cfg_result.scalar_one_or_none()
if cfg_row:
cfg_row.oauth_token_encrypted = new_encrypted
await db.commit()
logger.debug("agent_runner: refreshed OAuth token persisted for agent %s", config.id)
except Exception as exc:
logger.warning("agent_runner: failed to persist refreshed token for agent %s: %s", config.id, exc)
# ── 8. Finalise ────────────────────────────────────────────────────
if errors and items_created == 0:
final_status = "error"
elif errors:
final_status = "partial"
else:
final_status = "success"
await _finalize_run(
run_log,
status=final_status,
items_processed=items_processed,
items_created=items_created,
errors=errors,
update_config_last_run=True,
config_id=config.id,
config_type="cloud",
)
logger.info(
"agent_runner: cloud run=%s done status=%s processed=%d created=%d errors=%d",
run_id,
final_status,
items_processed,
items_created,
len(errors),
)
# ── Pending-run trigger ─────────────────────────────────────────────────────
async def trigger_pending_runs(
user_id: str,
device_id: str,
device_mgr: DeviceConnectionManager,
) -> None:
"""Dispatch any overdue agent runs after an Electron device connects.
Called as a background task from the device WS endpoint on ``device_hello``.
Scheduling rules:
* **Local agents**: only triggered when ``config.device_id == device_id``.
* **Cloud agents**: triggered on any connected device (no device binding).
* Runs execute **sequentially** to avoid flooding the WS connection.
"""
logger.info(
"agent_runner: scanning overdue runs for user=%s device=%s", user_id, device_id
)
async with async_session() as db:
local_result = await db.execute(
select(LocalAgentConfig).where(
LocalAgentConfig.user_id == user_id,
LocalAgentConfig.enabled == True, # noqa: E712
LocalAgentConfig.device_id == device_id,
)
)
local_configs: list[LocalAgentConfig] = list(local_result.scalars().all())
cloud_result = await db.execute(
select(CloudAgentConfig).where(
CloudAgentConfig.user_id == user_id,
CloudAgentConfig.enabled == True, # noqa: E712
)
)
cloud_configs: list[CloudAgentConfig] = list(cloud_result.scalars().all())
# Build ordered list of overdue (type, config) pairs.
pending: list[tuple[str, Any]] = []
for cfg in local_configs:
if _is_overdue(cfg.schedule_cron, cfg.last_run_at):
pending.append(("local", cfg))
for cfg in cloud_configs:
if _is_overdue(cfg.schedule_cron, cfg.last_run_at):
pending.append(("cloud", cfg))
if not pending:
logger.debug("agent_runner: no overdue runs for user=%s", user_id)
return
logger.info(
"agent_runner: %d overdue run(s) to dispatch for user=%s", len(pending), user_id
)
for agent_type, cfg in pending:
# Create a fresh run log for this scheduled dispatch.
run_log = AgentRunLog(
agent_id=cfg.id,
agent_type=agent_type,
user_id=user_id,
status="running",
)
async with async_session() as db:
db.add(run_log)
await db.commit()
await db.refresh(run_log)
if agent_type == "local":
await run_local_agent(user_id, cfg, run_log, device_mgr)
else:
await run_cloud_agent(user_id, cfg, run_log, device_mgr)
# ── Internal helper ─────────────────────────────────────────────────────────
async def _finalize_run(
run_log: AgentRunLog,
*,
status: str,
items_processed: int = 0,
items_created: int = 0,
errors: list[str] | None = None,
update_config_last_run: bool = False,
config_id: str | None = None,
config_type: str | None = None,
) -> None:
"""Persist the run outcome and optionally update ``LocalAgentConfig.last_run_at``.
Uses a fresh DB session so this is safe to call from background tasks
after the original request session has closed.
"""
now = datetime.now(timezone.utc)
try:
async with async_session() as db:
managed = await db.merge(run_log)
managed.status = status
managed.items_processed = items_processed
managed.items_created = items_created
managed.errors = errors or []
managed.completed_at = now
if update_config_last_run and config_id:
if config_type == "local":
cfg_result = await db.execute(
select(LocalAgentConfig).where(LocalAgentConfig.id == config_id)
)
cfg = cfg_result.scalar_one_or_none()
if cfg:
cfg.last_run_at = now
elif config_type == "cloud":
cfg_result = await db.execute(
select(CloudAgentConfig).where(CloudAgentConfig.id == config_id)
)
cfg = cfg_result.scalar_one_or_none()
if cfg:
cfg.last_run_at = now
await db.commit()
except Exception as exc:
logger.error(
"agent_runner: failed to finalize run_log=%s: %s", run_log.id, exc
)