feat(local-agent-v2): step 2+3 — unified runner + AgentConfig schema

Step 3 (prerequisite):
- app/schemas.py: add ContentTypeConfig + AgentConfig Pydantic models
- app/models.py: add agent_config (JSON, nullable) to LocalAgentConfig
- alembic migration a3b9c0d1e2f3: ADD COLUMN agent_config

Step 2 (runner refactor):
- Remove _classify_file() and _BATCH_FILE_CLASSIFIER_PROMPT (LLM classification step)
- Add Phase A: detect_content_type + preprocess (zero LLM, per file)
- Add _UNIFIED_PROCESSING_PROMPT (hot-swappable via Langfuse "unified_processing")
- Add helper functions: _format_projects, _format_metadata, _get_extraction_rules,
  _get_no_match_behavior
- Single LLM call per file with tools (classify + extract + create)
- Fix items_created: count create_* tool calls via _tool_calls_out param
- test_agent_runner_v2.py: 10 cases (2.1-2.10) with Langfuse eval scoring

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Roberto Musso
2026-04-07 15:00:32 +02:00
parent d91c98f86d
commit fa231a3642
5 changed files with 796 additions and 260 deletions

View File

@@ -0,0 +1,31 @@
"""add agent_config to local_agent_configs
Revision ID: a3b9c0d1e2f3
Revises: 9a1f2d0b6c7e
Create Date: 2026-04-07 00:00:00.000000
"""
from __future__ import annotations
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "a3b9c0d1e2f3"
down_revision: Union[str, None] = "9a1f2d0b6c7e"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.add_column(
"local_agent_configs",
sa.Column("agent_config", sa.JSON(), nullable=True),
)
def downgrade() -> None:
op.drop_column("local_agent_configs", "agent_config")

View File

@@ -2,12 +2,12 @@
Drives two agent types: Drives two agent types:
* **Local directory agent** — two-step execution per file: * **Local directory agent** — V2 unified flow per file:
Step 1 (Classification) uses code to fetch all projects and asks the LLM Phase A (Detect + Preprocess, zero LLM): Python detects the content type
to identify which project the file belongs to and which domains are relevant. and strips markup/noise, producing clean text + metadata.
Step 2 (Processing) fetches existing entities for that project/domains via Phase B (Single LLM call with tools): the LLM identifies the project,
code and runs an LLM with tools — existing data in context enforces checks for duplicates via list_* tools, and creates/updates records.
update-first naturally. ``items_created`` is counted from ``create_*`` tool calls.
* **Cloud connector agent** — fetches data from third-party APIs (Gmail, * **Cloud connector agent** — fetches data from third-party APIs (Gmail,
Teams, Outlook) and pushes extracted items to Electron. Teams, Outlook) and pushes extracted items to Electron.
@@ -29,6 +29,7 @@ from __future__ import annotations
import asyncio import asyncio
import json import json
import logging import logging
import os
import uuid import uuid
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import Any from typing import Any
@@ -46,6 +47,7 @@ from app.config.settings import settings
from app.core.device_manager import DeviceConnectionManager from app.core.device_manager import DeviceConnectionManager
from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback
from app.core.llm import get_llm from app.core.llm import get_llm
from app.core.preprocessors import detect_content_type, preprocess
from app.core.ws_context import clear_client_executor, execute_on_client, set_client_executor from app.core.ws_context import clear_client_executor, execute_on_client, set_client_executor
from app.db import async_session from app.db import async_session
from app.models import AgentRunLog, CloudAgentConfig, LocalAgentConfig from app.models import AgentRunLog, CloudAgentConfig, LocalAgentConfig
@@ -81,83 +83,38 @@ _DATA_TYPE_TOOLS: dict[str, list[Any]] = {
"timelines": TIMELINE_TOOLS, "timelines": TIMELINE_TOOLS,
} }
# ── Step 1: Classification prompt ───────────────────────────────────────── # ── V2: Unified processing prompt (hot-swappable via Langfuse "unified_processing") ──
_DOMAIN_DESCRIPTIONS: dict[str, str] = { _UNIFIED_PROCESSING_PROMPT = """\
"tasks": (
"Action items, to-dos, deliverables — anything that describes work to be done, "
"assigned to someone, or tracked with a due date or status."
),
"notes": (
"Documentation, meeting notes, summaries, reference material — "
"written content meant to be read and referenced rather than acted on."
),
"timelines": (
"Project milestones, deadlines, scheduled events — "
"specific dates that mark a point in the progress of a project."
),
"projects": (
"High-level project entities — only relevant if the file clearly introduces "
"a new project or updates the scope of an existing one."
),
}
_BATCH_FILE_CLASSIFIER_PROMPT = """\
You are a file classifier for a freelance project management tool.
Your job is to match a file to an existing project and identify which data domains to extract.
## Project matching rules (STRICT — follow in order)
1. Search the file content for any mention of a project name, client name, acronym, or topic
that overlaps with the existing projects listed below.
2. The match does NOT need to be exact — partial name, abbreviation, or topic similarity is enough.
3. STRONGLY PREFER matching an existing project. Only return "new" as an absolute last resort
when the file has zero meaningful connection to any listed project.
4. When in doubt, pick the closest match from the list.
## Response format
Respond ONLY with a JSON object — no markdown, no explanation:
{{"project_id": "<exact id from the list below, or new>", "new_project_name": "<concise 2-5 word name, only when project_id is new>", "domains": ["tasks", "notes"]}}
## Domain definitions (only consider domains in the allowed list)
{domain_definitions}
## Existing projects
{projects_list}
"""
# ── Step 2: Processing prompt ─────────────────────────────────────────────
_BATCH_PROCESSING_PROMPT = """\
You are a data extraction assistant for a freelance project management tool. You are a data extraction assistant for a freelance project management tool.
Your task: extract structured data from the file content and persist it using the available tools. ## Your process (follow this exact order)
## Mandatory process — follow this order for EVERY item you extract ### 1. Identify the project
File: {filename}
{metadata_section}
1. READ the existing records listed below for the relevant domain. Existing projects:
2. SEARCH for a match by title, topic, or semantic similarity. {projects_list}
3. If a match exists → call the update_* tool with the existing record's id.
4. If no match exists → call the create_* tool and set isAiSuggested=1.
NEVER call create_* without first checking the existing records. Match this file to an existing project using the filename and content clues.
NEVER duplicate a record that already exists under a different wording. If no project matches, {no_match_behavior}.
## Existing records (source of truth) ### 2. Check existing records
Once you identify the project, use list_tasks / list_notes / list_timelines
(filtered by projectId) to see what already exists.
NEVER create a record that already exists under the same or similar title.
{existing_context} ### 3. Extract and create / update
{extraction_rules}
## Context ### Rules
- Set isAiSuggested=1 on every new record.
Project: {project_context} - Set projectId on every record (use the id from the project list above).
Domains to extract: {data_types} - Update existing records when a match is found by title or topic.
- Do NOT invent data — only extract what is clearly stated in the content.
{custom_prompt_section} - Target entity types: {data_types}.
{global_rules}
""" """
# ── Cloud processing prompt (kept separate for cloud agent) ─────────────── # ── Cloud processing prompt (kept separate for cloud agent) ───────────────
@@ -273,8 +230,13 @@ async def _run_agent_with_tools(
user_id: str = "", user_id: str = "",
langfuse_prompt: Any = None, langfuse_prompt: Any = None,
agent_name: str = "batch-agent", agent_name: str = "batch-agent",
_tool_calls_out: list[str] | None = None,
) -> str: ) -> str:
"""Run an LLM agent with tool-calling, returning the final text response.""" """Run an LLM agent with tool-calling, returning the final text response.
If *_tool_calls_out* is provided, the name of every tool called during the
run is appended to it (used by the caller to count ``create_*`` calls).
"""
lf = get_langfuse() lf = get_langfuse()
llm = get_llm() llm = get_llm()
llm_with_tools = llm.bind_tools(tools) llm_with_tools = llm.bind_tools(tools)
@@ -332,6 +294,9 @@ async def _run_agent_with_tools(
json.dumps(call_args, ensure_ascii=True)[:800], json.dumps(call_args, ensure_ascii=True)[:800],
) )
if _tool_calls_out is not None:
_tool_calls_out.append(call_name)
tool_fn = tool_map.get(call_name) tool_fn = tool_map.get(call_name)
if tool_fn is None: if tool_fn is None:
tool_output = f"Unknown tool: {call_name}" tool_output = f"Unknown tool: {call_name}"
@@ -523,99 +488,66 @@ def _format_entities_for_context(domain: str, rows: list[dict]) -> str:
return f"Existing {domain}:\n" + "\n".join(lines) return f"Existing {domain}:\n" + "\n".join(lines)
# ── Step 1: LLM file classifier ─────────────────────────────────────────── # ── V2 helper functions ───────────────────────────────────────────────────
async def _classify_file( def _format_projects(projects: list[dict]) -> str:
file_path: str, """Format the project list for the unified system prompt."""
file_content: str, if not projects:
projects: list[dict], return " (no projects yet)"
config_data_types: list[str], lines: list[str] = []
) -> tuple[str, list[str], str | None]: for p in projects:
"""Call the LLM to classify a file by project and relevant domains.
Returns ``(project_id_or_"new", domains, new_project_name_or_None)``.
- ``project_id`` is an existing project UUID, or ``"new"`` when no match found.
- ``new_project_name`` is only set when ``project_id == "new"``.
Falls back to ``("new", config_data_types, None)`` on any error.
"""
fallback: tuple[str, list[str], str | None] = ("new", list(config_data_types), None)
if not file_content.strip():
return fallback
valid_project_ids = {p["id"] for p in projects}
def _fmt_project(p: dict) -> str:
summary = (p.get("aiSummary") or p.get("ai_summary") or "").strip() summary = (p.get("aiSummary") or p.get("ai_summary") or "").strip()
summary_part = f"{summary[:100]}" if summary else "" summary_part = f"{summary[:100]}" if summary else ""
return f" - id={p['id']} | name={p.get('name', '')} | status={p.get('status', '')}{summary_part}" lines.append(
f" - id={p['id']} | name={p.get('name', '')} | "
f"status={p.get('status', '')}{summary_part}"
)
return "\n".join(lines)
projects_list = "\n".join(_fmt_project(p) for p in projects) or " (none yet)"
domain_definitions = "\n".join( def _format_metadata(metadata: dict) -> str:
f" - {d}: {_DOMAIN_DESCRIPTIONS[d]}" """Format preprocessor metadata as a compact context block."""
for d in config_data_types if not metadata:
if d in _DOMAIN_DESCRIPTIONS return ""
parts: list[str] = []
for key in ("subject", "from", "to", "date"):
if metadata.get(key):
parts.append(f"{key.capitalize()}: {metadata[key]}")
# any remaining keys
for key, val in metadata.items():
if key not in ("subject", "from", "to", "date") and val:
parts.append(f"{key}: {val}")
return "\n".join(parts)
def _get_extraction_rules(agent_config: dict, content_type: str) -> str:
"""Return the extraction_prompt for *content_type* from *agent_config*.
Falls back to a generic instruction when the type is not configured.
"""
for ct in agent_config.get("content_types", []):
if ct.get("id") == content_type:
prompt = ct.get("extraction_prompt", "").strip()
if prompt:
return prompt
return (
"Extract relevant information as tasks (action items), notes "
"(informational content), or timelines (dated events)."
) )
step1_template, step1_prompt_obj = get_prompt_or_fallback(
"batch_file_classifier", _BATCH_FILE_CLASSIFIER_PROMPT
)
system = step1_template.format(
domain_definitions=domain_definitions,
projects_list=projects_list,
)
lf = get_langfuse() def _get_no_match_behavior(agent_config: dict) -> str:
llm = get_llm() """Derive the 'no project match' instruction from global_rules."""
classifier_messages = [ rules = agent_config.get("global_rules", [])
SystemMessage(content=system), for rule in rules:
HumanMessage(content=f"File: {file_path}\n\nContent:\n{file_content[:4000]}"), lower = rule.lower()
] if "no project" in lower or "no match" in lower or "skip" in lower:
try: return rule
if lf: return "create a new project with a concise name derived from the file content"
with lf.start_as_current_observation(
as_type="generation",
name="step1-classifier",
model=settings.LLM_ROUTER_MODEL,
prompt=step1_prompt_obj,
input=classifier_messages,
) as gen:
response = await llm.ainvoke(classifier_messages)
gen.update(output=_as_text(response.content), usage=extract_usage(response))
else:
response = await llm.ainvoke(classifier_messages)
raw = _as_text(response.content).strip()
# Strip markdown fences if the model wraps the JSON.
if raw.startswith("```"):
raw = raw.split("```")[1]
if raw.startswith("json"):
raw = raw[4:]
parsed = json.loads(raw.strip())
raw_project_id: str = str(parsed.get("project_id") or "new")
# Reject hallucinated UUIDs — only accept ids that exist in the fetched list.
project_id = raw_project_id if raw_project_id in valid_project_ids else "new"
new_project_name: str | None = (
str(parsed["new_project_name"]).strip() or None
if project_id == "new" and parsed.get("new_project_name")
else None
)
domains: list[str] = [
d for d in parsed.get("domains", [])
if d in config_data_types
]
if not domains:
domains = list(config_data_types)
return project_id, domains, new_project_name
except Exception as exc:
logger.warning(
"agent_runner: step1 classification failed for %r: %s", file_path, exc
)
return fallback
# ── Local agent runner (two-step per file) ──────────────────────────────── # ── Local agent runner (V2 — unified per-file flow) ───────────────────────
async def run_local_agent( async def run_local_agent(
@@ -625,16 +557,17 @@ async def run_local_agent(
device_mgr: DeviceConnectionManager, device_mgr: DeviceConnectionManager,
run_context: dict | None = None, run_context: dict | None = None,
) -> None: ) -> None:
"""Execute a local directory agent run using a two-step approach per file. """Execute a local directory agent run — V2 unified flow.
Step 1 — Classification (code + 1 LLM call per file, no tools): Phase A — Detect + Preprocess (zero LLM, per file):
Code scans directories and fetches all projects via WS. Python detects the content type from filename + content patterns and
For each file, LLM identifies the project and relevant domains. runs the appropriate handler (e.g. email_html) to produce clean text
and structured metadata.
Step 2 — Processing (code + 1 LLM call per file, with tools): Phase B — Single LLM call with tools (per file):
Code fetches existing entities for the identified project/domains. One LLM call handles project identification, duplicate checking, and
LLM receives file content + existing entities in context and uses record creation/update. ``create_*`` tool calls are counted to
tools to update existing records or create new ones. produce the accurate ``items_created`` metric.
""" """
run_id = run_log.id run_id = run_log.id
agent_id = (run_context or {}).get("agent_id") or config.id agent_id = (run_context or {}).get("agent_id") or config.id
@@ -669,12 +602,8 @@ async def run_local_agent(
errors: list[str] = [] errors: list[str] = []
items_processed = 0 items_processed = 0
items_created = 0 items_created = 0
agent_config: dict = config.agent_config or {}
custom_section = ( processing_tools = _build_processing_tools(config.data_types)
f"User instructions:\n{config.prompt_template}"
if config.prompt_template
else ""
)
try: try:
# ── Code: scan directories ─────────────────────────────────── # ── Code: scan directories ───────────────────────────────────
@@ -694,114 +623,80 @@ async def run_local_agent(
# ── Code: fetch all projects once ──────────────────────────── # ── Code: fetch all projects once ────────────────────────────
projects = await _fetch_projects() projects = await _fetch_projects()
projects_block = _format_projects(projects)
# Prompt template + Langfuse version linking (hot-swappable from UI).
unified_template, prompt_obj = get_prompt_or_fallback(
"unified_processing", _UNIFIED_PROCESSING_PROMPT
)
for file_path in file_paths: for file_path in file_paths:
try: try:
# Read file content via code. # ── Phase A: read + detect + preprocess ─────────────
file_result = await execute_on_client( file_result = await execute_on_client(
action="read_file_content", data={"path": file_path} action="read_file_content", data={"path": file_path}
) )
file_content: str = file_result.get("content", "") raw_content: str = file_result.get("content", "")
if not file_content: if not raw_content.strip():
logger.debug("agent_runner: run=%s skipping empty file %r", run_id, file_path) logger.debug(
"agent_runner: run=%s skipping empty file %r", run_id, file_path
)
continue continue
items_processed += 1 items_processed += 1
filename = os.path.basename(file_path)
content_type = detect_content_type(filename, raw_content)
preprocessed = preprocess(content_type, raw_content)
# Step 1 — classify file.
project_id, domains, new_project_name = await _classify_file(
file_path=file_path,
file_content=file_content,
projects=projects,
config_data_types=config.data_types,
)
logger.info( logger.info(
"agent_runner: run=%s file=%r → project=%s new_name=%r domains=%s", "agent_runner: run=%s file=%r content_type=%s clean_len=%d",
run_id, run_id, file_path, content_type, len(preprocessed.clean_text),
file_path,
project_id,
new_project_name,
domains,
) )
# Step 2 — resolve project_id via CODE, then fetch entities. # ── Phase B: single LLM call ─────────────────────────
# Project creation is NEVER delegated to the Step 2 LLM. extraction_rules = _get_extraction_rules(agent_config, content_type)
if project_id == "new": no_match_behavior = _get_no_match_behavior(agent_config)
proj_name = new_project_name or "Untitled Project" global_rules_lines = "\n".join(
try: f"- {r}" for r in agent_config.get("global_rules", [])
proj_result = await execute_on_client(
action="insert",
table="projects",
data={"name": proj_name, "clientId": None},
) )
created = proj_result.get("row", {}) metadata_section = _format_metadata(preprocessed.metadata)
effective_project_id = created.get("id", "standalone")
# Add to local list so subsequent files can match it. system_prompt = unified_template.format(
if "id" in created: filename=filename,
projects.append(created) metadata_section=metadata_section,
logger.info( projects_list=projects_block,
"agent_runner: run=%s created project %r id=%s", no_match_behavior=no_match_behavior,
run_id, proj_name, effective_project_id, extraction_rules=extraction_rules,
) global_rules=global_rules_lines,
except Exception as exc: data_types=", ".join(config.data_types),
logger.warning(
"agent_runner: run=%s failed to create project %r: %s",
run_id, proj_name, exc,
)
effective_project_id = "standalone"
proj_name = "unknown"
project_context = (
f"Project: {proj_name} (id: {effective_project_id}). "
"Always set projectId to this id on every record you create."
)
else:
effective_project_id = project_id
proj = next((p for p in projects if p["id"] == project_id), None)
proj_name = proj.get("name", project_id) if proj else project_id
project_context = (
f"Project: {proj_name} (id: {project_id}). "
"Always set projectId to this id on every record you create."
) )
# "projects" domain is never passed to Step 2 — handled above in code. user_message = (
domains = [d for d in domains if d != "projects"] f"Process this file and extract relevant information.\n\n"
f"File: {file_path}\n\n"
existing_blocks: list[str] = [] f"Content:\n{preprocessed.clean_text}"
for domain in domains:
rows = await _fetch_domain_entities(domain, effective_project_id)
existing_blocks.append(_format_entities_for_context(domain, rows))
existing_context = "\n\n".join(existing_blocks)
step2_template, step2_prompt_obj = get_prompt_or_fallback(
"batch_processing", _BATCH_PROCESSING_PROMPT
)
system_prompt = step2_template.format(
existing_context=existing_context,
project_context=project_context,
data_types=", ".join(domains),
custom_prompt_section=custom_section,
) )
processing_tools = _build_processing_tools(domains) file_tool_calls: list[str] = []
result_text = await _run_agent_with_tools( result_text = await _run_agent_with_tools(
system_prompt=system_prompt, system_prompt=system_prompt,
user_message=( user_message=user_message,
f"Process this file and extract relevant information.\n\n"
f"File: {file_path}\n\nContent:\n{file_content}"
),
tools=processing_tools, tools=processing_tools,
max_steps=_MAX_PROCESSING_STEPS, max_steps=_MAX_PROCESSING_STEPS,
user_id=user_id, user_id=user_id,
langfuse_prompt=step2_prompt_obj, langfuse_prompt=prompt_obj,
agent_name="step2-processor", agent_name="unified-processor",
_tool_calls_out=file_tool_calls,
) )
file_created = sum(
1 for name in file_tool_calls if name.startswith("create_")
)
items_created += file_created
logger.info( logger.info(
"agent_runner: run=%s file=%r result=%s", "agent_runner: run=%s file=%r created=%d result=%s",
run_id, run_id, file_path, file_created, result_text[:200],
file_path,
result_text[:200],
) )
except Exception as exc: except Exception as exc:
@@ -833,10 +728,11 @@ async def run_local_agent(
errors=errors, errors=errors,
) )
logger.info( logger.info(
"agent_runner: run=%s done status=%s processed=%d errors=%d", "agent_runner: run=%s done status=%s processed=%d created=%d errors=%d",
run_id, run_id,
final_status, final_status,
items_processed, items_processed,
items_created,
len(errors), len(errors),
) )

View File

@@ -296,6 +296,7 @@ class LocalAgentConfig(Base):
directory_paths: Mapped[list] = mapped_column(JSON, nullable=False, default=list) directory_paths: Mapped[list] = mapped_column(JSON, nullable=False, default=list)
data_types: Mapped[list] = mapped_column(JSON, nullable=False, default=list) data_types: Mapped[list] = mapped_column(JSON, nullable=False, default=list)
prompt_template: Mapped[str] = mapped_column(Text, nullable=False, default="") prompt_template: Mapped[str] = mapped_column(Text, nullable=False, default="")
agent_config: Mapped[dict | None] = mapped_column(JSON, nullable=True)
file_extensions: Mapped[list] = mapped_column(JSON, nullable=False, default=list) file_extensions: Mapped[list] = mapped_column(JSON, nullable=False, default=list)
schedule_cron: Mapped[str] = mapped_column(String(100), nullable=False, default="0 */6 * * *") schedule_cron: Mapped[str] = mapped_column(String(100), nullable=False, default="0 */6 * * *")
enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True) enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)

View File

@@ -273,6 +273,27 @@ class WsFloatingDomain(BaseModel):
domain: WsDomain domain: WsDomain
# ── Agent Config V2 ───────────────────────────────────────────────────
class ContentTypeConfig(BaseModel):
"""Per-type extraction config produced by the journey chatbot."""
id: str
label: str = ""
detection_hint: str = ""
preprocessing: str = "generic" # handler name: "email_html", "plain_text", ...
extraction_prompt: str
class AgentConfig(BaseModel):
"""Structured agent configuration (replaces freeform prompt_template)."""
content_types: list[ContentTypeConfig] = []
global_rules: list[str] = []
data_types: list[str] = []
# ── Agent Catalog ───────────────────────────────────────────────────── # ── Agent Catalog ─────────────────────────────────────────────────────
class AgentCatalogItem(BaseModel): class AgentCatalogItem(BaseModel):

View File

@@ -0,0 +1,587 @@
"""Tests for Local Agent V2 runner (Step 2).
Covers the unified per-file flow:
Phase A — detect + preprocess (Python, zero LLM)
Phase B — single LLM call with tools (classify + extract + create)
Test cases:
2.1 Happy path: email with action → create_task called
2.2 Happy path: email informative → create_note called
2.3 Happy path: email with date → create_timeline called
2.4 Project matching via filename → correct project_id used
2.5 Project matching via content → correct project_id used
2.6 No project match + global rule → no create_* called
2.7 Deduplication → update_task, not create_task
2.8 items_created count (unit) → items_created == N create_* calls
2.9 Device offline (unit) → status=error
2.10 Empty file (unit) → items_processed=0, status=success
Run:
pytest tests/test_agent_runner_v2.py -v
pytest tests/test_agent_runner_v2.py -v -k "2_9 or 2_10 or 2_8" # unit only
pytest tests/test_agent_runner_v2.py -v -k "eval" # LLM evals only
"""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from app.core.agent_runner import (
_format_metadata,
_format_projects,
_get_extraction_rules,
_get_no_match_behavior,
_is_overdue,
run_local_agent,
)
from app.core.device_manager import DeviceConnectionManager
from app.core.langfuse_client import get_langfuse, get_prompt_or_fallback
from app.models import AgentRunLog, LocalAgentConfig
from tests.conftest import TEST_USER_IDS
# ── Constants ─────────────────────────────────────────────────────────────
_USER_ID = TEST_USER_IDS["power"]
_AGENT_CONFIG = {
"content_types": [
{
"id": "email_html",
"label": "Email HTML",
"detection_hint": "HTML file with From/To/Subject headers",
"preprocessing": "email_html",
"extraction_prompt": (
"If the email contains a direct action request or task assignment → create a task. "
"If the email contains informational content, updates, or FYI → create a note. "
"If the email mentions a specific date for a meeting or deadline → create a timeline entry."
),
}
],
"global_rules": [
"Se il file non è riconducibile a nessun progetto, non creare alcuna entità."
],
"data_types": ["tasks", "notes", "timelines"],
}
_PROJECT_ALPHA = {"id": "proj-alpha", "name": "Project Alpha", "status": "active"}
_PROJECT_BETA = {"id": "proj-beta", "name": "Project Beta", "status": "active"}
# ── Sample email content ──────────────────────────────────────────────────
_ACTION_EMAIL = """\
<html><head></head><body>
<p><b>From:</b> boss@company.com</p>
<p><b>To:</b> dev@company.com</p>
<p><b>Subject:</b> Fix the login bug</p>
<p><b>Date:</b> 2026-04-07</p>
<p>Hi,<br>Please fix the login bug in Project Alpha by Friday. High priority!</p>
</body></html>
"""
_INFO_EMAIL = """\
<html><head></head><body>
<p><b>From:</b> pm@company.com</p>
<p><b>To:</b> team@company.com</p>
<p><b>Subject:</b> FYI: New policy for Project Alpha</p>
<p>Just a heads-up that starting next week all code reviews must be done
within 24 hours for Project Alpha. No action needed from you now.</p>
</body></html>
"""
_DATE_EMAIL = """\
<html><head></head><body>
<p><b>From:</b> pm@company.com</p>
<p><b>Subject:</b> Project Alpha kick-off meeting</p>
<p>The kick-off meeting for Project Alpha is scheduled for 2026-04-15 at 10:00.</p>
</body></html>
"""
_NO_PROJECT_EMAIL = """\
<html><head></head><body>
<p><b>From:</b> newsletter@ads.com</p>
<p><b>Subject:</b> Weekly newsletter</p>
<p>Check out our latest deals on electronics!</p>
</body></html>
"""
_EXISTING_TASK = {
"id": "task-existing",
"title": "Fix the login bug",
"status": "todo",
"priority": "medium",
}
# ── Test helpers ──────────────────────────────────────────────────────────
def _make_config(
agent_config: dict | None = None,
directory: str = "/emails",
device_id: str = "dev-001",
) -> LocalAgentConfig:
return LocalAgentConfig(
id=str(uuid.uuid4()),
user_id=_USER_ID,
device_id=device_id,
name="Test V2 Agent",
directory_paths=[directory],
data_types=["tasks", "notes", "timelines"],
prompt_template="",
agent_config=agent_config or _AGENT_CONFIG,
file_extensions=[".html", ".eml"],
schedule_cron="0 */6 * * *",
enabled=True,
last_run_at=None,
)
def _make_run_log(agent_id: str) -> AgentRunLog:
return AgentRunLog(
id=str(uuid.uuid4()),
agent_id=agent_id,
agent_type="local",
user_id=_USER_ID,
status="running",
started_at=datetime.now(timezone.utc),
)
def _make_manager(online: bool = True) -> DeviceConnectionManager:
mgr = DeviceConnectionManager()
if online:
ws = MagicMock()
ws.send_text = AsyncMock()
mgr.register(_USER_ID, "dev-001", ws)
return mgr
def _make_executor(
file_path: str,
file_content: str,
projects: list[dict] | None = None,
existing_tasks: list[dict] | None = None,
existing_notes: list[dict] | None = None,
existing_timelines: list[dict] | None = None,
) -> tuple[Any, list[dict]]:
"""Return (async_executor, captured_calls).
The executor handles all ``execute_on_client`` payloads:
directory listing, file reading, project/entity fetching, and CRUD.
"""
calls: list[dict] = []
_projects = projects or [_PROJECT_ALPHA, _PROJECT_BETA]
async def _executor(payload: dict) -> dict:
action = payload.get("action", "")
table = payload.get("table", "")
data = payload.get("data") or {}
calls.append({"action": action, "table": table, "data": data})
if action == "list_directory":
path = data.get("path", "") or payload.get("data", {}).get("path", "")
return {
"entries": [{"type": "file", "path": file_path}]
}
if action == "get_file_metadata":
return {"modifiedAt": None}
if action == "read_file_content":
return {"content": file_content}
if action == "select":
if table == "projects":
return {"rows": _projects}
if table == "tasks":
return {"rows": existing_tasks or []}
if table == "notes":
return {"rows": existing_notes or []}
if table == "timelines":
return {"rows": existing_timelines or []}
return {"rows": []}
if action == "insert":
return {"row": {"id": str(uuid.uuid4()), **data}}
if action == "update":
return {"success": True}
return {}
return _executor, calls
# ── Unit: helper functions ────────────────────────────────────────────────
def test_format_projects_empty():
assert "(no projects" in _format_projects([])
def test_format_projects_with_data():
result = _format_projects([_PROJECT_ALPHA])
assert "proj-alpha" in result
assert "Project Alpha" in result
def test_format_metadata_empty():
assert _format_metadata({}) == ""
def test_format_metadata_email():
meta = {"subject": "Fix bug", "from": "boss@co.com", "date": "2026-04-07"}
result = _format_metadata(meta)
assert "Fix bug" in result
assert "boss@co.com" in result
def test_get_extraction_rules_match():
rules = _get_extraction_rules(_AGENT_CONFIG, "email_html")
assert "task" in rules.lower()
def test_get_extraction_rules_fallback():
rules = _get_extraction_rules(_AGENT_CONFIG, "plain_text")
assert "extract" in rules.lower()
def test_get_no_match_behavior_from_global_rules():
behavior = _get_no_match_behavior(_AGENT_CONFIG)
# The global rule says "non creare alcuna entità" → skip behavior
assert behavior # non-empty
def test_get_no_match_behavior_default():
behavior = _get_no_match_behavior({})
assert "project" in behavior.lower()
# ── Unit: 2.9 — device offline ───────────────────────────────────────────
@pytest.mark.asyncio
async def test_2_9_device_offline():
"""2.9 No device online → status=error, no executor created."""
config = _make_config()
run_log = _make_run_log(config.id)
mgr = _make_manager(online=False)
with patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock) as mock_fin:
await run_local_agent(_USER_ID, config, run_log, mgr)
_, kwargs = mock_fin.call_args
assert kwargs["status"] == "error"
assert any("not connected" in e for e in kwargs.get("errors", []))
# ── Unit: 2.10 — empty file ──────────────────────────────────────────────
@pytest.mark.asyncio
async def test_2_10_empty_file():
"""2.10 File with empty content → skipped, items_processed=0, success."""
config = _make_config()
run_log = _make_run_log(config.id)
mgr = _make_manager()
executor, calls = _make_executor(
file_path="/emails/empty.html",
file_content="", # empty
projects=[_PROJECT_ALPHA],
)
with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \
patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock) as mock_fin:
await run_local_agent(_USER_ID, config, run_log, mgr)
_, kwargs = mock_fin.call_args
assert kwargs["items_processed"] == 0
assert kwargs["status"] == "success"
assert kwargs["items_created"] == 0
# ── Unit: 2.8 — items_created count ─────────────────────────────────────
@pytest.mark.asyncio
async def test_2_8_items_created_count():
"""2.8 items_created == number of create_* tool calls per run."""
config = _make_config()
run_log = _make_run_log(config.id)
mgr = _make_manager()
executor, _calls = _make_executor(
file_path="/emails/action.html",
file_content=_ACTION_EMAIL,
projects=[_PROJECT_ALPHA],
)
# Simulate LLM calling create_task twice and update_note once.
async def mock_run_agent(*, _tool_calls_out=None, **kw) -> str:
if _tool_calls_out is not None:
_tool_calls_out.extend(["create_task", "create_note", "update_task"])
return "Done."
with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \
patch("app.core.agent_runner._run_agent_with_tools", side_effect=mock_run_agent), \
patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock) as mock_fin:
await run_local_agent(_USER_ID, config, run_log, mgr)
_, kwargs = mock_fin.call_args
# Only create_task + create_note count (not update_task).
assert kwargs["items_created"] == 2
assert kwargs["items_processed"] == 1
# ── Eval: 2.12.7 (real LLM + Langfuse scoring) ──────────────────────────
@pytest.mark.asyncio
@pytest.mark.eval
async def test_2_1_email_to_task():
"""2.1 Action email → LLM calls create_task. Score: runner.email_to_task."""
lf = get_langfuse()
trace = lf.trace(
name="eval-runner-2.1-email-to-task",
metadata={"step": "2"},
) if lf else None
config = _make_config()
run_log = _make_run_log(config.id)
mgr = _make_manager()
executor, calls = _make_executor(
file_path="/emails/ProjectAlpha_action.html",
file_content=_ACTION_EMAIL,
projects=[_PROJECT_ALPHA, _PROJECT_BETA],
)
with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \
patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock) as mock_fin:
await run_local_agent(_USER_ID, config, run_log, mgr)
_, kwargs = mock_fin.call_args
task_creates = [c for c in calls if c["action"] == "insert" and c["table"] == "tasks"]
score = 1.0 if len(task_creates) >= 1 else 0.0
if lf and trace:
lf.score(
trace_id=trace.id,
name="runner.email_to_task",
value=score,
comment=f"task_creates={len(task_creates)} items_created={kwargs.get('items_created')}",
)
lf.flush()
assert score == 1.0, f"Expected at least 1 task created, got {len(task_creates)}"
@pytest.mark.asyncio
@pytest.mark.eval
async def test_2_2_email_to_note():
"""2.2 Informational email → LLM calls create_note. Score: runner.email_to_note."""
lf = get_langfuse()
trace = lf.trace(name="eval-runner-2.2-email-to-note", metadata={"step": "2"}) if lf else None
config = _make_config()
run_log = _make_run_log(config.id)
mgr = _make_manager()
executor, calls = _make_executor(
file_path="/emails/ProjectAlpha_info.html",
file_content=_INFO_EMAIL,
projects=[_PROJECT_ALPHA, _PROJECT_BETA],
)
with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \
patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock):
await run_local_agent(_USER_ID, config, run_log, mgr)
note_creates = [c for c in calls if c["action"] == "insert" and c["table"] == "notes"]
score = 1.0 if len(note_creates) >= 1 else 0.0
if lf and trace:
lf.score(trace_id=trace.id, name="runner.email_to_note", value=score,
comment=f"note_creates={len(note_creates)}")
lf.flush()
assert score == 1.0, f"Expected at least 1 note created, got {len(note_creates)}"
@pytest.mark.asyncio
@pytest.mark.eval
async def test_2_3_email_to_timeline():
"""2.3 Email with event date → LLM calls create_timeline. Score: runner.email_to_timeline."""
lf = get_langfuse()
trace = lf.trace(name="eval-runner-2.3-email-to-timeline", metadata={"step": "2"}) if lf else None
config = _make_config()
run_log = _make_run_log(config.id)
mgr = _make_manager()
executor, calls = _make_executor(
file_path="/emails/ProjectAlpha_kickoff.html",
file_content=_DATE_EMAIL,
projects=[_PROJECT_ALPHA, _PROJECT_BETA],
)
with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \
patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock):
await run_local_agent(_USER_ID, config, run_log, mgr)
tl_creates = [c for c in calls if c["action"] == "insert" and c["table"] == "timelines"]
score = 1.0 if len(tl_creates) >= 1 else 0.0
if lf and trace:
lf.score(trace_id=trace.id, name="runner.email_to_timeline", value=score,
comment=f"timeline_creates={len(tl_creates)}")
lf.flush()
assert score == 1.0, f"Expected at least 1 timeline created, got {len(tl_creates)}"
@pytest.mark.asyncio
@pytest.mark.eval
async def test_2_4_project_matching_filename():
"""2.4 Filename contains 'ProjectAlpha' → LLM assigns to proj-alpha. Score: runner.project_filename."""
lf = get_langfuse()
trace = lf.trace(name="eval-runner-2.4-project-filename", metadata={"step": "2"}) if lf else None
config = _make_config()
run_log = _make_run_log(config.id)
mgr = _make_manager()
executor, calls = _make_executor(
file_path="/emails/ProjectAlpha_report.html",
file_content=_ACTION_EMAIL,
projects=[_PROJECT_ALPHA, _PROJECT_BETA],
)
with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \
patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock):
await run_local_agent(_USER_ID, config, run_log, mgr)
# Check that project_id = proj-alpha was used in any insert
inserts = [c for c in calls if c["action"] == "insert"]
correct_project = any(
c.get("data", {}).get("projectId") == "proj-alpha"
for c in inserts
)
score = 1.0 if correct_project else 0.0
if lf and trace:
lf.score(trace_id=trace.id, name="runner.project_filename", value=score)
lf.flush()
assert score == 1.0, "Expected inserts to use proj-alpha based on filename"
@pytest.mark.asyncio
@pytest.mark.eval
async def test_2_5_project_matching_content():
"""2.5 Email body mentions 'Project Alpha' → correct project assigned. Score: runner.project_content."""
lf = get_langfuse()
trace = lf.trace(name="eval-runner-2.5-project-content", metadata={"step": "2"}) if lf else None
config = _make_config()
run_log = _make_run_log(config.id)
mgr = _make_manager()
executor, calls = _make_executor(
file_path="/emails/email_001.html", # generic filename, no project hint
file_content=_ACTION_EMAIL, # body mentions "Project Alpha"
projects=[_PROJECT_ALPHA, _PROJECT_BETA],
)
with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \
patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock):
await run_local_agent(_USER_ID, config, run_log, mgr)
inserts = [c for c in calls if c["action"] == "insert"]
correct_project = any(
c.get("data", {}).get("projectId") == "proj-alpha"
for c in inserts
)
score = 1.0 if correct_project else 0.0
if lf and trace:
lf.score(trace_id=trace.id, name="runner.project_content", value=score)
lf.flush()
assert score == 1.0, "Expected inserts to use proj-alpha based on email body content"
@pytest.mark.asyncio
@pytest.mark.eval
async def test_2_6_no_project_match_global_rule():
"""2.6 Newsletter email + global rule 'no project = no entities' → no creates. Score: runner.no_project."""
lf = get_langfuse()
trace = lf.trace(name="eval-runner-2.6-no-project", metadata={"step": "2"}) if lf else None
config = _make_config()
run_log = _make_run_log(config.id)
mgr = _make_manager()
executor, calls = _make_executor(
file_path="/emails/newsletter.html",
file_content=_NO_PROJECT_EMAIL,
projects=[_PROJECT_ALPHA, _PROJECT_BETA],
)
with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \
patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock) as mock_fin:
await run_local_agent(_USER_ID, config, run_log, mgr)
_, kwargs = mock_fin.call_args
inserts = [c for c in calls if c["action"] == "insert"]
score = 1.0 if len(inserts) == 0 else 0.0
if lf and trace:
lf.score(trace_id=trace.id, name="runner.no_project", value=score,
comment=f"inserts={len(inserts)}")
lf.flush()
assert score == 1.0, f"Expected 0 inserts for unmatched newsletter, got {len(inserts)}"
@pytest.mark.asyncio
@pytest.mark.eval
async def test_2_7_deduplication():
"""2.7 Existing task with same title → LLM calls update_task, not create_task. Score: runner.dedup."""
lf = get_langfuse()
trace = lf.trace(name="eval-runner-2.7-dedup", metadata={"step": "2"}) if lf else None
config = _make_config()
run_log = _make_run_log(config.id)
mgr = _make_manager()
executor, calls = _make_executor(
file_path="/emails/ProjectAlpha_followup.html",
file_content=_ACTION_EMAIL, # "Fix the login bug" — already exists
projects=[_PROJECT_ALPHA],
existing_tasks=[_EXISTING_TASK], # task already exists
)
with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \
patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock):
await run_local_agent(_USER_ID, config, run_log, mgr)
task_creates = [c for c in calls if c["action"] == "insert" and c["table"] == "tasks"]
task_updates = [c for c in calls if c["action"] == "update" and c.get("table") == "tasks"]
# Prefer update over create
score = 1.0 if len(task_creates) == 0 or len(task_updates) >= 1 else 0.0
if lf and trace:
lf.score(trace_id=trace.id, name="runner.dedup", value=score,
comment=f"creates={len(task_creates)} updates={len(task_updates)}")
lf.flush()
assert score == 1.0, (
f"Expected deduplication: creates={len(task_creates)}, updates={len(task_updates)}"
)