- Remove unused config_schema from AgentCatalogItem (schema + route) - Fix agent_setup system prompt: add extraction agent base behaviour context so journey LLM knows what is already handled and focuses on field mappings only; remove redundant data-types question (already known from user selection); derive data types list dynamically - Rewrite processing base prompt to use actual tool names (list_tasks, update_task, add_task_comment, list_notes, update_note, list_timelines, update_timeline, list_all_projects, create_project) and enforce update-first strategy before falling back to creation Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
798 lines
30 KiB
Python
798 lines
30 KiB
Python
"""Agent run orchestrator.
|
||
|
||
Drives two agent types:
|
||
|
||
* **Local directory agent** — two-phase execution that mirrors the
|
||
``deep_agent.py`` tool-calling pattern. Phase 1 (Triage) explores the
|
||
user's directory via file-system tools and groups files by project.
|
||
Phase 2 (Processing) reads full file contents and performs CRUD
|
||
operations using the standard entity tools (tasks, notes, etc.).
|
||
|
||
* **Cloud connector agent** — fetches data from third-party APIs (Gmail,
|
||
Teams, Outlook) and pushes extracted items to Electron.
|
||
|
||
Usage
|
||
-----
|
||
Background tasks are spawned with ``asyncio.create_task()``::
|
||
|
||
asyncio.create_task(run_local_agent(user_id, config, run_log, device_manager))
|
||
asyncio.create_task(trigger_pending_runs(user_id, device_id, device_manager))
|
||
|
||
The ``trigger_pending_runs`` function is called by the device WS endpoint
when Electron sends ``device_hello``. It is currently a no-op that only logs
the skipped scan, because agent configuration is owned by the client.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import uuid
|
||
from datetime import datetime, timedelta, timezone
|
||
from typing import Any
|
||
|
||
from croniter import croniter
|
||
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
|
||
from sqlalchemy import select
|
||
|
||
from app.agents.filesystem_agent import FILESYSTEM_TOOLS
|
||
from app.agents.note_agent import NOTE_TOOLS
|
||
from app.agents.project_agent import PROJECT_TOOLS
|
||
from app.agents.task_agent import TASK_TOOLS
|
||
from app.agents.timeline_agent import TIMELINE_TOOLS
|
||
from app.core.device_manager import DeviceConnectionManager
|
||
from app.core.llm import get_llm
|
||
from app.core.ws_context import clear_client_executor, set_client_executor
|
||
from app.db import async_session
|
||
from app.models import AgentRunLog, CloudAgentConfig, LocalAgentConfig
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# ── Timeouts ───────────────────────────────────────────────────────────────
|
||
|
||
# Max seconds to wait for a single tool-call round-trip (FE → BE). Used as the
# ``asyncio.wait_for`` timeout when awaiting the device's reply.
_TOOL_CALL_TIMEOUT: int = 30
# Max LLM reasoning steps per phase. Each step is one LLM invocation; once the
# limit is hit the loop asks for a final answer without tools.
_MAX_TRIAGE_STEPS: int = 10
_MAX_PROCESSING_STEPS: int = 12
|
||
|
||
# ── Data-type to tool mapping ─────────────────────────────────────────────
|
||
|
||
# Maps each selectable data type to the entity tools that act on it. A data
# type that is missing from this mapping contributes no tools.
_DATA_TYPE_TOOLS: dict[str, list[Any]] = {
    "tasks": TASK_TOOLS,
    "projects": PROJECT_TOOLS,
    "notes": NOTE_TOOLS,
    "timelines": TIMELINE_TOOLS,
}
|
||
|
||
# ── Triage prompt ─────────────────────────────────────────────────────────
|
||
|
||
_TRIAGE_SYSTEM_PROMPT = """\
|
||
You are a file triage assistant for a freelance project management tool.
|
||
Your job is to explore a local directory on the user's device, understand its
|
||
structure, and group files by project context.
|
||
|
||
You have access to these tools:
|
||
- list_directory: to map folder structure
|
||
- get_file_metadata: to check creation/modification dates
|
||
- read_file_content: to read brief snippets when needed for categorisation
|
||
- list_projects / list_all_projects / get_project: to fetch existing projects
|
||
from the user's workspace and match files to them
|
||
|
||
Instructions:
|
||
1. Start by calling list_directory on the configured root path.
|
||
2. Explore subdirectories as needed to understand the structure.
|
||
3. Use get_file_metadata to check modification dates. Skip files that have
|
||
NOT been modified since: {last_run_at}.
|
||
4. Call list_all_projects to get the user's existing projects.
|
||
5. Match files to existing projects by name, folder structure, or content hints.
|
||
6. If files don't match any existing project, group them under "standalone".
|
||
|
||
{custom_prompt_section}
|
||
|
||
Target entity types to extract: {data_types}
|
||
File extensions to consider: {file_extensions}
|
||
|
||
When you have finished exploring, output ONLY a JSON object (no markdown
|
||
fences, no explanation) mapping project IDs or "standalone" to file path
|
||
arrays:
|
||
|
||
{{"<project_id>": ["<file_path>", ...], "standalone": ["<file_path>", ...]}}
|
||
|
||
Return ONLY the JSON object as your final message.
|
||
"""
|
||
|
||
# ── Processing prompt ─────────────────────────────────────────────────────
|
||
|
||
_PROCESSING_BASE_PROMPT = """\
|
||
You are a data extraction and management assistant for a freelance project
|
||
management tool.
|
||
|
||
Available tools:
|
||
Filesystem : read_file_content, list_directory, get_file_metadata
|
||
Tasks : list_tasks, create_task, update_task, add_task_comment
|
||
Notes : list_notes, get_note, create_note, update_note
|
||
Timelines : list_timelines, create_timeline, update_timeline
|
||
Projects : list_all_projects, get_project, create_project, update_project
|
||
|
||
Your task:
|
||
1. Read the full content of each file below using read_file_content.
|
||
2. For each piece of information found, ALWAYS try to match and update an
|
||
existing record before creating a new one.
|
||
3. ONLY act on these entity types: {data_types}.
|
||
4. Do NOT invent data. Only extract what is clearly present in the files.
|
||
5. If a file contains no relevant data for the target entity types, skip it.
|
||
|
||
Update-first rules (apply in this order):
|
||
Tasks:
|
||
- Call list_tasks to find a match by title or context.
|
||
- If found: call add_task_comment (author "Adiuva"), update_task to set
|
||
assignees, state (ToDo / In Progress / Completed), or other fields.
|
||
- If NOT found: call create_task with isAiSuggested=1, isApproved=0.
|
||
Timelines:
|
||
- Call list_timelines to find a match by title or date.
|
||
- If found: call update_timeline to edit fields or mark it complete.
|
||
- If NOT found: call create_timeline with isAiSuggested=1, isApproved=0.
|
||
Notes:
|
||
- Call list_notes to find a match by title or topic, then get_note to
|
||
read its current content.
|
||
- If found: call update_note with the merged content.
|
||
- If NOT found: call create_note with isAiSuggested=1, isApproved=0.
|
||
Projects:
|
||
- Call list_all_projects to check for a match first.
|
||
- Only call create_project if the information is clearly significant and
|
||
no existing project matches. Set isAiSuggested=1, isApproved=0.
|
||
|
||
{project_context}
|
||
|
||
Files to process:
|
||
{file_list}
|
||
|
||
{custom_prompt_section}
|
||
|
||
After processing all files, respond with a brief summary of what you updated
|
||
and what you created.
|
||
"""
|
||
|
||
|
||
# ── Cron helper ────────────────────────────────────────────────────────────
|
||
|
||
|
||
def _is_overdue(schedule_cron: str, last_run_at: datetime | None) -> bool:
    """Tell whether a cron-scheduled agent is due to run right now.

    Args:
        schedule_cron: Cron expression describing the schedule.
        last_run_at: Timestamp of the previous run, or ``None`` if the agent
            has never run. Naive timestamps are interpreted as UTC.

    Returns:
        ``True`` when the first scheduled time after ``last_run_at`` is now or
        in the past, or when the agent never ran and the expression parses.
        ``False`` otherwise, including when the expression cannot be parsed
        (fail-safe: an unparseable schedule is never triggered).
    """
    try:
        current_time = datetime.now(timezone.utc)

        if last_run_at is not None:
            anchor = last_run_at
            if anchor.tzinfo is None:
                # Stored timestamps without tzinfo are treated as UTC.
                anchor = anchor.replace(tzinfo=timezone.utc)
            upcoming: datetime = croniter(schedule_cron, anchor).get_next(datetime)
            return current_time >= upcoming

        # Never ran before: due immediately, but only once the expression has
        # been validated by constructing the iterator.
        croniter(schedule_cron, current_time)
        return True
    except Exception as exc:
        logger.warning("agent_runner: cannot parse cron %r: %s", schedule_cron, exc)
        return False  # Fail-safe: don't trigger if expression is invalid.
|
||
|
||
|
||
# ── WS executor for agent context ─────────────────────────────────────────
|
||
|
||
|
||
def _make_agent_executor(
    user_id: str,
    device_mgr: DeviceConnectionManager,
) -> Any:
    """Build the coroutine callback that is handed to ``set_client_executor()``.

    The returned callback sends one tool-call frame to the user's device via
    ``device_mgr`` and awaits the matching reply, which lets tools use
    ``execute_on_client()`` during an agent run.

    Args:
        user_id: Owner of the device connection the frames are sent on.
        device_mgr: Connection manager used to register the pending call and
            to send the frame.

    Returns:
        An async callable taking the payload dict and returning the reply.
        It raises ``KeyError`` when the payload has no ``"id"`` and
        ``asyncio.TimeoutError`` when no reply arrives within
        ``_TOOL_CALL_TIMEOUT`` seconds.
    """

    async def _send_and_wait(payload: dict) -> dict:
        # The caller's dict is stamped in place before it goes on the wire.
        payload["type"] = "tool_call"
        # Register the pending reply first so it exists before the frame is sent.
        pending_reply = device_mgr.create_pending_call(user_id, payload["id"])
        await device_mgr.send_frame(user_id, payload)
        reply = await asyncio.wait_for(pending_reply, timeout=_TOOL_CALL_TIMEOUT)
        return reply

    return _send_and_wait
|
||
|
||
|
||
# ── LLM tool-calling loop (mirrors deep_agent._run_single_agent) ──────────
|
||
|
||
|
||
def _as_text(content: Any) -> str:
|
||
if content is None:
|
||
return ""
|
||
if isinstance(content, str):
|
||
return content
|
||
if isinstance(content, list):
|
||
parts: list[str] = []
|
||
for item in content:
|
||
if isinstance(item, str):
|
||
parts.append(item)
|
||
elif isinstance(item, dict):
|
||
text = item.get("text")
|
||
if isinstance(text, str):
|
||
parts.append(text)
|
||
return "".join(parts)
|
||
return str(content)
|
||
|
||
|
||
async def _run_agent_with_tools(
    *,
    system_prompt: str,
    user_message: str,
    tools: list[Any],
    max_steps: int,
) -> str:
    """Run an LLM agent with tool-calling, returning the final text response.

    Follows the same pattern as ``deep_agent._run_single_agent``:
    bind tools → invoke → handle tool calls → repeat until final text.

    Args:
        system_prompt: Content of the system message.
        user_message: Content of the initial human message.
        tools: Tool objects to bind; each must expose ``name`` and ``ainvoke``.
        max_steps: Maximum number of tool-enabled LLM invocations.

    Returns:
        The text of the first response that contains no tool calls. If
        ``max_steps`` is exhausted, the text of one extra invocation made
        without tools bound.

    Raises:
        Exception: Whatever the LLM or a tool's ``ainvoke`` raises; nothing
            is caught here, so the caller decides how to record failures.
    """
    llm = get_llm()
    llm_with_tools = llm.bind_tools(tools)
    messages: list[Any] = [
        SystemMessage(content=system_prompt),
        HumanMessage(content=user_message),
    ]
    tool_map = {tool_def.name: tool_def for tool_def in tools}

    for _ in range(max_steps):
        response: AIMessage = await llm_with_tools.ainvoke(messages)
        messages.append(response)

        if not response.tool_calls:
            return _as_text(response.content)

        for call in response.tool_calls:
            # Normalise once and reuse: a missing or None id must not raise
            # when the ToolMessage is built below.
            call_id = str(call.get("id") or "")
            call_name = str(call.get("name", ""))
            call_args = call.get("args", {})
            logger.info(
                "agent_runner: tool_call name=%s args=%s",
                call_name,
                # default=str keeps logging from failing on arguments that are
                # not JSON-serialisable.
                json.dumps(call_args, ensure_ascii=True, default=str)[:800],
            )

            tool_fn = tool_map.get(call_name)
            if tool_fn is None:
                # Report the unknown name back to the model instead of failing.
                tool_output = f"Unknown tool: {call_name}"
            else:
                tool_output = await tool_fn.ainvoke(call_args)

            logger.info(
                "agent_runner: tool_result name=%s output=%s",
                call_name,
                str(tool_output)[:1200],
            )
            messages.append(ToolMessage(content=str(tool_output), tool_call_id=call_id))

    # Fallback: exceeded max steps, get final response without tools.
    final = await llm.ainvoke(messages)
    return _as_text(final.content)
|
||
|
||
|
||
# ── Triage map parser ─────────────────────────────────────────────────────
|
||
|
||
|
||
def _parse_triage_map(raw: str) -> dict[str, list[str]] | None:
|
||
"""Extract the JSON triage map from the LLM's final response."""
|
||
text = raw.strip()
|
||
# Try direct parse first.
|
||
try:
|
||
parsed = json.loads(text)
|
||
if isinstance(parsed, dict):
|
||
return {k: v for k, v in parsed.items() if isinstance(v, list)}
|
||
except json.JSONDecodeError:
|
||
pass
|
||
|
||
# Try extracting JSON from markdown fences or surrounding text.
|
||
import re
|
||
match = re.search(r"\{[\s\S]*\}", text)
|
||
if match:
|
||
try:
|
||
parsed = json.loads(match.group(0))
|
||
if isinstance(parsed, dict):
|
||
return {k: v for k, v in parsed.items() if isinstance(v, list)}
|
||
except json.JSONDecodeError:
|
||
pass
|
||
return None
|
||
|
||
|
||
# ── Tool list builder ─────────────────────────────────────────────────────
|
||
|
||
|
||
def _build_processing_tools(data_types: list[str]) -> list[Any]:
    """Build the tool list for Phase 2 based on user's data_types selection.

    Args:
        data_types: Selected entity types. ``None`` or an empty list yields
            only the filesystem tools; unknown types contribute nothing.

    Returns:
        A new list holding the filesystem tools followed by the tools of each
        distinct selected data type, in first-seen order.
    """
    tools: list[Any] = list(FILESYSTEM_TOOLS)
    # dict.fromkeys drops repeated data types while keeping their order, so the
    # same tool set is never added twice.
    for data_type in dict.fromkeys(data_types or ()):
        tools.extend(_DATA_TYPE_TOOLS.get(data_type) or ())
    return tools
|
||
|
||
|
||
# ── Local agent runner (two-phase) ─────────────────────────────────────────
|
||
|
||
|
||
async def run_local_agent(
    user_id: str,
    config: LocalAgentConfig,
    run_log: AgentRunLog,
    device_mgr: DeviceConnectionManager,
) -> None:
    """Execute a local directory agent run using two-phase LLM-with-tools.

    Phase 1 — Triage:
        Explore the directory structure, check metadata, match files to
        existing projects. Output: a JSON map of project → file paths.

    Phase 2 — Processing:
        For each project group, read full file contents and perform CRUD
        operations using the standard entity tools.

    The outcome is persisted through ``_finalize_run``. The status is
    ``"success"`` when no error was recorded, ``"partial"`` when errors were
    recorded but at least one group completed, and ``"error"`` otherwise.
    ``items_processed`` counts the files of every group that was attempted;
    ``items_created`` is reported as 0 because tool results are not inspected.

    Args:
        user_id: Owner of the agent and of the device connection.
        config: Local agent configuration (paths, data types, prompt, device).
        run_log: Run-log row to update with the outcome.
        device_mgr: Connection manager used for the online check and for
            routing tool calls to the device.
    """
    run_id = run_log.id

    # ── Device online check ─────────────────────────────────────────
    target_device_id = config.device_id.strip() if isinstance(config.device_id, str) else ""
    if target_device_id:
        is_online = device_mgr.is_online(user_id, target_device_id)
    else:
        is_online = device_mgr.is_online(user_id)

    if not is_online:
        logger.info(
            "agent_runner: skip run=%s — device %r offline for user=%s",
            run_id,
            target_device_id or "<any>",
            user_id,
        )
        await _finalize_run(
            run_log,
            status="error",
            errors=[f"Device {target_device_id or '<any>'!r} is not connected"],
        )
        return

    # ── Set up WS executor for tools ────────────────────────────────
    executor = _make_agent_executor(user_id, device_mgr)
    set_client_executor(executor)

    errors: list[str] = []
    items_processed = 0
    items_created = 0
    # Groups whose processing finished without raising. This drives the final
    # status so a run in which every group failed is not reported as "partial".
    groups_succeeded = 0

    try:
        # ── Phase 1: Triage ─────────────────────────────────────────
        logger.info("agent_runner: run=%s phase=triage start user=%s", run_id, user_id)

        last_run_str = "never (process all files)"
        if config.last_run_at:
            last_run_str = config.last_run_at.isoformat()

        custom_section = ""
        if config.prompt_template:
            custom_section = f"User instructions:\n{config.prompt_template}"

        file_ext_str = ", ".join(config.file_extensions) if config.file_extensions else "all"

        triage_prompt = _TRIAGE_SYSTEM_PROMPT.format(
            last_run_at=last_run_str,
            custom_prompt_section=custom_section,
            data_types=", ".join(config.data_types),
            file_extensions=file_ext_str,
        )

        directory_paths = config.directory_paths
        triage_user_msg = (
            f"Explore these directories and produce the triage map:\n"
            f"{json.dumps(directory_paths, ensure_ascii=False)}"
        )

        triage_tools: list[Any] = list(FILESYSTEM_TOOLS) + list(PROJECT_TOOLS)

        triage_response = await _run_agent_with_tools(
            system_prompt=triage_prompt,
            user_message=triage_user_msg,
            tools=triage_tools,
            max_steps=_MAX_TRIAGE_STEPS,
        )

        triage_map = _parse_triage_map(triage_response)
        if not triage_map:
            # Covers both an unparseable response and an empty map.
            errors.append(f"Triage phase failed to produce a valid file map: {triage_response[:500]}")
            await _finalize_run(run_log, status="error", errors=errors)
            return

        logger.info(
            "agent_runner: run=%s triage complete groups=%d total_files=%d",
            run_id,
            len(triage_map),
            sum(len(files) for files in triage_map.values()),
        )

        # ── Phase 2: Processing (per group) ─────────────────────────
        processing_tools = _build_processing_tools(config.data_types)

        for group_key, file_paths in triage_map.items():
            if not file_paths:
                continue

            logger.info(
                "agent_runner: run=%s phase=processing group=%s files=%d",
                run_id,
                group_key,
                len(file_paths),
            )

            # Build project context for the LLM.
            if group_key == "standalone":
                project_context = "These files are not associated with any existing project."
            else:
                project_context = f"These files belong to project ID: {group_key}. Use this project_id when creating records."

            file_list_str = "\n".join(f"- {fp}" for fp in file_paths)

            processing_prompt = _PROCESSING_BASE_PROMPT.format(
                data_types=", ".join(config.data_types),
                project_context=project_context,
                file_list=file_list_str,
                custom_prompt_section=custom_section,
            )

            items_processed += len(file_paths)

            try:
                result_text = await _run_agent_with_tools(
                    system_prompt=processing_prompt,
                    user_message="Process the listed files now.",
                    tools=processing_tools,
                    max_steps=_MAX_PROCESSING_STEPS,
                )
            except Exception as exc:
                # One failing group must not stop the remaining groups.
                errors.append(f"Processing error for group '{group_key}': {exc}")
                logger.error(
                    "agent_runner: run=%s group=%s processing failed: %s",
                    run_id,
                    group_key,
                    exc,
                )
            else:
                groups_succeeded += 1
                logger.info(
                    "agent_runner: run=%s group=%s processing_result=%s",
                    run_id,
                    group_key,
                    result_text[:500],
                )
                # Created records are not counted: the tools perform the
                # writes themselves and their results are not inspected here.

    except Exception as exc:
        errors.append(f"Agent run failed: {exc}")
        logger.error("agent_runner: run=%s failed: %s", run_id, exc)
    finally:
        clear_client_executor()

    # ── Finalise ────────────────────────────────────────────────────
    if errors and groups_succeeded == 0:
        final_status = "error"
    elif errors:
        final_status = "partial"
    else:
        final_status = "success"

    await _finalize_run(
        run_log,
        status=final_status,
        items_processed=items_processed,
        items_created=items_created,
        errors=errors,
        update_config_last_run=False,
        config_id=config.id,
        config_type="local",
    )
    logger.info(
        "agent_runner: run=%s done status=%s processed=%d errors=%d",
        run_id,
        final_status,
        items_processed,
        len(errors),
    )
|
||
|
||
|
||
# ── Cloud agent runner ─────────────────────────────────────────────────────
|
||
|
||
# Default lookback window, in days, when an agent has never run before
# (i.e. its ``last_run_at`` is unset).
_CLOUD_DEFAULT_LOOKBACK_DAYS: int = 7
|
||
|
||
|
||
async def run_cloud_agent(
    user_id: str,
    config: CloudAgentConfig,
    run_log: AgentRunLog,
    device_mgr: DeviceConnectionManager,
) -> None:
    """Execute a cloud connector agent run end-to-end.

    Steps:

    1. Verify the user's device is online — tool calls are routed to it over
       the device connection. If no device is connected, abort.
    2. Decrypt the stored OAuth token from ``config.oauth_token_encrypted``.
    3. Instantiate the provider client via ``get_provider``.
    4. Fetch messages/emails since ``config.last_run_at`` (or
       ``_CLOUD_DEFAULT_LOOKBACK_DAYS`` days ago for the first run) applying
       ``config.filter_config`` filters.
    5. For each message with text content, run the tool-calling LLM loop with
       the processing prompt and the tools for ``config.data_types``.
    6. If the provider exposes ``refreshed_credentials``, re-encrypt them and
       write them back to ``config.oauth_token_encrypted``.
    7. Persist the run outcome via ``_finalize_run``.

    The status is ``"success"`` when no error was recorded, ``"partial"``
    when some messages failed but at least one completed, and ``"error"``
    otherwise. ``items_created`` is reported as 0 because tool results are
    not inspected.

    Args:
        user_id: Owner of the agent and of the device connection.
        config: Cloud agent configuration (provider, token, filters, prompt).
        run_log: Run-log row to update with the outcome.
        device_mgr: Connection manager used for the online check and for
            routing tool calls to the device.
    """
    run_id = run_log.id

    # ── 1. Device online check ─────────────────────────────────────────
    if not device_mgr.is_online(user_id):
        logger.info(
            "agent_runner: skip cloud run=%s — no device online for user=%s",
            run_id,
            user_id,
        )
        await _finalize_run(
            run_log,
            status="error",
            errors=["No connected device — cloud agent results cannot be delivered"],
        )
        return

    # ── 2. Decrypt OAuth token ─────────────────────────────────────────
    from app.integrations import decrypt_token, encrypt_token, get_provider

    if not config.oauth_token_encrypted:
        await _finalize_run(
            run_log,
            status="error",
            errors=[f"No OAuth token stored for cloud agent '{config.name}'"],
        )
        return

    try:
        credentials_info = decrypt_token(config.oauth_token_encrypted)
    except ValueError as exc:
        logger.error("agent_runner: failed to decrypt OAuth token for agent %s: %s", config.id, exc)
        await _finalize_run(
            run_log,
            status="error",
            errors=[f"Failed to decrypt OAuth token: {exc}"],
        )
        return

    # ── 3. Instantiate provider client ────────────────────────────────
    try:
        provider = get_provider(config.provider, credentials_info)
    except ValueError as exc:
        await _finalize_run(
            run_log,
            status="error",
            errors=[str(exc)],
        )
        return

    # ── 4. Fetch messages ─────────────────────────────────────────────
    since: datetime | None = config.last_run_at
    if since is None:
        since = datetime.now(timezone.utc) - timedelta(days=_CLOUD_DEFAULT_LOOKBACK_DAYS)
    if since.tzinfo is None:
        # Stored timestamps without tzinfo are treated as UTC.
        since = since.replace(tzinfo=timezone.utc)

    errors: list[str] = []
    items_processed = 0
    items_created = 0
    # Messages whose processing finished without raising; drives the status.
    messages_succeeded = 0

    try:
        if config.provider in ("gmail", "teams"):
            raw_messages = await provider.fetch_messages(  # type: ignore[union-attr]
                filter_config=config.filter_config,
                since=since,
            )
        elif config.provider == "outlook":
            raw_messages = await provider.fetch_emails(  # type: ignore[union-attr]
                filter_config=config.filter_config,
                since=since,
            )
        else:
            raw_messages = []
    except Exception as exc:
        # Catch broadly: this runs as a background task, so an uncaught
        # provider error would leave the run log without a final status.
        logger.error(
            "agent_runner: provider fetch failed for cloud agent %s: %s",
            config.id,
            exc,
        )
        # last_run_at is deliberately NOT advanced here: nothing was fetched,
        # so moving the watermark would skip this time window for good.
        await _finalize_run(
            run_log,
            status="error",
            errors=[f"Provider fetch failed: {exc}"],
        )
        return

    logger.info(
        "agent_runner: cloud agent %s fetched %d item(s) from %s for user=%s",
        config.id,
        len(raw_messages),
        config.provider,
        user_id,
    )

    # ── 5. Extract + apply via LLM with tools ────────────────────────
    executor = _make_agent_executor(user_id, device_mgr)
    set_client_executor(executor)

    try:
        processing_tools = _build_processing_tools(config.data_types)
        custom_section = ""
        if config.prompt_template:
            custom_section = f"User instructions:\n{config.prompt_template}"

        for msg in raw_messages:
            content_text = msg.as_text
            if not content_text:
                continue
            items_processed += 1

            processing_prompt = _PROCESSING_BASE_PROMPT.format(
                data_types=", ".join(config.data_types),
                project_context="Determine the appropriate project from the message context.",
                file_list=f"Message from {config.provider} (id: {msg.id})",
                custom_prompt_section=custom_section,
            )

            try:
                await _run_agent_with_tools(
                    system_prompt=processing_prompt,
                    # Only the first 8000 characters are sent to the model.
                    user_message=f"Process this message content:\n\n{content_text[:8000]}",
                    tools=processing_tools,
                    max_steps=_MAX_PROCESSING_STEPS,
                )
            except Exception as exc:
                # One failing message must not stop the remaining messages.
                errors.append(f"LLM processing error for message {msg.id!r}: {exc}")
            else:
                messages_succeeded += 1
    finally:
        clear_client_executor()

    # ── 6. Persist refreshed token (if any) ───────────────────────────
    refreshed = getattr(provider, "refreshed_credentials", None)
    if refreshed:
        try:
            new_encrypted = encrypt_token(refreshed)
            async with async_session() as db:
                cfg_result = await db.execute(
                    select(CloudAgentConfig).where(CloudAgentConfig.id == config.id)
                )
                cfg_row = cfg_result.scalar_one_or_none()
                if cfg_row:
                    cfg_row.oauth_token_encrypted = new_encrypted
                    await db.commit()
                    logger.debug("agent_runner: refreshed OAuth token persisted for agent %s", config.id)
        except Exception as exc:
            # Best effort: a failed token write must not fail the run itself.
            logger.warning("agent_runner: failed to persist refreshed token for agent %s: %s", config.id, exc)

    # ── 7. Finalise ────────────────────────────────────────────────────
    if errors and messages_succeeded == 0:
        final_status = "error"
    elif errors:
        final_status = "partial"
    else:
        final_status = "success"

    await _finalize_run(
        run_log,
        status=final_status,
        items_processed=items_processed,
        items_created=items_created,
        errors=errors,
        update_config_last_run=True,
        config_id=config.id,
        config_type="cloud",
    )
    logger.info(
        "agent_runner: cloud run=%s done status=%s processed=%d created=%d errors=%d",
        run_id,
        final_status,
        items_processed,
        items_created,
        len(errors),
    )
|
||
|
||
|
||
# ── Pending-run trigger ─────────────────────────────────────────────────────
|
||
|
||
|
||
async def trigger_pending_runs(
    user_id: str,
    device_id: str,
    device_mgr: DeviceConnectionManager,
) -> None:
    """Hook run after an Electron device connects; currently dispatches nothing.

    Called as a background task from the device WS endpoint on ``device_hello``.
    No agent configuration is scanned and no run is started: the function only
    logs that the pending-run scan was skipped because agent configuration is
    owned by the client.

    Args:
        user_id: User whose device connected (logged).
        device_id: Identifier of the connected device (logged).
        device_mgr: Accepted for interface compatibility; not used.
    """
    logger.info(
        "agent_runner: pending-run scan skipped for user=%s device=%s (client-owned agent config)",
        user_id,
        device_id,
    )
|
||
|
||
|
||
# ── Internal helper ─────────────────────────────────────────────────────────
|
||
|
||
|
||
async def _finalize_run(
    run_log: AgentRunLog,
    *,
    status: str,
    items_processed: int = 0,
    items_created: int = 0,
    errors: list[str] | None = None,
    update_config_last_run: bool = False,
    config_id: str | None = None,
    config_type: str | None = None,
) -> None:
    """Persist the run outcome and optionally stamp the config's ``last_run_at``.

    Opens its own DB session, so it is safe to call from background tasks
    after the original request session has closed. Any failure is logged and
    swallowed; this function never raises to the caller.

    Args:
        run_log: Run-log row to merge into the session and update.
        status: Final status to store on the run log.
        items_processed: Count stored on the run log.
        items_created: Count stored on the run log.
        errors: Error messages to store; ``None`` is stored as ``[]``.
        update_config_last_run: When true (and ``config_id`` is set), set
            ``last_run_at`` of the matching config row to the completion time.
        config_id: Primary key of the config row to stamp.
        config_type: ``"local"`` or ``"cloud"``; any other value stamps nothing.
    """
    finished_at = datetime.now(timezone.utc)
    try:
        async with async_session() as session:
            record = await session.merge(run_log)
            record.status = status
            record.items_processed = items_processed
            record.items_created = items_created
            record.errors = errors or []
            record.completed_at = finished_at

            # Pick the config table to stamp, if any.
            config_model = None
            if update_config_last_run and config_id:
                if config_type == "local":
                    config_model = LocalAgentConfig
                elif config_type == "cloud":
                    config_model = CloudAgentConfig

            if config_model is not None:
                lookup = await session.execute(
                    select(config_model).where(config_model.id == config_id)
                )
                config_row = lookup.scalar_one_or_none()
                if config_row:
                    config_row.last_run_at = finished_at

            await session.commit()
    except Exception as exc:
        logger.error(
            "agent_runner: failed to finalize run_log=%s: %s", run_log.id, exc
        )
|