"""Tests for Local Agent V2 runner (Step 2). Covers the unified per-file flow: Phase A — detect + preprocess (Python, zero LLM) Phase B — single LLM call with tools (classify + extract + create) Test cases: 2.1 Happy path: email with action → create_task called 2.2 Happy path: email informative → create_note called 2.3 Happy path: email with date → create_timeline called 2.4 Project matching via filename → correct project_id used 2.5 Project matching via content → correct project_id used 2.6 No project match + global rule → no create_* called 2.7 Deduplication → update_task, not create_task 2.8 items_created count (unit) → items_created == N create_* calls 2.9 Device offline (unit) → status=error 2.10 Empty file (unit) → items_processed=0, status=success Run: pytest tests/test_agent_runner_v2.py -v pytest tests/test_agent_runner_v2.py -v -k "2_9 or 2_10 or 2_8" # unit only pytest tests/test_agent_runner_v2.py -v -k "eval" # LLM evals only """ from __future__ import annotations import uuid from datetime import datetime, timezone from typing import Any from unittest.mock import AsyncMock, MagicMock, patch import pytest from app.core.agent_runner import ( _format_metadata, _format_projects, _get_extraction_rules, _get_no_match_behavior, _is_overdue, run_local_agent, ) from app.core.device_manager import DeviceConnectionManager from app.core.langfuse_client import get_langfuse, get_prompt_or_fallback from app.models import AgentRunLog, LocalAgentConfig from tests.conftest import TEST_USER_IDS # ── Constants ───────────────────────────────────────────────────────────── _USER_ID = TEST_USER_IDS["power"] _AGENT_CONFIG = { "content_types": [ { "id": "email_html", "label": "Email HTML", "detection_hint": "HTML file with From/To/Subject headers", "preprocessing": "email_html", "extraction_prompt": ( "If the email contains a direct action request or task assignment → create a task. " "If the email contains informational content, updates, or FYI → create a note. " "If the email mentions a specific date for a meeting or deadline → create a timeline entry." ), } ], "global_rules": [ "Se il file non è riconducibile a nessun progetto, non creare alcuna entità." ], "data_types": ["tasks", "notes", "timelines"], } _PROJECT_ALPHA = {"id": "proj-alpha", "name": "Project Alpha", "status": "active"} _PROJECT_BETA = {"id": "proj-beta", "name": "Project Beta", "status": "active"} # ── Sample email content ────────────────────────────────────────────────── _ACTION_EMAIL = """\

From: boss@company.com

To: dev@company.com

Subject: Fix the login bug

Date: 2026-04-07

Hi,
Please fix the login bug in Project Alpha by Friday. High priority!

""" _INFO_EMAIL = """\

From: pm@company.com

To: team@company.com

Subject: FYI: New policy for Project Alpha

Just a heads-up that starting next week all code reviews must be done within 24 hours for Project Alpha. No action needed from you now.

""" _DATE_EMAIL = """\

From: pm@company.com

Subject: Project Alpha kick-off meeting

The kick-off meeting for Project Alpha is scheduled for 2026-04-15 at 10:00.

""" _NO_PROJECT_EMAIL = """\

From: newsletter@ads.com

Subject: Weekly newsletter

Check out our latest deals on electronics!

""" _EXISTING_TASK = { "id": "task-existing", "title": "Fix the login bug", "status": "todo", "priority": "medium", } # ── Test helpers ────────────────────────────────────────────────────────── def _make_config( agent_config: dict | None = None, directory: str = "/emails", device_id: str = "dev-001", ) -> LocalAgentConfig: return LocalAgentConfig( id=str(uuid.uuid4()), user_id=_USER_ID, device_id=device_id, name="Test V2 Agent", directory_paths=[directory], data_types=["tasks", "notes", "timelines"], prompt_template="", agent_config=agent_config or _AGENT_CONFIG, file_extensions=[".html", ".eml"], schedule_cron="0 */6 * * *", enabled=True, last_run_at=None, ) def _make_run_log(agent_id: str) -> AgentRunLog: return AgentRunLog( id=str(uuid.uuid4()), agent_id=agent_id, agent_type="local", user_id=_USER_ID, status="running", started_at=datetime.now(timezone.utc), ) def _make_manager(online: bool = True) -> DeviceConnectionManager: mgr = DeviceConnectionManager() if online: ws = MagicMock() ws.send_text = AsyncMock() mgr.register(_USER_ID, "dev-001", ws) return mgr def _make_executor( file_path: str, file_content: str, projects: list[dict] | None = None, existing_tasks: list[dict] | None = None, existing_notes: list[dict] | None = None, existing_timelines: list[dict] | None = None, ) -> tuple[Any, list[dict]]: """Return (async_executor, captured_calls). The executor handles all ``execute_on_client`` payloads: directory listing, file reading, project/entity fetching, and CRUD. """ calls: list[dict] = [] _projects = projects or [_PROJECT_ALPHA, _PROJECT_BETA] async def _executor(payload: dict) -> dict: action = payload.get("action", "") table = payload.get("table", "") data = payload.get("data") or {} calls.append({"action": action, "table": table, "data": data}) if action == "list_directory": path = data.get("path", "") or payload.get("data", {}).get("path", "") return { "entries": [{"type": "file", "path": file_path}] } if action == "get_file_metadata": return {"modifiedAt": None} if action == "read_file_content": return {"content": file_content} if action == "select": if table == "projects": return {"rows": _projects} if table == "tasks": return {"rows": existing_tasks or []} if table == "notes": return {"rows": existing_notes or []} if table == "timelines": return {"rows": existing_timelines or []} return {"rows": []} if action == "insert": return {"row": {"id": str(uuid.uuid4()), **data}} if action == "update": return {"success": True} return {} return _executor, calls # ── Unit: helper functions ──────────────────────────────────────────────── def test_format_projects_empty(): assert "(no projects" in _format_projects([]) def test_format_projects_with_data(): result = _format_projects([_PROJECT_ALPHA]) assert "proj-alpha" in result assert "Project Alpha" in result def test_format_metadata_empty(): assert _format_metadata({}) == "" def test_format_metadata_email(): meta = {"subject": "Fix bug", "from": "boss@co.com", "date": "2026-04-07"} result = _format_metadata(meta) assert "Fix bug" in result assert "boss@co.com" in result def test_get_extraction_rules_match(): rules = _get_extraction_rules(_AGENT_CONFIG, "email_html") assert "task" in rules.lower() def test_get_extraction_rules_fallback(): rules = _get_extraction_rules(_AGENT_CONFIG, "plain_text") assert "extract" in rules.lower() def test_get_no_match_behavior_from_global_rules(): behavior = _get_no_match_behavior(_AGENT_CONFIG) # The global rule says "non creare alcuna entità" → skip behavior assert behavior # non-empty def test_get_no_match_behavior_default(): behavior = _get_no_match_behavior({}) assert "project" in behavior.lower() # ── Unit: 2.9 — device offline ─────────────────────────────────────────── @pytest.mark.asyncio async def test_2_9_device_offline(): """2.9 No device online → status=error, no executor created.""" config = _make_config() run_log = _make_run_log(config.id) mgr = _make_manager(online=False) with patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock) as mock_fin: await run_local_agent(_USER_ID, config, run_log, mgr) _, kwargs = mock_fin.call_args assert kwargs["status"] == "error" assert any("not connected" in e for e in kwargs.get("errors", [])) # ── Unit: 2.10 — empty file ────────────────────────────────────────────── @pytest.mark.asyncio async def test_2_10_empty_file(): """2.10 File with empty content → skipped, items_processed=0, success.""" config = _make_config() run_log = _make_run_log(config.id) mgr = _make_manager() executor, calls = _make_executor( file_path="/emails/empty.html", file_content="", # empty projects=[_PROJECT_ALPHA], ) with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \ patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock) as mock_fin: await run_local_agent(_USER_ID, config, run_log, mgr) _, kwargs = mock_fin.call_args assert kwargs["items_processed"] == 0 assert kwargs["status"] == "success" assert kwargs["items_created"] == 0 # ── Unit: 2.8 — items_created count ───────────────────────────────────── @pytest.mark.asyncio async def test_2_8_items_created_count(): """2.8 items_created == number of create_* tool calls per run.""" config = _make_config() run_log = _make_run_log(config.id) mgr = _make_manager() executor, _calls = _make_executor( file_path="/emails/action.html", file_content=_ACTION_EMAIL, projects=[_PROJECT_ALPHA], ) # Simulate LLM calling create_task twice and update_note once. async def mock_run_agent(*, _tool_calls_out=None, **kw) -> str: if _tool_calls_out is not None: _tool_calls_out.extend(["create_task", "create_note", "update_task"]) return "Done." with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \ patch("app.core.agent_runner._run_agent_with_tools", side_effect=mock_run_agent), \ patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock) as mock_fin: await run_local_agent(_USER_ID, config, run_log, mgr) _, kwargs = mock_fin.call_args # Only create_task + create_note count (not update_task). assert kwargs["items_created"] == 2 assert kwargs["items_processed"] == 1 # ── Eval: 2.1–2.7 (real LLM + Langfuse scoring) ────────────────────────── # # Langfuse V3 pattern: # lf.start_as_current_observation(name=...) as context manager → obs object # obs.score(name=..., value=...) (not lf.score(trace_id=...)) # contextlib.nullcontext() when lf is None → obs is None, no-op # ───────────────────────────────────────────────────────────────────────── @pytest.mark.asyncio @pytest.mark.eval async def test_2_1_email_to_task(): """2.1 Action email → LLM calls create_task. Score: runner.email_to_task.""" from contextlib import nullcontext lf = get_langfuse() config = _make_config() run_log = _make_run_log(config.id) mgr = _make_manager() executor, calls = _make_executor( file_path="/emails/ProjectAlpha_action.html", file_content=_ACTION_EMAIL, projects=[_PROJECT_ALPHA, _PROJECT_BETA], ) obs_ctx = lf.start_as_current_observation( name="eval-runner-2.1-email-to-task", metadata={"step": "2"} ) if lf else nullcontext() with obs_ctx as obs: with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \ patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock) as mock_fin: await run_local_agent(_USER_ID, config, run_log, mgr) _, kwargs = mock_fin.call_args task_creates = [c for c in calls if c["action"] == "insert" and c["table"] == "tasks"] score = 1.0 if len(task_creates) >= 1 else 0.0 if obs is not None: obs.score( name="runner.email_to_task", value=score, comment=f"task_creates={len(task_creates)} items_created={kwargs.get('items_created')}", ) if lf: lf.flush() assert score == 1.0, f"Expected at least 1 task created, got {len(task_creates)}" @pytest.mark.asyncio @pytest.mark.eval async def test_2_2_email_to_note(): """2.2 Informational email → LLM calls create_note. Score: runner.email_to_note.""" from contextlib import nullcontext lf = get_langfuse() config = _make_config() run_log = _make_run_log(config.id) mgr = _make_manager() executor, calls = _make_executor( file_path="/emails/ProjectAlpha_info.html", file_content=_INFO_EMAIL, projects=[_PROJECT_ALPHA, _PROJECT_BETA], ) obs_ctx = lf.start_as_current_observation( name="eval-runner-2.2-email-to-note", metadata={"step": "2"} ) if lf else nullcontext() with obs_ctx as obs: with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \ patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock): await run_local_agent(_USER_ID, config, run_log, mgr) note_creates = [c for c in calls if c["action"] == "insert" and c["table"] == "notes"] score = 1.0 if len(note_creates) >= 1 else 0.0 if obs is not None: obs.score(name="runner.email_to_note", value=score, comment=f"note_creates={len(note_creates)}") if lf: lf.flush() assert score == 1.0, f"Expected at least 1 note created, got {len(note_creates)}" @pytest.mark.asyncio @pytest.mark.eval async def test_2_3_email_to_timeline(): """2.3 Email with event date → LLM calls create_timeline. Score: runner.email_to_timeline.""" from contextlib import nullcontext lf = get_langfuse() config = _make_config() run_log = _make_run_log(config.id) mgr = _make_manager() executor, calls = _make_executor( file_path="/emails/ProjectAlpha_kickoff.html", file_content=_DATE_EMAIL, projects=[_PROJECT_ALPHA, _PROJECT_BETA], ) obs_ctx = lf.start_as_current_observation( name="eval-runner-2.3-email-to-timeline", metadata={"step": "2"} ) if lf else nullcontext() with obs_ctx as obs: with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \ patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock): await run_local_agent(_USER_ID, config, run_log, mgr) tl_creates = [c for c in calls if c["action"] == "insert" and c["table"] == "timelines"] score = 1.0 if len(tl_creates) >= 1 else 0.0 if obs is not None: obs.score(name="runner.email_to_timeline", value=score, comment=f"timeline_creates={len(tl_creates)}") if lf: lf.flush() assert score == 1.0, f"Expected at least 1 timeline created, got {len(tl_creates)}" @pytest.mark.asyncio @pytest.mark.eval async def test_2_4_project_matching_filename(): """2.4 Filename contains 'ProjectAlpha' → LLM assigns to proj-alpha. Score: runner.project_filename.""" from contextlib import nullcontext lf = get_langfuse() config = _make_config() run_log = _make_run_log(config.id) mgr = _make_manager() executor, calls = _make_executor( file_path="/emails/ProjectAlpha_report.html", file_content=_ACTION_EMAIL, projects=[_PROJECT_ALPHA, _PROJECT_BETA], ) obs_ctx = lf.start_as_current_observation( name="eval-runner-2.4-project-filename", metadata={"step": "2"} ) if lf else nullcontext() with obs_ctx as obs: with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \ patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock): await run_local_agent(_USER_ID, config, run_log, mgr) inserts = [c for c in calls if c["action"] == "insert"] correct_project = any( c.get("data", {}).get("projectId") == "proj-alpha" for c in inserts ) score = 1.0 if correct_project else 0.0 if obs is not None: obs.score(name="runner.project_filename", value=score) if lf: lf.flush() assert score == 1.0, "Expected inserts to use proj-alpha based on filename" @pytest.mark.asyncio @pytest.mark.eval async def test_2_5_project_matching_content(): """2.5 Email body mentions 'Project Alpha' → correct project assigned. Score: runner.project_content.""" from contextlib import nullcontext lf = get_langfuse() config = _make_config() run_log = _make_run_log(config.id) mgr = _make_manager() executor, calls = _make_executor( file_path="/emails/email_001.html", # generic filename, no project hint file_content=_ACTION_EMAIL, # body mentions "Project Alpha" projects=[_PROJECT_ALPHA, _PROJECT_BETA], ) obs_ctx = lf.start_as_current_observation( name="eval-runner-2.5-project-content", metadata={"step": "2"} ) if lf else nullcontext() with obs_ctx as obs: with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \ patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock): await run_local_agent(_USER_ID, config, run_log, mgr) inserts = [c for c in calls if c["action"] == "insert"] correct_project = any( c.get("data", {}).get("projectId") == "proj-alpha" for c in inserts ) score = 1.0 if correct_project else 0.0 if obs is not None: obs.score(name="runner.project_content", value=score) if lf: lf.flush() assert score == 1.0, "Expected inserts to use proj-alpha based on email body content" @pytest.mark.asyncio @pytest.mark.eval async def test_2_6_no_project_match_global_rule(): """2.6 Newsletter email + global rule 'no project = no entities' → no creates. Score: runner.no_project.""" from contextlib import nullcontext lf = get_langfuse() config = _make_config() run_log = _make_run_log(config.id) mgr = _make_manager() executor, calls = _make_executor( file_path="/emails/newsletter.html", file_content=_NO_PROJECT_EMAIL, projects=[_PROJECT_ALPHA, _PROJECT_BETA], ) obs_ctx = lf.start_as_current_observation( name="eval-runner-2.6-no-project", metadata={"step": "2"} ) if lf else nullcontext() with obs_ctx as obs: with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \ patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock): await run_local_agent(_USER_ID, config, run_log, mgr) inserts = [c for c in calls if c["action"] == "insert"] score = 1.0 if len(inserts) == 0 else 0.0 if obs is not None: obs.score(name="runner.no_project", value=score, comment=f"inserts={len(inserts)}") if lf: lf.flush() assert score == 1.0, f"Expected 0 inserts for unmatched newsletter, got {len(inserts)}" @pytest.mark.asyncio @pytest.mark.eval async def test_2_7_deduplication(): """2.7 Existing task with same title → LLM calls update_task, not create_task. Score: runner.dedup.""" from contextlib import nullcontext lf = get_langfuse() config = _make_config() run_log = _make_run_log(config.id) mgr = _make_manager() executor, calls = _make_executor( file_path="/emails/ProjectAlpha_followup.html", file_content=_ACTION_EMAIL, # "Fix the login bug" — already exists projects=[_PROJECT_ALPHA], existing_tasks=[_EXISTING_TASK], # task already exists ) obs_ctx = lf.start_as_current_observation( name="eval-runner-2.7-dedup", metadata={"step": "2"} ) if lf else nullcontext() with obs_ctx as obs: with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \ patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock): await run_local_agent(_USER_ID, config, run_log, mgr) task_creates = [c for c in calls if c["action"] == "insert" and c["table"] == "tasks"] task_updates = [c for c in calls if c["action"] == "update" and c.get("table") == "tasks"] score = 1.0 if len(task_creates) == 0 or len(task_updates) >= 1 else 0.0 if obs is not None: obs.score(name="runner.dedup", value=score, comment=f"creates={len(task_creates)} updates={len(task_updates)}") if lf: lf.flush() assert score == 1.0, ( f"Expected deduplication: creates={len(task_creates)}, updates={len(task_updates)}" )