feat(batch-agent): add E2E evaluation harness with Langfuse integration

- eval/mock_executor.py: intercepts execute_on_client, serves fixture
  files from disk, records all mutations (insert/update/delete)
- eval/config.py: YAML fixture loader with prompt variants, expected
  results, seed records, model overrides
- eval/scorer.py: FieldMatchScorer (fuzzy title match, per-field
  accuracy, precision/recall/F1) + LLMJudgeScorer (semantic eval)
- eval/langfuse_eval.py: sync fixtures to Langfuse datasets, create
  dataset runs, post scores, link traces to runs
- eval/runner.py: orchestrates fixture → mock → agent pipeline →
  scoring → Langfuse reporting
- eval/cli.py: CLI (python -m eval run/list/sync) with --models,
  --variants, --fixture, --no-judge flags
- eval/fixtures/: example Italian freelance scenario with 3 prompt
  variants (baseline, detailed_italian, minimal)
This commit is contained in:
Roberto Musso
2026-03-23 08:54:19 +01:00
parent 971f1dd84f
commit 75a826c9d8
12 changed files with 1382 additions and 0 deletions

View File

@@ -0,0 +1 @@
"""Batch Agent E2E evaluation harness."""

View File

@@ -0,0 +1,5 @@
"""Allow running the eval package as ``python -m eval``."""
from eval.cli import main
main()

View File

@@ -0,0 +1,182 @@
"""CLI entry point for the batch agent evaluation harness.
Usage::
# From services/batch-agent/:
python -m eval run # all fixtures, default model
python -m eval run --fixture=freelance-invoices # single fixture
python -m eval run --models=gpt-4o,anthropic/claude-sonnet-4
python -m eval run --variants=baseline,detailed # specific prompt variants
python -m eval run --no-judge # skip LLM judge scoring
python -m eval list # list available fixtures
python -m eval sync # sync fixtures to Langfuse datasets
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import sys
from pathlib import Path
# Ensure the service root and repo root are in sys.path
_SERVICE_ROOT = Path(__file__).resolve().parent.parent
_REPO_ROOT = _SERVICE_ROOT.parent.parent
for p in (_SERVICE_ROOT, _REPO_ROOT):
if str(p) not in sys.path:
sys.path.insert(0, str(p))
from eval.config import discover_fixtures
from eval.runner import run_fixture_eval, print_results
from eval import langfuse_eval
def _setup_logging(verbose: bool) -> None:
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format="%(asctime)s %(name)-20s %(levelname)-5s %(message)s",
datefmt="%H:%M:%S",
)
# Quiet noisy libraries
for name in ("httpx", "httpcore", "openai", "litellm", "urllib3"):
logging.getLogger(name).setLevel(logging.WARNING)
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Batch Agent E2E evaluation harness",
prog="python -m eval",
)
sub = parser.add_subparsers(dest="command", required=True)
# ── run ───────────────────────────────────────────────────────
run_cmd = sub.add_parser("run", help="Run evaluations")
run_cmd.add_argument(
"--fixture", "-f",
help="Run only the named fixture (default: all)",
)
run_cmd.add_argument(
"--models", "-m",
default="gpt-4o",
help="Comma-separated list of models to test (default: gpt-4o)",
)
run_cmd.add_argument(
"--variants", "-p",
default=None,
help="Comma-separated prompt variants to test (default: all in fixture)",
)
run_cmd.add_argument(
"--no-judge",
action="store_true",
help="Skip LLM-as-judge scoring",
)
run_cmd.add_argument(
"--judge-model",
default="gpt-4o-mini",
help="Model for LLM judge (default: gpt-4o-mini)",
)
run_cmd.add_argument(
"--fixtures-dir",
default=None,
help="Path to fixtures directory (default: eval/fixtures/)",
)
run_cmd.add_argument("-v", "--verbose", action="store_true")
# ── list ──────────────────────────────────────────────────────
list_cmd = sub.add_parser("list", help="List available fixtures")
list_cmd.add_argument("--fixtures-dir", default=None)
list_cmd.add_argument("-v", "--verbose", action="store_true")
# ── sync ──────────────────────────────────────────────────────
sync_cmd = sub.add_parser("sync", help="Sync fixtures to Langfuse datasets")
sync_cmd.add_argument("--fixture", "-f", default=None, help="Sync only the named fixture")
sync_cmd.add_argument("--fixtures-dir", default=None)
sync_cmd.add_argument("-v", "--verbose", action="store_true")
return parser.parse_args()
def _fixtures_dir(arg: str | None) -> Path | None:
if arg:
return Path(arg)
return None
async def _cmd_run(args: argparse.Namespace) -> None:
fixtures = discover_fixtures(_fixtures_dir(args.fixtures_dir))
if not fixtures:
print("No fixtures found. Create YAML files in eval/fixtures/.")
return
if args.fixture:
fixtures = [f for f in fixtures if f.name == args.fixture]
if not fixtures:
print(f"Fixture '{args.fixture}' not found.")
return
models = [m.strip() for m in args.models.split(",")]
variants = [v.strip() for v in args.variants.split(",")] if args.variants else None
all_results = []
for fixture in fixtures:
results = await run_fixture_eval(
fixture,
models=models,
variants=variants,
use_llm_judge=not args.no_judge,
judge_model=args.judge_model,
)
all_results.extend(results)
print_results(all_results)
def _cmd_list(args: argparse.Namespace) -> None:
fixtures = discover_fixtures(_fixtures_dir(args.fixtures_dir))
if not fixtures:
print("No fixtures found.")
return
print(f"\n{'Name':<30} {'Types':<25} {'Variants':<20} {'Expected'}")
print("-" * 90)
for f in fixtures:
variants = ", ".join(f.prompt_variants.keys())
types = ", ".join(f.data_types)
print(f"{f.name:<30} {types:<25} {variants:<20} {len(f.expected)}")
print()
def _cmd_sync(args: argparse.Namespace) -> None:
fixtures = discover_fixtures(_fixtures_dir(args.fixtures_dir))
if args.fixture:
fixtures = [f for f in fixtures if f.name == args.fixture]
if not fixtures:
print("No fixtures to sync.")
return
for fixture in fixtures:
name = langfuse_eval.sync_fixture_to_dataset(fixture)
if name:
print(f"Synced: {fixture.name}{name}")
else:
print(f"Skipped: {fixture.name} (Langfuse not configured)")
def main() -> None:
args = _parse_args()
_setup_logging(args.verbose)
if args.command == "run":
asyncio.run(_cmd_run(args))
elif args.command == "list":
_cmd_list(args)
elif args.command == "sync":
_cmd_sync(args)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,129 @@
"""Eval configuration — YAML fixture loader and dataclasses.
A *fixture* is a YAML file that defines a complete test scenario:
.. code-block:: yaml
name: freelance-invoices
description: Extract tasks and notes from invoice PDFs (text layer)
directory: sample_files/invoices # relative to fixture dir
data_types: [tasks, notes]
file_extensions: [txt, md]
# Preseeded records the agent "sees" as existing data
seed_records:
projects:
- id: proj-1
name: "Website Redesign"
status: active
tasks: []
# Prompt variations to test (at least one required)
prompt_variants:
baseline: |
Extract action items as tasks and meeting summaries as notes.
Set priority based on urgency keywords.
detailed: |
Extract action items as tasks. Map "URGENT" to high priority,
"ASAP" to medium. Summaries become notes with full content.
# Expected extractions — what the agent SHOULD produce
expected:
tasks:
- title: "Send revised invoice to client"
priority: high
status: todo
- title: "Update project timeline"
priority: medium
notes:
- title: "Meeting summary - March kickoff"
# Optional: models to test (overrides CLI --models)
models: []
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import yaml
logger = logging.getLogger(__name__)
@dataclass
class ExpectedRecord:
"""A single expected extraction result.
Only the fields specified are checked — unspecified fields are ignored.
"""
table: str # tasks | notes | timelines | projects
fields: dict[str, Any] # field_name → expected_value
@dataclass
class EvalFixture:
"""A complete test scenario loaded from YAML."""
name: str
description: str
directory: str # relative path to sample files
data_types: list[str]
file_extensions: list[str]
seed_records: dict[str, list[dict]]
prompt_variants: dict[str, str] # variant_name → prompt_template
expected: list[ExpectedRecord]
models: list[str] # if empty, use CLI default
fixture_path: Path = field(default_factory=lambda: Path("."))
@property
def fixture_dir(self) -> Path:
"""Absolute path to the sample files directory."""
return self.fixture_path.parent / self.directory
@classmethod
def from_yaml(cls, path: Path) -> "EvalFixture":
"""Load a fixture from a YAML file."""
raw = yaml.safe_load(path.read_text(encoding="utf-8"))
expected: list[ExpectedRecord] = []
for table, records in (raw.get("expected") or {}).items():
for rec in records:
expected.append(ExpectedRecord(table=table, fields=rec))
return cls(
name=raw["name"],
description=raw.get("description", ""),
directory=raw.get("directory", "sample_files"),
data_types=raw.get("data_types", ["tasks"]),
file_extensions=raw.get("file_extensions", []),
seed_records=raw.get("seed_records", {}),
prompt_variants=raw.get("prompt_variants", {"default": ""}),
expected=expected,
models=raw.get("models", []),
fixture_path=path,
)
def discover_fixtures(fixtures_dir: Path | None = None) -> list[EvalFixture]:
"""Find and load all YAML fixtures in the fixtures directory."""
if fixtures_dir is None:
fixtures_dir = Path(__file__).parent / "fixtures"
fixtures: list[EvalFixture] = []
if not fixtures_dir.is_dir():
logger.warning("eval: fixtures directory not found: %s", fixtures_dir)
return fixtures
for yaml_path in sorted(fixtures_dir.glob("*.yaml")):
try:
fixtures.append(EvalFixture.from_yaml(yaml_path))
logger.info("eval: loaded fixture %s from %s", fixtures[-1].name, yaml_path.name)
except Exception as exc:
logger.error("eval: failed to load fixture %s: %s", yaml_path.name, exc)
return fixtures

View File

@@ -0,0 +1,86 @@
# Fixture: freelance-invoices
# Tests extraction of tasks, notes, and timelines from
# invoices and meeting notes typical of a freelance workflow.
name: freelance-invoices
description: >
Extract tasks, notes, and timeline events from Italian freelance
invoices and meeting notes. Tests project matching, priority
mapping, and bilingual content handling.
directory: sample_files/invoices
data_types: [tasks, notes, timelines]
file_extensions: [txt, md]
# Pre-existing records in the "database"
seed_records:
projects:
- id: "proj-web-redesign"
name: "Redesign Sito Web Corporate"
status: "active"
aiSummary: "Corporate website redesign for Studio Architettura Bianchi"
- id: "proj-ecommerce"
name: "E-Commerce FashionStore"
status: "active"
aiSummary: "Next.js e-commerce platform for FashionStore srl"
tasks: []
notes: []
timelines: []
# Prompt variations to compare
prompt_variants:
baseline: |
Extract action items as tasks and summaries as notes.
For timelines, extract any mentioned dates and deadlines.
Set isAiSuggested=1 on every record.
detailed_italian: |
Estrai i dati dai file come segue:
- TASK: ogni azione da fare, deliverable, o item con scadenza.
Mappa "URGENTE" o "ALTA PRIORITÀ" → priority: high.
Mappa "media priorità" → priority: medium.
Mappa "bassa priorità" → priority: low.
Se un item è marcato come "completato" o [x], impostalo status: done.
Altrimenti status: todo.
- NOTE: riassunti di meeting, decisioni prese, note tecniche.
Il titolo deve essere descrittivo. Il content deve includere tutti i dettagli.
- TIMELINE: date di scadenza, milestone, meeting futuri.
Formato data: timestamp Unix in millisecondi.
Imposta sempre isAiSuggested=1.
minimal: |
Extract only high-priority action items as tasks.
Ignore notes and timelines unless explicitly marked as important.
Set isAiSuggested=1.
# Expected extractions (what the agent SHOULD produce)
# Only key fields are specified — scorer uses fuzzy matching
expected:
tasks:
- title: "Sviluppo frontend React"
priority: "high"
status: "todo"
- title: "Integrazione API backend"
priority: "medium"
status: "todo"
- title: "Testing cross-browser e fix bug responsive"
status: "todo"
- title: "Preparare wireframe homepage"
priority: "high"
status: "todo"
- title: "Setup progetto Next.js e configurare CI/CD"
priority: "medium"
status: "todo"
- title: "Ricerca plugin Stripe per gestione abbonamenti"
priority: "low"
status: "todo"
notes:
- title: "Meeting Kickoff Progetto E-Commerce"
timelines:
- title: "MVP E-Commerce pronto"
- title: "Meeting di revisione"
# Models to test (can be overridden via CLI --models)
models: []

View File

@@ -0,0 +1,18 @@
FATTURA N. 2026-0042
Data: 15 Marzo 2026
Cliente: Studio Architettura Bianchi
Progetto: Redesign Sito Web Corporate
Descrizione lavori:
- Sviluppo frontend React (40 ore) — URGENTE, completare entro 20 marzo
- Integrazione API backend (20 ore) — priorità media
- Design UI/UX mockup homepage (8 ore) — completato
- Testing cross-browser e fix bug responsive (12 ore) — da iniziare
Totale: €4.800,00 + IVA
Note:
Meeting di revisione previsto per il 18 marzo alle 10:00.
Il cliente ha richiesto modifiche al layout mobile della sezione contatti.
Attendere conferma budget aggiuntivo per sezione blog.

View File

@@ -0,0 +1,25 @@
# Meeting Notes - Kickoff Progetto E-Commerce
**Data:** 10 Marzo 2026
**Partecipanti:** Marco R., Giulia T., Cliente (FashionStore srl)
## Decisioni prese
1. **Piattaforma**: Next.js + Stripe per i pagamenti
2. **Timeline**: MVP pronto entro 30 aprile 2026
3. **Budget**: €12.000 totale, €4.000 anticipo già ricevuto
## Action items
- [ ] Marco: preparare wireframe homepage entro 14 marzo — ALTA PRIORITÀ
- [ ] Giulia: setup progetto Next.js e configurare CI/CD — media priorità
- [ ] Marco: ricerca plugin Stripe per gestione abbonamenti — bassa priorità
- [x] Giulia: inviare contratto firmato al cliente — COMPLETATO
## Note aggiuntive
Il cliente vuole un design minimalista, ispirato a Zara.com.
Colori primari: nero, bianco, oro.
Font: Inter per body, Playfair Display per headings.
Prossimo meeting: 24 marzo 2026 ore 15:00.

View File

@@ -0,0 +1,222 @@
"""Langfuse evaluation integration — datasets, runs, and scoring.
Uses the Langfuse Python SDK to:
1. **Sync fixtures → Langfuse datasets**: Each YAML fixture becomes a dataset,
each prompt variant + expected pair becomes a dataset item.
2. **Track eval runs**: Each (fixture × model × prompt_variant) execution
is recorded as a dataset run with linked traces and scores.
3. **Post scores**: precision, recall, F1, field_accuracy, llm_judge are
posted as numeric scores on the trace/run.
"""
from __future__ import annotations
import json
import logging
from typing import Any
from shared.config import settings
from eval.config import EvalFixture
from eval.scorer import EvalScores
logger = logging.getLogger(__name__)
def _get_langfuse():
"""Get or create a Langfuse client instance."""
if not settings.LANGFUSE_SECRET_KEY or not settings.LANGFUSE_PUBLIC_KEY:
return None
try:
from langfuse import Langfuse
return Langfuse(
secret_key=settings.LANGFUSE_SECRET_KEY,
public_key=settings.LANGFUSE_PUBLIC_KEY,
host=settings.LANGFUSE_HOST,
)
except Exception as exc:
logger.warning("langfuse_eval: failed to create client: %s", exc)
return None
def sync_fixture_to_dataset(fixture: EvalFixture) -> str | None:
"""Create or update a Langfuse dataset from a fixture.
Each prompt variant becomes a separate dataset item with:
- input: {directory, data_types, prompt_template, seed_records}
- expected_output: {expected records}
Returns the dataset name, or None if Langfuse is unavailable.
"""
lf = _get_langfuse()
if lf is None:
logger.info("langfuse_eval: Langfuse not configured — skipping dataset sync")
return None
dataset_name = f"batch-eval-{fixture.name}"
try:
lf.create_dataset(
name=dataset_name,
description=fixture.description,
metadata={"data_types": fixture.data_types, "file_extensions": fixture.file_extensions},
)
except Exception:
# Dataset may already exist — that's fine
pass
expected_output = {}
for rec in fixture.expected:
expected_output.setdefault(rec.table, []).append(rec.fields)
for variant_name, prompt_template in fixture.prompt_variants.items():
item_id = f"{fixture.name}--{variant_name}"
try:
lf.create_dataset_item(
dataset_name=dataset_name,
id=item_id,
input={
"directory": fixture.directory,
"data_types": fixture.data_types,
"prompt_template": prompt_template,
"seed_records": fixture.seed_records,
},
expected_output=expected_output,
metadata={"prompt_variant": variant_name},
)
except Exception as exc:
logger.warning(
"langfuse_eval: failed to upsert dataset item %s: %s", item_id, exc
)
lf.flush()
logger.info("langfuse_eval: synced fixture '%s' → dataset '%s'", fixture.name, dataset_name)
return dataset_name
def create_eval_run(
dataset_name: str,
run_name: str,
*,
metadata: dict[str, Any] | None = None,
) -> str:
"""Create a dataset run in Langfuse. Returns the run name."""
lf = _get_langfuse()
if lf is None:
return run_name
try:
lf.create_dataset_run(
dataset_name=dataset_name,
run_name=run_name,
metadata=metadata or {},
)
lf.flush()
except Exception as exc:
logger.warning("langfuse_eval: failed to create run %s: %s", run_name, exc)
return run_name
def post_eval_scores(
scores: EvalScores,
*,
trace_id: str | None = None,
dataset_name: str | None = None,
run_name: str | None = None,
) -> None:
"""Post evaluation scores to Langfuse.
If trace_id is provided, scores are attached to that trace.
"""
lf = _get_langfuse()
if lf is None:
return
score_data = [
("precision", scores.precision),
("recall", scores.recall),
("f1", scores.f1),
("field_accuracy", scores.field_accuracy),
]
if scores.llm_judge_score is not None:
score_data.append(("llm_judge", scores.llm_judge_score))
for name, value in score_data:
try:
kwargs: dict[str, Any] = {
"name": name,
"value": value,
"comment": f"{scores.fixture_name} | {scores.model} | {scores.prompt_variant}",
}
if trace_id:
kwargs["trace_id"] = trace_id
lf.score(**kwargs)
except Exception as exc:
logger.warning("langfuse_eval: failed to post score %s: %s", name, exc)
lf.flush()
logger.info(
"langfuse_eval: posted %d scores for %s/%s/%s",
len(score_data), scores.fixture_name, scores.model, scores.prompt_variant,
)
def log_eval_trace(
*,
fixture_name: str,
model: str,
prompt_variant: str,
prompt_template: str,
actual_mutations: list[dict],
scores_summary: dict[str, Any],
dataset_name: str | None = None,
run_name: str | None = None,
dataset_item_id: str | None = None,
) -> str | None:
"""Create a Langfuse trace for one eval execution and link it to a dataset run.
Returns the trace_id, or None if Langfuse is unavailable.
"""
lf = _get_langfuse()
if lf is None:
return None
try:
trace = lf.trace(
name=f"eval-{fixture_name}",
input={
"prompt_template": prompt_template,
"model": model,
"prompt_variant": prompt_variant,
},
output={
"mutations": actual_mutations[:50],
"scores": scores_summary,
},
metadata={
"eval": True,
"fixture": fixture_name,
"model": model,
"prompt_variant": prompt_variant,
},
tags=["eval", f"model:{model}", f"variant:{prompt_variant}"],
)
# Link to dataset run if available
if dataset_name and run_name and dataset_item_id:
try:
dataset = lf.get_dataset(dataset_name)
item = dataset.get_item(dataset_item_id)
if item:
item.link(trace, run_name)
except Exception as exc:
logger.warning("langfuse_eval: failed to link trace to dataset run: %s", exc)
lf.flush()
return trace.id
except Exception as exc:
logger.warning("langfuse_eval: failed to create eval trace: %s", exc)
return None

View File

@@ -0,0 +1,208 @@
"""Mock executor — intercepts execute_on_client for offline E2E testing.
Patches ``app.ws_context.execute_on_client`` so agent pipeline runs don't
require a live Electron client or Redis. Instead:
- **Filesystem actions** (list_directory, read_file_content, get_file_metadata)
are served from local fixture files on disk.
- **Read actions** (select, get) return preseeded records from an in-memory
store provided by the test fixture.
- **Write actions** (insert, update, delete) are captured as *mutations* and
stored for later comparison against expected results.
"""
from __future__ import annotations
import json
import os
import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock, patch
@dataclass
class Mutation:
"""A single recorded write operation."""
action: str # insert | update | delete
table: str
data: dict[str, Any]
timestamp: float = field(default_factory=time.time)
@dataclass
class MockExecutor:
"""In-memory executor that replaces Redis-based tool round-trip.
Parameters
----------
fixture_dir : Path
Directory containing sample files for filesystem tool calls.
seed_records : dict[str, list[dict]]
Pre-existing records per table, e.g. ``{"tasks": [...], "projects": [...]}``.
The executor returns these for ``select`` / ``get`` actions and auto-updates
them on ``insert`` / ``update`` / ``delete`` so subsequent selects reflect changes.
"""
fixture_dir: Path
seed_records: dict[str, list[dict]] = field(default_factory=dict)
mutations: list[Mutation] = field(default_factory=list)
_id_counter: int = field(default=1000, repr=False)
# ── Public API ───────────────────────────────────────────────────
def reset(self) -> None:
"""Clear recorded mutations (keep seed_records intact)."""
self.mutations.clear()
def get_mutations(self, *, table: str | None = None, action: str | None = None) -> list[Mutation]:
"""Filter mutations by table and/or action."""
result = self.mutations
if table:
result = [m for m in result if m.table == table]
if action:
result = [m for m in result if m.action == action]
return result
def created_records(self, table: str) -> list[dict]:
"""Return data dicts of all inserts into *table*."""
return [m.data for m in self.mutations if m.table == table and m.action == "insert"]
def updated_records(self, table: str) -> list[dict]:
"""Return data dicts of all updates to *table*."""
return [m.data for m in self.mutations if m.table == table and m.action == "update"]
# ── Context manager for patching ──────────────────────────────
def patch(self):
"""Return an async context-manager that patches execute_on_client."""
return patch(
"app.ws_context.execute_on_client",
new=AsyncMock(side_effect=self._handle),
)
# ── Internal dispatch ─────────────────────────────────────────
async def _handle(
self,
action: str,
table: str | None = None,
data: dict[str, Any] | None = None,
filters: dict[str, Any] | None = None,
vector: list[float] | None = None,
limit: int | None = None,
) -> dict[str, Any]:
# Filesystem
if action == "list_directory":
return self._list_directory(data or {})
if action == "read_file_content":
return self._read_file(data or {})
if action == "get_file_metadata":
return self._get_file_metadata(data or {})
# CRUD
if action == "select":
return self._select(table or "", filters)
if action == "get":
return self._get(table or "", data or {})
if action == "insert":
return self._insert(table or "", data or {})
if action == "update":
return self._update(table or "", data or {})
if action == "delete":
return self._delete(table or "", data or {})
# Vector (no-op for eval)
if action in ("vector_upsert", "vector_search"):
return {"rows": []}
return {"error": f"Unknown action: {action}"}
# ── Filesystem handlers ───────────────────────────────────────
def _list_directory(self, data: dict) -> dict:
rel_path = data.get("path", "")
abs_path = self.fixture_dir / rel_path.lstrip("/\\")
if not abs_path.is_dir():
return {"entries": []}
entries: list[dict] = []
for child in sorted(abs_path.iterdir()):
entry_type = "directory" if child.is_dir() else "file"
# Return paths relative to fixture_dir but with the original prefix
entry_path = rel_path.rstrip("/\\") + "/" + child.name
entries.append({
"name": child.name,
"path": entry_path,
"type": entry_type,
})
return {"entries": entries}
def _read_file(self, data: dict) -> dict:
rel_path = data.get("path", "")
abs_path = self.fixture_dir / rel_path.lstrip("/\\")
if not abs_path.is_file():
return {"content": "", "error": f"File not found: {rel_path}"}
return {"content": abs_path.read_text(encoding="utf-8", errors="replace")}
def _get_file_metadata(self, data: dict) -> dict:
rel_path = data.get("path", "")
abs_path = self.fixture_dir / rel_path.lstrip("/\\")
if not abs_path.exists():
return {"error": f"Not found: {rel_path}"}
stat = abs_path.stat()
return {
"path": rel_path,
"size": stat.st_size,
"modifiedAt": int(stat.st_mtime * 1000),
"createdAt": int(stat.st_ctime * 1000),
"isDirectory": abs_path.is_dir(),
}
# ── CRUD handlers ─────────────────────────────────────────────
def _select(self, table: str, filters: dict | None) -> dict:
rows = list(self.seed_records.get(table, []))
if filters:
rows = [
r for r in rows
if all(r.get(k) == v for k, v in filters.items() if v is not None)
]
return {"rows": rows}
def _get(self, table: str, data: dict) -> dict:
record_id = data.get("id", "")
rows = self.seed_records.get(table, [])
for r in rows:
if r.get("id") == record_id:
return {"row": r}
return {"row": None}
def _insert(self, table: str, data: dict) -> dict:
self._id_counter += 1
record = {**data, "id": str(self._id_counter)}
# Add to seed so subsequent selects can find it
self.seed_records.setdefault(table, []).append(record)
self.mutations.append(Mutation(action="insert", table=table, data=record))
return {"row": record}
def _update(self, table: str, data: dict) -> dict:
record_id = data.get("id", "")
rows = self.seed_records.get(table, [])
for r in rows:
if r.get("id") == record_id:
r.update({k: v for k, v in data.items() if v is not None and v != ""})
self.mutations.append(Mutation(action="update", table=table, data=dict(r)))
return {"row": r}
# Record not found — still log the mutation
self.mutations.append(Mutation(action="update", table=table, data=data))
return {"row": data}
def _delete(self, table: str, data: dict) -> dict:
record_id = data.get("id", "")
rows = self.seed_records.get(table, [])
self.seed_records[table] = [r for r in rows if r.get("id") != record_id]
self.mutations.append(Mutation(action="delete", table=table, data={"id": record_id}))
return {"deleted": True}

View File

@@ -0,0 +1,2 @@
# Extra dependencies for the eval harness (on top of the service requirements.txt)
pyyaml>=6.0.0

View File

@@ -0,0 +1,236 @@
"""Eval runner — orchestrates fixture → mock → agent pipeline → scoring.
For each (fixture × model × prompt_variant) combination:
1. Build a MockExecutor with fixture data
2. Patch execute_on_client
3. Override LLM_MODEL in shared settings
4. Run the batch agent pipeline (run_local_agent)
5. Collect mutations from the mock
6. Score against expected results (field match + optional LLM judge)
7. Report scores to Langfuse
8. Print results
"""
from __future__ import annotations
import asyncio
import copy
import json
import logging
import time
import uuid
from pathlib import Path
from typing import Any
from eval.config import EvalFixture, ExpectedRecord
from eval.mock_executor import MockExecutor
from eval.scorer import (
EvalScores,
FieldScore,
compute_precision_recall,
llm_judge_score,
score_field_match,
)
from eval import langfuse_eval
logger = logging.getLogger(__name__)
async def run_single_eval(
fixture: EvalFixture,
model: str,
prompt_variant: str,
*,
use_llm_judge: bool = True,
judge_model: str = "gpt-4o-mini",
) -> EvalScores:
"""Execute one (fixture × model × prompt_variant) eval and return scores."""
from shared.config import settings
prompt_template = fixture.prompt_variants.get(prompt_variant, "")
# Build mock executor
seed = copy.deepcopy(fixture.seed_records)
mock = MockExecutor(
fixture_dir=fixture.fixture_dir,
seed_records=seed,
)
# Override the LLM model for this run
original_model = settings.LLM_MODEL
settings.LLM_MODEL = model
# Build trigger data (same shape as what redis_consumer delivers)
trigger_data: dict[str, Any] = {
"type": "agent_trigger",
"directory": fixture.directory,
"directory_paths": [fixture.directory],
"data_types": fixture.data_types,
"file_extensions": fixture.file_extensions,
"prompt_template": prompt_template,
"device_id": "eval-harness",
"run_context": {
"agent_id": f"eval-{fixture.name}-{prompt_variant}",
"run_id": None, # skip DB logging during eval
},
}
eval_user_id = f"eval-{uuid.uuid4().hex[:8]}"
logger.info(
"eval: starting %s | model=%s | variant=%s",
fixture.name, model, prompt_variant,
)
start_time = time.time()
try:
# Patch execute_on_client + set user context, then run the pipeline
from app.ws_context import set_current_user, clear_current_user
from app.agent_runner import run_local_agent
set_current_user(eval_user_id)
with mock.patch():
await run_local_agent(eval_user_id, trigger_data)
except Exception as exc:
logger.error("eval: pipeline failed for %s/%s/%s: %s", fixture.name, model, prompt_variant, exc)
finally:
settings.LLM_MODEL = original_model
from app.ws_context import clear_current_user
clear_current_user()
elapsed = time.time() - start_time
logger.info("eval: pipeline completed in %.1fs — %d mutations", elapsed, len(mock.mutations))
# ── Score results ────────────────────────────────────────────
all_field_scores: list[FieldScore] = []
total_expected = 0
total_actual = 0
total_matched = 0
total_extra = 0
total_missing = 0
# Group expected by table
expected_by_table: dict[str, list[dict]] = {}
for rec in fixture.expected:
expected_by_table.setdefault(rec.table, []).append(rec.fields)
# Compare against actual mutations (inserts + updates)
tables = set(expected_by_table.keys()) | {m.table for m in mock.mutations}
for table in tables:
expected_records = expected_by_table.get(table, [])
actual_records = mock.created_records(table) + mock.updated_records(table)
field_scores, extra, missing = score_field_match(expected_records, actual_records, table)
all_field_scores.extend(field_scores)
matched = sum(1 for s in field_scores if s.best_match is not None)
total_expected += len(expected_records)
total_actual += len(actual_records)
total_matched += matched
total_extra += extra
total_missing += missing
precision, recall, f1 = compute_precision_recall(total_expected, total_actual, total_matched)
scores = EvalScores(
fixture_name=fixture.name,
model=model,
prompt_variant=prompt_variant,
field_scores=all_field_scores,
precision=precision,
recall=recall,
f1=f1,
extra_records=total_extra,
missing_records=total_missing,
)
# ── Optional LLM judge ───────────────────────────────────────
if use_llm_judge and fixture.expected:
all_expected = [r.fields for r in fixture.expected]
all_actual = [m.data for m in mock.mutations if m.action in ("insert", "update")]
judge_score, reasoning = await llm_judge_score(
all_expected, all_actual, judge_model=judge_model,
)
scores.llm_judge_score = judge_score
scores.llm_judge_reasoning = reasoning
# ── Report to Langfuse ───────────────────────────────────────
dataset_name = f"batch-eval-{fixture.name}"
dataset_item_id = f"{fixture.name}--{prompt_variant}"
run_name = f"{model}--{prompt_variant}--{int(time.time())}"
trace_id = langfuse_eval.log_eval_trace(
fixture_name=fixture.name,
model=model,
prompt_variant=prompt_variant,
prompt_template=prompt_template,
actual_mutations=[{"action": m.action, "table": m.table, "data": m.data} for m in mock.mutations],
scores_summary=scores.summary(),
dataset_name=dataset_name,
run_name=run_name,
dataset_item_id=dataset_item_id,
)
if trace_id:
langfuse_eval.post_eval_scores(scores, trace_id=trace_id)
return scores
async def run_fixture_eval(
fixture: EvalFixture,
models: list[str],
*,
variants: list[str] | None = None,
use_llm_judge: bool = True,
judge_model: str = "gpt-4o-mini",
) -> list[EvalScores]:
"""Run all (model × variant) combinations for a fixture."""
if variants is None:
variants = list(fixture.prompt_variants.keys())
# Sync fixture to Langfuse dataset
langfuse_eval.sync_fixture_to_dataset(fixture)
results: list[EvalScores] = []
for model in models:
for variant in variants:
if variant not in fixture.prompt_variants:
logger.warning("eval: variant %r not found in fixture %s", variant, fixture.name)
continue
scores = await run_single_eval(
fixture, model, variant,
use_llm_judge=use_llm_judge,
judge_model=judge_model,
)
results.append(scores)
return results
def print_results(results: list[EvalScores]) -> None:
"""Print a formatted summary table of eval results."""
if not results:
print("\nNo eval results.")
return
print("\n" + "=" * 90)
print(f"{'Fixture':<25} {'Model':<25} {'Variant':<15} {'P':>6} {'R':>6} {'F1':>6} {'FA':>6} {'LLM':>6}")
print("-" * 90)
for s in results:
llm_str = f"{s.llm_judge_score:.2f}" if s.llm_judge_score is not None else " --"
print(
f"{s.fixture_name:<25} {s.model:<25} {s.prompt_variant:<15} "
f"{s.precision:>6.2f} {s.recall:>6.2f} {s.f1:>6.2f} "
f"{s.field_accuracy:>6.2f} {llm_str:>6}"
)
print("=" * 90)
# If LLM judge reasoning is available, print it
for s in results:
if s.llm_judge_reasoning:
print(f"\n[{s.model} / {s.prompt_variant}] LLM Judge: {s.llm_judge_reasoning}")
print()

View File

@@ -0,0 +1,268 @@
"""Scoring functions for batch agent evaluation.
Two scoring strategies:
1. **FieldMatchScorer** — deterministic check: for each expected record,
find the best-matching actual record and compare specified fields.
Returns precision, recall, and per-field accuracy.
2. **LLMJudgeScorer** — uses a secondary LLM to semantically evaluate
whether the actual extractions satisfy the expected intent, even if
wording differs. Returns a 0-1 score + reasoning.
"""
from __future__ import annotations
import json
import logging
from dataclasses import dataclass, field
from difflib import SequenceMatcher
from typing import Any
from langchain_core.messages import HumanMessage, SystemMessage
logger = logging.getLogger(__name__)
# ── Result types ─────────────────────────────────────────────────────────
@dataclass
class FieldScore:
"""Score for a single expected record against its best match."""
expected: dict[str, Any]
best_match: dict[str, Any] | None
matched_fields: dict[str, bool]
similarity: float # 0-1 overall similarity
@property
def field_accuracy(self) -> float:
if not self.matched_fields:
return 0.0
return sum(self.matched_fields.values()) / len(self.matched_fields)
@dataclass
class EvalScores:
"""Aggregated scores for one eval run."""
fixture_name: str
model: str
prompt_variant: str
field_scores: list[FieldScore] = field(default_factory=list)
precision: float = 0.0
recall: float = 0.0
f1: float = 0.0
llm_judge_score: float | None = None
llm_judge_reasoning: str = ""
extra_records: int = 0 # records created but not expected
missing_records: int = 0 # expected but not found
@property
def field_accuracy(self) -> float:
if not self.field_scores:
return 0.0
return sum(s.field_accuracy for s in self.field_scores) / len(self.field_scores)
def summary(self) -> dict[str, Any]:
return {
"fixture": self.fixture_name,
"model": self.model,
"prompt_variant": self.prompt_variant,
"precision": round(self.precision, 3),
"recall": round(self.recall, 3),
"f1": round(self.f1, 3),
"field_accuracy": round(self.field_accuracy, 3),
"llm_judge_score": round(self.llm_judge_score, 3) if self.llm_judge_score is not None else None,
"extra_records": self.extra_records,
"missing_records": self.missing_records,
}
# ── Field Match Scorer ───────────────────────────────────────────────────
def _normalize(value: Any) -> str:
"""Normalize a value for comparison."""
if value is None:
return ""
return str(value).strip().lower()
def _text_similarity(a: str, b: str) -> float:
"""Fuzzy text similarity using SequenceMatcher."""
if not a and not b:
return 1.0
if not a or not b:
return 0.0
return SequenceMatcher(None, a.lower(), b.lower()).ratio()
def _find_best_match(
expected: dict[str, Any],
actuals: list[dict[str, Any]],
) -> tuple[dict[str, Any] | None, float]:
"""Find the actual record most similar to expected, return (match, similarity)."""
if not actuals:
return None, 0.0
best_match = None
best_score = 0.0
# Primary matching key: title or name
expected_title = _normalize(expected.get("title", expected.get("name", "")))
for actual in actuals:
actual_title = _normalize(actual.get("title", actual.get("name", "")))
sim = _text_similarity(expected_title, actual_title)
if sim > best_score:
best_score = sim
best_match = actual
return best_match, best_score
def _compare_fields(
expected: dict[str, Any],
actual: dict[str, Any],
) -> dict[str, bool]:
"""Compare each expected field against the actual record."""
results: dict[str, bool] = {}
for key, expected_val in expected.items():
actual_val = actual.get(key)
# Exact match for non-string types
if not isinstance(expected_val, str):
results[key] = actual_val == expected_val
else:
# Fuzzy match for strings (threshold: 0.7)
results[key] = _text_similarity(
_normalize(expected_val), _normalize(actual_val)
) >= 0.7
return results
def score_field_match(
expected_records: list[dict[str, Any]],
actual_records: list[dict[str, Any]],
table: str,
) -> tuple[list[FieldScore], int, int]:
"""Score actual extractions against expected records for one table.
Returns (field_scores, extra_count, missing_count).
"""
field_scores: list[FieldScore] = []
matched_actuals: set[int] = set()
for exp in expected_records:
# Find best match among unmatched actuals
candidates = [
(i, a) for i, a in enumerate(actual_records) if i not in matched_actuals
]
if not candidates:
field_scores.append(FieldScore(
expected=exp, best_match=None, matched_fields={}, similarity=0.0,
))
continue
best_idx, best_match = None, None
best_sim = 0.0
for idx, actual in candidates:
_, sim = _find_best_match(exp, [actual])
if sim > best_sim:
best_sim = sim
best_idx = idx
best_match = actual
if best_sim >= 0.5 and best_match is not None:
matched_actuals.add(best_idx)
matched_fields = _compare_fields(exp, best_match)
field_scores.append(FieldScore(
expected=exp, best_match=best_match,
matched_fields=matched_fields, similarity=best_sim,
))
else:
field_scores.append(FieldScore(
expected=exp, best_match=None, matched_fields={}, similarity=0.0,
))
extra_count = len(actual_records) - len(matched_actuals)
missing_count = sum(1 for s in field_scores if s.best_match is None)
return field_scores, extra_count, missing_count
def compute_precision_recall(
expected_count: int,
actual_count: int,
matched_count: int,
) -> tuple[float, float, float]:
"""Compute precision, recall, F1."""
precision = matched_count / actual_count if actual_count > 0 else 0.0
recall = matched_count / expected_count if expected_count > 0 else 0.0
f1 = (
2 * precision * recall / (precision + recall)
if (precision + recall) > 0
else 0.0
)
return precision, recall, f1
# ── LLM Judge Scorer ─────────────────────────────────────────────────────
_JUDGE_SYSTEM_PROMPT = """\
You are an evaluation judge for a data extraction system.
Your task is to compare the EXPECTED extractions against the ACTUAL extractions
produced by an AI agent, and assess quality on a 0-1 scale.
Scoring criteria:
- 1.0: All expected records found with correct fields, no significant extras
- 0.8: Most expected records found, minor field differences or extras
- 0.6: Core extractions present but some missing or incorrect
- 0.4: Partial match — several expected records missing or wrong
- 0.2: Poor quality — most expected records missing or incorrect
- 0.0: Complete failure — no meaningful overlap
Consider semantic equivalence: "Send invoice" and "Email the invoice" are matches.
Ignore field ordering and formatting differences.
Respond with ONLY a JSON object:
{"score": 0.85, "reasoning": "Brief explanation of the score"}
"""
async def llm_judge_score(
expected: list[dict[str, Any]],
actual: list[dict[str, Any]],
*,
judge_model: str = "gpt-4o-mini",
) -> tuple[float, str]:
"""Use an LLM to semantically evaluate extraction quality.
Returns (score, reasoning).
"""
from app.llm import get_llm
llm = get_llm(model=judge_model, temperature=0)
user_content = (
f"## Expected extractions\n```json\n{json.dumps(expected, indent=2, default=str)}\n```\n\n"
f"## Actual extractions\n```json\n{json.dumps(actual, indent=2, default=str)}\n```"
)
try:
response = await llm.ainvoke([
SystemMessage(content=_JUDGE_SYSTEM_PROMPT),
HumanMessage(content=user_content),
])
raw = response.content.strip()
if raw.startswith("```"):
raw = raw.split("```")[1]
if raw.startswith("json"):
raw = raw[4:]
parsed = json.loads(raw.strip())
return float(parsed.get("score", 0.0)), str(parsed.get("reasoning", ""))
except Exception as exc:
logger.warning("eval: LLM judge failed: %s", exc)
return 0.0, f"Judge error: {exc}"