"""Langfuse evaluation integration — datasets, runs, and scoring. Uses the Langfuse Python SDK to: 1. **Sync fixtures → Langfuse datasets**: Each YAML fixture becomes a dataset, each prompt variant + expected pair becomes a dataset item. 2. **Track eval runs**: Each (fixture × model × prompt_variant) execution is recorded as a dataset run with linked traces and scores. 3. **Post scores**: precision, recall, F1, field_accuracy, llm_judge are posted as numeric scores on the trace/run. """ from __future__ import annotations import json import logging from typing import Any from shared.config import settings from eval.config import EvalFixture from eval.scorer import EvalScores logger = logging.getLogger(__name__) def _get_langfuse(): """Get or create a Langfuse client instance.""" if not settings.LANGFUSE_SECRET_KEY or not settings.LANGFUSE_PUBLIC_KEY: return None try: from langfuse import Langfuse return Langfuse( secret_key=settings.LANGFUSE_SECRET_KEY, public_key=settings.LANGFUSE_PUBLIC_KEY, host=settings.LANGFUSE_HOST, ) except Exception as exc: logger.warning("langfuse_eval: failed to create client: %s", exc) return None def sync_fixture_to_dataset(fixture: EvalFixture) -> str | None: """Create or update a Langfuse dataset from a fixture. Each prompt variant becomes a separate dataset item with: - input: {directory, data_types, prompt_template, seed_records} - expected_output: {expected records} Returns the dataset name, or None if Langfuse is unavailable. """ lf = _get_langfuse() if lf is None: logger.info("langfuse_eval: Langfuse not configured — skipping dataset sync") return None dataset_name = f"batch-eval-{fixture.name}" try: lf.create_dataset( name=dataset_name, description=fixture.description, metadata={"data_types": fixture.data_types, "file_extensions": fixture.file_extensions}, ) except Exception: # Dataset may already exist — that's fine pass expected_output = {} for rec in fixture.expected: expected_output.setdefault(rec.table, []).append(rec.fields) for variant_name, prompt_template in fixture.prompt_variants.items(): item_id = f"{fixture.name}--{variant_name}" try: lf.create_dataset_item( dataset_name=dataset_name, id=item_id, input={ "directory": fixture.directory, "data_types": fixture.data_types, "prompt_template": prompt_template, "seed_records": fixture.seed_records, }, expected_output=expected_output, metadata={"prompt_variant": variant_name}, ) except Exception as exc: logger.warning( "langfuse_eval: failed to upsert dataset item %s: %s", item_id, exc ) lf.flush() logger.info("langfuse_eval: synced fixture '%s' → dataset '%s'", fixture.name, dataset_name) return dataset_name def sync_journey_fixture_to_dataset(fixture) -> str | None: """Create or update a Langfuse dataset from a journey fixture. Each journey fixture becomes a single dataset item with: - input: {directory, data_types, user_messages} - expected_output: {criteria} """ lf = _get_langfuse() if lf is None: logger.info("langfuse_eval: Langfuse not configured — skipping journey dataset sync") return None dataset_name = f"journey-eval-{fixture.name}" try: lf.create_dataset( name=dataset_name, description=fixture.description, metadata={"type": "journey", "data_types": fixture.data_types}, ) except Exception: pass # Dataset may already exist item_id = f"{fixture.name}--journey" try: lf.create_dataset_item( dataset_name=dataset_name, id=item_id, input={ "directory": fixture.directory, "data_types": fixture.data_types, "user_messages": fixture.user_messages, }, expected_output={ "criteria": fixture.expected_template_criteria, }, metadata={"type": "journey"}, ) except Exception as exc: logger.warning("langfuse_eval: failed to upsert journey dataset item %s: %s", item_id, exc) lf.flush() logger.info("langfuse_eval: synced journey fixture '%s' → dataset '%s'", fixture.name, dataset_name) return dataset_name def create_eval_run( dataset_name: str, run_name: str, *, metadata: dict[str, Any] | None = None, ) -> str: """Create a dataset run in Langfuse. Returns the run name.""" lf = _get_langfuse() if lf is None: return run_name try: lf.create_dataset_run( dataset_name=dataset_name, run_name=run_name, metadata=metadata or {}, ) lf.flush() except Exception as exc: logger.warning("langfuse_eval: failed to create run %s: %s", run_name, exc) return run_name def post_eval_scores( scores: EvalScores, *, trace_id: str | None = None, dataset_name: str | None = None, run_name: str | None = None, ) -> None: """Post evaluation scores to Langfuse. If trace_id is provided, scores are attached to that trace. """ lf = _get_langfuse() if lf is None: return score_data = [ ("precision", scores.precision), ("recall", scores.recall), ("f1", scores.f1), ("field_accuracy", scores.field_accuracy), ] if scores.llm_judge_score is not None: score_data.append(("llm_judge", scores.llm_judge_score)) for name, value in score_data: try: kwargs: dict[str, Any] = { "name": name, "value": value, "comment": f"{scores.fixture_name} | {scores.model} | {scores.prompt_variant}", } if trace_id: kwargs["trace_id"] = trace_id lf.score(**kwargs) except Exception as exc: logger.warning("langfuse_eval: failed to post score %s: %s", name, exc) lf.flush() logger.info( "langfuse_eval: posted %d scores for %s/%s/%s", len(score_data), scores.fixture_name, scores.model, scores.prompt_variant, ) def log_eval_trace( *, fixture_name: str, model: str, prompt_variant: str, prompt_template: str, actual_mutations: list[dict], scores_summary: dict[str, Any], dataset_name: str | None = None, run_name: str | None = None, dataset_item_id: str | None = None, ) -> str | None: """Create a Langfuse trace for one eval execution and link it to a dataset run. Returns the trace_id, or None if Langfuse is unavailable. """ lf = _get_langfuse() if lf is None: return None try: trace = lf.trace( name=f"eval-{fixture_name}", input={ "prompt_template": prompt_template, "model": model, "prompt_variant": prompt_variant, }, output={ "mutations": actual_mutations[:50], "scores": scores_summary, }, metadata={ "eval": True, "fixture": fixture_name, "model": model, "prompt_variant": prompt_variant, }, tags=["eval", f"model:{model}", f"variant:{prompt_variant}"], ) # Link to dataset run if available if dataset_name and run_name and dataset_item_id: try: dataset = lf.get_dataset(dataset_name) item = dataset.get_item(dataset_item_id) if item: item.link(trace, run_name) except Exception as exc: logger.warning("langfuse_eval: failed to link trace to dataset run: %s", exc) lf.flush() return trace.id except Exception as exc: logger.warning("langfuse_eval: failed to create eval trace: %s", exc) return None