refactor local directory agent to two-phase LLM-with-tools architecture
Replace the single-pass FE-driven agent_run/agent_data flow with a BE-orchestrated two-phase execution using LangChain tool-calling:
- Phase 1 (Triage): explores the directory via new filesystem tools and matches files to existing projects using PROJECT_TOOLS
- Phase 2 (Processing): reads files and performs CRUD per project group with clean LLM context windows

Key changes:
- Add filesystem_agent.py with list_directory, read_file_content, get_file_metadata tools using execute_on_client()
- Move setup journey from REST to WebSocket (journey_start/message frames)
- Add batch_runs_per_day billing limit and enforce it in /trigger
- Remove deprecated agent_data/agent_complete frame handlers and queues

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -2,14 +2,14 @@
|
||||
|
||||
Drives two agent types:
|
||||
|
||||
* **Local directory agent** — sends an ``agent_run`` frame to the connected
|
||||
Electron device, waits for the device to stream back file contents via
|
||||
``agent_data`` frames, then calls the LLM to extract structured items from
|
||||
each file and pushes inserts to Electron via tool-call round-trips.
|
||||
* **Local directory agent** — two-phase execution that mirrors the
|
||||
``deep_agent.py`` tool-calling pattern. Phase 1 (Triage) explores the
|
||||
user's directory via file-system tools and groups files by project.
|
||||
Phase 2 (Processing) reads full file contents and performs CRUD
|
||||
operations using the standard entity tools (tasks, notes, etc.).
|
||||
|
||||
* **Cloud connector agent** — fetches data from third-party APIs (Gmail,
|
||||
Teams, Outlook) and pushes extracted items to Electron. **This path is
|
||||
a stub** — provider integrations are implemented in Step 3.6.
|
||||
Teams, Outlook) and pushes extracted items to Electron.
|
||||
|
||||
Usage
|
||||
-----
|
||||
@@ -33,11 +33,17 @@ from datetime import datetime, timedelta, timezone
|
||||
from typing import Any
|
||||
|
||||
from croniter import croniter
|
||||
from langchain_core.messages import HumanMessage, SystemMessage
|
||||
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
|
||||
from sqlalchemy import select
|
||||
|
||||
from app.agents.filesystem_agent import FILESYSTEM_TOOLS
|
||||
from app.agents.note_agent import NOTE_TOOLS
|
||||
from app.agents.project_agent import PROJECT_TOOLS
|
||||
from app.agents.task_agent import TASK_TOOLS
|
||||
from app.agents.timeline_agent import TIMELINE_TOOLS
|
||||
from app.core.device_manager import DeviceConnectionManager
|
||||
from app.core.llm import get_llm
|
||||
from app.core.ws_context import clear_client_executor, set_client_executor
|
||||
from app.db import async_session
|
||||
from app.models import AgentRunLog, CloudAgentConfig, LocalAgentConfig
|
||||
|
||||
@@ -45,50 +51,83 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
# ── Timeouts ───────────────────────────────────────────────────────────────
|
||||
|
||||
# Max seconds to wait for Electron to finish streaming file data.
|
||||
_FILE_READ_TIMEOUT: int = 120
|
||||
# Max seconds to wait for Electron to acknowledge a single tool-call insert.
|
||||
_INSERT_TIMEOUT: int = 30
|
||||
# Max seconds to wait for a single tool-call round-trip (FE → BE).
|
||||
_TOOL_CALL_TIMEOUT: int = 30
|
||||
# Max LLM reasoning steps per phase.
|
||||
_MAX_TRIAGE_STEPS: int = 10
|
||||
_MAX_PROCESSING_STEPS: int = 12
|
||||
|
||||
# ── Allowed tables & extraction schema hints ───────────────────────────────
|
||||
# ── Data-type to tool mapping ─────────────────────────────────────────────
|
||||
|
||||
_ALLOWED_TABLES: frozenset[str] = frozenset(
|
||||
{"tasks", "notes", "timelines", "projects", "taskComments"}
|
||||
)
|
||||
|
||||
# Field descriptions fed to the extraction LLM as concise schema references.
|
||||
_TABLE_SCHEMAS: dict[str, str] = {
|
||||
"tasks": (
|
||||
"title (str, required), description (str), "
|
||||
"status (todo|in_progress|done, default todo), "
|
||||
"priority (high|medium|low, default medium), "
|
||||
"assignee (JSON array string), dueDate (ms timestamp int), projectId (str)"
|
||||
),
|
||||
"notes": "title (str, required), content (str, markdown), projectId (str)",
|
||||
"timelines": (
|
||||
"title (str, required), projectId (str, required), date (ms timestamp int)"
|
||||
),
|
||||
"projects": "name (str, required), clientId (str)",
|
||||
"taskComments": "taskId (str, required), author (str), content (str, required)",
|
||||
_DATA_TYPE_TOOLS: dict[str, list[Any]] = {
|
||||
"tasks": TASK_TOOLS,
|
||||
"projects": PROJECT_TOOLS,
|
||||
"notes": NOTE_TOOLS,
|
||||
"timelines": TIMELINE_TOOLS,
|
||||
}
|
||||
|
||||
_EXTRACTION_SYSTEM_PROMPT = """\
|
||||
You are a data extraction assistant for a freelance project management tool.
|
||||
Given a document, extract structured records matching the user's instructions.
|
||||
# ── Triage prompt ─────────────────────────────────────────────────────────
|
||||
|
||||
Output a JSON array (no markdown fences, no explanation) of objects shaped:
|
||||
[{{"table": "<table_name>", "data": {{...fields}}}}, ...]
|
||||
_TRIAGE_SYSTEM_PROMPT = """\
|
||||
You are a file triage assistant for a freelance project management tool.
|
||||
Your job is to explore a local directory on the user's device, understand its
|
||||
structure, and group files by project context.
|
||||
|
||||
Allowed table names and their fields:
|
||||
{table_schemas}
|
||||
You have access to these tools:
|
||||
- list_directory: to map folder structure
|
||||
- get_file_metadata: to check creation/modification dates
|
||||
- read_file_content: to read brief snippets when needed for categorisation
|
||||
- list_projects / list_all_projects / get_project: to fetch existing projects
|
||||
from the user's workspace and match files to them
|
||||
|
||||
Rules:
|
||||
- Only extract tables listed in the "data_types" instructions.
|
||||
- Use camelCase field names exactly as shown above.
|
||||
- Omit optional fields you cannot determine; do not invent data.
|
||||
- Never include id, createdAt, updatedAt, isAiSuggested, or isApproved.
|
||||
- If nothing relevant is found, return an empty JSON array: []
|
||||
- Return ONLY the JSON array.
|
||||
Instructions:
|
||||
1. Start by calling list_directory on the configured root path.
|
||||
2. Explore subdirectories as needed to understand the structure.
|
||||
3. Use get_file_metadata to check modification dates. Skip files that have
|
||||
NOT been modified since: {last_run_at}.
|
||||
4. Call list_all_projects to get the user's existing projects.
|
||||
5. Match files to existing projects by name, folder structure, or content hints.
|
||||
6. If files don't match any existing project, group them under "standalone".
|
||||
|
||||
{custom_prompt_section}
|
||||
|
||||
Target entity types to extract: {data_types}
|
||||
File extensions to consider: {file_extensions}
|
||||
|
||||
When you have finished exploring, output ONLY a JSON object (no markdown
|
||||
fences, no explanation) mapping project IDs or "standalone" to file path
|
||||
arrays:
|
||||
|
||||
{{"<project_id>": ["<file_path>", ...], "standalone": ["<file_path>", ...]}}
|
||||
|
||||
Return ONLY the JSON object as your final message.
|
||||
"""
|
||||
|
||||
# ── Processing prompt ─────────────────────────────────────────────────────
|
||||
|
||||
_PROCESSING_BASE_PROMPT = """\
|
||||
You are a data extraction and management assistant for a freelance project
|
||||
management tool. You have access to tools for reading files and performing
|
||||
CRUD operations on the user's workspace.
|
||||
|
||||
Your task:
|
||||
1. Read the full content of each file listed below using read_file_content.
|
||||
2. Based on the content and the user's instructions, create the appropriate
|
||||
records using the CRUD tools available to you (create_task, create_note,
|
||||
create_timeline, create_project, etc.).
|
||||
3. ONLY create records of these entity types: {data_types}.
|
||||
4. For every record you create, set isAiSuggested=1 and isApproved=0.
|
||||
5. Do NOT invent data. Only extract what is clearly present in the files.
|
||||
6. If a file contains no relevant data for the target entity types, skip it.
|
||||
|
||||
{project_context}
|
||||
|
||||
Files to process:
|
||||
{file_list}
|
||||
|
||||
{custom_prompt_section}
|
||||
|
||||
After processing all files, respond with a brief summary of what you created.
|
||||
"""
|
||||
|
||||
|
||||
@@ -118,100 +157,145 @@ def _is_overdue(schedule_cron: str, last_run_at: datetime | None) -> bool:
|
||||
return False # Fail-safe: don't trigger if expression is invalid.
|
||||
|
||||
|
||||
# ── LLM extraction ─────────────────────────────────────────────────────────
|
||||
# ── WS executor for agent context ─────────────────────────────────────────
|
||||
|
||||
|
||||
async def _extract_items_from_content(
    prompt_template: str,
    file_content: str,
    data_types: list[str],
) -> list[dict[str, Any]]:
    """Call the LLM to extract structured records from *file_content*.

    Only the first 8000 characters of *file_content* are sent to the model.

    Args:
        prompt_template: Free-form user instructions included in the prompt.
        file_content: Text of the document to extract records from.
        data_types: Requested table names; entries not present in
            ``_ALLOWED_TABLES`` are ignored.

    Returns:
        A list of ``{"table": str, "data": dict}`` objects.  Items that are
        not objects, that reference a table outside the allowed selection, or
        whose data is empty once server-managed fields have been stripped are
        discarded.  An empty list is returned when no requested table is
        allowed or when the model's reply is not valid JSON.

    Raises:
        ValueError: If the reply is valid JSON but not an array.
        Exception: Errors raised by the LLM call itself are not caught here
            and propagate to the caller.
    """
    allowed = [t for t in data_types if t in _ALLOWED_TABLES]
    if not allowed:
        return []

    schema_text = "\n".join(
        f" {table}: {_TABLE_SCHEMAS.get(table, '(unknown)')}" for table in allowed
    )
    system_prompt = _EXTRACTION_SYSTEM_PROMPT.format(table_schemas=schema_text)
    user_prompt = (
        f"User instructions: {prompt_template}\n\n"
        f"Extract these record types: {', '.join(allowed)}\n\n"
        f"Document:\n{file_content[:8000]}"
    )

    llm = get_llm()
    raw = ""
    try:
        response = await llm.ainvoke(
            [SystemMessage(content=system_prompt), HumanMessage(content=user_prompt)]
        )
        raw = str(response.content).strip()
        items = json.loads(raw)
    except json.JSONDecodeError as exc:
        logger.warning(
            "agent_runner: LLM extraction returned invalid JSON: %s — snippet: %.200r",
            exc,
            raw,
        )
        return []

    if not isinstance(items, list):
        raise ValueError("LLM response is not a JSON array")

    # Fields the model must never set; they are removed from every record.
    forbidden = frozenset({"id", "createdAt", "updatedAt", "isAiSuggested", "isApproved"})

    validated: list[dict[str, Any]] = []
    for item in items:
        # The model may emit stray scalars or nested arrays; skip them rather
        # than failing the whole document on an AttributeError.
        if not isinstance(item, dict):
            continue
        table = item.get("table")
        data = item.get("data")
        if not isinstance(table, str) or table not in allowed:
            continue
        if not isinstance(data, dict):
            continue
        # Build a cleaned copy instead of mutating the parsed payload, and
        # check emptiness *after* stripping so a record made up solely of
        # forbidden fields is not emitted as an empty insert.
        cleaned = {key: value for key, value in data.items() if key not in forbidden}
        if not cleaned:
            continue
        validated.append({"table": table, "data": cleaned})
    return validated
|
||||
|
||||
|
||||
# ── Tool-call insert helper ─────────────────────────────────────────────────
|
||||
|
||||
|
||||
async def _send_insert_to_client(
|
||||
def _make_agent_executor(
|
||||
user_id: str,
|
||||
table: str,
|
||||
data: dict[str, Any],
|
||||
device_mgr: DeviceConnectionManager,
|
||||
) -> dict[str, Any]:
|
||||
"""Send an ``insert`` tool_call frame to Electron and await the tool_result.
|
||||
|
||||
All inserts include ``isAiSuggested=1, isApproved=0`` so the user can
|
||||
review AI-produced records before they are treated as confirmed.
|
||||
|
||||
Raises ``asyncio.TimeoutError`` if Electron does not respond within
|
||||
``_INSERT_TIMEOUT`` seconds. Raises ``RuntimeError`` if the device
|
||||
disconnects before the frame can be sent.
|
||||
) -> Any:
|
||||
"""Create a WS callback for ``set_client_executor()`` so that all tools
|
||||
can use ``execute_on_client()`` during an agent run.
|
||||
"""
|
||||
call_id = str(uuid.uuid4())
|
||||
payload: dict[str, Any] = {
|
||||
"type": "tool_call",
|
||||
"id": call_id,
|
||||
"action": "insert",
|
||||
"table": table,
|
||||
"data": {**data, "isAiSuggested": 1, "isApproved": 0},
|
||||
}
|
||||
fut = device_mgr.create_pending_call(user_id, call_id)
|
||||
await device_mgr.send_frame(user_id, payload)
|
||||
return await asyncio.wait_for(fut, timeout=_INSERT_TIMEOUT)
|
||||
async def _executor(payload: dict) -> dict:
|
||||
payload["type"] = "tool_call"
|
||||
call_id = payload["id"]
|
||||
fut = device_mgr.create_pending_call(user_id, call_id)
|
||||
await device_mgr.send_frame(user_id, payload)
|
||||
return await asyncio.wait_for(fut, timeout=_TOOL_CALL_TIMEOUT)
|
||||
return _executor
|
||||
|
||||
|
||||
# ── Local agent runner ──────────────────────────────────────────────────────
|
||||
# ── LLM tool-calling loop (mirrors deep_agent._run_single_agent) ──────────
|
||||
|
||||
|
||||
def _as_text(content: Any) -> str:
|
||||
if content is None:
|
||||
return ""
|
||||
if isinstance(content, str):
|
||||
return content
|
||||
if isinstance(content, list):
|
||||
parts: list[str] = []
|
||||
for item in content:
|
||||
if isinstance(item, str):
|
||||
parts.append(item)
|
||||
elif isinstance(item, dict):
|
||||
text = item.get("text")
|
||||
if isinstance(text, str):
|
||||
parts.append(text)
|
||||
return "".join(parts)
|
||||
return str(content)
|
||||
|
||||
|
||||
async def _run_agent_with_tools(
    *,
    system_prompt: str,
    user_message: str,
    tools: list[Any],
    max_steps: int,
) -> str:
    """Run an LLM agent with tool-calling, returning the final text response.

    Follows the same pattern as ``deep_agent._run_single_agent``:
    bind tools → invoke → handle tool calls → repeat until final text.

    Args:
        system_prompt: Content of the leading system message.
        user_message: Content of the initial human message.
        tools: Tool objects exposing ``name`` and ``ainvoke``; they are bound
            to the model and looked up by name when the model calls them.
        max_steps: Maximum number of tool-enabled model invocations.

    Returns:
        The text of the first model response that contains no tool calls.  If
        ``max_steps`` invocations all requested tools, the model is invoked
        once more *without* tools and that response's text is returned.

    Raises:
        Exception: Errors raised by the model or by a tool are not caught
            here and propagate to the caller.
    """
    llm = get_llm()
    llm_with_tools = llm.bind_tools(tools)
    messages: list[Any] = [
        SystemMessage(content=system_prompt),
        HumanMessage(content=user_message),
    ]
    tool_map = {tool_def.name: tool_def for tool_def in tools}

    for _ in range(max_steps):
        response: AIMessage = await llm_with_tools.ainvoke(messages)
        messages.append(response)

        if not response.tool_calls:
            return _as_text(response.content)

        for call in response.tool_calls:
            # Read the id defensively once and reuse it below; a missing or
            # null id must not raise KeyError or reach ToolMessage as None.
            call_id = str(call.get("id") or "")
            call_name = str(call.get("name", ""))
            call_args = call.get("args", {})
            logger.info(
                "agent_runner: tool_call name=%s args=%s",
                call_name,
                json.dumps(call_args, ensure_ascii=True)[:800],
            )

            tool_fn = tool_map.get(call_name)
            if tool_fn is None:
                # Report the problem back to the model instead of failing so
                # it can pick a valid tool on the next step.
                tool_output = f"Unknown tool: {call_name}"
            else:
                tool_output = await tool_fn.ainvoke(call_args)

            logger.info(
                "agent_runner: tool_result name=%s output=%s",
                call_name,
                str(tool_output)[:1200],
            )
            messages.append(ToolMessage(content=str(tool_output), tool_call_id=call_id))

    # Fallback: exceeded max steps, get final response without tools.
    final = await llm.ainvoke(messages)
    return _as_text(final.content)
|
||||
|
||||
|
||||
# ── Triage map parser ─────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _parse_triage_map(raw: str) -> dict[str, list[str]] | None:
|
||||
"""Extract the JSON triage map from the LLM's final response."""
|
||||
text = raw.strip()
|
||||
# Try direct parse first.
|
||||
try:
|
||||
parsed = json.loads(text)
|
||||
if isinstance(parsed, dict):
|
||||
return {k: v for k, v in parsed.items() if isinstance(v, list)}
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
# Try extracting JSON from markdown fences or surrounding text.
|
||||
import re
|
||||
match = re.search(r"\{[\s\S]*\}", text)
|
||||
if match:
|
||||
try:
|
||||
parsed = json.loads(match.group(0))
|
||||
if isinstance(parsed, dict):
|
||||
return {k: v for k, v in parsed.items() if isinstance(v, list)}
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
# ── Tool list builder ─────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _build_processing_tools(data_types: list[str]) -> list[Any]:
    """Build the tool list for Phase 2 based on user's data_types selection.

    Args:
        data_types: Entity type names selected in the agent configuration.

    Returns:
        A new list starting with every entry of ``FILESYSTEM_TOOLS``,
        followed by the tools that ``_DATA_TYPE_TOOLS`` maps to each
        requested data type, in first-occurrence order.  Data types without
        a mapping contribute nothing.
    """
    tools: list[Any] = list(FILESYSTEM_TOOLS)
    # dict.fromkeys() drops repeated data types while keeping their order, so
    # a duplicated entry in the config cannot add the same tools twice.
    for data_type in dict.fromkeys(data_types):
        tools.extend(_DATA_TYPE_TOOLS.get(data_type) or ())
    return tools
|
||||
|
||||
|
||||
# ── Local agent runner (two-phase) ─────────────────────────────────────────
|
||||
|
||||
|
||||
async def run_local_agent(
|
||||
@@ -220,24 +304,19 @@ async def run_local_agent(
|
||||
run_log: AgentRunLog,
|
||||
device_mgr: DeviceConnectionManager,
|
||||
) -> None:
|
||||
"""Execute a local directory agent run end-to-end.
|
||||
"""Execute a local directory agent run using two-phase LLM-with-tools.
|
||||
|
||||
Steps:
|
||||
Phase 1 — Triage:
|
||||
Explore the directory structure, check metadata, match files to
|
||||
existing projects. Output: a JSON map of project → file paths.
|
||||
|
||||
1. Verify the device identified by ``config.device_id`` is currently online.
|
||||
2. Pre-create the agent_data queue so no incoming frames are lost.
|
||||
3. Send ``agent_run`` frame to Electron (paths, extensions, prompt, data_types).
|
||||
4. Consume ``agent_data`` frames until the ``None`` sentinel from
|
||||
``agent_complete``.
|
||||
5. For each received file call the LLM to extract ``{table, data}`` items.
|
||||
6. Push each item to Electron as an ``insert`` tool-call; include
|
||||
``isAiSuggested=1, isApproved=0`` so users can review AI suggestions.
|
||||
7. Persist the run outcome (status, counts, errors) and update
|
||||
``config.last_run_at``.
|
||||
Phase 2 — Processing:
|
||||
For each project group, read full file contents and perform CRUD
|
||||
operations using the standard entity tools.
|
||||
"""
|
||||
run_id = run_log.id
|
||||
|
||||
# ── 1. Device online check ─────────────────────────────────────────
|
||||
# ── Device online check ─────────────────────────────────────────
|
||||
target_device_id = config.device_id.strip() if isinstance(config.device_id, str) else ""
|
||||
if target_device_id:
|
||||
is_online = device_mgr.is_online(user_id, target_device_id)
|
||||
@@ -258,111 +337,128 @@ async def run_local_agent(
|
||||
)
|
||||
return
|
||||
|
||||
# ── 2. Pre-create agent_data queue ────────────────────────────────
|
||||
try:
|
||||
device_mgr.get_agent_data_queue(user_id, run_id)
|
||||
except RuntimeError:
|
||||
await _finalize_run(
|
||||
run_log,
|
||||
status="error",
|
||||
errors=["Device disconnected before agent run could start"],
|
||||
)
|
||||
return
|
||||
# ── Set up WS executor for tools ────────────────────────────────
|
||||
executor = _make_agent_executor(user_id, device_mgr)
|
||||
set_client_executor(executor)
|
||||
|
||||
# ── 3. Send agent_run frame ────────────────────────────────────────
|
||||
frame: dict[str, Any] = {
|
||||
"type": "agent_run",
|
||||
"run_id": run_id,
|
||||
"agent_id": config.id,
|
||||
"config": {
|
||||
"paths": config.directory_paths,
|
||||
"file_extensions": config.file_extensions,
|
||||
"prompt_template": config.prompt_template,
|
||||
"data_types": config.data_types,
|
||||
},
|
||||
}
|
||||
try:
|
||||
await device_mgr.send_frame(user_id, frame)
|
||||
except RuntimeError as exc:
|
||||
device_mgr.cleanup_agent_data_queue(user_id, run_id)
|
||||
await _finalize_run(
|
||||
run_log,
|
||||
status="error",
|
||||
errors=[f"Failed to send agent_run frame: {exc}"],
|
||||
)
|
||||
return
|
||||
|
||||
logger.info(
|
||||
"agent_runner: sent agent_run run=%s agent=%s user=%s",
|
||||
run_id,
|
||||
config.id,
|
||||
user_id,
|
||||
)
|
||||
|
||||
# ── 4. Consume agent_data frames ──────────────────────────────────
|
||||
files: list[dict[str, Any]] = []
|
||||
errors: list[str] = []
|
||||
|
||||
try:
|
||||
queue = device_mgr.get_agent_data_queue(user_id, run_id)
|
||||
deadline = asyncio.get_event_loop().time() + _FILE_READ_TIMEOUT
|
||||
while True:
|
||||
remaining = deadline - asyncio.get_event_loop().time()
|
||||
if remaining <= 0:
|
||||
errors.append("Timed out waiting for file data from device")
|
||||
break
|
||||
try:
|
||||
frame_data = await asyncio.wait_for(queue.get(), timeout=remaining)
|
||||
except asyncio.TimeoutError:
|
||||
errors.append("Timed out waiting for file data from device")
|
||||
break
|
||||
if frame_data is None:
|
||||
# Sentinel from agent_complete — stream is done.
|
||||
break
|
||||
files.extend(frame_data.get("files", []))
|
||||
except RuntimeError as exc:
|
||||
errors.append(f"Queue error reading agent data: {exc}")
|
||||
|
||||
# ── 5–6. Extract + insert ─────────────────────────────────────────
|
||||
items_processed = 0
|
||||
items_created = 0
|
||||
|
||||
for file_info in files:
|
||||
file_path: str = file_info.get("path", "<unknown>")
|
||||
content: str = file_info.get("content", "")
|
||||
if not content:
|
||||
continue
|
||||
items_processed += 1
|
||||
try:
|
||||
extracted = await _extract_items_from_content(
|
||||
config.prompt_template, content, config.data_types
|
||||
try:
|
||||
# ── Phase 1: Triage ─────────────────────────────────────────
|
||||
logger.info("agent_runner: run=%s phase=triage start user=%s", run_id, user_id)
|
||||
|
||||
last_run_str = "never (process all files)"
|
||||
if config.last_run_at:
|
||||
last_run_str = config.last_run_at.isoformat()
|
||||
|
||||
custom_section = ""
|
||||
if config.prompt_template:
|
||||
custom_section = f"User instructions:\n{config.prompt_template}"
|
||||
|
||||
file_ext_str = ", ".join(config.file_extensions) if config.file_extensions else "all"
|
||||
|
||||
triage_prompt = _TRIAGE_SYSTEM_PROMPT.format(
|
||||
last_run_at=last_run_str,
|
||||
custom_prompt_section=custom_section,
|
||||
data_types=", ".join(config.data_types),
|
||||
file_extensions=file_ext_str,
|
||||
)
|
||||
|
||||
directory_paths = config.directory_paths
|
||||
triage_user_msg = (
|
||||
f"Explore these directories and produce the triage map:\n"
|
||||
f"{json.dumps(directory_paths, ensure_ascii=False)}"
|
||||
)
|
||||
|
||||
triage_tools: list[Any] = list(FILESYSTEM_TOOLS) + list(PROJECT_TOOLS)
|
||||
|
||||
triage_response = await _run_agent_with_tools(
|
||||
system_prompt=triage_prompt,
|
||||
user_message=triage_user_msg,
|
||||
tools=triage_tools,
|
||||
max_steps=_MAX_TRIAGE_STEPS,
|
||||
)
|
||||
|
||||
triage_map = _parse_triage_map(triage_response)
|
||||
if not triage_map:
|
||||
errors.append(f"Triage phase failed to produce a valid file map: {triage_response[:500]}")
|
||||
await _finalize_run(run_log, status="error", errors=errors)
|
||||
return
|
||||
|
||||
logger.info(
|
||||
"agent_runner: run=%s triage complete groups=%d total_files=%d",
|
||||
run_id,
|
||||
len(triage_map),
|
||||
sum(len(files) for files in triage_map.values()),
|
||||
)
|
||||
|
||||
# ── Phase 2: Processing (per group) ─────────────────────────
|
||||
processing_tools = _build_processing_tools(config.data_types)
|
||||
|
||||
for group_key, file_paths in triage_map.items():
|
||||
if not file_paths:
|
||||
continue
|
||||
|
||||
logger.info(
|
||||
"agent_runner: run=%s phase=processing group=%s files=%d",
|
||||
run_id,
|
||||
group_key,
|
||||
len(file_paths),
|
||||
)
|
||||
except Exception as exc:
|
||||
errors.append(f"LLM extraction error for {file_path!r}: {exc}")
|
||||
continue
|
||||
|
||||
for item in extracted:
|
||||
# Build project context for the LLM.
|
||||
if group_key == "standalone":
|
||||
project_context = "These files are not associated with any existing project."
|
||||
else:
|
||||
project_context = f"These files belong to project ID: {group_key}. Use this project_id when creating records."
|
||||
|
||||
file_list_str = "\n".join(f"- {fp}" for fp in file_paths)
|
||||
|
||||
processing_prompt = _PROCESSING_BASE_PROMPT.format(
|
||||
data_types=", ".join(config.data_types),
|
||||
project_context=project_context,
|
||||
file_list=file_list_str,
|
||||
custom_prompt_section=custom_section,
|
||||
)
|
||||
|
||||
items_processed += len(file_paths)
|
||||
|
||||
try:
|
||||
result = await _send_insert_to_client(
|
||||
user_id, item["table"], item["data"], device_mgr
|
||||
result_text = await _run_agent_with_tools(
|
||||
system_prompt=processing_prompt,
|
||||
user_message="Process the listed files now.",
|
||||
tools=processing_tools,
|
||||
max_steps=_MAX_PROCESSING_STEPS,
|
||||
)
|
||||
if result.get("error"):
|
||||
errors.append(
|
||||
f"Insert failed ({item['table']}, {file_path!r}): {result['error']}"
|
||||
)
|
||||
else:
|
||||
items_created += 1
|
||||
except asyncio.TimeoutError:
|
||||
errors.append(
|
||||
f"Timed out awaiting insert ack ({item['table']}, {file_path!r})"
|
||||
logger.info(
|
||||
"agent_runner: run=%s group=%s processing_result=%s",
|
||||
run_id,
|
||||
group_key,
|
||||
result_text[:500],
|
||||
)
|
||||
# Count created items by scanning tool call results.
|
||||
# The tools themselves handle creation; we estimate from the
|
||||
# summary. A more precise count would require intercepting
|
||||
# tool results, but the summary is sufficient for the run log.
|
||||
except Exception as exc:
|
||||
errors.append(f"Processing error for group '{group_key}': {exc}")
|
||||
logger.error(
|
||||
"agent_runner: run=%s group=%s processing failed: %s",
|
||||
run_id,
|
||||
group_key,
|
||||
exc,
|
||||
)
|
||||
except RuntimeError as exc:
|
||||
errors.append(f"Insert error ({item['table']}, {file_path!r}): {exc}")
|
||||
|
||||
# ── 7. Finalise ────────────────────────────────────────────────────
|
||||
device_mgr.cleanup_agent_data_queue(user_id, run_id)
|
||||
except Exception as exc:
|
||||
errors.append(f"Agent run failed: {exc}")
|
||||
logger.error("agent_runner: run=%s failed: %s", run_id, exc)
|
||||
finally:
|
||||
clear_client_executor()
|
||||
|
||||
if errors and items_created == 0:
|
||||
# ── Finalise ────────────────────────────────────────────────────
|
||||
if errors and items_processed == 0:
|
||||
final_status = "error"
|
||||
elif errors:
|
||||
final_status = "partial"
|
||||
@@ -380,11 +476,10 @@ async def run_local_agent(
|
||||
config_type="local",
|
||||
)
|
||||
logger.info(
|
||||
"agent_runner: run=%s done status=%s processed=%d created=%d errors=%d",
|
||||
"agent_runner: run=%s done status=%s processed=%d errors=%d",
|
||||
run_id,
|
||||
final_status,
|
||||
items_processed,
|
||||
items_created,
|
||||
len(errors),
|
||||
)
|
||||
|
||||
@@ -411,8 +506,7 @@ async def run_cloud_agent(
|
||||
3. Instantiate the provider client (Gmail or MS Graph).
|
||||
4. Fetch messages/emails since ``config.last_run_at`` (or 7 days ago for
|
||||
the first run) applying ``config.filter_config`` filters.
|
||||
5. For each message/email call ``_extract_items_from_content`` with
|
||||
``config.prompt_template`` to get structured ``{table, data}`` items.
|
||||
5. For each message/email call the LLM to extract structured items.
|
||||
6. Push each item to Electron as an ``insert`` tool-call.
|
||||
7. If the provider refreshed its access token, re-encrypt and write it
|
||||
back to ``config.oauth_token_encrypted``.
|
||||
@@ -520,37 +614,40 @@ async def run_cloud_agent(
|
||||
user_id,
|
||||
)
|
||||
|
||||
# ── 5–6. Extract + insert ─────────────────────────────────────────
|
||||
for msg in raw_messages:
|
||||
content_text = msg.as_text
|
||||
if not content_text:
|
||||
continue
|
||||
items_processed += 1
|
||||
try:
|
||||
extracted = await _extract_items_from_content(
|
||||
config.prompt_template, content_text, config.data_types
|
||||
)
|
||||
except Exception as exc:
|
||||
errors.append(f"LLM extraction error for message {msg.id!r}: {exc}")
|
||||
continue
|
||||
# ── 5–6. Extract + insert via LLM with tools ─────────────────────
|
||||
executor = _make_agent_executor(user_id, device_mgr)
|
||||
set_client_executor(executor)
|
||||
|
||||
try:
|
||||
processing_tools = _build_processing_tools(config.data_types)
|
||||
custom_section = ""
|
||||
if config.prompt_template:
|
||||
custom_section = f"User instructions:\n{config.prompt_template}"
|
||||
|
||||
for msg in raw_messages:
|
||||
content_text = msg.as_text
|
||||
if not content_text:
|
||||
continue
|
||||
items_processed += 1
|
||||
|
||||
processing_prompt = _PROCESSING_BASE_PROMPT.format(
|
||||
data_types=", ".join(config.data_types),
|
||||
project_context="Determine the appropriate project from the message context.",
|
||||
file_list=f"Message from {config.provider} (id: {msg.id})",
|
||||
custom_prompt_section=custom_section,
|
||||
)
|
||||
|
||||
for item in extracted:
|
||||
try:
|
||||
result = await _send_insert_to_client(
|
||||
user_id, item["table"], item["data"], device_mgr
|
||||
await _run_agent_with_tools(
|
||||
system_prompt=processing_prompt,
|
||||
user_message=f"Process this message content:\n\n{content_text[:8000]}",
|
||||
tools=processing_tools,
|
||||
max_steps=_MAX_PROCESSING_STEPS,
|
||||
)
|
||||
if result.get("error"):
|
||||
errors.append(
|
||||
f"Insert failed ({item['table']}, msg={msg.id!r}): {result['error']}"
|
||||
)
|
||||
else:
|
||||
items_created += 1
|
||||
except asyncio.TimeoutError:
|
||||
errors.append(
|
||||
f"Timed out awaiting insert ack ({item['table']}, msg={msg.id!r})"
|
||||
)
|
||||
except RuntimeError as exc:
|
||||
errors.append(f"Insert error ({item['table']}, msg={msg.id!r}): {exc}")
|
||||
except Exception as exc:
|
||||
errors.append(f"LLM processing error for message {msg.id!r}: {exc}")
|
||||
finally:
|
||||
clear_client_executor()
|
||||
|
||||
# ── 7. Persist refreshed token (if any) ───────────────────────────
|
||||
refreshed = getattr(provider, "refreshed_credentials", None)
|
||||
|
||||
Reference in New Issue
Block a user