feat(eval): add custom system prompt support for step-1 classification
This commit is contained in:
@@ -400,6 +400,7 @@ async def _classify_file(
|
||||
projects: list[dict],
|
||||
config_data_types: list[str],
|
||||
langfuse_handler: Any | None = None,
|
||||
custom_system_prompt: str | None = None,
|
||||
) -> tuple[str, list[str], str | None]:
|
||||
fallback: tuple[str, list[str], str | None] = ("new", list(config_data_types), None)
|
||||
|
||||
@@ -421,14 +422,20 @@ async def _classify_file(
|
||||
if d in _DOMAIN_DESCRIPTIONS
|
||||
)
|
||||
|
||||
system = tracing.compile_prompt(
|
||||
"batch_file_classifier",
|
||||
fallback=_STEP1_SYSTEM_PROMPT,
|
||||
variables={
|
||||
"domain_definitions": domain_definitions,
|
||||
"projects_list": projects_list,
|
||||
},
|
||||
)
|
||||
if custom_system_prompt:
|
||||
# Fixture-provided prompt takes absolute priority
|
||||
system = custom_system_prompt.format_map(
|
||||
{"domain_definitions": domain_definitions, "projects_list": projects_list}
|
||||
)
|
||||
else:
|
||||
system = tracing.compile_prompt(
|
||||
"batch_file_classifier",
|
||||
fallback=_STEP1_SYSTEM_PROMPT,
|
||||
variables={
|
||||
"domain_definitions": domain_definitions,
|
||||
"projects_list": projects_list,
|
||||
},
|
||||
)
|
||||
|
||||
llm = get_llm(callbacks=[langfuse_handler] if langfuse_handler else None)
|
||||
try:
|
||||
|
||||
@@ -71,6 +71,7 @@ class EvalFixture:
|
||||
# ── Step-1 inputs (classification) ───────────────────────────
|
||||
domain_definitions: str = ""
|
||||
projects_list: list[dict[str, Any]] = field(default_factory=list)
|
||||
custom_step1_prompt: str = ""
|
||||
|
||||
# ── Step-2 inputs (processing) ───────────────────────────────
|
||||
existing_context: str = ""
|
||||
|
||||
@@ -47,30 +47,47 @@ async def _run_step1(
|
||||
model: str,
|
||||
mock: MockExecutor,
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Run step-1 classification for each expected file.
|
||||
"""Run step-1 classification for every file in the fixture directory.
|
||||
|
||||
Returns a list of result dicts:
|
||||
Scans the directory recursively, classifies each file, and returns
|
||||
a list of result dicts:
|
||||
``[{file, project_id, domains, new_project_name}, ...]``
|
||||
"""
|
||||
from app.agent_runner import _classify_file
|
||||
|
||||
# Build project name lookup for display
|
||||
proj_names: dict[str, str] = {
|
||||
p.get("id", ""): p.get("name", "") for p in fixture.projects_list
|
||||
}
|
||||
|
||||
# Discover all files in the fixture directory
|
||||
all_files = await _scan_fixture_files(mock, fixture.directory)
|
||||
print(f"\n Scanning {len(all_files)} files in {fixture.directory}\n")
|
||||
|
||||
results: list[dict[str, Any]] = []
|
||||
for ec in fixture.expected_classification:
|
||||
# Read the file content through the mock
|
||||
for i, file_path in enumerate(all_files, 1):
|
||||
file_result = await mock._handle(
|
||||
action="read_file_content",
|
||||
data={"path": ec.file},
|
||||
data={"path": file_path},
|
||||
)
|
||||
file_content: str = file_result.get("content", "")
|
||||
if not file_content.strip():
|
||||
continue
|
||||
|
||||
project_id, domains, new_name = await _classify_file(
|
||||
file_path=ec.file,
|
||||
file_path=file_path,
|
||||
file_content=file_content,
|
||||
projects=fixture.projects_list,
|
||||
config_data_types=fixture.data_types,
|
||||
custom_system_prompt=fixture.custom_step1_prompt or None,
|
||||
)
|
||||
|
||||
short_name = file_path.rsplit("/", 1)[-1] if "/" in file_path else file_path
|
||||
proj_label = proj_names.get(project_id, new_name or "?")
|
||||
print(f" [{i}/{len(all_files)}] {short_name} → {project_id} ({proj_label}) {domains}")
|
||||
|
||||
results.append({
|
||||
"file": ec.file,
|
||||
"file": file_path,
|
||||
"project_id": project_id,
|
||||
"domains": domains,
|
||||
"new_project_name": new_name,
|
||||
@@ -78,22 +95,64 @@ async def _run_step1(
|
||||
return results
|
||||
|
||||
|
||||
async def _scan_fixture_files(mock: MockExecutor, directory: str) -> list[str]:
    """Recursively list all files under *directory* via the mock executor.

    Each directory is listed with the executor's ``list_directory`` action.
    Entries whose ``type`` is ``"directory"`` are descended into, entries
    whose ``type`` is ``"file"`` are collected, and any other entry type is
    ignored.

    Args:
        mock: Executor whose ``_handle`` coroutine answers ``list_directory``
            requests with a dict that may contain an ``"entries"`` list.
        directory: Path of the directory to start scanning from.

    Returns:
        The collected file paths, sorted.

    Raises:
        KeyError: If a file or directory entry has no ``"path"`` key.
    """
    files: list[str] = []
    # Directories already listed. Guards against listings that point back at
    # themselves or at a parent, which would otherwise never terminate, and
    # avoids collecting the same files twice.
    visited: set[str] = set()
    # Explicit stack instead of recursion, so deep trees cannot hit the
    # interpreter's recursion limit.
    pending: list[str] = [directory]

    while pending:
        path = pending.pop()
        if path in visited:
            continue
        visited.add(path)

        result = await mock._handle(action="list_directory", data={"path": path})

        subdirs: list[str] = []
        # ``or []`` also tolerates an explicit ``"entries": None`` response.
        for entry in result.get("entries") or []:
            kind = entry.get("type")
            if kind == "directory":
                subdirs.append(entry["path"])
            elif kind == "file":
                files.append(entry["path"])

        # Push in reverse so sub-directories are listed in the order the
        # executor reported them (pre-order, like a recursive walk).
        pending.extend(reversed(subdirs))

    return sorted(files)
|
||||
|
||||
|
||||
def _score_step1(
|
||||
fixture: EvalFixture,
|
||||
results: list[dict[str, Any]],
|
||||
) -> tuple[float, float, float, str]:
|
||||
"""Score step-1 results. Returns (precision, recall, f1, reasoning)."""
|
||||
"""Score step-1 results. Returns (precision, recall, f1, reasoning).
|
||||
|
||||
Files with expected classifications are scored (OK/FAIL).
|
||||
Files without expectations are shown as informational (INFO).
|
||||
"""
|
||||
if not fixture.expected_classification:
|
||||
return 0.0, 0.0, 0.0, "No expected classifications"
|
||||
|
||||
# Build project name lookup
|
||||
proj_names: dict[str, str] = {
|
||||
p.get("id", ""): p.get("name", "") for p in fixture.projects_list
|
||||
}
|
||||
proj_names["new"] = "(new project)"
|
||||
|
||||
def _proj_label(pid: str, new_name: str | None = None) -> str:
|
||||
name = proj_names.get(pid, "?")
|
||||
if pid == "new" and new_name:
|
||||
return f"new → \"{new_name}\""
|
||||
return f"{pid} ({name})" if name and name != "?" else pid
|
||||
|
||||
def _short_file(path: str) -> str:
|
||||
"""Use just the filename for cleaner display."""
|
||||
return path.rsplit("/", 1)[-1] if "/" in path else path
|
||||
|
||||
expected_files = {ec.file for ec in fixture.expected_classification}
|
||||
total = len(fixture.expected_classification)
|
||||
matched = 0
|
||||
details: list[str] = []
|
||||
|
||||
scored_lines: list[str] = []
|
||||
info_lines: list[str] = []
|
||||
|
||||
# Score expected files
|
||||
for ec in fixture.expected_classification:
|
||||
actual = next((r for r in results if r["file"] == ec.file), None)
|
||||
fname = _short_file(ec.file)
|
||||
if actual is None:
|
||||
details.append(f" MISS {ec.file}: not processed")
|
||||
scored_lines.append(f" MISS {fname}")
|
||||
scored_lines.append(f" expected: {_proj_label(ec.project_id)}")
|
||||
continue
|
||||
|
||||
pid_ok = actual["project_id"] == ec.project_id
|
||||
@@ -101,20 +160,41 @@ def _score_step1(
|
||||
|
||||
if pid_ok and domains_ok:
|
||||
matched += 1
|
||||
details.append(f" OK {ec.file}: project={actual['project_id']}, domains={actual['domains']}")
|
||||
scored_lines.append(f" OK {fname}")
|
||||
scored_lines.append(f" project: {_proj_label(actual['project_id'])}")
|
||||
scored_lines.append(f" domains: {actual['domains']}")
|
||||
else:
|
||||
parts: list[str] = []
|
||||
scored_lines.append(f" FAIL {fname}")
|
||||
if not pid_ok:
|
||||
parts.append(f"project expected={ec.project_id} got={actual['project_id']}")
|
||||
scored_lines.append(f" project: {_proj_label(actual['project_id'])} (expected: {_proj_label(ec.project_id)})")
|
||||
else:
|
||||
scored_lines.append(f" project: {_proj_label(actual['project_id'])}")
|
||||
if not domains_ok:
|
||||
parts.append(f"domains expected={ec.domains} got={actual['domains']}")
|
||||
details.append(f" FAIL {ec.file}: {'; '.join(parts)}")
|
||||
scored_lines.append(f" domains: {actual['domains']} (expected: {ec.domains})")
|
||||
else:
|
||||
scored_lines.append(f" domains: {actual['domains']}")
|
||||
|
||||
# Show unscored files
|
||||
for r in results:
|
||||
if r["file"] not in expected_files:
|
||||
fname = _short_file(r["file"])
|
||||
proj = _proj_label(r["project_id"], r.get("new_project_name"))
|
||||
info_lines.append(f" · {fname}")
|
||||
info_lines.append(f" project: {proj} | domains: {r['domains']}")
|
||||
|
||||
precision = matched / total if total > 0 else 0.0
|
||||
recall = precision # in step1, precision == recall (same denominator)
|
||||
f1 = precision # same
|
||||
reasoning = "\n".join(details)
|
||||
return precision, recall, f1, reasoning
|
||||
recall = precision
|
||||
f1 = precision
|
||||
|
||||
parts: list[str] = []
|
||||
if scored_lines:
|
||||
parts.append(f"Scored ({matched}/{total}):")
|
||||
parts.extend(scored_lines)
|
||||
if info_lines:
|
||||
parts.append(f"\nOther files ({len(info_lines) // 2}):")
|
||||
parts.extend(info_lines)
|
||||
|
||||
return precision, recall, f1, "\n".join(parts)
|
||||
|
||||
|
||||
# ── Step 2 runner ─────────────────────────────────────────────────────────
|
||||
@@ -438,26 +518,28 @@ def print_results(results: list[EvalScores]) -> None:
|
||||
print("\nNo eval results.")
|
||||
return
|
||||
|
||||
print("\n" + "=" * 95)
|
||||
W = 90
|
||||
|
||||
print("\n" + "=" * W)
|
||||
print(f"{'Fixture':<25} {'Mode':<6} {'Model':<25} {'P':>6} {'R':>6} {'F1':>6} {'FA':>6} {'LLM':>6}")
|
||||
print("-" * 95)
|
||||
print("-" * W)
|
||||
|
||||
for s in results:
|
||||
llm_str = f"{s.llm_judge_score:.2f}" if s.llm_judge_score is not None else " --"
|
||||
fa_str = f"{s.field_accuracy:.2f}" if s.field_scores else " --"
|
||||
print(
|
||||
f"{s.fixture_name:<25} {s.prompt_variant:<6} {s.model:<25} "
|
||||
f"{s.precision:>6.2f} {s.recall:>6.2f} {s.f1:>6.2f} "
|
||||
f"{s.field_accuracy:>6.2f} {llm_str:>6}"
|
||||
f"{fa_str:>6} {llm_str:>6}"
|
||||
)
|
||||
|
||||
print("=" * 95)
|
||||
print()
|
||||
print("=" * W)
|
||||
|
||||
print("=" * 90)
|
||||
|
||||
# If LLM judge reasoning is available, print it
|
||||
for s in results:
|
||||
if s.llm_judge_reasoning:
|
||||
print(f"\n[{s.model} / {s.prompt_variant}] LLM Judge: {s.llm_judge_reasoning}")
|
||||
print(f"\n{'─' * W}")
|
||||
print(f" {s.fixture_name} | {s.model} | {s.prompt_variant}")
|
||||
print(f"{'─' * W}")
|
||||
print(s.llm_judge_reasoning)
|
||||
|
||||
print()
|
||||
|
||||
Reference in New Issue
Block a user