"""Eval runner — orchestrates fixture → mock → agent pipeline → scoring. For each (fixture × model × prompt_variant) combination: 1. Build a MockExecutor with fixture data 2. Patch execute_on_client 3. Override LLM_MODEL in shared settings 4. Run the batch agent pipeline (run_local_agent) 5. Collect mutations from the mock 6. Score against expected results (field match + optional LLM judge) 7. Report scores to Langfuse 8. Print results """ from __future__ import annotations import asyncio import copy import json import logging import time import uuid from pathlib import Path from typing import Any from eval.config import EvalFixture, ExpectedRecord from eval.mock_executor import MockExecutor from eval.scorer import ( EvalScores, FieldScore, compute_precision_recall, llm_judge_score, score_field_match, ) from eval import langfuse_eval logger = logging.getLogger(__name__) async def run_single_eval( fixture: EvalFixture, model: str, prompt_variant: str, *, use_llm_judge: bool = True, judge_model: str = "gpt-4o-mini", ) -> EvalScores: """Execute one (fixture × model × prompt_variant) eval and return scores.""" from shared.config import settings prompt_template = fixture.prompt_variants.get(prompt_variant, "") # Build mock executor seed = copy.deepcopy(fixture.seed_records) mock = MockExecutor( fixture_dir=fixture.fixture_dir, seed_records=seed, ) # Override the LLM model for this run original_model = settings.LLM_MODEL settings.LLM_MODEL = model # Build trigger data (same shape as what redis_consumer delivers) trigger_data: dict[str, Any] = { "type": "agent_trigger", "directory": fixture.directory, "directory_paths": [fixture.directory], "data_types": fixture.data_types, "file_extensions": fixture.file_extensions, "prompt_template": prompt_template, "device_id": "eval-harness", "run_context": { "agent_id": f"eval-{fixture.name}-{prompt_variant}", "run_id": None, # skip DB logging during eval }, } eval_user_id = f"eval-{uuid.uuid4().hex[:8]}" logger.info( "eval: starting %s | model=%s | variant=%s", fixture.name, model, prompt_variant, ) start_time = time.time() try: # Patch execute_on_client + set user context, then run the pipeline from app.ws_context import set_current_user, clear_current_user from app.agent_runner import run_local_agent set_current_user(eval_user_id) with mock.patch(): await run_local_agent(eval_user_id, trigger_data) except Exception as exc: logger.error("eval: pipeline failed for %s/%s/%s: %s", fixture.name, model, prompt_variant, exc) finally: settings.LLM_MODEL = original_model from app.ws_context import clear_current_user clear_current_user() elapsed = time.time() - start_time logger.info("eval: pipeline completed in %.1fs — %d mutations", elapsed, len(mock.mutations)) # ── Score results ──────────────────────────────────────────── all_field_scores: list[FieldScore] = [] total_expected = 0 total_actual = 0 total_matched = 0 total_extra = 0 total_missing = 0 # Group expected by table expected_by_table: dict[str, list[dict]] = {} for rec in fixture.expected: expected_by_table.setdefault(rec.table, []).append(rec.fields) # Compare against actual mutations (inserts + updates) tables = set(expected_by_table.keys()) | {m.table for m in mock.mutations} for table in tables: expected_records = expected_by_table.get(table, []) actual_records = mock.created_records(table) + mock.updated_records(table) field_scores, extra, missing = score_field_match(expected_records, actual_records, table) all_field_scores.extend(field_scores) matched = sum(1 for s in field_scores if s.best_match is not None) total_expected += len(expected_records) total_actual += len(actual_records) total_matched += matched total_extra += extra total_missing += missing precision, recall, f1 = compute_precision_recall(total_expected, total_actual, total_matched) scores = EvalScores( fixture_name=fixture.name, model=model, prompt_variant=prompt_variant, field_scores=all_field_scores, precision=precision, recall=recall, f1=f1, extra_records=total_extra, missing_records=total_missing, ) # ── Optional LLM judge ─────────────────────────────────────── if use_llm_judge and fixture.expected: all_expected = [r.fields for r in fixture.expected] all_actual = [m.data for m in mock.mutations if m.action in ("insert", "update")] judge_score, reasoning = await llm_judge_score( all_expected, all_actual, judge_model=judge_model, ) scores.llm_judge_score = judge_score scores.llm_judge_reasoning = reasoning # ── Report to Langfuse ─────────────────────────────────────── dataset_name = f"batch-eval-{fixture.name}" dataset_item_id = f"{fixture.name}--{prompt_variant}" run_name = f"{model}--{prompt_variant}--{int(time.time())}" trace_id = langfuse_eval.log_eval_trace( fixture_name=fixture.name, model=model, prompt_variant=prompt_variant, prompt_template=prompt_template, actual_mutations=[{"action": m.action, "table": m.table, "data": m.data} for m in mock.mutations], scores_summary=scores.summary(), dataset_name=dataset_name, run_name=run_name, dataset_item_id=dataset_item_id, ) if trace_id: langfuse_eval.post_eval_scores(scores, trace_id=trace_id) return scores async def run_fixture_eval( fixture: EvalFixture, models: list[str], *, variants: list[str] | None = None, use_llm_judge: bool = True, judge_model: str = "gpt-4o-mini", ) -> list[EvalScores]: """Run all (model × variant) combinations for a fixture.""" if variants is None: variants = list(fixture.prompt_variants.keys()) # Sync fixture to Langfuse dataset langfuse_eval.sync_fixture_to_dataset(fixture) results: list[EvalScores] = [] for model in models: for variant in variants: if variant not in fixture.prompt_variants: logger.warning("eval: variant %r not found in fixture %s", variant, fixture.name) continue scores = await run_single_eval( fixture, model, variant, use_llm_judge=use_llm_judge, judge_model=judge_model, ) results.append(scores) return results def print_results(results: list[EvalScores]) -> None: """Print a formatted summary table of eval results.""" if not results: print("\nNo eval results.") return print("\n" + "=" * 90) print(f"{'Fixture':<25} {'Model':<25} {'Variant':<15} {'P':>6} {'R':>6} {'F1':>6} {'FA':>6} {'LLM':>6}") print("-" * 90) for s in results: llm_str = f"{s.llm_judge_score:.2f}" if s.llm_judge_score is not None else " --" print( f"{s.fixture_name:<25} {s.model:<25} {s.prompt_variant:<15} " f"{s.precision:>6.2f} {s.recall:>6.2f} {s.f1:>6.2f} " f"{s.field_accuracy:>6.2f} {llm_str:>6}" ) print("=" * 90) # If LLM judge reasoning is available, print it for s in results: if s.llm_judge_reasoning: print(f"\n[{s.model} / {s.prompt_variant}] LLM Judge: {s.llm_judge_reasoning}") print()