refactor(eval): 3-mode eval harness (step1/step2/full) with Langfuse fixes

- Rewrite eval config with EvalMode (step1, step2, full) replacing prompt_variants
- Rewrite runner with _run_step1, _run_step2, _run_full dispatch
- CLI: replace --variants with --mode flag
- Add 3 fixture YAMLs: classify_invoices (step1), process_invoices (step2), full_invoices (full)
- Remove old freelance_invoices fixture
- Langfuse: mode-aware dataset items (classifications for step1, extraction for step2, both for full)
- Langfuse: link both prompts (batch_file_classifier + batch_processing) in full mode
- Langfuse: post separate classification_precision/recall/f1 scores for full mode
- Langfuse: skip misleading field_accuracy=0 when field_scores is empty (step1)
- Langfuse: include step1_results in trace output
- MockExecutor: mock async_session to bypass DB in full mode
- Journey fixture: remove user_messages (only interactive test kept)
This commit is contained in:
Roberto Musso
2026-03-24 16:18:51 +01:00
parent 63fa119543
commit d3f7099d93
13 changed files with 1409 additions and 439 deletions

View File

@@ -1,21 +1,21 @@
"""Langfuse evaluation integration — datasets, runs, and scoring.
Uses the Langfuse Python SDK to:
Uses the Langfuse Python SDK v4 (OpenTelemetry-based) to:
1. **Sync fixtures → Langfuse datasets**: Each YAML fixture becomes a dataset,
with a single dataset item keyed by the fixture's eval mode (step1, step2, or full).
2. **Track eval runs**: Each (fixture × model × prompt_variant) execution
is recorded as a dataset run with linked traces and scores.
is recorded as a trace with linked scores.
3. **Post scores**: precision, recall, F1, field_accuracy, llm_judge are
posted as numeric scores on the trace/run.
posted as numeric scores on the trace.
"""
from __future__ import annotations
import json
import logging
import os
from typing import Any
from shared.config import settings
@@ -26,16 +26,16 @@ logger = logging.getLogger(__name__)
def _get_langfuse():
"""Get or create a Langfuse client instance."""
"""Get or create a Langfuse client instance (SDK v4)."""
if not settings.LANGFUSE_SECRET_KEY or not settings.LANGFUSE_PUBLIC_KEY:
return None
try:
from langfuse import Langfuse
return Langfuse(
secret_key=settings.LANGFUSE_SECRET_KEY,
public_key=settings.LANGFUSE_PUBLIC_KEY,
host=settings.LANGFUSE_HOST,
)
os.environ.setdefault("LANGFUSE_SECRET_KEY", settings.LANGFUSE_SECRET_KEY)
os.environ.setdefault("LANGFUSE_PUBLIC_KEY", settings.LANGFUSE_PUBLIC_KEY)
if settings.LANGFUSE_HOST:
os.environ.setdefault("LANGFUSE_HOST", settings.LANGFUSE_HOST)
from langfuse import get_client
return get_client()
except Exception as exc:
logger.warning("langfuse_eval: failed to create client: %s", exc)
return None
@@ -61,35 +61,44 @@ def sync_fixture_to_dataset(fixture: EvalFixture) -> str | None:
lf.create_dataset(
name=dataset_name,
description=fixture.description,
metadata={"data_types": fixture.data_types, "file_extensions": fixture.file_extensions},
metadata={
"data_types": ",".join(fixture.data_types),
"file_extensions": ",".join(fixture.file_extensions) if fixture.file_extensions else "",
},
)
except Exception:
# Dataset may already exist — that's fine
pass
expected_output = {}
for rec in fixture.expected:
expected_output.setdefault(rec.table, []).append(rec.fields)
# Build expected_output appropriate to the fixture's mode
expected_output: dict[str, Any] = {}
if fixture.mode in ("step1", "full") and fixture.expected_classification:
expected_output["classifications"] = [
{"file": ec.file, "project_id": ec.project_id, "domains": ec.domains}
for ec in fixture.expected_classification
]
if fixture.mode in ("step2", "full") and fixture.expected:
for rec in fixture.expected:
expected_output.setdefault(rec.table, []).append(rec.fields)
for variant_name, prompt_template in fixture.prompt_variants.items():
item_id = f"{fixture.name}--{variant_name}"
try:
lf.create_dataset_item(
dataset_name=dataset_name,
id=item_id,
input={
"directory": fixture.directory,
"data_types": fixture.data_types,
"prompt_template": prompt_template,
"seed_records": fixture.seed_records,
},
expected_output=expected_output,
metadata={"prompt_variant": variant_name},
)
except Exception as exc:
logger.warning(
"langfuse_eval: failed to upsert dataset item %s: %s", item_id, exc
)
item_id = f"{fixture.name}--{fixture.mode}"
try:
lf.create_dataset_item(
dataset_name=dataset_name,
id=item_id,
input={
"directory": fixture.directory,
"data_types": fixture.data_types,
"mode": fixture.mode,
"seed_records": fixture.seed_records,
},
expected_output=expected_output,
metadata={"mode": fixture.mode},
)
except Exception as exc:
logger.warning(
"langfuse_eval: failed to upsert dataset item %s: %s", item_id, exc
)
lf.flush()
logger.info("langfuse_eval: synced fixture '%s' → dataset '%s'", fixture.name, dataset_name)
@@ -114,7 +123,7 @@ def sync_journey_fixture_to_dataset(fixture) -> str | None:
lf.create_dataset(
name=dataset_name,
description=fixture.description,
metadata={"type": "journey", "data_types": fixture.data_types},
metadata={"type": "journey", "data_types": ",".join(fixture.data_types)},
)
except Exception:
pass # Dataset may already exist
@@ -148,18 +157,26 @@ def create_eval_run(
*,
metadata: dict[str, Any] | None = None,
) -> str:
"""Create a dataset run in Langfuse. Returns the run name."""
"""Create a dataset run in Langfuse. Returns the run name.
Note: In SDK v4, dataset runs are created implicitly via
dataset.run_experiment(). This function is kept for backwards
compatibility but may not create a run.
"""
lf = _get_langfuse()
if lf is None:
return run_name
try:
lf.create_dataset_run(
dataset_name=dataset_name,
run_name=run_name,
metadata=metadata or {},
)
lf.flush()
if hasattr(lf, "create_dataset_run"):
lf.create_dataset_run(
dataset_name=dataset_name,
run_name=run_name,
metadata=metadata or {},
)
lf.flush()
else:
logger.debug("langfuse_eval: create_dataset_run not available in SDK v4")
except Exception as exc:
logger.warning("langfuse_eval: failed to create run %s: %s", run_name, exc)
@@ -185,21 +202,22 @@ def post_eval_scores(
("precision", scores.precision),
("recall", scores.recall),
("f1", scores.f1),
("field_accuracy", scores.field_accuracy),
]
# Only post field_accuracy when there are field-level scores (step2/full)
if scores.field_scores:
score_data.append(("field_accuracy", scores.field_accuracy))
if scores.llm_judge_score is not None:
score_data.append(("llm_judge", scores.llm_judge_score))
for name, value in score_data:
try:
kwargs: dict[str, Any] = {
"name": name,
"value": value,
"comment": f"{scores.fixture_name} | {scores.model} | {scores.prompt_variant}",
}
if trace_id:
kwargs["trace_id"] = trace_id
lf.score(**kwargs)
lf.create_score(
name=name,
value=value,
trace_id=trace_id,
data_type="NUMERIC",
comment=f"{scores.fixture_name} | {scores.model} | {scores.prompt_variant}",
)
except Exception as exc:
logger.warning("langfuse_eval: failed to post score %s: %s", name, exc)
@@ -218,12 +236,20 @@ def log_eval_trace(
prompt_template: str,
actual_mutations: list[dict],
scores_summary: dict[str, Any],
step1_results: list[dict] | None = None,
dataset_name: str | None = None,
run_name: str | None = None,
dataset_item_id: str | None = None,
langfuse_prompt_names: list[str] | None = None,
) -> str | None:
"""Create a Langfuse trace for one eval execution and link it to a dataset run.
Uses SDK v4 observation API (traces are created implicitly by root spans).
``langfuse_prompt_names`` can contain one or two prompt names to link
(e.g. ``["batch_file_classifier", "batch_processing"]`` for full mode).
Each prompt gets its own generation-type observation for per-version
metrics tracking.
Returns the trace_id, or None if Langfuse is unavailable.
"""
lf = _get_langfuse()
@@ -231,38 +257,71 @@ def log_eval_trace(
return None
try:
trace = lf.trace(
name=f"eval-{fixture_name}",
input={
"prompt_template": prompt_template,
"model": model,
"prompt_variant": prompt_variant,
},
output={
"mutations": actual_mutations[:50],
"scores": scores_summary,
},
from langfuse import propagate_attributes
# Fetch prompt objects for linking
prompt_objs: list[tuple[str, Any]] = []
for pname in (langfuse_prompt_names or []):
try:
obj = lf.get_prompt(name=pname, cache_ttl_seconds=300)
prompt_objs.append((pname, obj))
logger.info("langfuse_eval: linked prompt '%s' (type=%s)", pname, type(obj).__name__)
except Exception as exc:
logger.warning("langfuse_eval: prompt '%s' not found — %s", pname, exc)
# Build trace output dict
trace_output: dict[str, Any] = {"scores": scores_summary}
if step1_results:
trace_output["classifications"] = step1_results
if actual_mutations:
trace_output["mutations"] = actual_mutations[:50]
with propagate_attributes(
trace_name=f"eval-{fixture_name}",
metadata={
"eval": True,
"eval": "true",
"fixture": fixture_name,
"model": model,
"prompt_variant": prompt_variant,
},
tags=["eval", f"model:{model}", f"variant:{prompt_variant}"],
)
):
# Root span for the eval run
span = lf.start_observation(name=f"eval-{fixture_name}")
span.update(
input={
"prompt_template": prompt_template,
"model": model,
"prompt_variant": prompt_variant,
},
output=trace_output,
)
trace_id = span.trace_id
# Link to dataset run if available
if dataset_name and run_name and dataset_item_id:
try:
dataset = lf.get_dataset(dataset_name)
item = dataset.get_item(dataset_item_id)
if item:
item.link(trace, run_name)
except Exception as exc:
logger.warning("langfuse_eval: failed to link trace to dataset run: %s", exc)
# Create a generation-type observation per linked prompt
for pname, pobj in prompt_objs:
gen = lf.start_observation(
name=f"prompt-{pname}",
prompt=pobj,
as_type="generation",
)
gen.end()
# Link to dataset run if available
if dataset_name and run_name and dataset_item_id:
try:
dataset = lf.get_dataset(dataset_name)
for item in dataset.items:
if item.id == dataset_item_id:
item.link(span, run_name)
break
except Exception as exc:
logger.warning("langfuse_eval: failed to link trace to dataset run: %s", exc)
span.end()
lf.flush()
return trace.id
return trace_id
except Exception as exc:
logger.warning("langfuse_eval: failed to create eval trace: %s", exc)
return None