"""Langfuse evaluation integration — datasets, runs, and scoring. Uses the Langfuse Python SDK v4 (OpenTelemetry-based) to: 1. **Sync fixtures → Langfuse datasets**: Each YAML fixture becomes a dataset, each prompt variant + expected pair becomes a dataset item. 2. **Track eval runs**: Each (fixture × model × prompt_variant) execution is recorded as a trace with linked scores. 3. **Post scores**: precision, recall, F1, field_accuracy, llm_judge are posted as numeric scores on the trace. """ from __future__ import annotations import logging import os from typing import Any from shared.config import settings from eval.config import EvalFixture from eval.scorer import EvalScores logger = logging.getLogger(__name__) def _get_langfuse(): """Get or create a Langfuse client instance (SDK v4).""" if not settings.LANGFUSE_SECRET_KEY or not settings.LANGFUSE_PUBLIC_KEY: return None try: os.environ.setdefault("LANGFUSE_SECRET_KEY", settings.LANGFUSE_SECRET_KEY) os.environ.setdefault("LANGFUSE_PUBLIC_KEY", settings.LANGFUSE_PUBLIC_KEY) if settings.LANGFUSE_HOST: os.environ.setdefault("LANGFUSE_HOST", settings.LANGFUSE_HOST) from langfuse import get_client return get_client() except Exception as exc: logger.warning("langfuse_eval: failed to create client: %s", exc) return None def sync_fixture_to_dataset(fixture: EvalFixture) -> str | None: """Create or update a Langfuse dataset from a fixture. Each prompt variant becomes a separate dataset item with: - input: {directory, data_types, prompt_template, seed_records} - expected_output: {expected records} Returns the dataset name, or None if Langfuse is unavailable. """ lf = _get_langfuse() if lf is None: logger.info("langfuse_eval: Langfuse not configured — skipping dataset sync") return None dataset_name = f"batch-eval-{fixture.name}" try: lf.create_dataset( name=dataset_name, description=fixture.description, metadata={ "data_types": ",".join(fixture.data_types), "file_extensions": ",".join(fixture.file_extensions) if fixture.file_extensions else "", }, ) except Exception: # Dataset may already exist — that's fine pass # Build expected_output appropriate to the fixture's mode expected_output: dict[str, Any] = {} if fixture.mode in ("step1", "full") and fixture.expected_classification: expected_output["classifications"] = [ {"file": ec.file, "project_id": ec.project_id, "domains": ec.domains} for ec in fixture.expected_classification ] if fixture.mode in ("step2", "full") and fixture.expected: for rec in fixture.expected: expected_output.setdefault(rec.table, []).append(rec.fields) item_id = f"{fixture.name}--{fixture.mode}" try: lf.create_dataset_item( dataset_name=dataset_name, id=item_id, input={ "directory": fixture.directory, "data_types": fixture.data_types, "mode": fixture.mode, "seed_records": fixture.seed_records, }, expected_output=expected_output, metadata={"mode": fixture.mode}, ) except Exception as exc: logger.warning( "langfuse_eval: failed to upsert dataset item %s: %s", item_id, exc ) lf.flush() logger.info("langfuse_eval: synced fixture '%s' → dataset '%s'", fixture.name, dataset_name) return dataset_name def sync_journey_fixture_to_dataset(fixture) -> str | None: """Create or update a Langfuse dataset from a journey fixture. Each journey fixture becomes a single dataset item with: - input: {directory, data_types, user_messages} - expected_output: {criteria} """ lf = _get_langfuse() if lf is None: logger.info("langfuse_eval: Langfuse not configured — skipping journey dataset sync") return None dataset_name = f"journey-eval-{fixture.name}" try: lf.create_dataset( name=dataset_name, description=fixture.description, metadata={"type": "journey", "data_types": ",".join(fixture.data_types)}, ) except Exception: pass # Dataset may already exist item_id = f"{fixture.name}--journey" try: lf.create_dataset_item( dataset_name=dataset_name, id=item_id, input={ "directory": fixture.directory, "data_types": fixture.data_types, "user_messages": fixture.user_messages, }, expected_output={ "criteria": fixture.expected_template_criteria, }, metadata={"type": "journey"}, ) except Exception as exc: logger.warning("langfuse_eval: failed to upsert journey dataset item %s: %s", item_id, exc) lf.flush() logger.info("langfuse_eval: synced journey fixture '%s' → dataset '%s'", fixture.name, dataset_name) return dataset_name def create_eval_run( dataset_name: str, run_name: str, *, metadata: dict[str, Any] | None = None, ) -> str: """Create a dataset run in Langfuse. Returns the run name. Note: In SDK v4, dataset runs are created implicitly via dataset.run_experiment(). This function is kept for backwards compatibility but may not create a run. """ lf = _get_langfuse() if lf is None: return run_name try: if hasattr(lf, "create_dataset_run"): lf.create_dataset_run( dataset_name=dataset_name, run_name=run_name, metadata=metadata or {}, ) lf.flush() else: logger.debug("langfuse_eval: create_dataset_run not available in SDK v4") except Exception as exc: logger.warning("langfuse_eval: failed to create run %s: %s", run_name, exc) return run_name def post_eval_scores( scores: EvalScores, *, trace_id: str | None = None, dataset_name: str | None = None, run_name: str | None = None, ) -> None: """Post evaluation scores to Langfuse. If trace_id is provided, scores are attached to that trace. """ lf = _get_langfuse() if lf is None: return score_data = [ ("precision", scores.precision), ("recall", scores.recall), ("f1", scores.f1), ] # Only post field_accuracy when there are field-level scores (step2/full) if scores.field_scores: score_data.append(("field_accuracy", scores.field_accuracy)) if scores.llm_judge_score is not None: score_data.append(("llm_judge", scores.llm_judge_score)) for name, value in score_data: try: lf.create_score( name=name, value=value, trace_id=trace_id, data_type="NUMERIC", comment=f"{scores.fixture_name} | {scores.model} | {scores.prompt_variant}", ) except Exception as exc: logger.warning("langfuse_eval: failed to post score %s: %s", name, exc) lf.flush() logger.info( "langfuse_eval: posted %d scores for %s/%s/%s", len(score_data), scores.fixture_name, scores.model, scores.prompt_variant, ) def log_eval_trace( *, fixture_name: str, model: str, prompt_variant: str, prompt_template: str, actual_mutations: list[dict], scores_summary: dict[str, Any], step1_results: list[dict] | None = None, dataset_name: str | None = None, run_name: str | None = None, dataset_item_id: str | None = None, langfuse_prompt_names: list[str] | None = None, ) -> str | None: """Create a Langfuse trace for one eval execution and link it to a dataset run. Uses SDK v4 observation API (traces are created implicitly by root spans). ``langfuse_prompt_names`` can contain one or two prompt names to link (e.g. ``["batch_file_classifier", "batch_processing"]`` for full mode). Each prompt gets its own generation-type observation for per-version metrics tracking. Returns the trace_id, or None if Langfuse is unavailable. """ lf = _get_langfuse() if lf is None: return None try: from langfuse import propagate_attributes # Fetch prompt objects for linking prompt_objs: list[tuple[str, Any]] = [] for pname in (langfuse_prompt_names or []): try: obj = lf.get_prompt(name=pname, cache_ttl_seconds=300) prompt_objs.append((pname, obj)) logger.info("langfuse_eval: linked prompt '%s' (type=%s)", pname, type(obj).__name__) except Exception as exc: logger.warning("langfuse_eval: prompt '%s' not found — %s", pname, exc) # Build trace output dict trace_output: dict[str, Any] = {"scores": scores_summary} if step1_results: trace_output["classifications"] = step1_results if actual_mutations: trace_output["mutations"] = actual_mutations[:50] with propagate_attributes( trace_name=f"eval-{fixture_name}", metadata={ "eval": "true", "fixture": fixture_name, "model": model, "prompt_variant": prompt_variant, }, tags=["eval", f"model:{model}", f"variant:{prompt_variant}"], ): # Root span for the eval run span = lf.start_observation(name=f"eval-{fixture_name}") span.update( input={ "prompt_template": prompt_template, "model": model, "prompt_variant": prompt_variant, }, output=trace_output, ) trace_id = span.trace_id # Create a generation-type observation per linked prompt for pname, pobj in prompt_objs: gen = lf.start_observation( name=f"prompt-{pname}", prompt=pobj, as_type="generation", ) gen.end() # Link to dataset run if available if dataset_name and run_name and dataset_item_id: try: dataset = lf.get_dataset(dataset_name) for item in dataset.items: if item.id == dataset_item_id: item.link(span, run_name) break except Exception as exc: logger.warning("langfuse_eval: failed to link trace to dataset run: %s", exc) span.end() lf.flush() return trace_id except Exception as exc: logger.warning("langfuse_eval: failed to create eval trace: %s", exc) return None