fix(langfuse): remove invalid user_id/session_id kwargs from start_as_current_observation

Langfuse V3 does not accept user_id/session_id on observation-level calls.
Moved to metadata dict in agent_runner, deep_agent, and agent_setup.

refactor(tests): fixture-based pattern for agent_runner_v2 eval tests

- cases.yaml + data/ fixtures under tests/fixtures/agent_runner_v2/
- pytest_generate_tests parametrizes test_eval_runner from YAML
- _resolve_projects() handles symbolic names and inline dicts
- _evaluate_case() centralizes all assertion logic
- --runner-dir CLI option for custom fixture folders

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Roberto Musso
2026-04-08 00:45:15 +02:00
parent d8add7e8cb
commit e672b58b6f
9 changed files with 235 additions and 321 deletions

View File

@@ -4,32 +4,36 @@ Covers the unified per-file flow:
Phase A — detect + preprocess (Python, zero LLM)
Phase B — single LLM call with tools (classify + extract + create)
Test cases:
2.1 Happy path: email with action → create_task called
2.2 Happy path: email informative → create_note called
2.3 Happy path: email with date → create_timeline called
2.4 Project matching via filename → correct project_id used
2.5 Project matching via content → correct project_id used
2.6 No project match + global rule → no create_* called
2.7 Deduplication → update_task, not create_task
2.8 items_created count (unit) → items_created == N create_* calls
2.9 Device offline (unit) → status=error
2.10 Empty file (unit) → items_processed=0, status=success
Fixture-based eval tests (2.1–2.7)
-----------------------------------
Cases are defined in tests/fixtures/agent_runner_v2/cases.yaml.
Email HTML files live in tests/fixtures/agent_runner_v2/data/.
Use --runner-dir to point at a custom folder (same structure required).
Unit tests (no LLM)
--------------------
2.8 items_created count → items_created == N create_* calls
2.9 Device offline → status=error
2.10 Empty file → items_processed=0, status=success
Run:
pytest tests/test_agent_runner_v2.py -v
pytest tests/test_agent_runner_v2.py -v -k "2_9 or 2_10 or 2_8" # unit only
pytest tests/test_agent_runner_v2.py -v -k "eval" # LLM evals only
pytest tests/test_agent_runner_v2.py -v --runner-dir /path/to/dir # custom fixtures
"""
from __future__ import annotations
import uuid
from contextlib import nullcontext
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import yaml
from app.core.agent_runner import (
_format_metadata,
@@ -40,7 +44,7 @@ from app.core.agent_runner import (
run_local_agent,
)
from app.core.device_manager import DeviceConnectionManager
from app.core.langfuse_client import get_langfuse, get_prompt_or_fallback
from app.core.langfuse_client import get_langfuse
from app.models import AgentRunLog, LocalAgentConfig
from tests.conftest import TEST_USER_IDS
@@ -48,6 +52,8 @@ from tests.conftest import TEST_USER_IDS
_USER_ID = TEST_USER_IDS["power"]
_DEFAULT_FIXTURE_DIR = Path(__file__).parent / "fixtures" / "agent_runner_v2"
_AGENT_CONFIG = {
"content_types": [
{
@@ -68,55 +74,53 @@ _AGENT_CONFIG = {
"data_types": ["tasks", "notes", "timelines"],
}
_PROJECT_ALPHA = {"id": "proj-alpha", "name": "Project Alpha", "status": "active"}
_PROJECT_BETA = {"id": "proj-beta", "name": "Project Beta", "status": "active"}
# ── Sample email content ──────────────────────────────────────────────────
_ACTION_EMAIL = """\
<html><head></head><body>
<p><b>From:</b> boss@company.com</p>
<p><b>To:</b> dev@company.com</p>
<p><b>Subject:</b> Fix the login bug</p>
<p><b>Date:</b> 2026-04-07</p>
<p>Hi,<br>Please fix the login bug in Project Alpha by Friday. High priority!</p>
</body></html>
"""
_INFO_EMAIL = """\
<html><head></head><body>
<p><b>From:</b> pm@company.com</p>
<p><b>To:</b> team@company.com</p>
<p><b>Subject:</b> FYI: New policy for Project Alpha</p>
<p>Just a heads-up that starting next week all code reviews must be done
within 24 hours for Project Alpha. No action needed from you now.</p>
</body></html>
"""
_DATE_EMAIL = """\
<html><head></head><body>
<p><b>From:</b> pm@company.com</p>
<p><b>Subject:</b> Project Alpha kick-off meeting</p>
<p>The kick-off meeting for Project Alpha is scheduled for 2026-04-15 at 10:00.</p>
</body></html>
"""
_NO_PROJECT_EMAIL = """\
<html><head></head><body>
<p><b>From:</b> newsletter@ads.com</p>
<p><b>Subject:</b> Weekly newsletter</p>
<p>Check out our latest deals on electronics!</p>
</body></html>
"""
_EXISTING_TASK = {
"id": "task-existing",
"title": "Fix the login bug",
"status": "todo",
"priority": "medium",
# Canonical project definitions, referenced symbolically in cases.yaml.
_PROJECTS: dict[str, dict] = {
"alpha": {"id": "proj-alpha", "name": "Project Alpha", "status": "active"},
"beta": {"id": "proj-beta", "name": "Project Beta", "status": "active"},
}
# ── Fixture loading ───────────────────────────────────────────────────────
def _fixtures_dir(config) -> Path:
    """Return the folder that holds cases.yaml and the data/ directory.

    A non-empty ``--runner-dir`` option value takes precedence; otherwise
    ``_DEFAULT_FIXTURE_DIR`` is returned.
    """
    custom_dir = config.getoption("--runner-dir")
    if custom_dir:
        return Path(custom_dir)
    return _DEFAULT_FIXTURE_DIR
def _load_cases(config) -> list[dict]:
    """Load the eval case definitions from cases.yaml in the active fixture folder.

    Args:
        config: pytest config object; passed to ``_fixtures_dir`` to locate
            the fixture folder.

    Returns:
        The list parsed from cases.yaml. An empty file yields an empty list.

    Raises:
        TypeError: if the top-level YAML node is not a list.
    """
    cases_path = _fixtures_dir(config) / "cases.yaml"
    loaded = yaml.safe_load(cases_path.read_text(encoding="utf-8"))
    if loaded is None:
        # safe_load returns None for an empty document; report "no cases"
        # instead of letting callers fail while iterating None.
        return []
    if not isinstance(loaded, list):
        raise TypeError(
            f"{cases_path}: expected a top-level list of cases, "
            f"got {type(loaded).__name__}"
        )
    return loaded
def _read_case_file(case: dict, data_dir: Path) -> str:
    """Return the UTF-8 text of the file named by ``case["file"]`` inside *data_dir*."""
    target = data_dir / case["file"]
    with target.open(encoding="utf-8") as handle:
        return handle.read()
def _resolve_projects(entries: list[str | dict]) -> list[dict]:
    """Resolve project list from YAML: symbolic names and/or inline dicts.

    Args:
        entries: Items taken from a case's ``projects`` key. A string is
            looked up in ``_PROJECTS``; a dict is used as-is. ``None`` (a
            ``projects:`` key with no value) is treated as an empty list.

    Returns:
        The resolved project dicts in input order. Entries that cannot be
        resolved (unknown name, or neither str nor dict) are skipped and
        reported with a ``UserWarning``.
    """
    # Local import: only needed to report unresolvable entries.
    import warnings

    resolved: list[dict] = []
    for entry in entries or []:
        if isinstance(entry, dict):
            resolved.append(entry)
        elif isinstance(entry, str) and entry in _PROJECTS:
            resolved.append(_PROJECTS[entry])
        else:
            # Still skipped, as before, but no longer silently: a typo in
            # cases.yaml would otherwise run the case with fewer projects.
            warnings.warn(
                f"_resolve_projects: skipping unresolvable project entry {entry!r}",
                stacklevel=2,
            )
    return resolved
# ── pytest_generate_tests — parametrize eval tests from YAML ─────────────
def pytest_generate_tests(metafunc):
    """Parametrize tests that request ``runner_case`` with the YAML cases.

    Tests that do not use the ``runner_case`` fixture are left untouched.
    Each case's ``id`` value is used as the parametrization id.
    """
    if "runner_case" in metafunc.fixturenames:
        loaded = _load_cases(metafunc.config)
        case_ids = [entry["id"] for entry in loaded]
        metafunc.parametrize("runner_case", loaded, ids=case_ids)
# ── Test helpers ──────────────────────────────────────────────────────────
@@ -175,7 +179,7 @@ def _make_executor(
directory listing, file reading, project/entity fetching, and CRUD.
"""
calls: list[dict] = []
_projects = projects or [_PROJECT_ALPHA, _PROJECT_BETA]
_projects = projects if projects is not None else list(_PROJECTS.values())
async def _executor(payload: dict) -> dict:
action = payload.get("action", "")
@@ -184,10 +188,7 @@ def _make_executor(
calls.append({"action": action, "table": table, "data": data})
if action == "list_directory":
path = data.get("path", "") or payload.get("data", {}).get("path", "")
return {
"entries": [{"type": "file", "path": file_path}]
}
return {"entries": [{"type": "file", "path": file_path}]}
if action == "get_file_metadata":
return {"modifiedAt": None}
@@ -225,7 +226,7 @@ def test_format_projects_empty():
def test_format_projects_with_data():
result = _format_projects([_PROJECT_ALPHA])
result = _format_projects([_PROJECTS["alpha"]])
assert "proj-alpha" in result
assert "Project Alpha" in result
@@ -253,7 +254,6 @@ def test_get_extraction_rules_fallback():
def test_get_no_match_behavior_from_global_rules():
behavior = _get_no_match_behavior(_AGENT_CONFIG)
# The global rule says "non creare alcuna entità" → skip behavior
assert behavior # non-empty
@@ -292,8 +292,8 @@ async def test_2_10_empty_file():
executor, calls = _make_executor(
file_path="/emails/empty.html",
file_content="", # empty
projects=[_PROJECT_ALPHA],
file_content="",
projects=[_PROJECTS["alpha"]],
)
with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \
@@ -318,11 +318,10 @@ async def test_2_8_items_created_count():
executor, _calls = _make_executor(
file_path="/emails/action.html",
file_content=_ACTION_EMAIL,
projects=[_PROJECT_ALPHA],
file_content="<html><body><p>Fix the login bug in Project Alpha.</p></body></html>",
projects=[_PROJECTS["alpha"]],
)
# Simulate LLM calling create_task twice and update_note once.
async def mock_run_agent(*, _tool_calls_out=None, **kw) -> str:
if _tool_calls_out is not None:
_tool_calls_out.extend(["create_task", "create_note", "update_task"])
@@ -339,33 +338,43 @@ async def test_2_8_items_created_count():
assert kwargs["items_processed"] == 1
# ── Eval: 2.1–2.7 (real LLM + Langfuse scoring) ─────────────────────────
# ── Eval: 2.1–2.7 — fixture-driven, real LLM + Langfuse scoring ─────────
#
# Langfuse V3 pattern:
# lf.start_as_current_observation(name=...) as context manager → obs object
# obs.score(name=..., value=...) (not lf.score(trace_id=...))
# contextlib.nullcontext() when lf is None → obs is None, no-op
# Cases loaded from tests/fixtures/agent_runner_v2/cases.yaml.
# Supported assertions (from YAML):
# expect_insert: <table> → at least 1 insert in that table
# expect_no_insert: true → zero inserts in any table
# expect_project_id: <id> → any insert carries this projectId
# expect_dedup: true → task inserts == 0 OR task updates >= 1
# ─────────────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
@pytest.mark.eval
async def test_2_1_email_to_task():
"""2.1 Action email → LLM calls create_task. Score: runner.email_to_task."""
from contextlib import nullcontext
lf = get_langfuse()
async def test_eval_runner(runner_case, pytestconfig):
"""Parametrized eval test — one invocation per YAML case."""
case: dict = runner_case
data_dir = _fixtures_dir(pytestconfig) / "data"
file_content = _read_case_file(case, data_dir)
projects = _resolve_projects(case.get("projects", []))
config = _make_config()
run_log = _make_run_log(config.id)
mgr = _make_manager()
executor, calls = _make_executor(
file_path="/emails/ProjectAlpha_action.html",
file_content=_ACTION_EMAIL,
projects=[_PROJECT_ALPHA, _PROJECT_BETA],
file_path=case["file_path"],
file_content=file_content,
projects=projects,
existing_tasks=case.get("existing_tasks"),
existing_notes=case.get("existing_notes"),
existing_timelines=case.get("existing_timelines"),
)
lf = get_langfuse()
obs_ctx = lf.start_as_current_observation(
name="eval-runner-2.1-email-to-task", metadata={"step": "2"}
name=f"eval-runner-{case['id']}-{case.get('score_name', 'unknown').replace('.', '-')}",
metadata={"step": "2", "case_id": case["id"]},
) if lf else nullcontext()
with obs_ctx as obs:
@@ -374,253 +383,50 @@ async def test_2_1_email_to_task():
await run_local_agent(_USER_ID, config, run_log, mgr)
_, kwargs = mock_fin.call_args
task_creates = [c for c in calls if c["action"] == "insert" and c["table"] == "tasks"]
score = 1.0 if len(task_creates) >= 1 else 0.0
inserts = [c for c in calls if c["action"] == "insert"]
score, comment = _evaluate_case(case, calls, kwargs)
if obs is not None:
obs.score(
name="runner.email_to_task",
name=case.get("score_name", f"runner.case_{case['id']}"),
value=score,
comment=f"task_creates={len(task_creates)} items_created={kwargs.get('items_created')}",
comment=comment,
)
if lf:
lf.flush()
assert score == 1.0, f"Expected at least 1 task created, got {len(task_creates)}"
assert score == 1.0, f"[{case['id']}] {case.get('description', '')}{comment}"
@pytest.mark.asyncio
@pytest.mark.eval
async def test_2_2_email_to_note():
"""2.2 Informational email → LLM calls create_note. Score: runner.email_to_note."""
from contextlib import nullcontext
lf = get_langfuse()
def _evaluate_case(case: dict, calls: list[dict], finalize_kwargs: dict) -> tuple[float, str]:
    """Return (score, comment) for a YAML case given the captured executor calls.

    Every expectation key present in ``case`` is checked. The score is 1.0
    only when all of them hold; the comment joins the per-expectation details
    with "; ". A case with a single expectation produces the same comment
    text as that expectation alone.

    Args:
        case: One case mapping. Recognised expectation keys:
            ``expect_no_insert`` (truthy) — zero inserts in any table;
            ``expect_insert`` (table name or list of names) — at least one
            insert in each listed table;
            ``expect_project_id`` — some insert carries this ``projectId``;
            ``expect_dedup`` (truthy) — no task inserts, or at least one
            task update.
        calls: Recorded executor calls, each a dict with "action", "table"
            and "data" entries.
        finalize_kwargs: Accepted for interface compatibility; not used for
            scoring.

    Returns:
        ``(score, comment)`` where score is 1.0 or 0.0. When the case
        defines no expectation the result is
        ``(0.0, "no assertion defined in case")``.
    """
    inserts = [c for c in calls if c.get("action") == "insert"]
    # One (passed, detail) pair per expectation that the case declares.
    results: list[tuple[bool, str]] = []

    if case.get("expect_no_insert"):
        results.append((not inserts, f"inserts={len(inserts)} (expected 0)"))

    if "expect_insert" in case:
        tables = case["expect_insert"]
        if isinstance(tables, str):
            tables = [tables]
        counts = {t: sum(1 for c in inserts if c.get("table") == t) for t in tables}
        missing = [t for t in tables if not counts[t]]
        detail = f"inserts={counts}" + (f" missing={missing}" if missing else "")
        results.append((not missing, detail))

    if "expect_project_id" in case:
        expected_pid = case["expect_project_id"]
        # "data" may be present but None; treat that like an empty payload.
        all_pids = [(c.get("data") or {}).get("projectId") for c in inserts]
        results.append(
            (expected_pid in all_pids, f"projectIds={all_pids} (expected {expected_pid!r})")
        )

    if case.get("expect_dedup"):
        task_creates = [c for c in inserts if c.get("table") == "tasks"]
        task_updates = [
            c for c in calls
            if c.get("action") == "update" and c.get("table") == "tasks"
        ]
        results.append(
            (
                not task_creates or len(task_updates) >= 1,
                f"task_creates={len(task_creates)} task_updates={len(task_updates)}",
            )
        )

    if not results:
        return 0.0, "no assertion defined in case"

    score = 1.0 if all(passed for passed, _ in results) else 0.0
    return score, "; ".join(detail for _, detail in results)