diff --git a/services/batch-agent/eval/__init__.py b/services/batch-agent/eval/__init__.py new file mode 100644 index 0000000..5cc20de --- /dev/null +++ b/services/batch-agent/eval/__init__.py @@ -0,0 +1 @@ +"""Batch Agent E2E evaluation harness.""" diff --git a/services/batch-agent/eval/__main__.py b/services/batch-agent/eval/__main__.py new file mode 100644 index 0000000..f5fc59f --- /dev/null +++ b/services/batch-agent/eval/__main__.py @@ -0,0 +1,5 @@ +"""Allow running the eval package as ``python -m eval``.""" + +from eval.cli import main + +main() diff --git a/services/batch-agent/eval/cli.py b/services/batch-agent/eval/cli.py new file mode 100644 index 0000000..ca339b7 --- /dev/null +++ b/services/batch-agent/eval/cli.py @@ -0,0 +1,182 @@ +"""CLI entry point for the batch agent evaluation harness. + +Usage:: + + # From services/batch-agent/: + python -m eval run # all fixtures, default model + python -m eval run --fixture=freelance-invoices # single fixture + python -m eval run --models=gpt-4o,anthropic/claude-sonnet-4 + python -m eval run --variants=baseline,detailed # specific prompt variants + python -m eval run --no-judge # skip LLM judge scoring + + python -m eval list # list available fixtures + python -m eval sync # sync fixtures to Langfuse datasets +""" + +from __future__ import annotations + +import argparse +import asyncio +import logging +import sys +from pathlib import Path + +# Ensure the service root and repo root are in sys.path +_SERVICE_ROOT = Path(__file__).resolve().parent.parent +_REPO_ROOT = _SERVICE_ROOT.parent.parent +for p in (_SERVICE_ROOT, _REPO_ROOT): + if str(p) not in sys.path: + sys.path.insert(0, str(p)) + +from eval.config import discover_fixtures +from eval.runner import run_fixture_eval, print_results +from eval import langfuse_eval + + +def _setup_logging(verbose: bool) -> None: + level = logging.DEBUG if verbose else logging.INFO + logging.basicConfig( + level=level, + format="%(asctime)s %(name)-20s %(levelname)-5s %(message)s", + datefmt="%H:%M:%S", + ) + # Quiet noisy libraries + for name in ("httpx", "httpcore", "openai", "litellm", "urllib3"): + logging.getLogger(name).setLevel(logging.WARNING) + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Batch Agent E2E evaluation harness", + prog="python -m eval", + ) + sub = parser.add_subparsers(dest="command", required=True) + + # ── run ─────────────────────────────────────────────────────── + run_cmd = sub.add_parser("run", help="Run evaluations") + run_cmd.add_argument( + "--fixture", "-f", + help="Run only the named fixture (default: all)", + ) + run_cmd.add_argument( + "--models", "-m", + default="gpt-4o", + help="Comma-separated list of models to test (default: gpt-4o)", + ) + run_cmd.add_argument( + "--variants", "-p", + default=None, + help="Comma-separated prompt variants to test (default: all in fixture)", + ) + run_cmd.add_argument( + "--no-judge", + action="store_true", + help="Skip LLM-as-judge scoring", + ) + run_cmd.add_argument( + "--judge-model", + default="gpt-4o-mini", + help="Model for LLM judge (default: gpt-4o-mini)", + ) + run_cmd.add_argument( + "--fixtures-dir", + default=None, + help="Path to fixtures directory (default: eval/fixtures/)", + ) + run_cmd.add_argument("-v", "--verbose", action="store_true") + + # ── list ────────────────────────────────────────────────────── + list_cmd = sub.add_parser("list", help="List available fixtures") + list_cmd.add_argument("--fixtures-dir", default=None) + list_cmd.add_argument("-v", "--verbose", action="store_true") + + # ── sync ────────────────────────────────────────────────────── + sync_cmd = sub.add_parser("sync", help="Sync fixtures to Langfuse datasets") + sync_cmd.add_argument("--fixture", "-f", default=None, help="Sync only the named fixture") + sync_cmd.add_argument("--fixtures-dir", default=None) + sync_cmd.add_argument("-v", "--verbose", action="store_true") + + return parser.parse_args() + + +def _fixtures_dir(arg: str | None) -> Path | None: + if arg: + return Path(arg) + return None + + +async def _cmd_run(args: argparse.Namespace) -> None: + fixtures = discover_fixtures(_fixtures_dir(args.fixtures_dir)) + if not fixtures: + print("No fixtures found. Create YAML files in eval/fixtures/.") + return + + if args.fixture: + fixtures = [f for f in fixtures if f.name == args.fixture] + if not fixtures: + print(f"Fixture '{args.fixture}' not found.") + return + + models = [m.strip() for m in args.models.split(",")] + variants = [v.strip() for v in args.variants.split(",")] if args.variants else None + + all_results = [] + for fixture in fixtures: + results = await run_fixture_eval( + fixture, + models=models, + variants=variants, + use_llm_judge=not args.no_judge, + judge_model=args.judge_model, + ) + all_results.extend(results) + + print_results(all_results) + + +def _cmd_list(args: argparse.Namespace) -> None: + fixtures = discover_fixtures(_fixtures_dir(args.fixtures_dir)) + if not fixtures: + print("No fixtures found.") + return + + print(f"\n{'Name':<30} {'Types':<25} {'Variants':<20} {'Expected'}") + print("-" * 90) + for f in fixtures: + variants = ", ".join(f.prompt_variants.keys()) + types = ", ".join(f.data_types) + print(f"{f.name:<30} {types:<25} {variants:<20} {len(f.expected)}") + print() + + +def _cmd_sync(args: argparse.Namespace) -> None: + fixtures = discover_fixtures(_fixtures_dir(args.fixtures_dir)) + if args.fixture: + fixtures = [f for f in fixtures if f.name == args.fixture] + + if not fixtures: + print("No fixtures to sync.") + return + + for fixture in fixtures: + name = langfuse_eval.sync_fixture_to_dataset(fixture) + if name: + print(f"Synced: {fixture.name} → {name}") + else: + print(f"Skipped: {fixture.name} (Langfuse not configured)") + + +def main() -> None: + args = _parse_args() + _setup_logging(args.verbose) + + if args.command == "run": + asyncio.run(_cmd_run(args)) + elif args.command == "list": + _cmd_list(args) + elif args.command == "sync": + _cmd_sync(args) + + +if __name__ == "__main__": + main() diff --git a/services/batch-agent/eval/config.py b/services/batch-agent/eval/config.py new file mode 100644 index 0000000..052be33 --- /dev/null +++ b/services/batch-agent/eval/config.py @@ -0,0 +1,129 @@ +"""Eval configuration — YAML fixture loader and dataclasses. + +A *fixture* is a YAML file that defines a complete test scenario: + +.. code-block:: yaml + + name: freelance-invoices + description: Extract tasks and notes from invoice PDFs (text layer) + directory: sample_files/invoices # relative to fixture dir + data_types: [tasks, notes] + file_extensions: [txt, md] + + # Preseeded records the agent "sees" as existing data + seed_records: + projects: + - id: proj-1 + name: "Website Redesign" + status: active + tasks: [] + + # Prompt variations to test (at least one required) + prompt_variants: + baseline: | + Extract action items as tasks and meeting summaries as notes. + Set priority based on urgency keywords. + detailed: | + Extract action items as tasks. Map "URGENT" to high priority, + "ASAP" to medium. Summaries become notes with full content. + + # Expected extractions — what the agent SHOULD produce + expected: + tasks: + - title: "Send revised invoice to client" + priority: high + status: todo + - title: "Update project timeline" + priority: medium + notes: + - title: "Meeting summary - March kickoff" + + # Optional: models to test (overrides CLI --models) + models: [] +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +import yaml + +logger = logging.getLogger(__name__) + + +@dataclass +class ExpectedRecord: + """A single expected extraction result. + + Only the fields specified are checked — unspecified fields are ignored. + """ + + table: str # tasks | notes | timelines | projects + fields: dict[str, Any] # field_name → expected_value + + +@dataclass +class EvalFixture: + """A complete test scenario loaded from YAML.""" + + name: str + description: str + directory: str # relative path to sample files + data_types: list[str] + file_extensions: list[str] + seed_records: dict[str, list[dict]] + prompt_variants: dict[str, str] # variant_name → prompt_template + expected: list[ExpectedRecord] + models: list[str] # if empty, use CLI default + fixture_path: Path = field(default_factory=lambda: Path(".")) + + @property + def fixture_dir(self) -> Path: + """Absolute path to the sample files directory.""" + return self.fixture_path.parent / self.directory + + @classmethod + def from_yaml(cls, path: Path) -> "EvalFixture": + """Load a fixture from a YAML file.""" + raw = yaml.safe_load(path.read_text(encoding="utf-8")) + + expected: list[ExpectedRecord] = [] + for table, records in (raw.get("expected") or {}).items(): + for rec in records: + expected.append(ExpectedRecord(table=table, fields=rec)) + + return cls( + name=raw["name"], + description=raw.get("description", ""), + directory=raw.get("directory", "sample_files"), + data_types=raw.get("data_types", ["tasks"]), + file_extensions=raw.get("file_extensions", []), + seed_records=raw.get("seed_records", {}), + prompt_variants=raw.get("prompt_variants", {"default": ""}), + expected=expected, + models=raw.get("models", []), + fixture_path=path, + ) + + +def discover_fixtures(fixtures_dir: Path | None = None) -> list[EvalFixture]: + """Find and load all YAML fixtures in the fixtures directory.""" + if fixtures_dir is None: + fixtures_dir = Path(__file__).parent / "fixtures" + + fixtures: list[EvalFixture] = [] + if not fixtures_dir.is_dir(): + logger.warning("eval: fixtures directory not found: %s", fixtures_dir) + return fixtures + + for yaml_path in sorted(fixtures_dir.glob("*.yaml")): + try: + fixtures.append(EvalFixture.from_yaml(yaml_path)) + logger.info("eval: loaded fixture %s from %s", fixtures[-1].name, yaml_path.name) + except Exception as exc: + logger.error("eval: failed to load fixture %s: %s", yaml_path.name, exc) + + return fixtures diff --git a/services/batch-agent/eval/fixtures/freelance_invoices.yaml b/services/batch-agent/eval/fixtures/freelance_invoices.yaml new file mode 100644 index 0000000..8194519 --- /dev/null +++ b/services/batch-agent/eval/fixtures/freelance_invoices.yaml @@ -0,0 +1,86 @@ +# Fixture: freelance-invoices +# Tests extraction of tasks, notes, and timelines from +# invoices and meeting notes typical of a freelance workflow. + +name: freelance-invoices +description: > + Extract tasks, notes, and timeline events from Italian freelance + invoices and meeting notes. Tests project matching, priority + mapping, and bilingual content handling. + +directory: sample_files/invoices +data_types: [tasks, notes, timelines] +file_extensions: [txt, md] + +# Pre-existing records in the "database" +seed_records: + projects: + - id: "proj-web-redesign" + name: "Redesign Sito Web Corporate" + status: "active" + aiSummary: "Corporate website redesign for Studio Architettura Bianchi" + - id: "proj-ecommerce" + name: "E-Commerce FashionStore" + status: "active" + aiSummary: "Next.js e-commerce platform for FashionStore srl" + tasks: [] + notes: [] + timelines: [] + +# Prompt variations to compare +prompt_variants: + baseline: | + Extract action items as tasks and summaries as notes. + For timelines, extract any mentioned dates and deadlines. + Set isAiSuggested=1 on every record. + + detailed_italian: | + Estrai i dati dai file come segue: + - TASK: ogni azione da fare, deliverable, o item con scadenza. + Mappa "URGENTE" o "ALTA PRIORITÀ" → priority: high. + Mappa "media priorità" → priority: medium. + Mappa "bassa priorità" → priority: low. + Se un item è marcato come "completato" o [x], impostalo status: done. + Altrimenti status: todo. + - NOTE: riassunti di meeting, decisioni prese, note tecniche. + Il titolo deve essere descrittivo. Il content deve includere tutti i dettagli. + - TIMELINE: date di scadenza, milestone, meeting futuri. + Formato data: timestamp Unix in millisecondi. + Imposta sempre isAiSuggested=1. + + minimal: | + Extract only high-priority action items as tasks. + Ignore notes and timelines unless explicitly marked as important. + Set isAiSuggested=1. + +# Expected extractions (what the agent SHOULD produce) +# Only key fields are specified — scorer uses fuzzy matching +expected: + tasks: + - title: "Sviluppo frontend React" + priority: "high" + status: "todo" + - title: "Integrazione API backend" + priority: "medium" + status: "todo" + - title: "Testing cross-browser e fix bug responsive" + status: "todo" + - title: "Preparare wireframe homepage" + priority: "high" + status: "todo" + - title: "Setup progetto Next.js e configurare CI/CD" + priority: "medium" + status: "todo" + - title: "Ricerca plugin Stripe per gestione abbonamenti" + priority: "low" + status: "todo" + + notes: + - title: "Meeting Kickoff Progetto E-Commerce" + + timelines: + - title: "MVP E-Commerce pronto" + - title: "Meeting di revisione" + +# Models to test (can be overridden via CLI --models) +models: [] diff --git a/services/batch-agent/eval/fixtures/sample_files/invoices/fattura_042.txt b/services/batch-agent/eval/fixtures/sample_files/invoices/fattura_042.txt new file mode 100644 index 0000000..ad2e34f --- /dev/null +++ b/services/batch-agent/eval/fixtures/sample_files/invoices/fattura_042.txt @@ -0,0 +1,18 @@ +FATTURA N. 2026-0042 +Data: 15 Marzo 2026 +Cliente: Studio Architettura Bianchi + +Progetto: Redesign Sito Web Corporate + +Descrizione lavori: +- Sviluppo frontend React (40 ore) — URGENTE, completare entro 20 marzo +- Integrazione API backend (20 ore) — priorità media +- Design UI/UX mockup homepage (8 ore) — completato +- Testing cross-browser e fix bug responsive (12 ore) — da iniziare + +Totale: €4.800,00 + IVA + +Note: +Meeting di revisione previsto per il 18 marzo alle 10:00. +Il cliente ha richiesto modifiche al layout mobile della sezione contatti. +Attendere conferma budget aggiuntivo per sezione blog. diff --git a/services/batch-agent/eval/fixtures/sample_files/invoices/meeting_ecommerce.md b/services/batch-agent/eval/fixtures/sample_files/invoices/meeting_ecommerce.md new file mode 100644 index 0000000..b909b4d --- /dev/null +++ b/services/batch-agent/eval/fixtures/sample_files/invoices/meeting_ecommerce.md @@ -0,0 +1,25 @@ +# Meeting Notes - Kickoff Progetto E-Commerce + +**Data:** 10 Marzo 2026 +**Partecipanti:** Marco R., Giulia T., Cliente (FashionStore srl) + +## Decisioni prese + +1. **Piattaforma**: Next.js + Stripe per i pagamenti +2. **Timeline**: MVP pronto entro 30 aprile 2026 +3. **Budget**: €12.000 totale, €4.000 anticipo già ricevuto + +## Action items + +- [ ] Marco: preparare wireframe homepage entro 14 marzo — ALTA PRIORITÀ +- [ ] Giulia: setup progetto Next.js e configurare CI/CD — media priorità +- [ ] Marco: ricerca plugin Stripe per gestione abbonamenti — bassa priorità +- [x] Giulia: inviare contratto firmato al cliente — COMPLETATO + +## Note aggiuntive + +Il cliente vuole un design minimalista, ispirato a Zara.com. +Colori primari: nero, bianco, oro. +Font: Inter per body, Playfair Display per headings. + +Prossimo meeting: 24 marzo 2026 ore 15:00. diff --git a/services/batch-agent/eval/langfuse_eval.py b/services/batch-agent/eval/langfuse_eval.py new file mode 100644 index 0000000..b666c3b --- /dev/null +++ b/services/batch-agent/eval/langfuse_eval.py @@ -0,0 +1,222 @@ +"""Langfuse evaluation integration — datasets, runs, and scoring. + +Uses the Langfuse Python SDK to: + +1. **Sync fixtures → Langfuse datasets**: Each YAML fixture becomes a dataset, + each prompt variant + expected pair becomes a dataset item. + +2. **Track eval runs**: Each (fixture × model × prompt_variant) execution + is recorded as a dataset run with linked traces and scores. + +3. **Post scores**: precision, recall, F1, field_accuracy, llm_judge are + posted as numeric scores on the trace/run. +""" + +from __future__ import annotations + +import json +import logging +from typing import Any + +from shared.config import settings +from eval.config import EvalFixture +from eval.scorer import EvalScores + +logger = logging.getLogger(__name__) + + +def _get_langfuse(): + """Get or create a Langfuse client instance.""" + if not settings.LANGFUSE_SECRET_KEY or not settings.LANGFUSE_PUBLIC_KEY: + return None + try: + from langfuse import Langfuse + return Langfuse( + secret_key=settings.LANGFUSE_SECRET_KEY, + public_key=settings.LANGFUSE_PUBLIC_KEY, + host=settings.LANGFUSE_HOST, + ) + except Exception as exc: + logger.warning("langfuse_eval: failed to create client: %s", exc) + return None + + +def sync_fixture_to_dataset(fixture: EvalFixture) -> str | None: + """Create or update a Langfuse dataset from a fixture. + + Each prompt variant becomes a separate dataset item with: + - input: {directory, data_types, prompt_template, seed_records} + - expected_output: {expected records} + + Returns the dataset name, or None if Langfuse is unavailable. + """ + lf = _get_langfuse() + if lf is None: + logger.info("langfuse_eval: Langfuse not configured — skipping dataset sync") + return None + + dataset_name = f"batch-eval-{fixture.name}" + + try: + lf.create_dataset( + name=dataset_name, + description=fixture.description, + metadata={"data_types": fixture.data_types, "file_extensions": fixture.file_extensions}, + ) + except Exception: + # Dataset may already exist — that's fine + pass + + expected_output = {} + for rec in fixture.expected: + expected_output.setdefault(rec.table, []).append(rec.fields) + + for variant_name, prompt_template in fixture.prompt_variants.items(): + item_id = f"{fixture.name}--{variant_name}" + try: + lf.create_dataset_item( + dataset_name=dataset_name, + id=item_id, + input={ + "directory": fixture.directory, + "data_types": fixture.data_types, + "prompt_template": prompt_template, + "seed_records": fixture.seed_records, + }, + expected_output=expected_output, + metadata={"prompt_variant": variant_name}, + ) + except Exception as exc: + logger.warning( + "langfuse_eval: failed to upsert dataset item %s: %s", item_id, exc + ) + + lf.flush() + logger.info("langfuse_eval: synced fixture '%s' → dataset '%s'", fixture.name, dataset_name) + return dataset_name + + +def create_eval_run( + dataset_name: str, + run_name: str, + *, + metadata: dict[str, Any] | None = None, +) -> str: + """Create a dataset run in Langfuse. Returns the run name.""" + lf = _get_langfuse() + if lf is None: + return run_name + + try: + lf.create_dataset_run( + dataset_name=dataset_name, + run_name=run_name, + metadata=metadata or {}, + ) + lf.flush() + except Exception as exc: + logger.warning("langfuse_eval: failed to create run %s: %s", run_name, exc) + + return run_name + + +def post_eval_scores( + scores: EvalScores, + *, + trace_id: str | None = None, + dataset_name: str | None = None, + run_name: str | None = None, +) -> None: + """Post evaluation scores to Langfuse. + + If trace_id is provided, scores are attached to that trace. + """ + lf = _get_langfuse() + if lf is None: + return + + score_data = [ + ("precision", scores.precision), + ("recall", scores.recall), + ("f1", scores.f1), + ("field_accuracy", scores.field_accuracy), + ] + if scores.llm_judge_score is not None: + score_data.append(("llm_judge", scores.llm_judge_score)) + + for name, value in score_data: + try: + kwargs: dict[str, Any] = { + "name": name, + "value": value, + "comment": f"{scores.fixture_name} | {scores.model} | {scores.prompt_variant}", + } + if trace_id: + kwargs["trace_id"] = trace_id + lf.score(**kwargs) + except Exception as exc: + logger.warning("langfuse_eval: failed to post score %s: %s", name, exc) + + lf.flush() + logger.info( + "langfuse_eval: posted %d scores for %s/%s/%s", + len(score_data), scores.fixture_name, scores.model, scores.prompt_variant, + ) + + +def log_eval_trace( + *, + fixture_name: str, + model: str, + prompt_variant: str, + prompt_template: str, + actual_mutations: list[dict], + scores_summary: dict[str, Any], + dataset_name: str | None = None, + run_name: str | None = None, + dataset_item_id: str | None = None, +) -> str | None: + """Create a Langfuse trace for one eval execution and link it to a dataset run. + + Returns the trace_id, or None if Langfuse is unavailable. + """ + lf = _get_langfuse() + if lf is None: + return None + + try: + trace = lf.trace( + name=f"eval-{fixture_name}", + input={ + "prompt_template": prompt_template, + "model": model, + "prompt_variant": prompt_variant, + }, + output={ + "mutations": actual_mutations[:50], + "scores": scores_summary, + }, + metadata={ + "eval": True, + "fixture": fixture_name, + "model": model, + "prompt_variant": prompt_variant, + }, + tags=["eval", f"model:{model}", f"variant:{prompt_variant}"], + ) + + # Link to dataset run if available + if dataset_name and run_name and dataset_item_id: + try: + dataset = lf.get_dataset(dataset_name) + item = dataset.get_item(dataset_item_id) + if item: + item.link(trace, run_name) + except Exception as exc: + logger.warning("langfuse_eval: failed to link trace to dataset run: %s", exc) + + lf.flush() + return trace.id + except Exception as exc: + logger.warning("langfuse_eval: failed to create eval trace: %s", exc) + return None diff --git a/services/batch-agent/eval/mock_executor.py b/services/batch-agent/eval/mock_executor.py new file mode 100644 index 0000000..93d83cb --- /dev/null +++ b/services/batch-agent/eval/mock_executor.py @@ -0,0 +1,208 @@ +"""Mock executor — intercepts execute_on_client for offline E2E testing. + +Patches ``app.ws_context.execute_on_client`` so agent pipeline runs don't +require a live Electron client or Redis. Instead: + +- **Filesystem actions** (list_directory, read_file_content, get_file_metadata) + are served from local fixture files on disk. +- **Read actions** (select, get) return preseeded records from an in-memory + store provided by the test fixture. +- **Write actions** (insert, update, delete) are captured as *mutations* and + stored for later comparison against expected results. +""" + +from __future__ import annotations + +import json +import os +import time +import uuid +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any +from unittest.mock import AsyncMock, patch + + +@dataclass +class Mutation: + """A single recorded write operation.""" + + action: str # insert | update | delete + table: str + data: dict[str, Any] + timestamp: float = field(default_factory=time.time) + + +@dataclass +class MockExecutor: + """In-memory executor that replaces Redis-based tool round-trip. + + Parameters + ---------- + fixture_dir : Path + Directory containing sample files for filesystem tool calls. + seed_records : dict[str, list[dict]] + Pre-existing records per table, e.g. ``{"tasks": [...], "projects": [...]}``. + The executor returns these for ``select`` / ``get`` actions and auto-updates + them on ``insert`` / ``update`` / ``delete`` so subsequent selects reflect changes. + """ + + fixture_dir: Path + seed_records: dict[str, list[dict]] = field(default_factory=dict) + mutations: list[Mutation] = field(default_factory=list) + _id_counter: int = field(default=1000, repr=False) + + # ── Public API ─────────────────────────────────────────────────── + + def reset(self) -> None: + """Clear recorded mutations (keep seed_records intact).""" + self.mutations.clear() + + def get_mutations(self, *, table: str | None = None, action: str | None = None) -> list[Mutation]: + """Filter mutations by table and/or action.""" + result = self.mutations + if table: + result = [m for m in result if m.table == table] + if action: + result = [m for m in result if m.action == action] + return result + + def created_records(self, table: str) -> list[dict]: + """Return data dicts of all inserts into *table*.""" + return [m.data for m in self.mutations if m.table == table and m.action == "insert"] + + def updated_records(self, table: str) -> list[dict]: + """Return data dicts of all updates to *table*.""" + return [m.data for m in self.mutations if m.table == table and m.action == "update"] + + # ── Context manager for patching ────────────────────────────── + + def patch(self): + """Return an async context-manager that patches execute_on_client.""" + return patch( + "app.ws_context.execute_on_client", + new=AsyncMock(side_effect=self._handle), + ) + + # ── Internal dispatch ───────────────────────────────────────── + + async def _handle( + self, + action: str, + table: str | None = None, + data: dict[str, Any] | None = None, + filters: dict[str, Any] | None = None, + vector: list[float] | None = None, + limit: int | None = None, + ) -> dict[str, Any]: + # Filesystem + if action == "list_directory": + return self._list_directory(data or {}) + if action == "read_file_content": + return self._read_file(data or {}) + if action == "get_file_metadata": + return self._get_file_metadata(data or {}) + + # CRUD + if action == "select": + return self._select(table or "", filters) + if action == "get": + return self._get(table or "", data or {}) + if action == "insert": + return self._insert(table or "", data or {}) + if action == "update": + return self._update(table or "", data or {}) + if action == "delete": + return self._delete(table or "", data or {}) + + # Vector (no-op for eval) + if action in ("vector_upsert", "vector_search"): + return {"rows": []} + + return {"error": f"Unknown action: {action}"} + + # ── Filesystem handlers ─────────────────────────────────────── + + def _list_directory(self, data: dict) -> dict: + rel_path = data.get("path", "") + abs_path = self.fixture_dir / rel_path.lstrip("/\\") + if not abs_path.is_dir(): + return {"entries": []} + entries: list[dict] = [] + for child in sorted(abs_path.iterdir()): + entry_type = "directory" if child.is_dir() else "file" + # Return paths relative to fixture_dir but with the original prefix + entry_path = rel_path.rstrip("/\\") + "/" + child.name + entries.append({ + "name": child.name, + "path": entry_path, + "type": entry_type, + }) + return {"entries": entries} + + def _read_file(self, data: dict) -> dict: + rel_path = data.get("path", "") + abs_path = self.fixture_dir / rel_path.lstrip("/\\") + if not abs_path.is_file(): + return {"content": "", "error": f"File not found: {rel_path}"} + return {"content": abs_path.read_text(encoding="utf-8", errors="replace")} + + def _get_file_metadata(self, data: dict) -> dict: + rel_path = data.get("path", "") + abs_path = self.fixture_dir / rel_path.lstrip("/\\") + if not abs_path.exists(): + return {"error": f"Not found: {rel_path}"} + stat = abs_path.stat() + return { + "path": rel_path, + "size": stat.st_size, + "modifiedAt": int(stat.st_mtime * 1000), + "createdAt": int(stat.st_ctime * 1000), + "isDirectory": abs_path.is_dir(), + } + + # ── CRUD handlers ───────────────────────────────────────────── + + def _select(self, table: str, filters: dict | None) -> dict: + rows = list(self.seed_records.get(table, [])) + if filters: + rows = [ + r for r in rows + if all(r.get(k) == v for k, v in filters.items() if v is not None) + ] + return {"rows": rows} + + def _get(self, table: str, data: dict) -> dict: + record_id = data.get("id", "") + rows = self.seed_records.get(table, []) + for r in rows: + if r.get("id") == record_id: + return {"row": r} + return {"row": None} + + def _insert(self, table: str, data: dict) -> dict: + self._id_counter += 1 + record = {**data, "id": str(self._id_counter)} + # Add to seed so subsequent selects can find it + self.seed_records.setdefault(table, []).append(record) + self.mutations.append(Mutation(action="insert", table=table, data=record)) + return {"row": record} + + def _update(self, table: str, data: dict) -> dict: + record_id = data.get("id", "") + rows = self.seed_records.get(table, []) + for r in rows: + if r.get("id") == record_id: + r.update({k: v for k, v in data.items() if v is not None and v != ""}) + self.mutations.append(Mutation(action="update", table=table, data=dict(r))) + return {"row": r} + # Record not found — still log the mutation + self.mutations.append(Mutation(action="update", table=table, data=data)) + return {"row": data} + + def _delete(self, table: str, data: dict) -> dict: + record_id = data.get("id", "") + rows = self.seed_records.get(table, []) + self.seed_records[table] = [r for r in rows if r.get("id") != record_id] + self.mutations.append(Mutation(action="delete", table=table, data={"id": record_id})) + return {"deleted": True} diff --git a/services/batch-agent/eval/requirements.txt b/services/batch-agent/eval/requirements.txt new file mode 100644 index 0000000..da7e2f6 --- /dev/null +++ b/services/batch-agent/eval/requirements.txt @@ -0,0 +1,2 @@ +# Extra dependencies for the eval harness (on top of the service requirements.txt) +pyyaml>=6.0.0 diff --git a/services/batch-agent/eval/runner.py b/services/batch-agent/eval/runner.py new file mode 100644 index 0000000..920d35f --- /dev/null +++ b/services/batch-agent/eval/runner.py @@ -0,0 +1,236 @@ +"""Eval runner — orchestrates fixture → mock → agent pipeline → scoring. + +For each (fixture × model × prompt_variant) combination: +1. Build a MockExecutor with fixture data +2. Patch execute_on_client +3. Override LLM_MODEL in shared settings +4. Run the batch agent pipeline (run_local_agent) +5. Collect mutations from the mock +6. Score against expected results (field match + optional LLM judge) +7. Report scores to Langfuse +8. Print results +""" + +from __future__ import annotations + +import asyncio +import copy +import json +import logging +import time +import uuid +from pathlib import Path +from typing import Any + +from eval.config import EvalFixture, ExpectedRecord +from eval.mock_executor import MockExecutor +from eval.scorer import ( + EvalScores, + FieldScore, + compute_precision_recall, + llm_judge_score, + score_field_match, +) +from eval import langfuse_eval + +logger = logging.getLogger(__name__) + + +async def run_single_eval( + fixture: EvalFixture, + model: str, + prompt_variant: str, + *, + use_llm_judge: bool = True, + judge_model: str = "gpt-4o-mini", +) -> EvalScores: + """Execute one (fixture × model × prompt_variant) eval and return scores.""" + from shared.config import settings + + prompt_template = fixture.prompt_variants.get(prompt_variant, "") + + # Build mock executor + seed = copy.deepcopy(fixture.seed_records) + mock = MockExecutor( + fixture_dir=fixture.fixture_dir, + seed_records=seed, + ) + + # Override the LLM model for this run + original_model = settings.LLM_MODEL + settings.LLM_MODEL = model + + # Build trigger data (same shape as what redis_consumer delivers) + trigger_data: dict[str, Any] = { + "type": "agent_trigger", + "directory": fixture.directory, + "directory_paths": [fixture.directory], + "data_types": fixture.data_types, + "file_extensions": fixture.file_extensions, + "prompt_template": prompt_template, + "device_id": "eval-harness", + "run_context": { + "agent_id": f"eval-{fixture.name}-{prompt_variant}", + "run_id": None, # skip DB logging during eval + }, + } + + eval_user_id = f"eval-{uuid.uuid4().hex[:8]}" + + logger.info( + "eval: starting %s | model=%s | variant=%s", + fixture.name, model, prompt_variant, + ) + start_time = time.time() + + try: + # Patch execute_on_client + set user context, then run the pipeline + from app.ws_context import set_current_user, clear_current_user + from app.agent_runner import run_local_agent + + set_current_user(eval_user_id) + with mock.patch(): + await run_local_agent(eval_user_id, trigger_data) + except Exception as exc: + logger.error("eval: pipeline failed for %s/%s/%s: %s", fixture.name, model, prompt_variant, exc) + finally: + settings.LLM_MODEL = original_model + from app.ws_context import clear_current_user + clear_current_user() + + elapsed = time.time() - start_time + logger.info("eval: pipeline completed in %.1fs — %d mutations", elapsed, len(mock.mutations)) + + # ── Score results ──────────────────────────────────────────── + all_field_scores: list[FieldScore] = [] + total_expected = 0 + total_actual = 0 + total_matched = 0 + total_extra = 0 + total_missing = 0 + + # Group expected by table + expected_by_table: dict[str, list[dict]] = {} + for rec in fixture.expected: + expected_by_table.setdefault(rec.table, []).append(rec.fields) + + # Compare against actual mutations (inserts + updates) + tables = set(expected_by_table.keys()) | {m.table for m in mock.mutations} + for table in tables: + expected_records = expected_by_table.get(table, []) + actual_records = mock.created_records(table) + mock.updated_records(table) + + field_scores, extra, missing = score_field_match(expected_records, actual_records, table) + all_field_scores.extend(field_scores) + + matched = sum(1 for s in field_scores if s.best_match is not None) + total_expected += len(expected_records) + total_actual += len(actual_records) + total_matched += matched + total_extra += extra + total_missing += missing + + precision, recall, f1 = compute_precision_recall(total_expected, total_actual, total_matched) + + scores = EvalScores( + fixture_name=fixture.name, + model=model, + prompt_variant=prompt_variant, + field_scores=all_field_scores, + precision=precision, + recall=recall, + f1=f1, + extra_records=total_extra, + missing_records=total_missing, + ) + + # ── Optional LLM judge ─────────────────────────────────────── + if use_llm_judge and fixture.expected: + all_expected = [r.fields for r in fixture.expected] + all_actual = [m.data for m in mock.mutations if m.action in ("insert", "update")] + judge_score, reasoning = await llm_judge_score( + all_expected, all_actual, judge_model=judge_model, + ) + scores.llm_judge_score = judge_score + scores.llm_judge_reasoning = reasoning + + # ── Report to Langfuse ─────────────────────────────────────── + dataset_name = f"batch-eval-{fixture.name}" + dataset_item_id = f"{fixture.name}--{prompt_variant}" + run_name = f"{model}--{prompt_variant}--{int(time.time())}" + + trace_id = langfuse_eval.log_eval_trace( + fixture_name=fixture.name, + model=model, + prompt_variant=prompt_variant, + prompt_template=prompt_template, + actual_mutations=[{"action": m.action, "table": m.table, "data": m.data} for m in mock.mutations], + scores_summary=scores.summary(), + dataset_name=dataset_name, + run_name=run_name, + dataset_item_id=dataset_item_id, + ) + + if trace_id: + langfuse_eval.post_eval_scores(scores, trace_id=trace_id) + + return scores + + +async def run_fixture_eval( + fixture: EvalFixture, + models: list[str], + *, + variants: list[str] | None = None, + use_llm_judge: bool = True, + judge_model: str = "gpt-4o-mini", +) -> list[EvalScores]: + """Run all (model × variant) combinations for a fixture.""" + if variants is None: + variants = list(fixture.prompt_variants.keys()) + + # Sync fixture to Langfuse dataset + langfuse_eval.sync_fixture_to_dataset(fixture) + + results: list[EvalScores] = [] + for model in models: + for variant in variants: + if variant not in fixture.prompt_variants: + logger.warning("eval: variant %r not found in fixture %s", variant, fixture.name) + continue + scores = await run_single_eval( + fixture, model, variant, + use_llm_judge=use_llm_judge, + judge_model=judge_model, + ) + results.append(scores) + + return results + + +def print_results(results: list[EvalScores]) -> None: + """Print a formatted summary table of eval results.""" + if not results: + print("\nNo eval results.") + return + + print("\n" + "=" * 90) + print(f"{'Fixture':<25} {'Model':<25} {'Variant':<15} {'P':>6} {'R':>6} {'F1':>6} {'FA':>6} {'LLM':>6}") + print("-" * 90) + + for s in results: + llm_str = f"{s.llm_judge_score:.2f}" if s.llm_judge_score is not None else " --" + print( + f"{s.fixture_name:<25} {s.model:<25} {s.prompt_variant:<15} " + f"{s.precision:>6.2f} {s.recall:>6.2f} {s.f1:>6.2f} " + f"{s.field_accuracy:>6.2f} {llm_str:>6}" + ) + + print("=" * 90) + + # If LLM judge reasoning is available, print it + for s in results: + if s.llm_judge_reasoning: + print(f"\n[{s.model} / {s.prompt_variant}] LLM Judge: {s.llm_judge_reasoning}") + + print() diff --git a/services/batch-agent/eval/scorer.py b/services/batch-agent/eval/scorer.py new file mode 100644 index 0000000..51b2500 --- /dev/null +++ b/services/batch-agent/eval/scorer.py @@ -0,0 +1,268 @@ +"""Scoring functions for batch agent evaluation. + +Two scoring strategies: + +1. **FieldMatchScorer** — deterministic check: for each expected record, + find the best-matching actual record and compare specified fields. + Returns precision, recall, and per-field accuracy. + +2. **LLMJudgeScorer** — uses a secondary LLM to semantically evaluate + whether the actual extractions satisfy the expected intent, even if + wording differs. Returns a 0-1 score + reasoning. +""" + +from __future__ import annotations + +import json +import logging +from dataclasses import dataclass, field +from difflib import SequenceMatcher +from typing import Any + +from langchain_core.messages import HumanMessage, SystemMessage + +logger = logging.getLogger(__name__) + + +# ── Result types ───────────────────────────────────────────────────────── + + +@dataclass +class FieldScore: + """Score for a single expected record against its best match.""" + + expected: dict[str, Any] + best_match: dict[str, Any] | None + matched_fields: dict[str, bool] + similarity: float # 0-1 overall similarity + + @property + def field_accuracy(self) -> float: + if not self.matched_fields: + return 0.0 + return sum(self.matched_fields.values()) / len(self.matched_fields) + + +@dataclass +class EvalScores: + """Aggregated scores for one eval run.""" + + fixture_name: str + model: str + prompt_variant: str + field_scores: list[FieldScore] = field(default_factory=list) + precision: float = 0.0 + recall: float = 0.0 + f1: float = 0.0 + llm_judge_score: float | None = None + llm_judge_reasoning: str = "" + extra_records: int = 0 # records created but not expected + missing_records: int = 0 # expected but not found + + @property + def field_accuracy(self) -> float: + if not self.field_scores: + return 0.0 + return sum(s.field_accuracy for s in self.field_scores) / len(self.field_scores) + + def summary(self) -> dict[str, Any]: + return { + "fixture": self.fixture_name, + "model": self.model, + "prompt_variant": self.prompt_variant, + "precision": round(self.precision, 3), + "recall": round(self.recall, 3), + "f1": round(self.f1, 3), + "field_accuracy": round(self.field_accuracy, 3), + "llm_judge_score": round(self.llm_judge_score, 3) if self.llm_judge_score is not None else None, + "extra_records": self.extra_records, + "missing_records": self.missing_records, + } + + +# ── Field Match Scorer ─────────────────────────────────────────────────── + + +def _normalize(value: Any) -> str: + """Normalize a value for comparison.""" + if value is None: + return "" + return str(value).strip().lower() + + +def _text_similarity(a: str, b: str) -> float: + """Fuzzy text similarity using SequenceMatcher.""" + if not a and not b: + return 1.0 + if not a or not b: + return 0.0 + return SequenceMatcher(None, a.lower(), b.lower()).ratio() + + +def _find_best_match( + expected: dict[str, Any], + actuals: list[dict[str, Any]], +) -> tuple[dict[str, Any] | None, float]: + """Find the actual record most similar to expected, return (match, similarity).""" + if not actuals: + return None, 0.0 + + best_match = None + best_score = 0.0 + + # Primary matching key: title or name + expected_title = _normalize(expected.get("title", expected.get("name", ""))) + + for actual in actuals: + actual_title = _normalize(actual.get("title", actual.get("name", ""))) + sim = _text_similarity(expected_title, actual_title) + if sim > best_score: + best_score = sim + best_match = actual + + return best_match, best_score + + +def _compare_fields( + expected: dict[str, Any], + actual: dict[str, Any], +) -> dict[str, bool]: + """Compare each expected field against the actual record.""" + results: dict[str, bool] = {} + for key, expected_val in expected.items(): + actual_val = actual.get(key) + # Exact match for non-string types + if not isinstance(expected_val, str): + results[key] = actual_val == expected_val + else: + # Fuzzy match for strings (threshold: 0.7) + results[key] = _text_similarity( + _normalize(expected_val), _normalize(actual_val) + ) >= 0.7 + return results + + +def score_field_match( + expected_records: list[dict[str, Any]], + actual_records: list[dict[str, Any]], + table: str, +) -> tuple[list[FieldScore], int, int]: + """Score actual extractions against expected records for one table. + + Returns (field_scores, extra_count, missing_count). + """ + field_scores: list[FieldScore] = [] + matched_actuals: set[int] = set() + + for exp in expected_records: + # Find best match among unmatched actuals + candidates = [ + (i, a) for i, a in enumerate(actual_records) if i not in matched_actuals + ] + if not candidates: + field_scores.append(FieldScore( + expected=exp, best_match=None, matched_fields={}, similarity=0.0, + )) + continue + + best_idx, best_match = None, None + best_sim = 0.0 + for idx, actual in candidates: + _, sim = _find_best_match(exp, [actual]) + if sim > best_sim: + best_sim = sim + best_idx = idx + best_match = actual + + if best_sim >= 0.5 and best_match is not None: + matched_actuals.add(best_idx) + matched_fields = _compare_fields(exp, best_match) + field_scores.append(FieldScore( + expected=exp, best_match=best_match, + matched_fields=matched_fields, similarity=best_sim, + )) + else: + field_scores.append(FieldScore( + expected=exp, best_match=None, matched_fields={}, similarity=0.0, + )) + + extra_count = len(actual_records) - len(matched_actuals) + missing_count = sum(1 for s in field_scores if s.best_match is None) + + return field_scores, extra_count, missing_count + + +def compute_precision_recall( + expected_count: int, + actual_count: int, + matched_count: int, +) -> tuple[float, float, float]: + """Compute precision, recall, F1.""" + precision = matched_count / actual_count if actual_count > 0 else 0.0 + recall = matched_count / expected_count if expected_count > 0 else 0.0 + f1 = ( + 2 * precision * recall / (precision + recall) + if (precision + recall) > 0 + else 0.0 + ) + return precision, recall, f1 + + +# ── LLM Judge Scorer ───────────────────────────────────────────────────── + +_JUDGE_SYSTEM_PROMPT = """\ +You are an evaluation judge for a data extraction system. + +Your task is to compare the EXPECTED extractions against the ACTUAL extractions +produced by an AI agent, and assess quality on a 0-1 scale. + +Scoring criteria: +- 1.0: All expected records found with correct fields, no significant extras +- 0.8: Most expected records found, minor field differences or extras +- 0.6: Core extractions present but some missing or incorrect +- 0.4: Partial match — several expected records missing or wrong +- 0.2: Poor quality — most expected records missing or incorrect +- 0.0: Complete failure — no meaningful overlap + +Consider semantic equivalence: "Send invoice" and "Email the invoice" are matches. +Ignore field ordering and formatting differences. + +Respond with ONLY a JSON object: +{"score": 0.85, "reasoning": "Brief explanation of the score"} +""" + + +async def llm_judge_score( + expected: list[dict[str, Any]], + actual: list[dict[str, Any]], + *, + judge_model: str = "gpt-4o-mini", +) -> tuple[float, str]: + """Use an LLM to semantically evaluate extraction quality. + + Returns (score, reasoning). + """ + from app.llm import get_llm + + llm = get_llm(model=judge_model, temperature=0) + + user_content = ( + f"## Expected extractions\n```json\n{json.dumps(expected, indent=2, default=str)}\n```\n\n" + f"## Actual extractions\n```json\n{json.dumps(actual, indent=2, default=str)}\n```" + ) + + try: + response = await llm.ainvoke([ + SystemMessage(content=_JUDGE_SYSTEM_PROMPT), + HumanMessage(content=user_content), + ]) + raw = response.content.strip() + if raw.startswith("```"): + raw = raw.split("```")[1] + if raw.startswith("json"): + raw = raw[4:] + parsed = json.loads(raw.strip()) + return float(parsed.get("score", 0.0)), str(parsed.get("reasoning", "")) + except Exception as exc: + logger.warning("eval: LLM judge failed: %s", exc) + return 0.0, f"Judge error: {exc}"