refactor(tests): YAML-driven fixtures for preprocessor tests
- cases.yaml: 10 test cases con schema dichiarativo (op, assertions) - data/: 7 file reali (email_action.html, email_thread.html, email_single.html, email_heavy.html, generic_page.html, notes.txt, fallback.txt) - test_preprocessors.py: parametrize da YAML via test_detect / test_preprocess; assertion engine generico (no_html_tags, min_length, compression_ratio, metadata_keys, contains, not_contains, content_type) - requirements.txt: add PyYAML Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,221 +1,178 @@
|
||||
"""Tests for the preprocessor system (Step 1).
|
||||
"""Tests for the preprocessor system (Step 1 — Local Agent V2).
|
||||
|
||||
Test IDs map to the plan:
|
||||
1.1 detect_email, 1.2 detect_generic, 1.3 detect_text, 1.4 detect_unknown
|
||||
1.5 email_strip, 1.6 email_metadata, 1.7 email_thread, 1.8 email_single
|
||||
1.9 email_heavy_html, 1.10 fallback
|
||||
Fixtures are driven by:
|
||||
tests/fixtures/preprocessors/cases.yaml — test case definitions
|
||||
tests/fixtures/preprocessors/data/ — input files (HTML, txt, ...)
|
||||
|
||||
Run:
|
||||
pytest tests/test_preprocessors.py -v
|
||||
|
||||
# Only detection tests
|
||||
pytest tests/test_preprocessors.py -v -k detect
|
||||
|
||||
# Only preprocess tests
|
||||
pytest tests/test_preprocessors.py -v -k preprocess
|
||||
|
||||
Langfuse scores are sent when LANGFUSE_SECRET_KEY / LANGFUSE_PUBLIC_KEY are set.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
import yaml
|
||||
|
||||
from app.core.preprocessors import detect_content_type, preprocess
|
||||
from app.core.langfuse_client import get_langfuse
|
||||
from app.core.preprocessors import detect_content_type, preprocess
|
||||
|
||||
# ── Fixtures ──────────────────────────────────────────────────────────
|
||||
# ── Paths ──────────────────────────────────────────────────────────────
|
||||
|
||||
_FIXTURES_DIR = Path(__file__).parent / "fixtures" / "preprocessors"
|
||||
_DATA_DIR = _FIXTURES_DIR / "data"
|
||||
_CASES_FILE = _FIXTURES_DIR / "cases.yaml"
|
||||
|
||||
# ── Content generators ─────────────────────────────────────────────────
|
||||
|
||||
_GENERATORS: dict[str, str] = {
|
||||
# High ratio of non-printable chars → triggers "unknown" heuristic
|
||||
"binary_noise": "some\x00\x01\x02\x03\x04\x05content" * 20,
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_email_html() -> str:
|
||||
return """<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<title>Fix the login bug</title>
|
||||
<style>body { font-family: Arial; color: #333; }</style>
|
||||
</head>
|
||||
<body>
|
||||
<p>Subject: Fix the login bug</p>
|
||||
<p>From: boss@company.com</p>
|
||||
<p>To: dev@company.com</p>
|
||||
<p>Date: Mon, 7 Apr 2026 09:00:00 +0200</p>
|
||||
<p>Please fix the login bug by Friday. It is blocking the release.</p>
|
||||
</body>
|
||||
</html>"""
|
||||
def _load_cases() -> list[dict]:
|
||||
with _CASES_FILE.open(encoding="utf-8") as f:
|
||||
return yaml.safe_load(f)["cases"]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_thread_email_html() -> str:
|
||||
return """<!DOCTYPE html>
|
||||
<html><body>
|
||||
<p>From: alice@co.com</p>
|
||||
<p>Subject: Re: Re: Deploy plan</p>
|
||||
<p>Sure, I'll handle the deploy.</p>
|
||||
|
||||
<p>On Mon, Apr 6, 2026 at 3:00 PM, Bob <bob@co.com> wrote:</p>
|
||||
<blockquote>
|
||||
<p>From: bob@co.com</p>
|
||||
<p>Can you handle the deploy?</p>
|
||||
<p>On Sun, Apr 5, 2026 at 1:00 PM, Alice <alice@co.com> wrote:</p>
|
||||
<blockquote>
|
||||
<p>From: alice@co.com</p>
|
||||
<p>Let's plan the deploy for Monday.</p>
|
||||
</blockquote>
|
||||
</blockquote>
|
||||
</body></html>"""
|
||||
def _read_content(case: dict) -> str:
|
||||
if "generate" in case:
|
||||
key = case["generate"]
|
||||
if key not in _GENERATORS:
|
||||
raise ValueError(f"Unknown generator '{key}' in case {case['id']}")
|
||||
return _GENERATORS[key]
|
||||
file_path = _DATA_DIR / case["file"]
|
||||
return file_path.read_text(encoding="utf-8")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_heavy_html_email() -> str:
|
||||
return """<!DOCTYPE html>
|
||||
<html><head>
|
||||
<style>
|
||||
table { border-collapse: collapse; width: 100%; }
|
||||
td { padding: 8px; border: 1px solid #ddd; font-size: 12px; }
|
||||
.header { background: #003366; color: white; }
|
||||
.footer { font-size: 10px; color: #999; }
|
||||
</style>
|
||||
</head><body>
|
||||
<table>
|
||||
<tr class="header"><td colspan="2">Company Newsletter</td></tr>
|
||||
<tr><td>From:</td><td>newsletter@corp.com</td></tr>
|
||||
<tr><td>Subject:</td><td>Q1 Results Update</td></tr>
|
||||
<tr><td>Date:</td><td>Apr 7, 2026</td></tr>
|
||||
<tr><td colspan="2">
|
||||
<p>Dear Team,</p>
|
||||
<p>Q1 results are in. Revenue up 15% year-over-year.</p>
|
||||
<p>Please review the attached report.</p>
|
||||
</td></tr>
|
||||
<tr class="footer"><td colspan="2">Confidential — do not forward</td></tr>
|
||||
</table>
|
||||
</body></html>"""
|
||||
# ── Langfuse helper ───────────────────────────────────────────────────
|
||||
|
||||
|
||||
# ── Helper ────────────────────────────────────────────────────────────
|
||||
|
||||
def _score(name: str, value: float, comment: str = "") -> None:
|
||||
def _lf_score(score_name: str, value: float, comment: str = "") -> None:
|
||||
lf = get_langfuse()
|
||||
if lf:
|
||||
trace = lf.trace(name=f"eval-{name}")
|
||||
lf.score(trace_id=trace.id, name=name, value=value,
|
||||
data_type="NUMERIC", comment=comment)
|
||||
trace = lf.trace(name=f"eval-{score_name}")
|
||||
lf.score(
|
||||
trace_id=trace.id,
|
||||
name=score_name,
|
||||
value=value,
|
||||
data_type="NUMERIC",
|
||||
comment=comment,
|
||||
)
|
||||
lf.flush()
|
||||
|
||||
|
||||
# ── 1.1 — Detect email HTML ───────────────────────────────────────────
|
||||
# ── Assertion engine ──────────────────────────────────────────────────
|
||||
|
||||
def test_detect_email_html(sample_email_html):
|
||||
ct = detect_content_type("email_export.html", sample_email_html)
|
||||
score = 1.0 if ct == "email_html" else 0.0
|
||||
_score("preprocess.detect_email", score)
|
||||
assert ct == "email_html", f"Expected 'email_html', got '{ct}'"
|
||||
def _run_assertions(assertions: dict[str, Any], result: Any, raw: str) -> tuple[float, list[str]]:
|
||||
"""Run all assertions declared in the YAML case.
|
||||
|
||||
Returns (score 0.0–1.0, list of failure messages).
|
||||
"""
|
||||
failures: list[str] = []
|
||||
|
||||
if assertions.get("no_html_tags"):
|
||||
if re.search(r"<[^>]+>", result.clean_text):
|
||||
failures.append("clean_text still contains HTML tags")
|
||||
|
||||
min_len = assertions.get("min_length")
|
||||
if min_len is not None:
|
||||
if len(result.clean_text) < min_len:
|
||||
failures.append(
|
||||
f"clean_text too short: {len(result.clean_text)} < {min_len}"
|
||||
)
|
||||
|
||||
ratio_lt = assertions.get("compression_ratio_lt")
|
||||
if ratio_lt is not None and len(raw) > 0:
|
||||
ratio = len(result.clean_text) / len(raw)
|
||||
if ratio >= ratio_lt:
|
||||
failures.append(f"compression ratio {ratio:.2f} >= {ratio_lt}")
|
||||
|
||||
meta_keys = assertions.get("metadata_keys", [])
|
||||
for key in meta_keys:
|
||||
if not result.metadata.get(key):
|
||||
failures.append(f"metadata missing key '{key}' (got {result.metadata})")
|
||||
|
||||
contains = assertions.get("contains")
|
||||
if contains:
|
||||
items = [contains] if isinstance(contains, str) else contains
|
||||
for item in items:
|
||||
if item not in result.clean_text:
|
||||
failures.append(f"clean_text missing expected substring: {item!r}")
|
||||
|
||||
not_contains = assertions.get("not_contains")
|
||||
if not_contains:
|
||||
items = [not_contains] if isinstance(not_contains, str) else not_contains
|
||||
for item in items:
|
||||
if item in result.clean_text:
|
||||
failures.append(f"clean_text contains forbidden substring: {item!r}")
|
||||
|
||||
expected_ct = assertions.get("content_type")
|
||||
if expected_ct and result.content_type != expected_ct:
|
||||
failures.append(
|
||||
f"content_type mismatch: expected {expected_ct!r}, got {result.content_type!r}"
|
||||
)
|
||||
|
||||
score = 1.0 if not failures else 0.0
|
||||
return score, failures
|
||||
|
||||
|
||||
# ── 1.2 — Detect generic HTML ─────────────────────────────────────────
|
||||
# ── Parametrized: detect ──────────────────────────────────────────────
|
||||
|
||||
def test_detect_generic_html():
|
||||
generic = """<!DOCTYPE html><html><head><title>My App</title></head>
|
||||
<body><nav><a href="/">Home</a></nav><main><p>Welcome</p></main></body></html>"""
|
||||
ct = detect_content_type("index.html", generic)
|
||||
score = 1.0 if ct == "generic_html" else 0.0
|
||||
_score("preprocess.detect_generic", score)
|
||||
assert ct == "generic_html", f"Expected 'generic_html', got '{ct}'"
|
||||
_detect_cases = [c for c in _load_cases() if c["op"] == "detect"]
|
||||
|
||||
|
||||
# ── 1.3 — Detect plain text ───────────────────────────────────────────
|
||||
@pytest.mark.parametrize(
|
||||
"case",
|
||||
_detect_cases,
|
||||
ids=[c["id"] for c in _detect_cases],
|
||||
)
|
||||
def test_detect(case: dict) -> None:
|
||||
raw = _read_content(case)
|
||||
ct = detect_content_type(case["input_filename"], raw)
|
||||
|
||||
def test_detect_plain_text():
|
||||
ct = detect_content_type("notes.txt", "Just some notes here.\nNo HTML at all.")
|
||||
score = 1.0 if ct == "plain_text" else 0.0
|
||||
_score("preprocess.detect_text", score)
|
||||
assert ct == "plain_text", f"Expected 'plain_text', got '{ct}'"
|
||||
expected = case["expected_content_type"]
|
||||
score = 1.0 if ct == expected else 0.0
|
||||
_lf_score(case["score_name"], score, f"got={ct}, expected={expected}")
|
||||
|
||||
assert ct == expected, (
|
||||
f"[{case['id']}] {case['description']}: "
|
||||
f"expected content_type={expected!r}, got {ct!r}"
|
||||
)
|
||||
|
||||
|
||||
# ── 1.4 — Detect unknown ──────────────────────────────────────────────
|
||||
# ── Parametrized: preprocess ──────────────────────────────────────────
|
||||
|
||||
def test_detect_unknown():
|
||||
# Simulate binary-like content with non-printable chars
|
||||
binary_like = "some\x00\x01\x02\x03\x04\x05content" * 20
|
||||
ct = detect_content_type("archive.xyz", binary_like)
|
||||
score = 1.0 if ct == "unknown" else 0.0
|
||||
_score("preprocess.detect_unknown", score)
|
||||
assert ct == "unknown", f"Expected 'unknown', got '{ct}'"
|
||||
_preprocess_cases = [c for c in _load_cases() if c["op"] == "preprocess"]
|
||||
|
||||
|
||||
# ── 1.5 — Email: strip HTML tags ─────────────────────────────────────
|
||||
@pytest.mark.parametrize(
|
||||
"case",
|
||||
_preprocess_cases,
|
||||
ids=[c["id"] for c in _preprocess_cases],
|
||||
)
|
||||
def test_preprocess(case: dict) -> None:
|
||||
raw = _read_content(case)
|
||||
result = preprocess(case["input_content_type"], raw)
|
||||
|
||||
def test_email_strip_html(sample_email_html):
|
||||
result = preprocess("email_html", sample_email_html)
|
||||
has_no_tags = "<" not in result.clean_text
|
||||
has_content = len(result.clean_text) > 50
|
||||
ratio = len(result.clean_text) / len(sample_email_html)
|
||||
score = 1.0 if (has_no_tags and has_content and ratio < 0.8) else 0.0
|
||||
_score("preprocess.email_strip", score, f"ratio={ratio:.2f}, len={len(result.clean_text)}")
|
||||
assert has_no_tags, "clean_text still contains HTML tags"
|
||||
assert has_content, "clean_text is too short"
|
||||
assertions = case.get("assertions", {})
|
||||
score, failures = _run_assertions(assertions, result, raw)
|
||||
|
||||
comment = "; ".join(failures) if failures else f"len={len(result.clean_text)}"
|
||||
_lf_score(case["score_name"], score, comment)
|
||||
|
||||
# ── 1.6 — Email: extract metadata ────────────────────────────────────
|
||||
|
||||
def test_email_extract_metadata(sample_email_html):
|
||||
result = preprocess("email_html", sample_email_html)
|
||||
has_subject = bool(result.metadata.get("subject"))
|
||||
has_from = bool(result.metadata.get("from"))
|
||||
score = 1.0 if (has_subject and has_from) else 0.5 if (has_subject or has_from) else 0.0
|
||||
_score("preprocess.email_metadata", score,
|
||||
f"subject={result.metadata.get('subject')}, from={result.metadata.get('from')}")
|
||||
assert has_subject, f"metadata missing 'subject'. Got: {result.metadata}"
|
||||
assert has_from, f"metadata missing 'from'. Got: {result.metadata}"
|
||||
|
||||
|
||||
# ── 1.7 — Email: split thread ─────────────────────────────────────────
|
||||
|
||||
def test_email_split_thread(sample_thread_email_html):
|
||||
result = preprocess("email_html", sample_thread_email_html)
|
||||
# The latest message is "Sure, I'll handle the deploy."
|
||||
# Quoted content from Bob/Alice should not appear in clean_text
|
||||
has_latest = "Sure, I'll handle the deploy" in result.clean_text
|
||||
lacks_quoted = "Let's plan the deploy" not in result.clean_text
|
||||
score = 1.0 if (has_latest and lacks_quoted) else 0.5 if has_latest else 0.0
|
||||
_score("preprocess.email_thread", score,
|
||||
f"has_latest={has_latest}, lacks_quoted={lacks_quoted}")
|
||||
assert has_latest, "Latest message not found in clean_text"
|
||||
assert lacks_quoted, "Quoted older message leaked into clean_text"
|
||||
|
||||
|
||||
# ── 1.8 — Email: single message (no thread) ──────────────────────────
|
||||
|
||||
def test_email_single_message():
|
||||
single = """<!DOCTYPE html><html><body>
|
||||
<p>From: alice@co.com</p>
|
||||
<p>Subject: Quick update</p>
|
||||
<p>The deploy is done. Everything looks good.</p>
|
||||
</body></html>"""
|
||||
result = preprocess("email_html", single)
|
||||
has_body = "deploy is done" in result.clean_text
|
||||
score = 1.0 if has_body else 0.0
|
||||
_score("preprocess.email_single", score)
|
||||
assert has_body, "Body of single message not found in clean_text"
|
||||
|
||||
|
||||
# ── 1.9 — Email: heavy HTML (table layout) ───────────────────────────
|
||||
|
||||
def test_email_heavy_html(sample_heavy_html_email):
|
||||
result = preprocess("email_html", sample_heavy_html_email)
|
||||
has_no_tags = "<" not in result.clean_text
|
||||
has_content = len(result.clean_text) > 30
|
||||
# CSS properties should not appear in clean text
|
||||
no_css = "border-collapse" not in result.clean_text and "font-size" not in result.clean_text
|
||||
score = 1.0 if (has_no_tags and has_content and no_css) else 0.0
|
||||
_score("preprocess.email_heavy_html", score,
|
||||
f"no_tags={has_no_tags}, has_content={has_content}, no_css={no_css}")
|
||||
assert has_no_tags, "HTML tags found in clean_text"
|
||||
assert has_content, "clean_text is empty"
|
||||
assert no_css, "CSS properties leaked into clean_text"
|
||||
|
||||
|
||||
# ── 1.10 — Fallback: unknown file type ───────────────────────────────
|
||||
|
||||
def test_fallback_unknown_content():
|
||||
raw = "random text content without any structure\nline two\nline three"
|
||||
result = preprocess("unknown", raw)
|
||||
has_text = len(result.clean_text) > 0
|
||||
score = 1.0 if has_text else 0.0
|
||||
_score("preprocess.fallback", score)
|
||||
assert has_text, "fallback handler returned empty clean_text"
|
||||
assert result.content_type == "unknown"
|
||||
assert not failures, (
|
||||
f"[{case['id']}] {case['description']} — {len(failures)} assertion(s) failed:\n"
|
||||
+ "\n".join(f" • {f}" for f in failures)
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user