5 Commits

Author SHA1 Message Date
Roberto Musso
3cc32569d9 chore(tests): remove Langfuse scoring from preprocess tests
Scoring is only meaningful for LLM-backed steps. Preprocess tests are
deterministic Python, so scores add no value. Kept only for detect tests.

- test_preprocess: drop _lf_score call, simplify _run_assertions return type
- cases.yaml: remove score_name from all op=preprocess entries

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 11:21:42 +02:00
Roberto Musso
bf445ac2ce refactor(tests): YAML-driven fixtures for preprocessor tests
- cases.yaml: 10 test cases con schema dichiarativo (op, assertions)
- data/: 7 file reali (email_action.html, email_thread.html, email_single.html,
  email_heavy.html, generic_page.html, notes.txt, fallback.txt)
- test_preprocessors.py: parametrize da YAML via test_detect / test_preprocess;
  assertion engine generico (no_html_tags, min_length, compression_ratio,
  metadata_keys, contains, not_contains, content_type)
- requirements.txt: add PyYAML

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 10:44:41 +02:00
Roberto Musso
a2d6d689e4 feat: add preprocessor system (Step 1 — Local Agent V2)
- app/core/preprocessors/__init__.py: detect_content_type + preprocess dispatcher
- app/core/preprocessors/base.py: PreprocessResult dataclass
- app/core/preprocessors/email_html.py: BeautifulSoup HTML stripping, metadata extraction, thread splitting
- requirements.txt: add beautifulsoup4 and lxml
- tests/test_preprocessors.py: 10 tests with Langfuse scoring (preprocess.* scores)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 10:19:02 +02:00
Roberto Musso
aa8bcbf0d8 Refactor system prompt variables for clarity and consistency across agent setup and runner modules 2026-04-07 00:23:41 +02:00
Roberto Musso
1ce1d492b0 Add Langfuse observability: traces, prompt management, prompt-to-generation linking
- New app/core/langfuse_client.py: lazy singleton client, get_prompt_or_fallback()
  helper (returns raw template + prompt obj for linking), extract_usage() for token
  counts. No-ops when LANGFUSE_* env vars are not set.
- deep_agent.py: home-agent and floating-agent runs wrapped in spans; each ainvoke
  wrapped in a generation with model/input/output/usage; prompts fetched from
  Langfuse (adiuva-home-agent, adiuva-floating-agent, adiuva-floating-classifier)
  with hardcoded fallback.
- agent_runner.py: step1-classifier and step2-processor LLM calls traced; batch
  agent _run_agent_with_tools spans + generations; cloud-processor included.
  Prompts: adiuva-step1-classifier, adiuva-step2-processor, adiuva-cloud-processor.
- agent_setup.py: journey-setup span + generation per ainvoke; prompt_obj stored
  on JourneySession and reused across turns. Prompt: journey_system.
- settings.py: LANGFUSE_SECRET_KEY, LANGFUSE_PUBLIC_KEY, LANGFUSE_HOST added.
- .env.example: Langfuse section with EU/US/self-hosted host comments.
- requirements.txt: langfuse>=2.0.0.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 00:19:20 +02:00
19 changed files with 1156 additions and 85 deletions

View File

@@ -39,6 +39,13 @@ QDRANT_URL=
QDRANT_API_KEY=
# For local Qdrant (homelab): QDRANT_URL=http://qdrant:6333
# ── Langfuse (leave empty to disable observability) ───────────────────────────
LANGFUSE_SECRET_KEY=
LANGFUSE_PUBLIC_KEY=
# LANGFUSE_HOST=https://cloud.langfuse.com # EU (default)
# LANGFUSE_HOST=https://us.cloud.langfuse.com # US
# LANGFUSE_HOST=http://localhost:3000 # Self-hosted
# ── CORS ──────────────────────────────────────────────────────────────────────
# Comma-separated list parsed by Settings (override default if needed)
# CORS_ORIGINS=["app://.","http://localhost:3000"]

View File

@@ -31,6 +31,8 @@ from typing import Any
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
from app.agents.filesystem_agent import FILESYSTEM_TOOLS
from app.config.settings import settings
from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback
from app.core.llm import get_llm
logger = logging.getLogger(__name__)
@@ -62,6 +64,7 @@ class JourneySession:
data_types: list[str]
history: list[dict[str, Any]] = field(default_factory=list)
system_prompt: str = ""
langfuse_prompt: Any = None
created_at: float = field(default_factory=time.monotonic)
def is_expired(self) -> bool:
@@ -85,7 +88,7 @@ def get_journey_session(session_id: str, user_id: str) -> JourneySession | None:
# ── System prompt builder ─────────────────────────────────────────────────
_SYSTEM_PROMPT_TEMPLATE = """\
_JOURNEY_SYSTEM_PROMPT = """\
You are a friendly assistant helping a freelancer configure a data-extraction agent.
Your job is to understand exactly what data the user wants to extract from their
local directory and produce a detailed prompt_template that a separate AI will use
@@ -146,20 +149,25 @@ def _build_system_prompt(
directory: str,
data_types: list[str],
existing_template: str | None = None,
) -> str:
) -> tuple[str, Any]:
"""Return ``(compiled_system_prompt, langfuse_prompt_obj_or_None)``."""
existing_section = (
f"\nThe user already has the following prompt_template — refine it based on their answers:\n"
f"---\n{existing_template}\n---\n"
if existing_template
else ""
)
return _SYSTEM_PROMPT_TEMPLATE.format(
template, prompt_obj = get_prompt_or_fallback(
"journey_system", _JOURNEY_SYSTEM_PROMPT
)
compiled = template.format(
directory=directory,
data_types=", ".join(data_types),
template_start=_TEMPLATE_START,
template_end=_TEMPLATE_END,
existing_section=existing_section,
)
return compiled, prompt_obj
# ── Template extraction ───────────────────────────────────────────────────
@@ -199,12 +207,17 @@ async def _call_llm_with_tools(
system_prompt: str,
history: list[dict[str, Any]],
tools: list[Any],
*,
user_id: str = "",
session_id: str = "",
langfuse_prompt: Any = None,
) -> str:
"""Build LangChain messages from history and invoke the LLM with tools.
Handles tool-calling loops: if the LLM calls tools, execute them and
continue until a final text response is produced.
"""
lf = get_langfuse()
messages: list[Any] = [SystemMessage(content=system_prompt)]
for turn in history:
if turn["role"] == "user":
@@ -216,38 +229,76 @@ async def _call_llm_with_tools(
llm_with_tools = llm.bind_tools(tools)
tool_map = {tool_def.name: tool_def for tool_def in tools}
for _ in range(_MAX_TOOL_STEPS):
response: AIMessage = await llm_with_tools.ainvoke(messages)
messages.append(response)
_span_ctx = (
lf.start_as_current_observation(
as_type="span",
name="journey-setup",
user_id=user_id or None,
session_id=session_id or None,
input=history[-1]["content"] if history else "",
)
if lf else None
)
_span = _span_ctx.__enter__() if _span_ctx else None
if not response.tool_calls:
return _as_text(response.content)
for call in response.tool_calls:
call_name = str(call.get("name", ""))
call_args = call.get("args", {})
logger.info(
"agent_setup: journey tool_call name=%s args=%s",
call_name,
json.dumps(call_args, ensure_ascii=True)[:500],
try:
for _ in range(_MAX_TOOL_STEPS):
_gen_ctx = (
lf.start_as_current_observation(
as_type="generation",
name="journey-setup-llm",
model=settings.LLM_MODEL,
prompt=langfuse_prompt,
input=messages,
)
if lf else None
)
_gen = _gen_ctx.__enter__() if _gen_ctx else None
response: AIMessage = await llm_with_tools.ainvoke(messages)
if _gen_ctx:
_gen.update(output=_as_text(response.content), usage=extract_usage(response))
_gen_ctx.__exit__(None, None, None)
tool_fn = tool_map.get(call_name)
if tool_fn is None:
tool_output = f"Unknown tool: {call_name}"
else:
tool_output = await tool_fn.ainvoke(call_args)
messages.append(response)
logger.info(
"agent_setup: journey tool_result name=%s output=%s",
call_name,
str(tool_output)[:800],
)
messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
if not response.tool_calls:
if _span:
_span.update(output=_as_text(response.content))
return _as_text(response.content)
# Fallback: exceeded max steps.
final = await llm.ainvoke(messages)
return _as_text(final.content)
for call in response.tool_calls:
call_name = str(call.get("name", ""))
call_args = call.get("args", {})
logger.info(
"agent_setup: journey tool_call name=%s args=%s",
call_name,
json.dumps(call_args, ensure_ascii=True)[:500],
)
tool_fn = tool_map.get(call_name)
if tool_fn is None:
tool_output = f"Unknown tool: {call_name}"
else:
tool_output = await tool_fn.ainvoke(call_args)
logger.info(
"agent_setup: journey tool_result name=%s output=%s",
call_name,
str(tool_output)[:800],
)
messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
# Fallback: exceeded max steps.
final = await llm.ainvoke(messages)
final_text = _as_text(final.content)
if _span:
_span.update(output=final_text)
return final_text
finally:
if _span_ctx:
_span_ctx.__exit__(None, None, None)
if lf:
lf.flush()
# ── Journey handlers (called from device_ws.py) ──────────────────────────
@@ -270,7 +321,7 @@ async def handle_journey_start(
# Use the session_id provided by the FE so the reply matches the
# listener key; fall back to a generated one if absent.
session_id = frame.get("session_id") or str(uuid.uuid4())
system_prompt = _build_system_prompt(directory, data_types, existing_template)
system_prompt, langfuse_prompt = _build_system_prompt(directory, data_types, existing_template)
session = JourneySession(
session_id=session_id,
@@ -279,6 +330,7 @@ async def handle_journey_start(
directory=directory,
data_types=data_types,
system_prompt=system_prompt,
langfuse_prompt=langfuse_prompt,
)
# The LLM will explore the directory using FILESYSTEM_TOOLS via the
@@ -292,6 +344,9 @@ async def handle_journey_start(
system_prompt=system_prompt,
history=seed_history,
tools=list(FILESYSTEM_TOOLS),
user_id=user_id,
session_id=session_id,
langfuse_prompt=langfuse_prompt,
)
session.history.extend(seed_history)
@@ -356,6 +411,9 @@ async def handle_journey_message(
system_prompt=session.system_prompt,
history=session.history,
tools=list(FILESYSTEM_TOOLS),
user_id=session.user_id,
session_id=session_id,
langfuse_prompt=session.langfuse_prompt,
)
session.history.append({"role": "assistant", "content": ai_reply})
@@ -379,6 +437,9 @@ async def handle_journey_message(
system_prompt=session.system_prompt,
history=session.history,
tools=list(FILESYSTEM_TOOLS),
user_id=session.user_id,
session_id=session_id,
langfuse_prompt=session.langfuse_prompt,
)
session.history.append({"role": "assistant", "content": nudge_reply})

View File

@@ -52,6 +52,10 @@ class Settings(BaseSettings):
CORS_ORIGINS: list[str] = ["app://.", "http://localhost:3000", "http://localhost:5173"]
LANGFUSE_SECRET_KEY: str = ""
LANGFUSE_PUBLIC_KEY: str = ""
LANGFUSE_HOST: str = "https://cloud.langfuse.com"
ENV: Literal["dev", "prod"] = "dev"
model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")

View File

@@ -42,7 +42,9 @@ from app.agents.note_agent import NOTE_TOOLS
from app.agents.project_agent import PROJECT_TOOLS
from app.agents.task_agent import TASK_TOOLS
from app.agents.timeline_agent import TIMELINE_TOOLS
from app.config.settings import settings
from app.core.device_manager import DeviceConnectionManager
from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback
from app.core.llm import get_llm
from app.core.ws_context import clear_client_executor, execute_on_client, set_client_executor
from app.db import async_session
@@ -100,7 +102,7 @@ _DOMAIN_DESCRIPTIONS: dict[str, str] = {
),
}
_STEP1_SYSTEM_PROMPT = """\
_BATCH_FILE_CLASSIFIER_PROMPT = """\
You are a file classifier for a freelance project management tool.
Your job is to match a file to an existing project and identify which data domains to extract.
@@ -131,7 +133,7 @@ Respond ONLY with a JSON object — no markdown, no explanation:
# ── Step 2: Processing prompt ─────────────────────────────────────────────
_PROCESSING_SYSTEM_PROMPT = """\
_BATCH_PROCESSING_PROMPT = """\
You are a data extraction assistant for a freelance project management tool.
Your task: extract structured data from the file content and persist it using the available tools.
@@ -160,7 +162,7 @@ Domains to extract: {data_types}
# ── Cloud processing prompt (kept separate for cloud agent) ───────────────
_CLOUD_PROCESSING_PROMPT = """\
_BATCH_CLOUD_PROCESSING_PROMPT = """\
You are a data extraction and management assistant for a freelance project
management tool.
@@ -268,8 +270,12 @@ async def _run_agent_with_tools(
user_message: str,
tools: list[Any],
max_steps: int,
user_id: str = "",
langfuse_prompt: Any = None,
agent_name: str = "batch-agent",
) -> str:
"""Run an LLM agent with tool-calling, returning the final text response."""
lf = get_langfuse()
llm = get_llm()
llm_with_tools = llm.bind_tools(tools)
messages: list[Any] = [
@@ -279,38 +285,76 @@ async def _run_agent_with_tools(
tool_map = {tool_def.name: tool_def for tool_def in tools}
for _ in range(max_steps):
response: AIMessage = await llm_with_tools.ainvoke(messages)
messages.append(response)
_span_ctx = (
lf.start_as_current_observation(
as_type="span",
name=agent_name,
user_id=user_id or None,
input=user_message,
)
if lf else None
)
_span = _span_ctx.__enter__() if _span_ctx else None
if not response.tool_calls:
return _as_text(response.content)
for call in response.tool_calls:
call_id = str(call.get("id", ""))
call_name = str(call.get("name", ""))
call_args = call.get("args", {})
logger.info(
"agent_runner: tool_call name=%s args=%s",
call_name,
json.dumps(call_args, ensure_ascii=True)[:800],
try:
for _ in range(max_steps):
_gen_ctx = (
lf.start_as_current_observation(
as_type="generation",
name=f"{agent_name}-llm",
model=settings.LLM_MODEL,
prompt=langfuse_prompt,
input=messages,
)
if lf else None
)
_gen = _gen_ctx.__enter__() if _gen_ctx else None
response: AIMessage = await llm_with_tools.ainvoke(messages)
if _gen_ctx:
_gen.update(output=_as_text(response.content), usage=extract_usage(response))
_gen_ctx.__exit__(None, None, None)
tool_fn = tool_map.get(call_name)
if tool_fn is None:
tool_output = f"Unknown tool: {call_name}"
else:
tool_output = await tool_fn.ainvoke(call_args)
messages.append(response)
logger.info(
"agent_runner: tool_result name=%s output=%s",
call_name,
str(tool_output)[:200],
)
messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
if not response.tool_calls:
final_text = _as_text(response.content)
if _span:
_span.update(output=final_text)
return final_text
final = await llm.ainvoke(messages)
return _as_text(final.content)
for call in response.tool_calls:
call_id = str(call.get("id", ""))
call_name = str(call.get("name", ""))
call_args = call.get("args", {})
logger.info(
"agent_runner: tool_call name=%s args=%s",
call_name,
json.dumps(call_args, ensure_ascii=True)[:800],
)
tool_fn = tool_map.get(call_name)
if tool_fn is None:
tool_output = f"Unknown tool: {call_name}"
else:
tool_output = await tool_fn.ainvoke(call_args)
logger.info(
"agent_runner: tool_result name=%s output=%s",
call_name,
str(tool_output)[:200],
)
messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
final = await llm.ainvoke(messages)
final_text = _as_text(final.content)
if _span:
_span.update(output=final_text)
return final_text
finally:
if _span_ctx:
_span_ctx.__exit__(None, None, None)
if lf:
lf.flush()
# ── Tool list builder ─────────────────────────────────────────────────────
@@ -515,17 +559,33 @@ async def _classify_file(
if d in _DOMAIN_DESCRIPTIONS
)
system = _STEP1_SYSTEM_PROMPT.format(
step1_template, step1_prompt_obj = get_prompt_or_fallback(
"batch_file_classifier", _BATCH_FILE_CLASSIFIER_PROMPT
)
system = step1_template.format(
domain_definitions=domain_definitions,
projects_list=projects_list,
)
lf = get_langfuse()
llm = get_llm()
classifier_messages = [
SystemMessage(content=system),
HumanMessage(content=f"File: {file_path}\n\nContent:\n{file_content[:4000]}"),
]
try:
response = await llm.ainvoke([
SystemMessage(content=system),
HumanMessage(content=f"File: {file_path}\n\nContent:\n{file_content[:4000]}"),
])
if lf:
with lf.start_as_current_observation(
as_type="generation",
name="step1-classifier",
model=settings.LLM_ROUTER_MODEL,
prompt=step1_prompt_obj,
input=classifier_messages,
) as gen:
response = await llm.ainvoke(classifier_messages)
gen.update(output=_as_text(response.content), usage=extract_usage(response))
else:
response = await llm.ainvoke(classifier_messages)
raw = _as_text(response.content).strip()
# Strip markdown fences if the model wraps the JSON.
if raw.startswith("```"):
@@ -713,7 +773,10 @@ async def run_local_agent(
existing_context = "\n\n".join(existing_blocks)
system_prompt = _PROCESSING_SYSTEM_PROMPT.format(
step2_template, step2_prompt_obj = get_prompt_or_fallback(
"batch_processing", _BATCH_PROCESSING_PROMPT
)
system_prompt = step2_template.format(
existing_context=existing_context,
project_context=project_context,
data_types=", ".join(domains),
@@ -730,6 +793,9 @@ async def run_local_agent(
),
tools=processing_tools,
max_steps=_MAX_PROCESSING_STEPS,
user_id=user_id,
langfuse_prompt=step2_prompt_obj,
agent_name="step2-processor",
)
logger.info(
"agent_runner: run=%s file=%r result=%s",
@@ -928,7 +994,10 @@ async def run_cloud_agent(
continue
items_processed += 1
processing_prompt = _CLOUD_PROCESSING_PROMPT.format(
cloud_template, cloud_prompt_obj = get_prompt_or_fallback(
"batch_cloud_processing", _BATCH_CLOUD_PROCESSING_PROMPT
)
processing_prompt = cloud_template.format(
data_types=", ".join(config.data_types),
project_context="Determine the appropriate project from the message context.",
file_list=f"Message from {config.provider} (id: {msg.id})",
@@ -941,6 +1010,9 @@ async def run_cloud_agent(
user_message=f"Process this message content:\n\n{content_text[:8000]}",
tools=processing_tools,
max_steps=_MAX_PROCESSING_STEPS,
user_id=user_id,
langfuse_prompt=cloud_prompt_obj,
agent_name="cloud-processor",
)
except Exception as exc:
errors.append(f"LLM processing error for message {msg.id!r}: {exc}")

View File

@@ -16,7 +16,9 @@ from app.agents.note_agent import NOTE_TOOLS
from app.agents.project_agent import PROJECT_TOOLS
from app.agents.task_agent import TASK_TOOLS
from app.agents.timeline_agent import TIMELINE_TOOLS
from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback
from app.core.llm import get_llm
from app.config.settings import settings
from app.core.memory_middleware import MemoryMiddleware
from app.core.ws_context import clear_tool_result_collector, execute_on_client, set_tool_result_collector
from app.db import async_session
@@ -26,7 +28,7 @@ logger = logging.getLogger(__name__)
FloatingDomainType = Literal["task", "timeline", "project", "node"]
FloatingDomainSection = Literal["task", "timeline", "note"]
_HOME_SINGLE_AGENT_SYSTEM = (
_HOME_SYSTEM_PROMPT = (
"You are the home assistant with direct access to all tools: tasks, projects, notes, timelines, and memory tools. "
"Always use tools for factual data retrieval before answering. "
"When the user asks to remember, forget, or update what you know about them, use memory tools. "
@@ -39,7 +41,7 @@ _HOME_SINGLE_AGENT_SYSTEM = (
"For upcoming tasks, after tag lines add a short recommendation based on due date and priority."
)
_FLOATING_SINGLE_AGENT_SYSTEM = (
_FLOATING_SYSTEM_PROMPT = (
"You are the floating assistant with direct access to all tools: tasks, projects, notes, timelines, and memory tools. "
"Stay focused on the floating scope in context.scope and answer concisely. "
"Return plain text only. Do not output XML/HTML-like tags such as <task>, <project>, <note>, <timeline>, or any bracketed id tag wrappers. "
@@ -48,7 +50,7 @@ _FLOATING_SINGLE_AGENT_SYSTEM = (
"If context.context.resolved_project_id exists, use it as project_id for scoped list calls. "
)
_FLOATING_DOMAIN_CLASSIFIER_SYSTEM = (
_FLOATING_DOMAIN_CLASSIFIER_PROMPT = (
"You are a strict domain classifier for websocket floating requests. "
"Return ONLY a JSON object with keys: type, id, section. "
"Allowed type values: task, timeline, project, node. "
@@ -536,17 +538,31 @@ async def _infer_floating_domain(message: str, context: dict[str, Any]) -> dict[
try:
llm = get_llm()
response = await llm.ainvoke(
[
SystemMessage(content=_FLOATING_DOMAIN_CLASSIFIER_SYSTEM),
HumanMessage(
content=(
f"Message:\n{message}\n\n"
f"Context:\n{json.dumps(classifier_context, ensure_ascii=True)}"
)
),
]
classifier_messages = [
SystemMessage(content=_FLOATING_DOMAIN_CLASSIFIER_PROMPT),
HumanMessage(
content=(
f"Message:\n{message}\n\n"
f"Context:\n{json.dumps(classifier_context, ensure_ascii=True)}"
)
),
]
lf = get_langfuse()
_, classifier_prompt_obj = get_prompt_or_fallback(
"floating_domain_classifier", _FLOATING_DOMAIN_CLASSIFIER_PROMPT
)
if lf:
with lf.start_as_current_observation(
as_type="generation",
name="floating-classifier",
model=settings.LLM_MODEL,
prompt=classifier_prompt_obj,
input=classifier_messages,
) as gen:
response = await llm.ainvoke(classifier_messages)
gen.update(output=_as_text(response.content), usage=extract_usage(response))
else:
response = await llm.ainvoke(classifier_messages)
parsed = _parse_json_object(_as_text(response.content))
if parsed is not None:
domain = _normalize_domain_payload(parsed, project_id)
@@ -571,8 +587,11 @@ async def _run_single_agent(
message: str,
context: dict[str, Any],
max_steps: int = 6,
langfuse_prompt: Any = None,
agent_name: str = "agent",
) -> str:
trace_id = _trace_id_from_context(context)
lf = get_langfuse()
llm = get_llm()
tools = _all_tools_for_user(user_id, trace_id)
model_context = _context_for_model(context)
@@ -591,9 +610,37 @@ async def _run_single_agent(
tool_calls_count = 0
collected: list[dict[str, Any]] = []
set_tool_result_collector(collected)
_span_ctx = (
lf.start_as_current_observation(
as_type="span",
name=agent_name,
user_id=user_id,
session_id=trace_id,
input=message,
)
if lf else None
)
_span = _span_ctx.__enter__() if _span_ctx else None
try:
for _ in range(max_steps):
_gen_ctx = (
lf.start_as_current_observation(
as_type="generation",
name=f"{agent_name}-llm",
model=settings.LLM_MODEL,
prompt=langfuse_prompt,
input=messages,
)
if lf else None
)
_gen = _gen_ctx.__enter__() if _gen_ctx else None
response: AIMessage = await llm_with_tools.ainvoke(messages)
if _gen_ctx:
_gen.update(output=_as_text(response.content), usage=extract_usage(response))
_gen_ctx.__exit__(None, None, None)
messages.append(response)
if not response.tool_calls:
@@ -605,6 +652,8 @@ async def _run_single_agent(
tool_calls_count,
len(final_text),
)
if _span:
_span.update(output=final_text)
return final_text
tool_map = {tool_def.name: tool_def for tool_def in tools}
@@ -644,9 +693,15 @@ async def _run_single_agent(
tool_calls_count,
len(final_text),
)
if _span:
_span.update(output=final_text)
return final_text
finally:
clear_tool_result_collector()
if _span_ctx:
_span_ctx.__exit__(None, None, None)
if lf:
lf.flush()
async def _run_single_agent_stream(
@@ -656,8 +711,11 @@ async def _run_single_agent_stream(
message: str,
context: dict[str, Any],
max_steps: int = 6,
langfuse_prompt: Any = None,
agent_name: str = "agent",
) -> AsyncGenerator[tuple[str, Any], None]:
trace_id = _trace_id_from_context(context)
lf = get_langfuse()
llm = get_llm()
tools = _all_tools_for_user(user_id, trace_id)
model_context = _context_for_model(context)
@@ -677,9 +735,38 @@ async def _run_single_agent_stream(
streamed_chars = 0
collected: list[dict[str, Any]] = []
set_tool_result_collector(collected)
_span_ctx = (
lf.start_as_current_observation(
as_type="span",
name=f"{agent_name}-stream",
user_id=user_id,
session_id=trace_id,
input=message,
)
if lf else None
)
_span = _span_ctx.__enter__() if _span_ctx else None
streamed_text: list[str] = []
try:
for _ in range(max_steps):
_gen_ctx = (
lf.start_as_current_observation(
as_type="generation",
name=f"{agent_name}-llm",
model=settings.LLM_MODEL,
prompt=langfuse_prompt,
input=messages,
)
if lf else None
)
_gen = _gen_ctx.__enter__() if _gen_ctx else None
response: AIMessage = await llm_with_tools.ainvoke(messages)
if _gen_ctx:
_gen.update(output=_as_text(response.content), usage=extract_usage(response))
_gen_ctx.__exit__(None, None, None)
messages.append(response)
if not response.tool_calls:
@@ -688,6 +775,7 @@ async def _run_single_agent_stream(
token = _as_text(getattr(chunk, "content", ""))
if token:
streamed_chars += len(token)
streamed_text.append(token)
emitted_any = True
yield "token", token
@@ -696,6 +784,7 @@ async def _run_single_agent_stream(
fallback_text = _as_text(response.content)
if fallback_text:
streamed_chars += len(fallback_text)
streamed_text.append(fallback_text)
yield "token", fallback_text
logger.info(
"deep_agent: run_single_agent_stream_end trace=%s user=%s tool_calls=%d response_chars=%d",
@@ -704,6 +793,8 @@ async def _run_single_agent_stream(
tool_calls_count,
streamed_chars,
)
if _span:
_span.update(output="".join(streamed_text))
return
tool_map = {tool_def.name: tool_def for tool_def in tools}
@@ -738,6 +829,7 @@ async def _run_single_agent_stream(
token = _as_text(getattr(chunk, "content", ""))
if token:
streamed_chars += len(token)
streamed_text.append(token)
yield "token", token
logger.info(
"deep_agent: run_single_agent_stream_end trace=%s user=%s tool_calls=%d response_chars=%d fallback=1",
@@ -746,17 +838,28 @@ async def _run_single_agent_stream(
tool_calls_count,
streamed_chars,
)
if _span:
_span.update(output="".join(streamed_text))
finally:
clear_tool_result_collector()
if _span_ctx:
_span_ctx.__exit__(None, None, None)
if lf:
lf.flush()
async def run_home(user_id: str, message: str, context: dict[str, Any]) -> str:
prepared_context = await _prepare_context(message, context)
system_prompt, langfuse_prompt = get_prompt_or_fallback(
"home_system", _HOME_SYSTEM_PROMPT
)
response = await _run_single_agent(
user_id=user_id,
system_prompt=_HOME_SINGLE_AGENT_SYSTEM,
system_prompt=system_prompt,
message=message,
context=prepared_context,
langfuse_prompt=langfuse_prompt,
agent_name="home-agent",
)
return _normalize_tagged_list_lines(response, message)
@@ -764,11 +867,16 @@ async def run_home(user_id: str, message: str, context: dict[str, Any]) -> str:
async def run_floating(user_id: str, message: str, context: dict[str, Any]) -> tuple[str, dict[str, str | None]]:
prepared_context = await _prepare_context(message, context)
domain = await _infer_floating_domain(message, prepared_context)
system_prompt, langfuse_prompt = get_prompt_or_fallback(
"floating_system", _FLOATING_SYSTEM_PROMPT
)
response = await _run_single_agent(
user_id=user_id,
system_prompt=_FLOATING_SINGLE_AGENT_SYSTEM,
system_prompt=system_prompt,
message=message,
context=prepared_context,
langfuse_prompt=langfuse_prompt,
agent_name="floating-agent",
)
sanitized = _strip_floating_markup(response)
if not sanitized and response:
@@ -782,12 +890,17 @@ async def run_home_stream(
context: dict[str, Any],
) -> AsyncGenerator[tuple[str, Any], None]:
prepared_context = await _prepare_context(message, context)
system_prompt, langfuse_prompt = get_prompt_or_fallback(
"home_system", _HOME_SYSTEM_PROMPT
)
text_chunks: list[str] = []
async for event in _run_single_agent_stream(
user_id=user_id,
system_prompt=_HOME_SINGLE_AGENT_SYSTEM,
system_prompt=system_prompt,
message=message,
context=prepared_context,
langfuse_prompt=langfuse_prompt,
agent_name="home-agent",
):
event_type, data = event
if event_type != "token":
@@ -809,14 +922,19 @@ async def run_floating_stream(
domain = await _infer_floating_domain(message, prepared_context)
yield "floating_domain", domain
system_prompt, langfuse_prompt = get_prompt_or_fallback(
"floating_system", _FLOATING_SYSTEM_PROMPT
)
sanitizer = _FloatingStreamSanitizer()
emitted_sanitized = False
raw_chunks: list[str] = []
async for event in _run_single_agent_stream(
user_id=user_id,
system_prompt=_FLOATING_SINGLE_AGENT_SYSTEM,
system_prompt=system_prompt,
message=message,
context=prepared_context,
langfuse_prompt=langfuse_prompt,
agent_name="floating-agent",
):
event_type, data = event
if event_type != "token":

114
app/core/langfuse_client.py Normal file
View File

@@ -0,0 +1,114 @@
"""Langfuse observability — singleton client and prompt helpers.
If LANGFUSE_SECRET_KEY / LANGFUSE_PUBLIC_KEY are not set,
all helpers are no-ops so the app works without Langfuse configured.
Usage
-----
Tracing::
from app.core.langfuse_client import get_langfuse
lf = get_langfuse()
if lf:
with lf.start_as_current_observation(as_type="span", name="my-agent") as span:
span.update(input=user_message)
# ... do work ...
span.update(output=result)
lf.flush()
Prompt management::
from app.core.langfuse_client import get_prompt_or_fallback
text, prompt_obj = get_prompt_or_fallback("home_system", FALLBACK_PROMPT)
# Use text as the system prompt; pass prompt_obj to generations for linking.
Linking a prompt to a generation::
with lf.start_as_current_observation(
as_type="generation",
name="llm-call",
model="gpt-4o",
prompt=prompt_obj, # links generation → prompt version in the UI
input=messages,
) as gen:
response = await llm.ainvoke(messages)
gen.update(output=response.content, usage=_usage(response))
"""
from __future__ import annotations
import logging
from typing import Any
logger = logging.getLogger(__name__)
_client: Any = None
_initialized: bool = False
def get_langfuse() -> Any | None:
"""Return the Langfuse singleton, or ``None`` when not configured."""
global _client, _initialized
if _initialized:
return _client
_initialized = True
from app.config.settings import settings # local import to avoid circular deps
if not settings.LANGFUSE_SECRET_KEY or not settings.LANGFUSE_PUBLIC_KEY:
logger.debug("langfuse: not configured — observability disabled")
return None
try:
from langfuse import Langfuse
_client = Langfuse(
secret_key=settings.LANGFUSE_SECRET_KEY,
public_key=settings.LANGFUSE_PUBLIC_KEY,
host=settings.LANGFUSE_HOST,
)
logger.info("langfuse: client initialized host=%s", settings.LANGFUSE_HOST)
except Exception as exc:
logger.warning("langfuse: failed to initialize: %s", exc)
_client = None
return _client
def get_prompt_or_fallback(name: str, fallback: str) -> tuple[str, Any]:
"""Fetch a text prompt from Langfuse; fall back to ``fallback`` on any error.
Returns ``(prompt_text, prompt_obj_or_None)``.
* ``prompt_text`` — the raw template string (variables not yet substituted).
Callers perform variable substitution with Python's ``.format()``.
* ``prompt_obj`` — the Langfuse prompt object, or ``None`` when Langfuse is
unavailable / the fetch failed. Pass this to generation observations so
Langfuse links the generation to the exact prompt version in the UI.
"""
lf = get_langfuse()
if lf is None:
return fallback, None
try:
prompt = lf.get_prompt(name, label="production", fallback=fallback)
# For text-type prompts .prompt holds the raw template string.
raw = prompt.prompt if hasattr(prompt, "prompt") and isinstance(prompt.prompt, str) else fallback
return raw, prompt
except Exception as exc:
logger.warning("langfuse: get_prompt %r failed: %s — using fallback", name, exc)
return fallback, None
def extract_usage(response: Any) -> dict[str, int]:
"""Extract token usage from a LangChain AI message into Langfuse format."""
meta = getattr(response, "usage_metadata", None)
if not meta:
return {}
return {
"input": int(meta.get("input_tokens", 0)),
"output": int(meta.get("output_tokens", 0)),
"total": int(meta.get("total_tokens", 0)),
}

View File

@@ -0,0 +1,104 @@
"""Preprocessor registry: detect content type and dispatch to handlers.
Public API
----------
detect_content_type(filename, raw_content) -> str
Heuristic detection based on file extension and content patterns.
preprocess(content_type, raw_content) -> PreprocessResult
Dispatch to the appropriate handler.
"""
from __future__ import annotations
import re
from app.core.preprocessors.base import PreprocessResult
# ── Heuristics ────────────────────────────────────────────────────────
# Patterns that strongly suggest an email HTML file
_EMAIL_SIGNALS = re.compile(
r"(Subject:|From:|To:|Date:|Sent:|MIME-Version:|Content-Type:\s*text/html)",
re.IGNORECASE,
)
# Patterns that suggest a generic HTML page (not an email)
_GENERIC_HTML_SIGNALS = re.compile(
r"<(nav|main|header|footer|article|section)\b",
re.IGNORECASE,
)
def detect_content_type(filename: str, raw_content: str) -> str:
"""Return a content-type string for the given file.
Supported types: ``"email_html"``, ``"generic_html"``,
``"plain_text"``, ``"unknown"``.
"""
ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
if ext == "txt":
return "plain_text"
if ext in ("html", "htm", "eml", "mhtml", "mht"):
# Prefer email detection over generic HTML
if _EMAIL_SIGNALS.search(raw_content[:4096]):
return "email_html"
if _GENERIC_HTML_SIGNALS.search(raw_content[:4096]) or "<html" in raw_content[:200].lower():
return "generic_html"
# .html without clear signals — check for any email header
if re.search(r"^(From|To|Subject|Date):", raw_content[:2048], re.MULTILINE | re.IGNORECASE):
return "email_html"
return "generic_html"
# Plain text files with email headers
if ext in ("", "txt") or not ext:
if _EMAIL_SIGNALS.search(raw_content[:4096]):
return "email_html"
# Detect binary content
try:
raw_content.encode("utf-8")
except (UnicodeEncodeError, AttributeError):
return "unknown"
# Non-text bytes heuristic: high ratio of non-printable chars
sample = raw_content[:512]
non_printable = sum(1 for c in sample if ord(c) < 32 and c not in "\r\n\t")
if len(sample) > 0 and non_printable / len(sample) > 0.1:
return "unknown"
return "unknown"
# ── Generic fallback handler ──────────────────────────────────────────
def _preprocess_generic(raw_content: str, content_type: str) -> PreprocessResult:
"""Strip HTML tags if present, return text as-is."""
try:
from bs4 import BeautifulSoup
text = BeautifulSoup(raw_content, "html.parser").get_text(separator="\n")
except ImportError:
# No BeautifulSoup — strip tags with a simple regex
text = re.sub(r"<[^>]+>", "", raw_content)
text = re.sub(r"\n{3,}", "\n\n", text).strip()
return PreprocessResult(content_type=content_type, clean_text=text, metadata={})
# ── Dispatch ──────────────────────────────────────────────────────────
def preprocess(content_type: str, raw_content: str) -> PreprocessResult:
"""Dispatch *raw_content* to the handler registered for *content_type*.
Falls back to the generic handler for unknown types.
"""
if content_type == "email_html":
from app.core.preprocessors.email_html import preprocess_email_html
return preprocess_email_html(raw_content)
return _preprocess_generic(raw_content, content_type)
__all__ = ["detect_content_type", "preprocess", "PreprocessResult"]

View File

@@ -0,0 +1,25 @@
"""Base types for the preprocessor system."""
from __future__ import annotations
from dataclasses import dataclass, field
@dataclass
class PreprocessResult:
"""Output of a preprocessor handler.
Attributes
----------
content_type:
The detected content type (e.g. ``"email_html"``, ``"plain_text"``).
clean_text:
Human-readable text stripped of markup/binary noise.
metadata:
Dict of extracted metadata (keys vary by handler).
Common keys: ``subject``, ``from``, ``to``, ``date``, ``filename``.
"""
content_type: str
clean_text: str
metadata: dict = field(default_factory=dict)

View File

@@ -0,0 +1,111 @@
"""Preprocessor for email HTML files.
Handles:
- HTML stripping via BeautifulSoup
- Metadata extraction (Subject, From, To, Date)
- Thread splitting — isolates the latest reply
"""
from __future__ import annotations
import re
from typing import TYPE_CHECKING
from app.core.preprocessors.base import PreprocessResult
if TYPE_CHECKING:
pass
# ── Thread split markers ──────────────────────────────────────────────
# Matches patterns like:
# "On Mon, Apr 7, 2026 at 10:00 AM, Alice <alice@co.com> wrote:"
# "-----Original Message-----"
# "> " (plain-text quote prefix)
_THREAD_PATTERNS = [
re.compile(r"^On\s+.+wrote\s*:", re.IGNORECASE | re.MULTILINE),
re.compile(r"^-{3,}\s*(original message|forwarded message)\s*-{3,}", re.IGNORECASE | re.MULTILINE),
re.compile(r"^>{1,}\s+\S", re.MULTILINE),
re.compile(r"^From:\s+.+\nSent:\s+", re.IGNORECASE | re.MULTILINE),
]
# ── Metadata patterns (applied on raw HTML / plain fallback) ──────────
_META_PATTERNS: dict[str, list[re.Pattern]] = {
"subject": [
re.compile(r"<title>(.+?)</title>", re.IGNORECASE | re.DOTALL),
re.compile(r"Subject:\s*(.+)", re.IGNORECASE),
],
"from": [
re.compile(r'<meta[^>]+name=["\']?from["\']?[^>]+content=["\']([^"\']+)["\']', re.IGNORECASE),
re.compile(r"From:\s*(.+)", re.IGNORECASE),
],
"to": [
re.compile(r'<meta[^>]+name=["\']?to["\']?[^>]+content=["\']([^"\']+)["\']', re.IGNORECASE),
re.compile(r"To:\s*(.+)", re.IGNORECASE),
],
"date": [
re.compile(r'<meta[^>]+name=["\']?date["\']?[^>]+content=["\']([^"\']+)["\']', re.IGNORECASE),
re.compile(r"Date:\s*(.+)", re.IGNORECASE),
re.compile(r"Sent:\s*(.+)", re.IGNORECASE),
],
}
def _extract_metadata(raw_html: str, text: str) -> dict:
"""Extract Subject/From/To/Date from raw HTML or plain text."""
metadata: dict[str, str] = {}
for field, patterns in _META_PATTERNS.items():
for pat in patterns:
m = pat.search(raw_html) or pat.search(text)
if m:
metadata[field] = m.group(1).strip()
break
return metadata
def _split_thread(text: str) -> str:
"""Return only the latest message in a threaded email."""
earliest_pos: int | None = None
for pat in _THREAD_PATTERNS:
m = pat.search(text)
if m and (earliest_pos is None or m.start() < earliest_pos):
earliest_pos = m.start()
if earliest_pos is not None and earliest_pos > 0:
return text[:earliest_pos].strip()
return text.strip()
def preprocess_email_html(raw_content: str) -> PreprocessResult:
"""Strip HTML, extract metadata, split thread from an email HTML file."""
try:
from bs4 import BeautifulSoup # lazy import — optional dep
except ImportError as exc:
raise ImportError(
"beautifulsoup4 is required for email_html preprocessing. "
"Install it with: pip install beautifulsoup4"
) from exc
# Parse with lxml if available, fall back to html.parser
try:
soup = BeautifulSoup(raw_content, "lxml")
except Exception:
soup = BeautifulSoup(raw_content, "html.parser")
# Remove noise tags
for tag in soup(["style", "script", "head", "noscript"]):
tag.decompose()
clean_text = soup.get_text(separator="\n")
# Collapse excessive blank lines
clean_text = re.sub(r"\n{3,}", "\n\n", clean_text).strip()
metadata = _extract_metadata(raw_content, clean_text)
latest_message = _split_thread(clean_text)
return PreprocessResult(
content_type="email_html",
clean_text=latest_message,
metadata=metadata,
)

View File

@@ -32,4 +32,8 @@ google-auth-oauthlib>=1.2.0
google-auth-httplib2>=0.2.0
msal>=1.28.0
cryptography>=42.0.0
langfuse>=2.0.0
beautifulsoup4>=4.12.0
lxml>=5.0.0
PyYAML>=6.0.0
ruff>=0.8.0

121
tests/fixtures/preprocessors/cases.yaml vendored Normal file
View File

@@ -0,0 +1,121 @@
# Preprocessor test cases — Step 1 (Local Agent V2)
#
# Schema per caso:
# id: "1.N"
# description: str
# score_name: str # nome score inviato a Langfuse
#
# Sorgente contenuto (una delle due):
# file: <nome file in data/> # letto come testo UTF-8
# generate: binary_noise # contenuto generato dal runner (per test binari)
#
# Per op=detect:
# op: detect
# input_filename: str # filename passato a detect_content_type
# expected_content_type: str
#
# Per op=preprocess:
# op: preprocess
# input_content_type: str # content_type passato a preprocess()
# assertions:
# no_html_tags: bool
# min_length: int
# compression_ratio_lt: float # len(clean) / len(raw) < soglia
# metadata_keys: [str, ...] # chiavi che devono essere in metadata
# contains: str | [str, ...] # substring(s) presenti in clean_text
# not_contains: str | [str, ...] # substring(s) assenti da clean_text
# content_type: str # valore atteso di result.content_type
cases:
# ── Detection tests ────────────────────────────────────────────────
- id: "1.1"
description: "Detect email HTML"
score_name: preprocess.detect_email
file: email_action.html
op: detect
input_filename: email_export.html
expected_content_type: email_html
- id: "1.2"
description: "Detect generic HTML"
score_name: preprocess.detect_generic
file: generic_page.html
op: detect
input_filename: index.html
expected_content_type: generic_html
- id: "1.3"
description: "Detect plain text"
score_name: preprocess.detect_text
file: notes.txt
op: detect
input_filename: notes.txt
expected_content_type: plain_text
- id: "1.4"
description: "Detect unknown (binary-like content)"
score_name: preprocess.detect_unknown
generate: binary_noise
op: detect
input_filename: archive.xyz
expected_content_type: unknown
# ── Preprocess tests ───────────────────────────────────────────────
- id: "1.5"
description: "Email: strip HTML tags"
file: email_action.html
op: preprocess
input_content_type: email_html
assertions:
no_html_tags: true
min_length: 50
compression_ratio_lt: 0.8
- id: "1.6"
description: "Email: extract metadata (Subject + From)"
file: email_action.html
op: preprocess
input_content_type: email_html
assertions:
metadata_keys: [subject, from]
- id: "1.7"
description: "Email: split thread — solo ultimo messaggio"
file: email_thread.html
op: preprocess
input_content_type: email_html
assertions:
contains: "Sure, I'll handle the deploy"
not_contains: "Let's plan the deploy"
- id: "1.8"
description: "Email: singolo messaggio senza thread"
file: email_single.html
op: preprocess
input_content_type: email_html
assertions:
contains: "deploy is done"
- id: "1.9"
description: "Email: HTML pesante con table layout"
file: email_heavy.html
op: preprocess
input_content_type: email_html
assertions:
no_html_tags: true
min_length: 30
not_contains:
- "border-collapse"
- "font-size"
- id: "1.10"
description: "Fallback: file sconosciuto → testo restituito"
file: fallback.txt
op: preprocess
input_content_type: unknown
assertions:
min_length: 1
content_type: unknown

View File

@@ -0,0 +1,25 @@
<!DOCTYPE html>
<html>
<head>
<title>Fix the login bug</title>
<style>
body { font-family: Arial, sans-serif; color: #333; margin: 0; padding: 20px; }
.header { background: #f5f5f5; padding: 10px; border-bottom: 1px solid #ddd; }
.body { padding: 20px; }
</style>
</head>
<body>
<div class="header">
<p><strong>From:</strong> boss@company.com</p>
<p><strong>To:</strong> dev@company.com</p>
<p><strong>Subject:</strong> Fix the login bug</p>
<p><strong>Date:</strong> Mon, 7 Apr 2026 09:00:00 +0200</p>
</div>
<div class="body">
<p>Hi,</p>
<p>Please fix the login bug by Friday. It is blocking the release.</p>
<p>Priority: high. Let me know if you need anything.</p>
<p>Thanks,<br>Boss</p>
</div>
</body>
</html>

View File

@@ -0,0 +1,49 @@
<!DOCTYPE html>
<html>
<head>
<style>
table { border-collapse: collapse; width: 100%; max-width: 600px; margin: 0 auto; }
td { padding: 8px 12px; border: 1px solid #dddddd; font-size: 12px; color: #444444; }
.header-row { background-color: #003366; color: #ffffff; font-weight: bold; }
.label-col { background-color: #f0f0f0; width: 80px; font-weight: bold; }
.footer-row { font-size: 10px; color: #999999; text-align: center; }
</style>
</head>
<body bgcolor="#eeeeee">
<center>
<table cellpadding="0" cellspacing="0">
<tr class="header-row">
<td colspan="2">Company Internal Update</td>
</tr>
<tr>
<td class="label-col">From:</td>
<td>newsletter@corp.com</td>
</tr>
<tr>
<td class="label-col">Subject:</td>
<td>Q1 Results Update</td>
</tr>
<tr>
<td class="label-col">Date:</td>
<td>Apr 7, 2026</td>
</tr>
<tr>
<td colspan="2">
<table width="100%" cellpadding="10">
<tr>
<td>
<p style="font-size:14px; font-weight:bold;">Dear Team,</p>
<p>Q1 results are in. Revenue up 15% year-over-year.</p>
<p>Please review the attached report and share any feedback by EOW.</p>
</td>
</tr>
</table>
</td>
</tr>
<tr class="footer-row">
<td colspan="2">Confidential — do not forward outside the company.</td>
</tr>
</table>
</center>
</body>
</html>

View File

@@ -0,0 +1,8 @@
<!DOCTYPE html>
<html><body>
<p><strong>From:</strong> alice@co.com</p>
<p><strong>To:</strong> team@co.com</p>
<p><strong>Subject:</strong> Quick update</p>
<p><strong>Date:</strong> Tue, 7 Apr 2026 10:30:00 +0200</p>
<p>The deploy is done. Everything looks good. No issues so far.</p>
</body></html>

View File

@@ -0,0 +1,24 @@
<!DOCTYPE html>
<html><body>
<div class="message-latest">
<p><strong>From:</strong> alice@co.com</p>
<p><strong>Subject:</strong> Re: Re: Deploy plan</p>
<p>Sure, I'll handle the deploy.</p>
</div>
<p>On Mon, Apr 6, 2026 at 3:00 PM, Bob &lt;bob@co.com&gt; wrote:</p>
<blockquote>
<p>From: bob@co.com</p>
<p>Can you handle the deploy?</p>
<p>On Sun, Apr 5, 2026 at 1:00 PM, Alice &lt;alice@co.com&gt; wrote:</p>
<blockquote>
<p>From: alice@co.com</p>
<p>Let's plan the deploy for Monday.</p>
<p>On Sat, Apr 4, 2026 at 11:00 AM, Charlie &lt;charlie@co.com&gt; wrote:</p>
<blockquote>
<p>From: charlie@co.com</p>
<p>We need to schedule the deploy. What day works?</p>
</blockquote>
</blockquote>
</blockquote>
</body></html>

View File

@@ -0,0 +1,3 @@
random text content without any structure
line two with some words
line three and more content here

View File

@@ -0,0 +1,35 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>My Web App</title>
<link rel="stylesheet" href="styles.css">
</head>
<body>
<nav>
<a href="/">Home</a>
<a href="/about">About</a>
<a href="/contact">Contact</a>
</nav>
<main>
<header>
<h1>Welcome to My App</h1>
</header>
<article>
<p>This is a generic web page with no email headers.</p>
<p>It has navigation, main content, and a footer.</p>
</article>
<section>
<h2>Features</h2>
<ul>
<li>Fast</li>
<li>Reliable</li>
<li>Secure</li>
</ul>
</section>
</main>
<footer>
<p>&copy; 2026 My App</p>
</footer>
</body>
</html>

View File

@@ -0,0 +1,15 @@
Meeting notes - April 7, 2026
Attendees: Alice, Bob, Charlie
Discussion points:
- Deploy scheduled for Friday
- Bug fix for login must be completed by Thursday
- Review Q1 numbers before EOW
Action items:
- Alice: fix login bug
- Bob: prepare deploy checklist
- Charlie: send Q1 report
Next meeting: April 14, 2026

171
tests/test_preprocessors.py Normal file
View File

@@ -0,0 +1,171 @@
"""Tests for the preprocessor system (Step 1 — Local Agent V2).
Fixtures are driven by:
tests/fixtures/preprocessors/cases.yaml — test case definitions
tests/fixtures/preprocessors/data/ — input files (HTML, txt, ...)
Run:
pytest tests/test_preprocessors.py -v
# Only detection tests
pytest tests/test_preprocessors.py -v -k detect
# Only preprocess tests
pytest tests/test_preprocessors.py -v -k preprocess
Langfuse scores are sent when LANGFUSE_SECRET_KEY / LANGFUSE_PUBLIC_KEY are set.
"""
from __future__ import annotations
import re
from pathlib import Path
from typing import Any
import pytest
import yaml
from app.core.langfuse_client import get_langfuse
from app.core.preprocessors import detect_content_type, preprocess
# ── Paths ──────────────────────────────────────────────────────────────
_FIXTURES_DIR = Path(__file__).parent / "fixtures" / "preprocessors"
_DATA_DIR = _FIXTURES_DIR / "data"
_CASES_FILE = _FIXTURES_DIR / "cases.yaml"
# ── Content generators ─────────────────────────────────────────────────
_GENERATORS: dict[str, str] = {
# High ratio of non-printable chars → triggers "unknown" heuristic
"binary_noise": "some\x00\x01\x02\x03\x04\x05content" * 20,
}
def _load_cases() -> list[dict]:
with _CASES_FILE.open(encoding="utf-8") as f:
return yaml.safe_load(f)["cases"]
def _read_content(case: dict) -> str:
if "generate" in case:
key = case["generate"]
if key not in _GENERATORS:
raise ValueError(f"Unknown generator '{key}' in case {case['id']}")
return _GENERATORS[key]
file_path = _DATA_DIR / case["file"]
return file_path.read_text(encoding="utf-8")
# ── Langfuse helper ───────────────────────────────────────────────────
def _lf_score(score_name: str, value: float, comment: str = "") -> None:
lf = get_langfuse()
if lf:
trace = lf.trace(name=f"eval-{score_name}")
lf.score(
trace_id=trace.id,
name=score_name,
value=value,
data_type="NUMERIC",
comment=comment,
)
lf.flush()
# ── Assertion engine ──────────────────────────────────────────────────
def _run_assertions(assertions: dict[str, Any], result: Any, raw: str) -> list[str]:
"""Run all assertions declared in the YAML case. Returns failure messages."""
failures: list[str] = []
if assertions.get("no_html_tags"):
if re.search(r"<[^>]+>", result.clean_text):
failures.append("clean_text still contains HTML tags")
min_len = assertions.get("min_length")
if min_len is not None:
if len(result.clean_text) < min_len:
failures.append(
f"clean_text too short: {len(result.clean_text)} < {min_len}"
)
ratio_lt = assertions.get("compression_ratio_lt")
if ratio_lt is not None and len(raw) > 0:
ratio = len(result.clean_text) / len(raw)
if ratio >= ratio_lt:
failures.append(f"compression ratio {ratio:.2f} >= {ratio_lt}")
meta_keys = assertions.get("metadata_keys", [])
for key in meta_keys:
if not result.metadata.get(key):
failures.append(f"metadata missing key '{key}' (got {result.metadata})")
contains = assertions.get("contains")
if contains:
items = [contains] if isinstance(contains, str) else contains
for item in items:
if item not in result.clean_text:
failures.append(f"clean_text missing expected substring: {item!r}")
not_contains = assertions.get("not_contains")
if not_contains:
items = [not_contains] if isinstance(not_contains, str) else not_contains
for item in items:
if item in result.clean_text:
failures.append(f"clean_text contains forbidden substring: {item!r}")
expected_ct = assertions.get("content_type")
if expected_ct and result.content_type != expected_ct:
failures.append(
f"content_type mismatch: expected {expected_ct!r}, got {result.content_type!r}"
)
return failures
# ── Parametrized: detect ──────────────────────────────────────────────
_detect_cases = [c for c in _load_cases() if c["op"] == "detect"]
@pytest.mark.parametrize(
"case",
_detect_cases,
ids=[c["id"] for c in _detect_cases],
)
def test_detect(case: dict) -> None:
raw = _read_content(case)
ct = detect_content_type(case["input_filename"], raw)
expected = case["expected_content_type"]
score = 1.0 if ct == expected else 0.0
_lf_score(case["score_name"], score, f"got={ct}, expected={expected}")
assert ct == expected, (
f"[{case['id']}] {case['description']}: "
f"expected content_type={expected!r}, got {ct!r}"
)
# ── Parametrized: preprocess ──────────────────────────────────────────
_preprocess_cases = [c for c in _load_cases() if c["op"] == "preprocess"]
@pytest.mark.parametrize(
"case",
_preprocess_cases,
ids=[c["id"] for c in _preprocess_cases],
)
def test_preprocess(case: dict) -> None:
raw = _read_content(case)
result = preprocess(case["input_content_type"], raw)
assertions = case.get("assertions", {})
failures = _run_assertions(assertions, result, raw)
assert not failures, (
f"[{case['id']}] {case['description']}{len(failures)} assertion(s) failed:\n"
+ "\n".join(f"{f}" for f in failures)
)