refactor(tests): simplify YAML fixture schema and test runner
YAML: rimosse op/description/score_name/assertions block — ora detect/process come chiave diretta, assertions piatte sullo stesso livello del caso. Runner: eliminato _run_assertions engine, assertions inline in test_preprocess. Riduzione da ~170 a ~75 righe totali tra YAML + test. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,26 +1,15 @@
|
||||
"""Tests for the preprocessor system (Step 1 — Local Agent V2).
|
||||
|
||||
Fixtures are driven by:
|
||||
tests/fixtures/preprocessors/cases.yaml — test case definitions
|
||||
tests/fixtures/preprocessors/data/ — input files (HTML, txt, ...)
|
||||
Fixtures: tests/fixtures/preprocessors/cases.yaml + data/
|
||||
|
||||
Run:
|
||||
pytest tests/test_preprocessors.py -v
|
||||
|
||||
# Only detection tests
|
||||
pytest tests/test_preprocessors.py -v -k detect
|
||||
|
||||
# Only preprocess tests
|
||||
pytest tests/test_preprocessors.py -v -k preprocess
|
||||
|
||||
Langfuse scores are sent when LANGFUSE_SECRET_KEY / LANGFUSE_PUBLIC_KEY are set.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
import yaml
|
||||
@@ -28,144 +17,77 @@ import yaml
|
||||
from app.core.langfuse_client import get_langfuse
|
||||
from app.core.preprocessors import detect_content_type, preprocess
|
||||
|
||||
# ── Paths ──────────────────────────────────────────────────────────────
|
||||
_DATA_DIR = Path(__file__).parent / "fixtures" / "preprocessors" / "data"
|
||||
_CASES_FILE = Path(__file__).parent / "fixtures" / "preprocessors" / "cases.yaml"
|
||||
|
||||
_FIXTURES_DIR = Path(__file__).parent / "fixtures" / "preprocessors"
|
||||
_DATA_DIR = _FIXTURES_DIR / "data"
|
||||
_CASES_FILE = _FIXTURES_DIR / "cases.yaml"
|
||||
|
||||
# ── Content generators ─────────────────────────────────────────────────
|
||||
|
||||
_GENERATORS: dict[str, str] = {
|
||||
# High ratio of non-printable chars → triggers "unknown" heuristic
|
||||
_GENERATORS = {
|
||||
"binary_noise": "some\x00\x01\x02\x03\x04\x05content" * 20,
|
||||
}
|
||||
|
||||
|
||||
def _load_cases() -> list[dict]:
|
||||
with _CASES_FILE.open(encoding="utf-8") as f:
|
||||
return yaml.safe_load(f)["cases"]
|
||||
def _cases():
|
||||
return yaml.safe_load(_CASES_FILE.read_text(encoding="utf-8"))
|
||||
|
||||
|
||||
def _read_content(case: dict) -> str:
|
||||
def _content(case: dict) -> str:
|
||||
if "generate" in case:
|
||||
key = case["generate"]
|
||||
if key not in _GENERATORS:
|
||||
raise ValueError(f"Unknown generator '{key}' in case {case['id']}")
|
||||
return _GENERATORS[key]
|
||||
file_path = _DATA_DIR / case["file"]
|
||||
return file_path.read_text(encoding="utf-8")
|
||||
return _GENERATORS[case["generate"]]
|
||||
return (_DATA_DIR / case["file"]).read_text(encoding="utf-8")
|
||||
|
||||
|
||||
# ── Langfuse helper ───────────────────────────────────────────────────
|
||||
|
||||
def _lf_score(score_name: str, value: float, comment: str = "") -> None:
|
||||
def _lf_score(name: str, value: float, comment: str = "") -> None:
|
||||
lf = get_langfuse()
|
||||
if lf:
|
||||
trace = lf.trace(name=f"eval-{score_name}")
|
||||
lf.score(
|
||||
trace_id=trace.id,
|
||||
name=score_name,
|
||||
value=value,
|
||||
data_type="NUMERIC",
|
||||
comment=comment,
|
||||
)
|
||||
trace = lf.trace(name=f"eval-{name}")
|
||||
lf.score(trace_id=trace.id, name=name, value=value, data_type="NUMERIC", comment=comment)
|
||||
lf.flush()
|
||||
|
||||
|
||||
# ── Assertion engine ──────────────────────────────────────────────────
|
||||
# ── detect ────────────────────────────────────────────────────────────
|
||||
|
||||
def _run_assertions(assertions: dict[str, Any], result: Any, raw: str) -> list[str]:
|
||||
"""Run all assertions declared in the YAML case. Returns failure messages."""
|
||||
failures: list[str] = []
|
||||
|
||||
if assertions.get("no_html_tags"):
|
||||
if re.search(r"<[^>]+>", result.clean_text):
|
||||
failures.append("clean_text still contains HTML tags")
|
||||
|
||||
min_len = assertions.get("min_length")
|
||||
if min_len is not None:
|
||||
if len(result.clean_text) < min_len:
|
||||
failures.append(
|
||||
f"clean_text too short: {len(result.clean_text)} < {min_len}"
|
||||
)
|
||||
|
||||
ratio_lt = assertions.get("compression_ratio_lt")
|
||||
if ratio_lt is not None and len(raw) > 0:
|
||||
ratio = len(result.clean_text) / len(raw)
|
||||
if ratio >= ratio_lt:
|
||||
failures.append(f"compression ratio {ratio:.2f} >= {ratio_lt}")
|
||||
|
||||
meta_keys = assertions.get("metadata_keys", [])
|
||||
for key in meta_keys:
|
||||
if not result.metadata.get(key):
|
||||
failures.append(f"metadata missing key '{key}' (got {result.metadata})")
|
||||
|
||||
contains = assertions.get("contains")
|
||||
if contains:
|
||||
items = [contains] if isinstance(contains, str) else contains
|
||||
for item in items:
|
||||
if item not in result.clean_text:
|
||||
failures.append(f"clean_text missing expected substring: {item!r}")
|
||||
|
||||
not_contains = assertions.get("not_contains")
|
||||
if not_contains:
|
||||
items = [not_contains] if isinstance(not_contains, str) else not_contains
|
||||
for item in items:
|
||||
if item in result.clean_text:
|
||||
failures.append(f"clean_text contains forbidden substring: {item!r}")
|
||||
|
||||
expected_ct = assertions.get("content_type")
|
||||
if expected_ct and result.content_type != expected_ct:
|
||||
failures.append(
|
||||
f"content_type mismatch: expected {expected_ct!r}, got {result.content_type!r}"
|
||||
)
|
||||
|
||||
return failures
|
||||
_detect = [c for c in _cases() if "detect" in c]
|
||||
|
||||
|
||||
# ── Parametrized: detect ──────────────────────────────────────────────
|
||||
|
||||
_detect_cases = [c for c in _load_cases() if c["op"] == "detect"]
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"case",
|
||||
_detect_cases,
|
||||
ids=[c["id"] for c in _detect_cases],
|
||||
)
|
||||
@pytest.mark.parametrize("case", _detect, ids=[c["id"] for c in _detect])
|
||||
def test_detect(case: dict) -> None:
|
||||
raw = _read_content(case)
|
||||
ct = detect_content_type(case["input_filename"], raw)
|
||||
|
||||
expected = case["expected_content_type"]
|
||||
score = 1.0 if ct == expected else 0.0
|
||||
_lf_score(case["score_name"], score, f"got={ct}, expected={expected}")
|
||||
|
||||
assert ct == expected, (
|
||||
f"[{case['id']}] {case['description']}: "
|
||||
f"expected content_type={expected!r}, got {ct!r}"
|
||||
)
|
||||
raw = _content(case)
|
||||
filename = case.get("filename", case.get("file", ""))
|
||||
ct = detect_content_type(filename, raw)
|
||||
expected = case["detect"]
|
||||
_lf_score(f"preprocess.detect.{case['id']}", 1.0 if ct == expected else 0.0)
|
||||
assert ct == expected, f"[{case['id']}] expected {expected!r}, got {ct!r}"
|
||||
|
||||
|
||||
# ── Parametrized: preprocess ──────────────────────────────────────────
|
||||
# ── preprocess ────────────────────────────────────────────────────────
|
||||
|
||||
_preprocess_cases = [c for c in _load_cases() if c["op"] == "preprocess"]
|
||||
_process = [c for c in _cases() if "process" in c]
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"case",
|
||||
_preprocess_cases,
|
||||
ids=[c["id"] for c in _preprocess_cases],
|
||||
)
|
||||
@pytest.mark.parametrize("case", _process, ids=[c["id"] for c in _process])
|
||||
def test_preprocess(case: dict) -> None:
|
||||
raw = _read_content(case)
|
||||
result = preprocess(case["input_content_type"], raw)
|
||||
raw = _content(case)
|
||||
result = preprocess(case["process"], raw)
|
||||
|
||||
assertions = case.get("assertions", {})
|
||||
failures = _run_assertions(assertions, result, raw)
|
||||
if case.get("no_html"):
|
||||
assert not re.search(r"<[^>]+>", result.clean_text), "clean_text contains HTML tags"
|
||||
|
||||
assert not failures, (
|
||||
f"[{case['id']}] {case['description']} — {len(failures)} assertion(s) failed:\n"
|
||||
+ "\n".join(f" • {f}" for f in failures)
|
||||
)
|
||||
if "min_chars" in case:
|
||||
assert len(result.clean_text) >= case["min_chars"], \
|
||||
f"clean_text too short: {len(result.clean_text)} < {case['min_chars']}"
|
||||
|
||||
if "ratio_lt" in case:
|
||||
ratio = len(result.clean_text) / len(raw)
|
||||
assert ratio < case["ratio_lt"], f"compression ratio {ratio:.2f} >= {case['ratio_lt']}"
|
||||
|
||||
for key in case.get("has_meta", []):
|
||||
assert result.metadata.get(key), f"metadata missing {key!r} (got {result.metadata})"
|
||||
|
||||
for item in ([case["contains"]] if isinstance(case.get("contains"), str) else case.get("contains", [])):
|
||||
assert item in result.clean_text, f"clean_text missing {item!r}"
|
||||
|
||||
for item in ([case["excludes"]] if isinstance(case.get("excludes"), str) else case.get("excludes", [])):
|
||||
assert item not in result.clean_text, f"clean_text contains forbidden {item!r}"
|
||||
|
||||
if "content_type" in case:
|
||||
assert result.content_type == case["content_type"], \
|
||||
f"expected content_type {case['content_type']!r}, got {result.content_type!r}"
|
||||
|
||||
Reference in New Issue
Block a user