From 63fa11954394e5e866e231870b498bdacd20b388 Mon Sep 17 00:00:00 2001 From: Roberto Musso Date: Mon, 23 Mar 2026 23:16:41 +0100 Subject: [PATCH] feat(batch-agent): add journey eval to E2E harness MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - journey_runner.py: orchestrates journey start → simulated user messages → template extraction → LLM judge scoring - config.py: JourneyFixture dataclass with user_messages and expected_template_criteria, discover_journey_fixtures() - langfuse_eval.py: sync_journey_fixture_to_dataset() - cli.py: new 'journey' subcommand (python -m eval journey) with --fixture, --models, --judge-model flags - fixtures/journey_invoice_setup.yaml: example journey fixture with 4 user messages and 8 quality criteria --- services/batch-agent/eval/cli.py | 101 ++++- services/batch-agent/eval/config.py | 89 +++++ .../eval/fixtures/journey_invoice_setup.yaml | 46 +++ services/batch-agent/eval/journey_runner.py | 372 ++++++++++++++++++ services/batch-agent/eval/langfuse_eval.py | 46 +++ 5 files changed, 643 insertions(+), 11 deletions(-) create mode 100644 services/batch-agent/eval/fixtures/journey_invoice_setup.yaml create mode 100644 services/batch-agent/eval/journey_runner.py diff --git a/services/batch-agent/eval/cli.py b/services/batch-agent/eval/cli.py index ca339b7..59a1dbf 100644 --- a/services/batch-agent/eval/cli.py +++ b/services/batch-agent/eval/cli.py @@ -3,13 +3,17 @@ Usage:: # From services/batch-agent/: - python -m eval run # all fixtures, default model + python -m eval run # all agent fixtures, default model python -m eval run --fixture=freelance-invoices # single fixture python -m eval run --models=gpt-4o,anthropic/claude-sonnet-4 python -m eval run --variants=baseline,detailed # specific prompt variants python -m eval run --no-judge # skip LLM judge scoring - python -m eval list # list available fixtures + python -m eval journey # all journey fixtures + python -m eval journey --fixture=journey-invoices # single journey fixture + python -m eval journey --models=gpt-4o,anthropic/claude-sonnet-4 + + python -m eval list # list all fixtures python -m eval sync # sync fixtures to Langfuse datasets """ @@ -28,8 +32,9 @@ for p in (_SERVICE_ROOT, _REPO_ROOT): if str(p) not in sys.path: sys.path.insert(0, str(p)) -from eval.config import discover_fixtures +from eval.config import discover_fixtures, discover_journey_fixtures from eval.runner import run_fixture_eval, print_results +from eval.journey_runner import run_journey_fixture_eval, print_journey_results from eval import langfuse_eval @@ -90,6 +95,29 @@ def _parse_args() -> argparse.Namespace: list_cmd.add_argument("--fixtures-dir", default=None) list_cmd.add_argument("-v", "--verbose", action="store_true") + # ── journey ─────────────────────────────────────────────────── + journey_cmd = sub.add_parser("journey", help="Run journey evaluations") + journey_cmd.add_argument( + "--fixture", "-f", + help="Run only the named journey fixture (default: all)", + ) + journey_cmd.add_argument( + "--models", "-m", + default="gpt-4o", + help="Comma-separated list of models to test (default: gpt-4o)", + ) + journey_cmd.add_argument( + "--judge-model", + default="gpt-4o-mini", + help="Model for LLM judge (default: gpt-4o-mini)", + ) + journey_cmd.add_argument( + "--fixtures-dir", + default=None, + help="Path to fixtures directory (default: eval/fixtures/)", + ) + journey_cmd.add_argument("-v", "--verbose", action="store_true") + # ── sync ────────────────────────────────────────────────────── sync_cmd = sub.add_parser("sync", help="Sync fixtures to Langfuse datasets") sync_cmd.add_argument("--fixture", "-f", default=None, help="Sync only the named fixture") @@ -136,25 +164,41 @@ async def _cmd_run(args: argparse.Namespace) -> None: def _cmd_list(args: argparse.Namespace) -> None: fixtures = discover_fixtures(_fixtures_dir(args.fixtures_dir)) - if not fixtures: + journey_fixtures = discover_journey_fixtures(_fixtures_dir(args.fixtures_dir)) + + if not fixtures and not journey_fixtures: print("No fixtures found.") return - print(f"\n{'Name':<30} {'Types':<25} {'Variants':<20} {'Expected'}") - print("-" * 90) - for f in fixtures: - variants = ", ".join(f.prompt_variants.keys()) - types = ", ".join(f.data_types) - print(f"{f.name:<30} {types:<25} {variants:<20} {len(f.expected)}") + if fixtures: + print(f"\n{'[Agent Fixtures]'}") + print(f"{'Name':<30} {'Types':<25} {'Variants':<20} {'Expected'}") + print("-" * 90) + for f in fixtures: + variants = ", ".join(f.prompt_variants.keys()) + types = ", ".join(f.data_types) + print(f"{f.name:<30} {types:<25} {variants:<20} {len(f.expected)}") + + if journey_fixtures: + print(f"\n{'[Journey Fixtures]'}") + print(f"{'Name':<30} {'Types':<25} {'Messages':<10} {'Criteria'}") + print("-" * 90) + for f in journey_fixtures: + types = ", ".join(f.data_types) + print(f"{f.name:<30} {types:<25} {len(f.user_messages):<10} {len(f.expected_template_criteria)}") + print() def _cmd_sync(args: argparse.Namespace) -> None: fixtures = discover_fixtures(_fixtures_dir(args.fixtures_dir)) + journey_fixtures = discover_journey_fixtures(_fixtures_dir(args.fixtures_dir)) + if args.fixture: fixtures = [f for f in fixtures if f.name == args.fixture] + journey_fixtures = [f for f in journey_fixtures if f.name == args.fixture] - if not fixtures: + if not fixtures and not journey_fixtures: print("No fixtures to sync.") return @@ -165,6 +209,39 @@ def _cmd_sync(args: argparse.Namespace) -> None: else: print(f"Skipped: {fixture.name} (Langfuse not configured)") + for fixture in journey_fixtures: + name = langfuse_eval.sync_journey_fixture_to_dataset(fixture) + if name: + print(f"Synced: {fixture.name} → {name}") + else: + print(f"Skipped: {fixture.name} (Langfuse not configured)") + + +async def _cmd_journey(args: argparse.Namespace) -> None: + journey_fixtures = discover_journey_fixtures(_fixtures_dir(args.fixtures_dir)) + if not journey_fixtures: + print("No journey fixtures found. Create YAML files with type: journey in eval/fixtures/.") + return + + if args.fixture: + journey_fixtures = [f for f in journey_fixtures if f.name == args.fixture] + if not journey_fixtures: + print(f"Journey fixture '{args.fixture}' not found.") + return + + models = [m.strip() for m in args.models.split(",")] + + all_results = [] + for fixture in journey_fixtures: + results = await run_journey_fixture_eval( + fixture, + models=models, + judge_model=args.judge_model, + ) + all_results.extend(results) + + print_journey_results(all_results) + def main() -> None: args = _parse_args() @@ -172,6 +249,8 @@ def main() -> None: if args.command == "run": asyncio.run(_cmd_run(args)) + elif args.command == "journey": + asyncio.run(_cmd_journey(args)) elif args.command == "list": _cmd_list(args) elif args.command == "sync": diff --git a/services/batch-agent/eval/config.py b/services/batch-agent/eval/config.py index 052be33..0b61147 100644 --- a/services/batch-agent/eval/config.py +++ b/services/batch-agent/eval/config.py @@ -40,6 +40,31 @@ A *fixture* is a YAML file that defines a complete test scenario: # Optional: models to test (overrides CLI --models) models: [] + +A *journey fixture* tests the prompt-template builder conversation: + +.. code-block:: yaml + + type: journey + name: journey-invoices + description: Test journey builds a good template for invoices + directory: sample_files/invoices + data_types: [tasks, notes] + + # Simulated user responses for multi-turn conversation + user_messages: + - "I want to extract action items and meeting summaries" + - "Yes, map URGENTE to high priority" + - "That looks good, generate the template" + + # Criteria the generated prompt_template should satisfy + expected_template_criteria: + - "mentions tasks and notes as target entities" + - "includes priority mapping rules" + - "references isAiSuggested=1" + - "does not mention projectId" + + models: [] """ from __future__ import annotations @@ -121,9 +146,73 @@ def discover_fixtures(fixtures_dir: Path | None = None) -> list[EvalFixture]: for yaml_path in sorted(fixtures_dir.glob("*.yaml")): try: + raw = yaml.safe_load(yaml_path.read_text(encoding="utf-8")) + if raw.get("type") == "journey": + continue # Skip journey fixtures fixtures.append(EvalFixture.from_yaml(yaml_path)) logger.info("eval: loaded fixture %s from %s", fixtures[-1].name, yaml_path.name) except Exception as exc: logger.error("eval: failed to load fixture %s: %s", yaml_path.name, exc) return fixtures + + +# ── Journey fixtures ───────────────────────────────────────────────────── + + +@dataclass +class JourneyFixture: + """A journey test scenario — tests the prompt_template builder conversation.""" + + name: str + description: str + directory: str # relative path to sample files + data_types: list[str] + user_messages: list[str] # simulated user responses + expected_template_criteria: list[str] # what the template should contain/satisfy + models: list[str] + fixture_path: Path = field(default_factory=lambda: Path(".")) + + @property + def fixture_dir(self) -> Path: + """Absolute path to the sample files directory.""" + return self.fixture_path.parent / self.directory + + @classmethod + def from_yaml(cls, path: Path) -> "JourneyFixture": + """Load a journey fixture from a YAML file.""" + raw = yaml.safe_load(path.read_text(encoding="utf-8")) + + return cls( + name=raw["name"], + description=raw.get("description", ""), + directory=raw.get("directory", "sample_files"), + data_types=raw.get("data_types", ["tasks"]), + user_messages=raw.get("user_messages", []), + expected_template_criteria=raw.get("expected_template_criteria", []), + models=raw.get("models", []), + fixture_path=path, + ) + + +def discover_journey_fixtures(fixtures_dir: Path | None = None) -> list[JourneyFixture]: + """Find and load all journey YAML fixtures in the fixtures directory.""" + if fixtures_dir is None: + fixtures_dir = Path(__file__).parent / "fixtures" + + fixtures: list[JourneyFixture] = [] + if not fixtures_dir.is_dir(): + logger.warning("eval: fixtures directory not found: %s", fixtures_dir) + return fixtures + + for yaml_path in sorted(fixtures_dir.glob("*.yaml")): + try: + raw = yaml.safe_load(yaml_path.read_text(encoding="utf-8")) + if raw.get("type") != "journey": + continue + fixtures.append(JourneyFixture.from_yaml(yaml_path)) + logger.info("eval: loaded journey fixture %s from %s", fixtures[-1].name, yaml_path.name) + except Exception as exc: + logger.error("eval: failed to load journey fixture %s: %s", yaml_path.name, exc) + + return fixtures diff --git a/services/batch-agent/eval/fixtures/journey_invoice_setup.yaml b/services/batch-agent/eval/fixtures/journey_invoice_setup.yaml new file mode 100644 index 0000000..b53ad2d --- /dev/null +++ b/services/batch-agent/eval/fixtures/journey_invoice_setup.yaml @@ -0,0 +1,46 @@ +# Journey Fixture: journey-invoice-setup +# Tests that the journey chatbot correctly builds a prompt_template +# for extracting tasks and notes from Italian invoices and meeting notes. + +type: journey +name: journey-invoice-setup +description: > + Test the journey chatbot's ability to explore a directory of Italian + invoices and meeting notes, ask relevant questions, and produce a + well-structured prompt_template for data extraction. + +directory: sample_files/invoices +data_types: [tasks, notes, timelines] + +# Simulated user responses (the journey starts with the LLM exploring +# the directory and asking its first question) +user_messages: + - > + I want to extract action items from invoices and meeting notes. + The invoices are in Italian and contain work descriptions with + deadlines. Meeting notes have action items with checkboxes. + - > + Yes, map Italian priority keywords: "URGENTE" and "ALTA PRIORITÀ" + should be high priority, "media priorità" is medium, "bassa priorità" + is low. Items marked with [x] are already completed. + - > + For notes, I want meeting summaries with the full content including + decisions and attendees. For timelines, extract deadlines and + scheduled meeting dates. + - > + That's everything I need. Please generate the template. + +# Criteria the generated prompt_template must satisfy +# Each is scored 0-1 by an LLM judge +expected_template_criteria: + - "Mentions creating tasks from action items and work descriptions" + - "Includes Italian priority keyword mapping (URGENTE→high, media priorità→medium, bassa priorità→low)" + - "Handles completed items marked with [x] as status done" + - "Mentions creating notes from meeting summaries" + - "Mentions extracting timeline events from deadlines and meeting dates" + - "Sets isAiSuggested=1 on all created records" + - "Does NOT include projectId assignment logic" + - "Uses camelCase field names (title, status, priority, dueDate, content)" + +# Models to test (empty = use CLI --models default) +models: [] diff --git a/services/batch-agent/eval/journey_runner.py b/services/batch-agent/eval/journey_runner.py new file mode 100644 index 0000000..f49b57a --- /dev/null +++ b/services/batch-agent/eval/journey_runner.py @@ -0,0 +1,372 @@ +"""Journey eval runner — tests the prompt_template builder conversation. + +For each (journey_fixture × model) combination: +1. Build a MockExecutor (for filesystem tools used during journey) +2. Patch execute_on_client +3. Override LLM_MODEL +4. Call handle_journey_start to kick off the conversation +5. Feed simulated user_messages via handle_journey_message +6. Collect the generated prompt_template +7. Score it against expected_template_criteria (via LLM judge) +8. Report to Langfuse +""" + +from __future__ import annotations + +import asyncio +import copy +import json +import logging +import time +import uuid +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +from langchain_core.messages import HumanMessage, SystemMessage + +from eval.config import JourneyFixture +from eval.mock_executor import MockExecutor +from eval import langfuse_eval + +logger = logging.getLogger(__name__) + + +# ── Result type ────────────────────────────────────────────────────────── + + +@dataclass +class JourneyEvalResult: + """Result of one journey eval run.""" + + fixture_name: str + model: str + prompt_template: str | None # the generated template (None if journey failed) + conversation_turns: int + done: bool # whether journey reached completion + criteria_scores: dict[str, float] # criterion → 0-1 score + overall_score: float # average of criteria scores + judge_reasoning: str + elapsed_seconds: float + + def summary(self) -> dict[str, Any]: + return { + "fixture": self.fixture_name, + "model": self.model, + "done": self.done, + "turns": self.conversation_turns, + "overall_score": round(self.overall_score, 3), + "criteria_scores": {k: round(v, 3) for k, v in self.criteria_scores.items()}, + "elapsed_s": round(self.elapsed_seconds, 1), + } + + +# ── LLM judge for template quality ────────────────────────────────────── + +_JOURNEY_JUDGE_SYSTEM = """\ +You are an evaluation judge for AI-generated prompt templates. + +A journey chatbot explored a user's directory structure and through +conversation produced a prompt_template — an instruction set for a +data-extraction agent. + +Your task: evaluate the generated template against a list of criteria. +Score each criterion from 0 to 1: + - 1.0: Fully satisfied, clearly present in the template + - 0.5: Partially satisfied or ambiguously addressed + - 0.0: Not satisfied, missing from the template + +Respond with ONLY a JSON object: +{ + "scores": {"criterion_1": 0.8, "criterion_2": 1.0, ...}, + "reasoning": "Brief explanation" +} +""" + + +async def _judge_template( + prompt_template: str, + criteria: list[str], + *, + judge_model: str = "gpt-4o-mini", +) -> tuple[dict[str, float], str]: + """Use an LLM to evaluate a generated prompt_template against criteria. + + Returns (criteria_scores, reasoning). + """ + from app.llm import get_llm + + llm = get_llm(model=judge_model, temperature=0) + + criteria_text = "\n".join(f" {i+1}. {c}" for i, c in enumerate(criteria)) + user_content = ( + f"## Generated prompt_template\n```\n{prompt_template}\n```\n\n" + f"## Criteria to evaluate\n{criteria_text}" + ) + + try: + response = await llm.ainvoke([ + SystemMessage(content=_JOURNEY_JUDGE_SYSTEM), + HumanMessage(content=user_content), + ]) + raw = response.content.strip() + if raw.startswith("```"): + raw = raw.split("```")[1] + if raw.startswith("json"): + raw = raw[4:] + parsed = json.loads(raw.strip()) + + scores_raw = parsed.get("scores", {}) + # Map criterion keys back to the original criteria text + criteria_scores: dict[str, float] = {} + for i, criterion in enumerate(criteria): + # Try matching by index key or exact criterion text + key_candidates = [ + f"criterion_{i+1}", + criterion, + criterion[:50], + str(i + 1), + ] + score = 0.0 + for key in key_candidates: + if key in scores_raw: + score = float(scores_raw[key]) + break + # If no match found, try values in order + if score == 0.0 and i < len(scores_raw): + score = float(list(scores_raw.values())[i]) + criteria_scores[criterion] = score + + reasoning = str(parsed.get("reasoning", "")) + return criteria_scores, reasoning + except Exception as exc: + logger.warning("journey_eval: LLM judge failed: %s", exc) + return {c: 0.0 for c in criteria}, f"Judge error: {exc}" + + +# ── Journey runner ─────────────────────────────────────────────────────── + + +async def run_single_journey_eval( + fixture: JourneyFixture, + model: str, + *, + judge_model: str = "gpt-4o-mini", +) -> JourneyEvalResult: + """Execute one journey eval: start → messages → score template.""" + from shared.config import settings + + # Build mock executor for filesystem tools + mock = MockExecutor( + fixture_dir=fixture.fixture_dir, + seed_records={}, + ) + + original_model = settings.LLM_MODEL + settings.LLM_MODEL = model + + eval_user_id = f"eval-journey-{uuid.uuid4().hex[:8]}" + + logger.info( + "journey_eval: starting %s | model=%s", + fixture.name, model, + ) + start_time = time.time() + + prompt_template: str | None = None + conversation: list[dict[str, str]] = [] + done = False + + try: + from app.ws_context import set_current_user, clear_current_user + from app.journey import handle_journey_start, handle_journey_message, _sessions + + set_current_user(eval_user_id) + with mock.patch(): + # ── Start the journey ──────────────────────────────── + start_frame: dict[str, Any] = { + "agent_type": "local", + "directory": fixture.directory, + "data_types": fixture.data_types, + "session_id": f"eval-{uuid.uuid4().hex[:8]}", + } + + reply = await handle_journey_start(eval_user_id, start_frame) + session_id = reply["session_id"] + conversation.append({"role": "assistant", "content": reply["message"]}) + + logger.info( + "journey_eval: start reply (%d chars), done=%s", + len(reply["message"]), reply["done"], + ) + + if reply["done"]: + prompt_template = reply.get("prompt_template") + done = True + else: + # ── Send user messages ─────────────────────────── + for i, user_msg in enumerate(fixture.user_messages): + if done: + break + + conversation.append({"role": "user", "content": user_msg}) + + msg_frame: dict[str, Any] = { + "session_id": session_id, + "message": user_msg, + } + reply = await handle_journey_message(eval_user_id, msg_frame) + conversation.append({"role": "assistant", "content": reply["message"]}) + + logger.info( + "journey_eval: turn %d reply (%d chars), done=%s", + i + 1, len(reply["message"]), reply["done"], + ) + + if reply["done"]: + prompt_template = reply.get("prompt_template") + done = True + + # If not done after all user messages, send a final nudge + if not done: + nudge = "Please generate the final prompt_template now. I'm satisfied with the configuration." + conversation.append({"role": "user", "content": nudge}) + + nudge_frame: dict[str, Any] = { + "session_id": session_id, + "message": nudge, + } + reply = await handle_journey_message(eval_user_id, nudge_frame) + conversation.append({"role": "assistant", "content": reply["message"]}) + if reply["done"]: + prompt_template = reply.get("prompt_template") + done = True + + except Exception as exc: + logger.error("journey_eval: pipeline failed for %s/%s: %s", fixture.name, model, exc) + finally: + settings.LLM_MODEL = original_model + from app.ws_context import clear_current_user + clear_current_user() + + elapsed = time.time() - start_time + turns = len([c for c in conversation if c["role"] == "user"]) + + logger.info( + "journey_eval: completed in %.1fs — %d turns, done=%s, template=%s", + elapsed, turns, done, "yes" if prompt_template else "no", + ) + + # ── Score the template ─────────────────────────────────────── + criteria_scores: dict[str, float] = {} + judge_reasoning = "" + + if prompt_template and fixture.expected_template_criteria: + criteria_scores, judge_reasoning = await _judge_template( + prompt_template, + fixture.expected_template_criteria, + judge_model=judge_model, + ) + elif not prompt_template: + criteria_scores = {c: 0.0 for c in fixture.expected_template_criteria} + judge_reasoning = "No prompt_template was generated — journey did not complete." + + overall = ( + sum(criteria_scores.values()) / len(criteria_scores) + if criteria_scores + else 0.0 + ) + + result = JourneyEvalResult( + fixture_name=fixture.name, + model=model, + prompt_template=prompt_template, + conversation_turns=turns, + done=done, + criteria_scores=criteria_scores, + overall_score=overall, + judge_reasoning=judge_reasoning, + elapsed_seconds=elapsed, + ) + + # ── Report to Langfuse ─────────────────────────────────────── + trace_id = langfuse_eval.log_eval_trace( + fixture_name=fixture.name, + model=model, + prompt_variant="journey", + prompt_template=prompt_template or "(not generated)", + actual_mutations=[{"conversation": conversation[:20]}], + scores_summary=result.summary(), + ) + + if trace_id: + from eval.scorer import EvalScores + scores_obj = EvalScores( + fixture_name=fixture.name, + model=model, + prompt_variant="journey", + precision=overall, + recall=float(done), + f1=overall, + llm_judge_score=overall, + llm_judge_reasoning=judge_reasoning, + ) + langfuse_eval.post_eval_scores(scores_obj, trace_id=trace_id) + + return result + + +async def run_journey_fixture_eval( + fixture: JourneyFixture, + models: list[str], + *, + judge_model: str = "gpt-4o-mini", +) -> list[JourneyEvalResult]: + """Run all models for a journey fixture.""" + langfuse_eval.sync_journey_fixture_to_dataset(fixture) + + results: list[JourneyEvalResult] = [] + for model in models: + result = await run_single_journey_eval( + fixture, model, judge_model=judge_model, + ) + results.append(result) + + return results + + +def print_journey_results(results: list[JourneyEvalResult]) -> None: + """Print a formatted summary of journey eval results.""" + if not results: + print("\nNo journey eval results.") + return + + print("\n" + "=" * 95) + print(f"{'Fixture':<25} {'Model':<25} {'Done':>5} {'Turns':>6} {'Score':>7} {'Time':>7}") + print("-" * 95) + + for r in results: + done_str = "yes" if r.done else "NO" + print( + f"{r.fixture_name:<25} {r.model:<25} {done_str:>5} " + f"{r.conversation_turns:>6} {r.overall_score:>7.2f} {r.elapsed_seconds:>6.1f}s" + ) + + print("=" * 95) + + # Criteria breakdown + for r in results: + if r.criteria_scores: + print(f"\n[{r.model}] Criteria scores:") + for criterion, score in r.criteria_scores.items(): + indicator = "PASS" if score >= 0.7 else "PARTIAL" if score >= 0.4 else "FAIL" + print(f" {indicator:>7} ({score:.1f}) {criterion}") + + if r.judge_reasoning: + print(f" Judge: {r.judge_reasoning}") + + if r.prompt_template: + preview = r.prompt_template[:200].replace("\n", " ") + print(f" Template preview: {preview}...") + + print() diff --git a/services/batch-agent/eval/langfuse_eval.py b/services/batch-agent/eval/langfuse_eval.py index b666c3b..8ce2cbd 100644 --- a/services/batch-agent/eval/langfuse_eval.py +++ b/services/batch-agent/eval/langfuse_eval.py @@ -96,6 +96,52 @@ def sync_fixture_to_dataset(fixture: EvalFixture) -> str | None: return dataset_name +def sync_journey_fixture_to_dataset(fixture) -> str | None: + """Create or update a Langfuse dataset from a journey fixture. + + Each journey fixture becomes a single dataset item with: + - input: {directory, data_types, user_messages} + - expected_output: {criteria} + """ + lf = _get_langfuse() + if lf is None: + logger.info("langfuse_eval: Langfuse not configured — skipping journey dataset sync") + return None + + dataset_name = f"journey-eval-{fixture.name}" + + try: + lf.create_dataset( + name=dataset_name, + description=fixture.description, + metadata={"type": "journey", "data_types": fixture.data_types}, + ) + except Exception: + pass # Dataset may already exist + + item_id = f"{fixture.name}--journey" + try: + lf.create_dataset_item( + dataset_name=dataset_name, + id=item_id, + input={ + "directory": fixture.directory, + "data_types": fixture.data_types, + "user_messages": fixture.user_messages, + }, + expected_output={ + "criteria": fixture.expected_template_criteria, + }, + metadata={"type": "journey"}, + ) + except Exception as exc: + logger.warning("langfuse_eval: failed to upsert journey dataset item %s: %s", item_id, exc) + + lf.flush() + logger.info("langfuse_eval: synced journey fixture '%s' → dataset '%s'", fixture.name, dataset_name) + return dataset_name + + def create_eval_run( dataset_name: str, run_name: str,