"""Journey eval runner — tests the prompt_template builder conversation. For each (journey_fixture × model) combination: 1. Build a MockExecutor (for filesystem tools used during journey) 2. Patch execute_on_client 3. Override LLM_MODEL 4. Call handle_journey_start to kick off the conversation 5. Feed simulated user_messages via handle_journey_message 6. Collect the generated prompt_template 7. Score it against expected_template_criteria (via LLM judge) 8. Report to Langfuse """ from __future__ import annotations import asyncio import copy import json import logging import time import uuid from dataclasses import dataclass, field from pathlib import Path from typing import Any from langchain_core.messages import HumanMessage, SystemMessage from eval.config import JourneyFixture from eval.mock_executor import MockExecutor from eval import langfuse_eval logger = logging.getLogger(__name__) # ── Result type ────────────────────────────────────────────────────────── @dataclass class JourneyEvalResult: """Result of one journey eval run.""" fixture_name: str model: str prompt_template: str | None # the generated template (None if journey failed) conversation_turns: int done: bool # whether journey reached completion criteria_scores: dict[str, float] # criterion → 0-1 score overall_score: float # average of criteria scores judge_reasoning: str elapsed_seconds: float def summary(self) -> dict[str, Any]: return { "fixture": self.fixture_name, "model": self.model, "done": self.done, "turns": self.conversation_turns, "overall_score": round(self.overall_score, 3), "criteria_scores": {k: round(v, 3) for k, v in self.criteria_scores.items()}, "elapsed_s": round(self.elapsed_seconds, 1), } # ── LLM judge for template quality ────────────────────────────────────── _JOURNEY_JUDGE_SYSTEM = """\ You are an evaluation judge for AI-generated prompt templates. A journey chatbot explored a user's directory structure and through conversation produced a prompt_template — an instruction set for a data-extraction agent. Your task: evaluate the generated template against a list of criteria. Score each criterion from 0 to 1: - 1.0: Fully satisfied, clearly present in the template - 0.5: Partially satisfied or ambiguously addressed - 0.0: Not satisfied, missing from the template Respond with ONLY a JSON object: { "scores": {"criterion_1": 0.8, "criterion_2": 1.0, ...}, "reasoning": "Brief explanation" } """ async def _judge_template( prompt_template: str, criteria: list[str], *, judge_model: str = "gpt-4o-mini", ) -> tuple[dict[str, float], str]: """Use an LLM to evaluate a generated prompt_template against criteria. Returns (criteria_scores, reasoning). """ from shared.llm import get_llm llm = get_llm(model=judge_model, temperature=0) criteria_text = "\n".join(f" {i+1}. {c}" for i, c in enumerate(criteria)) user_content = ( f"## Generated prompt_template\n```\n{prompt_template}\n```\n\n" f"## Criteria to evaluate\n{criteria_text}" ) try: response = await llm.ainvoke([ SystemMessage(content=_JOURNEY_JUDGE_SYSTEM), HumanMessage(content=user_content), ]) raw = response.content.strip() if raw.startswith("```"): raw = raw.split("```")[1] if raw.startswith("json"): raw = raw[4:] parsed = json.loads(raw.strip()) scores_raw = parsed.get("scores", {}) # Map criterion keys back to the original criteria text criteria_scores: dict[str, float] = {} for i, criterion in enumerate(criteria): # Try matching by index key or exact criterion text key_candidates = [ f"criterion_{i+1}", criterion, criterion[:50], str(i + 1), ] score = 0.0 for key in key_candidates: if key in scores_raw: score = float(scores_raw[key]) break # If no match found, try values in order if score == 0.0 and i < len(scores_raw): score = float(list(scores_raw.values())[i]) criteria_scores[criterion] = score reasoning = str(parsed.get("reasoning", "")) return criteria_scores, reasoning except Exception as exc: logger.warning("journey_eval: LLM judge failed: %s", exc) return {c: 0.0 for c in criteria}, f"Judge error: {exc}" # ── Journey runner ─────────────────────────────────────────────────────── async def run_single_journey_eval( fixture: JourneyFixture, model: str, *, judge_model: str = "gpt-4o-mini", data_dir: Path | None = None, ) -> JourneyEvalResult: """Execute one journey eval: start \u2192 messages \u2192 score template.""" from shared.config import settings # When data_dir is given, use its parent as MockExecutor root # and its name as the journey directory so the LLM sees a # meaningful path (not "."). if data_dir: mock_root = data_dir.parent journey_directory = data_dir.name else: mock_root = fixture.fixture_path.parent journey_directory = fixture.directory mock = MockExecutor( fixture_dir=mock_root, seed_records={}, ) original_model = settings.LLM_MODEL settings.LLM_MODEL = model eval_user_id = f"eval-journey-{uuid.uuid4().hex[:8]}" logger.info( "journey_eval: starting %s | model=%s", fixture.name, model, ) start_time = time.time() prompt_template: str | None = None conversation: list[dict[str, str]] = [] done = False try: from shared.ws_context import set_current_user, clear_current_user from app.journey import handle_journey_start, handle_journey_message, _sessions set_current_user(eval_user_id) with mock.patch(): # ── Start the journey ──────────────────────────────── start_frame: dict[str, Any] = { "agent_type": "local", "directory": journey_directory, "data_types": fixture.data_types, "session_id": f"eval-{uuid.uuid4().hex[:8]}", } reply = await handle_journey_start(eval_user_id, start_frame) session_id = reply["session_id"] conversation.append({"role": "assistant", "content": reply["message"]}) logger.info( "journey_eval: start reply (%d chars), done=%s", len(reply["message"]), reply["done"], ) if reply["done"]: prompt_template = reply.get("prompt_template") done = True else: # ── Send user messages ─────────────────────────── for i, user_msg in enumerate(fixture.user_messages): if done: break conversation.append({"role": "user", "content": user_msg}) msg_frame: dict[str, Any] = { "session_id": session_id, "message": user_msg, } reply = await handle_journey_message(eval_user_id, msg_frame) conversation.append({"role": "assistant", "content": reply["message"]}) logger.info( "journey_eval: turn %d reply (%d chars), done=%s", i + 1, len(reply["message"]), reply["done"], ) if reply["done"]: prompt_template = reply.get("prompt_template") done = True # If not done after all user messages, send a final nudge if not done: nudge = "Please generate the final prompt_template now. I'm satisfied with the configuration." conversation.append({"role": "user", "content": nudge}) nudge_frame: dict[str, Any] = { "session_id": session_id, "message": nudge, } reply = await handle_journey_message(eval_user_id, nudge_frame) conversation.append({"role": "assistant", "content": reply["message"]}) if reply["done"]: prompt_template = reply.get("prompt_template") done = True except Exception as exc: logger.error("journey_eval: pipeline failed for %s/%s: %s", fixture.name, model, exc) finally: settings.LLM_MODEL = original_model from shared.ws_context import clear_current_user clear_current_user() elapsed = time.time() - start_time turns = len([c for c in conversation if c["role"] == "user"]) logger.info( "journey_eval: completed in %.1fs — %d turns, done=%s, template=%s", elapsed, turns, done, "yes" if prompt_template else "no", ) # ── Score the template ─────────────────────────────────────── criteria_scores: dict[str, float] = {} judge_reasoning = "" if prompt_template and fixture.expected_template_criteria: criteria_scores, judge_reasoning = await _judge_template( prompt_template, fixture.expected_template_criteria, judge_model=judge_model, ) elif not prompt_template: criteria_scores = {c: 0.0 for c in fixture.expected_template_criteria} judge_reasoning = "No prompt_template was generated — journey did not complete." overall = ( sum(criteria_scores.values()) / len(criteria_scores) if criteria_scores else 0.0 ) result = JourneyEvalResult( fixture_name=fixture.name, model=model, prompt_template=prompt_template, conversation_turns=turns, done=done, criteria_scores=criteria_scores, overall_score=overall, judge_reasoning=judge_reasoning, elapsed_seconds=elapsed, ) # ── Report to Langfuse ─────────────────────────────────────── trace_id = langfuse_eval.log_eval_trace( fixture_name=fixture.name, model=model, prompt_variant="journey", prompt_template=prompt_template or "(not generated)", actual_mutations=[{"conversation": conversation[:20]}], scores_summary=result.summary(), langfuse_prompt_names=["journey_system"], ) if trace_id: from eval.scorer import EvalScores scores_obj = EvalScores( fixture_name=fixture.name, model=model, prompt_variant="journey", precision=overall, recall=float(done), f1=overall, llm_judge_score=overall, llm_judge_reasoning=judge_reasoning, ) langfuse_eval.post_eval_scores(scores_obj, trace_id=trace_id) return result async def run_journey_fixture_eval( fixture: JourneyFixture, models: list[str], *, judge_model: str = "gpt-4o-mini", data_dir: Path | None = None, ) -> list[JourneyEvalResult]: """Run all models for a journey fixture.""" langfuse_eval.sync_journey_fixture_to_dataset(fixture) results: list[JourneyEvalResult] = [] for model in models: result = await run_single_journey_eval( fixture, model, judge_model=judge_model, data_dir=data_dir, ) results.append(result) return results def print_journey_results(results: list[JourneyEvalResult]) -> None: """Print a formatted summary of journey eval results.""" if not results: print("\nNo journey eval results.") return print("\n" + "=" * 95) print(f"{'Fixture':<25} {'Model':<25} {'Done':>5} {'Turns':>6} {'Score':>7} {'Time':>7}") print("-" * 95) for r in results: done_str = "yes" if r.done else "NO" print( f"{r.fixture_name:<25} {r.model:<25} {done_str:>5} " f"{r.conversation_turns:>6} {r.overall_score:>7.2f} {r.elapsed_seconds:>6.1f}s" ) print("=" * 95) # Criteria breakdown for r in results: if r.criteria_scores: print(f"\n[{r.model}] Criteria scores:") for criterion, score in r.criteria_scores.items(): indicator = "PASS" if score >= 0.7 else "PARTIAL" if score >= 0.4 else "FAIL" print(f" {indicator:>7} ({score:.1f}) {criterion}") if r.judge_reasoning: print(f" Judge: {r.judge_reasoning}") if r.prompt_template: preview = r.prompt_template[:200].replace("\n", " ") print(f" Template preview: {preview}...") print()