5 Commits

Author SHA1 Message Date
Roberto Musso
3cc32569d9 chore(tests): remove Langfuse scoring from preprocess tests
Scoring is only meaningful for LLM-backed steps. Preprocess tests are
deterministic Python, so scores add no value. Kept only for detect tests.

- test_preprocess: drop _lf_score call, simplify _run_assertions return type
- cases.yaml: remove score_name from all op=preprocess entries

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 11:21:42 +02:00
Roberto Musso
bf445ac2ce refactor(tests): YAML-driven fixtures for preprocessor tests
- cases.yaml: 10 test cases with a declarative schema (op, assertions)
- data/: 7 real files (email_action.html, email_thread.html, email_single.html,
  email_heavy.html, generic_page.html, notes.txt, fallback.txt)
- test_preprocessors.py: parametrized from YAML via test_detect / test_preprocess;
  generic assertion engine (no_html_tags, min_length, compression_ratio,
  metadata_keys, contains, not_contains, content_type)
- requirements.txt: add PyYAML

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 10:44:41 +02:00
Roberto Musso
a2d6d689e4 feat: add preprocessor system (Step 1 — Local Agent V2)
- app/core/preprocessors/__init__.py: detect_content_type + preprocess dispatcher
- app/core/preprocessors/base.py: PreprocessResult dataclass
- app/core/preprocessors/email_html.py: BeautifulSoup HTML stripping, metadata extraction, thread splitting
- requirements.txt: add beautifulsoup4 and lxml
- tests/test_preprocessors.py: 10 tests with Langfuse scoring (preprocess.* scores)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 10:19:02 +02:00
Roberto Musso
aa8bcbf0d8 Refactor system prompt variables for clarity and consistency across agent setup and runner modules
2026-04-07 00:23:41 +02:00
Roberto Musso
1ce1d492b0 Add Langfuse observability: traces, prompt management, prompt-to-generation linking
- New app/core/langfuse_client.py: lazy singleton client, get_prompt_or_fallback()
  helper (returns raw template + prompt obj for linking), extract_usage() for token
  counts. No-ops when LANGFUSE_* env vars are not set.
- deep_agent.py: home-agent and floating-agent runs wrapped in spans; each ainvoke
  wrapped in a generation with model/input/output/usage; prompts fetched from
  Langfuse (adiuva-home-agent, adiuva-floating-agent, adiuva-floating-classifier)
  with hardcoded fallback.
- agent_runner.py: step1-classifier and step2-processor LLM calls traced; batch
  agent _run_agent_with_tools spans + generations; cloud-processor included.
  Prompts: adiuva-step1-classifier, adiuva-step2-processor, adiuva-cloud-processor.
- agent_setup.py: journey-setup span + generation per ainvoke; prompt_obj stored
  on JourneySession and reused across turns. Prompt: journey_system.
- settings.py: LANGFUSE_SECRET_KEY, LANGFUSE_PUBLIC_KEY, LANGFUSE_HOST added.
- .env.example: Langfuse section with EU/US/self-hosted host comments.
- requirements.txt: langfuse>=2.0.0.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 00:19:20 +02:00
19 changed files with 1156 additions and 85 deletions

View File

@@ -39,6 +39,13 @@ QDRANT_URL=
QDRANT_API_KEY= QDRANT_API_KEY=
# For local Qdrant (homelab): QDRANT_URL=http://qdrant:6333 # For local Qdrant (homelab): QDRANT_URL=http://qdrant:6333
# ── Langfuse (leave empty to disable observability) ───────────────────────────
LANGFUSE_SECRET_KEY=
LANGFUSE_PUBLIC_KEY=
# LANGFUSE_HOST=https://cloud.langfuse.com # EU (default)
# LANGFUSE_HOST=https://us.cloud.langfuse.com # US
# LANGFUSE_HOST=http://localhost:3000 # Self-hosted
# ── CORS ────────────────────────────────────────────────────────────────────── # ── CORS ──────────────────────────────────────────────────────────────────────
# Comma-separated list parsed by Settings (override default if needed) # Comma-separated list parsed by Settings (override default if needed)
# CORS_ORIGINS=["app://.","http://localhost:3000"] # CORS_ORIGINS=["app://.","http://localhost:3000"]

View File

@@ -31,6 +31,8 @@ from typing import Any
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
from app.agents.filesystem_agent import FILESYSTEM_TOOLS from app.agents.filesystem_agent import FILESYSTEM_TOOLS
from app.config.settings import settings
from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback
from app.core.llm import get_llm from app.core.llm import get_llm
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -62,6 +64,7 @@ class JourneySession:
data_types: list[str] data_types: list[str]
history: list[dict[str, Any]] = field(default_factory=list) history: list[dict[str, Any]] = field(default_factory=list)
system_prompt: str = "" system_prompt: str = ""
langfuse_prompt: Any = None
created_at: float = field(default_factory=time.monotonic) created_at: float = field(default_factory=time.monotonic)
def is_expired(self) -> bool: def is_expired(self) -> bool:
@@ -85,7 +88,7 @@ def get_journey_session(session_id: str, user_id: str) -> JourneySession | None:
# ── System prompt builder ───────────────────────────────────────────────── # ── System prompt builder ─────────────────────────────────────────────────
_SYSTEM_PROMPT_TEMPLATE = """\ _JOURNEY_SYSTEM_PROMPT = """\
You are a friendly assistant helping a freelancer configure a data-extraction agent. You are a friendly assistant helping a freelancer configure a data-extraction agent.
Your job is to understand exactly what data the user wants to extract from their Your job is to understand exactly what data the user wants to extract from their
local directory and produce a detailed prompt_template that a separate AI will use local directory and produce a detailed prompt_template that a separate AI will use
@@ -146,20 +149,25 @@ def _build_system_prompt(
directory: str, directory: str,
data_types: list[str], data_types: list[str],
existing_template: str | None = None, existing_template: str | None = None,
) -> str: ) -> tuple[str, Any]:
"""Return ``(compiled_system_prompt, langfuse_prompt_obj_or_None)``."""
existing_section = ( existing_section = (
f"\nThe user already has the following prompt_template — refine it based on their answers:\n" f"\nThe user already has the following prompt_template — refine it based on their answers:\n"
f"---\n{existing_template}\n---\n" f"---\n{existing_template}\n---\n"
if existing_template if existing_template
else "" else ""
) )
return _SYSTEM_PROMPT_TEMPLATE.format( template, prompt_obj = get_prompt_or_fallback(
"journey_system", _JOURNEY_SYSTEM_PROMPT
)
compiled = template.format(
directory=directory, directory=directory,
data_types=", ".join(data_types), data_types=", ".join(data_types),
template_start=_TEMPLATE_START, template_start=_TEMPLATE_START,
template_end=_TEMPLATE_END, template_end=_TEMPLATE_END,
existing_section=existing_section, existing_section=existing_section,
) )
return compiled, prompt_obj
# ── Template extraction ─────────────────────────────────────────────────── # ── Template extraction ───────────────────────────────────────────────────
@@ -199,12 +207,17 @@ async def _call_llm_with_tools(
system_prompt: str, system_prompt: str,
history: list[dict[str, Any]], history: list[dict[str, Any]],
tools: list[Any], tools: list[Any],
*,
user_id: str = "",
session_id: str = "",
langfuse_prompt: Any = None,
) -> str: ) -> str:
"""Build LangChain messages from history and invoke the LLM with tools. """Build LangChain messages from history and invoke the LLM with tools.
Handles tool-calling loops: if the LLM calls tools, execute them and Handles tool-calling loops: if the LLM calls tools, execute them and
continue until a final text response is produced. continue until a final text response is produced.
""" """
lf = get_langfuse()
messages: list[Any] = [SystemMessage(content=system_prompt)] messages: list[Any] = [SystemMessage(content=system_prompt)]
for turn in history: for turn in history:
if turn["role"] == "user": if turn["role"] == "user":
@@ -216,38 +229,76 @@ async def _call_llm_with_tools(
llm_with_tools = llm.bind_tools(tools) llm_with_tools = llm.bind_tools(tools)
tool_map = {tool_def.name: tool_def for tool_def in tools} tool_map = {tool_def.name: tool_def for tool_def in tools}
for _ in range(_MAX_TOOL_STEPS): _span_ctx = (
response: AIMessage = await llm_with_tools.ainvoke(messages) lf.start_as_current_observation(
messages.append(response) as_type="span",
name="journey-setup",
user_id=user_id or None,
session_id=session_id or None,
input=history[-1]["content"] if history else "",
)
if lf else None
)
_span = _span_ctx.__enter__() if _span_ctx else None
if not response.tool_calls: try:
return _as_text(response.content) for _ in range(_MAX_TOOL_STEPS):
_gen_ctx = (
for call in response.tool_calls: lf.start_as_current_observation(
call_name = str(call.get("name", "")) as_type="generation",
call_args = call.get("args", {}) name="journey-setup-llm",
logger.info( model=settings.LLM_MODEL,
"agent_setup: journey tool_call name=%s args=%s", prompt=langfuse_prompt,
call_name, input=messages,
json.dumps(call_args, ensure_ascii=True)[:500], )
if lf else None
) )
_gen = _gen_ctx.__enter__() if _gen_ctx else None
response: AIMessage = await llm_with_tools.ainvoke(messages)
if _gen_ctx:
_gen.update(output=_as_text(response.content), usage=extract_usage(response))
_gen_ctx.__exit__(None, None, None)
tool_fn = tool_map.get(call_name) messages.append(response)
if tool_fn is None:
tool_output = f"Unknown tool: {call_name}"
else:
tool_output = await tool_fn.ainvoke(call_args)
logger.info( if not response.tool_calls:
"agent_setup: journey tool_result name=%s output=%s", if _span:
call_name, _span.update(output=_as_text(response.content))
str(tool_output)[:800], return _as_text(response.content)
)
messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
# Fallback: exceeded max steps. for call in response.tool_calls:
final = await llm.ainvoke(messages) call_name = str(call.get("name", ""))
return _as_text(final.content) call_args = call.get("args", {})
logger.info(
"agent_setup: journey tool_call name=%s args=%s",
call_name,
json.dumps(call_args, ensure_ascii=True)[:500],
)
tool_fn = tool_map.get(call_name)
if tool_fn is None:
tool_output = f"Unknown tool: {call_name}"
else:
tool_output = await tool_fn.ainvoke(call_args)
logger.info(
"agent_setup: journey tool_result name=%s output=%s",
call_name,
str(tool_output)[:800],
)
messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
# Fallback: exceeded max steps.
final = await llm.ainvoke(messages)
final_text = _as_text(final.content)
if _span:
_span.update(output=final_text)
return final_text
finally:
if _span_ctx:
_span_ctx.__exit__(None, None, None)
if lf:
lf.flush()
# ── Journey handlers (called from device_ws.py) ────────────────────────── # ── Journey handlers (called from device_ws.py) ──────────────────────────
@@ -270,7 +321,7 @@ async def handle_journey_start(
# Use the session_id provided by the FE so the reply matches the # Use the session_id provided by the FE so the reply matches the
# listener key; fall back to a generated one if absent. # listener key; fall back to a generated one if absent.
session_id = frame.get("session_id") or str(uuid.uuid4()) session_id = frame.get("session_id") or str(uuid.uuid4())
system_prompt = _build_system_prompt(directory, data_types, existing_template) system_prompt, langfuse_prompt = _build_system_prompt(directory, data_types, existing_template)
session = JourneySession( session = JourneySession(
session_id=session_id, session_id=session_id,
@@ -279,6 +330,7 @@ async def handle_journey_start(
directory=directory, directory=directory,
data_types=data_types, data_types=data_types,
system_prompt=system_prompt, system_prompt=system_prompt,
langfuse_prompt=langfuse_prompt,
) )
# The LLM will explore the directory using FILESYSTEM_TOOLS via the # The LLM will explore the directory using FILESYSTEM_TOOLS via the
@@ -292,6 +344,9 @@ async def handle_journey_start(
system_prompt=system_prompt, system_prompt=system_prompt,
history=seed_history, history=seed_history,
tools=list(FILESYSTEM_TOOLS), tools=list(FILESYSTEM_TOOLS),
user_id=user_id,
session_id=session_id,
langfuse_prompt=langfuse_prompt,
) )
session.history.extend(seed_history) session.history.extend(seed_history)
@@ -356,6 +411,9 @@ async def handle_journey_message(
system_prompt=session.system_prompt, system_prompt=session.system_prompt,
history=session.history, history=session.history,
tools=list(FILESYSTEM_TOOLS), tools=list(FILESYSTEM_TOOLS),
user_id=session.user_id,
session_id=session_id,
langfuse_prompt=session.langfuse_prompt,
) )
session.history.append({"role": "assistant", "content": ai_reply}) session.history.append({"role": "assistant", "content": ai_reply})
@@ -379,6 +437,9 @@ async def handle_journey_message(
system_prompt=session.system_prompt, system_prompt=session.system_prompt,
history=session.history, history=session.history,
tools=list(FILESYSTEM_TOOLS), tools=list(FILESYSTEM_TOOLS),
user_id=session.user_id,
session_id=session_id,
langfuse_prompt=session.langfuse_prompt,
) )
session.history.append({"role": "assistant", "content": nudge_reply}) session.history.append({"role": "assistant", "content": nudge_reply})

View File

@@ -52,6 +52,10 @@ class Settings(BaseSettings):
CORS_ORIGINS: list[str] = ["app://.", "http://localhost:3000", "http://localhost:5173"] CORS_ORIGINS: list[str] = ["app://.", "http://localhost:3000", "http://localhost:5173"]
LANGFUSE_SECRET_KEY: str = ""
LANGFUSE_PUBLIC_KEY: str = ""
LANGFUSE_HOST: str = "https://cloud.langfuse.com"
ENV: Literal["dev", "prod"] = "dev" ENV: Literal["dev", "prod"] = "dev"
model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8") model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")

View File

@@ -42,7 +42,9 @@ from app.agents.note_agent import NOTE_TOOLS
from app.agents.project_agent import PROJECT_TOOLS from app.agents.project_agent import PROJECT_TOOLS
from app.agents.task_agent import TASK_TOOLS from app.agents.task_agent import TASK_TOOLS
from app.agents.timeline_agent import TIMELINE_TOOLS from app.agents.timeline_agent import TIMELINE_TOOLS
from app.config.settings import settings
from app.core.device_manager import DeviceConnectionManager from app.core.device_manager import DeviceConnectionManager
from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback
from app.core.llm import get_llm from app.core.llm import get_llm
from app.core.ws_context import clear_client_executor, execute_on_client, set_client_executor from app.core.ws_context import clear_client_executor, execute_on_client, set_client_executor
from app.db import async_session from app.db import async_session
@@ -100,7 +102,7 @@ _DOMAIN_DESCRIPTIONS: dict[str, str] = {
), ),
} }
_STEP1_SYSTEM_PROMPT = """\ _BATCH_FILE_CLASSIFIER_PROMPT = """\
You are a file classifier for a freelance project management tool. You are a file classifier for a freelance project management tool.
Your job is to match a file to an existing project and identify which data domains to extract. Your job is to match a file to an existing project and identify which data domains to extract.
@@ -131,7 +133,7 @@ Respond ONLY with a JSON object — no markdown, no explanation:
# ── Step 2: Processing prompt ───────────────────────────────────────────── # ── Step 2: Processing prompt ─────────────────────────────────────────────
_PROCESSING_SYSTEM_PROMPT = """\ _BATCH_PROCESSING_PROMPT = """\
You are a data extraction assistant for a freelance project management tool. You are a data extraction assistant for a freelance project management tool.
Your task: extract structured data from the file content and persist it using the available tools. Your task: extract structured data from the file content and persist it using the available tools.
@@ -160,7 +162,7 @@ Domains to extract: {data_types}
# ── Cloud processing prompt (kept separate for cloud agent) ─────────────── # ── Cloud processing prompt (kept separate for cloud agent) ───────────────
_CLOUD_PROCESSING_PROMPT = """\ _BATCH_CLOUD_PROCESSING_PROMPT = """\
You are a data extraction and management assistant for a freelance project You are a data extraction and management assistant for a freelance project
management tool. management tool.
@@ -268,8 +270,12 @@ async def _run_agent_with_tools(
user_message: str, user_message: str,
tools: list[Any], tools: list[Any],
max_steps: int, max_steps: int,
user_id: str = "",
langfuse_prompt: Any = None,
agent_name: str = "batch-agent",
) -> str: ) -> str:
"""Run an LLM agent with tool-calling, returning the final text response.""" """Run an LLM agent with tool-calling, returning the final text response."""
lf = get_langfuse()
llm = get_llm() llm = get_llm()
llm_with_tools = llm.bind_tools(tools) llm_with_tools = llm.bind_tools(tools)
messages: list[Any] = [ messages: list[Any] = [
@@ -279,38 +285,76 @@ async def _run_agent_with_tools(
tool_map = {tool_def.name: tool_def for tool_def in tools} tool_map = {tool_def.name: tool_def for tool_def in tools}
for _ in range(max_steps): _span_ctx = (
response: AIMessage = await llm_with_tools.ainvoke(messages) lf.start_as_current_observation(
messages.append(response) as_type="span",
name=agent_name,
user_id=user_id or None,
input=user_message,
)
if lf else None
)
_span = _span_ctx.__enter__() if _span_ctx else None
if not response.tool_calls: try:
return _as_text(response.content) for _ in range(max_steps):
_gen_ctx = (
for call in response.tool_calls: lf.start_as_current_observation(
call_id = str(call.get("id", "")) as_type="generation",
call_name = str(call.get("name", "")) name=f"{agent_name}-llm",
call_args = call.get("args", {}) model=settings.LLM_MODEL,
logger.info( prompt=langfuse_prompt,
"agent_runner: tool_call name=%s args=%s", input=messages,
call_name, )
json.dumps(call_args, ensure_ascii=True)[:800], if lf else None
) )
_gen = _gen_ctx.__enter__() if _gen_ctx else None
response: AIMessage = await llm_with_tools.ainvoke(messages)
if _gen_ctx:
_gen.update(output=_as_text(response.content), usage=extract_usage(response))
_gen_ctx.__exit__(None, None, None)
tool_fn = tool_map.get(call_name) messages.append(response)
if tool_fn is None:
tool_output = f"Unknown tool: {call_name}"
else:
tool_output = await tool_fn.ainvoke(call_args)
logger.info( if not response.tool_calls:
"agent_runner: tool_result name=%s output=%s", final_text = _as_text(response.content)
call_name, if _span:
str(tool_output)[:200], _span.update(output=final_text)
) return final_text
messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
final = await llm.ainvoke(messages) for call in response.tool_calls:
return _as_text(final.content) call_id = str(call.get("id", ""))
call_name = str(call.get("name", ""))
call_args = call.get("args", {})
logger.info(
"agent_runner: tool_call name=%s args=%s",
call_name,
json.dumps(call_args, ensure_ascii=True)[:800],
)
tool_fn = tool_map.get(call_name)
if tool_fn is None:
tool_output = f"Unknown tool: {call_name}"
else:
tool_output = await tool_fn.ainvoke(call_args)
logger.info(
"agent_runner: tool_result name=%s output=%s",
call_name,
str(tool_output)[:200],
)
messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
final = await llm.ainvoke(messages)
final_text = _as_text(final.content)
if _span:
_span.update(output=final_text)
return final_text
finally:
if _span_ctx:
_span_ctx.__exit__(None, None, None)
if lf:
lf.flush()
# ── Tool list builder ───────────────────────────────────────────────────── # ── Tool list builder ─────────────────────────────────────────────────────
@@ -515,17 +559,33 @@ async def _classify_file(
if d in _DOMAIN_DESCRIPTIONS if d in _DOMAIN_DESCRIPTIONS
) )
system = _STEP1_SYSTEM_PROMPT.format( step1_template, step1_prompt_obj = get_prompt_or_fallback(
"batch_file_classifier", _BATCH_FILE_CLASSIFIER_PROMPT
)
system = step1_template.format(
domain_definitions=domain_definitions, domain_definitions=domain_definitions,
projects_list=projects_list, projects_list=projects_list,
) )
lf = get_langfuse()
llm = get_llm() llm = get_llm()
classifier_messages = [
SystemMessage(content=system),
HumanMessage(content=f"File: {file_path}\n\nContent:\n{file_content[:4000]}"),
]
try: try:
response = await llm.ainvoke([ if lf:
SystemMessage(content=system), with lf.start_as_current_observation(
HumanMessage(content=f"File: {file_path}\n\nContent:\n{file_content[:4000]}"), as_type="generation",
]) name="step1-classifier",
model=settings.LLM_ROUTER_MODEL,
prompt=step1_prompt_obj,
input=classifier_messages,
) as gen:
response = await llm.ainvoke(classifier_messages)
gen.update(output=_as_text(response.content), usage=extract_usage(response))
else:
response = await llm.ainvoke(classifier_messages)
raw = _as_text(response.content).strip() raw = _as_text(response.content).strip()
# Strip markdown fences if the model wraps the JSON. # Strip markdown fences if the model wraps the JSON.
if raw.startswith("```"): if raw.startswith("```"):
@@ -713,7 +773,10 @@ async def run_local_agent(
existing_context = "\n\n".join(existing_blocks) existing_context = "\n\n".join(existing_blocks)
system_prompt = _PROCESSING_SYSTEM_PROMPT.format( step2_template, step2_prompt_obj = get_prompt_or_fallback(
"batch_processing", _BATCH_PROCESSING_PROMPT
)
system_prompt = step2_template.format(
existing_context=existing_context, existing_context=existing_context,
project_context=project_context, project_context=project_context,
data_types=", ".join(domains), data_types=", ".join(domains),
@@ -730,6 +793,9 @@ async def run_local_agent(
), ),
tools=processing_tools, tools=processing_tools,
max_steps=_MAX_PROCESSING_STEPS, max_steps=_MAX_PROCESSING_STEPS,
user_id=user_id,
langfuse_prompt=step2_prompt_obj,
agent_name="step2-processor",
) )
logger.info( logger.info(
"agent_runner: run=%s file=%r result=%s", "agent_runner: run=%s file=%r result=%s",
@@ -928,7 +994,10 @@ async def run_cloud_agent(
continue continue
items_processed += 1 items_processed += 1
processing_prompt = _CLOUD_PROCESSING_PROMPT.format( cloud_template, cloud_prompt_obj = get_prompt_or_fallback(
"batch_cloud_processing", _BATCH_CLOUD_PROCESSING_PROMPT
)
processing_prompt = cloud_template.format(
data_types=", ".join(config.data_types), data_types=", ".join(config.data_types),
project_context="Determine the appropriate project from the message context.", project_context="Determine the appropriate project from the message context.",
file_list=f"Message from {config.provider} (id: {msg.id})", file_list=f"Message from {config.provider} (id: {msg.id})",
@@ -941,6 +1010,9 @@ async def run_cloud_agent(
user_message=f"Process this message content:\n\n{content_text[:8000]}", user_message=f"Process this message content:\n\n{content_text[:8000]}",
tools=processing_tools, tools=processing_tools,
max_steps=_MAX_PROCESSING_STEPS, max_steps=_MAX_PROCESSING_STEPS,
user_id=user_id,
langfuse_prompt=cloud_prompt_obj,
agent_name="cloud-processor",
) )
except Exception as exc: except Exception as exc:
errors.append(f"LLM processing error for message {msg.id!r}: {exc}") errors.append(f"LLM processing error for message {msg.id!r}: {exc}")

View File

@@ -16,7 +16,9 @@ from app.agents.note_agent import NOTE_TOOLS
from app.agents.project_agent import PROJECT_TOOLS from app.agents.project_agent import PROJECT_TOOLS
from app.agents.task_agent import TASK_TOOLS from app.agents.task_agent import TASK_TOOLS
from app.agents.timeline_agent import TIMELINE_TOOLS from app.agents.timeline_agent import TIMELINE_TOOLS
from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback
from app.core.llm import get_llm from app.core.llm import get_llm
from app.config.settings import settings
from app.core.memory_middleware import MemoryMiddleware from app.core.memory_middleware import MemoryMiddleware
from app.core.ws_context import clear_tool_result_collector, execute_on_client, set_tool_result_collector from app.core.ws_context import clear_tool_result_collector, execute_on_client, set_tool_result_collector
from app.db import async_session from app.db import async_session
@@ -26,7 +28,7 @@ logger = logging.getLogger(__name__)
FloatingDomainType = Literal["task", "timeline", "project", "node"] FloatingDomainType = Literal["task", "timeline", "project", "node"]
FloatingDomainSection = Literal["task", "timeline", "note"] FloatingDomainSection = Literal["task", "timeline", "note"]
_HOME_SINGLE_AGENT_SYSTEM = ( _HOME_SYSTEM_PROMPT = (
"You are the home assistant with direct access to all tools: tasks, projects, notes, timelines, and memory tools. " "You are the home assistant with direct access to all tools: tasks, projects, notes, timelines, and memory tools. "
"Always use tools for factual data retrieval before answering. " "Always use tools for factual data retrieval before answering. "
"When the user asks to remember, forget, or update what you know about them, use memory tools. " "When the user asks to remember, forget, or update what you know about them, use memory tools. "
@@ -39,7 +41,7 @@ _HOME_SINGLE_AGENT_SYSTEM = (
"For upcoming tasks, after tag lines add a short recommendation based on due date and priority." "For upcoming tasks, after tag lines add a short recommendation based on due date and priority."
) )
_FLOATING_SINGLE_AGENT_SYSTEM = ( _FLOATING_SYSTEM_PROMPT = (
"You are the floating assistant with direct access to all tools: tasks, projects, notes, timelines, and memory tools. " "You are the floating assistant with direct access to all tools: tasks, projects, notes, timelines, and memory tools. "
"Stay focused on the floating scope in context.scope and answer concisely. " "Stay focused on the floating scope in context.scope and answer concisely. "
"Return plain text only. Do not output XML/HTML-like tags such as <task>, <project>, <note>, <timeline>, or any bracketed id tag wrappers. " "Return plain text only. Do not output XML/HTML-like tags such as <task>, <project>, <note>, <timeline>, or any bracketed id tag wrappers. "
@@ -48,7 +50,7 @@ _FLOATING_SINGLE_AGENT_SYSTEM = (
"If context.context.resolved_project_id exists, use it as project_id for scoped list calls. " "If context.context.resolved_project_id exists, use it as project_id for scoped list calls. "
) )
_FLOATING_DOMAIN_CLASSIFIER_SYSTEM = ( _FLOATING_DOMAIN_CLASSIFIER_PROMPT = (
"You are a strict domain classifier for websocket floating requests. " "You are a strict domain classifier for websocket floating requests. "
"Return ONLY a JSON object with keys: type, id, section. " "Return ONLY a JSON object with keys: type, id, section. "
"Allowed type values: task, timeline, project, node. " "Allowed type values: task, timeline, project, node. "
@@ -536,17 +538,31 @@ async def _infer_floating_domain(message: str, context: dict[str, Any]) -> dict[
try: try:
llm = get_llm() llm = get_llm()
response = await llm.ainvoke( classifier_messages = [
[ SystemMessage(content=_FLOATING_DOMAIN_CLASSIFIER_PROMPT),
SystemMessage(content=_FLOATING_DOMAIN_CLASSIFIER_SYSTEM), HumanMessage(
HumanMessage( content=(
content=( f"Message:\n{message}\n\n"
f"Message:\n{message}\n\n" f"Context:\n{json.dumps(classifier_context, ensure_ascii=True)}"
f"Context:\n{json.dumps(classifier_context, ensure_ascii=True)}" )
) ),
), ]
] lf = get_langfuse()
_, classifier_prompt_obj = get_prompt_or_fallback(
"floating_domain_classifier", _FLOATING_DOMAIN_CLASSIFIER_PROMPT
) )
if lf:
with lf.start_as_current_observation(
as_type="generation",
name="floating-classifier",
model=settings.LLM_MODEL,
prompt=classifier_prompt_obj,
input=classifier_messages,
) as gen:
response = await llm.ainvoke(classifier_messages)
gen.update(output=_as_text(response.content), usage=extract_usage(response))
else:
response = await llm.ainvoke(classifier_messages)
parsed = _parse_json_object(_as_text(response.content)) parsed = _parse_json_object(_as_text(response.content))
if parsed is not None: if parsed is not None:
domain = _normalize_domain_payload(parsed, project_id) domain = _normalize_domain_payload(parsed, project_id)
@@ -571,8 +587,11 @@ async def _run_single_agent(
message: str, message: str,
context: dict[str, Any], context: dict[str, Any],
max_steps: int = 6, max_steps: int = 6,
langfuse_prompt: Any = None,
agent_name: str = "agent",
) -> str: ) -> str:
trace_id = _trace_id_from_context(context) trace_id = _trace_id_from_context(context)
lf = get_langfuse()
llm = get_llm() llm = get_llm()
tools = _all_tools_for_user(user_id, trace_id) tools = _all_tools_for_user(user_id, trace_id)
model_context = _context_for_model(context) model_context = _context_for_model(context)
@@ -591,9 +610,37 @@ async def _run_single_agent(
tool_calls_count = 0 tool_calls_count = 0
collected: list[dict[str, Any]] = [] collected: list[dict[str, Any]] = []
set_tool_result_collector(collected) set_tool_result_collector(collected)
_span_ctx = (
lf.start_as_current_observation(
as_type="span",
name=agent_name,
user_id=user_id,
session_id=trace_id,
input=message,
)
if lf else None
)
_span = _span_ctx.__enter__() if _span_ctx else None
try: try:
for _ in range(max_steps): for _ in range(max_steps):
_gen_ctx = (
lf.start_as_current_observation(
as_type="generation",
name=f"{agent_name}-llm",
model=settings.LLM_MODEL,
prompt=langfuse_prompt,
input=messages,
)
if lf else None
)
_gen = _gen_ctx.__enter__() if _gen_ctx else None
response: AIMessage = await llm_with_tools.ainvoke(messages) response: AIMessage = await llm_with_tools.ainvoke(messages)
if _gen_ctx:
_gen.update(output=_as_text(response.content), usage=extract_usage(response))
_gen_ctx.__exit__(None, None, None)
messages.append(response) messages.append(response)
if not response.tool_calls: if not response.tool_calls:
@@ -605,6 +652,8 @@ async def _run_single_agent(
tool_calls_count, tool_calls_count,
len(final_text), len(final_text),
) )
if _span:
_span.update(output=final_text)
return final_text return final_text
tool_map = {tool_def.name: tool_def for tool_def in tools} tool_map = {tool_def.name: tool_def for tool_def in tools}
@@ -644,9 +693,15 @@ async def _run_single_agent(
tool_calls_count, tool_calls_count,
len(final_text), len(final_text),
) )
if _span:
_span.update(output=final_text)
return final_text return final_text
finally: finally:
clear_tool_result_collector() clear_tool_result_collector()
if _span_ctx:
_span_ctx.__exit__(None, None, None)
if lf:
lf.flush()
async def _run_single_agent_stream( async def _run_single_agent_stream(
@@ -656,8 +711,11 @@ async def _run_single_agent_stream(
message: str, message: str,
context: dict[str, Any], context: dict[str, Any],
max_steps: int = 6, max_steps: int = 6,
langfuse_prompt: Any = None,
agent_name: str = "agent",
) -> AsyncGenerator[tuple[str, Any], None]: ) -> AsyncGenerator[tuple[str, Any], None]:
trace_id = _trace_id_from_context(context) trace_id = _trace_id_from_context(context)
lf = get_langfuse()
llm = get_llm() llm = get_llm()
tools = _all_tools_for_user(user_id, trace_id) tools = _all_tools_for_user(user_id, trace_id)
model_context = _context_for_model(context) model_context = _context_for_model(context)
@@ -677,9 +735,38 @@ async def _run_single_agent_stream(
streamed_chars = 0 streamed_chars = 0
collected: list[dict[str, Any]] = [] collected: list[dict[str, Any]] = []
set_tool_result_collector(collected) set_tool_result_collector(collected)
_span_ctx = (
lf.start_as_current_observation(
as_type="span",
name=f"{agent_name}-stream",
user_id=user_id,
session_id=trace_id,
input=message,
)
if lf else None
)
_span = _span_ctx.__enter__() if _span_ctx else None
streamed_text: list[str] = []
try: try:
for _ in range(max_steps): for _ in range(max_steps):
_gen_ctx = (
lf.start_as_current_observation(
as_type="generation",
name=f"{agent_name}-llm",
model=settings.LLM_MODEL,
prompt=langfuse_prompt,
input=messages,
)
if lf else None
)
_gen = _gen_ctx.__enter__() if _gen_ctx else None
response: AIMessage = await llm_with_tools.ainvoke(messages) response: AIMessage = await llm_with_tools.ainvoke(messages)
if _gen_ctx:
_gen.update(output=_as_text(response.content), usage=extract_usage(response))
_gen_ctx.__exit__(None, None, None)
messages.append(response) messages.append(response)
if not response.tool_calls: if not response.tool_calls:
@@ -688,6 +775,7 @@ async def _run_single_agent_stream(
token = _as_text(getattr(chunk, "content", "")) token = _as_text(getattr(chunk, "content", ""))
if token: if token:
streamed_chars += len(token) streamed_chars += len(token)
streamed_text.append(token)
emitted_any = True emitted_any = True
yield "token", token yield "token", token
@@ -696,6 +784,7 @@ async def _run_single_agent_stream(
fallback_text = _as_text(response.content) fallback_text = _as_text(response.content)
if fallback_text: if fallback_text:
streamed_chars += len(fallback_text) streamed_chars += len(fallback_text)
streamed_text.append(fallback_text)
yield "token", fallback_text yield "token", fallback_text
logger.info( logger.info(
"deep_agent: run_single_agent_stream_end trace=%s user=%s tool_calls=%d response_chars=%d", "deep_agent: run_single_agent_stream_end trace=%s user=%s tool_calls=%d response_chars=%d",
@@ -704,6 +793,8 @@ async def _run_single_agent_stream(
tool_calls_count, tool_calls_count,
streamed_chars, streamed_chars,
) )
if _span:
_span.update(output="".join(streamed_text))
return return
tool_map = {tool_def.name: tool_def for tool_def in tools} tool_map = {tool_def.name: tool_def for tool_def in tools}
@@ -738,6 +829,7 @@ async def _run_single_agent_stream(
token = _as_text(getattr(chunk, "content", "")) token = _as_text(getattr(chunk, "content", ""))
if token: if token:
streamed_chars += len(token) streamed_chars += len(token)
streamed_text.append(token)
yield "token", token yield "token", token
logger.info( logger.info(
"deep_agent: run_single_agent_stream_end trace=%s user=%s tool_calls=%d response_chars=%d fallback=1", "deep_agent: run_single_agent_stream_end trace=%s user=%s tool_calls=%d response_chars=%d fallback=1",
@@ -746,17 +838,28 @@ async def _run_single_agent_stream(
tool_calls_count, tool_calls_count,
streamed_chars, streamed_chars,
) )
if _span:
_span.update(output="".join(streamed_text))
finally: finally:
clear_tool_result_collector() clear_tool_result_collector()
if _span_ctx:
_span_ctx.__exit__(None, None, None)
if lf:
lf.flush()
async def run_home(user_id: str, message: str, context: dict[str, Any]) -> str: async def run_home(user_id: str, message: str, context: dict[str, Any]) -> str:
prepared_context = await _prepare_context(message, context) prepared_context = await _prepare_context(message, context)
system_prompt, langfuse_prompt = get_prompt_or_fallback(
"home_system", _HOME_SYSTEM_PROMPT
)
response = await _run_single_agent( response = await _run_single_agent(
user_id=user_id, user_id=user_id,
system_prompt=_HOME_SINGLE_AGENT_SYSTEM, system_prompt=system_prompt,
message=message, message=message,
context=prepared_context, context=prepared_context,
langfuse_prompt=langfuse_prompt,
agent_name="home-agent",
) )
return _normalize_tagged_list_lines(response, message) return _normalize_tagged_list_lines(response, message)
@@ -764,11 +867,16 @@ async def run_home(user_id: str, message: str, context: dict[str, Any]) -> str:
async def run_floating(user_id: str, message: str, context: dict[str, Any]) -> tuple[str, dict[str, str | None]]: async def run_floating(user_id: str, message: str, context: dict[str, Any]) -> tuple[str, dict[str, str | None]]:
prepared_context = await _prepare_context(message, context) prepared_context = await _prepare_context(message, context)
domain = await _infer_floating_domain(message, prepared_context) domain = await _infer_floating_domain(message, prepared_context)
system_prompt, langfuse_prompt = get_prompt_or_fallback(
"floating_system", _FLOATING_SYSTEM_PROMPT
)
response = await _run_single_agent( response = await _run_single_agent(
user_id=user_id, user_id=user_id,
system_prompt=_FLOATING_SINGLE_AGENT_SYSTEM, system_prompt=system_prompt,
message=message, message=message,
context=prepared_context, context=prepared_context,
langfuse_prompt=langfuse_prompt,
agent_name="floating-agent",
) )
sanitized = _strip_floating_markup(response) sanitized = _strip_floating_markup(response)
if not sanitized and response: if not sanitized and response:
@@ -782,12 +890,17 @@ async def run_home_stream(
context: dict[str, Any], context: dict[str, Any],
) -> AsyncGenerator[tuple[str, Any], None]: ) -> AsyncGenerator[tuple[str, Any], None]:
prepared_context = await _prepare_context(message, context) prepared_context = await _prepare_context(message, context)
system_prompt, langfuse_prompt = get_prompt_or_fallback(
"home_system", _HOME_SYSTEM_PROMPT
)
text_chunks: list[str] = [] text_chunks: list[str] = []
async for event in _run_single_agent_stream( async for event in _run_single_agent_stream(
user_id=user_id, user_id=user_id,
system_prompt=_HOME_SINGLE_AGENT_SYSTEM, system_prompt=system_prompt,
message=message, message=message,
context=prepared_context, context=prepared_context,
langfuse_prompt=langfuse_prompt,
agent_name="home-agent",
): ):
event_type, data = event event_type, data = event
if event_type != "token": if event_type != "token":
@@ -809,14 +922,19 @@ async def run_floating_stream(
domain = await _infer_floating_domain(message, prepared_context) domain = await _infer_floating_domain(message, prepared_context)
yield "floating_domain", domain yield "floating_domain", domain
system_prompt, langfuse_prompt = get_prompt_or_fallback(
"floating_system", _FLOATING_SYSTEM_PROMPT
)
sanitizer = _FloatingStreamSanitizer() sanitizer = _FloatingStreamSanitizer()
emitted_sanitized = False emitted_sanitized = False
raw_chunks: list[str] = [] raw_chunks: list[str] = []
async for event in _run_single_agent_stream( async for event in _run_single_agent_stream(
user_id=user_id, user_id=user_id,
system_prompt=_FLOATING_SINGLE_AGENT_SYSTEM, system_prompt=system_prompt,
message=message, message=message,
context=prepared_context, context=prepared_context,
langfuse_prompt=langfuse_prompt,
agent_name="floating-agent",
): ):
event_type, data = event event_type, data = event
if event_type != "token": if event_type != "token":

114
app/core/langfuse_client.py Normal file
View File

@@ -0,0 +1,114 @@
"""Langfuse observability — singleton client and prompt helpers.
If LANGFUSE_SECRET_KEY / LANGFUSE_PUBLIC_KEY are not set,
all helpers are no-ops so the app works without Langfuse configured.
Usage
-----
Tracing::
from app.core.langfuse_client import get_langfuse
lf = get_langfuse()
if lf:
with lf.start_as_current_observation(as_type="span", name="my-agent") as span:
span.update(input=user_message)
# ... do work ...
span.update(output=result)
lf.flush()
Prompt management::
from app.core.langfuse_client import get_prompt_or_fallback
text, prompt_obj = get_prompt_or_fallback("home_system", FALLBACK_PROMPT)
# Use text as the system prompt; pass prompt_obj to generations for linking.
Linking a prompt to a generation::
with lf.start_as_current_observation(
as_type="generation",
name="llm-call",
model="gpt-4o",
prompt=prompt_obj, # links generation → prompt version in the UI
input=messages,
) as gen:
response = await llm.ainvoke(messages)
gen.update(output=response.content, usage=extract_usage(response))
"""
from __future__ import annotations
import logging
from typing import Any
logger = logging.getLogger(__name__)
# Module-level cache for the singleton: ``_client`` holds the Langfuse client
# (or ``None``), ``_initialized`` records that a construction attempt was made.
_client: Any = None
_initialized: bool = False


def get_langfuse() -> Any | None:
    """Return the shared Langfuse client, or ``None`` when it is unavailable.

    The first call reads the Langfuse keys from the application settings and
    tries to build the client. Whatever the outcome (a client, or ``None``
    because keys are missing or construction failed), it is cached in module
    state and returned by every later call without a new attempt.
    """
    global _client, _initialized

    if not _initialized:
        # Flag before doing any work so a failed attempt is not retried.
        _initialized = True

        from app.config.settings import settings  # local import to avoid circular deps

        if not (settings.LANGFUSE_SECRET_KEY and settings.LANGFUSE_PUBLIC_KEY):
            logger.debug("langfuse: not configured — observability disabled")
            return None

        try:
            from langfuse import Langfuse

            client_options = {
                "secret_key": settings.LANGFUSE_SECRET_KEY,
                "public_key": settings.LANGFUSE_PUBLIC_KEY,
                "host": settings.LANGFUSE_HOST,
            }
            _client = Langfuse(**client_options)
            logger.info("langfuse: client initialized host=%s", settings.LANGFUSE_HOST)
        except Exception as exc:
            # Observability is optional: log and carry on without a client.
            logger.warning("langfuse: failed to initialize: %s", exc)
            _client = None

    return _client
def get_prompt_or_fallback(name: str, fallback: str) -> tuple[str, Any]:
    """Resolve the prompt *name* via Langfuse, defaulting to *fallback*.

    Returns a ``(prompt_text, prompt_obj)`` pair:

    * ``prompt_text`` is the raw template string with variables still
      unsubstituted (callers apply ``.format()`` themselves). It is the
      ``.prompt`` attribute of the fetched object when that is a string,
      otherwise *fallback*.
    * ``prompt_obj`` is the object returned by ``get_prompt`` (pass it to a
      generation observation to link it to the prompt version), or ``None``
      when no client is configured or the fetch raised.

    Any exception raised while fetching is logged and swallowed.
    """
    client = get_langfuse()
    if client is None:
        return fallback, None

    try:
        prompt_obj = client.get_prompt(name, label="production", fallback=fallback)
        # Text-type prompts carry the template in ``.prompt``; anything else
        # (attribute missing or not a string) falls back to the local text.
        template = getattr(prompt_obj, "prompt", None)
        if not isinstance(template, str):
            template = fallback
        return template, prompt_obj
    except Exception as exc:
        logger.warning("langfuse: get_prompt %r failed: %s — using fallback", name, exc)
        return fallback, None
def extract_usage(response: Any) -> dict[str, int]:
    """Extract token usage from a LangChain AI message into Langfuse format.

    Reads ``response.usage_metadata`` (a mapping with ``input_tokens``,
    ``output_tokens`` and ``total_tokens``) and returns
    ``{"input": ..., "output": ..., "total": ...}``.

    Returns an empty dict when the attribute is missing or empty. A count that
    is absent or ``None`` is treated as 0, and a missing ``total_tokens`` is
    derived as input + output, so an incomplete usage payload can never raise
    inside the tracing path.
    """
    meta = getattr(response, "usage_metadata", None)
    if not meta:
        return {}

    def _count(key: str) -> int:
        # ``int(None)`` raises TypeError; treat a null count like a missing one.
        value = meta.get(key)
        return int(value) if value is not None else 0

    input_tokens = _count("input_tokens")
    output_tokens = _count("output_tokens")
    reported_total = meta.get("total_tokens")
    total_tokens = (
        int(reported_total) if reported_total is not None else input_tokens + output_tokens
    )
    return {"input": input_tokens, "output": output_tokens, "total": total_tokens}

View File

@@ -0,0 +1,104 @@
"""Preprocessor registry: detect content type and dispatch to handlers.
Public API
----------
detect_content_type(filename, raw_content) -> str
Heuristic detection based on file extension and content patterns.
preprocess(content_type, raw_content) -> PreprocessResult
Dispatch to the appropriate handler.
"""
from __future__ import annotations
import re
from app.core.preprocessors.base import PreprocessResult
# ── Heuristics ────────────────────────────────────────────────────────
# Patterns that strongly suggest an email HTML file.
# The leading ``\b`` keeps header names from matching inside longer words
# (``mailto:`` is not a ``To:`` header, ``Update:`` is not a ``Date:`` header).
_EMAIL_SIGNALS = re.compile(
    r"\b(Subject:|From:|To:|Date:|Sent:|MIME-Version:|Content-Type:\s*text/html)",
    re.IGNORECASE,
)

# Patterns that suggest a generic HTML page (not an email).
# Not consulted by detect_content_type: files with an HTML-like extension are
# "generic_html" whenever no email signal is found. Kept so the name stays
# importable.
_GENERIC_HTML_SIGNALS = re.compile(
    r"<(nav|main|header|footer|article|section)\b",
    re.IGNORECASE,
)

# Extensions whose content is HTML or an email export.
_HTML_LIKE_EXTENSIONS = frozenset({"html", "htm", "eml", "mhtml", "mht"})


def detect_content_type(filename: str, raw_content: str) -> str:
    """Return a content-type string for the given file.

    Supported types: ``"email_html"``, ``"generic_html"``,
    ``"plain_text"``, ``"unknown"``.

    Decision order:

    1. ``.txt`` is always ``"plain_text"``.
    2. HTML-like extensions (html, htm, eml, mhtml, mht) are ``"email_html"``
       when an email header signal appears in the first 4096 characters,
       otherwise ``"generic_html"``.
    3. A file without an extension is ``"email_html"`` when it shows an email
       header signal.
    4. Anything else is ``"unknown"`` when it is empty, cannot be encoded as
       UTF-8, or more than 10% of its first 512 characters are control
       characters other than CR/LF/TAB; otherwise it is readable text and
       reported as ``"plain_text"``.
    """
    ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""

    if ext == "txt":
        return "plain_text"

    head = raw_content[:4096]

    if ext in _HTML_LIKE_EXTENSIONS:
        # Prefer email detection over generic HTML.
        return "email_html" if _EMAIL_SIGNALS.search(head) else "generic_html"

    # Extension-less files carrying email headers.
    if not ext and _EMAIL_SIGNALS.search(head):
        return "email_html"

    # Text that cannot be encoded as UTF-8 (e.g. lone surrogates) is treated
    # as binary residue.
    try:
        raw_content.encode("utf-8")
    except (UnicodeEncodeError, AttributeError):
        return "unknown"

    sample = raw_content[:512]
    if not sample:
        return "unknown"

    # Non-text heuristic: high ratio of control characters in the sample.
    non_printable = sum(1 for c in sample if ord(c) < 32 and c not in "\r\n\t")
    if non_printable / len(sample) > 0.1:
        return "unknown"

    return "plain_text"
# ── Generic fallback handler ──────────────────────────────────────────
def _preprocess_generic(raw_content: str, content_type: str) -> PreprocessResult:
    """Return *raw_content* as text with any HTML tags removed.

    Uses BeautifulSoup's ``html.parser`` when bs4 is importable and a plain
    tag-stripping regex otherwise. Runs of three or more newlines are
    collapsed to a single blank line and the result is trimmed. The given
    *content_type* is passed through unchanged and metadata is empty.
    """
    try:
        from bs4 import BeautifulSoup

        stripped = BeautifulSoup(raw_content, "html.parser").get_text(separator="\n")
    except ImportError:
        # bs4 is not installed: fall back to dropping anything tag-shaped.
        stripped = re.sub(r"<[^>]+>", "", raw_content)

    collapsed = re.sub(r"\n{3,}", "\n\n", stripped)
    return PreprocessResult(
        content_type=content_type,
        clean_text=collapsed.strip(),
        metadata={},
    )
# ── Dispatch ──────────────────────────────────────────────────────────
def preprocess(content_type: str, raw_content: str) -> PreprocessResult:
    """Run the handler registered for *content_type* on *raw_content*.

    ``"email_html"`` goes to the email handler; every other content type is
    processed by the generic fallback handler.
    """
    if content_type != "email_html":
        return _preprocess_generic(raw_content, content_type)

    # Imported on demand, only when an email is actually processed.
    from app.core.preprocessors.email_html import preprocess_email_html

    return preprocess_email_html(raw_content)


__all__ = ["detect_content_type", "preprocess", "PreprocessResult"]

View File

@@ -0,0 +1,25 @@
"""Base types for the preprocessor system."""
from __future__ import annotations
from dataclasses import dataclass, field
@dataclass
class PreprocessResult:
    """Value object produced by every preprocessor handler.

    Attributes
    ----------
    content_type:
        Content type the text was processed as
        (e.g. ``"email_html"``, ``"plain_text"``).
    clean_text:
        The readable text left after markup/binary noise has been removed.
    metadata:
        Metadata extracted by the handler; the available keys depend on the
        handler. Common keys: ``subject``, ``from``, ``to``, ``date``,
        ``filename``. Defaults to a fresh empty dict per instance.
    """

    content_type: str
    clean_text: str
    metadata: dict = field(default_factory=dict)

View File

@@ -0,0 +1,111 @@
"""Preprocessor for email HTML files.
Handles:
- HTML stripping via BeautifulSoup
- Metadata extraction (Subject, From, To, Date)
- Thread splitting — isolates the latest reply
"""
from __future__ import annotations
import re
from typing import TYPE_CHECKING
from app.core.preprocessors.base import PreprocessResult
if TYPE_CHECKING:
pass
# ── Thread split markers ──────────────────────────────────────────────
# Matches patterns like:
# "On Mon, Apr 7, 2026 at 10:00 AM, Alice <alice@co.com> wrote:"
# "-----Original Message-----"
# "> " (plain-text quote prefix)
_THREAD_PATTERNS = [
re.compile(r"^On\s+.+wrote\s*:", re.IGNORECASE | re.MULTILINE),
re.compile(r"^-{3,}\s*(original message|forwarded message)\s*-{3,}", re.IGNORECASE | re.MULTILINE),
re.compile(r"^>{1,}\s+\S", re.MULTILINE),
re.compile(r"^From:\s+.+\nSent:\s+", re.IGNORECASE | re.MULTILINE),
]
# ── Metadata patterns (applied on raw HTML / plain fallback) ──────────
_META_PATTERNS: dict[str, list[re.Pattern]] = {
"subject": [
re.compile(r"<title>(.+?)</title>", re.IGNORECASE | re.DOTALL),
re.compile(r"Subject:\s*(.+)", re.IGNORECASE),
],
"from": [
re.compile(r'<meta[^>]+name=["\']?from["\']?[^>]+content=["\']([^"\']+)["\']', re.IGNORECASE),
re.compile(r"From:\s*(.+)", re.IGNORECASE),
],
"to": [
re.compile(r'<meta[^>]+name=["\']?to["\']?[^>]+content=["\']([^"\']+)["\']', re.IGNORECASE),
re.compile(r"To:\s*(.+)", re.IGNORECASE),
],
"date": [
re.compile(r'<meta[^>]+name=["\']?date["\']?[^>]+content=["\']([^"\']+)["\']', re.IGNORECASE),
re.compile(r"Date:\s*(.+)", re.IGNORECASE),
re.compile(r"Sent:\s*(.+)", re.IGNORECASE),
],
}
def _extract_metadata(raw_html: str, text: str) -> dict:
"""Extract Subject/From/To/Date from raw HTML or plain text."""
metadata: dict[str, str] = {}
for field, patterns in _META_PATTERNS.items():
for pat in patterns:
m = pat.search(raw_html) or pat.search(text)
if m:
metadata[field] = m.group(1).strip()
break
return metadata
def _split_thread(text: str) -> str:
    """Return only the latest message in a threaded email.

    Finds the earliest position at which any thread marker pattern matches
    and returns the stripped text before it. When no marker is found, or when
    nothing but whitespace precedes the earliest marker (the message begins
    with quoted material), the whole stripped text is returned instead, so
    the result is never emptied by a leading marker.
    """
    marker_starts = [
        match.start()
        for match in (pat.search(text) for pat in _THREAD_PATTERNS)
        if match
    ]
    if marker_starts:
        latest = text[: min(marker_starts)].strip()
        # An empty head means the marker opens the message: keep everything.
        if latest:
            return latest
    return text.strip()
def preprocess_email_html(raw_content: str) -> PreprocessResult:
    """Turn an email HTML document into clean text plus header metadata.

    Steps: parse the markup, drop ``style``/``script``/``head``/``noscript``
    elements, flatten to newline-separated text, collapse runs of blank
    lines, extract metadata from the raw content and the full cleaned text,
    then keep only the latest message of the thread as ``clean_text``.

    Raises
    ------
    ImportError
        If ``beautifulsoup4`` is not installed.
    """
    try:
        from bs4 import BeautifulSoup  # lazy import — optional dep
    except ImportError as exc:
        raise ImportError(
            "beautifulsoup4 is required for email_html preprocessing. "
            "Install it with: pip install beautifulsoup4"
        ) from exc

    # Try the lxml parser first; on any failure use the built-in html.parser.
    try:
        document = BeautifulSoup(raw_content, "lxml")
    except Exception:
        document = BeautifulSoup(raw_content, "html.parser")

    # Remove elements that never carry message text.
    for noise in document.find_all(["style", "script", "head", "noscript"]):
        noise.decompose()

    flattened = document.get_text(separator="\n")
    # Collapse excessive blank lines.
    full_text = re.sub(r"\n{3,}", "\n\n", flattened).strip()

    # Metadata is taken from the whole message, before the thread is cut.
    headers = _extract_metadata(raw_content, full_text)

    return PreprocessResult(
        content_type="email_html",
        clean_text=_split_thread(full_text),
        metadata=headers,
    )

View File

@@ -32,4 +32,8 @@ google-auth-oauthlib>=1.2.0
google-auth-httplib2>=0.2.0 google-auth-httplib2>=0.2.0
msal>=1.28.0 msal>=1.28.0
cryptography>=42.0.0 cryptography>=42.0.0
langfuse>=2.0.0
beautifulsoup4>=4.12.0
lxml>=5.0.0
PyYAML>=6.0.0
ruff>=0.8.0 ruff>=0.8.0

121
tests/fixtures/preprocessors/cases.yaml vendored Normal file
View File

@@ -0,0 +1,121 @@
# Preprocessor test cases — Step 1 (Local Agent V2)
#
# Schema for each case:
#   id: "1.N"
#   description: str
#   score_name: str                # Langfuse score name (op=detect cases only)
#
# Content source (exactly one of):
#   file: <file name in data/>     # read as UTF-8 text
#   generate: binary_noise         # content generated by the runner (for binary tests)
#
# For op=detect:
#   op: detect
#   input_filename: str            # filename passed to detect_content_type
#   expected_content_type: str
#
# For op=preprocess:
#   op: preprocess
#   input_content_type: str        # content_type passed to preprocess()
#   assertions:
#     no_html_tags: bool
#     min_length: int
#     compression_ratio_lt: float    # len(clean) / len(raw) < threshold
#     metadata_keys: [str, ...]      # keys that must be present (non-empty) in metadata
#     contains: str | [str, ...]     # substring(s) present in clean_text
#     not_contains: str | [str, ...] # substring(s) absent from clean_text
#     content_type: str              # expected value of result.content_type
cases:
# ── Detection tests ────────────────────────────────────────────────
- id: "1.1"
description: "Detect email HTML"
score_name: preprocess.detect_email
file: email_action.html
op: detect
input_filename: email_export.html
expected_content_type: email_html
- id: "1.2"
description: "Detect generic HTML"
score_name: preprocess.detect_generic
file: generic_page.html
op: detect
input_filename: index.html
expected_content_type: generic_html
- id: "1.3"
description: "Detect plain text"
score_name: preprocess.detect_text
file: notes.txt
op: detect
input_filename: notes.txt
expected_content_type: plain_text
- id: "1.4"
description: "Detect unknown (binary-like content)"
score_name: preprocess.detect_unknown
generate: binary_noise
op: detect
input_filename: archive.xyz
expected_content_type: unknown
# ── Preprocess tests ───────────────────────────────────────────────
- id: "1.5"
description: "Email: strip HTML tags"
file: email_action.html
op: preprocess
input_content_type: email_html
assertions:
no_html_tags: true
min_length: 50
compression_ratio_lt: 0.8
- id: "1.6"
description: "Email: extract metadata (Subject + From)"
file: email_action.html
op: preprocess
input_content_type: email_html
assertions:
metadata_keys: [subject, from]
- id: "1.7"
description: "Email: split thread — solo ultimo messaggio"
file: email_thread.html
op: preprocess
input_content_type: email_html
assertions:
contains: "Sure, I'll handle the deploy"
not_contains: "Let's plan the deploy"
- id: "1.8"
description: "Email: singolo messaggio senza thread"
file: email_single.html
op: preprocess
input_content_type: email_html
assertions:
contains: "deploy is done"
- id: "1.9"
description: "Email: HTML pesante con table layout"
file: email_heavy.html
op: preprocess
input_content_type: email_html
assertions:
no_html_tags: true
min_length: 30
not_contains:
- "border-collapse"
- "font-size"
- id: "1.10"
description: "Fallback: file sconosciuto → testo restituito"
file: fallback.txt
op: preprocess
input_content_type: unknown
assertions:
min_length: 1
content_type: unknown

View File

@@ -0,0 +1,25 @@
<!DOCTYPE html>
<html>
<head>
<title>Fix the login bug</title>
<style>
body { font-family: Arial, sans-serif; color: #333; margin: 0; padding: 20px; }
.header { background: #f5f5f5; padding: 10px; border-bottom: 1px solid #ddd; }
.body { padding: 20px; }
</style>
</head>
<body>
<div class="header">
<p><strong>From:</strong> boss@company.com</p>
<p><strong>To:</strong> dev@company.com</p>
<p><strong>Subject:</strong> Fix the login bug</p>
<p><strong>Date:</strong> Mon, 7 Apr 2026 09:00:00 +0200</p>
</div>
<div class="body">
<p>Hi,</p>
<p>Please fix the login bug by Friday. It is blocking the release.</p>
<p>Priority: high. Let me know if you need anything.</p>
<p>Thanks,<br>Boss</p>
</div>
</body>
</html>

View File

@@ -0,0 +1,49 @@
<!DOCTYPE html>
<html>
<head>
<style>
table { border-collapse: collapse; width: 100%; max-width: 600px; margin: 0 auto; }
td { padding: 8px 12px; border: 1px solid #dddddd; font-size: 12px; color: #444444; }
.header-row { background-color: #003366; color: #ffffff; font-weight: bold; }
.label-col { background-color: #f0f0f0; width: 80px; font-weight: bold; }
.footer-row { font-size: 10px; color: #999999; text-align: center; }
</style>
</head>
<body bgcolor="#eeeeee">
<center>
<table cellpadding="0" cellspacing="0">
<tr class="header-row">
<td colspan="2">Company Internal Update</td>
</tr>
<tr>
<td class="label-col">From:</td>
<td>newsletter@corp.com</td>
</tr>
<tr>
<td class="label-col">Subject:</td>
<td>Q1 Results Update</td>
</tr>
<tr>
<td class="label-col">Date:</td>
<td>Apr 7, 2026</td>
</tr>
<tr>
<td colspan="2">
<table width="100%" cellpadding="10">
<tr>
<td>
<p style="font-size:14px; font-weight:bold;">Dear Team,</p>
<p>Q1 results are in. Revenue up 15% year-over-year.</p>
<p>Please review the attached report and share any feedback by EOW.</p>
</td>
</tr>
</table>
</td>
</tr>
<tr class="footer-row">
<td colspan="2">Confidential — do not forward outside the company.</td>
</tr>
</table>
</center>
</body>
</html>

View File

@@ -0,0 +1,8 @@
<!DOCTYPE html>
<html><body>
<p><strong>From:</strong> alice@co.com</p>
<p><strong>To:</strong> team@co.com</p>
<p><strong>Subject:</strong> Quick update</p>
<p><strong>Date:</strong> Tue, 7 Apr 2026 10:30:00 +0200</p>
<p>The deploy is done. Everything looks good. No issues so far.</p>
</body></html>

View File

@@ -0,0 +1,24 @@
<!DOCTYPE html>
<html><body>
<div class="message-latest">
<p><strong>From:</strong> alice@co.com</p>
<p><strong>Subject:</strong> Re: Re: Deploy plan</p>
<p>Sure, I'll handle the deploy.</p>
</div>
<p>On Mon, Apr 6, 2026 at 3:00 PM, Bob &lt;bob@co.com&gt; wrote:</p>
<blockquote>
<p>From: bob@co.com</p>
<p>Can you handle the deploy?</p>
<p>On Sun, Apr 5, 2026 at 1:00 PM, Alice &lt;alice@co.com&gt; wrote:</p>
<blockquote>
<p>From: alice@co.com</p>
<p>Let's plan the deploy for Monday.</p>
<p>On Sat, Apr 4, 2026 at 11:00 AM, Charlie &lt;charlie@co.com&gt; wrote:</p>
<blockquote>
<p>From: charlie@co.com</p>
<p>We need to schedule the deploy. What day works?</p>
</blockquote>
</blockquote>
</blockquote>
</body></html>

View File

@@ -0,0 +1,3 @@
random text content without any structure
line two with some words
line three and more content here

View File

@@ -0,0 +1,35 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>My Web App</title>
<link rel="stylesheet" href="styles.css">
</head>
<body>
<nav>
<a href="/">Home</a>
<a href="/about">About</a>
<a href="/contact">Contact</a>
</nav>
<main>
<header>
<h1>Welcome to My App</h1>
</header>
<article>
<p>This is a generic web page with no email headers.</p>
<p>It has navigation, main content, and a footer.</p>
</article>
<section>
<h2>Features</h2>
<ul>
<li>Fast</li>
<li>Reliable</li>
<li>Secure</li>
</ul>
</section>
</main>
<footer>
<p>&copy; 2026 My App</p>
</footer>
</body>
</html>

View File

@@ -0,0 +1,15 @@
Meeting notes - April 7, 2026
Attendees: Alice, Bob, Charlie
Discussion points:
- Deploy scheduled for Friday
- Bug fix for login must be completed by Thursday
- Review Q1 numbers before EOW
Action items:
- Alice: fix login bug
- Bob: prepare deploy checklist
- Charlie: send Q1 report
Next meeting: April 14, 2026

171
tests/test_preprocessors.py Normal file
View File

@@ -0,0 +1,171 @@
"""Tests for the preprocessor system (Step 1 — Local Agent V2).
Fixtures are driven by:
tests/fixtures/preprocessors/cases.yaml — test case definitions
tests/fixtures/preprocessors/data/ — input files (HTML, txt, ...)
Run:
pytest tests/test_preprocessors.py -v
# Only detection tests
pytest tests/test_preprocessors.py -v -k detect
# Only preprocess tests
pytest tests/test_preprocessors.py -v -k preprocess
Langfuse scores are sent for the detection tests only, and only when
LANGFUSE_SECRET_KEY / LANGFUSE_PUBLIC_KEY are set.
"""
from __future__ import annotations
import re
from pathlib import Path
from typing import Any
import pytest
import yaml
from app.core.langfuse_client import get_langfuse
from app.core.preprocessors import detect_content_type, preprocess
# ── Paths ──────────────────────────────────────────────────────────────
_FIXTURES_DIR = Path(__file__).parent / "fixtures" / "preprocessors"
_DATA_DIR = _FIXTURES_DIR / "data"
_CASES_FILE = _FIXTURES_DIR / "cases.yaml"
# ── Content generators ─────────────────────────────────────────────────
_GENERATORS: dict[str, str] = {
# High ratio of non-printable chars → triggers "unknown" heuristic
"binary_noise": "some\x00\x01\x02\x03\x04\x05content" * 20,
}
def _load_cases() -> list[dict]:
    """Parse the YAML case file and return the entries under its ``cases`` key."""
    document = yaml.safe_load(_CASES_FILE.read_text(encoding="utf-8"))
    return document["cases"]
def _read_content(case: dict) -> str:
    """Return the input text for *case*.

    A case with a ``generate`` key gets the matching entry of
    ``_GENERATORS``; any other case reads its ``file`` from the data
    directory as UTF-8 text.

    Raises
    ------
    ValueError
        If the case names a generator that is not registered.
    """
    if "generate" not in case:
        return (_DATA_DIR / case["file"]).read_text(encoding="utf-8")

    key = case["generate"]
    if key in _GENERATORS:
        return _GENERATORS[key]
    raise ValueError(f"Unknown generator '{key}' in case {case['id']}")
# ── Langfuse helper ───────────────────────────────────────────────────
def _lf_score(score_name: str, value: float, comment: str = "") -> None:
    """Send one numeric score to Langfuse; do nothing when it is not configured.

    Supports both SDK generations: clients exposing ``create_score`` (the
    same SDK generation that provides the ``start_as_current_observation``
    API shown in ``app.core.langfuse_client``) and older clients that only
    provide ``trace()`` / ``score()``. Reporting is best effort, so any error
    raised while sending is logged and swallowed and the test outcome depends
    only on its own assertion.
    """
    lf = get_langfuse()
    if not lf:
        return

    try:
        if hasattr(lf, "create_score"):
            # Newer SDK: attach the score to a freshly generated trace id.
            lf.create_score(
                trace_id=lf.create_trace_id(),
                name=score_name,
                value=value,
                data_type="NUMERIC",
                comment=comment,
            )
        else:
            # Older SDK: create a named trace and score it.
            trace = lf.trace(name=f"eval-{score_name}")
            lf.score(
                trace_id=trace.id,
                name=score_name,
                value=value,
                data_type="NUMERIC",
                comment=comment,
            )
        lf.flush()
    except Exception as exc:
        import logging

        logging.getLogger(__name__).warning(
            "langfuse: could not send score %r: %s", score_name, exc
        )
# ── Assertion engine ──────────────────────────────────────────────────
def _run_assertions(assertions: dict[str, Any], result: Any, raw: str) -> list[str]:
"""Run all assertions declared in the YAML case. Returns failure messages."""
failures: list[str] = []
if assertions.get("no_html_tags"):
if re.search(r"<[^>]+>", result.clean_text):
failures.append("clean_text still contains HTML tags")
min_len = assertions.get("min_length")
if min_len is not None:
if len(result.clean_text) < min_len:
failures.append(
f"clean_text too short: {len(result.clean_text)} < {min_len}"
)
ratio_lt = assertions.get("compression_ratio_lt")
if ratio_lt is not None and len(raw) > 0:
ratio = len(result.clean_text) / len(raw)
if ratio >= ratio_lt:
failures.append(f"compression ratio {ratio:.2f} >= {ratio_lt}")
meta_keys = assertions.get("metadata_keys", [])
for key in meta_keys:
if not result.metadata.get(key):
failures.append(f"metadata missing key '{key}' (got {result.metadata})")
contains = assertions.get("contains")
if contains:
items = [contains] if isinstance(contains, str) else contains
for item in items:
if item not in result.clean_text:
failures.append(f"clean_text missing expected substring: {item!r}")
not_contains = assertions.get("not_contains")
if not_contains:
items = [not_contains] if isinstance(not_contains, str) else not_contains
for item in items:
if item in result.clean_text:
failures.append(f"clean_text contains forbidden substring: {item!r}")
expected_ct = assertions.get("content_type")
if expected_ct and result.content_type != expected_ct:
failures.append(
f"content_type mismatch: expected {expected_ct!r}, got {result.content_type!r}"
)
return failures
# ── Parametrized: detect ──────────────────────────────────────────────
_detect_cases = [entry for entry in _load_cases() if entry["op"] == "detect"]
_detect_ids = [entry["id"] for entry in _detect_cases]


@pytest.mark.parametrize("case", _detect_cases, ids=_detect_ids)
def test_detect(case: dict) -> None:
    """Check that detection returns the content type a detect case expects.

    The outcome is also reported through ``_lf_score`` (1.0 on a match, 0.0
    otherwise) before the assertion runs, so failures are recorded as well.
    """
    content = _read_content(case)
    detected = detect_content_type(case["input_filename"], content)
    expected = case["expected_content_type"]
    matched = detected == expected

    _lf_score(
        case["score_name"],
        1.0 if matched else 0.0,
        f"got={detected}, expected={expected}",
    )

    assert matched, (
        f"[{case['id']}] {case['description']}: "
        f"expected content_type={expected!r}, got {detected!r}"
    )
# ── Parametrized: preprocess ──────────────────────────────────────────
_preprocess_cases = [c for c in _load_cases() if c["op"] == "preprocess"]


@pytest.mark.parametrize(
    "case",
    _preprocess_cases,
    ids=[c["id"] for c in _preprocess_cases],
)
def test_preprocess(case: dict) -> None:
    """Run a preprocess case and fail with every violated assertion listed.

    The case's input is preprocessed as ``input_content_type`` and checked
    against its ``assertions`` mapping (no assertions means the case only
    verifies that preprocessing does not raise).
    """
    raw = _read_content(case)
    result = preprocess(case["input_content_type"], raw)
    failures = _run_assertions(case.get("assertions", {}), result, raw)

    # Separate the description from the count and bullet each failure so the
    # report stays readable when several assertions fail at once.
    assert not failures, (
        f"[{case['id']}] {case['description']}: {len(failures)} assertion(s) failed:\n"
        + "\n".join(f"  - {failure}" for failure in failures)
    )