Rewrite run_local_agent: code-based flow, concurrency guard, remove isApproved

- Replace LLM-driven triage with code-based directory scan and project fetch
- Two-step LLM approach: Step 1 classifies file→project+domains, Step 2 processes with tools
- Add domain descriptions to Step 1 prompt for better extraction accuracy
- Add _running_agents set for per-agent concurrency guard (one running instance per agent)
- Return 409 from route before DB write when agent already running
- Remove is_approved from task_agent create/update tools and system prompt
- Remove is_approved from timeline_agent create/update tools and system prompt

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Roberto Musso
2026-03-20 22:21:30 +01:00
parent 6c450805cb
commit 58bc6efd4b
4 changed files with 423 additions and 246 deletions

View File

@@ -2,11 +2,12 @@
Drives two agent types:
* **Local directory agent** — two-phase execution that mirrors the
``deep_agent.py`` tool-calling pattern. Phase 1 (Triage) explores the
user's directory via file-system tools and groups files by project.
Phase 2 (Processing) reads full file contents and performs CRUD
operations using the standard entity tools (tasks, notes, etc.).
* **Local directory agent** — two-step execution per file:
Step 1 (Classification) uses code to fetch all projects and asks the LLM
to identify which project the file belongs to and which domains are relevant.
Step 2 (Processing) fetches existing entities for that project/domains via
code and runs an LLM with tools — existing data in context enforces
update-first naturally.
* **Cloud connector agent** — fetches data from third-party APIs (Gmail,
Teams, Outlook) and pushes extracted items to Electron.
@@ -43,19 +44,30 @@ from app.agents.task_agent import TASK_TOOLS
from app.agents.timeline_agent import TIMELINE_TOOLS
from app.core.device_manager import DeviceConnectionManager
from app.core.llm import get_llm
from app.core.ws_context import clear_client_executor, set_client_executor
from app.core.ws_context import clear_client_executor, execute_on_client, set_client_executor
from app.db import async_session
from app.models import AgentRunLog, CloudAgentConfig, LocalAgentConfig
logger = logging.getLogger(__name__)
# ── Concurrency guard ─────────────────────────────────────────────────────
# Tracks agent IDs that currently have a run in progress.
# Prevents multiple simultaneous runs of the same agent within a single process.
_running_agents: set[str] = set()
def is_agent_running(agent_id: str) -> bool:
"""Return ``True`` if *agent_id* already has a run in progress."""
return agent_id in _running_agents
# ── Timeouts ───────────────────────────────────────────────────────────────
# Max seconds to wait for a single tool-call round-trip (FE → BE).
_TOOL_CALL_TIMEOUT: int = 30
# Max LLM reasoning steps per phase.
_MAX_TRIAGE_STEPS: int = 10
# Max LLM reasoning steps for Step 2 processing.
_MAX_PROCESSING_STEPS: int = 12
# Max directory recursion depth during scan.
_MAX_SCAN_DEPTH: int = 5
# ── Data-type to tool mapping ─────────────────────────────────────────────
@@ -66,46 +78,72 @@ _DATA_TYPE_TOOLS: dict[str, list[Any]] = {
"timelines": TIMELINE_TOOLS,
}
# ── Triage prompt ─────────────────────────────────────────────────────────
# ── Step 1: Classification prompt ─────────────────────────────────────────
_TRIAGE_SYSTEM_PROMPT = """\
You are a file triage assistant for a freelance project management tool.
Your job is to explore a local directory on the user's device, understand its
structure, and group files by project context.
_DOMAIN_DESCRIPTIONS: dict[str, str] = {
"tasks": (
"Action items, to-dos, deliverables — anything that describes work to be done, "
"assigned to someone, or tracked with a due date or status."
),
"notes": (
"Documentation, meeting notes, summaries, reference material — "
"written content meant to be read and referenced rather than acted on."
),
"timelines": (
"Project milestones, deadlines, scheduled events — "
"specific dates that mark a point in the progress of a project."
),
"projects": (
"High-level project entities — only relevant if the file clearly introduces "
"a new project or updates the scope of an existing one."
),
}
You have access to these tools:
- list_directory: to map folder structure
- get_file_metadata: to check creation/modification dates
- read_file_content: to read brief snippets when needed for categorisation
- list_projects / list_all_projects / get_project: to fetch existing projects
from the user's workspace and match files to them
_STEP1_SYSTEM_PROMPT = """\
You are a file classifier for a freelance project management tool.
Instructions:
1. Start by calling list_directory on the configured root path.
2. Explore subdirectories as needed to understand the structure.
3. Use get_file_metadata to check modification dates. Skip files that have
NOT been modified since: {last_run_at}.
4. Call list_all_projects to get the user's existing projects.
5. Match files to existing projects by name, folder structure, or content hints.
6. If files don't match any existing project, group them under "standalone".
Given a file's content and a list of existing projects, your job is to:
1. Identify which project this file belongs to (or "standalone" if none match).
2. Identify which data domains are relevant to extract from this file,
limited to the allowed domains listed below.
{custom_prompt_section}
Domain definitions (only consider domains in the allowed list):
{domain_definitions}
Target entity types to extract: {data_types}
File extensions to consider: {file_extensions}
Respond ONLY with a JSON object — no markdown, no explanation:
When you have finished exploring, output ONLY a JSON object (no markdown
fences, no explanation) mapping project IDs or "standalone" to file path
arrays:
{{"project_id": "<uuid> or standalone", "domains": ["tasks", "notes"]}}
{{"<project_id>": ["<file_path>", ...], "standalone": ["<file_path>", ...]}}
Return ONLY the JSON object as your final message.
Existing projects:
{projects_list}
"""
# ── Processing prompt ─────────────────────────────────────────────────────
# ── Step 2: Processing prompt ─────────────────────────────────────────────
_PROCESSING_BASE_PROMPT = """\
_PROCESSING_SYSTEM_PROMPT = """\
You are a data extraction assistant for a freelance project management tool.
Your task is to read the file content provided and create or update records
using the available tools.
IMPORTANT — update-first rules:
The existing records below are the source of truth.
If an existing record semantically matches the content (by title, topic,
or context), update it instead of creating a duplicate.
Only create a new record when no existing match is found.
Set isAiSuggested=1 on all new records.
{existing_context}
Project context: {project_context}
Target domains: {data_types}
{custom_prompt_section}
"""
# ── Cloud processing prompt (kept separate for cloud agent) ───────────────
_CLOUD_PROCESSING_PROMPT = """\
You are a data extraction and management assistant for a freelance project
management tool.
@@ -124,26 +162,6 @@ Your task:
4. Do NOT invent data. Only extract what is clearly present in the files.
5. If a file contains no relevant data for the target entity types, skip it.
Update-first rules (apply in this order):
Tasks:
- Call list_tasks to find a match by title or context.
- If found: call add_task_comment (author "Adiuva"), update_task to set
assignees, state (ToDo / In Progress / Completed), or other fields.
- If NOT found: call create_task with isAiSuggested=1, isApproved=0.
Timelines:
- Call list_timelines to find a match by title or date.
- If found: call update_timeline to edit fields or mark it complete.
- If NOT found: call create_timeline with isAiSuggested=1, isApproved=0.
Notes:
- Call list_notes to find a match by title or topic, then get_note to
read its current content.
- If found: call update_note with the merged content.
- If NOT found: call create_note with isAiSuggested=1, isApproved=0.
Projects:
- Call list_all_projects to check for a match first.
- Only call create_project if the information is clearly significant and
no existing project matches. Set isAiSuggested=1, isApproved=0.
{project_context}
Files to process:
@@ -168,7 +186,6 @@ def _is_overdue(schedule_cron: str, last_run_at: datetime | None) -> bool:
try:
now = datetime.now(timezone.utc)
if last_run_at is None:
# Validate the expression before deciding this is overdue.
croniter(schedule_cron, now)
return True
ts = last_run_at
@@ -179,7 +196,7 @@ def _is_overdue(schedule_cron: str, last_run_at: datetime | None) -> bool:
return now >= next_run
except Exception as exc:
logger.warning("agent_runner: cannot parse cron %r: %s", schedule_cron, exc)
return False # Fail-safe: don't trigger if expression is invalid.
return False
# ── WS executor for agent context ─────────────────────────────────────────
@@ -207,7 +224,7 @@ def _make_agent_executor(
return _executor
# ── LLM tool-calling loop (mirrors deep_agent._run_single_agent) ──────────
# ── LLM tool-calling loop ─────────────────────────────────────────────────
def _as_text(content: Any) -> str:
@@ -235,11 +252,7 @@ async def _run_agent_with_tools(
tools: list[Any],
max_steps: int,
) -> str:
"""Run an LLM agent with tool-calling, returning the final text response.
Follows the same pattern as ``deep_agent._run_single_agent``:
bind tools → invoke → handle tool calls → repeat until final text.
"""
"""Run an LLM agent with tool-calling, returning the final text response."""
llm = get_llm()
llm_with_tools = llm.bind_tools(tools)
messages: list[Any] = [
@@ -247,7 +260,6 @@ async def _run_agent_with_tools(
HumanMessage(content=user_message),
]
tool_calls_count = 0
tool_map = {tool_def.name: tool_def for tool_def in tools}
for _ in range(max_steps):
@@ -258,7 +270,6 @@ async def _run_agent_with_tools(
return _as_text(response.content)
for call in response.tool_calls:
tool_calls_count += 1
call_id = str(call.get("id", ""))
call_name = str(call.get("name", ""))
call_args = call.get("args", {})
@@ -277,47 +288,19 @@ async def _run_agent_with_tools(
logger.info(
"agent_runner: tool_result name=%s output=%s",
call_name,
str(tool_output)[:1200],
str(tool_output)[:200],
)
messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
# Fallback: exceeded max steps, get final response without tools.
final = await llm.ainvoke(messages)
return _as_text(final.content)
# ── Triage map parser ─────────────────────────────────────────────────────
def _parse_triage_map(raw: str) -> dict[str, list[str]] | None:
"""Extract the JSON triage map from the LLM's final response."""
text = raw.strip()
# Try direct parse first.
try:
parsed = json.loads(text)
if isinstance(parsed, dict):
return {k: v for k, v in parsed.items() if isinstance(v, list)}
except json.JSONDecodeError:
pass
# Try extracting JSON from markdown fences or surrounding text.
import re
match = re.search(r"\{[\s\S]*\}", text)
if match:
try:
parsed = json.loads(match.group(0))
if isinstance(parsed, dict):
return {k: v for k, v in parsed.items() if isinstance(v, list)}
except json.JSONDecodeError:
pass
return None
# ── Tool list builder ─────────────────────────────────────────────────────
def _build_processing_tools(data_types: list[str]) -> list[Any]:
"""Build the tool list for Phase 2 based on user's data_types selection."""
"""Build the tool list for processing based on user's data_types selection."""
tools: list[Any] = list(FILESYSTEM_TOOLS)
for dt in data_types:
dt_tools = _DATA_TYPE_TOOLS.get(dt)
@@ -326,7 +309,223 @@ def _build_processing_tools(data_types: list[str]) -> list[Any]:
return tools
# ── Local agent runner (two-phase) ─────────────────────────────────────────
# ── Code-based directory scanner ─────────────────────────────────────────
async def _scan_directories(
paths: list[str],
extensions: list[str],
last_run_at: datetime | None,
) -> list[str]:
"""Walk directories via WS tool calls and return filtered file paths.
Recursion is capped at ``_MAX_SCAN_DEPTH``. Files are filtered by
extension (if configured) and by modification date (if ``last_run_at``
is set). Fails open: if metadata cannot be read, the file is included.
"""
all_files: list[str] = []
ext_set = {e.lstrip(".").lower() for e in extensions} if extensions else set()
async def _walk(path: str, depth: int) -> None:
if depth > _MAX_SCAN_DEPTH:
return
try:
result = await execute_on_client(action="list_directory", data={"path": path})
except Exception as exc:
logger.warning("agent_runner: list_directory failed %r: %s", path, exc)
return
for entry in result.get("entries", []):
entry_path = entry.get("path", "")
if not entry_path:
continue
if entry.get("type") == "directory":
await _walk(entry_path, depth + 1)
elif entry.get("type") == "file":
if ext_set:
dot_pos = entry_path.rfind(".")
file_ext = entry_path[dot_pos + 1:].lower() if dot_pos != -1 else ""
if file_ext not in ext_set:
continue
all_files.append(entry_path)
for root in paths:
await _walk(root, depth=0)
if last_run_at is None:
return all_files
# Filter by modification date.
last_run_ms = int(last_run_at.timestamp() * 1000)
filtered: list[str] = []
for file_path in all_files:
try:
meta = await execute_on_client(action="get_file_metadata", data={"path": file_path})
modified_at = meta.get("modifiedAt")
if modified_at is None:
filtered.append(file_path)
continue
if isinstance(modified_at, (int, float)):
mod_ms = int(modified_at)
else:
mod_ms = int(datetime.fromisoformat(str(modified_at)).timestamp() * 1000)
if mod_ms > last_run_ms:
filtered.append(file_path)
except Exception:
filtered.append(file_path) # fail-open
return filtered
# ── Code-based entity fetchers ────────────────────────────────────────────
async def _fetch_projects() -> list[dict]:
"""Fetch all projects from the Electron client via WS."""
try:
result = await execute_on_client(action="select", table="projects")
return result.get("rows", [])
except Exception as exc:
logger.warning("agent_runner: failed to fetch projects: %s", exc)
return []
_DOMAIN_TABLE: dict[str, str] = {
"tasks": "tasks",
"notes": "notes",
"timelines": "timelines",
"projects": "projects",
}
async def _fetch_domain_entities(domain: str, project_id: str) -> list[dict]:
"""Fetch existing rows for a domain, scoped to a project where applicable."""
table = _DOMAIN_TABLE.get(domain)
if not table:
return []
filters: dict[str, Any] = {}
if project_id != "standalone" and domain != "projects":
filters["projectId"] = project_id
try:
result = await execute_on_client(
action="select",
table=table,
filters=filters if filters else None,
)
return result.get("rows", [])
except Exception as exc:
logger.warning("agent_runner: failed to fetch %s: %s", domain, exc)
return []
def _format_entities_for_context(domain: str, rows: list[dict]) -> str:
"""Format existing entity rows as a readable context block for the LLM.
Includes enough detail per record for the LLM to make a confident
update-vs-create decision without overwhelming the context.
Note content is truncated to 200 chars to stay within token budget.
"""
if not rows:
return f"No existing {domain}."
lines: list[str] = []
for r in rows:
if domain == "tasks":
desc = r.get("description") or ""
desc_part = f"{desc[:120]}" if desc else ""
assignee = r.get("assignee") or r.get("assignees") or ""
due = r.get("dueDate") or r.get("due_date") or ""
meta = ", ".join(filter(None, [
f"priority: {r.get('priority', '')}" if r.get("priority") else "",
f"assignee: {assignee}" if assignee else "",
f"due: {due}" if due else "",
]))
lines.append(
f" - [{r.get('status', '?')}] {r.get('title', '')}{desc_part}"
f" ({meta}, id: {r['id']})"
)
elif domain == "notes":
snippet = (r.get("content") or "")[:200].replace("\n", " ")
snippet_part = f"\n Preview: {snippet}" if snippet else ""
lines.append(
f" - {r.get('title', '')} (id: {r['id']}){snippet_part}"
)
elif domain == "timelines":
lines.append(
f" - {r.get('title', '')} date={r.get('date', '')} (id: {r['id']})"
)
elif domain == "projects":
summary = (r.get("aiSummary") or r.get("ai_summary") or "")[:120]
summary_part = f"{summary}" if summary else ""
lines.append(
f" - {r.get('name', '')} [{r.get('status', '')}]{summary_part}"
f" (id: {r['id']})"
)
return f"Existing {domain}:\n" + "\n".join(lines)
# ── Step 1: LLM file classifier ───────────────────────────────────────────
async def _classify_file(
file_path: str,
file_content: str,
projects: list[dict],
config_data_types: list[str],
) -> tuple[str, list[str]]:
"""Call the LLM to classify a file by project and relevant domains.
Returns ``(project_id_or_"standalone", domains)``.
Falls back to ``("standalone", config_data_types)`` on any error.
"""
fallback = ("standalone", list(config_data_types))
if not file_content.strip():
return fallback
projects_list = "\n".join(
f" - {p.get('name', '')} (id: {p['id']}, status: {p.get('status', '')})"
for p in projects
) or " (none — all files are standalone)"
domain_definitions = "\n".join(
f" - {d}: {_DOMAIN_DESCRIPTIONS[d]}"
for d in config_data_types
if d in _DOMAIN_DESCRIPTIONS
)
system = _STEP1_SYSTEM_PROMPT.format(
domain_definitions=domain_definitions,
projects_list=projects_list,
)
llm = get_llm()
try:
response = await llm.ainvoke([
SystemMessage(content=system),
HumanMessage(content=f"File: {file_path}\n\nContent:\n{file_content[:4000]}"),
])
raw = _as_text(response.content).strip()
# Strip markdown fences if the model wraps the JSON.
if raw.startswith("```"):
raw = raw.split("```")[1]
if raw.startswith("json"):
raw = raw[4:]
parsed = json.loads(raw.strip())
project_id: str = str(parsed.get("project_id") or "standalone")
domains: list[str] = [
d for d in parsed.get("domains", [])
if d in config_data_types
]
if not domains:
domains = list(config_data_types)
return project_id, domains
except Exception as exc:
logger.warning(
"agent_runner: step1 classification failed for %r: %s", file_path, exc
)
return fallback
# ── Local agent runner (two-step per file) ────────────────────────────────
async def run_local_agent(
@@ -336,24 +535,28 @@ async def run_local_agent(
device_mgr: DeviceConnectionManager,
run_context: dict | None = None,
) -> None:
"""Execute a local directory agent run using two-phase LLM-with-tools.
"""Execute a local directory agent run using a two-step approach per file.
Phase 1 — Triage:
Explore the directory structure, check metadata, match files to
existing projects. Output: a JSON map of project → file paths.
Step 1 — Classification (code + 1 LLM call per file, no tools):
Code scans directories and fetches all projects via WS.
For each file, LLM identifies the project and relevant domains.
Phase 2 — Processing:
For each project group, read full file contents and perform CRUD
operations using the standard entity tools.
Step 2 — Processing (code + 1 LLM call per file, with tools):
Code fetches existing entities for the identified project/domains.
LLM receives file content + existing entities in context and uses
tools to update existing records or create new ones.
"""
run_id = run_log.id
agent_id = (run_context or {}).get("agent_id") or config.id
_running_agents.add(agent_id)
# ── Device online check ─────────────────────────────────────────
target_device_id = config.device_id.strip() if isinstance(config.device_id, str) else ""
if target_device_id:
is_online = device_mgr.is_online(user_id, target_device_id)
else:
is_online = device_mgr.is_online(user_id)
is_online = (
device_mgr.is_online(user_id, target_device_id)
if target_device_id
else device_mgr.is_online(user_id)
)
if not is_online:
logger.info(
@@ -377,116 +580,112 @@ async def run_local_agent(
items_processed = 0
items_created = 0
custom_section = (
f"User instructions:\n{config.prompt_template}"
if config.prompt_template
else ""
)
try:
# ── Phase 1: Triage ─────────────────────────────────────────
logger.info("agent_runner: run=%s phase=triage start user=%s", run_id, user_id)
last_run_str = "never (process all files)"
if config.last_run_at:
last_run_str = config.last_run_at.isoformat()
custom_section = ""
if config.prompt_template:
custom_section = f"User instructions:\n{config.prompt_template}"
file_ext_str = ", ".join(config.file_extensions) if config.file_extensions else "all"
triage_prompt = _TRIAGE_SYSTEM_PROMPT.format(
last_run_at=last_run_str,
custom_prompt_section=custom_section,
data_types=", ".join(config.data_types),
file_extensions=file_ext_str,
# ── Code: scan directories ───────────────────────────────────
logger.info("agent_runner: run=%s scanning directories user=%s", run_id, user_id)
file_paths = await _scan_directories(
paths=config.directory_paths,
extensions=config.file_extensions or [],
last_run_at=config.last_run_at,
)
logger.info(
"agent_runner: run=%s found %d file(s) after filtering", run_id, len(file_paths)
)
directory_paths = config.directory_paths
triage_user_msg = (
f"Explore these directories and produce the triage map:\n"
f"{json.dumps(directory_paths, ensure_ascii=False)}"
)
triage_tools: list[Any] = list(FILESYSTEM_TOOLS) + list(PROJECT_TOOLS)
triage_response = await _run_agent_with_tools(
system_prompt=triage_prompt,
user_message=triage_user_msg,
tools=triage_tools,
max_steps=_MAX_TRIAGE_STEPS,
)
triage_map = _parse_triage_map(triage_response)
if not triage_map:
errors.append(f"Triage phase failed to produce a valid file map: {triage_response[:500]}")
await _finalize_run(run_log, status="error", errors=errors)
if not file_paths:
await _finalize_run(run_log, status="success", items_processed=0, items_created=0)
return
logger.info(
"agent_runner: run=%s triage complete groups=%d total_files=%d",
run_id,
len(triage_map),
sum(len(files) for files in triage_map.values()),
)
# ── Code: fetch all projects once ────────────────────────────
projects = await _fetch_projects()
# ── Phase 2: Processing (per group) ─────────────────────────
# ── Per-file processing ──────────────────────────────────────
processing_tools = _build_processing_tools(config.data_types)
for group_key, file_paths in triage_map.items():
if not file_paths:
continue
logger.info(
"agent_runner: run=%s phase=processing group=%s files=%d",
run_id,
group_key,
len(file_paths),
)
# Build project context for the LLM.
if group_key == "standalone":
project_context = "These files are not associated with any existing project."
else:
project_context = f"These files belong to project ID: {group_key}. Use this project_id when creating records."
file_list_str = "\n".join(f"- {fp}" for fp in file_paths)
processing_prompt = _PROCESSING_BASE_PROMPT.format(
data_types=", ".join(config.data_types),
project_context=project_context,
file_list=file_list_str,
custom_prompt_section=custom_section,
)
items_processed += len(file_paths)
for file_path in file_paths:
try:
# Read file content via code.
file_result = await execute_on_client(
action="read_file_content", data={"path": file_path}
)
file_content: str = file_result.get("content", "")
if not file_content:
logger.debug("agent_runner: run=%s skipping empty file %r", run_id, file_path)
continue
items_processed += 1
# Step 1 — classify file.
project_id, domains = await _classify_file(
file_path=file_path,
file_content=file_content,
projects=projects,
config_data_types=config.data_types,
)
logger.info(
"agent_runner: run=%s file=%r → project=%s domains=%s",
run_id,
file_path,
project_id,
domains,
)
# Step 2 — fetch existing entities for this project + domains.
existing_blocks: list[str] = []
for domain in domains:
rows = await _fetch_domain_entities(domain, project_id)
existing_blocks.append(_format_entities_for_context(domain, rows))
existing_context = "\n\n".join(existing_blocks)
if project_id == "standalone":
project_context = "This file is not associated with any existing project."
else:
project_context = (
f"This file belongs to project ID: {project_id}. "
"Use this project_id when creating records."
)
system_prompt = _PROCESSING_SYSTEM_PROMPT.format(
existing_context=existing_context,
project_context=project_context,
data_types=", ".join(domains),
custom_prompt_section=custom_section,
)
result_text = await _run_agent_with_tools(
system_prompt=processing_prompt,
user_message="Process the listed files now.",
system_prompt=system_prompt,
user_message=(
f"Process this file and extract relevant information.\n\n"
f"File: {file_path}\n\nContent:\n{file_content}"
),
tools=processing_tools,
max_steps=_MAX_PROCESSING_STEPS,
)
logger.info(
"agent_runner: run=%s group=%s processing_result=%s",
"agent_runner: run=%s file=%r result=%s",
run_id,
group_key,
result_text[:500],
file_path,
result_text[:200],
)
# Count created items by scanning tool call results.
# The tools themselves handle creation; we estimate from the
# summary. A more precise count would require intercepting
# tool results, but the summary is sufficient for the run log.
except Exception as exc:
errors.append(f"Processing error for group '{group_key}': {exc}")
errors.append(f"Error processing '{file_path}': {exc}")
logger.error(
"agent_runner: run=%s group=%s processing failed: %s",
run_id,
group_key,
exc,
"agent_runner: run=%s file=%r failed: %s", run_id, file_path, exc
)
except Exception as exc:
errors.append(f"Agent run failed: {exc}")
logger.error("agent_runner: run=%s failed: %s", run_id, exc)
finally:
_running_agents.discard(agent_id)
clear_client_executor()
# ── Finalise ────────────────────────────────────────────────────
@@ -503,9 +702,6 @@ async def run_local_agent(
items_processed=items_processed,
items_created=items_created,
errors=errors,
update_config_last_run=False,
config_id=config.id,
config_type="local",
)
logger.info(
"agent_runner: run=%s done status=%s processed=%d errors=%d",
@@ -515,8 +711,7 @@ async def run_local_agent(
len(errors),
)
# Notify the Electron client that the run is complete so it can close
# the run record in its local SQLite.
# Notify Electron that the run is complete.
if run_context and device_mgr.is_online(user_id):
try:
await device_mgr.send_frame(user_id, {
@@ -525,12 +720,13 @@ async def run_local_agent(
"status": final_status,
})
except Exception as exc:
logger.warning("agent_runner: run=%s failed to send run_complete: %s", run_id, exc)
logger.warning(
"agent_runner: run=%s failed to send run_complete: %s", run_id, exc
)
# ── Cloud agent runner ─────────────────────────────────────────────────────
# Default lookback window when an agent has never run before.
_CLOUD_DEFAULT_LOOKBACK_DAYS: int = 7
@@ -544,8 +740,7 @@ async def run_cloud_agent(
Steps:
1. Verify the user's device is online — results are pushed to Electron
via WS tool-call frames. If no device is connected, abort.
1. Verify the user's device is online.
2. Decrypt the stored OAuth token from ``config.oauth_token_encrypted``.
3. Instantiate the provider client (Gmail or MS Graph).
4. Fetch messages/emails since ``config.last_run_at`` (or 7 days ago for
@@ -598,11 +793,7 @@ async def run_cloud_agent(
try:
provider = get_provider(config.provider, credentials_info)
except ValueError as exc:
await _finalize_run(
run_log,
status="error",
errors=[str(exc)],
)
await _finalize_run(run_log, status="error", errors=[str(exc)])
return
# ── 4. Fetch messages ─────────────────────────────────────────────
@@ -636,9 +827,7 @@ async def run_cloud_agent(
raw_messages = []
except RuntimeError as exc:
logger.error(
"agent_runner: provider fetch failed for cloud agent %s: %s",
config.id,
exc,
"agent_runner: provider fetch failed for cloud agent %s: %s", config.id, exc
)
await _finalize_run(
run_log,
@@ -664,9 +853,11 @@ async def run_cloud_agent(
try:
processing_tools = _build_processing_tools(config.data_types)
custom_section = ""
if config.prompt_template:
custom_section = f"User instructions:\n{config.prompt_template}"
custom_section = (
f"User instructions:\n{config.prompt_template}"
if config.prompt_template
else ""
)
for msg in raw_messages:
content_text = msg.as_text
@@ -674,7 +865,7 @@ async def run_cloud_agent(
continue
items_processed += 1
processing_prompt = _PROCESSING_BASE_PROMPT.format(
processing_prompt = _CLOUD_PROCESSING_PROMPT.format(
data_types=", ".join(config.data_types),
project_context="Determine the appropriate project from the message context.",
file_list=f"Message from {config.provider} (id: {msg.id})",
@@ -708,7 +899,11 @@ async def run_cloud_agent(
await db.commit()
logger.debug("agent_runner: refreshed OAuth token persisted for agent %s", config.id)
except Exception as exc:
logger.warning("agent_runner: failed to persist refreshed token for agent %s: %s", config.id, exc)
logger.warning(
"agent_runner: failed to persist refreshed token for agent %s: %s",
config.id,
exc,
)
# ── 8. Finalise ────────────────────────────────────────────────────
if errors and items_created == 0:
@@ -749,12 +944,6 @@ async def trigger_pending_runs(
"""Dispatch any overdue agent runs after an Electron device connects.
Called as a background task from the device WS endpoint on ``device_hello``.
Scheduling rules:
* **Local agents**: only triggered when ``config.device_id == device_id``.
* **Cloud agents**: triggered on any connected device (no device binding).
* Runs execute **sequentially** to avoid flooding the WS connection.
"""
logger.info(
"agent_runner: pending-run scan skipped for user=%s device=%s (client-owned agent config)",
@@ -778,11 +967,7 @@ async def _finalize_run(
config_id: str | None = None,
config_type: str | None = None,
) -> None:
"""Persist the run outcome and optionally update ``LocalAgentConfig.last_run_at``.
Uses a fresh DB session so this is safe to call from background tasks
after the original request session has closed.
"""
"""Persist the run outcome and optionally update ``last_run_at`` on the config."""
now = datetime.now(timezone.utc)
try:
async with async_session() as db: