From fa231a3642c864c0f8abd043a922c7d08fe3912f Mon Sep 17 00:00:00 2001 From: Roberto Musso Date: Tue, 7 Apr 2026 15:00:32 +0200 Subject: [PATCH] =?UTF-8?q?feat(local-agent-v2):=20step=202+3=20=E2=80=94?= =?UTF-8?q?=20unified=20runner=20+=20AgentConfig=20schema?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Step 3 (prerequisite): - app/schemas.py: add ContentTypeConfig + AgentConfig Pydantic models - app/models.py: add agent_config (JSON, nullable) to LocalAgentConfig - alembic migration a3b9c0d1e2f3: ADD COLUMN agent_config Step 2 (runner refactor): - Remove _classify_file() and _BATCH_FILE_CLASSIFIER_PROMPT (LLM classification step) - Add Phase A: detect_content_type + preprocess (zero LLM, per file) - Add _UNIFIED_PROCESSING_PROMPT (hot-swappable via Langfuse "unified_processing") - Add helper functions: _format_projects, _format_metadata, _get_extraction_rules, _get_no_match_behavior - Single LLM call per file with tools (classify + extract + create) - Fix items_created: count create_* tool calls via _tool_calls_out param - test_agent_runner_v2.py: 10 cases (2.1-2.10) with Langfuse eval scoring Co-Authored-By: Claude Sonnet 4.6 --- ...d1e2f3_add_agent_config_to_local_agents.py | 31 + app/core/agent_runner.py | 416 +++++-------- app/models.py | 1 + app/schemas.py | 21 + tests/test_agent_runner_v2.py | 587 ++++++++++++++++++ 5 files changed, 796 insertions(+), 260 deletions(-) create mode 100644 alembic/versions/a3b9c0d1e2f3_add_agent_config_to_local_agents.py create mode 100644 tests/test_agent_runner_v2.py diff --git a/alembic/versions/a3b9c0d1e2f3_add_agent_config_to_local_agents.py b/alembic/versions/a3b9c0d1e2f3_add_agent_config_to_local_agents.py new file mode 100644 index 0000000..f56b18e --- /dev/null +++ b/alembic/versions/a3b9c0d1e2f3_add_agent_config_to_local_agents.py @@ -0,0 +1,31 @@ +"""add agent_config to local_agent_configs + +Revision ID: a3b9c0d1e2f3 +Revises: 9a1f2d0b6c7e +Create Date: 2026-04-07 00:00:00.000000 + +""" +from __future__ import annotations + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = "a3b9c0d1e2f3" +down_revision: Union[str, None] = "9a1f2d0b6c7e" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column( + "local_agent_configs", + sa.Column("agent_config", sa.JSON(), nullable=True), + ) + + +def downgrade() -> None: + op.drop_column("local_agent_configs", "agent_config") diff --git a/app/core/agent_runner.py b/app/core/agent_runner.py index a89b281..4adf1cb 100644 --- a/app/core/agent_runner.py +++ b/app/core/agent_runner.py @@ -2,12 +2,12 @@ Drives two agent types: -* **Local directory agent** — two-step execution per file: - Step 1 (Classification) uses code to fetch all projects and asks the LLM - to identify which project the file belongs to and which domains are relevant. - Step 2 (Processing) fetches existing entities for that project/domains via - code and runs an LLM with tools — existing data in context enforces - update-first naturally. +* **Local directory agent** — V2 unified flow per file: + Phase A (Detect + Preprocess, zero LLM): Python detects the content type + and strips markup/noise, producing clean text + metadata. + Phase B (Single LLM call with tools): the LLM identifies the project, + checks for duplicates via list_* tools, and creates/updates records. + ``items_created`` is counted from ``create_*`` tool calls. * **Cloud connector agent** — fetches data from third-party APIs (Gmail, Teams, Outlook) and pushes extracted items to Electron. @@ -29,6 +29,7 @@ from __future__ import annotations import asyncio import json import logging +import os import uuid from datetime import datetime, timedelta, timezone from typing import Any @@ -46,6 +47,7 @@ from app.config.settings import settings from app.core.device_manager import DeviceConnectionManager from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback from app.core.llm import get_llm +from app.core.preprocessors import detect_content_type, preprocess from app.core.ws_context import clear_client_executor, execute_on_client, set_client_executor from app.db import async_session from app.models import AgentRunLog, CloudAgentConfig, LocalAgentConfig @@ -81,83 +83,38 @@ _DATA_TYPE_TOOLS: dict[str, list[Any]] = { "timelines": TIMELINE_TOOLS, } -# ── Step 1: Classification prompt ───────────────────────────────────────── +# ── V2: Unified processing prompt (hot-swappable via Langfuse "unified_processing") ── -_DOMAIN_DESCRIPTIONS: dict[str, str] = { - "tasks": ( - "Action items, to-dos, deliverables — anything that describes work to be done, " - "assigned to someone, or tracked with a due date or status." - ), - "notes": ( - "Documentation, meeting notes, summaries, reference material — " - "written content meant to be read and referenced rather than acted on." - ), - "timelines": ( - "Project milestones, deadlines, scheduled events — " - "specific dates that mark a point in the progress of a project." - ), - "projects": ( - "High-level project entities — only relevant if the file clearly introduces " - "a new project or updates the scope of an existing one." - ), -} - -_BATCH_FILE_CLASSIFIER_PROMPT = """\ -You are a file classifier for a freelance project management tool. - -Your job is to match a file to an existing project and identify which data domains to extract. - -## Project matching rules (STRICT — follow in order) - -1. Search the file content for any mention of a project name, client name, acronym, or topic - that overlaps with the existing projects listed below. -2. The match does NOT need to be exact — partial name, abbreviation, or topic similarity is enough. -3. STRONGLY PREFER matching an existing project. Only return "new" as an absolute last resort - when the file has zero meaningful connection to any listed project. -4. When in doubt, pick the closest match from the list. - -## Response format - -Respond ONLY with a JSON object — no markdown, no explanation: - -{{"project_id": "", "new_project_name": "", "domains": ["tasks", "notes"]}} - -## Domain definitions (only consider domains in the allowed list) - -{domain_definitions} - -## Existing projects - -{projects_list} -""" - -# ── Step 2: Processing prompt ───────────────────────────────────────────── - -_BATCH_PROCESSING_PROMPT = """\ +_UNIFIED_PROCESSING_PROMPT = """\ You are a data extraction assistant for a freelance project management tool. -Your task: extract structured data from the file content and persist it using the available tools. +## Your process (follow this exact order) -## Mandatory process — follow this order for EVERY item you extract +### 1. Identify the project +File: {filename} +{metadata_section} -1. READ the existing records listed below for the relevant domain. -2. SEARCH for a match by title, topic, or semantic similarity. -3. If a match exists → call the update_* tool with the existing record's id. -4. If no match exists → call the create_* tool and set isAiSuggested=1. +Existing projects: +{projects_list} -NEVER call create_* without first checking the existing records. -NEVER duplicate a record that already exists under a different wording. +Match this file to an existing project using the filename and content clues. +If no project matches, {no_match_behavior}. -## Existing records (source of truth) +### 2. Check existing records +Once you identify the project, use list_tasks / list_notes / list_timelines +(filtered by projectId) to see what already exists. +NEVER create a record that already exists under the same or similar title. -{existing_context} +### 3. Extract and create / update +{extraction_rules} -## Context - -Project: {project_context} -Domains to extract: {data_types} - -{custom_prompt_section} +### Rules +- Set isAiSuggested=1 on every new record. +- Set projectId on every record (use the id from the project list above). +- Update existing records when a match is found by title or topic. +- Do NOT invent data — only extract what is clearly stated in the content. +- Target entity types: {data_types}. +{global_rules} """ # ── Cloud processing prompt (kept separate for cloud agent) ─────────────── @@ -273,8 +230,13 @@ async def _run_agent_with_tools( user_id: str = "", langfuse_prompt: Any = None, agent_name: str = "batch-agent", + _tool_calls_out: list[str] | None = None, ) -> str: - """Run an LLM agent with tool-calling, returning the final text response.""" + """Run an LLM agent with tool-calling, returning the final text response. + + If *_tool_calls_out* is provided, the name of every tool called during the + run is appended to it (used by the caller to count ``create_*`` calls). + """ lf = get_langfuse() llm = get_llm() llm_with_tools = llm.bind_tools(tools) @@ -332,6 +294,9 @@ async def _run_agent_with_tools( json.dumps(call_args, ensure_ascii=True)[:800], ) + if _tool_calls_out is not None: + _tool_calls_out.append(call_name) + tool_fn = tool_map.get(call_name) if tool_fn is None: tool_output = f"Unknown tool: {call_name}" @@ -523,99 +488,66 @@ def _format_entities_for_context(domain: str, rows: list[dict]) -> str: return f"Existing {domain}:\n" + "\n".join(lines) -# ── Step 1: LLM file classifier ─────────────────────────────────────────── +# ── V2 helper functions ─────────────────────────────────────────────────── -async def _classify_file( - file_path: str, - file_content: str, - projects: list[dict], - config_data_types: list[str], -) -> tuple[str, list[str], str | None]: - """Call the LLM to classify a file by project and relevant domains. - - Returns ``(project_id_or_"new", domains, new_project_name_or_None)``. - - ``project_id`` is an existing project UUID, or ``"new"`` when no match found. - - ``new_project_name`` is only set when ``project_id == "new"``. - Falls back to ``("new", config_data_types, None)`` on any error. - """ - fallback: tuple[str, list[str], str | None] = ("new", list(config_data_types), None) - - if not file_content.strip(): - return fallback - - valid_project_ids = {p["id"] for p in projects} - - def _fmt_project(p: dict) -> str: +def _format_projects(projects: list[dict]) -> str: + """Format the project list for the unified system prompt.""" + if not projects: + return " (no projects yet)" + lines: list[str] = [] + for p in projects: summary = (p.get("aiSummary") or p.get("ai_summary") or "").strip() summary_part = f" — {summary[:100]}" if summary else "" - return f" - id={p['id']} | name={p.get('name', '')} | status={p.get('status', '')}{summary_part}" - - projects_list = "\n".join(_fmt_project(p) for p in projects) or " (none yet)" - - domain_definitions = "\n".join( - f" - {d}: {_DOMAIN_DESCRIPTIONS[d]}" - for d in config_data_types - if d in _DOMAIN_DESCRIPTIONS - ) - - step1_template, step1_prompt_obj = get_prompt_or_fallback( - "batch_file_classifier", _BATCH_FILE_CLASSIFIER_PROMPT - ) - system = step1_template.format( - domain_definitions=domain_definitions, - projects_list=projects_list, - ) - - lf = get_langfuse() - llm = get_llm() - classifier_messages = [ - SystemMessage(content=system), - HumanMessage(content=f"File: {file_path}\n\nContent:\n{file_content[:4000]}"), - ] - try: - if lf: - with lf.start_as_current_observation( - as_type="generation", - name="step1-classifier", - model=settings.LLM_ROUTER_MODEL, - prompt=step1_prompt_obj, - input=classifier_messages, - ) as gen: - response = await llm.ainvoke(classifier_messages) - gen.update(output=_as_text(response.content), usage=extract_usage(response)) - else: - response = await llm.ainvoke(classifier_messages) - raw = _as_text(response.content).strip() - # Strip markdown fences if the model wraps the JSON. - if raw.startswith("```"): - raw = raw.split("```")[1] - if raw.startswith("json"): - raw = raw[4:] - parsed = json.loads(raw.strip()) - raw_project_id: str = str(parsed.get("project_id") or "new") - # Reject hallucinated UUIDs — only accept ids that exist in the fetched list. - project_id = raw_project_id if raw_project_id in valid_project_ids else "new" - new_project_name: str | None = ( - str(parsed["new_project_name"]).strip() or None - if project_id == "new" and parsed.get("new_project_name") - else None + lines.append( + f" - id={p['id']} | name={p.get('name', '')} | " + f"status={p.get('status', '')}{summary_part}" ) - domains: list[str] = [ - d for d in parsed.get("domains", []) - if d in config_data_types - ] - if not domains: - domains = list(config_data_types) - return project_id, domains, new_project_name - except Exception as exc: - logger.warning( - "agent_runner: step1 classification failed for %r: %s", file_path, exc - ) - return fallback + return "\n".join(lines) -# ── Local agent runner (two-step per file) ──────────────────────────────── +def _format_metadata(metadata: dict) -> str: + """Format preprocessor metadata as a compact context block.""" + if not metadata: + return "" + parts: list[str] = [] + for key in ("subject", "from", "to", "date"): + if metadata.get(key): + parts.append(f"{key.capitalize()}: {metadata[key]}") + # any remaining keys + for key, val in metadata.items(): + if key not in ("subject", "from", "to", "date") and val: + parts.append(f"{key}: {val}") + return "\n".join(parts) + + +def _get_extraction_rules(agent_config: dict, content_type: str) -> str: + """Return the extraction_prompt for *content_type* from *agent_config*. + + Falls back to a generic instruction when the type is not configured. + """ + for ct in agent_config.get("content_types", []): + if ct.get("id") == content_type: + prompt = ct.get("extraction_prompt", "").strip() + if prompt: + return prompt + return ( + "Extract relevant information as tasks (action items), notes " + "(informational content), or timelines (dated events)." + ) + + +def _get_no_match_behavior(agent_config: dict) -> str: + """Derive the 'no project match' instruction from global_rules.""" + rules = agent_config.get("global_rules", []) + for rule in rules: + lower = rule.lower() + if "no project" in lower or "no match" in lower or "skip" in lower: + return rule + return "create a new project with a concise name derived from the file content" + + +# ── Local agent runner (V2 — unified per-file flow) ─────────────────────── async def run_local_agent( @@ -625,16 +557,17 @@ async def run_local_agent( device_mgr: DeviceConnectionManager, run_context: dict | None = None, ) -> None: - """Execute a local directory agent run using a two-step approach per file. + """Execute a local directory agent run — V2 unified flow. - Step 1 — Classification (code + 1 LLM call per file, no tools): - Code scans directories and fetches all projects via WS. - For each file, LLM identifies the project and relevant domains. + Phase A — Detect + Preprocess (zero LLM, per file): + Python detects the content type from filename + content patterns and + runs the appropriate handler (e.g. email_html) to produce clean text + and structured metadata. - Step 2 — Processing (code + 1 LLM call per file, with tools): - Code fetches existing entities for the identified project/domains. - LLM receives file content + existing entities in context and uses - tools to update existing records or create new ones. + Phase B — Single LLM call with tools (per file): + One LLM call handles project identification, duplicate checking, and + record creation/update. ``create_*`` tool calls are counted to + produce the accurate ``items_created`` metric. """ run_id = run_log.id agent_id = (run_context or {}).get("agent_id") or config.id @@ -669,12 +602,8 @@ async def run_local_agent( errors: list[str] = [] items_processed = 0 items_created = 0 - - custom_section = ( - f"User instructions:\n{config.prompt_template}" - if config.prompt_template - else "" - ) + agent_config: dict = config.agent_config or {} + processing_tools = _build_processing_tools(config.data_types) try: # ── Code: scan directories ─────────────────────────────────── @@ -694,114 +623,80 @@ async def run_local_agent( # ── Code: fetch all projects once ──────────────────────────── projects = await _fetch_projects() + projects_block = _format_projects(projects) + + # Prompt template + Langfuse version linking (hot-swappable from UI). + unified_template, prompt_obj = get_prompt_or_fallback( + "unified_processing", _UNIFIED_PROCESSING_PROMPT + ) for file_path in file_paths: try: - # Read file content via code. + # ── Phase A: read + detect + preprocess ───────────── file_result = await execute_on_client( action="read_file_content", data={"path": file_path} ) - file_content: str = file_result.get("content", "") - if not file_content: - logger.debug("agent_runner: run=%s skipping empty file %r", run_id, file_path) + raw_content: str = file_result.get("content", "") + if not raw_content.strip(): + logger.debug( + "agent_runner: run=%s skipping empty file %r", run_id, file_path + ) continue items_processed += 1 + filename = os.path.basename(file_path) + content_type = detect_content_type(filename, raw_content) + preprocessed = preprocess(content_type, raw_content) - # Step 1 — classify file. - project_id, domains, new_project_name = await _classify_file( - file_path=file_path, - file_content=file_content, - projects=projects, - config_data_types=config.data_types, - ) logger.info( - "agent_runner: run=%s file=%r → project=%s new_name=%r domains=%s", - run_id, - file_path, - project_id, - new_project_name, - domains, + "agent_runner: run=%s file=%r content_type=%s clean_len=%d", + run_id, file_path, content_type, len(preprocessed.clean_text), ) - # Step 2 — resolve project_id via CODE, then fetch entities. - # Project creation is NEVER delegated to the Step 2 LLM. - if project_id == "new": - proj_name = new_project_name or "Untitled Project" - try: - proj_result = await execute_on_client( - action="insert", - table="projects", - data={"name": proj_name, "clientId": None}, - ) - created = proj_result.get("row", {}) - effective_project_id = created.get("id", "standalone") - # Add to local list so subsequent files can match it. - if "id" in created: - projects.append(created) - logger.info( - "agent_runner: run=%s created project %r id=%s", - run_id, proj_name, effective_project_id, - ) - except Exception as exc: - logger.warning( - "agent_runner: run=%s failed to create project %r: %s", - run_id, proj_name, exc, - ) - effective_project_id = "standalone" - proj_name = "unknown" - project_context = ( - f"Project: {proj_name} (id: {effective_project_id}). " - "Always set projectId to this id on every record you create." - ) - else: - effective_project_id = project_id - proj = next((p for p in projects if p["id"] == project_id), None) - proj_name = proj.get("name", project_id) if proj else project_id - project_context = ( - f"Project: {proj_name} (id: {project_id}). " - "Always set projectId to this id on every record you create." - ) - - # "projects" domain is never passed to Step 2 — handled above in code. - domains = [d for d in domains if d != "projects"] - - existing_blocks: list[str] = [] - for domain in domains: - rows = await _fetch_domain_entities(domain, effective_project_id) - existing_blocks.append(_format_entities_for_context(domain, rows)) - - existing_context = "\n\n".join(existing_blocks) - - step2_template, step2_prompt_obj = get_prompt_or_fallback( - "batch_processing", _BATCH_PROCESSING_PROMPT + # ── Phase B: single LLM call ───────────────────────── + extraction_rules = _get_extraction_rules(agent_config, content_type) + no_match_behavior = _get_no_match_behavior(agent_config) + global_rules_lines = "\n".join( + f"- {r}" for r in agent_config.get("global_rules", []) ) - system_prompt = step2_template.format( - existing_context=existing_context, - project_context=project_context, - data_types=", ".join(domains), - custom_prompt_section=custom_section, + metadata_section = _format_metadata(preprocessed.metadata) + + system_prompt = unified_template.format( + filename=filename, + metadata_section=metadata_section, + projects_list=projects_block, + no_match_behavior=no_match_behavior, + extraction_rules=extraction_rules, + global_rules=global_rules_lines, + data_types=", ".join(config.data_types), ) - processing_tools = _build_processing_tools(domains) + user_message = ( + f"Process this file and extract relevant information.\n\n" + f"File: {file_path}\n\n" + f"Content:\n{preprocessed.clean_text}" + ) + file_tool_calls: list[str] = [] result_text = await _run_agent_with_tools( system_prompt=system_prompt, - user_message=( - f"Process this file and extract relevant information.\n\n" - f"File: {file_path}\n\nContent:\n{file_content}" - ), + user_message=user_message, tools=processing_tools, max_steps=_MAX_PROCESSING_STEPS, user_id=user_id, - langfuse_prompt=step2_prompt_obj, - agent_name="step2-processor", + langfuse_prompt=prompt_obj, + agent_name="unified-processor", + _tool_calls_out=file_tool_calls, ) + + file_created = sum( + 1 for name in file_tool_calls if name.startswith("create_") + ) + items_created += file_created + logger.info( - "agent_runner: run=%s file=%r result=%s", - run_id, - file_path, - result_text[:200], + "agent_runner: run=%s file=%r created=%d result=%s", + run_id, file_path, file_created, result_text[:200], ) except Exception as exc: @@ -833,10 +728,11 @@ async def run_local_agent( errors=errors, ) logger.info( - "agent_runner: run=%s done status=%s processed=%d errors=%d", + "agent_runner: run=%s done status=%s processed=%d created=%d errors=%d", run_id, final_status, items_processed, + items_created, len(errors), ) diff --git a/app/models.py b/app/models.py index 93cdfab..7a9b732 100644 --- a/app/models.py +++ b/app/models.py @@ -296,6 +296,7 @@ class LocalAgentConfig(Base): directory_paths: Mapped[list] = mapped_column(JSON, nullable=False, default=list) data_types: Mapped[list] = mapped_column(JSON, nullable=False, default=list) prompt_template: Mapped[str] = mapped_column(Text, nullable=False, default="") + agent_config: Mapped[dict | None] = mapped_column(JSON, nullable=True) file_extensions: Mapped[list] = mapped_column(JSON, nullable=False, default=list) schedule_cron: Mapped[str] = mapped_column(String(100), nullable=False, default="0 */6 * * *") enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True) diff --git a/app/schemas.py b/app/schemas.py index 39143c4..77568dd 100644 --- a/app/schemas.py +++ b/app/schemas.py @@ -273,6 +273,27 @@ class WsFloatingDomain(BaseModel): domain: WsDomain +# ── Agent Config V2 ─────────────────────────────────────────────────── + + +class ContentTypeConfig(BaseModel): + """Per-type extraction config produced by the journey chatbot.""" + + id: str + label: str = "" + detection_hint: str = "" + preprocessing: str = "generic" # handler name: "email_html", "plain_text", ... + extraction_prompt: str + + +class AgentConfig(BaseModel): + """Structured agent configuration (replaces freeform prompt_template).""" + + content_types: list[ContentTypeConfig] = [] + global_rules: list[str] = [] + data_types: list[str] = [] + + # ── Agent Catalog ───────────────────────────────────────────────────── class AgentCatalogItem(BaseModel): diff --git a/tests/test_agent_runner_v2.py b/tests/test_agent_runner_v2.py new file mode 100644 index 0000000..fae88d9 --- /dev/null +++ b/tests/test_agent_runner_v2.py @@ -0,0 +1,587 @@ +"""Tests for Local Agent V2 runner (Step 2). + +Covers the unified per-file flow: + Phase A — detect + preprocess (Python, zero LLM) + Phase B — single LLM call with tools (classify + extract + create) + +Test cases: + 2.1 Happy path: email with action → create_task called + 2.2 Happy path: email informative → create_note called + 2.3 Happy path: email with date → create_timeline called + 2.4 Project matching via filename → correct project_id used + 2.5 Project matching via content → correct project_id used + 2.6 No project match + global rule → no create_* called + 2.7 Deduplication → update_task, not create_task + 2.8 items_created count (unit) → items_created == N create_* calls + 2.9 Device offline (unit) → status=error + 2.10 Empty file (unit) → items_processed=0, status=success + +Run: + pytest tests/test_agent_runner_v2.py -v + pytest tests/test_agent_runner_v2.py -v -k "2_9 or 2_10 or 2_8" # unit only + pytest tests/test_agent_runner_v2.py -v -k "eval" # LLM evals only +""" + +from __future__ import annotations + +import uuid +from datetime import datetime, timezone +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from app.core.agent_runner import ( + _format_metadata, + _format_projects, + _get_extraction_rules, + _get_no_match_behavior, + _is_overdue, + run_local_agent, +) +from app.core.device_manager import DeviceConnectionManager +from app.core.langfuse_client import get_langfuse, get_prompt_or_fallback +from app.models import AgentRunLog, LocalAgentConfig +from tests.conftest import TEST_USER_IDS + +# ── Constants ───────────────────────────────────────────────────────────── + +_USER_ID = TEST_USER_IDS["power"] + +_AGENT_CONFIG = { + "content_types": [ + { + "id": "email_html", + "label": "Email HTML", + "detection_hint": "HTML file with From/To/Subject headers", + "preprocessing": "email_html", + "extraction_prompt": ( + "If the email contains a direct action request or task assignment → create a task. " + "If the email contains informational content, updates, or FYI → create a note. " + "If the email mentions a specific date for a meeting or deadline → create a timeline entry." + ), + } + ], + "global_rules": [ + "Se il file non è riconducibile a nessun progetto, non creare alcuna entità." + ], + "data_types": ["tasks", "notes", "timelines"], +} + +_PROJECT_ALPHA = {"id": "proj-alpha", "name": "Project Alpha", "status": "active"} +_PROJECT_BETA = {"id": "proj-beta", "name": "Project Beta", "status": "active"} + +# ── Sample email content ────────────────────────────────────────────────── + +_ACTION_EMAIL = """\ + +

From: boss@company.com

+

To: dev@company.com

+

Subject: Fix the login bug

+

Date: 2026-04-07

+

Hi,
Please fix the login bug in Project Alpha by Friday. High priority!

+ +""" + +_INFO_EMAIL = """\ + +

From: pm@company.com

+

To: team@company.com

+

Subject: FYI: New policy for Project Alpha

+

Just a heads-up that starting next week all code reviews must be done +within 24 hours for Project Alpha. No action needed from you now.

+ +""" + +_DATE_EMAIL = """\ + +

From: pm@company.com

+

Subject: Project Alpha kick-off meeting

+

The kick-off meeting for Project Alpha is scheduled for 2026-04-15 at 10:00.

+ +""" + +_NO_PROJECT_EMAIL = """\ + +

From: newsletter@ads.com

+

Subject: Weekly newsletter

+

Check out our latest deals on electronics!

+ +""" + +_EXISTING_TASK = { + "id": "task-existing", + "title": "Fix the login bug", + "status": "todo", + "priority": "medium", +} + + +# ── Test helpers ────────────────────────────────────────────────────────── + + +def _make_config( + agent_config: dict | None = None, + directory: str = "/emails", + device_id: str = "dev-001", +) -> LocalAgentConfig: + return LocalAgentConfig( + id=str(uuid.uuid4()), + user_id=_USER_ID, + device_id=device_id, + name="Test V2 Agent", + directory_paths=[directory], + data_types=["tasks", "notes", "timelines"], + prompt_template="", + agent_config=agent_config or _AGENT_CONFIG, + file_extensions=[".html", ".eml"], + schedule_cron="0 */6 * * *", + enabled=True, + last_run_at=None, + ) + + +def _make_run_log(agent_id: str) -> AgentRunLog: + return AgentRunLog( + id=str(uuid.uuid4()), + agent_id=agent_id, + agent_type="local", + user_id=_USER_ID, + status="running", + started_at=datetime.now(timezone.utc), + ) + + +def _make_manager(online: bool = True) -> DeviceConnectionManager: + mgr = DeviceConnectionManager() + if online: + ws = MagicMock() + ws.send_text = AsyncMock() + mgr.register(_USER_ID, "dev-001", ws) + return mgr + + +def _make_executor( + file_path: str, + file_content: str, + projects: list[dict] | None = None, + existing_tasks: list[dict] | None = None, + existing_notes: list[dict] | None = None, + existing_timelines: list[dict] | None = None, +) -> tuple[Any, list[dict]]: + """Return (async_executor, captured_calls). + + The executor handles all ``execute_on_client`` payloads: + directory listing, file reading, project/entity fetching, and CRUD. + """ + calls: list[dict] = [] + _projects = projects or [_PROJECT_ALPHA, _PROJECT_BETA] + + async def _executor(payload: dict) -> dict: + action = payload.get("action", "") + table = payload.get("table", "") + data = payload.get("data") or {} + calls.append({"action": action, "table": table, "data": data}) + + if action == "list_directory": + path = data.get("path", "") or payload.get("data", {}).get("path", "") + return { + "entries": [{"type": "file", "path": file_path}] + } + + if action == "get_file_metadata": + return {"modifiedAt": None} + + if action == "read_file_content": + return {"content": file_content} + + if action == "select": + if table == "projects": + return {"rows": _projects} + if table == "tasks": + return {"rows": existing_tasks or []} + if table == "notes": + return {"rows": existing_notes or []} + if table == "timelines": + return {"rows": existing_timelines or []} + return {"rows": []} + + if action == "insert": + return {"row": {"id": str(uuid.uuid4()), **data}} + + if action == "update": + return {"success": True} + + return {} + + return _executor, calls + + +# ── Unit: helper functions ──────────────────────────────────────────────── + + +def test_format_projects_empty(): + assert "(no projects" in _format_projects([]) + + +def test_format_projects_with_data(): + result = _format_projects([_PROJECT_ALPHA]) + assert "proj-alpha" in result + assert "Project Alpha" in result + + +def test_format_metadata_empty(): + assert _format_metadata({}) == "" + + +def test_format_metadata_email(): + meta = {"subject": "Fix bug", "from": "boss@co.com", "date": "2026-04-07"} + result = _format_metadata(meta) + assert "Fix bug" in result + assert "boss@co.com" in result + + +def test_get_extraction_rules_match(): + rules = _get_extraction_rules(_AGENT_CONFIG, "email_html") + assert "task" in rules.lower() + + +def test_get_extraction_rules_fallback(): + rules = _get_extraction_rules(_AGENT_CONFIG, "plain_text") + assert "extract" in rules.lower() + + +def test_get_no_match_behavior_from_global_rules(): + behavior = _get_no_match_behavior(_AGENT_CONFIG) + # The global rule says "non creare alcuna entità" → skip behavior + assert behavior # non-empty + + +def test_get_no_match_behavior_default(): + behavior = _get_no_match_behavior({}) + assert "project" in behavior.lower() + + +# ── Unit: 2.9 — device offline ─────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_2_9_device_offline(): + """2.9 No device online → status=error, no executor created.""" + config = _make_config() + run_log = _make_run_log(config.id) + mgr = _make_manager(online=False) + + with patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock) as mock_fin: + await run_local_agent(_USER_ID, config, run_log, mgr) + + _, kwargs = mock_fin.call_args + assert kwargs["status"] == "error" + assert any("not connected" in e for e in kwargs.get("errors", [])) + + +# ── Unit: 2.10 — empty file ────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_2_10_empty_file(): + """2.10 File with empty content → skipped, items_processed=0, success.""" + config = _make_config() + run_log = _make_run_log(config.id) + mgr = _make_manager() + + executor, calls = _make_executor( + file_path="/emails/empty.html", + file_content="", # empty + projects=[_PROJECT_ALPHA], + ) + + with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \ + patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock) as mock_fin: + await run_local_agent(_USER_ID, config, run_log, mgr) + + _, kwargs = mock_fin.call_args + assert kwargs["items_processed"] == 0 + assert kwargs["status"] == "success" + assert kwargs["items_created"] == 0 + + +# ── Unit: 2.8 — items_created count ───────────────────────────────────── + + +@pytest.mark.asyncio +async def test_2_8_items_created_count(): + """2.8 items_created == number of create_* tool calls per run.""" + config = _make_config() + run_log = _make_run_log(config.id) + mgr = _make_manager() + + executor, _calls = _make_executor( + file_path="/emails/action.html", + file_content=_ACTION_EMAIL, + projects=[_PROJECT_ALPHA], + ) + + # Simulate LLM calling create_task twice and update_note once. + async def mock_run_agent(*, _tool_calls_out=None, **kw) -> str: + if _tool_calls_out is not None: + _tool_calls_out.extend(["create_task", "create_note", "update_task"]) + return "Done." + + with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \ + patch("app.core.agent_runner._run_agent_with_tools", side_effect=mock_run_agent), \ + patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock) as mock_fin: + await run_local_agent(_USER_ID, config, run_log, mgr) + + _, kwargs = mock_fin.call_args + # Only create_task + create_note count (not update_task). + assert kwargs["items_created"] == 2 + assert kwargs["items_processed"] == 1 + + +# ── Eval: 2.1–2.7 (real LLM + Langfuse scoring) ────────────────────────── + + +@pytest.mark.asyncio +@pytest.mark.eval +async def test_2_1_email_to_task(): + """2.1 Action email → LLM calls create_task. Score: runner.email_to_task.""" + lf = get_langfuse() + trace = lf.trace( + name="eval-runner-2.1-email-to-task", + metadata={"step": "2"}, + ) if lf else None + + config = _make_config() + run_log = _make_run_log(config.id) + mgr = _make_manager() + + executor, calls = _make_executor( + file_path="/emails/ProjectAlpha_action.html", + file_content=_ACTION_EMAIL, + projects=[_PROJECT_ALPHA, _PROJECT_BETA], + ) + + with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \ + patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock) as mock_fin: + await run_local_agent(_USER_ID, config, run_log, mgr) + + _, kwargs = mock_fin.call_args + task_creates = [c for c in calls if c["action"] == "insert" and c["table"] == "tasks"] + score = 1.0 if len(task_creates) >= 1 else 0.0 + + if lf and trace: + lf.score( + trace_id=trace.id, + name="runner.email_to_task", + value=score, + comment=f"task_creates={len(task_creates)} items_created={kwargs.get('items_created')}", + ) + lf.flush() + + assert score == 1.0, f"Expected at least 1 task created, got {len(task_creates)}" + + +@pytest.mark.asyncio +@pytest.mark.eval +async def test_2_2_email_to_note(): + """2.2 Informational email → LLM calls create_note. Score: runner.email_to_note.""" + lf = get_langfuse() + trace = lf.trace(name="eval-runner-2.2-email-to-note", metadata={"step": "2"}) if lf else None + + config = _make_config() + run_log = _make_run_log(config.id) + mgr = _make_manager() + + executor, calls = _make_executor( + file_path="/emails/ProjectAlpha_info.html", + file_content=_INFO_EMAIL, + projects=[_PROJECT_ALPHA, _PROJECT_BETA], + ) + + with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \ + patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock): + await run_local_agent(_USER_ID, config, run_log, mgr) + + note_creates = [c for c in calls if c["action"] == "insert" and c["table"] == "notes"] + score = 1.0 if len(note_creates) >= 1 else 0.0 + + if lf and trace: + lf.score(trace_id=trace.id, name="runner.email_to_note", value=score, + comment=f"note_creates={len(note_creates)}") + lf.flush() + + assert score == 1.0, f"Expected at least 1 note created, got {len(note_creates)}" + + +@pytest.mark.asyncio +@pytest.mark.eval +async def test_2_3_email_to_timeline(): + """2.3 Email with event date → LLM calls create_timeline. Score: runner.email_to_timeline.""" + lf = get_langfuse() + trace = lf.trace(name="eval-runner-2.3-email-to-timeline", metadata={"step": "2"}) if lf else None + + config = _make_config() + run_log = _make_run_log(config.id) + mgr = _make_manager() + + executor, calls = _make_executor( + file_path="/emails/ProjectAlpha_kickoff.html", + file_content=_DATE_EMAIL, + projects=[_PROJECT_ALPHA, _PROJECT_BETA], + ) + + with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \ + patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock): + await run_local_agent(_USER_ID, config, run_log, mgr) + + tl_creates = [c for c in calls if c["action"] == "insert" and c["table"] == "timelines"] + score = 1.0 if len(tl_creates) >= 1 else 0.0 + + if lf and trace: + lf.score(trace_id=trace.id, name="runner.email_to_timeline", value=score, + comment=f"timeline_creates={len(tl_creates)}") + lf.flush() + + assert score == 1.0, f"Expected at least 1 timeline created, got {len(tl_creates)}" + + +@pytest.mark.asyncio +@pytest.mark.eval +async def test_2_4_project_matching_filename(): + """2.4 Filename contains 'ProjectAlpha' → LLM assigns to proj-alpha. Score: runner.project_filename.""" + lf = get_langfuse() + trace = lf.trace(name="eval-runner-2.4-project-filename", metadata={"step": "2"}) if lf else None + + config = _make_config() + run_log = _make_run_log(config.id) + mgr = _make_manager() + + executor, calls = _make_executor( + file_path="/emails/ProjectAlpha_report.html", + file_content=_ACTION_EMAIL, + projects=[_PROJECT_ALPHA, _PROJECT_BETA], + ) + + with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \ + patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock): + await run_local_agent(_USER_ID, config, run_log, mgr) + + # Check that project_id = proj-alpha was used in any insert + inserts = [c for c in calls if c["action"] == "insert"] + correct_project = any( + c.get("data", {}).get("projectId") == "proj-alpha" + for c in inserts + ) + score = 1.0 if correct_project else 0.0 + + if lf and trace: + lf.score(trace_id=trace.id, name="runner.project_filename", value=score) + lf.flush() + + assert score == 1.0, "Expected inserts to use proj-alpha based on filename" + + +@pytest.mark.asyncio +@pytest.mark.eval +async def test_2_5_project_matching_content(): + """2.5 Email body mentions 'Project Alpha' → correct project assigned. Score: runner.project_content.""" + lf = get_langfuse() + trace = lf.trace(name="eval-runner-2.5-project-content", metadata={"step": "2"}) if lf else None + + config = _make_config() + run_log = _make_run_log(config.id) + mgr = _make_manager() + + executor, calls = _make_executor( + file_path="/emails/email_001.html", # generic filename, no project hint + file_content=_ACTION_EMAIL, # body mentions "Project Alpha" + projects=[_PROJECT_ALPHA, _PROJECT_BETA], + ) + + with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \ + patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock): + await run_local_agent(_USER_ID, config, run_log, mgr) + + inserts = [c for c in calls if c["action"] == "insert"] + correct_project = any( + c.get("data", {}).get("projectId") == "proj-alpha" + for c in inserts + ) + score = 1.0 if correct_project else 0.0 + + if lf and trace: + lf.score(trace_id=trace.id, name="runner.project_content", value=score) + lf.flush() + + assert score == 1.0, "Expected inserts to use proj-alpha based on email body content" + + +@pytest.mark.asyncio +@pytest.mark.eval +async def test_2_6_no_project_match_global_rule(): + """2.6 Newsletter email + global rule 'no project = no entities' → no creates. Score: runner.no_project.""" + lf = get_langfuse() + trace = lf.trace(name="eval-runner-2.6-no-project", metadata={"step": "2"}) if lf else None + + config = _make_config() + run_log = _make_run_log(config.id) + mgr = _make_manager() + + executor, calls = _make_executor( + file_path="/emails/newsletter.html", + file_content=_NO_PROJECT_EMAIL, + projects=[_PROJECT_ALPHA, _PROJECT_BETA], + ) + + with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \ + patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock) as mock_fin: + await run_local_agent(_USER_ID, config, run_log, mgr) + + _, kwargs = mock_fin.call_args + inserts = [c for c in calls if c["action"] == "insert"] + score = 1.0 if len(inserts) == 0 else 0.0 + + if lf and trace: + lf.score(trace_id=trace.id, name="runner.no_project", value=score, + comment=f"inserts={len(inserts)}") + lf.flush() + + assert score == 1.0, f"Expected 0 inserts for unmatched newsletter, got {len(inserts)}" + + +@pytest.mark.asyncio +@pytest.mark.eval +async def test_2_7_deduplication(): + """2.7 Existing task with same title → LLM calls update_task, not create_task. Score: runner.dedup.""" + lf = get_langfuse() + trace = lf.trace(name="eval-runner-2.7-dedup", metadata={"step": "2"}) if lf else None + + config = _make_config() + run_log = _make_run_log(config.id) + mgr = _make_manager() + + executor, calls = _make_executor( + file_path="/emails/ProjectAlpha_followup.html", + file_content=_ACTION_EMAIL, # "Fix the login bug" — already exists + projects=[_PROJECT_ALPHA], + existing_tasks=[_EXISTING_TASK], # task already exists + ) + + with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \ + patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock): + await run_local_agent(_USER_ID, config, run_log, mgr) + + task_creates = [c for c in calls if c["action"] == "insert" and c["table"] == "tasks"] + task_updates = [c for c in calls if c["action"] == "update" and c.get("table") == "tasks"] + # Prefer update over create + score = 1.0 if len(task_creates) == 0 or len(task_updates) >= 1 else 0.0 + + if lf and trace: + lf.score(trace_id=trace.id, name="runner.dedup", value=score, + comment=f"creates={len(task_creates)} updates={len(task_updates)}") + lf.flush() + + assert score == 1.0, ( + f"Expected deduplication: creates={len(task_creates)}, updates={len(task_updates)}" + )