10 Commits

Author SHA1 Message Date
Roberto Musso
e672b58b6f fix(langfuse): remove invalid user_id/session_id kwargs from start_as_current_observation
Langfuse V3 does not accept user_id/session_id on observation-level calls.
Moved to metadata dict in agent_runner, deep_agent, and agent_setup.

refactor(tests): fixture-based pattern for agent_runner_v2 eval tests

- cases.yaml + data/ fixtures under tests/fixtures/agent_runner_v2/
- pytest_generate_tests parametrizes test_eval_runner from YAML
- _resolve_projects() handles symbolic names and inline dicts
- _evaluate_case() centralizes all assertion logic
- --runner-dir CLI option for custom fixture folders
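
The fixture-driven parametrization listed above can be sketched roughly as follows. This is a minimal illustration, not the repository's conftest: the `case` argument name, the `load_cases` helper and the `cases.json` file are assumptions, and JSON stands in for `cases.yaml` only to keep the sketch stdlib-only.

```python
import json
from pathlib import Path

# Default folder taken from the commit message; everything else is illustrative.
DEFAULT_DIR = Path("tests/fixtures/agent_runner_v2")


def pytest_addoption(parser):
    """Register --runner-dir so a custom fixture folder can be passed on the CLI."""
    parser.addoption(
        "--runner-dir",
        action="store",
        default=None,
        help="folder containing the cases file and data/ fixtures",
    )


def load_cases(folder):
    """Hypothetical helper: read the case list (JSON here, YAML in the real tests)."""
    return json.loads((Path(folder) / "cases.json").read_text(encoding="utf-8"))


def pytest_generate_tests(metafunc):
    """Parametrize every test that requests a `case` argument, one run per case."""
    if "case" in metafunc.fixturenames:
        folder = metafunc.config.getoption("--runner-dir") or DEFAULT_DIR
        cases = load_cases(folder)
        metafunc.parametrize("case", cases, ids=[c["id"] for c in cases])
```

Because the cases are read at collection time, adding a case to the fixture file adds a test without touching Python code.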

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-08 00:45:15 +02:00
Roberto Musso
d8add7e8cb feat(local-agent-v2): step 4 — journey produces structured AgentConfig JSON
Replace freeform prompt_template output with validated AgentConfig JSON:
- agent_setup.py: new system prompt (journey_system_v2), AGENT_CONFIG_START/END
  markers, _extract_agent_config() with Pydantic validation, updated handlers
  returning agent_config key; import AgentConfig from schemas
- tests/test_journey_v2.py: 6 unit tests + 5 parametrized LLM eval cases
  following test_agent_runner_v2.py pattern; _run_journey uses
  set_client_executor/clear_client_executor mirroring device_ws
- tests/fixtures/journey_v2/: cases.yaml + email_action.html + email_info.html
- tests/conftest.py: add --journey-dir CLI option; remove S3/plugin fixtures
  (cleanup from microservices migration, already present in working tree)
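
The marker-based extraction described above might look roughly like this. It is a sketch under stated assumptions: a stdlib `json` check with an assumed set of required keys replaces the Pydantic `AgentConfig` validation, and the function and constant names are illustrative.

```python
import json

CONFIG_START = "AGENT_CONFIG_START"
CONFIG_END = "AGENT_CONFIG_END"
# Assumption: top-level keys taken from the example JSON in the system prompt.
REQUIRED_KEYS = {"content_types", "global_rules", "data_types"}


def extract_agent_config(text):
    """Return the JSON between the markers as a normalised string, or None."""
    if CONFIG_START not in text or CONFIG_END not in text:
        return None
    start = text.index(CONFIG_START) + len(CONFIG_START)
    end = text.index(CONFIG_END)
    raw = text[start:end].strip()
    if not raw:
        return None
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return None
    # Stand-in for schema validation: require a dict with the expected keys.
    if not isinstance(data, dict) or not REQUIRED_KEYS <= data.keys():
        return None
    return json.dumps(data)
```

Returning `None` for both missing markers and invalid JSON lets the caller treat "not done yet" and "malformed output" the same way and keep the conversation going.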

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-08 00:23:58 +02:00
Roberto Musso
c6c4578f9a fix(tests): migrate eval tests to Langfuse V3 API
lf.trace() and lf.score(trace_id=...) are V2 API removed in V3.

V3 pattern:
  lf.start_as_current_observation(name=...) as context manager → obs
  obs.score(name=..., value=...)
  contextlib.nullcontext() when lf is None so structure stays the same

Updated tests 2.1–2.7 in test_agent_runner_v2.py accordingly.
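
A rough sketch of the V3 pattern described above. `FakeLangfuse` and `FakeObservation` are hypothetical stand-ins so the example runs without the Langfuse SDK; only the call shapes (`start_as_current_observation(name=...)`, `obs.score(name=..., value=...)`) come from the commit message.

```python
import contextlib


class FakeObservation:
    """Hypothetical stand-in for the object yielded by the context manager."""

    def __init__(self):
        self.scores = []

    def score(self, *, name, value):
        self.scores.append((name, value))


class FakeLangfuse:
    """Hypothetical stand-in for the Langfuse client; not the real SDK."""

    def __init__(self):
        self.observations = []

    @contextlib.contextmanager
    def start_as_current_observation(self, *, name):
        obs = FakeObservation()
        self.observations.append((name, obs))
        yield obs


def run_eval(lf, passed):
    """Score one eval case; the structure is the same with or without tracing."""
    ctx = lf.start_as_current_observation(name="eval-case") if lf else contextlib.nullcontext()
    with ctx as obs:
        value = 1.0 if passed else 0.0
        if obs is not None:  # nullcontext() yields None when lf is None
            obs.score(name="correct", value=value)
    return value
```

The `nullcontext()` branch keeps a single `with` block in the test body, so disabling tracing does not require a second code path.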

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 23:04:24 +02:00
Roberto Musso
3aa0b36a6c fix(langfuse): use compile() instead of .format() for prompt variable injection
Langfuse uses {{variable}} syntax in its prompt management UI, while the
hardcoded fallbacks use {variable} (Python str.format). The previous code
always called .format() which silently failed/errored when a real Langfuse
prompt was fetched.

- langfuse_client.py: add compile_prompt(template, prompt_obj, **vars)
  → uses prompt_obj.compile(**vars) when Langfuse is available
  → falls back to template.format(**vars) when using the hardcoded fallback
- agent_runner.py: replace .format() with compile_prompt() for
  unified_processing (V2 local) and batch_cloud_processing (cloud agent)
- agent_setup.py: replace .format() with compile_prompt() for journey_system

deep_agent.py prompts have no variables, so no change needed there.
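
A minimal sketch of the helper described above, assuming the signature given in the commit message. `FakePrompt` is a hypothetical stand-in for a fetched Langfuse prompt object; its regex-based `compile()` only imitates `{{variable}}` substitution for illustration.

```python
import re


class FakePrompt:
    """Hypothetical stand-in for a fetched prompt that uses {{variable}} syntax."""

    def __init__(self, text):
        self.text = text

    def compile(self, **variables):
        # Replace each {{name}} with its value; leave unknown names untouched.
        return re.sub(
            r"\{\{(\w+)\}\}",
            lambda m: str(variables.get(m.group(1), m.group(0))),
            self.text,
        )


def compile_prompt(template, prompt_obj, **variables):
    """Use prompt_obj.compile() when a prompt object exists, else str.format()."""
    if prompt_obj is not None:
        return prompt_obj.compile(**variables)
    return template.format(**variables)


# Hardcoded fallback: single braces, str.format semantics.
fallback = compile_prompt("Dir: {directory}", None, directory="/tmp")
# Fetched prompt: double braces, handled by the prompt object itself.
fetched = compile_prompt("unused", FakePrompt("Dir: {{directory}}"), directory="/tmp")
```

The silent failure mentioned above follows from `str.format` treating `{{` and `}}` as escaped braces: `"Dir: {{directory}}".format(directory="/tmp")` yields `"Dir: {directory}"` rather than substituting the value.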

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 16:49:26 +02:00
Roberto Musso
fa231a3642 feat(local-agent-v2): step 2+3 — unified runner + AgentConfig schema
Step 3 (prerequisite):
- app/schemas.py: add ContentTypeConfig + AgentConfig Pydantic models
- app/models.py: add agent_config (JSON, nullable) to LocalAgentConfig
- alembic migration a3b9c0d1e2f3: ADD COLUMN agent_config

Step 2 (runner refactor):
- Remove _classify_file() and _BATCH_FILE_CLASSIFIER_PROMPT (LLM classification step)
- Add Phase A: detect_content_type + preprocess (zero LLM, per file)
- Add _UNIFIED_PROCESSING_PROMPT (hot-swappable via Langfuse "unified_processing")
- Add helper functions: _format_projects, _format_metadata, _get_extraction_rules,
  _get_no_match_behavior
- Single LLM call per file with tools (classify + extract + create)
- Fix items_created: count create_* tool calls via _tool_calls_out param
- test_agent_runner_v2.py: 10 cases (2.1-2.10) with Langfuse eval scoring
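
The `items_created` fix can be illustrated with a small sketch. `run_agent` is a hypothetical stand-in for the runner's tool loop; only the `_tool_calls_out` parameter name and the `create_*` prefix rule come from the commit message.

```python
def run_agent(planned_calls, _tool_calls_out=None):
    """Hypothetical tool loop: record the name of every tool that gets called."""
    for call_name in planned_calls:
        if _tool_calls_out is not None:
            _tool_calls_out.append(call_name)
    return "done"


def count_created(tool_calls):
    """Count only create_* calls; list_* and update_* calls are ignored."""
    return sum(1 for name in tool_calls if name.startswith("create_"))


calls = []
run_agent(["list_tasks", "create_task", "update_note", "create_note"], _tool_calls_out=calls)
items_created = count_created(calls)
```

Passing a list for the callee to fill keeps the function's return type unchanged while still exposing the call names to the caller.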

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 15:00:32 +02:00
Roberto Musso
d91c98f86d chore(tests): remove Langfuse from all preprocessor tests
The preprocessor tests are deterministic: no LLM is involved, and there is
no score to track.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 14:26:33 +02:00
Roberto Musso
c0619f5c4d fix(tests): move pytest_addoption after __future__ import in conftest
SyntaxError: from __future__ imports must occur at the beginning of the file.
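
The rule behind this error can be checked with a short sketch. The two source strings are illustrative, not the actual conftest: a module docstring may precede the `__future__` import, but a function definition may not.

```python
# A docstring before the __future__ import is allowed.
GOOD = (
    '"""conftest docstring."""\n'
    "from __future__ import annotations\n"
    "\n"
    "def pytest_addoption(parser):\n"
    "    pass\n"
)

# Any other statement before it is rejected at compile time.
BAD = (
    "def pytest_addoption(parser):\n"
    "    pass\n"
    "\n"
    "from __future__ import annotations\n"
)


def compiles(source):
    """Return True if the source compiles, False on SyntaxError."""
    try:
        compile(source, "<conftest>", "exec")
        return True
    except SyntaxError:
        return False
```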

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 14:21:50 +02:00
Roberto Musso
da282229ff refactor(tests): remove redundant filename field
file: serves both as the path to read and as the name passed to detect_content_type.
There is no reason to keep them separate.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 14:13:14 +02:00
Roberto Musso
7fa6ad5760 feat(tests): add --preprocess-dir CLI option to pytest
- conftest.py: registers --preprocess-dir via pytest_addoption
- test_preprocessors.py: uses pytest_generate_tests to read the cases
  at collection time with access to config; _content and _fixtures_dir
  accept a dynamic path

Usage: pytest tests/test_preprocessors.py --preprocess-dir /my/folder

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 13:59:32 +02:00
Roberto Musso
dcd14220ca refactor(tests): simplify YAML fixture schema and test runner
YAML: removed op/description/score_name and the assertions block; detect/process
is now a direct key, with flat assertions at the same level as the case.

Runner: removed the _run_assertions engine; assertions are now inline in test_preprocess.
Reduced from ~170 to ~75 total lines across YAML + test.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 11:30:38 +02:00
20 changed files with 1527 additions and 698 deletions


@@ -0,0 +1,31 @@
+"""add agent_config to local_agent_configs
+Revision ID: a3b9c0d1e2f3
+Revises: 9a1f2d0b6c7e
+Create Date: 2026-04-07 00:00:00.000000
+"""
+from __future__ import annotations
+from typing import Sequence, Union
+from alembic import op
+import sqlalchemy as sa
+# revision identifiers, used by Alembic.
+revision: str = "a3b9c0d1e2f3"
+down_revision: Union[str, None] = "9a1f2d0b6c7e"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+def upgrade() -> None:
+    op.add_column(
+        "local_agent_configs",
+        sa.Column("agent_config", sa.JSON(), nullable=True),
+    )
+def downgrade() -> None:
+    op.drop_column("local_agent_configs", "agent_config")


@@ -1,11 +1,11 @@
-"""Chatbot Journey — WS-based guided conversation to build an agent prompt_template.
+"""Chatbot Journey — WS-based guided conversation to build an AgentConfig.
 The journey is driven entirely through WebSocket frames (no REST endpoints).
 The device WS handler dispatches ``journey_start`` and ``journey_message``
 frames to the functions exported here.
 Journey flow:
-1. FE sends ``journey_start`` frame with basic agent config (directory,
+1. FE sends ``journey_start`` frame with basic agent info (directory,
 data_types, schedule).
 2. Server creates an in-memory session, sets up a WS executor so the
 setup LLM can use file-system tools, does a first directory scrape,
@@ -13,10 +13,11 @@ Journey flow:
 3. FE sends ``journey_message`` frames for each user reply.
 4. Server appends the user message, calls the LLM (which may read files
 via tools), and sends back a ``journey_reply``.
-5. After 3-5 turns the LLM wraps up by emitting a ``prompt_template``
-block delimited by ``PROMPT_TEMPLATE_START`` / ``PROMPT_TEMPLATE_END``.
-6. Server parses the block, sends ``journey_reply`` with ``done=True``
-and the template. FE stores it locally.
+5. After 3-5 turns the LLM wraps up by emitting an ``AgentConfig`` JSON
+block delimited by ``AGENT_CONFIG_START`` / ``AGENT_CONFIG_END``.
+6. Server parses and validates the JSON with Pydantic, sends
+``journey_reply`` with ``done=True`` and the serialised config.
+FE stores it locally.
 """
 from __future__ import annotations
@@ -32,8 +33,9 @@ from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, Tool
 from app.agents.filesystem_agent import FILESYSTEM_TOOLS
 from app.config.settings import settings
-from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback
+from app.core.langfuse_client import compile_prompt, extract_usage, get_langfuse, get_prompt_or_fallback
 from app.core.llm import get_llm
+from app.schemas import AgentConfig
 logger = logging.getLogger(__name__)
@@ -41,9 +43,9 @@ logger = logging.getLogger(__name__)
 _SESSION_TTL_SECONDS: int = 1800 # 30 minutes
-# Sentinel strings used to delimit the LLM-produced prompt_template.
-_TEMPLATE_START = "PROMPT_TEMPLATE_START"
-_TEMPLATE_END = "PROMPT_TEMPLATE_END"
+# Sentinel strings used to delimit the LLM-produced AgentConfig JSON.
+_CONFIG_START = "AGENT_CONFIG_START"
+_CONFIG_END = "AGENT_CONFIG_END"
 # Minimum turns before we consider nudging the LLM to wrap up.
 _MIN_TURNS_BEFORE_NUDGE: int = 3
@@ -86,61 +88,76 @@ def get_journey_session(session_id: str, user_id: str) -> JourneySession | None:
     return s
-# ── System prompt builder ─────────────────────────────────────────────────
+# ── System prompt ─────────────────────────────────────────────────────────
 _JOURNEY_SYSTEM_PROMPT = """\
 You are a friendly assistant helping a freelancer configure a data-extraction agent.
-Your job is to understand exactly what data the user wants to extract from their
-local directory and produce a detailed prompt_template that a separate AI will use
-as its instruction set.
-The extraction agent already has this base behaviour built in:
-- Reads each file using file-system tools.
-- Creates records (tasks, notes, timelines, projects) via CRUD tools.
-- Sets isAiSuggested=1 on every new record.
-- Only extracts data explicitly present in the files — it never invents information.
-The user's custom prompt is appended AFTER this base behaviour, so focus on
-what to look for and how to map it — not on the general extraction mechanics.
+Your job is to understand what files the user has in their directory and produce a
+structured AgentConfig JSON that the extraction agent will use as its instruction set.
 You have access to file-system tools to explore the user's directory:
-- list_directory: to see folder structure
-- read_file_content: to peek at file contents
-- get_file_metadata: to check file info
+- list_directory: see folder structure and file names
+- read_file_content: peek at a file's content
+- get_file_metadata: check file size, extension, dates
 The user's configured directory is: {directory}
 Target data types: {data_types}
-IMPORTANT — project assignment is handled automatically by the main agent runner
-before the custom prompt is ever used. You MUST NOT ask the user about projects,
-projectId, or how to link records to projects. Never include projectId logic or
-project creation instructions in the generated prompt_template.
-Start by exploring the directory to understand its structure. Then ask concise,
-focused questions one at a time. Cover these topics (not necessarily in this order):
-1. The type and format of the source content (confirmed by your exploration).
-2. How fields should be mapped (e.g. filename → task title).
-3. Priority or status rules (e.g. "urgent" keyword → high priority).
-4. Any special handling, date extraction, or exclusions.
-Once you reach 90% confidence, output the final prompt_template between these exact
-markers on their own lines:
-{template_start}
-<the complete extraction prompt here>
-{template_end}
-The prompt_template must be a self-contained instruction for an AI that reads files
-and must perform CRUD operations using tools to create records. It should specify:
-- What entity types to create (tasks, notes, timelines) — never projects.
-- How to map file content to record fields (camelCase: title, status, priority,
-dueDate, content, etc.) — never include projectId.
-- That isAiSuggested must be set to 1 on every new record.
-- Concrete examples of mappings based on what you discovered in the directory.
+## Your process
+### Step 1 — Explore the directory
+Use list_directory and read_file_content to understand what types of files are present
+(HTML emails, plain-text documents, CSVs, etc.).
+### Step 2 — Identify content types
+For each distinct file type found, decide:
+- A short id (e.g. "email_html", "plain_text", "csv")
+- Which preprocessing handler to use: "email_html" for HTML emails, "generic" for everything else
+- A human-readable label and optional detection_hint
+### Step 3 — Ask focused questions (one at a time)
+Cover these topics based on what you discovered:
+1. How to map content to entity types (task / note / timeline entry)
+2. Field mapping rules (e.g. email Subject → task title, filename → note title)
+3. Priority or status rules (e.g. "urgent" in subject → high priority)
+4. Date extraction (e.g. "by Friday" → dueDate)
+5. Exclusion rules (e.g. skip newsletters, skip files with no project match)
+### Step 4 — Produce the AgentConfig JSON
+Once you are ≥ 90% confident, output the final config between these exact markers
+(each on its own line):
+{config_start}
+{{
+  "content_types": [
+    {{
+      "id": "email_html",
+      "label": "Email HTML",
+      "detection_hint": "HTML file with From/To/Subject headers",
+      "preprocessing": "email_html",
+      "extraction_prompt": "Detailed extraction instructions for this content type..."
+    }}
+  ],
+  "global_rules": [
+    "If the file cannot be matched to any project, do not create any entity."
+  ],
+  "data_types": {data_types_json}
+}}
+{config_end}
+## Rules for the extraction_prompt field
+- Describe when to create a task vs note vs timeline entry (be specific and concrete)
+- Include field mapping rules based on what you found in the directory
+- Include priority/status/date rules if applicable
+- Do NOT include projectId logic — the runner handles project assignment automatically
+- Do NOT mention isAiSuggested — the runner always sets it to 1
+## Constraints
+- Never ask about projects, projectId, or how to link records to projects
+- Never include projectId or project creation logic in the generated config
+- Keep asking questions until ≥ 90% confident, then output the JSON immediately
 {existing_section}\
-Keep asking clarifying questions until you are at least 90% confident you have
-enough information to generate an accurate prompt_template. Once you reach that
-confidence level, stop asking and produce the final template immediately.
 Begin by exploring the directory, then ask your first question.\
 """
@@ -148,38 +165,53 @@ Begin by exploring the directory, then ask your first question.\
 def _build_system_prompt(
     directory: str,
     data_types: list[str],
-    existing_template: str | None = None,
+    existing_config: str | None = None,
 ) -> tuple[str, Any]:
     """Return ``(compiled_system_prompt, langfuse_prompt_obj_or_None)``."""
     existing_section = (
-        f"\nThe user already has the following prompt_template — refine it based on their answers:\n"
-        f"---\n{existing_template}\n---\n"
-        if existing_template
+        "\nThe user already has the following AgentConfig — refine it based on their answers:\n"
+        f"```json\n{existing_config}\n```\n"
+        if existing_config
         else ""
     )
     template, prompt_obj = get_prompt_or_fallback(
         "journey_system", _JOURNEY_SYSTEM_PROMPT
     )
-    compiled = template.format(
+    compiled = compile_prompt(
+        template,
+        prompt_obj,
         directory=directory,
         data_types=", ".join(data_types),
-        template_start=_TEMPLATE_START,
-        template_end=_TEMPLATE_END,
+        data_types_json=json.dumps(data_types),
+        config_start=_CONFIG_START,
+        config_end=_CONFIG_END,
         existing_section=existing_section,
     )
     return compiled, prompt_obj
-# ── Template extraction ───────────────────────────────────────────────────
+# ── AgentConfig extraction ────────────────────────────────────────────────
-def _extract_template(text: str) -> str | None:
-    """Return the text between PROMPT_TEMPLATE_START and PROMPT_TEMPLATE_END, or None."""
-    if _TEMPLATE_START not in text or _TEMPLATE_END not in text:
+def _extract_agent_config(text: str) -> str | None:
+    """Return validated AgentConfig JSON string from between markers, or None.
+    Parses the JSON with Pydantic to ensure it conforms to the schema before
+    returning. Returns None if markers are absent or JSON is invalid.
+    """
+    if _CONFIG_START not in text or _CONFIG_END not in text:
+        return None
+    start_idx = text.index(_CONFIG_START) + len(_CONFIG_START)
+    end_idx = text.index(_CONFIG_END)
+    raw = text[start_idx:end_idx].strip()
+    if not raw:
+        return None
+    try:
+        parsed = AgentConfig.model_validate_json(raw)
+        return parsed.model_dump_json()
+    except Exception as exc:
+        logger.warning("agent_setup: failed to parse AgentConfig JSON: %s", exc)
         return None
-    start_idx = text.index(_TEMPLATE_START) + len(_TEMPLATE_START)
-    end_idx = text.index(_TEMPLATE_END)
-    return text[start_idx:end_idx].strip() or None
 # ── LLM call with tool support ───────────────────────────────────────────
@@ -233,8 +265,7 @@ async def _call_llm_with_tools(
         lf.start_as_current_observation(
             as_type="span",
             name="journey-setup",
-            user_id=user_id or None,
-            session_id=session_id or None,
+            metadata={"user_id": user_id or None, "session_id": session_id or None},
             input=history[-1]["content"] if history else "",
         )
         if lf else None
@@ -316,12 +347,12 @@ async def handle_journey_start(
     agent_type = frame.get("agent_type", "local")
     directory = frame.get("directory", "")
     data_types = frame.get("data_types", [])
-    existing_template = frame.get("existing_template")
+    existing_config = frame.get("existing_config")
     # Use the session_id provided by the FE so the reply matches the
     # listener key; fall back to a generated one if absent.
     session_id = frame.get("session_id") or str(uuid.uuid4())
-    system_prompt, langfuse_prompt = _build_system_prompt(directory, data_types, existing_template)
+    system_prompt, langfuse_prompt = _build_system_prompt(directory, data_types, existing_config)
     session = JourneySession(
         session_id=session_id,
@@ -333,10 +364,8 @@ async def handle_journey_start(
         langfuse_prompt=langfuse_prompt,
     )
-    # The LLM will explore the directory using FILESYSTEM_TOOLS via the
-    # ws_context executor (already set by the WS handler before calling us).
-    # Seed with an initial user message — some providers (e.g. GitHub Copilot)
-    # require at least one user/input message to be present.
+    # Seed with an initial user message — some providers require at least one
+    # user/input message to be present.
     seed_history: list[dict[str, Any]] = [
         {"role": "user", "content": "Hi, I'm ready to set up my agent. Please explore my directory and ask me your first question."},
     ]
@@ -360,14 +389,14 @@ async def handle_journey_start(
         directory,
     )
-    # Check if the LLM produced the template on the first turn (unlikely but possible).
-    prompt_template = _extract_template(ai_reply)
-    done = prompt_template is not None
+    # Check if the LLM produced the config on the first turn (unlikely but possible).
+    agent_config = _extract_agent_config(ai_reply)
+    done = agent_config is not None
     display_message = ai_reply
     if done:
         display_message = (
-            ai_reply[: ai_reply.index(_TEMPLATE_START)].strip()
+            ai_reply[: ai_reply.index(_CONFIG_START)].strip()
             or "Here is your agent configuration. You can save it or continue refining."
         )
         _sessions.pop(session_id, None)
@@ -377,7 +406,7 @@ async def handle_journey_start(
         "session_id": session_id,
         "message": display_message,
         "done": done,
-        "prompt_template": prompt_template,
+        "agent_config": agent_config,
     }
@@ -400,7 +429,7 @@ async def handle_journey_message(
             "session_id": session_id,
             "message": "Journey session not found or expired. Please start a new setup.",
             "done": True,
-            "prompt_template": None,
+            "agent_config": None,
         }
     # Append user turn.
@@ -418,18 +447,17 @@ async def handle_journey_message(
     session.history.append({"role": "assistant", "content": ai_reply})
-    # Check if the LLM produced the final template.
-    prompt_template = _extract_template(ai_reply)
-    done = prompt_template is not None
-    # If the LLM didn't produce a template, nudge it once it has asked enough
-    # questions (>= _MIN_TURNS_BEFORE_NUDGE) or hits the hard safety cap.
+    # Check if the LLM produced the final config.
+    agent_config = _extract_agent_config(ai_reply)
+    done = agent_config is not None
+    # If the LLM didn't produce a config, nudge it once it hits the hard safety cap.
     if not done:
         turns = sum(1 for t in session.history if t["role"] == "user")
         if turns >= _MAX_TURNS:
             nudge_content = (
                 "[System: You have enough information. Please generate the final "
-                f"prompt_template now, wrapped in {_TEMPLATE_START} / {_TEMPLATE_END} markers.]"
+                f"AgentConfig JSON now, wrapped in {_CONFIG_START} / {_CONFIG_END} markers.]"
             )
             session.history.append({"role": "user", "content": nudge_content})
@@ -443,16 +471,16 @@ async def handle_journey_message(
             )
             session.history.append({"role": "assistant", "content": nudge_reply})
-            prompt_template = _extract_template(nudge_reply)
-            if prompt_template is not None:
+            agent_config = _extract_agent_config(nudge_reply)
+            if agent_config is not None:
                 done = True
                 ai_reply = nudge_reply
     display_message = ai_reply
     if done:
         display_message = (
-            ai_reply[: ai_reply.index(_TEMPLATE_START)].strip()
-            if _TEMPLATE_START in ai_reply
+            ai_reply[: ai_reply.index(_CONFIG_START)].strip()
+            if _CONFIG_START in ai_reply
             else "Here is your agent configuration. You can save it or continue refining."
         )
         _sessions.pop(session_id, None)
@@ -463,5 +491,5 @@ async def handle_journey_message(
         "session_id": session_id,
         "message": display_message,
         "done": done,
-        "prompt_template": prompt_template,
+        "agent_config": agent_config,
     }


@@ -2,12 +2,12 @@
 Drives two agent types:
-* **Local directory agent** — two-step execution per file:
-Step 1 (Classification) uses code to fetch all projects and asks the LLM
-to identify which project the file belongs to and which domains are relevant.
-Step 2 (Processing) fetches existing entities for that project/domains via
-code and runs an LLM with tools — existing data in context enforces
-update-first naturally.
+* **Local directory agent** — V2 unified flow per file:
+Phase A (Detect + Preprocess, zero LLM): Python detects the content type
+and strips markup/noise, producing clean text + metadata.
+Phase B (Single LLM call with tools): the LLM identifies the project,
+checks for duplicates via list_* tools, and creates/updates records.
+``items_created`` is counted from ``create_*`` tool calls.
 * **Cloud connector agent** — fetches data from third-party APIs (Gmail,
 Teams, Outlook) and pushes extracted items to Electron.
@@ -29,6 +29,7 @@ from __future__ import annotations
 import asyncio
 import json
 import logging
+import os
 import uuid
 from datetime import datetime, timedelta, timezone
 from typing import Any
@@ -44,8 +45,9 @@ from app.agents.task_agent import TASK_TOOLS
 from app.agents.timeline_agent import TIMELINE_TOOLS
 from app.config.settings import settings
 from app.core.device_manager import DeviceConnectionManager
-from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback
+from app.core.langfuse_client import compile_prompt, extract_usage, get_langfuse, get_prompt_or_fallback
 from app.core.llm import get_llm
+from app.core.preprocessors import detect_content_type, preprocess
 from app.core.ws_context import clear_client_executor, execute_on_client, set_client_executor
 from app.db import async_session
 from app.models import AgentRunLog, CloudAgentConfig, LocalAgentConfig
@@ -81,83 +83,38 @@ _DATA_TYPE_TOOLS: dict[str, list[Any]] = {
     "timelines": TIMELINE_TOOLS,
 }
-# ── Step 1: Classification prompt ─────────────────────────────────────────
-_DOMAIN_DESCRIPTIONS: dict[str, str] = {
-    "tasks": (
-        "Action items, to-dos, deliverables — anything that describes work to be done, "
-        "assigned to someone, or tracked with a due date or status."
-    ),
-    "notes": (
-        "Documentation, meeting notes, summaries, reference material — "
-        "written content meant to be read and referenced rather than acted on."
-    ),
-    "timelines": (
-        "Project milestones, deadlines, scheduled events — "
-        "specific dates that mark a point in the progress of a project."
-    ),
-    "projects": (
-        "High-level project entities — only relevant if the file clearly introduces "
-        "a new project or updates the scope of an existing one."
-    ),
-}
-_BATCH_FILE_CLASSIFIER_PROMPT = """\
-You are a file classifier for a freelance project management tool.
-Your job is to match a file to an existing project and identify which data domains to extract.
-## Project matching rules (STRICT — follow in order)
-1. Search the file content for any mention of a project name, client name, acronym, or topic
-that overlaps with the existing projects listed below.
-2. The match does NOT need to be exact — partial name, abbreviation, or topic similarity is enough.
-3. STRONGLY PREFER matching an existing project. Only return "new" as an absolute last resort
-when the file has zero meaningful connection to any listed project.
-4. When in doubt, pick the closest match from the list.
-## Response format
-Respond ONLY with a JSON object — no markdown, no explanation:
-{{"project_id": "<exact id from the list below, or new>", "new_project_name": "<concise 2-5 word name, only when project_id is new>", "domains": ["tasks", "notes"]}}
-## Domain definitions (only consider domains in the allowed list)
-{domain_definitions}
-## Existing projects
-{projects_list}
-"""
-# ── Step 2: Processing prompt ─────────────────────────────────────────────
-_BATCH_PROCESSING_PROMPT = """\
+# ── V2: Unified processing prompt (hot-swappable via Langfuse "unified_processing") ──
+_UNIFIED_PROCESSING_PROMPT = """\
 You are a data extraction assistant for a freelance project management tool.
-Your task: extract structured data from the file content and persist it using the available tools.
-## Mandatory process — follow this order for EVERY item you extract
-1. READ the existing records listed below for the relevant domain.
-2. SEARCH for a match by title, topic, or semantic similarity.
-3. If a match exists → call the update_* tool with the existing record's id.
-4. If no match exists → call the create_* tool and set isAiSuggested=1.
-NEVER call create_* without first checking the existing records.
-NEVER duplicate a record that already exists under a different wording.
-## Existing records (source of truth)
-{existing_context}
-## Context
-Project: {project_context}
-Domains to extract: {data_types}
-{custom_prompt_section}
+## Your process (follow this exact order)
+### 1. Identify the project
+File: {filename}
+{metadata_section}
+Existing projects:
+{projects_list}
+Match this file to an existing project using the filename and content clues.
+If no project matches, {no_match_behavior}.
+### 2. Check existing records
+Once you identify the project, use list_tasks / list_notes / list_timelines
+(filtered by projectId) to see what already exists.
+NEVER create a record that already exists under the same or similar title.
+### 3. Extract and create / update
+{extraction_rules}
+### Rules
+- Set isAiSuggested=1 on every new record.
+- Set projectId on every record (use the id from the project list above).
+- Update existing records when a match is found by title or topic.
+- Do NOT invent data — only extract what is clearly stated in the content.
+- Target entity types: {data_types}.
+{global_rules}
 """
 # ── Cloud processing prompt (kept separate for cloud agent) ───────────────
@@ -273,8 +230,13 @@ async def _run_agent_with_tools(
     user_id: str = "",
     langfuse_prompt: Any = None,
     agent_name: str = "batch-agent",
+    _tool_calls_out: list[str] | None = None,
 ) -> str:
-    """Run an LLM agent with tool-calling, returning the final text response."""
+    """Run an LLM agent with tool-calling, returning the final text response.
+    If *_tool_calls_out* is provided, the name of every tool called during the
+    run is appended to it (used by the caller to count ``create_*`` calls).
+    """
     lf = get_langfuse()
     llm = get_llm()
     llm_with_tools = llm.bind_tools(tools)
@@ -289,7 +251,7 @@ async def _run_agent_with_tools(
lf.start_as_current_observation( lf.start_as_current_observation(
as_type="span", as_type="span",
name=agent_name, name=agent_name,
user_id=user_id or None, metadata={"user_id": user_id} if user_id else None,
input=user_message, input=user_message,
) )
if lf else None if lf else None
@@ -332,6 +294,9 @@ async def _run_agent_with_tools(
json.dumps(call_args, ensure_ascii=True)[:800], json.dumps(call_args, ensure_ascii=True)[:800],
) )
if _tool_calls_out is not None:
_tool_calls_out.append(call_name)
tool_fn = tool_map.get(call_name) tool_fn = tool_map.get(call_name)
if tool_fn is None: if tool_fn is None:
tool_output = f"Unknown tool: {call_name}" tool_output = f"Unknown tool: {call_name}"
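
The `_tool_calls_out` out-parameter added in this hunk can be illustrated in isolation. This is a minimal sketch, not the runner itself: `run_tools` is a hypothetical stand-in for the agent loop (the real loop invokes an LLM, this one only replays a fixed list of tool names), and `count_creates` is an illustrative helper name.

```python
from typing import List, Optional


def run_tools(calls: List[str], _tool_calls_out: Optional[List[str]] = None) -> str:
    """Hypothetical stand-in for the agent loop: records each tool name it 'calls'."""
    for call_name in calls:
        # Same guard as in the diff: only record when the caller asked for it.
        if _tool_calls_out is not None:
            _tool_calls_out.append(call_name)
    return "done"


def count_creates(tool_calls: List[str]) -> int:
    """Count recorded tool calls whose name starts with ``create_``."""
    return sum(1 for name in tool_calls if name.startswith("create_"))


file_tool_calls: List[str] = []
run_tools(["list_tasks", "create_task", "update_note", "create_note"], file_tool_calls)
file_created = count_creates(file_tool_calls)
```

Passing a fresh list per file keeps the counts independent across files; callers that do not care about the metric can omit the argument and the function behaves as before.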
@@ -523,99 +488,66 @@ def _format_entities_for_context(domain: str, rows: list[dict]) -> str:
return f"Existing {domain}:\n" + "\n".join(lines) return f"Existing {domain}:\n" + "\n".join(lines)
# ── Step 1: LLM file classifier ─────────────────────────────────────────── # ── V2 helper functions ───────────────────────────────────────────────────
async def _classify_file( def _format_projects(projects: list[dict]) -> str:
file_path: str, """Format the project list for the unified system prompt."""
file_content: str, if not projects:
projects: list[dict], return " (no projects yet)"
config_data_types: list[str], lines: list[str] = []
) -> tuple[str, list[str], str | None]: for p in projects:
"""Call the LLM to classify a file by project and relevant domains.
Returns ``(project_id_or_"new", domains, new_project_name_or_None)``.
- ``project_id`` is an existing project UUID, or ``"new"`` when no match found.
- ``new_project_name`` is only set when ``project_id == "new"``.
Falls back to ``("new", config_data_types, None)`` on any error.
"""
fallback: tuple[str, list[str], str | None] = ("new", list(config_data_types), None)
if not file_content.strip():
return fallback
valid_project_ids = {p["id"] for p in projects}
def _fmt_project(p: dict) -> str:
summary = (p.get("aiSummary") or p.get("ai_summary") or "").strip() summary = (p.get("aiSummary") or p.get("ai_summary") or "").strip()
summary_part = f"{summary[:100]}" if summary else "" summary_part = f"{summary[:100]}" if summary else ""
return f" - id={p['id']} | name={p.get('name', '')} | status={p.get('status', '')}{summary_part}" lines.append(
f" - id={p['id']} | name={p.get('name', '')} | "
f"status={p.get('status', '')}{summary_part}"
)
return "\n".join(lines)
projects_list = "\n".join(_fmt_project(p) for p in projects) or " (none yet)"
domain_definitions = "\n".join( def _format_metadata(metadata: dict) -> str:
f" - {d}: {_DOMAIN_DESCRIPTIONS[d]}" """Format preprocessor metadata as a compact context block."""
for d in config_data_types if not metadata:
if d in _DOMAIN_DESCRIPTIONS return ""
parts: list[str] = []
for key in ("subject", "from", "to", "date"):
if metadata.get(key):
parts.append(f"{key.capitalize()}: {metadata[key]}")
# any remaining keys
for key, val in metadata.items():
if key not in ("subject", "from", "to", "date") and val:
parts.append(f"{key}: {val}")
return "\n".join(parts)
def _get_extraction_rules(agent_config: dict, content_type: str) -> str:
"""Return the extraction_prompt for *content_type* from *agent_config*.
Falls back to a generic instruction when the type is not configured.
"""
for ct in agent_config.get("content_types", []):
if ct.get("id") == content_type:
prompt = ct.get("extraction_prompt", "").strip()
if prompt:
return prompt
return (
"Extract relevant information as tasks (action items), notes "
"(informational content), or timelines (dated events)."
) )
step1_template, step1_prompt_obj = get_prompt_or_fallback(
"batch_file_classifier", _BATCH_FILE_CLASSIFIER_PROMPT
)
system = step1_template.format(
domain_definitions=domain_definitions,
projects_list=projects_list,
)
lf = get_langfuse() def _get_no_match_behavior(agent_config: dict) -> str:
llm = get_llm() """Derive the 'no project match' instruction from global_rules."""
classifier_messages = [ rules = agent_config.get("global_rules", [])
SystemMessage(content=system), for rule in rules:
HumanMessage(content=f"File: {file_path}\n\nContent:\n{file_content[:4000]}"), lower = rule.lower()
] if "no project" in lower or "no match" in lower or "skip" in lower:
try: return rule
if lf: return "create a new project with a concise name derived from the file content"
with lf.start_as_current_observation(
as_type="generation",
name="step1-classifier",
model=settings.LLM_ROUTER_MODEL,
prompt=step1_prompt_obj,
input=classifier_messages,
) as gen:
response = await llm.ainvoke(classifier_messages)
gen.update(output=_as_text(response.content), usage=extract_usage(response))
else:
response = await llm.ainvoke(classifier_messages)
raw = _as_text(response.content).strip()
# Strip markdown fences if the model wraps the JSON.
if raw.startswith("```"):
raw = raw.split("```")[1]
if raw.startswith("json"):
raw = raw[4:]
parsed = json.loads(raw.strip())
raw_project_id: str = str(parsed.get("project_id") or "new")
# Reject hallucinated UUIDs — only accept ids that exist in the fetched list.
project_id = raw_project_id if raw_project_id in valid_project_ids else "new"
new_project_name: str | None = (
str(parsed["new_project_name"]).strip() or None
if project_id == "new" and parsed.get("new_project_name")
else None
)
domains: list[str] = [
d for d in parsed.get("domains", [])
if d in config_data_types
]
if not domains:
domains = list(config_data_types)
return project_id, domains, new_project_name
except Exception as exc:
logger.warning(
"agent_runner: step1 classification failed for %r: %s", file_path, exc
)
return fallback
# ── Local agent runner (two-step per file) ──────────────────────────────── # ── Local agent runner (V2 — unified per-file flow) ───────────────────────
async def run_local_agent( async def run_local_agent(
@@ -625,16 +557,17 @@ async def run_local_agent(
device_mgr: DeviceConnectionManager, device_mgr: DeviceConnectionManager,
run_context: dict | None = None, run_context: dict | None = None,
) -> None: ) -> None:
"""Execute a local directory agent run using a two-step approach per file. """Execute a local directory agent run — V2 unified flow.
Step 1 — Classification (code + 1 LLM call per file, no tools): Phase A — Detect + Preprocess (zero LLM, per file):
Code scans directories and fetches all projects via WS. Python detects the content type from filename + content patterns and
For each file, LLM identifies the project and relevant domains. runs the appropriate handler (e.g. email_html) to produce clean text
and structured metadata.
Step 2 — Processing (code + 1 LLM call per file, with tools): Phase B — Single LLM call with tools (per file):
Code fetches existing entities for the identified project/domains. One LLM call handles project identification, duplicate checking, and
LLM receives file content + existing entities in context and uses record creation/update. ``create_*`` tool calls are counted to
tools to update existing records or create new ones. produce the accurate ``items_created`` metric.
""" """
run_id = run_log.id run_id = run_log.id
agent_id = (run_context or {}).get("agent_id") or config.id agent_id = (run_context or {}).get("agent_id") or config.id
@@ -669,12 +602,8 @@ async def run_local_agent(
errors: list[str] = [] errors: list[str] = []
items_processed = 0 items_processed = 0
items_created = 0 items_created = 0
agent_config: dict = config.agent_config or {}
custom_section = ( processing_tools = _build_processing_tools(config.data_types)
f"User instructions:\n{config.prompt_template}"
if config.prompt_template
else ""
)
try: try:
# ── Code: scan directories ─────────────────────────────────── # ── Code: scan directories ───────────────────────────────────
@@ -694,114 +623,82 @@ async def run_local_agent(
# ── Code: fetch all projects once ──────────────────────────── # ── Code: fetch all projects once ────────────────────────────
projects = await _fetch_projects() projects = await _fetch_projects()
projects_block = _format_projects(projects)
# Prompt template + Langfuse version linking (hot-swappable from UI).
unified_template, prompt_obj = get_prompt_or_fallback(
"unified_processing", _UNIFIED_PROCESSING_PROMPT
)
for file_path in file_paths: for file_path in file_paths:
try: try:
# Read file content via code. # ── Phase A: read + detect + preprocess ─────────────
file_result = await execute_on_client( file_result = await execute_on_client(
action="read_file_content", data={"path": file_path} action="read_file_content", data={"path": file_path}
) )
file_content: str = file_result.get("content", "") raw_content: str = file_result.get("content", "")
if not file_content: if not raw_content.strip():
logger.debug("agent_runner: run=%s skipping empty file %r", run_id, file_path) logger.debug(
"agent_runner: run=%s skipping empty file %r", run_id, file_path
)
continue continue
items_processed += 1 items_processed += 1
filename = os.path.basename(file_path)
content_type = detect_content_type(filename, raw_content)
preprocessed = preprocess(content_type, raw_content)
# Step 1 — classify file.
project_id, domains, new_project_name = await _classify_file(
file_path=file_path,
file_content=file_content,
projects=projects,
config_data_types=config.data_types,
)
logger.info( logger.info(
"agent_runner: run=%s file=%r → project=%s new_name=%r domains=%s", "agent_runner: run=%s file=%r content_type=%s clean_len=%d",
run_id, run_id, file_path, content_type, len(preprocessed.clean_text),
file_path,
project_id,
new_project_name,
domains,
) )
# Step 2 — resolve project_id via CODE, then fetch entities. # ── Phase B: single LLM call ─────────────────────────
# Project creation is NEVER delegated to the Step 2 LLM. extraction_rules = _get_extraction_rules(agent_config, content_type)
if project_id == "new": no_match_behavior = _get_no_match_behavior(agent_config)
proj_name = new_project_name or "Untitled Project" global_rules_lines = "\n".join(
try: f"- {r}" for r in agent_config.get("global_rules", [])
proj_result = await execute_on_client(
action="insert",
table="projects",
data={"name": proj_name, "clientId": None},
) )
created = proj_result.get("row", {}) metadata_section = _format_metadata(preprocessed.metadata)
effective_project_id = created.get("id", "standalone")
# Add to local list so subsequent files can match it. system_prompt = compile_prompt(
if "id" in created: unified_template,
projects.append(created) prompt_obj,
logger.info( filename=filename,
"agent_runner: run=%s created project %r id=%s", metadata_section=metadata_section,
run_id, proj_name, effective_project_id, projects_list=projects_block,
) no_match_behavior=no_match_behavior,
except Exception as exc: extraction_rules=extraction_rules,
logger.warning( global_rules=global_rules_lines,
"agent_runner: run=%s failed to create project %r: %s", data_types=", ".join(config.data_types),
run_id, proj_name, exc,
)
effective_project_id = "standalone"
proj_name = "unknown"
project_context = (
f"Project: {proj_name} (id: {effective_project_id}). "
"Always set projectId to this id on every record you create."
)
else:
effective_project_id = project_id
proj = next((p for p in projects if p["id"] == project_id), None)
proj_name = proj.get("name", project_id) if proj else project_id
project_context = (
f"Project: {proj_name} (id: {project_id}). "
"Always set projectId to this id on every record you create."
) )
# "projects" domain is never passed to Step 2 — handled above in code.
domains = [d for d in domains if d != "projects"]
existing_blocks: list[str] = []
for domain in domains:
rows = await _fetch_domain_entities(domain, effective_project_id)
existing_blocks.append(_format_entities_for_context(domain, rows))
existing_context = "\n\n".join(existing_blocks)
step2_template, step2_prompt_obj = get_prompt_or_fallback(
"batch_processing", _BATCH_PROCESSING_PROMPT
)
system_prompt = step2_template.format(
existing_context=existing_context,
project_context=project_context,
data_types=", ".join(domains),
custom_prompt_section=custom_section,
)
processing_tools = _build_processing_tools(domains)
result_text = await _run_agent_with_tools(
system_prompt=system_prompt,
user_message = ( user_message = (
f"Process this file and extract relevant information.\n\n" f"Process this file and extract relevant information.\n\n"
f"File: {file_path}\n\nContent:\n{file_content}" f"File: {file_path}\n\n"
), f"Content:\n{preprocessed.clean_text}"
)
file_tool_calls: list[str] = []
result_text = await _run_agent_with_tools(
system_prompt=system_prompt,
user_message=user_message,
tools=processing_tools, tools=processing_tools,
max_steps=_MAX_PROCESSING_STEPS, max_steps=_MAX_PROCESSING_STEPS,
user_id=user_id, user_id=user_id,
langfuse_prompt=step2_prompt_obj, langfuse_prompt=prompt_obj,
agent_name="step2-processor", agent_name="unified-processor",
_tool_calls_out=file_tool_calls,
) )
file_created = sum(
1 for name in file_tool_calls if name.startswith("create_")
)
items_created += file_created
logger.info( logger.info(
"agent_runner: run=%s file=%r result=%s", "agent_runner: run=%s file=%r created=%d result=%s",
run_id, run_id, file_path, file_created, result_text[:200],
file_path,
result_text[:200],
) )
except Exception as exc: except Exception as exc:
@@ -833,10 +730,11 @@ async def run_local_agent(
errors=errors, errors=errors,
) )
logger.info( logger.info(
"agent_runner: run=%s done status=%s processed=%d errors=%d", "agent_runner: run=%s done status=%s processed=%d created=%d errors=%d",
run_id, run_id,
final_status, final_status,
items_processed, items_processed,
items_created,
len(errors), len(errors),
) )
@@ -997,7 +895,9 @@ async def run_cloud_agent(
cloud_template, cloud_prompt_obj = get_prompt_or_fallback( cloud_template, cloud_prompt_obj = get_prompt_or_fallback(
"batch_cloud_processing", _BATCH_CLOUD_PROCESSING_PROMPT "batch_cloud_processing", _BATCH_CLOUD_PROCESSING_PROMPT
) )
processing_prompt = cloud_template.format( processing_prompt = compile_prompt(
cloud_template,
cloud_prompt_obj,
data_types=", ".join(config.data_types), data_types=", ".join(config.data_types),
project_context="Determine the appropriate project from the message context.", project_context="Determine the appropriate project from the message context.",
file_list=f"Message from {config.provider} (id: {msg.id})", file_list=f"Message from {config.provider} (id: {msg.id})",
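
The two config lookups used in Phase B can be exercised without the rest of the runner. The sketch below copies their logic from the diff under public names; `sample_config` and its rule texts are made-up values for illustration only.

```python
def get_extraction_rules(agent_config: dict, content_type: str) -> str:
    """Return the extraction_prompt for content_type, or a generic fallback."""
    for ct in agent_config.get("content_types", []):
        if ct.get("id") == content_type:
            prompt = ct.get("extraction_prompt", "").strip()
            if prompt:
                return prompt
    return (
        "Extract relevant information as tasks (action items), notes "
        "(informational content), or timelines (dated events)."
    )


def get_no_match_behavior(agent_config: dict) -> str:
    """Pick the first global rule that looks like a 'no project match' rule."""
    for rule in agent_config.get("global_rules", []):
        lower = rule.lower()
        if "no project" in lower or "no match" in lower or "skip" in lower:
            return rule
    return "create a new project with a concise name derived from the file content"


# Hypothetical config, shaped like the AgentConfig JSON described in this commit.
sample_config = {
    "content_types": [
        {"id": "email_html", "extraction_prompt": "Create one task per explicit action request."}
    ],
    "global_rules": ["If no project matches, do not create any record."],
}

email_rules = get_extraction_rules(sample_config, "email_html")
unknown_rules = get_extraction_rules(sample_config, "pdf")
no_match = get_no_match_behavior(sample_config)
```

One thing the sketch makes visible: the keyword test is loose, so a rule such as "Skip email signatures." would also be selected as the no-match behavior. Whether that is acceptable depends on how the journey phrases its global rules.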


@@ -615,8 +615,7 @@ async def _run_single_agent(
lf.start_as_current_observation( lf.start_as_current_observation(
as_type="span", as_type="span",
name=agent_name, name=agent_name,
user_id=user_id, metadata={"user_id": user_id, "session_id": trace_id},
session_id=trace_id,
input=message, input=message,
) )
if lf else None if lf else None
@@ -740,8 +739,7 @@ async def _run_single_agent_stream(
lf.start_as_current_observation( lf.start_as_current_observation(
as_type="span", as_type="span",
name=f"{agent_name}-stream", name=f"{agent_name}-stream",
user_id=user_id, metadata={"user_id": user_id, "session_id": trace_id},
session_id=trace_id,
input=message, input=message,
) )
if lf else None if lf else None


@@ -80,10 +80,11 @@ def get_langfuse() -> Any | None:
def get_prompt_or_fallback(name: str, fallback: str) -> tuple[str, Any]: def get_prompt_or_fallback(name: str, fallback: str) -> tuple[str, Any]:
"""Fetch a text prompt from Langfuse; fall back to ``fallback`` on any error. """Fetch a text prompt from Langfuse; fall back to ``fallback`` on any error.
Returns ``(prompt_text, prompt_obj_or_None)``. Returns ``(raw_template, prompt_obj_or_None)``.
* ``prompt_text`` — the raw template string (variables not yet substituted). * ``raw_template`` — the uncompiled template string. Do NOT call ``.format()``
Callers perform variable substitution with Python's ``.format()``. on it directly; use :func:`compile_prompt` instead so the correct variable
syntax is applied (``{{var}}`` for Langfuse, ``{var}`` for the fallback).
* ``prompt_obj`` — the Langfuse prompt object, or ``None`` when Langfuse is * ``prompt_obj`` — the Langfuse prompt object, or ``None`` when Langfuse is
unavailable / the fetch failed. Pass this to generation observations so unavailable / the fetch failed. Pass this to generation observations so
Langfuse links the generation to the exact prompt version in the UI. Langfuse links the generation to the exact prompt version in the UI.
@@ -102,6 +103,38 @@ def get_prompt_or_fallback(name: str, fallback: str) -> tuple[str, Any]:
return fallback, None return fallback, None
def compile_prompt(template: str, prompt_obj: Any, **variables: Any) -> str:
"""Compile *template* with *variables*, choosing the right syntax.
* When *prompt_obj* is a real Langfuse prompt object, calls
``prompt_obj.compile(**variables)`` which handles ``{{variable}}``
substitution as defined in the Langfuse UI.
* When *prompt_obj* is ``None`` (Langfuse unavailable or fetch failed),
falls back to ``template.format(**variables)`` which handles the
``{variable}`` syntax used in the hardcoded fallback strings.
This keeps callers oblivious to which syntax is in use.
"""
if prompt_obj is not None:
try:
compiled = prompt_obj.compile(**variables)
# compile() returns a string for text prompts.
if isinstance(compiled, str):
return compiled
# Chat prompts return a list of dicts — join text parts.
if isinstance(compiled, list):
return "\n".join(
m.get("content", "") for m in compiled if isinstance(m, dict)
)
except Exception as exc:
logger.warning(
"langfuse: compile failed for prompt %r: %s — falling back to .format()",
getattr(prompt_obj, "name", "?"),
exc,
)
return template.format(**variables)
def extract_usage(response: Any) -> dict[str, int]: def extract_usage(response: Any) -> dict[str, int]:
"""Extract token usage from a LangChain AI message into Langfuse format.""" """Extract token usage from a LangChain AI message into Langfuse format."""
meta = getattr(response, "usage_metadata", None) meta = getattr(response, "usage_metadata", None)
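
The reason `compile_prompt` exists can be shown with a small stand-in. This is a sketch under stated assumptions: `FakePrompt` is a hypothetical class that only imitates a prompt object exposing `compile(**variables)` with `{{variable}}` placeholders; it is not the Langfuse client, and the simplified `compile_prompt` below omits the logging and chat-prompt handling of the real function.

```python
import re
from typing import Any, Optional


class FakePrompt:
    """Hypothetical stand-in for a remote text prompt using {{name}} placeholders."""

    def __init__(self, template: str) -> None:
        self.template = template

    def compile(self, **variables: Any) -> str:
        # Replace each {{name}} with its value; unknown names are left untouched.
        return re.sub(
            r"\{\{\s*(\w+)\s*\}\}",
            lambda m: str(variables.get(m.group(1), m.group(0))),
            self.template,
        )


def compile_prompt(template: str, prompt_obj: Optional[Any], **variables: Any) -> str:
    """Simplified version: prefer prompt_obj.compile(), else str.format()."""
    if prompt_obj is not None:
        try:
            compiled = prompt_obj.compile(**variables)
            if isinstance(compiled, str):
                return compiled
        except Exception:
            pass  # fall through to the {variable} fallback syntax
    return template.format(**variables)


fallback = "File: {filename}\nTypes: {data_types}"
remote = FakePrompt("File: {{filename}}\nTypes: {{data_types}}")

via_fallback = compile_prompt(fallback, None, filename="a.html", data_types="tasks")
via_remote = compile_prompt(remote.template, remote, filename="a.html", data_types="tasks")

# Calling .format() directly on a {{...}} template only unescapes the braces.
naive = remote.template.format(filename="a.html", data_types="tasks")
```

Both paths produce the same text, while the naive `.format()` call on a double-brace template leaves the placeholders unsubstituted without raising, which is the failure mode the docstring change warns about.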


@@ -296,6 +296,7 @@ class LocalAgentConfig(Base):
directory_paths: Mapped[list] = mapped_column(JSON, nullable=False, default=list) directory_paths: Mapped[list] = mapped_column(JSON, nullable=False, default=list)
data_types: Mapped[list] = mapped_column(JSON, nullable=False, default=list) data_types: Mapped[list] = mapped_column(JSON, nullable=False, default=list)
prompt_template: Mapped[str] = mapped_column(Text, nullable=False, default="") prompt_template: Mapped[str] = mapped_column(Text, nullable=False, default="")
agent_config: Mapped[dict | None] = mapped_column(JSON, nullable=True)
file_extensions: Mapped[list] = mapped_column(JSON, nullable=False, default=list) file_extensions: Mapped[list] = mapped_column(JSON, nullable=False, default=list)
schedule_cron: Mapped[str] = mapped_column(String(100), nullable=False, default="0 */6 * * *") schedule_cron: Mapped[str] = mapped_column(String(100), nullable=False, default="0 */6 * * *")
enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True) enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)


@@ -273,6 +273,27 @@ class WsFloatingDomain(BaseModel):
domain: WsDomain domain: WsDomain
# ── Agent Config V2 ───────────────────────────────────────────────────
class ContentTypeConfig(BaseModel):
"""Per-type extraction config produced by the journey chatbot."""
id: str
label: str = ""
detection_hint: str = ""
preprocessing: str = "generic" # handler name: "email_html", "plain_text", ...
extraction_prompt: str
class AgentConfig(BaseModel):
"""Structured agent configuration (replaces freeform prompt_template)."""
content_types: list[ContentTypeConfig] = []
global_rules: list[str] = []
data_types: list[str] = []
# ── Agent Catalog ───────────────────────────────────────────────────── # ── Agent Catalog ─────────────────────────────────────────────────────
class AgentCatalogItem(BaseModel): class AgentCatalogItem(BaseModel):


@@ -6,26 +6,21 @@ a per-test session, and a FastAPI ``TestClient`` wired to use it.
from __future__ import annotations from __future__ import annotations
import json
import os
import time import time
import uuid import uuid
from collections.abc import AsyncGenerator, Generator from collections.abc import AsyncGenerator, Generator
from unittest.mock import patch
import boto3
import pytest import pytest
import pytest_asyncio import pytest_asyncio
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from jose import jwt from jose import jwt
from moto import mock_aws
from sqlalchemy import StaticPool, event from sqlalchemy import StaticPool, event
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from app.config.settings import settings from app.config.settings import settings
from app.db import Base, get_session from app.db import Base, get_session
from app.main import app from app.main import app
from app.models import Plugin, Subscription, User from app.models import Subscription, User
# ── Fixed test user IDs (one per tier) ─────────────────────────────── # ── Fixed test user IDs (one per tier) ───────────────────────────────
@@ -109,79 +104,6 @@ def client(db_session: AsyncSession) -> Generator[TestClient, None, None]: # n
app.dependency_overrides.pop(get_session, None) app.dependency_overrides.pop(get_session, None)
# ── Seed data helpers ────────────────────────────────────────────────
_SEED_PLUGINS = [
Plugin(
id="plugin-github-sync",
name="GitHub Sync",
description="Sync tasks with GitHub Issues and pull requests.",
version="1.0.0",
author_name="Adiuva",
category="productivity",
price_cents=0,
permissions=json.dumps(["read:tasks", "write:tasks"]),
status="approved",
s3_package_key="plugins/plugin-github-sync/1.0.0/package.zip",
install_count=0,
avg_rating=0.0,
),
Plugin(
id="plugin-slack-notify",
name="Slack Notifier",
description="Post task and timeline updates to Slack channels.",
version="1.2.0",
author_name="Adiuva",
category="communication",
price_cents=499,
permissions=json.dumps(["read:tasks", "read:timelines"]),
status="approved",
s3_package_key="plugins/plugin-slack-notify/1.2.0/package.zip",
install_count=0,
avg_rating=0.0,
),
Plugin(
id="plugin-time-tracker",
name="Time Tracker",
description="Track time spent on tasks with automatic reporting.",
version="0.9.1",
author_name="Third Party",
category="productivity",
price_cents=999,
permissions=json.dumps(["read:tasks", "write:tasks"]),
status="approved",
s3_package_key="plugins/plugin-time-tracker/0.9.1/package.zip",
install_count=0,
avg_rating=0.0,
),
]
@pytest_asyncio.fixture
async def seed_plugins(db_session: AsyncSession) -> list[Plugin]:
"""Insert the 3 default approved plugins and return them."""
plugins = []
for template in _SEED_PLUGINS:
p = Plugin(
id=template.id,
name=template.name,
description=template.description,
version=template.version,
author_name=template.author_name,
category=template.category,
price_cents=template.price_cents,
permissions=template.permissions,
status=template.status,
s3_package_key=template.s3_package_key,
install_count=template.install_count,
avg_rating=template.avg_rating,
)
db_session.add(p)
plugins.append(p)
await db_session.commit()
return plugins
# ── JWT helpers ────────────────────────────────────────────────────── # ── JWT helpers ──────────────────────────────────────────────────────
@@ -212,24 +134,21 @@ def auth_header(tier: str = "power", user_id: str | None = None) -> dict[str, st
return {"Authorization": f"Bearer {make_jwt(tier, user_id)}"} return {"Authorization": f"Bearer {make_jwt(tier, user_id)}"}
# ── S3 mock fixture ────────────────────────────────────────────────── # ── CLI options ───────────────────────────────────────────────────────
S3_TEST_BUCKET = "test-bucket" def pytest_addoption(parser):
S3_TEST_REGION = "us-east-1" parser.addoption(
"--preprocess-dir",
default=None,
@pytest.fixture help="Override fixture folder for preprocessor tests (must contain cases.yaml + data/)",
def s3_bucket(): )
"""Create a mocked S3 bucket via moto and patch BlobStore settings.""" parser.addoption(
with mock_aws(): "--runner-dir",
os.environ.setdefault("AWS_ACCESS_KEY_ID", "testing") default=None,
os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "testing") help="Override fixture folder for agent_runner_v2 eval tests (must contain cases.yaml + data/)",
os.environ.setdefault("AWS_DEFAULT_REGION", S3_TEST_REGION) )
client = boto3.client("s3", region_name=S3_TEST_REGION) parser.addoption(
client.create_bucket(Bucket=S3_TEST_BUCKET) "--journey-dir",
with patch("app.storage.blob_store.settings") as mock_settings: default=None,
mock_settings.S3_BUCKET = S3_TEST_BUCKET help="Override fixture folder for journey_v2 eval tests (must contain cases.yaml + data/)",
mock_settings.S3_REGION = S3_TEST_REGION )
mock_settings.AWS_ACCESS_KEY_ID = "testing"
mock_settings.AWS_SECRET_ACCESS_KEY = "testing"
yield S3_TEST_BUCKET
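
The help text for the three options says an override folder must contain `cases.yaml` and `data/`. One way a test helper might enforce that is sketched below; `resolve_fixture_dir` is a hypothetical name and is not part of this diff. In a real test the override would presumably come from `request.config.getoption("--runner-dir")`.

```python
import tempfile
from pathlib import Path
from typing import Optional


def resolve_fixture_dir(override: Optional[str], default: Path) -> Path:
    """Return the override folder if given, else the default; check its layout."""
    root = Path(override) if override else default
    missing = [name for name in ("cases.yaml", "data") if not (root / name).exists()]
    if missing:
        raise FileNotFoundError(f"{root} is missing: {', '.join(missing)}")
    return root


# Demo with a throwaway folder that has the expected layout.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "data").mkdir()
    (root / "cases.yaml").write_text("[]\n")
    chosen = resolve_fixture_dir(str(root), default=Path("tests/fixtures/agent_runner_v2"))
    chosen_ok = chosen == root
```

Failing early with a clear message avoids a confusing "no tests collected" result when the override path is mistyped.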


@@ -0,0 +1,86 @@
# Agent Runner V2 — eval test cases (Step 2, requires real LLM)
#
# Each case drives one parametrized `test_eval_runner` invocation.
#
# Keys
# ----
# id: str unique identifier shown in pytest output
# description: str human-readable label
# file: str filename inside data/
# file_path: str path reported to the executor (affects project-matching via filename)
# projects: [alpha|beta] symbolic project names resolved by the test helper
#
# Optional pre-existing records (dedup tests)
# existing_tasks: list of {id, title, status, priority}
# existing_notes: list of {id, title, content}
# existing_timelines: list of {id, title, date}
#
# Assertions (one or more)
# expect_insert: <table> at least 1 insert row in this table (tasks|notes|timelines)
# expect_no_insert: true zero inserts in any table
# expect_project_id: <id> any insert must carry this projectId
# expect_dedup: true task inserts == 0 OR task updates >= 1 (dedup check)
#
# Langfuse
# score_name: str observation score name
- id: "2.1"
description: "Action email → create_task"
file: email_action.html
file_path: /emails/ProjectAlpha_action.html
projects: [alpha, beta]
expect_insert: tasks
score_name: runner.email_to_task
- id: "2.2"
description: "Informational email → create_note"
file: email_info.html
file_path: /emails/ProjectAlpha_info.html
projects: [alpha, beta]
expect_insert: notes
score_name: runner.email_to_note
- id: "2.3"
description: "Email with meeting date → create_timeline"
file: email_date.html
file_path: /emails/ProjectAlpha_kickoff.html
projects: [alpha, beta]
expect_insert: timelines
score_name: runner.email_to_timeline
- id: "2.4"
description: "Filename contains project name → correct project assigned"
file: email_action.html
file_path: /emails/ProjectAlpha_report.html
projects: [alpha, beta]
expect_project_id: proj-alpha
score_name: runner.project_filename
- id: "2.5"
description: "Email body mentions project → correct project assigned"
file: email_action.html
file_path: /emails/email_001.html
projects: [alpha, beta]
expect_project_id: proj-alpha
score_name: runner.project_content
- id: "2.6"
description: "Newsletter + global rule no-project → no creates"
file: email_no_project.html
file_path: /emails/newsletter.html
projects: [alpha, beta]
expect_no_insert: true
score_name: runner.no_project
- id: "2.7"
description: "Existing task with same title → dedup (update not create)"
file: email_action.html
file_path: /emails/ProjectAlpha_followup.html
projects: [alpha]
existing_tasks:
- id: task-existing
title: Fix the login bug
status: todo
priority: medium
expect_dedup: true
score_name: runner.dedup
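
The assertion keys documented at the top of this file can be read as a small decision function. The sketch below is one possible interpretation, not the project's `_evaluate_case`: the function name, the `{"table": ..., "data": ...}` shape of recorded operations, and the choice that `expect_project_id` requires at least one insert are all assumptions.

```python
from typing import Dict, List


def evaluate_case(case: Dict, inserts: List[Dict], updates: List[Dict]) -> bool:
    """Check recorded insert/update operations against a case's expect_* keys."""
    if "expect_insert" in case:
        # At least one insert row in the named table.
        if not any(op["table"] == case["expect_insert"] for op in inserts):
            return False
    if case.get("expect_no_insert") and inserts:
        return False
    if "expect_project_id" in case:
        # Assumption: at least one insert exists and every insert carries the id.
        wanted = case["expect_project_id"]
        if not inserts or any(op["data"].get("projectId") != wanted for op in inserts):
            return False
    if case.get("expect_dedup"):
        task_inserts = sum(1 for op in inserts if op["table"] == "tasks")
        task_updates = sum(1 for op in updates if op["table"] == "tasks")
        if not (task_inserts == 0 or task_updates >= 1):
            return False
    return True


# Example: case 2.7 passes when the agent updates the existing task.
dedup_case = {"id": "2.7", "expect_dedup": True}
dedup_ok = evaluate_case(
    dedup_case,
    inserts=[],
    updates=[{"table": "tasks", "data": {"id": "task-existing"}}],
)
```

Note that, as documented, `expect_dedup` also passes when a task is both inserted and updated; a stricter check would require zero task inserts.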


@@ -0,0 +1,7 @@
<html><head></head><body>
<p><b>From:</b> boss@company.com</p>
<p><b>To:</b> dev@company.com</p>
<p><b>Subject:</b> Fix the login bug</p>
<p><b>Date:</b> 2026-04-07</p>
<p>Hi,<br>Please fix the login bug in Project Alpha by Friday. High priority!</p>
</body></html>


@@ -0,0 +1,5 @@
<html><head></head><body>
<p><b>From:</b> pm@company.com</p>
<p><b>Subject:</b> Project Alpha kick-off meeting</p>
<p>The kick-off meeting for Project Alpha is scheduled for 2026-04-15 at 10:00.</p>
</body></html>


@@ -0,0 +1,7 @@
<html><head></head><body>
<p><b>From:</b> pm@company.com</p>
<p><b>To:</b> team@company.com</p>
<p><b>Subject:</b> FYI: New policy for Project Alpha</p>
<p>Just a heads-up that starting next week all code reviews must be done
within 24 hours for Project Alpha. No action needed from you now.</p>
</body></html>


@@ -0,0 +1,5 @@
<html><head></head><body>
<p><b>From:</b> newsletter@ads.com</p>
<p><b>Subject:</b> Weekly newsletter</p>
<p>Check out our latest deals on electronics!</p>
</body></html>

tests/fixtures/journey_v2/cases.yaml vendored Normal file

@@ -0,0 +1,87 @@
# Journey V2 eval test cases — Step 4
#
# Each case simulates a complete journey session:
# 1. handle_journey_start is called with directory + data_types
# 2. handle_journey_message is called for each entry in user_messages
# 3. Assertions are evaluated on the final reply
#
# directory_files: list of {path, content_file} — content_file is relative to data/
#
# Assertion keys:
# expect_question: true → first reply must contain "?"
# expect_done: true → final reply must have done=True
# expect_valid_config: true → agent_config must be parseable as AgentConfig with content_types > 0
# expect_content_type_id: <str> → AgentConfig.content_types must contain an entry with this id
# expect_extraction_contains: <str> → first content_type extraction_prompt must contain this word
# expect_global_rules: true → AgentConfig.global_rules must be non-empty
- id: "4.1"
description: "Journey start explores directory, first reply contains a question"
  directory: "/test/emails"
  data_types: ["tasks", "notes", "timelines"]
  directory_files:
    - path: "/test/emails/outlook_export_2024.html"
      content_file: "email_action.html"
  user_messages: []
  score_name: "journey.start"
  expect_question: true

- id: "4.2"
  description: "Full 3-turn conversation produces a valid AgentConfig JSON"
  directory: "/test/emails"
  data_types: ["tasks", "notes", "timelines"]
  directory_files:
    - path: "/test/emails/email_backup.html"
      content_file: "email_action.html"
  user_messages:
    - "These are email exports from Outlook in HTML format"
    - "Create tasks for emails with direct action requests, notes for informational emails"
    - "Yes, that looks correct. No other rules."
  score_name: "journey.valid_json"
  expect_done: true
  expect_valid_config: true

- id: "4.3"
  description: "Journey detects email_html content type from directory exploration"
  directory: "/test/emails"
  data_types: ["tasks", "notes"]
  directory_files:
    - path: "/test/emails/message.html"
      content_file: "email_action.html"
  user_messages:
    - "HTML email backups from my mail client, exported from Outlook"
    - "Create tasks from emails that contain assignments or direct action items"
    - "Correct, no other rules needed"
  score_name: "journey.detect_email"
  expect_done: true
  expect_content_type_id: "email_html"

- id: "4.4"
  description: "Custom user rule (only notes, no tasks) reflected in extraction_prompt"
  directory: "/test/emails"
  data_types: ["notes"]
  directory_files:
    - path: "/test/emails/email.html"
      content_file: "email_info.html"
  user_messages:
    - "HTML emails from my work inbox"
    - "Create only notes from all emails — I do not want tasks or timelines to be created"
    - "Yes, exactly"
  score_name: "journey.custom_rules"
  expect_done: true
  expect_extraction_contains: "note"

- id: "4.5"
  description: "Global rule (no project = no entity) appears in AgentConfig.global_rules"
  directory: "/test/emails"
  data_types: ["tasks", "notes"]
  directory_files:
    - path: "/test/emails/email.html"
      content_file: "email_action.html"
  user_messages:
    - "Email backups from Outlook"
    - "Create tasks from action request emails, notes from informational emails"
    - "If the email cannot be matched to any project, do not create any entity at all"
  score_name: "journey.global_rules"
  expect_done: true
  expect_global_rules: true
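
Reviewer sketch (not part of the commit): the journey evaluator in this change scores a case as 1.0 when it carries no `expect_*` key, so a typo in a key name would pass silently. A small lint over the loaded cases could catch that before any LLM call is made. The function name `lint_case` and the choice of required keys are assumptions for illustration; the required set is limited to the keys the test code indexes directly (`id`, `directory`, `data_types`).

```python
# Hypothetical helper: sanity-check journey cases after they are loaded from YAML.
REQUIRED_KEYS = {"id", "directory", "data_types"}

# Keys the journey evaluator reacts to, as listed in the cases above.
ASSERTION_KEYS = (
    "expect_question",
    "expect_done",
    "expect_valid_config",
    "expect_content_type_id",
    "expect_extraction_contains",
    "expect_global_rules",
)


def lint_case(case: dict) -> list:
    """Return a list of human-readable problems; an empty list means the case looks sane."""
    problems = []

    missing = sorted(REQUIRED_KEYS - set(case))
    if missing:
        problems.append(f"missing keys: {missing}")

    # Without any assertion key the evaluator has nothing to check.
    if not any(case.get(key) for key in ASSERTION_KEYS):
        problems.append("no expect_* key set; the case would score 1.0 without checking anything")

    # Every simulated file should live under the directory the journey explores.
    directory = case.get("directory", "")
    for entry in case.get("directory_files", []):
        path = entry.get("path", "")
        if not path.startswith(directory):
            problems.append(f"{path!r} is outside {directory!r}")

    return problems
```

Such a check could run inside `_load_cases` or as a separate unit test, so that a malformed case fails fast instead of consuming an eval run.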

@@ -0,0 +1,23 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Email: Fix the login bug</title>
<style>body { font-family: Arial; } .header { color: #666; }</style>
</head>
<body>
<div class="header">
<p><strong>From:</strong> boss@company.com</p>
<p><strong>To:</strong> dev@company.com</p>
<p><strong>Subject:</strong> Fix the login bug</p>
<p><strong>Date:</strong> Mon, 7 Apr 2026 09:15:00 +0000</p>
</div>
<div class="body">
<p>Hi,</p>
<p>Please fix the login bug in Project Alpha as soon as possible.
Users are reporting that they can't log in with their Google accounts.
This is blocking the whole team. Please resolve it by Friday.</p>
<p>Thanks,<br>Boss</p>
</div>
</body>
</html>

@@ -0,0 +1,23 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Email: New policy update</title>
<style>body { font-family: Arial; }</style>
</head>
<body>
<div class="header">
<p><strong>From:</strong> hr@company.com</p>
<p><strong>To:</strong> all@company.com</p>
<p><strong>Subject:</strong> FYI: New remote work policy effective May 1</p>
<p><strong>Date:</strong> Tue, 8 Apr 2026 10:00:00 +0000</p>
</div>
<div class="body">
<p>Hi everyone,</p>
<p>Just a heads-up that starting May 1, 2026 the company will be moving to
a hybrid work model. You will be expected to come into the office at least
two days per week. More details will follow in the employee handbook.</p>
<p>Best,<br>HR Team</p>
</div>
</body>
</html>
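
Reviewer sketch (not part of the commit): both fixtures put the mail headers in `<strong>Label:</strong> value` pairs and the message in a separate body `div`. The real `email_html` preprocessor is not shown in this diff, so the following is only an illustration of how such a file could be split into metadata and clean text with the standard library. `split_email` and `_TextCollector` are hypothetical names.

```python
from html.parser import HTMLParser

HEADER_KEYS = {"from", "to", "subject", "date"}


class _TextCollector(HTMLParser):
    """Collect visible text chunks, skipping <style>, <script>, and <title>."""

    SKIPPED_TAGS = {"style", "script", "title"}

    def __init__(self) -> None:
        super().__init__()
        self._skip_depth = 0
        self.chunks = []

    def handle_starttag(self, tag, attrs):
        if tag in self.SKIPPED_TAGS:
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in self.SKIPPED_TAGS and self._skip_depth:
            self._skip_depth -= 1

    def handle_data(self, data):
        text = " ".join(data.split())  # collapse runs of whitespace
        if text and not self._skip_depth:
            self.chunks.append(text)


def split_email(html: str):
    """Return (metadata, clean_text) for an email laid out like the fixtures."""
    collector = _TextCollector()
    collector.feed(html)
    collector.close()

    metadata = {}
    body = []
    chunks = iter(collector.chunks)
    for chunk in chunks:
        key = chunk.rstrip(":").lower()
        if chunk.endswith(":") and key in HEADER_KEYS and key not in metadata:
            # The chunk right after a "Label:" chunk is that header's value.
            metadata[key] = next(chunks, "")
        else:
            body.append(chunk)
    return metadata, " ".join(body)
```

Run against `email_action.html`, this approach would be expected to yield `subject` and `from` keys and a body free of tags and CSS, which is what preprocessor cases 1.5 and 1.6 assert on.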

@@ -1,121 +1,68 @@
-# Preprocessor test cases — Step 1 (Local Agent V2)
+# Preprocessor test cases
 #
-# Schema per case:
-#   id: "1.N"
-#   description: str
-#   score_name: str                  # score name sent to Langfuse
+# detect: <expected_type>    → calls detect_content_type(filename, content)
+# process: <content_type>    → calls preprocess(content_type, content)
 #
-# Content source (one of the two):
-#   file: <file name in data/>       # read as UTF-8 text
-#   generate: binary_noise           # content generated by the runner (for binary tests)
+# Source: file: <name in data/> or generate: binary_noise
 #
-# For op=detect:
-#   op: detect
-#   input_filename: str              # filename passed to detect_content_type
-#   expected_content_type: str
-#
-# For op=preprocess:
-#   op: preprocess
-#   input_content_type: str          # content_type passed to preprocess()
-#   assertions:
-#     no_html_tags: bool
-#     min_length: int
-#     compression_ratio_lt: float    # len(clean) / len(raw) < threshold
-#     metadata_keys: [str, ...]      # keys that must be in metadata
-#     contains: str | [str, ...]     # substring(s) present in clean_text
-#     not_contains: str | [str, ...] # substring(s) absent from clean_text
-#     content_type: str              # expected value of result.content_type
-cases:
-# ── Detection tests ────────────────────────────────────────────────
+# Flat assertions (process only):
+#   no_html: true            clean_text without HTML tags
+#   min_chars: N             len(clean_text) >= N
+#   ratio_lt: F              len(clean) / len(raw) < F
+#   has_meta: [k, ...]       keys present in metadata
+#   contains: str | [str]    substring(s) present in clean_text
+#   excludes: str | [str]    substring(s) absent from clean_text
+#   content_type: str        result.content_type == this value
 - id: "1.1"
-  description: "Detect email HTML"
-  score_name: preprocess.detect_email
   file: email_action.html
-  op: detect
-  input_filename: email_export.html
-  expected_content_type: email_html
+  detect: email_html
 - id: "1.2"
-  description: "Detect generic HTML"
-  score_name: preprocess.detect_generic
   file: generic_page.html
-  op: detect
-  input_filename: index.html
-  expected_content_type: generic_html
+  detect: generic_html
 - id: "1.3"
-  description: "Detect plain text"
-  score_name: preprocess.detect_text
   file: notes.txt
-  op: detect
-  input_filename: notes.txt
-  expected_content_type: plain_text
+  detect: plain_text
 - id: "1.4"
-  description: "Detect unknown (binary-like content)"
-  score_name: preprocess.detect_unknown
+  file: archive.xyz
   generate: binary_noise
-  op: detect
-  input_filename: archive.xyz
-  expected_content_type: unknown
+  detect: unknown
-# ── Preprocess tests ───────────────────────────────────────────────
 - id: "1.5"
-  description: "Email: strip HTML tags"
   file: email_action.html
-  op: preprocess
-  input_content_type: email_html
-  assertions:
-    no_html_tags: true
-    min_length: 50
-    compression_ratio_lt: 0.8
+  process: email_html
+  no_html: true
+  min_chars: 50
+  ratio_lt: 0.8
 - id: "1.6"
-  description: "Email: extract metadata (Subject + From)"
   file: email_action.html
-  op: preprocess
-  input_content_type: email_html
-  assertions:
-    metadata_keys: [subject, from]
+  process: email_html
+  has_meta: [subject, from]
 - id: "1.7"
-  description: "Email: split thread — last message only"
   file: email_thread.html
-  op: preprocess
-  input_content_type: email_html
-  assertions:
-    contains: "Sure, I'll handle the deploy"
-    not_contains: "Let's plan the deploy"
+  process: email_html
+  contains: "Sure, I'll handle the deploy"
+  excludes: "Let's plan the deploy"
 - id: "1.8"
-  description: "Email: single message without thread"
   file: email_single.html
-  op: preprocess
-  input_content_type: email_html
-  assertions:
-    contains: "deploy is done"
+  process: email_html
+  contains: "deploy is done"
 - id: "1.9"
-  description: "Email: heavy HTML with table layout"
   file: email_heavy.html
-  op: preprocess
-  input_content_type: email_html
-  assertions:
-    no_html_tags: true
-    min_length: 30
-    not_contains:
-      - "border-collapse"
-      - "font-size"
+  process: email_html
+  no_html: true
+  min_chars: 30
+  excludes: [border-collapse, font-size]
 - id: "1.10"
-  description: "Fallback: unknown file → text returned"
   file: fallback.txt
-  op: preprocess
-  input_content_type: unknown
-  assertions:
-    min_length: 1
-    content_type: unknown
+  process: unknown
+  min_chars: 1
+  content_type: unknown
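
Reviewer sketch (not part of the commit): the new schema moves the assertions from a nested `assertions:` mapping to flat keys on the case itself. One way such flat keys could be evaluated is shown below. This is an illustration under assumptions, not the runner from this change: `check_flat_assertions` and its parameters are hypothetical, and the tag regex is a simple heuristic.

```python
import re

# Heuristic: anything that looks like an opening or closing HTML tag.
_TAG_RE = re.compile(r"<[a-zA-Z/][^>]*>")


def _as_list(value):
    """Accept either a single string or a list of strings, as the schema allows."""
    return [value] if isinstance(value, str) else list(value)


def check_flat_assertions(case, raw, clean_text, metadata, content_type):
    """Return a list of failure messages for the flat assertion keys of one case."""
    failures = []

    if case.get("no_html") and _TAG_RE.search(clean_text):
        failures.append("no_html: HTML tag found in clean_text")
    if "min_chars" in case and len(clean_text) < case["min_chars"]:
        failures.append(f"min_chars: {len(clean_text)} < {case['min_chars']}")
    if "ratio_lt" in case and raw and len(clean_text) / len(raw) >= case["ratio_lt"]:
        failures.append(f"ratio_lt: {len(clean_text) / len(raw):.2f} >= {case['ratio_lt']}")
    for key in case.get("has_meta", []):
        if key not in metadata:
            failures.append(f"has_meta: {key!r} missing from metadata")
    for needle in _as_list(case.get("contains", [])):
        if needle not in clean_text:
            failures.append(f"contains: {needle!r} not found")
    for needle in _as_list(case.get("excludes", [])):
        if needle in clean_text:
            failures.append(f"excludes: {needle!r} found")
    if "content_type" in case and content_type != case["content_type"]:
        failures.append(f"content_type: got {content_type!r}")

    return failures
```

Collecting every failure instead of stopping at the first one keeps the pytest output useful when a preprocessor regression breaks several assertions at once.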

@@ -0,0 +1,432 @@
"""Tests for Local Agent V2 runner (Step 2).

Covers the unified per-file flow:
  Phase A — detect + preprocess (Python, zero LLM)
  Phase B — single LLM call with tools (classify + extract + create)

Fixture-based eval tests (2.1–2.7)
-----------------------------------
Cases are defined in tests/fixtures/agent_runner_v2/cases.yaml.
Email HTML files live in tests/fixtures/agent_runner_v2/data/.
Use --runner-dir to point at a custom folder (same structure required).

Unit tests (no LLM)
--------------------
  2.8   items_created count → items_created == N create_* calls
  2.9   Device offline → status=error
  2.10  Empty file → items_processed=0, status=success

Run:
    pytest tests/test_agent_runner_v2.py -v
    pytest tests/test_agent_runner_v2.py -v -k "2_9 or 2_10 or 2_8"   # unit only
    pytest tests/test_agent_runner_v2.py -v -k "eval"                 # LLM evals only
    pytest tests/test_agent_runner_v2.py -v --runner-dir /path/to/dir # custom fixtures
"""
from __future__ import annotations

import uuid
from contextlib import nullcontext
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
import yaml

from app.core.agent_runner import (
    _format_metadata,
    _format_projects,
    _get_extraction_rules,
    _get_no_match_behavior,
    _is_overdue,
    run_local_agent,
)
from app.core.device_manager import DeviceConnectionManager
from app.core.langfuse_client import get_langfuse
from app.models import AgentRunLog, LocalAgentConfig
from tests.conftest import TEST_USER_IDS

# ── Constants ─────────────────────────────────────────────────────────────

_USER_ID = TEST_USER_IDS["power"]
_DEFAULT_FIXTURE_DIR = Path(__file__).parent / "fixtures" / "agent_runner_v2"

_AGENT_CONFIG = {
    "content_types": [
        {
            "id": "email_html",
            "label": "Email HTML",
            "detection_hint": "HTML file with From/To/Subject headers",
            "preprocessing": "email_html",
            "extraction_prompt": (
                "If the email contains a direct action request or task assignment → create a task. "
                "If the email contains informational content, updates, or FYI → create a note. "
                "If the email mentions a specific date for a meeting or deadline → create a timeline entry."
            ),
        }
    ],
    "global_rules": [
        # English: "If the file cannot be traced back to any project, do not create any entity."
        "Se il file non è riconducibile a nessun progetto, non creare alcuna entità."
    ],
    "data_types": ["tasks", "notes", "timelines"],
}

# Canonical project definitions, referenced symbolically in cases.yaml.
_PROJECTS: dict[str, dict] = {
    "alpha": {"id": "proj-alpha", "name": "Project Alpha", "status": "active"},
    "beta": {"id": "proj-beta", "name": "Project Beta", "status": "active"},
}

# ── Fixture loading ───────────────────────────────────────────────────────

def _fixtures_dir(config) -> Path:
    override = config.getoption("--runner-dir")
    return Path(override) if override else _DEFAULT_FIXTURE_DIR


def _load_cases(config) -> list[dict]:
    return yaml.safe_load(
        (_fixtures_dir(config) / "cases.yaml").read_text(encoding="utf-8")
    )


def _read_case_file(case: dict, data_dir: Path) -> str:
    return (data_dir / case["file"]).read_text(encoding="utf-8")


def _resolve_projects(entries: list[str | dict]) -> list[dict]:
    """Resolve project list from YAML: symbolic names and/or inline dicts."""
    result = []
    for entry in entries:
        if isinstance(entry, str):
            if entry in _PROJECTS:
                result.append(_PROJECTS[entry])
        elif isinstance(entry, dict):
            result.append(entry)
    return result


# ── pytest_generate_tests — parametrize eval tests from YAML ─────────────

def pytest_generate_tests(metafunc):
    if "runner_case" not in metafunc.fixturenames:
        return
    cases = _load_cases(metafunc.config)
    metafunc.parametrize("runner_case", cases, ids=[c["id"] for c in cases])

# ── Test helpers ──────────────────────────────────────────────────────────

def _make_config(
    agent_config: dict | None = None,
    directory: str = "/emails",
    device_id: str = "dev-001",
) -> LocalAgentConfig:
    return LocalAgentConfig(
        id=str(uuid.uuid4()),
        user_id=_USER_ID,
        device_id=device_id,
        name="Test V2 Agent",
        directory_paths=[directory],
        data_types=["tasks", "notes", "timelines"],
        prompt_template="",
        agent_config=agent_config or _AGENT_CONFIG,
        file_extensions=[".html", ".eml"],
        schedule_cron="0 */6 * * *",
        enabled=True,
        last_run_at=None,
    )


def _make_run_log(agent_id: str) -> AgentRunLog:
    return AgentRunLog(
        id=str(uuid.uuid4()),
        agent_id=agent_id,
        agent_type="local",
        user_id=_USER_ID,
        status="running",
        started_at=datetime.now(timezone.utc),
    )


def _make_manager(online: bool = True) -> DeviceConnectionManager:
    mgr = DeviceConnectionManager()
    if online:
        ws = MagicMock()
        ws.send_text = AsyncMock()
        mgr.register(_USER_ID, "dev-001", ws)
    return mgr


def _make_executor(
    file_path: str,
    file_content: str,
    projects: list[dict] | None = None,
    existing_tasks: list[dict] | None = None,
    existing_notes: list[dict] | None = None,
    existing_timelines: list[dict] | None = None,
) -> tuple[Any, list[dict]]:
    """Return (async_executor, captured_calls).

    The executor handles all ``execute_on_client`` payloads:
    directory listing, file reading, project/entity fetching, and CRUD.
    """
    calls: list[dict] = []
    _projects = projects if projects is not None else list(_PROJECTS.values())

    async def _executor(payload: dict) -> dict:
        action = payload.get("action", "")
        table = payload.get("table", "")
        data = payload.get("data") or {}
        calls.append({"action": action, "table": table, "data": data})

        if action == "list_directory":
            return {"entries": [{"type": "file", "path": file_path}]}
        if action == "get_file_metadata":
            return {"modifiedAt": None}
        if action == "read_file_content":
            return {"content": file_content}
        if action == "select":
            if table == "projects":
                return {"rows": _projects}
            if table == "tasks":
                return {"rows": existing_tasks or []}
            if table == "notes":
                return {"rows": existing_notes or []}
            if table == "timelines":
                return {"rows": existing_timelines or []}
            return {"rows": []}
        if action == "insert":
            return {"row": {"id": str(uuid.uuid4()), **data}}
        if action == "update":
            return {"success": True}
        return {}

    return _executor, calls

# ── Unit: helper functions ────────────────────────────────────────────────

def test_format_projects_empty():
    assert "(no projects" in _format_projects([])


def test_format_projects_with_data():
    result = _format_projects([_PROJECTS["alpha"]])
    assert "proj-alpha" in result
    assert "Project Alpha" in result


def test_format_metadata_empty():
    assert _format_metadata({}) == ""


def test_format_metadata_email():
    meta = {"subject": "Fix bug", "from": "boss@co.com", "date": "2026-04-07"}
    result = _format_metadata(meta)
    assert "Fix bug" in result
    assert "boss@co.com" in result


def test_get_extraction_rules_match():
    rules = _get_extraction_rules(_AGENT_CONFIG, "email_html")
    assert "task" in rules.lower()


def test_get_extraction_rules_fallback():
    rules = _get_extraction_rules(_AGENT_CONFIG, "plain_text")
    assert "extract" in rules.lower()


def test_get_no_match_behavior_from_global_rules():
    behavior = _get_no_match_behavior(_AGENT_CONFIG)
    assert behavior  # non-empty


def test_get_no_match_behavior_default():
    behavior = _get_no_match_behavior({})
    assert "project" in behavior.lower()

# ── Unit: 2.9 — device offline ───────────────────────────────────────────

@pytest.mark.asyncio
async def test_2_9_device_offline():
    """2.9 No device online → status=error, no executor created."""
    config = _make_config()
    run_log = _make_run_log(config.id)
    mgr = _make_manager(online=False)

    with patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock) as mock_fin:
        await run_local_agent(_USER_ID, config, run_log, mgr)

    _, kwargs = mock_fin.call_args
    assert kwargs["status"] == "error"
    assert any("not connected" in e for e in kwargs.get("errors", []))


# ── Unit: 2.10 — empty file ──────────────────────────────────────────────

@pytest.mark.asyncio
async def test_2_10_empty_file():
    """2.10 File with empty content → skipped, items_processed=0, success."""
    config = _make_config()
    run_log = _make_run_log(config.id)
    mgr = _make_manager()
    executor, calls = _make_executor(
        file_path="/emails/empty.html",
        file_content="",
        projects=[_PROJECTS["alpha"]],
    )

    with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \
         patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock) as mock_fin:
        await run_local_agent(_USER_ID, config, run_log, mgr)

    _, kwargs = mock_fin.call_args
    assert kwargs["items_processed"] == 0
    assert kwargs["status"] == "success"
    assert kwargs["items_created"] == 0


# ── Unit: 2.8 — items_created count ─────────────────────────────────────

@pytest.mark.asyncio
async def test_2_8_items_created_count():
    """2.8 items_created == number of create_* tool calls per run."""
    config = _make_config()
    run_log = _make_run_log(config.id)
    mgr = _make_manager()
    executor, _calls = _make_executor(
        file_path="/emails/action.html",
        file_content="<html><body><p>Fix the login bug in Project Alpha.</p></body></html>",
        projects=[_PROJECTS["alpha"]],
    )

    async def mock_run_agent(*, _tool_calls_out=None, **kw) -> str:
        if _tool_calls_out is not None:
            _tool_calls_out.extend(["create_task", "create_note", "update_task"])
        return "Done."

    with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \
         patch("app.core.agent_runner._run_agent_with_tools", side_effect=mock_run_agent), \
         patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock) as mock_fin:
        await run_local_agent(_USER_ID, config, run_log, mgr)

    _, kwargs = mock_fin.call_args
    # Only create_task + create_note count (not update_task).
    assert kwargs["items_created"] == 2
    assert kwargs["items_processed"] == 1

# ── Eval: 2.1–2.7 — fixture-driven, real LLM + Langfuse scoring ──────────
#
# Cases loaded from tests/fixtures/agent_runner_v2/cases.yaml.
# Supported assertions (from YAML):
#   expect_insert: <table>     → at least 1 insert in that table
#   expect_no_insert: true     → zero inserts in any table
#   expect_project_id: <id>    → any insert carries this projectId
#   expect_dedup: true         → task inserts == 0 OR task updates >= 1
# ─────────────────────────────────────────────────────────────────────────

@pytest.mark.asyncio
@pytest.mark.eval
async def test_eval_runner(runner_case, pytestconfig):
    """Parametrized eval test — one invocation per YAML case."""
    case: dict = runner_case
    data_dir = _fixtures_dir(pytestconfig) / "data"
    file_content = _read_case_file(case, data_dir)
    projects = _resolve_projects(case.get("projects", []))

    config = _make_config()
    run_log = _make_run_log(config.id)
    mgr = _make_manager()
    executor, calls = _make_executor(
        file_path=case["file_path"],
        file_content=file_content,
        projects=projects,
        existing_tasks=case.get("existing_tasks"),
        existing_notes=case.get("existing_notes"),
        existing_timelines=case.get("existing_timelines"),
    )

    lf = get_langfuse()
    obs_ctx = lf.start_as_current_observation(
        name=f"eval-runner-{case['id']}-{case.get('score_name', 'unknown').replace('.', '-')}",
        metadata={"step": "2", "case_id": case["id"]},
    ) if lf else nullcontext()

    with obs_ctx as obs:
        with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \
             patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock) as mock_fin:
            await run_local_agent(_USER_ID, config, run_log, mgr)

        _, kwargs = mock_fin.call_args
        inserts = [c for c in calls if c["action"] == "insert"]
        score, comment = _evaluate_case(case, calls, kwargs)

        if obs is not None:
            obs.score(
                name=case.get("score_name", f"runner.case_{case['id']}"),
                value=score,
                comment=comment,
            )

    if lf:
        lf.flush()

    assert score == 1.0, f"[{case['id']}] {case.get('description', '')}: {comment}"


def _evaluate_case(case: dict, calls: list[dict], finalize_kwargs: dict) -> tuple[float, str]:
    """Return (score, comment) for a YAML case given the captured executor calls."""
    inserts = [c for c in calls if c["action"] == "insert"]

    if case.get("expect_no_insert"):
        score = 1.0 if len(inserts) == 0 else 0.0
        return score, f"inserts={len(inserts)} (expected 0)"

    if "expect_insert" in case:
        tables = case["expect_insert"]
        if isinstance(tables, str):
            tables = [tables]
        missing = [t for t in tables if not any(c["table"] == t for c in inserts)]
        score = 1.0 if not missing else 0.0
        counts = {t: sum(1 for c in inserts if c["table"] == t) for t in tables}
        return score, f"inserts={counts}" + (f" missing={missing}" if missing else "")

    if "expect_project_id" in case:
        expected_pid = case["expect_project_id"]
        correct = any(c.get("data", {}).get("projectId") == expected_pid for c in inserts)
        score = 1.0 if correct else 0.0
        all_pids = [c.get("data", {}).get("projectId") for c in inserts]
        return score, f"projectIds={all_pids} (expected {expected_pid!r})"

    if case.get("expect_dedup"):
        task_creates = [c for c in inserts if c["table"] == "tasks"]
        task_updates = [c for c in calls if c["action"] == "update" and c["table"] == "tasks"]
        score = 1.0 if len(task_creates) == 0 or len(task_updates) >= 1 else 0.0
        return score, f"task_creates={len(task_creates)} task_updates={len(task_updates)}"

    return 0.0, "no assertion defined in case"
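
Reviewer sketch (not part of the commit): the eval test wraps its body in either a Langfuse observation or `contextlib.nullcontext()`, so the same `with` block runs whether or not tracing is configured. The pattern can be tried in isolation with stand-in classes. `FakeTracer`, `FakeObservation`, and `run_scored` are hypothetical and only mimic the two calls used in this file; they are not the Langfuse API.

```python
from contextlib import contextmanager, nullcontext


class FakeObservation:
    """Stand-in for a tracing observation; records scores in memory."""

    def __init__(self):
        self.scores = []

    def score(self, *, name, value, comment=""):
        self.scores.append((name, value, comment))


class FakeTracer:
    """Stand-in for a tracing client; not the real Langfuse client."""

    def __init__(self):
        self.observations = []

    @contextmanager
    def start_as_current_observation(self, *, name, metadata=None):
        obs = FakeObservation()
        self.observations.append((name, obs))
        yield obs


def run_scored(tracer, case_id, evaluate):
    """Run evaluate() and record its score only when a tracer is available."""
    obs_ctx = (
        tracer.start_as_current_observation(name=f"eval-{case_id}")
        if tracer
        else nullcontext()  # yields None, so the body needs an `is not None` guard
    )
    with obs_ctx as obs:
        score, comment = evaluate()
        if obs is not None:
            obs.score(name=f"case_{case_id}", value=score, comment=comment)
    return score
```

Because `nullcontext()` binds `None` to the `as` target, the only tracing-specific branch left in the test body is the `if obs is not None` check.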

tests/test_journey_v2.py Normal file

@@ -0,0 +1,349 @@
"""Tests for Local Agent V2 journey setup (Step 4).

Covers the chatbot journey that produces a structured AgentConfig JSON
instead of a freeform prompt_template string.

Unit tests (no LLM)
--------------------
  4.6a  _extract_agent_config: valid JSON → returns serialised config
  4.6b  _extract_agent_config: invalid JSON → returns None
  4.6c  _extract_agent_config: markers absent → returns None
  4.6d  _extract_agent_config: only START marker → returns None
  4.6e  Session not found → done=True, agent_config=None
  4.6f  Nudge uses AGENT_CONFIG_START/END markers (not old PROMPT_TEMPLATE)

Eval tests (real LLM + Langfuse scoring)
-----------------------------------------
Cases are defined in tests/fixtures/journey_v2/cases.yaml.
Email HTML files live in tests/fixtures/journey_v2/data/.
Use --journey-dir to point at a custom folder (same structure required).

Run:
    pytest tests/test_journey_v2.py -v
    pytest tests/test_journey_v2.py -v -k "4_6"           # unit only
    pytest tests/test_journey_v2.py -v -k "eval"          # LLM evals only
    pytest tests/test_journey_v2.py -v --journey-dir /p   # custom fixtures
"""
from __future__ import annotations

import uuid
from contextlib import nullcontext
from pathlib import Path
from typing import Any
from unittest.mock import patch

import pytest
import yaml

from app.api.routes.agent_setup import (
    _CONFIG_END,
    _CONFIG_START,
    _MAX_TURNS,
    _extract_agent_config,
    _sessions,
    handle_journey_message,
    handle_journey_start,
)
from app.core.langfuse_client import get_langfuse
from app.core.ws_context import clear_client_executor, set_client_executor
from app.schemas import AgentConfig
from tests.conftest import TEST_USER_IDS

# ── Constants ─────────────────────────────────────────────────────────────

_USER_ID = TEST_USER_IDS["power"]
_DEFAULT_FIXTURE_DIR = Path(__file__).parent / "fixtures" / "journey_v2"


# ── Fixture loading ───────────────────────────────────────────────────────

def _fixtures_dir(config) -> Path:
    override = config.getoption("--journey-dir")
    return Path(override) if override else _DEFAULT_FIXTURE_DIR


def _load_cases(config) -> list[dict]:
    return yaml.safe_load(
        (_fixtures_dir(config) / "cases.yaml").read_text(encoding="utf-8")
    )


def _read_data_file(filename: str, fixtures_dir: Path) -> str:
    return (fixtures_dir / "data" / filename).read_text(encoding="utf-8")


# ── pytest_generate_tests ─────────────────────────────────────────────────

def pytest_generate_tests(metafunc):
    if "journey_case" not in metafunc.fixturenames:
        return
    cases = _load_cases(metafunc.config)
    metafunc.parametrize("journey_case", cases, ids=[c["id"] for c in cases])

# ── Executor builder ──────────────────────────────────────────────────────

def _make_fs_executor(directory_files: list[dict], fixtures_dir: Path):
    """Return an async callback that simulates filesystem tool responses.

    Matches the signature expected by ``set_client_executor`` / ``execute_on_client``:
    receives the full ``payload`` dict and returns a result dict.

    ``directory_files`` is a list of ``{path, content_file}`` dicts;
    ``content_file`` is relative to ``fixtures_dir/data/``.
    """
    file_map: dict[str, str] = {
        entry["path"]: _read_data_file(entry["content_file"], fixtures_dir)
        for entry in directory_files
    }

    async def _executor(payload: dict) -> dict:
        action = payload.get("action", "")
        data = payload.get("data") or {}

        if action == "list_directory":
            return {"entries": [
                {"type": "file", "name": p.split("/")[-1], "path": p}
                for p in file_map
            ]}
        if action == "read_file_content":
            path = data.get("path", "")
            return {"content": file_map.get(path, "")}
        if action == "get_file_metadata":
            path = data.get("path", "")
            name = path.split("/")[-1]
            ext = "." + name.rsplit(".", 1)[-1] if "." in name else ""
            return {"name": name, "extension": ext, "size": 1024,
                    "createdAt": None, "modifiedAt": None}
        return {}

    return _executor

# ── Journey runner helper ─────────────────────────────────────────────────

async def _run_journey(user_id: str, case: dict, executor) -> dict[str, Any]:
    """Drive start + all user_messages for a case. Returns the final reply dict.

    Mirrors ``device_ws._handle_journey_start/message``: sets the client
    executor (so filesystem tools work) before each handler call.
    """
    session_id = str(uuid.uuid4())
    try:
        set_client_executor(executor)
        reply = await handle_journey_start(user_id, {
            "agent_type": "local",
            "directory": case["directory"],
            "data_types": case["data_types"],
            "session_id": session_id,
        })
        for msg in case.get("user_messages", []):
            if reply.get("done"):
                break
            set_client_executor(executor)
            reply = await handle_journey_message(user_id, {
                "session_id": reply["session_id"],
                "message": msg,
            })
    finally:
        clear_client_executor()
        _sessions.pop(session_id, None)
    return reply

# ── Assertion helper ──────────────────────────────────────────────────────

def _evaluate_case(case: dict, reply: dict) -> tuple[float, str]:
    """Return (score, comment) for a journey case given the final reply dict."""
    if case.get("expect_question"):
        has_q = "?" in reply.get("message", "")
        return (1.0 if has_q else 0.0), f"first_reply_has_question={has_q}"

    if case.get("expect_done") and not reply.get("done"):
        return 0.0, "expected done=True but journey did not complete"

    agent_config_raw = reply.get("agent_config")

    if case.get("expect_valid_config"):
        if not agent_config_raw:
            return 0.0, "agent_config is None"
        try:
            parsed = AgentConfig.model_validate_json(agent_config_raw)
            valid = len(parsed.content_types) > 0
            return (1.0 if valid else 0.0), f"content_types={len(parsed.content_types)}"
        except Exception as exc:
            return 0.0, f"parse error: {exc}"

    if case.get("expect_content_type_id"):
        expected_id = case["expect_content_type_id"]
        if not agent_config_raw:
            return 0.0, "agent_config is None"
        try:
            parsed = AgentConfig.model_validate_json(agent_config_raw)
            ids = [ct.id for ct in parsed.content_types]
            found = expected_id in ids
            return (1.0 if found else 0.0), f"content_type_ids={ids}, expected={expected_id}"
        except Exception as exc:
            return 0.0, f"parse error: {exc}"

    if case.get("expect_extraction_contains"):
        keyword = case["expect_extraction_contains"].lower()
        if not agent_config_raw:
            return 0.0, "agent_config is None"
        try:
            parsed = AgentConfig.model_validate_json(agent_config_raw)
            if not parsed.content_types:
                return 0.0, "no content_types in config"
            prompt = parsed.content_types[0].extraction_prompt.lower()
            found = keyword in prompt
            return (1.0 if found else 0.0), f"keyword='{keyword}' in extraction_prompt={found}"
        except Exception as exc:
            return 0.0, f"parse error: {exc}"

    if case.get("expect_global_rules"):
        if not agent_config_raw:
            return 0.0, "agent_config is None"
        try:
            parsed = AgentConfig.model_validate_json(agent_config_raw)
            has_rules = len(parsed.global_rules) > 0
            return (1.0 if has_rules else 0.0), f"global_rules={parsed.global_rules}"
        except Exception as exc:
            return 0.0, f"parse error: {exc}"

    return 1.0, "no specific assertion"

# ── Unit tests ────────────────────────────────────────────────────────────

def test_4_6a_extract_valid_json():
    """_extract_agent_config: valid JSON between markers → returns serialised config."""
    config = AgentConfig(
        content_types=[],
        global_rules=["No project = no entity"],
        data_types=["tasks"],
    )
    text = f"Some preamble\n{_CONFIG_START}\n{config.model_dump_json()}\n{_CONFIG_END}\nTrailing"
    result = _extract_agent_config(text)
    assert result is not None
    parsed = AgentConfig.model_validate_json(result)
    assert parsed.global_rules == ["No project = no entity"]


def test_4_6b_extract_invalid_json():
    """_extract_agent_config: malformed JSON between markers → returns None."""
    text = f"{_CONFIG_START}\n{{not: valid json\n{_CONFIG_END}"
    assert _extract_agent_config(text) is None


def test_4_6c_extract_markers_absent():
    """_extract_agent_config: no markers at all → returns None."""
    assert _extract_agent_config("No markers here at all") is None


def test_4_6d_extract_only_start_marker():
    """_extract_agent_config: START without END → returns None."""
    assert _extract_agent_config(f"text {_CONFIG_START} no end marker") is None


@pytest.mark.asyncio
async def test_4_6e_session_not_found():
    """4.6e Session not found → done=True, agent_config=None, informative message."""
    reply = await handle_journey_message(_USER_ID, {
        "session_id": "nonexistent-session-id",
        "message": "Hello",
    })
    assert reply["done"] is True
    assert reply["agent_config"] is None
    assert "not found" in reply["message"].lower() or "expired" in reply["message"].lower()


@pytest.mark.asyncio
async def test_4_6f_nudge_uses_new_markers():
    """4.6f Nudge injected after max turns uses AGENT_CONFIG markers, not PROMPT_TEMPLATE."""
    session_id = str(uuid.uuid4())
    captured_histories: list[list[dict]] = []

    async def _mock_llm(system_prompt, history, tools, **kwargs) -> str:
        captured_histories.append(list(history))
        # Return plain text — no markers — to trigger the nudge path.
        return "I still need more information from you."

    from app.api.routes.agent_setup import JourneySession

    fake_session = JourneySession(
        session_id=session_id,
        user_id=_USER_ID,
        agent_type="local",
        directory="/test",
        data_types=["tasks"],
        system_prompt="system",
        langfuse_prompt=None,
    )
    # Fill history to the turn limit so the next message triggers the nudge.
    for i in range(_MAX_TURNS):
        fake_session.history.append({"role": "user", "content": f"msg {i}"})
        fake_session.history.append({"role": "assistant", "content": "ok"})
    _sessions[session_id] = fake_session

    try:
        with patch("app.api.routes.agent_setup._call_llm_with_tools", side_effect=_mock_llm):
            await handle_journey_message(_USER_ID, {
                "session_id": session_id,
                "message": "one more message to trigger nudge",
            })
    finally:
        _sessions.pop(session_id, None)

    # Second LLM call receives the nudge appended to history.
    assert len(captured_histories) >= 2, "Expected ≥ 2 LLM calls (main reply + nudge)"
    nudge_history = captured_histories[1]
    user_msgs = " ".join(t["content"] for t in nudge_history if t["role"] == "user")
    assert _CONFIG_START in user_msgs, f"Nudge must reference {_CONFIG_START}"
    assert _CONFIG_END in user_msgs, f"Nudge must reference {_CONFIG_END}"
    assert "PROMPT_TEMPLATE" not in user_msgs, "Old PROMPT_TEMPLATE markers must not appear in nudge"

# ── Eval tests (real LLM + Langfuse) ─────────────────────────────────────

@pytest.mark.asyncio
@pytest.mark.eval
async def test_eval_journey(journey_case, pytestconfig):
    """Parametrized eval test — one invocation per YAML case."""
    case: dict = journey_case
    fixtures_dir = _fixtures_dir(pytestconfig)
    executor = _make_fs_executor(case.get("directory_files", []), fixtures_dir)

    lf = get_langfuse()
    obs_ctx = lf.start_as_current_observation(
        name=f"eval-journey-{case['id']}-{case.get('score_name', 'unknown').replace('.', '-')}",
        metadata={"step": "4", "case_id": case["id"]},
    ) if lf else nullcontext()

    with obs_ctx as obs:
        reply = await _run_journey(_USER_ID, case, executor)
        score, comment = _evaluate_case(case, reply)

        if obs is not None:
            obs.score(
                name=case.get("score_name", f"journey.case_{case['id']}"),
                value=score,
                comment=comment,
            )

    if lf:
        lf.flush()

    assert score == 1.0, f"[{case['id']}] {case.get('description', '')}: {comment}"
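
Reviewer sketch (not part of the commit): unit tests 4.6a to 4.6d pin down the contract of `_extract_agent_config` (valid JSON between markers returns a serialised config; malformed JSON, missing markers, or a lone START marker return `None`), but its body is not in this diff. A function with the same contract might look like the following. The marker strings, the name `extract_config`, and the plain key check standing in for Pydantic validation are all assumptions.

```python
import json

# Hypothetical marker values; the real _CONFIG_START/_CONFIG_END are not shown in the diff.
START = "<<AGENT_CONFIG_START>>"
END = "<<AGENT_CONFIG_END>>"

# Field names taken from the AgentConfig usage in test 4.6a; treating all three
# as required is an assumption made for this sketch.
REQUIRED_FIELDS = ("content_types", "global_rules", "data_types")


def extract_config(text):
    """Return the JSON between the markers re-serialised, or None if absent/invalid."""
    start = text.find(START)
    if start == -1:
        return None
    end = text.find(END, start + len(START))
    if end == -1:
        return None

    candidate = text[start + len(START):end].strip()
    try:
        payload = json.loads(candidate)
    except json.JSONDecodeError:
        return None

    # Stand-in for schema validation: require a JSON object with the known fields.
    if not isinstance(payload, dict) or any(f not in payload for f in REQUIRED_FIELDS):
        return None
    return json.dumps(payload)
```

Searching for END only after START avoids accepting a stray END marker that appears earlier in the model output.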

@@ -1,171 +1,98 @@
 """Tests for the preprocessor system (Step 1 — Local Agent V2).
-Fixtures are driven by:
-    tests/fixtures/preprocessors/cases.yaml — test case definitions
-    tests/fixtures/preprocessors/data/ — input files (HTML, txt, ...)
 Run:
     pytest tests/test_preprocessors.py -v
-    # Only detection tests
-    pytest tests/test_preprocessors.py -v -k detect
-    # Only preprocess tests
-    pytest tests/test_preprocessors.py -v -k preprocess
-Langfuse scores are sent when LANGFUSE_SECRET_KEY / LANGFUSE_PUBLIC_KEY are set.
+    pytest tests/test_preprocessors.py -v --preprocess-dir /path/to/folder
+The folder must contain cases.yaml + data/.
 """
 from __future__ import annotations
 import re
 from pathlib import Path
-from typing import Any
 import pytest
 import yaml
-from app.core.langfuse_client import get_langfuse
 from app.core.preprocessors import detect_content_type, preprocess
-# ── Paths ──────────────────────────────────────────────────────────────
-_FIXTURES_DIR = Path(__file__).parent / "fixtures" / "preprocessors"
-_DATA_DIR = _FIXTURES_DIR / "data"
-_CASES_FILE = _FIXTURES_DIR / "cases.yaml"
-# ── Content generators ─────────────────────────────────────────────────
-_GENERATORS: dict[str, str] = {
-    # High ratio of non-printable chars → triggers "unknown" heuristic
+_DEFAULT_DIR = Path(__file__).parent / "fixtures" / "preprocessors"
+_GENERATORS = {
     "binary_noise": "some\x00\x01\x02\x03\x04\x05content" * 20,
 }
-def _load_cases() -> list[dict]:
-    with _CASES_FILE.open(encoding="utf-8") as f:
-        return yaml.safe_load(f)["cases"]
-def _read_content(case: dict) -> str:
+def _fixtures_dir(config) -> Path:
+    override = config.getoption("--preprocess-dir")
+    return Path(override) if override else _DEFAULT_DIR
+def _load_cases(config) -> list[dict]:
+    return yaml.safe_load((_fixtures_dir(config) / "cases.yaml").read_text(encoding="utf-8"))
+def _content(case: dict, data_dir: Path) -> str:
     if "generate" in case:
-        key = case["generate"]
-        if key not in _GENERATORS:
-            raise ValueError(f"Unknown generator '{key}' in case {case['id']}")
-        return _GENERATORS[key]
-    file_path = _DATA_DIR / case["file"]
-    return file_path.read_text(encoding="utf-8")
+        return _GENERATORS[case["generate"]]
+    return (data_dir / case["file"]).read_text(encoding="utf-8")
-# ── Langfuse helper ───────────────────────────────────────────────────
-def _lf_score(score_name: str, value: float, comment: str = "") -> None:
-    lf = get_langfuse()
-    if lf:
-        trace = lf.trace(name=f"eval-{score_name}")
-        lf.score(
-            trace_id=trace.id,
-            name=score_name,
-            value=value,
-            data_type="NUMERIC",
-            comment=comment,
-        )
-        lf.flush()
+# ── parametrize at collection time via pytest hook ────────────────────
+def pytest_generate_tests(metafunc):
+    if "preprocess_case" not in metafunc.fixturenames:
+        return
+    cases = _load_cases(metafunc.config)
+    test_name = metafunc.function.__name__
+    if test_name == "test_detect":
+        subset = [c for c in cases if "detect" in c]
+    else:
+        subset = [c for c in cases if "process" in c]
+    metafunc.parametrize("preprocess_case", subset, ids=[c["id"] for c in subset])
-# ── Assertion engine ──────────────────────────────────────────────────
-def _run_assertions(assertions: dict[str, Any], result: Any, raw: str) -> list[str]:
-    """Run all assertions declared in the YAML case. Returns failure messages."""
-    failures: list[str] = []
-    if assertions.get("no_html_tags"):
-        if re.search(r"<[^>]+>", result.clean_text):
-            failures.append("clean_text still contains HTML tags")
-    min_len = assertions.get("min_length")
-    if min_len is not None:
-        if len(result.clean_text) < min_len:
-            failures.append(
-                f"clean_text too short: {len(result.clean_text)} < {min_len}"
-            )
-    ratio_lt = assertions.get("compression_ratio_lt")
-    if ratio_lt is not None and len(raw) > 0:
+# ── detect ────────────────────────────────────────────────────────────
+def test_detect(preprocess_case, pytestconfig) -> None:
+    case = preprocess_case
+    data_dir = _fixtures_dir(pytestconfig) / "data"
+    raw = _content(case, data_dir)
+    filename = case.get("file", "")
+    ct = detect_content_type(filename, raw)
+    expected = case["detect"]
+    assert ct == expected, f"[{case['id']}] expected {expected!r}, got {ct!r}"
+# ── preprocess ────────────────────────────────────────────────────────
+def test_preprocess(preprocess_case, pytestconfig) -> None:
+    case = preprocess_case
+    data_dir = _fixtures_dir(pytestconfig) / "data"
+    raw = _content(case, data_dir)
+    result = preprocess(case["process"], raw)
+    if case.get("no_html"):
+        assert not re.search(r"<[^>]+>", result.clean_text), "clean_text contains HTML tags"
+    if "min_chars" in case:
+        assert len(result.clean_text) >= case["min_chars"], \
+            f"clean_text too short: {len(result.clean_text)} < {case['min_chars']}"
+    if "ratio_lt" in case:
         ratio = len(result.clean_text) / len(raw)
-        if ratio >= ratio_lt:
-            failures.append(f"compression ratio {ratio:.2f} >= {ratio_lt}")
-    meta_keys = assertions.get("metadata_keys", [])
-    for key in meta_keys:
-        if not result.metadata.get(key):
-            failures.append(f"metadata missing key '{key}' (got {result.metadata})")
-    contains = assertions.get("contains")
-    if contains:
-        items = [contains] if isinstance(contains, str) else contains
-        for item in items:
-            if item not in result.clean_text:
-                failures.append(f"clean_text missing expected substring: {item!r}")
-    not_contains = assertions.get("not_contains")
-    if not_contains:
-        items = [not_contains] if isinstance(not_contains, str) else not_contains
-        for item in items:
-            if item in result.clean_text:
-                failures.append(f"clean_text contains forbidden substring: {item!r}")
-    expected_ct = assertions.get("content_type")
-    if expected_ct and result.content_type != expected_ct:
-        failures.append(
-            f"content_type mismatch: expected {expected_ct!r}, got {result.content_type!r}"
-        )
-    return failures
-# ── Parametrized: detect ──────────────────────────────────────────────
-_detect_cases = [c for c in _load_cases() if c["op"] == "detect"]
-@pytest.mark.parametrize(
-    "case",
-    _detect_cases,
-    ids=[c["id"] for c in _detect_cases],
-)
-def test_detect(case: dict) -> None:
-    raw = _read_content(case)
-    ct = detect_content_type(case["input_filename"], raw)
-    expected = case["expected_content_type"]
-    score = 1.0 if ct == expected else 0.0
-    _lf_score(case["score_name"], score, f"got={ct}, expected={expected}")
-    assert ct == expected, (
-        f"[{case['id']}] {case['description']}: "
-        f"expected content_type={expected!r}, got {ct!r}"
-    )
-# ── Parametrized: preprocess ──────────────────────────────────────────
-_preprocess_cases = [c for c in _load_cases() if c["op"] == "preprocess"]
-@pytest.mark.parametrize(
-    "case",
-    _preprocess_cases,
-    ids=[c["id"] for c in _preprocess_cases],
-)
-def test_preprocess(case: dict) -> None:
-    raw = _read_content(case)
-    result = preprocess(case["input_content_type"], raw)
-    assertions = case.get("assertions", {})
-    failures = _run_assertions(assertions, result, raw)
-    assert not failures, (
-        f"[{case['id']}] {case['description']}{len(failures)} assertion(s) failed:\n"
-        + "\n".join(f"{f}" for f in failures)
-    )
+            assert ratio < case["ratio_lt"], f"compression ratio {ratio:.2f} >= {case['ratio_lt']}"
+    for key in case.get("has_meta", []):
+        assert result.metadata.get(key), f"metadata missing {key!r} (got {result.metadata})"
+    for item in ([case["contains"]] if isinstance(case.get("contains"), str) else case.get("contains", [])):
+        assert item in result.clean_text, f"clean_text missing {item!r}"
+    for item in ([case["excludes"]] if isinstance(case.get("excludes"), str) else case.get("excludes", [])):
+        assert item not in result.clean_text, f"clean_text contains forbidden {item!r}"
+    if "content_type" in case:
+        assert result.content_type == case["content_type"], \
+            f"expected content_type {case['content_type']!r}, got {result.content_type!r}"
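
The new `_fixtures_dir` helper reads `--preprocess-dir` through `config.getoption`, so the option has to be registered before collection, typically in `tests/conftest.py`. That file is not part of this hunk. A minimal sketch of what the registration could look like, assuming a plain string option that defaults to `None` (the help text is illustrative, not taken from the repository):

```python
# tests/conftest.py (sketch; the actual file in this repository may differ)


def pytest_addoption(parser):
    """Register the CLI option that _fixtures_dir() looks up."""
    parser.addoption(
        "--preprocess-dir",
        action="store",
        default=None,  # falsy default, so _fixtures_dir falls back to _DEFAULT_DIR
        help="Folder containing cases.yaml and data/ for the preprocessor tests.",
    )
```

A run against a custom folder would then be `pytest tests/test_preprocessors.py -v --preprocess-dir /path/to/folder`, as the module docstring describes.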