Compare commits
15 Commits
feature/mi
...
e672b58b6f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e672b58b6f | ||
|
|
d8add7e8cb | ||
|
|
c6c4578f9a | ||
|
|
3aa0b36a6c | ||
|
|
fa231a3642 | ||
|
|
d91c98f86d | ||
|
|
c0619f5c4d | ||
|
|
da282229ff | ||
|
|
7fa6ad5760 | ||
|
|
dcd14220ca | ||
|
|
3cc32569d9 | ||
|
|
bf445ac2ce | ||
|
|
a2d6d689e4 | ||
|
|
aa8bcbf0d8 | ||
|
|
1ce1d492b0 |
@@ -39,6 +39,13 @@ QDRANT_URL=
|
||||
QDRANT_API_KEY=
|
||||
# For local Qdrant (homelab): QDRANT_URL=http://qdrant:6333
|
||||
|
||||
# ── Langfuse (leave empty to disable observability) ───────────────────────────
|
||||
LANGFUSE_SECRET_KEY=
|
||||
LANGFUSE_PUBLIC_KEY=
|
||||
# LANGFUSE_HOST=https://cloud.langfuse.com # EU (default)
|
||||
# LANGFUSE_HOST=https://us.cloud.langfuse.com # US
|
||||
# LANGFUSE_HOST=http://localhost:3000 # Self-hosted
|
||||
|
||||
# ── CORS ──────────────────────────────────────────────────────────────────────
|
||||
# Comma-separated list parsed by Settings (override default if needed)
|
||||
# CORS_ORIGINS=["app://.","http://localhost:3000"]
|
||||
|
||||
@@ -0,0 +1,31 @@
|
||||
"""add agent_config to local_agent_configs
|
||||
|
||||
Revision ID: a3b9c0d1e2f3
|
||||
Revises: 9a1f2d0b6c7e
|
||||
Create Date: 2026-04-07 00:00:00.000000
|
||||
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Sequence, Union
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = "a3b9c0d1e2f3"
|
||||
down_revision: Union[str, None] = "9a1f2d0b6c7e"
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
op.add_column(
|
||||
"local_agent_configs",
|
||||
sa.Column("agent_config", sa.JSON(), nullable=True),
|
||||
)
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
op.drop_column("local_agent_configs", "agent_config")
|
||||
@@ -1,11 +1,11 @@
|
||||
"""Chatbot Journey — WS-based guided conversation to build an agent prompt_template.
|
||||
"""Chatbot Journey — WS-based guided conversation to build an AgentConfig.
|
||||
|
||||
The journey is driven entirely through WebSocket frames (no REST endpoints).
|
||||
The device WS handler dispatches ``journey_start`` and ``journey_message``
|
||||
frames to the functions exported here.
|
||||
|
||||
Journey flow:
|
||||
1. FE sends ``journey_start`` frame with basic agent config (directory,
|
||||
1. FE sends ``journey_start`` frame with basic agent info (directory,
|
||||
data_types, schedule).
|
||||
2. Server creates an in-memory session, sets up a WS executor so the
|
||||
setup LLM can use file-system tools, does a first directory scrape,
|
||||
@@ -13,10 +13,11 @@ Journey flow:
|
||||
3. FE sends ``journey_message`` frames for each user reply.
|
||||
4. Server appends the user message, calls the LLM (which may read files
|
||||
via tools), and sends back a ``journey_reply``.
|
||||
5. After 3-5 turns the LLM wraps up by emitting a ``prompt_template``
|
||||
block delimited by ``PROMPT_TEMPLATE_START`` / ``PROMPT_TEMPLATE_END``.
|
||||
6. Server parses the block, sends ``journey_reply`` with ``done=True``
|
||||
and the template. FE stores it locally.
|
||||
5. After 3-5 turns the LLM wraps up by emitting an ``AgentConfig`` JSON
|
||||
block delimited by ``AGENT_CONFIG_START`` / ``AGENT_CONFIG_END``.
|
||||
6. Server parses and validates the JSON with Pydantic, sends
|
||||
``journey_reply`` with ``done=True`` and the serialised config.
|
||||
FE stores it locally.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -31,7 +32,10 @@ from typing import Any
|
||||
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
|
||||
|
||||
from app.agents.filesystem_agent import FILESYSTEM_TOOLS
|
||||
from app.config.settings import settings
|
||||
from app.core.langfuse_client import compile_prompt, extract_usage, get_langfuse, get_prompt_or_fallback
|
||||
from app.core.llm import get_llm
|
||||
from app.schemas import AgentConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -39,9 +43,9 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
_SESSION_TTL_SECONDS: int = 1800 # 30 minutes
|
||||
|
||||
# Sentinel strings used to delimit the LLM-produced prompt_template.
|
||||
_TEMPLATE_START = "PROMPT_TEMPLATE_START"
|
||||
_TEMPLATE_END = "PROMPT_TEMPLATE_END"
|
||||
# Sentinel strings used to delimit the LLM-produced AgentConfig JSON.
|
||||
_CONFIG_START = "AGENT_CONFIG_START"
|
||||
_CONFIG_END = "AGENT_CONFIG_END"
|
||||
|
||||
# Minimum turns before we consider nudging the LLM to wrap up.
|
||||
_MIN_TURNS_BEFORE_NUDGE: int = 3
|
||||
@@ -62,6 +66,7 @@ class JourneySession:
|
||||
data_types: list[str]
|
||||
history: list[dict[str, Any]] = field(default_factory=list)
|
||||
system_prompt: str = ""
|
||||
langfuse_prompt: Any = None
|
||||
created_at: float = field(default_factory=time.monotonic)
|
||||
|
||||
def is_expired(self) -> bool:
|
||||
@@ -83,61 +88,76 @@ def get_journey_session(session_id: str, user_id: str) -> JourneySession | None:
|
||||
return s
|
||||
|
||||
|
||||
# ── System prompt builder ─────────────────────────────────────────────────
|
||||
# ── System prompt ─────────────────────────────────────────────────────────
|
||||
|
||||
_SYSTEM_PROMPT_TEMPLATE = """\
|
||||
_JOURNEY_SYSTEM_PROMPT = """\
|
||||
You are a friendly assistant helping a freelancer configure a data-extraction agent.
|
||||
Your job is to understand exactly what data the user wants to extract from their
|
||||
local directory and produce a detailed prompt_template that a separate AI will use
|
||||
as its instruction set.
|
||||
|
||||
The extraction agent already has this base behaviour built in:
|
||||
- Reads each file using file-system tools.
|
||||
- Creates records (tasks, notes, timelines, projects) via CRUD tools.
|
||||
- Sets isAiSuggested=1 on every new record.
|
||||
- Only extracts data explicitly present in the files — it never invents information.
|
||||
The user's custom prompt is appended AFTER this base behaviour, so focus on
|
||||
what to look for and how to map it — not on the general extraction mechanics.
|
||||
Your job is to understand what files the user has in their directory and produce a
|
||||
structured AgentConfig JSON that the extraction agent will use as its instruction set.
|
||||
|
||||
You have access to file-system tools to explore the user's directory:
|
||||
- list_directory: to see folder structure
|
||||
- read_file_content: to peek at file contents
|
||||
- get_file_metadata: to check file info
|
||||
- list_directory: see folder structure and file names
|
||||
- read_file_content: peek at a file's content
|
||||
- get_file_metadata: check file size, extension, dates
|
||||
|
||||
The user's configured directory is: {directory}
|
||||
Target data types: {data_types}
|
||||
|
||||
IMPORTANT — project assignment is handled automatically by the main agent runner
|
||||
before the custom prompt is ever used. You MUST NOT ask the user about projects,
|
||||
projectId, or how to link records to projects. Never include projectId logic or
|
||||
project creation instructions in the generated prompt_template.
|
||||
## Your process
|
||||
|
||||
Start by exploring the directory to understand its structure. Then ask concise,
|
||||
focused questions one at a time. Cover these topics (not necessarily in this order):
|
||||
1. The type and format of the source content (confirmed by your exploration).
|
||||
2. How fields should be mapped (e.g. filename → task title).
|
||||
3. Priority or status rules (e.g. "urgent" keyword → high priority).
|
||||
4. Any special handling, date extraction, or exclusions.
|
||||
### Step 1 — Explore the directory
|
||||
Use list_directory and read_file_content to understand what types of files are present
|
||||
(HTML emails, plain-text documents, CSVs, etc.).
|
||||
|
||||
Once you reach 90% confidence, output the final prompt_template between these exact
|
||||
markers on their own lines:
|
||||
### Step 2 — Identify content types
|
||||
For each distinct file type found, decide:
|
||||
- A short id (e.g. "email_html", "plain_text", "csv")
|
||||
- Which preprocessing handler to use: "email_html" for HTML emails, "generic" for everything else
|
||||
- A human-readable label and optional detection_hint
|
||||
|
||||
{template_start}
|
||||
<the complete extraction prompt here>
|
||||
{template_end}
|
||||
### Step 3 — Ask focused questions (one at a time)
|
||||
Cover these topics based on what you discovered:
|
||||
1. How to map content to entity types (task / note / timeline entry)
|
||||
2. Field mapping rules (e.g. email Subject → task title, filename → note title)
|
||||
3. Priority or status rules (e.g. "urgent" in subject → high priority)
|
||||
4. Date extraction (e.g. "by Friday" → dueDate)
|
||||
5. Exclusion rules (e.g. skip newsletters, skip files with no project match)
|
||||
|
||||
The prompt_template must be a self-contained instruction for an AI that reads files
|
||||
and must perform CRUD operations using tools to create records. It should specify:
|
||||
- What entity types to create (tasks, notes, timelines) — never projects.
|
||||
- How to map file content to record fields (camelCase: title, status, priority,
|
||||
dueDate, content, etc.) — never include projectId.
|
||||
- That isAiSuggested must be set to 1 on every new record.
|
||||
- Concrete examples of mappings based on what you discovered in the directory.
|
||||
### Step 4 — Produce the AgentConfig JSON
|
||||
Once you are ≥ 90% confident, output the final config between these exact markers
|
||||
(each on its own line):
|
||||
|
||||
{config_start}
|
||||
{{
|
||||
"content_types": [
|
||||
{{
|
||||
"id": "email_html",
|
||||
"label": "Email HTML",
|
||||
"detection_hint": "HTML file with From/To/Subject headers",
|
||||
"preprocessing": "email_html",
|
||||
"extraction_prompt": "Detailed extraction instructions for this content type..."
|
||||
}}
|
||||
],
|
||||
"global_rules": [
|
||||
"If the file cannot be matched to any project, do not create any entity."
|
||||
],
|
||||
"data_types": {data_types_json}
|
||||
}}
|
||||
{config_end}
|
||||
|
||||
## Rules for the extraction_prompt field
|
||||
- Describe when to create a task vs note vs timeline entry (be specific and concrete)
|
||||
- Include field mapping rules based on what you found in the directory
|
||||
- Include priority/status/date rules if applicable
|
||||
- Do NOT include projectId logic — the runner handles project assignment automatically
|
||||
- Do NOT mention isAiSuggested — the runner always sets it to 1
|
||||
|
||||
## Constraints
|
||||
- Never ask about projects, projectId, or how to link records to projects
|
||||
- Never include projectId or project creation logic in the generated config
|
||||
- Keep asking questions until ≥ 90% confident, then output the JSON immediately
|
||||
|
||||
{existing_section}\
|
||||
Keep asking clarifying questions until you are at least 90% confident you have
|
||||
enough information to generate an accurate prompt_template. Once you reach that
|
||||
confidence level, stop asking and produce the final template immediately.
|
||||
Begin by exploring the directory, then ask your first question.\
|
||||
"""
|
||||
|
||||
@@ -145,33 +165,53 @@ Begin by exploring the directory, then ask your first question.\
|
||||
def _build_system_prompt(
|
||||
directory: str,
|
||||
data_types: list[str],
|
||||
existing_template: str | None = None,
|
||||
) -> str:
|
||||
existing_config: str | None = None,
|
||||
) -> tuple[str, Any]:
|
||||
"""Return ``(compiled_system_prompt, langfuse_prompt_obj_or_None)``."""
|
||||
existing_section = (
|
||||
f"\nThe user already has the following prompt_template — refine it based on their answers:\n"
|
||||
f"---\n{existing_template}\n---\n"
|
||||
if existing_template
|
||||
"\nThe user already has the following AgentConfig — refine it based on their answers:\n"
|
||||
f"```json\n{existing_config}\n```\n"
|
||||
if existing_config
|
||||
else ""
|
||||
)
|
||||
return _SYSTEM_PROMPT_TEMPLATE.format(
|
||||
template, prompt_obj = get_prompt_or_fallback(
|
||||
"journey_system", _JOURNEY_SYSTEM_PROMPT
|
||||
)
|
||||
compiled = compile_prompt(
|
||||
template,
|
||||
prompt_obj,
|
||||
directory=directory,
|
||||
data_types=", ".join(data_types),
|
||||
template_start=_TEMPLATE_START,
|
||||
template_end=_TEMPLATE_END,
|
||||
data_types_json=json.dumps(data_types),
|
||||
config_start=_CONFIG_START,
|
||||
config_end=_CONFIG_END,
|
||||
existing_section=existing_section,
|
||||
)
|
||||
return compiled, prompt_obj
|
||||
|
||||
|
||||
# ── Template extraction ───────────────────────────────────────────────────
|
||||
# ── AgentConfig extraction ────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _extract_template(text: str) -> str | None:
|
||||
"""Return the text between PROMPT_TEMPLATE_START and PROMPT_TEMPLATE_END, or None."""
|
||||
if _TEMPLATE_START not in text or _TEMPLATE_END not in text:
|
||||
def _extract_agent_config(text: str) -> str | None:
|
||||
"""Return validated AgentConfig JSON string from between markers, or None.
|
||||
|
||||
Parses the JSON with Pydantic to ensure it conforms to the schema before
|
||||
returning. Returns None if markers are absent or JSON is invalid.
|
||||
"""
|
||||
if _CONFIG_START not in text or _CONFIG_END not in text:
|
||||
return None
|
||||
start_idx = text.index(_CONFIG_START) + len(_CONFIG_START)
|
||||
end_idx = text.index(_CONFIG_END)
|
||||
raw = text[start_idx:end_idx].strip()
|
||||
if not raw:
|
||||
return None
|
||||
try:
|
||||
parsed = AgentConfig.model_validate_json(raw)
|
||||
return parsed.model_dump_json()
|
||||
except Exception as exc:
|
||||
logger.warning("agent_setup: failed to parse AgentConfig JSON: %s", exc)
|
||||
return None
|
||||
start_idx = text.index(_TEMPLATE_START) + len(_TEMPLATE_START)
|
||||
end_idx = text.index(_TEMPLATE_END)
|
||||
return text[start_idx:end_idx].strip() or None
|
||||
|
||||
|
||||
# ── LLM call with tool support ───────────────────────────────────────────
|
||||
@@ -199,12 +239,17 @@ async def _call_llm_with_tools(
|
||||
system_prompt: str,
|
||||
history: list[dict[str, Any]],
|
||||
tools: list[Any],
|
||||
*,
|
||||
user_id: str = "",
|
||||
session_id: str = "",
|
||||
langfuse_prompt: Any = None,
|
||||
) -> str:
|
||||
"""Build LangChain messages from history and invoke the LLM with tools.
|
||||
|
||||
Handles tool-calling loops: if the LLM calls tools, execute them and
|
||||
continue until a final text response is produced.
|
||||
"""
|
||||
lf = get_langfuse()
|
||||
messages: list[Any] = [SystemMessage(content=system_prompt)]
|
||||
for turn in history:
|
||||
if turn["role"] == "user":
|
||||
@@ -216,11 +261,40 @@ async def _call_llm_with_tools(
|
||||
llm_with_tools = llm.bind_tools(tools)
|
||||
tool_map = {tool_def.name: tool_def for tool_def in tools}
|
||||
|
||||
_span_ctx = (
|
||||
lf.start_as_current_observation(
|
||||
as_type="span",
|
||||
name="journey-setup",
|
||||
metadata={"user_id": user_id or None, "session_id": session_id or None},
|
||||
input=history[-1]["content"] if history else "",
|
||||
)
|
||||
if lf else None
|
||||
)
|
||||
_span = _span_ctx.__enter__() if _span_ctx else None
|
||||
|
||||
try:
|
||||
for _ in range(_MAX_TOOL_STEPS):
|
||||
_gen_ctx = (
|
||||
lf.start_as_current_observation(
|
||||
as_type="generation",
|
||||
name="journey-setup-llm",
|
||||
model=settings.LLM_MODEL,
|
||||
prompt=langfuse_prompt,
|
||||
input=messages,
|
||||
)
|
||||
if lf else None
|
||||
)
|
||||
_gen = _gen_ctx.__enter__() if _gen_ctx else None
|
||||
response: AIMessage = await llm_with_tools.ainvoke(messages)
|
||||
if _gen_ctx:
|
||||
_gen.update(output=_as_text(response.content), usage=extract_usage(response))
|
||||
_gen_ctx.__exit__(None, None, None)
|
||||
|
||||
messages.append(response)
|
||||
|
||||
if not response.tool_calls:
|
||||
if _span:
|
||||
_span.update(output=_as_text(response.content))
|
||||
return _as_text(response.content)
|
||||
|
||||
for call in response.tool_calls:
|
||||
@@ -247,7 +321,15 @@ async def _call_llm_with_tools(
|
||||
|
||||
# Fallback: exceeded max steps.
|
||||
final = await llm.ainvoke(messages)
|
||||
return _as_text(final.content)
|
||||
final_text = _as_text(final.content)
|
||||
if _span:
|
||||
_span.update(output=final_text)
|
||||
return final_text
|
||||
finally:
|
||||
if _span_ctx:
|
||||
_span_ctx.__exit__(None, None, None)
|
||||
if lf:
|
||||
lf.flush()
|
||||
|
||||
|
||||
# ── Journey handlers (called from device_ws.py) ──────────────────────────
|
||||
@@ -265,12 +347,12 @@ async def handle_journey_start(
|
||||
agent_type = frame.get("agent_type", "local")
|
||||
directory = frame.get("directory", "")
|
||||
data_types = frame.get("data_types", [])
|
||||
existing_template = frame.get("existing_template")
|
||||
existing_config = frame.get("existing_config")
|
||||
|
||||
# Use the session_id provided by the FE so the reply matches the
|
||||
# listener key; fall back to a generated one if absent.
|
||||
session_id = frame.get("session_id") or str(uuid.uuid4())
|
||||
system_prompt = _build_system_prompt(directory, data_types, existing_template)
|
||||
system_prompt, langfuse_prompt = _build_system_prompt(directory, data_types, existing_config)
|
||||
|
||||
session = JourneySession(
|
||||
session_id=session_id,
|
||||
@@ -279,12 +361,11 @@ async def handle_journey_start(
|
||||
directory=directory,
|
||||
data_types=data_types,
|
||||
system_prompt=system_prompt,
|
||||
langfuse_prompt=langfuse_prompt,
|
||||
)
|
||||
|
||||
# The LLM will explore the directory using FILESYSTEM_TOOLS via the
|
||||
# ws_context executor (already set by the WS handler before calling us).
|
||||
# Seed with an initial user message — some providers (e.g. GitHub Copilot)
|
||||
# require at least one user/input message to be present.
|
||||
# Seed with an initial user message — some providers require at least one
|
||||
# user/input message to be present.
|
||||
seed_history: list[dict[str, Any]] = [
|
||||
{"role": "user", "content": "Hi, I'm ready to set up my agent. Please explore my directory and ask me your first question."},
|
||||
]
|
||||
@@ -292,6 +373,9 @@ async def handle_journey_start(
|
||||
system_prompt=system_prompt,
|
||||
history=seed_history,
|
||||
tools=list(FILESYSTEM_TOOLS),
|
||||
user_id=user_id,
|
||||
session_id=session_id,
|
||||
langfuse_prompt=langfuse_prompt,
|
||||
)
|
||||
|
||||
session.history.extend(seed_history)
|
||||
@@ -305,14 +389,14 @@ async def handle_journey_start(
|
||||
directory,
|
||||
)
|
||||
|
||||
# Check if the LLM produced the template on the first turn (unlikely but possible).
|
||||
prompt_template = _extract_template(ai_reply)
|
||||
done = prompt_template is not None
|
||||
# Check if the LLM produced the config on the first turn (unlikely but possible).
|
||||
agent_config = _extract_agent_config(ai_reply)
|
||||
done = agent_config is not None
|
||||
|
||||
display_message = ai_reply
|
||||
if done:
|
||||
display_message = (
|
||||
ai_reply[: ai_reply.index(_TEMPLATE_START)].strip()
|
||||
ai_reply[: ai_reply.index(_CONFIG_START)].strip()
|
||||
or "Here is your agent configuration. You can save it or continue refining."
|
||||
)
|
||||
_sessions.pop(session_id, None)
|
||||
@@ -322,7 +406,7 @@ async def handle_journey_start(
|
||||
"session_id": session_id,
|
||||
"message": display_message,
|
||||
"done": done,
|
||||
"prompt_template": prompt_template,
|
||||
"agent_config": agent_config,
|
||||
}
|
||||
|
||||
|
||||
@@ -345,7 +429,7 @@ async def handle_journey_message(
|
||||
"session_id": session_id,
|
||||
"message": "Journey session not found or expired. Please start a new setup.",
|
||||
"done": True,
|
||||
"prompt_template": None,
|
||||
"agent_config": None,
|
||||
}
|
||||
|
||||
# Append user turn.
|
||||
@@ -356,22 +440,24 @@ async def handle_journey_message(
|
||||
system_prompt=session.system_prompt,
|
||||
history=session.history,
|
||||
tools=list(FILESYSTEM_TOOLS),
|
||||
user_id=session.user_id,
|
||||
session_id=session_id,
|
||||
langfuse_prompt=session.langfuse_prompt,
|
||||
)
|
||||
|
||||
session.history.append({"role": "assistant", "content": ai_reply})
|
||||
|
||||
# Check if the LLM produced the final template.
|
||||
prompt_template = _extract_template(ai_reply)
|
||||
done = prompt_template is not None
|
||||
# Check if the LLM produced the final config.
|
||||
agent_config = _extract_agent_config(ai_reply)
|
||||
done = agent_config is not None
|
||||
|
||||
# If the LLM didn't produce a template, nudge it once it has asked enough
|
||||
# questions (>= _MIN_TURNS_BEFORE_NUDGE) or hits the hard safety cap.
|
||||
# If the LLM didn't produce a config, nudge it once it hits the hard safety cap.
|
||||
if not done:
|
||||
turns = sum(1 for t in session.history if t["role"] == "user")
|
||||
if turns >= _MAX_TURNS:
|
||||
nudge_content = (
|
||||
"[System: You have enough information. Please generate the final "
|
||||
f"prompt_template now, wrapped in {_TEMPLATE_START} / {_TEMPLATE_END} markers.]"
|
||||
f"AgentConfig JSON now, wrapped in {_CONFIG_START} / {_CONFIG_END} markers.]"
|
||||
)
|
||||
session.history.append({"role": "user", "content": nudge_content})
|
||||
|
||||
@@ -379,19 +465,22 @@ async def handle_journey_message(
|
||||
system_prompt=session.system_prompt,
|
||||
history=session.history,
|
||||
tools=list(FILESYSTEM_TOOLS),
|
||||
user_id=session.user_id,
|
||||
session_id=session_id,
|
||||
langfuse_prompt=session.langfuse_prompt,
|
||||
)
|
||||
session.history.append({"role": "assistant", "content": nudge_reply})
|
||||
|
||||
prompt_template = _extract_template(nudge_reply)
|
||||
if prompt_template is not None:
|
||||
agent_config = _extract_agent_config(nudge_reply)
|
||||
if agent_config is not None:
|
||||
done = True
|
||||
ai_reply = nudge_reply
|
||||
|
||||
display_message = ai_reply
|
||||
if done:
|
||||
display_message = (
|
||||
ai_reply[: ai_reply.index(_TEMPLATE_START)].strip()
|
||||
if _TEMPLATE_START in ai_reply
|
||||
ai_reply[: ai_reply.index(_CONFIG_START)].strip()
|
||||
if _CONFIG_START in ai_reply
|
||||
else "Here is your agent configuration. You can save it or continue refining."
|
||||
)
|
||||
_sessions.pop(session_id, None)
|
||||
@@ -402,5 +491,5 @@ async def handle_journey_message(
|
||||
"session_id": session_id,
|
||||
"message": display_message,
|
||||
"done": done,
|
||||
"prompt_template": prompt_template,
|
||||
"agent_config": agent_config,
|
||||
}
|
||||
|
||||
@@ -52,6 +52,10 @@ class Settings(BaseSettings):
|
||||
|
||||
CORS_ORIGINS: list[str] = ["app://.", "http://localhost:3000", "http://localhost:5173"]
|
||||
|
||||
LANGFUSE_SECRET_KEY: str = ""
|
||||
LANGFUSE_PUBLIC_KEY: str = ""
|
||||
LANGFUSE_HOST: str = "https://cloud.langfuse.com"
|
||||
|
||||
ENV: Literal["dev", "prod"] = "dev"
|
||||
|
||||
model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")
|
||||
|
||||
@@ -2,12 +2,12 @@
|
||||
|
||||
Drives two agent types:
|
||||
|
||||
* **Local directory agent** — two-step execution per file:
|
||||
Step 1 (Classification) uses code to fetch all projects and asks the LLM
|
||||
to identify which project the file belongs to and which domains are relevant.
|
||||
Step 2 (Processing) fetches existing entities for that project/domains via
|
||||
code and runs an LLM with tools — existing data in context enforces
|
||||
update-first naturally.
|
||||
* **Local directory agent** — V2 unified flow per file:
|
||||
Phase A (Detect + Preprocess, zero LLM): Python detects the content type
|
||||
and strips markup/noise, producing clean text + metadata.
|
||||
Phase B (Single LLM call with tools): the LLM identifies the project,
|
||||
checks for duplicates via list_* tools, and creates/updates records.
|
||||
``items_created`` is counted from ``create_*`` tool calls.
|
||||
|
||||
* **Cloud connector agent** — fetches data from third-party APIs (Gmail,
|
||||
Teams, Outlook) and pushes extracted items to Electron.
|
||||
@@ -29,6 +29,7 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import uuid
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Any
|
||||
@@ -42,8 +43,11 @@ from app.agents.note_agent import NOTE_TOOLS
|
||||
from app.agents.project_agent import PROJECT_TOOLS
|
||||
from app.agents.task_agent import TASK_TOOLS
|
||||
from app.agents.timeline_agent import TIMELINE_TOOLS
|
||||
from app.config.settings import settings
|
||||
from app.core.device_manager import DeviceConnectionManager
|
||||
from app.core.langfuse_client import compile_prompt, extract_usage, get_langfuse, get_prompt_or_fallback
|
||||
from app.core.llm import get_llm
|
||||
from app.core.preprocessors import detect_content_type, preprocess
|
||||
from app.core.ws_context import clear_client_executor, execute_on_client, set_client_executor
|
||||
from app.db import async_session
|
||||
from app.models import AgentRunLog, CloudAgentConfig, LocalAgentConfig
|
||||
@@ -79,88 +83,43 @@ _DATA_TYPE_TOOLS: dict[str, list[Any]] = {
|
||||
"timelines": TIMELINE_TOOLS,
|
||||
}
|
||||
|
||||
# ── Step 1: Classification prompt ─────────────────────────────────────────
|
||||
# ── V2: Unified processing prompt (hot-swappable via Langfuse "unified_processing") ──
|
||||
|
||||
_DOMAIN_DESCRIPTIONS: dict[str, str] = {
|
||||
"tasks": (
|
||||
"Action items, to-dos, deliverables — anything that describes work to be done, "
|
||||
"assigned to someone, or tracked with a due date or status."
|
||||
),
|
||||
"notes": (
|
||||
"Documentation, meeting notes, summaries, reference material — "
|
||||
"written content meant to be read and referenced rather than acted on."
|
||||
),
|
||||
"timelines": (
|
||||
"Project milestones, deadlines, scheduled events — "
|
||||
"specific dates that mark a point in the progress of a project."
|
||||
),
|
||||
"projects": (
|
||||
"High-level project entities — only relevant if the file clearly introduces "
|
||||
"a new project or updates the scope of an existing one."
|
||||
),
|
||||
}
|
||||
|
||||
_STEP1_SYSTEM_PROMPT = """\
|
||||
You are a file classifier for a freelance project management tool.
|
||||
|
||||
Your job is to match a file to an existing project and identify which data domains to extract.
|
||||
|
||||
## Project matching rules (STRICT — follow in order)
|
||||
|
||||
1. Search the file content for any mention of a project name, client name, acronym, or topic
|
||||
that overlaps with the existing projects listed below.
|
||||
2. The match does NOT need to be exact — partial name, abbreviation, or topic similarity is enough.
|
||||
3. STRONGLY PREFER matching an existing project. Only return "new" as an absolute last resort
|
||||
when the file has zero meaningful connection to any listed project.
|
||||
4. When in doubt, pick the closest match from the list.
|
||||
|
||||
## Response format
|
||||
|
||||
Respond ONLY with a JSON object — no markdown, no explanation:
|
||||
|
||||
{{"project_id": "<exact id from the list below, or new>", "new_project_name": "<concise 2-5 word name, only when project_id is new>", "domains": ["tasks", "notes"]}}
|
||||
|
||||
## Domain definitions (only consider domains in the allowed list)
|
||||
|
||||
{domain_definitions}
|
||||
|
||||
## Existing projects
|
||||
|
||||
{projects_list}
|
||||
"""
|
||||
|
||||
# ── Step 2: Processing prompt ─────────────────────────────────────────────
|
||||
|
||||
_PROCESSING_SYSTEM_PROMPT = """\
|
||||
_UNIFIED_PROCESSING_PROMPT = """\
|
||||
You are a data extraction assistant for a freelance project management tool.
|
||||
|
||||
Your task: extract structured data from the file content and persist it using the available tools.
|
||||
## Your process (follow this exact order)
|
||||
|
||||
## Mandatory process — follow this order for EVERY item you extract
|
||||
### 1. Identify the project
|
||||
File: {filename}
|
||||
{metadata_section}
|
||||
|
||||
1. READ the existing records listed below for the relevant domain.
|
||||
2. SEARCH for a match by title, topic, or semantic similarity.
|
||||
3. If a match exists → call the update_* tool with the existing record's id.
|
||||
4. If no match exists → call the create_* tool and set isAiSuggested=1.
|
||||
Existing projects:
|
||||
{projects_list}
|
||||
|
||||
NEVER call create_* without first checking the existing records.
|
||||
NEVER duplicate a record that already exists under a different wording.
|
||||
Match this file to an existing project using the filename and content clues.
|
||||
If no project matches, {no_match_behavior}.
|
||||
|
||||
## Existing records (source of truth)
|
||||
### 2. Check existing records
|
||||
Once you identify the project, use list_tasks / list_notes / list_timelines
|
||||
(filtered by projectId) to see what already exists.
|
||||
NEVER create a record that already exists under the same or similar title.
|
||||
|
||||
{existing_context}
|
||||
### 3. Extract and create / update
|
||||
{extraction_rules}
|
||||
|
||||
## Context
|
||||
|
||||
Project: {project_context}
|
||||
Domains to extract: {data_types}
|
||||
|
||||
{custom_prompt_section}
|
||||
### Rules
|
||||
- Set isAiSuggested=1 on every new record.
|
||||
- Set projectId on every record (use the id from the project list above).
|
||||
- Update existing records when a match is found by title or topic.
|
||||
- Do NOT invent data — only extract what is clearly stated in the content.
|
||||
- Target entity types: {data_types}.
|
||||
{global_rules}
|
||||
"""
|
||||
|
||||
# ── Cloud processing prompt (kept separate for cloud agent) ───────────────
|
||||
|
||||
_CLOUD_PROCESSING_PROMPT = """\
|
||||
_BATCH_CLOUD_PROCESSING_PROMPT = """\
|
||||
You are a data extraction and management assistant for a freelance project
|
||||
management tool.
|
||||
|
||||
@@ -268,8 +227,17 @@ async def _run_agent_with_tools(
|
||||
user_message: str,
|
||||
tools: list[Any],
|
||||
max_steps: int,
|
||||
user_id: str = "",
|
||||
langfuse_prompt: Any = None,
|
||||
agent_name: str = "batch-agent",
|
||||
_tool_calls_out: list[str] | None = None,
|
||||
) -> str:
|
||||
"""Run an LLM agent with tool-calling, returning the final text response."""
|
||||
"""Run an LLM agent with tool-calling, returning the final text response.
|
||||
|
||||
If *_tool_calls_out* is provided, the name of every tool called during the
|
||||
run is appended to it (used by the caller to count ``create_*`` calls).
|
||||
"""
|
||||
lf = get_langfuse()
|
||||
llm = get_llm()
|
||||
llm_with_tools = llm.bind_tools(tools)
|
||||
messages: list[Any] = [
|
||||
@@ -279,12 +247,42 @@ async def _run_agent_with_tools(
|
||||
|
||||
tool_map = {tool_def.name: tool_def for tool_def in tools}
|
||||
|
||||
_span_ctx = (
|
||||
lf.start_as_current_observation(
|
||||
as_type="span",
|
||||
name=agent_name,
|
||||
metadata={"user_id": user_id} if user_id else None,
|
||||
input=user_message,
|
||||
)
|
||||
if lf else None
|
||||
)
|
||||
_span = _span_ctx.__enter__() if _span_ctx else None
|
||||
|
||||
try:
|
||||
for _ in range(max_steps):
|
||||
_gen_ctx = (
|
||||
lf.start_as_current_observation(
|
||||
as_type="generation",
|
||||
name=f"{agent_name}-llm",
|
||||
model=settings.LLM_MODEL,
|
||||
prompt=langfuse_prompt,
|
||||
input=messages,
|
||||
)
|
||||
if lf else None
|
||||
)
|
||||
_gen = _gen_ctx.__enter__() if _gen_ctx else None
|
||||
response: AIMessage = await llm_with_tools.ainvoke(messages)
|
||||
if _gen_ctx:
|
||||
_gen.update(output=_as_text(response.content), usage=extract_usage(response))
|
||||
_gen_ctx.__exit__(None, None, None)
|
||||
|
||||
messages.append(response)
|
||||
|
||||
if not response.tool_calls:
|
||||
return _as_text(response.content)
|
||||
final_text = _as_text(response.content)
|
||||
if _span:
|
||||
_span.update(output=final_text)
|
||||
return final_text
|
||||
|
||||
for call in response.tool_calls:
|
||||
call_id = str(call.get("id", ""))
|
||||
@@ -296,6 +294,9 @@ async def _run_agent_with_tools(
|
||||
json.dumps(call_args, ensure_ascii=True)[:800],
|
||||
)
|
||||
|
||||
if _tool_calls_out is not None:
|
||||
_tool_calls_out.append(call_name)
|
||||
|
||||
tool_fn = tool_map.get(call_name)
|
||||
if tool_fn is None:
|
||||
tool_output = f"Unknown tool: {call_name}"
|
||||
@@ -310,7 +311,15 @@ async def _run_agent_with_tools(
|
||||
messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
|
||||
|
||||
final = await llm.ainvoke(messages)
|
||||
return _as_text(final.content)
|
||||
final_text = _as_text(final.content)
|
||||
if _span:
|
||||
_span.update(output=final_text)
|
||||
return final_text
|
||||
finally:
|
||||
if _span_ctx:
|
||||
_span_ctx.__exit__(None, None, None)
|
||||
if lf:
|
||||
lf.flush()
|
||||
|
||||
|
||||
# ── Tool list builder ─────────────────────────────────────────────────────
|
||||
@@ -479,83 +488,66 @@ def _format_entities_for_context(domain: str, rows: list[dict]) -> str:
|
||||
return f"Existing {domain}:\n" + "\n".join(lines)
|
||||
|
||||
|
||||
# ── Step 1: LLM file classifier ───────────────────────────────────────────
|
||||
# ── V2 helper functions ───────────────────────────────────────────────────
|
||||
|
||||
|
||||
async def _classify_file(
|
||||
file_path: str,
|
||||
file_content: str,
|
||||
projects: list[dict],
|
||||
config_data_types: list[str],
|
||||
) -> tuple[str, list[str], str | None]:
|
||||
"""Call the LLM to classify a file by project and relevant domains.
|
||||
|
||||
Returns ``(project_id_or_"new", domains, new_project_name_or_None)``.
|
||||
- ``project_id`` is an existing project UUID, or ``"new"`` when no match found.
|
||||
- ``new_project_name`` is only set when ``project_id == "new"``.
|
||||
Falls back to ``("new", config_data_types, None)`` on any error.
|
||||
"""
|
||||
fallback: tuple[str, list[str], str | None] = ("new", list(config_data_types), None)
|
||||
|
||||
if not file_content.strip():
|
||||
return fallback
|
||||
|
||||
valid_project_ids = {p["id"] for p in projects}
|
||||
|
||||
def _fmt_project(p: dict) -> str:
|
||||
def _format_projects(projects: list[dict]) -> str:
|
||||
"""Format the project list for the unified system prompt."""
|
||||
if not projects:
|
||||
return " (no projects yet)"
|
||||
lines: list[str] = []
|
||||
for p in projects:
|
||||
summary = (p.get("aiSummary") or p.get("ai_summary") or "").strip()
|
||||
summary_part = f" — {summary[:100]}" if summary else ""
|
||||
return f" - id={p['id']} | name={p.get('name', '')} | status={p.get('status', '')}{summary_part}"
|
||||
lines.append(
|
||||
f" - id={p['id']} | name={p.get('name', '')} | "
|
||||
f"status={p.get('status', '')}{summary_part}"
|
||||
)
|
||||
return "\n".join(lines)
|
||||
|
||||
projects_list = "\n".join(_fmt_project(p) for p in projects) or " (none yet)"
|
||||
|
||||
domain_definitions = "\n".join(
|
||||
f" - {d}: {_DOMAIN_DESCRIPTIONS[d]}"
|
||||
for d in config_data_types
|
||||
if d in _DOMAIN_DESCRIPTIONS
|
||||
def _format_metadata(metadata: dict) -> str:
|
||||
"""Format preprocessor metadata as a compact context block."""
|
||||
if not metadata:
|
||||
return ""
|
||||
parts: list[str] = []
|
||||
for key in ("subject", "from", "to", "date"):
|
||||
if metadata.get(key):
|
||||
parts.append(f"{key.capitalize()}: {metadata[key]}")
|
||||
# any remaining keys
|
||||
for key, val in metadata.items():
|
||||
if key not in ("subject", "from", "to", "date") and val:
|
||||
parts.append(f"{key}: {val}")
|
||||
return "\n".join(parts)
|
||||
|
||||
|
||||
def _get_extraction_rules(agent_config: dict, content_type: str) -> str:
|
||||
"""Return the extraction_prompt for *content_type* from *agent_config*.
|
||||
|
||||
Falls back to a generic instruction when the type is not configured.
|
||||
"""
|
||||
for ct in agent_config.get("content_types", []):
|
||||
if ct.get("id") == content_type:
|
||||
prompt = ct.get("extraction_prompt", "").strip()
|
||||
if prompt:
|
||||
return prompt
|
||||
return (
|
||||
"Extract relevant information as tasks (action items), notes "
|
||||
"(informational content), or timelines (dated events)."
|
||||
)
|
||||
|
||||
system = _STEP1_SYSTEM_PROMPT.format(
|
||||
domain_definitions=domain_definitions,
|
||||
projects_list=projects_list,
|
||||
)
|
||||
|
||||
llm = get_llm()
|
||||
try:
|
||||
response = await llm.ainvoke([
|
||||
SystemMessage(content=system),
|
||||
HumanMessage(content=f"File: {file_path}\n\nContent:\n{file_content[:4000]}"),
|
||||
])
|
||||
raw = _as_text(response.content).strip()
|
||||
# Strip markdown fences if the model wraps the JSON.
|
||||
if raw.startswith("```"):
|
||||
raw = raw.split("```")[1]
|
||||
if raw.startswith("json"):
|
||||
raw = raw[4:]
|
||||
parsed = json.loads(raw.strip())
|
||||
raw_project_id: str = str(parsed.get("project_id") or "new")
|
||||
# Reject hallucinated UUIDs — only accept ids that exist in the fetched list.
|
||||
project_id = raw_project_id if raw_project_id in valid_project_ids else "new"
|
||||
new_project_name: str | None = (
|
||||
str(parsed["new_project_name"]).strip() or None
|
||||
if project_id == "new" and parsed.get("new_project_name")
|
||||
else None
|
||||
)
|
||||
domains: list[str] = [
|
||||
d for d in parsed.get("domains", [])
|
||||
if d in config_data_types
|
||||
]
|
||||
if not domains:
|
||||
domains = list(config_data_types)
|
||||
return project_id, domains, new_project_name
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"agent_runner: step1 classification failed for %r: %s", file_path, exc
|
||||
)
|
||||
return fallback
|
||||
def _get_no_match_behavior(agent_config: dict) -> str:
|
||||
"""Derive the 'no project match' instruction from global_rules."""
|
||||
rules = agent_config.get("global_rules", [])
|
||||
for rule in rules:
|
||||
lower = rule.lower()
|
||||
if "no project" in lower or "no match" in lower or "skip" in lower:
|
||||
return rule
|
||||
return "create a new project with a concise name derived from the file content"
|
||||
|
||||
|
||||
# ── Local agent runner (two-step per file) ────────────────────────────────
|
||||
# ── Local agent runner (V2 — unified per-file flow) ───────────────────────
|
||||
|
||||
|
||||
async def run_local_agent(
|
||||
@@ -565,16 +557,17 @@ async def run_local_agent(
|
||||
device_mgr: DeviceConnectionManager,
|
||||
run_context: dict | None = None,
|
||||
) -> None:
|
||||
"""Execute a local directory agent run using a two-step approach per file.
|
||||
"""Execute a local directory agent run — V2 unified flow.
|
||||
|
||||
Step 1 — Classification (code + 1 LLM call per file, no tools):
|
||||
Code scans directories and fetches all projects via WS.
|
||||
For each file, LLM identifies the project and relevant domains.
|
||||
Phase A — Detect + Preprocess (zero LLM, per file):
|
||||
Python detects the content type from filename + content patterns and
|
||||
runs the appropriate handler (e.g. email_html) to produce clean text
|
||||
and structured metadata.
|
||||
|
||||
Step 2 — Processing (code + 1 LLM call per file, with tools):
|
||||
Code fetches existing entities for the identified project/domains.
|
||||
LLM receives file content + existing entities in context and uses
|
||||
tools to update existing records or create new ones.
|
||||
Phase B — Single LLM call with tools (per file):
|
||||
One LLM call handles project identification, duplicate checking, and
|
||||
record creation/update. ``create_*`` tool calls are counted to
|
||||
produce the accurate ``items_created`` metric.
|
||||
"""
|
||||
run_id = run_log.id
|
||||
agent_id = (run_context or {}).get("agent_id") or config.id
|
||||
@@ -609,12 +602,8 @@ async def run_local_agent(
|
||||
errors: list[str] = []
|
||||
items_processed = 0
|
||||
items_created = 0
|
||||
|
||||
custom_section = (
|
||||
f"User instructions:\n{config.prompt_template}"
|
||||
if config.prompt_template
|
||||
else ""
|
||||
)
|
||||
agent_config: dict = config.agent_config or {}
|
||||
processing_tools = _build_processing_tools(config.data_types)
|
||||
|
||||
try:
|
||||
# ── Code: scan directories ───────────────────────────────────
|
||||
@@ -634,108 +623,82 @@ async def run_local_agent(
|
||||
|
||||
# ── Code: fetch all projects once ────────────────────────────
|
||||
projects = await _fetch_projects()
|
||||
projects_block = _format_projects(projects)
|
||||
|
||||
# Prompt template + Langfuse version linking (hot-swappable from UI).
|
||||
unified_template, prompt_obj = get_prompt_or_fallback(
|
||||
"unified_processing", _UNIFIED_PROCESSING_PROMPT
|
||||
)
|
||||
|
||||
for file_path in file_paths:
|
||||
try:
|
||||
# Read file content via code.
|
||||
# ── Phase A: read + detect + preprocess ─────────────
|
||||
file_result = await execute_on_client(
|
||||
action="read_file_content", data={"path": file_path}
|
||||
)
|
||||
file_content: str = file_result.get("content", "")
|
||||
if not file_content:
|
||||
logger.debug("agent_runner: run=%s skipping empty file %r", run_id, file_path)
|
||||
raw_content: str = file_result.get("content", "")
|
||||
if not raw_content.strip():
|
||||
logger.debug(
|
||||
"agent_runner: run=%s skipping empty file %r", run_id, file_path
|
||||
)
|
||||
continue
|
||||
|
||||
items_processed += 1
|
||||
filename = os.path.basename(file_path)
|
||||
content_type = detect_content_type(filename, raw_content)
|
||||
preprocessed = preprocess(content_type, raw_content)
|
||||
|
||||
# Step 1 — classify file.
|
||||
project_id, domains, new_project_name = await _classify_file(
|
||||
file_path=file_path,
|
||||
file_content=file_content,
|
||||
projects=projects,
|
||||
config_data_types=config.data_types,
|
||||
)
|
||||
logger.info(
|
||||
"agent_runner: run=%s file=%r → project=%s new_name=%r domains=%s",
|
||||
run_id,
|
||||
file_path,
|
||||
project_id,
|
||||
new_project_name,
|
||||
domains,
|
||||
"agent_runner: run=%s file=%r content_type=%s clean_len=%d",
|
||||
run_id, file_path, content_type, len(preprocessed.clean_text),
|
||||
)
|
||||
|
||||
# Step 2 — resolve project_id via CODE, then fetch entities.
|
||||
# Project creation is NEVER delegated to the Step 2 LLM.
|
||||
if project_id == "new":
|
||||
proj_name = new_project_name or "Untitled Project"
|
||||
try:
|
||||
proj_result = await execute_on_client(
|
||||
action="insert",
|
||||
table="projects",
|
||||
data={"name": proj_name, "clientId": None},
|
||||
# ── Phase B: single LLM call ─────────────────────────
|
||||
extraction_rules = _get_extraction_rules(agent_config, content_type)
|
||||
no_match_behavior = _get_no_match_behavior(agent_config)
|
||||
global_rules_lines = "\n".join(
|
||||
f"- {r}" for r in agent_config.get("global_rules", [])
|
||||
)
|
||||
created = proj_result.get("row", {})
|
||||
effective_project_id = created.get("id", "standalone")
|
||||
# Add to local list so subsequent files can match it.
|
||||
if "id" in created:
|
||||
projects.append(created)
|
||||
logger.info(
|
||||
"agent_runner: run=%s created project %r id=%s",
|
||||
run_id, proj_name, effective_project_id,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"agent_runner: run=%s failed to create project %r: %s",
|
||||
run_id, proj_name, exc,
|
||||
)
|
||||
effective_project_id = "standalone"
|
||||
proj_name = "unknown"
|
||||
project_context = (
|
||||
f"Project: {proj_name} (id: {effective_project_id}). "
|
||||
"Always set projectId to this id on every record you create."
|
||||
)
|
||||
else:
|
||||
effective_project_id = project_id
|
||||
proj = next((p for p in projects if p["id"] == project_id), None)
|
||||
proj_name = proj.get("name", project_id) if proj else project_id
|
||||
project_context = (
|
||||
f"Project: {proj_name} (id: {project_id}). "
|
||||
"Always set projectId to this id on every record you create."
|
||||
metadata_section = _format_metadata(preprocessed.metadata)
|
||||
|
||||
system_prompt = compile_prompt(
|
||||
unified_template,
|
||||
prompt_obj,
|
||||
filename=filename,
|
||||
metadata_section=metadata_section,
|
||||
projects_list=projects_block,
|
||||
no_match_behavior=no_match_behavior,
|
||||
extraction_rules=extraction_rules,
|
||||
global_rules=global_rules_lines,
|
||||
data_types=", ".join(config.data_types),
|
||||
)
|
||||
|
||||
# "projects" domain is never passed to Step 2 — handled above in code.
|
||||
domains = [d for d in domains if d != "projects"]
|
||||
|
||||
existing_blocks: list[str] = []
|
||||
for domain in domains:
|
||||
rows = await _fetch_domain_entities(domain, effective_project_id)
|
||||
existing_blocks.append(_format_entities_for_context(domain, rows))
|
||||
|
||||
existing_context = "\n\n".join(existing_blocks)
|
||||
|
||||
system_prompt = _PROCESSING_SYSTEM_PROMPT.format(
|
||||
existing_context=existing_context,
|
||||
project_context=project_context,
|
||||
data_types=", ".join(domains),
|
||||
custom_prompt_section=custom_section,
|
||||
)
|
||||
|
||||
processing_tools = _build_processing_tools(domains)
|
||||
|
||||
result_text = await _run_agent_with_tools(
|
||||
system_prompt=system_prompt,
|
||||
user_message = (
|
||||
f"Process this file and extract relevant information.\n\n"
|
||||
f"File: {file_path}\n\nContent:\n{file_content}"
|
||||
),
|
||||
f"File: {file_path}\n\n"
|
||||
f"Content:\n{preprocessed.clean_text}"
|
||||
)
|
||||
|
||||
file_tool_calls: list[str] = []
|
||||
result_text = await _run_agent_with_tools(
|
||||
system_prompt=system_prompt,
|
||||
user_message=user_message,
|
||||
tools=processing_tools,
|
||||
max_steps=_MAX_PROCESSING_STEPS,
|
||||
user_id=user_id,
|
||||
langfuse_prompt=prompt_obj,
|
||||
agent_name="unified-processor",
|
||||
_tool_calls_out=file_tool_calls,
|
||||
)
|
||||
|
||||
file_created = sum(
|
||||
1 for name in file_tool_calls if name.startswith("create_")
|
||||
)
|
||||
items_created += file_created
|
||||
|
||||
logger.info(
|
||||
"agent_runner: run=%s file=%r result=%s",
|
||||
run_id,
|
||||
file_path,
|
||||
result_text[:200],
|
||||
"agent_runner: run=%s file=%r created=%d result=%s",
|
||||
run_id, file_path, file_created, result_text[:200],
|
||||
)
|
||||
|
||||
except Exception as exc:
|
||||
@@ -767,10 +730,11 @@ async def run_local_agent(
|
||||
errors=errors,
|
||||
)
|
||||
logger.info(
|
||||
"agent_runner: run=%s done status=%s processed=%d errors=%d",
|
||||
"agent_runner: run=%s done status=%s processed=%d created=%d errors=%d",
|
||||
run_id,
|
||||
final_status,
|
||||
items_processed,
|
||||
items_created,
|
||||
len(errors),
|
||||
)
|
||||
|
||||
@@ -928,7 +892,12 @@ async def run_cloud_agent(
|
||||
continue
|
||||
items_processed += 1
|
||||
|
||||
processing_prompt = _CLOUD_PROCESSING_PROMPT.format(
|
||||
cloud_template, cloud_prompt_obj = get_prompt_or_fallback(
|
||||
"batch_cloud_processing", _BATCH_CLOUD_PROCESSING_PROMPT
|
||||
)
|
||||
processing_prompt = compile_prompt(
|
||||
cloud_template,
|
||||
cloud_prompt_obj,
|
||||
data_types=", ".join(config.data_types),
|
||||
project_context="Determine the appropriate project from the message context.",
|
||||
file_list=f"Message from {config.provider} (id: {msg.id})",
|
||||
@@ -941,6 +910,9 @@ async def run_cloud_agent(
|
||||
user_message=f"Process this message content:\n\n{content_text[:8000]}",
|
||||
tools=processing_tools,
|
||||
max_steps=_MAX_PROCESSING_STEPS,
|
||||
user_id=user_id,
|
||||
langfuse_prompt=cloud_prompt_obj,
|
||||
agent_name="cloud-processor",
|
||||
)
|
||||
except Exception as exc:
|
||||
errors.append(f"LLM processing error for message {msg.id!r}: {exc}")
|
||||
|
||||
@@ -16,7 +16,9 @@ from app.agents.note_agent import NOTE_TOOLS
|
||||
from app.agents.project_agent import PROJECT_TOOLS
|
||||
from app.agents.task_agent import TASK_TOOLS
|
||||
from app.agents.timeline_agent import TIMELINE_TOOLS
|
||||
from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback
|
||||
from app.core.llm import get_llm
|
||||
from app.config.settings import settings
|
||||
from app.core.memory_middleware import MemoryMiddleware
|
||||
from app.core.ws_context import clear_tool_result_collector, execute_on_client, set_tool_result_collector
|
||||
from app.db import async_session
|
||||
@@ -26,7 +28,7 @@ logger = logging.getLogger(__name__)
|
||||
FloatingDomainType = Literal["task", "timeline", "project", "node"]
|
||||
FloatingDomainSection = Literal["task", "timeline", "note"]
|
||||
|
||||
_HOME_SINGLE_AGENT_SYSTEM = (
|
||||
_HOME_SYSTEM_PROMPT = (
|
||||
"You are the home assistant with direct access to all tools: tasks, projects, notes, timelines, and memory tools. "
|
||||
"Always use tools for factual data retrieval before answering. "
|
||||
"When the user asks to remember, forget, or update what you know about them, use memory tools. "
|
||||
@@ -39,7 +41,7 @@ _HOME_SINGLE_AGENT_SYSTEM = (
|
||||
"For upcoming tasks, after tag lines add a short recommendation based on due date and priority."
|
||||
)
|
||||
|
||||
_FLOATING_SINGLE_AGENT_SYSTEM = (
|
||||
_FLOATING_SYSTEM_PROMPT = (
|
||||
"You are the floating assistant with direct access to all tools: tasks, projects, notes, timelines, and memory tools. "
|
||||
"Stay focused on the floating scope in context.scope and answer concisely. "
|
||||
"Return plain text only. Do not output XML/HTML-like tags such as <task>, <project>, <note>, <timeline>, or any bracketed id tag wrappers. "
|
||||
@@ -48,7 +50,7 @@ _FLOATING_SINGLE_AGENT_SYSTEM = (
|
||||
"If context.context.resolved_project_id exists, use it as project_id for scoped list calls. "
|
||||
)
|
||||
|
||||
_FLOATING_DOMAIN_CLASSIFIER_SYSTEM = (
|
||||
_FLOATING_DOMAIN_CLASSIFIER_PROMPT = (
|
||||
"You are a strict domain classifier for websocket floating requests. "
|
||||
"Return ONLY a JSON object with keys: type, id, section. "
|
||||
"Allowed type values: task, timeline, project, node. "
|
||||
@@ -536,9 +538,8 @@ async def _infer_floating_domain(message: str, context: dict[str, Any]) -> dict[
|
||||
|
||||
try:
|
||||
llm = get_llm()
|
||||
response = await llm.ainvoke(
|
||||
[
|
||||
SystemMessage(content=_FLOATING_DOMAIN_CLASSIFIER_SYSTEM),
|
||||
classifier_messages = [
|
||||
SystemMessage(content=_FLOATING_DOMAIN_CLASSIFIER_PROMPT),
|
||||
HumanMessage(
|
||||
content=(
|
||||
f"Message:\n{message}\n\n"
|
||||
@@ -546,7 +547,22 @@ async def _infer_floating_domain(message: str, context: dict[str, Any]) -> dict[
|
||||
)
|
||||
),
|
||||
]
|
||||
lf = get_langfuse()
|
||||
_, classifier_prompt_obj = get_prompt_or_fallback(
|
||||
"floating_domain_classifier", _FLOATING_DOMAIN_CLASSIFIER_PROMPT
|
||||
)
|
||||
if lf:
|
||||
with lf.start_as_current_observation(
|
||||
as_type="generation",
|
||||
name="floating-classifier",
|
||||
model=settings.LLM_MODEL,
|
||||
prompt=classifier_prompt_obj,
|
||||
input=classifier_messages,
|
||||
) as gen:
|
||||
response = await llm.ainvoke(classifier_messages)
|
||||
gen.update(output=_as_text(response.content), usage=extract_usage(response))
|
||||
else:
|
||||
response = await llm.ainvoke(classifier_messages)
|
||||
parsed = _parse_json_object(_as_text(response.content))
|
||||
if parsed is not None:
|
||||
domain = _normalize_domain_payload(parsed, project_id)
|
||||
@@ -571,8 +587,11 @@ async def _run_single_agent(
|
||||
message: str,
|
||||
context: dict[str, Any],
|
||||
max_steps: int = 6,
|
||||
langfuse_prompt: Any = None,
|
||||
agent_name: str = "agent",
|
||||
) -> str:
|
||||
trace_id = _trace_id_from_context(context)
|
||||
lf = get_langfuse()
|
||||
llm = get_llm()
|
||||
tools = _all_tools_for_user(user_id, trace_id)
|
||||
model_context = _context_for_model(context)
|
||||
@@ -591,9 +610,36 @@ async def _run_single_agent(
|
||||
tool_calls_count = 0
|
||||
collected: list[dict[str, Any]] = []
|
||||
set_tool_result_collector(collected)
|
||||
|
||||
_span_ctx = (
|
||||
lf.start_as_current_observation(
|
||||
as_type="span",
|
||||
name=agent_name,
|
||||
metadata={"user_id": user_id, "session_id": trace_id},
|
||||
input=message,
|
||||
)
|
||||
if lf else None
|
||||
)
|
||||
_span = _span_ctx.__enter__() if _span_ctx else None
|
||||
|
||||
try:
|
||||
for _ in range(max_steps):
|
||||
_gen_ctx = (
|
||||
lf.start_as_current_observation(
|
||||
as_type="generation",
|
||||
name=f"{agent_name}-llm",
|
||||
model=settings.LLM_MODEL,
|
||||
prompt=langfuse_prompt,
|
||||
input=messages,
|
||||
)
|
||||
if lf else None
|
||||
)
|
||||
_gen = _gen_ctx.__enter__() if _gen_ctx else None
|
||||
response: AIMessage = await llm_with_tools.ainvoke(messages)
|
||||
if _gen_ctx:
|
||||
_gen.update(output=_as_text(response.content), usage=extract_usage(response))
|
||||
_gen_ctx.__exit__(None, None, None)
|
||||
|
||||
messages.append(response)
|
||||
|
||||
if not response.tool_calls:
|
||||
@@ -605,6 +651,8 @@ async def _run_single_agent(
|
||||
tool_calls_count,
|
||||
len(final_text),
|
||||
)
|
||||
if _span:
|
||||
_span.update(output=final_text)
|
||||
return final_text
|
||||
|
||||
tool_map = {tool_def.name: tool_def for tool_def in tools}
|
||||
@@ -644,9 +692,15 @@ async def _run_single_agent(
|
||||
tool_calls_count,
|
||||
len(final_text),
|
||||
)
|
||||
if _span:
|
||||
_span.update(output=final_text)
|
||||
return final_text
|
||||
finally:
|
||||
clear_tool_result_collector()
|
||||
if _span_ctx:
|
||||
_span_ctx.__exit__(None, None, None)
|
||||
if lf:
|
||||
lf.flush()
|
||||
|
||||
|
||||
async def _run_single_agent_stream(
|
||||
@@ -656,8 +710,11 @@ async def _run_single_agent_stream(
|
||||
message: str,
|
||||
context: dict[str, Any],
|
||||
max_steps: int = 6,
|
||||
langfuse_prompt: Any = None,
|
||||
agent_name: str = "agent",
|
||||
) -> AsyncGenerator[tuple[str, Any], None]:
|
||||
trace_id = _trace_id_from_context(context)
|
||||
lf = get_langfuse()
|
||||
llm = get_llm()
|
||||
tools = _all_tools_for_user(user_id, trace_id)
|
||||
model_context = _context_for_model(context)
|
||||
@@ -677,9 +734,37 @@ async def _run_single_agent_stream(
|
||||
streamed_chars = 0
|
||||
collected: list[dict[str, Any]] = []
|
||||
set_tool_result_collector(collected)
|
||||
|
||||
_span_ctx = (
|
||||
lf.start_as_current_observation(
|
||||
as_type="span",
|
||||
name=f"{agent_name}-stream",
|
||||
metadata={"user_id": user_id, "session_id": trace_id},
|
||||
input=message,
|
||||
)
|
||||
if lf else None
|
||||
)
|
||||
_span = _span_ctx.__enter__() if _span_ctx else None
|
||||
streamed_text: list[str] = []
|
||||
|
||||
try:
|
||||
for _ in range(max_steps):
|
||||
_gen_ctx = (
|
||||
lf.start_as_current_observation(
|
||||
as_type="generation",
|
||||
name=f"{agent_name}-llm",
|
||||
model=settings.LLM_MODEL,
|
||||
prompt=langfuse_prompt,
|
||||
input=messages,
|
||||
)
|
||||
if lf else None
|
||||
)
|
||||
_gen = _gen_ctx.__enter__() if _gen_ctx else None
|
||||
response: AIMessage = await llm_with_tools.ainvoke(messages)
|
||||
if _gen_ctx:
|
||||
_gen.update(output=_as_text(response.content), usage=extract_usage(response))
|
||||
_gen_ctx.__exit__(None, None, None)
|
||||
|
||||
messages.append(response)
|
||||
|
||||
if not response.tool_calls:
|
||||
@@ -688,6 +773,7 @@ async def _run_single_agent_stream(
|
||||
token = _as_text(getattr(chunk, "content", ""))
|
||||
if token:
|
||||
streamed_chars += len(token)
|
||||
streamed_text.append(token)
|
||||
emitted_any = True
|
||||
yield "token", token
|
||||
|
||||
@@ -696,6 +782,7 @@ async def _run_single_agent_stream(
|
||||
fallback_text = _as_text(response.content)
|
||||
if fallback_text:
|
||||
streamed_chars += len(fallback_text)
|
||||
streamed_text.append(fallback_text)
|
||||
yield "token", fallback_text
|
||||
logger.info(
|
||||
"deep_agent: run_single_agent_stream_end trace=%s user=%s tool_calls=%d response_chars=%d",
|
||||
@@ -704,6 +791,8 @@ async def _run_single_agent_stream(
|
||||
tool_calls_count,
|
||||
streamed_chars,
|
||||
)
|
||||
if _span:
|
||||
_span.update(output="".join(streamed_text))
|
||||
return
|
||||
|
||||
tool_map = {tool_def.name: tool_def for tool_def in tools}
|
||||
@@ -738,6 +827,7 @@ async def _run_single_agent_stream(
|
||||
token = _as_text(getattr(chunk, "content", ""))
|
||||
if token:
|
||||
streamed_chars += len(token)
|
||||
streamed_text.append(token)
|
||||
yield "token", token
|
||||
logger.info(
|
||||
"deep_agent: run_single_agent_stream_end trace=%s user=%s tool_calls=%d response_chars=%d fallback=1",
|
||||
@@ -746,17 +836,28 @@ async def _run_single_agent_stream(
|
||||
tool_calls_count,
|
||||
streamed_chars,
|
||||
)
|
||||
if _span:
|
||||
_span.update(output="".join(streamed_text))
|
||||
finally:
|
||||
clear_tool_result_collector()
|
||||
if _span_ctx:
|
||||
_span_ctx.__exit__(None, None, None)
|
||||
if lf:
|
||||
lf.flush()
|
||||
|
||||
|
||||
async def run_home(user_id: str, message: str, context: dict[str, Any]) -> str:
|
||||
prepared_context = await _prepare_context(message, context)
|
||||
system_prompt, langfuse_prompt = get_prompt_or_fallback(
|
||||
"home_system", _HOME_SYSTEM_PROMPT
|
||||
)
|
||||
response = await _run_single_agent(
|
||||
user_id=user_id,
|
||||
system_prompt=_HOME_SINGLE_AGENT_SYSTEM,
|
||||
system_prompt=system_prompt,
|
||||
message=message,
|
||||
context=prepared_context,
|
||||
langfuse_prompt=langfuse_prompt,
|
||||
agent_name="home-agent",
|
||||
)
|
||||
return _normalize_tagged_list_lines(response, message)
|
||||
|
||||
@@ -764,11 +865,16 @@ async def run_home(user_id: str, message: str, context: dict[str, Any]) -> str:
|
||||
async def run_floating(user_id: str, message: str, context: dict[str, Any]) -> tuple[str, dict[str, str | None]]:
|
||||
prepared_context = await _prepare_context(message, context)
|
||||
domain = await _infer_floating_domain(message, prepared_context)
|
||||
system_prompt, langfuse_prompt = get_prompt_or_fallback(
|
||||
"floating_system", _FLOATING_SYSTEM_PROMPT
|
||||
)
|
||||
response = await _run_single_agent(
|
||||
user_id=user_id,
|
||||
system_prompt=_FLOATING_SINGLE_AGENT_SYSTEM,
|
||||
system_prompt=system_prompt,
|
||||
message=message,
|
||||
context=prepared_context,
|
||||
langfuse_prompt=langfuse_prompt,
|
||||
agent_name="floating-agent",
|
||||
)
|
||||
sanitized = _strip_floating_markup(response)
|
||||
if not sanitized and response:
|
||||
@@ -782,12 +888,17 @@ async def run_home_stream(
|
||||
context: dict[str, Any],
|
||||
) -> AsyncGenerator[tuple[str, Any], None]:
|
||||
prepared_context = await _prepare_context(message, context)
|
||||
system_prompt, langfuse_prompt = get_prompt_or_fallback(
|
||||
"home_system", _HOME_SYSTEM_PROMPT
|
||||
)
|
||||
text_chunks: list[str] = []
|
||||
async for event in _run_single_agent_stream(
|
||||
user_id=user_id,
|
||||
system_prompt=_HOME_SINGLE_AGENT_SYSTEM,
|
||||
system_prompt=system_prompt,
|
||||
message=message,
|
||||
context=prepared_context,
|
||||
langfuse_prompt=langfuse_prompt,
|
||||
agent_name="home-agent",
|
||||
):
|
||||
event_type, data = event
|
||||
if event_type != "token":
|
||||
@@ -809,14 +920,19 @@ async def run_floating_stream(
|
||||
domain = await _infer_floating_domain(message, prepared_context)
|
||||
yield "floating_domain", domain
|
||||
|
||||
system_prompt, langfuse_prompt = get_prompt_or_fallback(
|
||||
"floating_system", _FLOATING_SYSTEM_PROMPT
|
||||
)
|
||||
sanitizer = _FloatingStreamSanitizer()
|
||||
emitted_sanitized = False
|
||||
raw_chunks: list[str] = []
|
||||
async for event in _run_single_agent_stream(
|
||||
user_id=user_id,
|
||||
system_prompt=_FLOATING_SINGLE_AGENT_SYSTEM,
|
||||
system_prompt=system_prompt,
|
||||
message=message,
|
||||
context=prepared_context,
|
||||
langfuse_prompt=langfuse_prompt,
|
||||
agent_name="floating-agent",
|
||||
):
|
||||
event_type, data = event
|
||||
if event_type != "token":
|
||||
|
||||
147
app/core/langfuse_client.py
Normal file
147
app/core/langfuse_client.py
Normal file
@@ -0,0 +1,147 @@
|
||||
"""Langfuse observability — singleton client and prompt helpers.
|
||||
|
||||
If LANGFUSE_SECRET_KEY / LANGFUSE_PUBLIC_KEY are not set,
|
||||
all helpers are no-ops so the app works without Langfuse configured.
|
||||
|
||||
Usage
|
||||
-----
|
||||
Tracing::
|
||||
|
||||
from app.core.langfuse_client import get_langfuse
|
||||
|
||||
lf = get_langfuse()
|
||||
if lf:
|
||||
with lf.start_as_current_observation(as_type="span", name="my-agent") as span:
|
||||
span.update(input=user_message)
|
||||
# ... do work ...
|
||||
span.update(output=result)
|
||||
lf.flush()
|
||||
|
||||
Prompt management::
|
||||
|
||||
from app.core.langfuse_client import get_prompt_or_fallback
|
||||
|
||||
text, prompt_obj = get_prompt_or_fallback("home_system", FALLBACK_PROMPT)
|
||||
# Use text as the system prompt; pass prompt_obj to generations for linking.
|
||||
|
||||
Linking a prompt to a generation::
|
||||
|
||||
with lf.start_as_current_observation(
|
||||
as_type="generation",
|
||||
name="llm-call",
|
||||
model="gpt-4o",
|
||||
prompt=prompt_obj, # links generation → prompt version in the UI
|
||||
input=messages,
|
||||
) as gen:
|
||||
response = await llm.ainvoke(messages)
|
||||
gen.update(output=response.content, usage=_usage(response))
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_client: Any = None
|
||||
_initialized: bool = False
|
||||
|
||||
|
||||
def get_langfuse() -> Any | None:
|
||||
"""Return the Langfuse singleton, or ``None`` when not configured."""
|
||||
global _client, _initialized
|
||||
if _initialized:
|
||||
return _client
|
||||
_initialized = True
|
||||
|
||||
from app.config.settings import settings # local import to avoid circular deps
|
||||
|
||||
if not settings.LANGFUSE_SECRET_KEY or not settings.LANGFUSE_PUBLIC_KEY:
|
||||
logger.debug("langfuse: not configured — observability disabled")
|
||||
return None
|
||||
|
||||
try:
|
||||
from langfuse import Langfuse
|
||||
|
||||
_client = Langfuse(
|
||||
secret_key=settings.LANGFUSE_SECRET_KEY,
|
||||
public_key=settings.LANGFUSE_PUBLIC_KEY,
|
||||
host=settings.LANGFUSE_HOST,
|
||||
)
|
||||
logger.info("langfuse: client initialized host=%s", settings.LANGFUSE_HOST)
|
||||
except Exception as exc:
|
||||
logger.warning("langfuse: failed to initialize: %s", exc)
|
||||
_client = None
|
||||
|
||||
return _client
|
||||
|
||||
|
||||
def get_prompt_or_fallback(name: str, fallback: str) -> tuple[str, Any]:
|
||||
"""Fetch a text prompt from Langfuse; fall back to ``fallback`` on any error.
|
||||
|
||||
Returns ``(raw_template, prompt_obj_or_None)``.
|
||||
|
||||
* ``raw_template`` — the uncompiled template string. Do NOT call ``.format()``
|
||||
on it directly; use :func:`compile_prompt` instead so the correct variable
|
||||
syntax is applied (``{{var}}`` for Langfuse, ``{var}`` for the fallback).
|
||||
* ``prompt_obj`` — the Langfuse prompt object, or ``None`` when Langfuse is
|
||||
unavailable / the fetch failed. Pass this to generation observations so
|
||||
Langfuse links the generation to the exact prompt version in the UI.
|
||||
"""
|
||||
lf = get_langfuse()
|
||||
if lf is None:
|
||||
return fallback, None
|
||||
|
||||
try:
|
||||
prompt = lf.get_prompt(name, label="production", fallback=fallback)
|
||||
# For text-type prompts .prompt holds the raw template string.
|
||||
raw = prompt.prompt if hasattr(prompt, "prompt") and isinstance(prompt.prompt, str) else fallback
|
||||
return raw, prompt
|
||||
except Exception as exc:
|
||||
logger.warning("langfuse: get_prompt %r failed: %s — using fallback", name, exc)
|
||||
return fallback, None
|
||||
|
||||
|
||||
def compile_prompt(template: str, prompt_obj: Any, **variables: Any) -> str:
|
||||
"""Compile *template* with *variables*, choosing the right syntax.
|
||||
|
||||
* When *prompt_obj* is a real Langfuse prompt object, calls
|
||||
``prompt_obj.compile(**variables)`` which handles ``{{variable}}``
|
||||
substitution as defined in the Langfuse UI.
|
||||
* When *prompt_obj* is ``None`` (Langfuse unavailable or fetch failed),
|
||||
falls back to ``template.format(**variables)`` which handles the
|
||||
``{variable}`` syntax used in the hardcoded fallback strings.
|
||||
|
||||
This keeps callers oblivious to which syntax is in use.
|
||||
"""
|
||||
if prompt_obj is not None:
|
||||
try:
|
||||
compiled = prompt_obj.compile(**variables)
|
||||
# compile() returns a string for text prompts.
|
||||
if isinstance(compiled, str):
|
||||
return compiled
|
||||
# Chat prompts return a list of dicts — join text parts.
|
||||
if isinstance(compiled, list):
|
||||
return "\n".join(
|
||||
m.get("content", "") for m in compiled if isinstance(m, dict)
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"langfuse: compile failed for prompt %r: %s — falling back to .format()",
|
||||
getattr(prompt_obj, "name", "?"),
|
||||
exc,
|
||||
)
|
||||
return template.format(**variables)
|
||||
|
||||
|
||||
def extract_usage(response: Any) -> dict[str, int]:
|
||||
"""Extract token usage from a LangChain AI message into Langfuse format."""
|
||||
meta = getattr(response, "usage_metadata", None)
|
||||
if not meta:
|
||||
return {}
|
||||
return {
|
||||
"input": int(meta.get("input_tokens", 0)),
|
||||
"output": int(meta.get("output_tokens", 0)),
|
||||
"total": int(meta.get("total_tokens", 0)),
|
||||
}
|
||||
104
app/core/preprocessors/__init__.py
Normal file
104
app/core/preprocessors/__init__.py
Normal file
@@ -0,0 +1,104 @@
|
||||
"""Preprocessor registry: detect content type and dispatch to handlers.
|
||||
|
||||
Public API
|
||||
----------
|
||||
detect_content_type(filename, raw_content) -> str
|
||||
Heuristic detection based on file extension and content patterns.
|
||||
|
||||
preprocess(content_type, raw_content) -> PreprocessResult
|
||||
Dispatch to the appropriate handler.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
|
||||
from app.core.preprocessors.base import PreprocessResult
|
||||
|
||||
# ── Heuristics ────────────────────────────────────────────────────────
|
||||
|
||||
# Patterns that strongly suggest an email HTML file
|
||||
_EMAIL_SIGNALS = re.compile(
|
||||
r"(Subject:|From:|To:|Date:|Sent:|MIME-Version:|Content-Type:\s*text/html)",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
# Patterns that suggest a generic HTML page (not an email)
|
||||
_GENERIC_HTML_SIGNALS = re.compile(
|
||||
r"<(nav|main|header|footer|article|section)\b",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
|
||||
def detect_content_type(filename: str, raw_content: str) -> str:
|
||||
"""Return a content-type string for the given file.
|
||||
|
||||
Supported types: ``"email_html"``, ``"generic_html"``,
|
||||
``"plain_text"``, ``"unknown"``.
|
||||
"""
|
||||
ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
|
||||
|
||||
if ext == "txt":
|
||||
return "plain_text"
|
||||
|
||||
if ext in ("html", "htm", "eml", "mhtml", "mht"):
|
||||
# Prefer email detection over generic HTML
|
||||
if _EMAIL_SIGNALS.search(raw_content[:4096]):
|
||||
return "email_html"
|
||||
if _GENERIC_HTML_SIGNALS.search(raw_content[:4096]) or "<html" in raw_content[:200].lower():
|
||||
return "generic_html"
|
||||
# .html without clear signals — check for any email header
|
||||
if re.search(r"^(From|To|Subject|Date):", raw_content[:2048], re.MULTILINE | re.IGNORECASE):
|
||||
return "email_html"
|
||||
return "generic_html"
|
||||
|
||||
# Plain text files with email headers
|
||||
if ext in ("", "txt") or not ext:
|
||||
if _EMAIL_SIGNALS.search(raw_content[:4096]):
|
||||
return "email_html"
|
||||
|
||||
# Detect binary content
|
||||
try:
|
||||
raw_content.encode("utf-8")
|
||||
except (UnicodeEncodeError, AttributeError):
|
||||
return "unknown"
|
||||
|
||||
# Non-text bytes heuristic: high ratio of non-printable chars
|
||||
sample = raw_content[:512]
|
||||
non_printable = sum(1 for c in sample if ord(c) < 32 and c not in "\r\n\t")
|
||||
if len(sample) > 0 and non_printable / len(sample) > 0.1:
|
||||
return "unknown"
|
||||
|
||||
return "unknown"
|
||||
|
||||
|
||||
# ── Generic fallback handler ──────────────────────────────────────────
|
||||
|
||||
def _preprocess_generic(raw_content: str, content_type: str) -> PreprocessResult:
|
||||
"""Strip HTML tags if present, return text as-is."""
|
||||
try:
|
||||
from bs4 import BeautifulSoup
|
||||
text = BeautifulSoup(raw_content, "html.parser").get_text(separator="\n")
|
||||
except ImportError:
|
||||
# No BeautifulSoup — strip tags with a simple regex
|
||||
text = re.sub(r"<[^>]+>", "", raw_content)
|
||||
|
||||
text = re.sub(r"\n{3,}", "\n\n", text).strip()
|
||||
return PreprocessResult(content_type=content_type, clean_text=text, metadata={})
|
||||
|
||||
|
||||
# ── Dispatch ──────────────────────────────────────────────────────────
|
||||
|
||||
def preprocess(content_type: str, raw_content: str) -> PreprocessResult:
|
||||
"""Dispatch *raw_content* to the handler registered for *content_type*.
|
||||
|
||||
Falls back to the generic handler for unknown types.
|
||||
"""
|
||||
if content_type == "email_html":
|
||||
from app.core.preprocessors.email_html import preprocess_email_html
|
||||
return preprocess_email_html(raw_content)
|
||||
|
||||
return _preprocess_generic(raw_content, content_type)
|
||||
|
||||
|
||||
__all__ = ["detect_content_type", "preprocess", "PreprocessResult"]
|
||||
25
app/core/preprocessors/base.py
Normal file
25
app/core/preprocessors/base.py
Normal file
@@ -0,0 +1,25 @@
|
||||
"""Base types for the preprocessor system."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
|
||||
@dataclass
|
||||
class PreprocessResult:
|
||||
"""Output of a preprocessor handler.
|
||||
|
||||
Attributes
|
||||
----------
|
||||
content_type:
|
||||
The detected content type (e.g. ``"email_html"``, ``"plain_text"``).
|
||||
clean_text:
|
||||
Human-readable text stripped of markup/binary noise.
|
||||
metadata:
|
||||
Dict of extracted metadata (keys vary by handler).
|
||||
Common keys: ``subject``, ``from``, ``to``, ``date``, ``filename``.
|
||||
"""
|
||||
|
||||
content_type: str
|
||||
clean_text: str
|
||||
metadata: dict = field(default_factory=dict)
|
||||
111
app/core/preprocessors/email_html.py
Normal file
111
app/core/preprocessors/email_html.py
Normal file
@@ -0,0 +1,111 @@
|
||||
"""Preprocessor for email HTML files.
|
||||
|
||||
Handles:
|
||||
- HTML stripping via BeautifulSoup
|
||||
- Metadata extraction (Subject, From, To, Date)
|
||||
- Thread splitting — isolates the latest reply
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from app.core.preprocessors.base import PreprocessResult
|
||||
|
||||
if TYPE_CHECKING:
|
||||
pass
|
||||
|
||||
# ── Thread split markers ──────────────────────────────────────────────
|
||||
|
||||
# Matches patterns like:
|
||||
# "On Mon, Apr 7, 2026 at 10:00 AM, Alice <alice@co.com> wrote:"
|
||||
# "-----Original Message-----"
|
||||
# "> " (plain-text quote prefix)
|
||||
_THREAD_PATTERNS = [
|
||||
re.compile(r"^On\s+.+wrote\s*:", re.IGNORECASE | re.MULTILINE),
|
||||
re.compile(r"^-{3,}\s*(original message|forwarded message)\s*-{3,}", re.IGNORECASE | re.MULTILINE),
|
||||
re.compile(r"^>{1,}\s+\S", re.MULTILINE),
|
||||
re.compile(r"^From:\s+.+\nSent:\s+", re.IGNORECASE | re.MULTILINE),
|
||||
]
|
||||
|
||||
# ── Metadata patterns (applied on raw HTML / plain fallback) ──────────
|
||||
|
||||
_META_PATTERNS: dict[str, list[re.Pattern]] = {
|
||||
"subject": [
|
||||
re.compile(r"<title>(.+?)</title>", re.IGNORECASE | re.DOTALL),
|
||||
re.compile(r"Subject:\s*(.+)", re.IGNORECASE),
|
||||
],
|
||||
"from": [
|
||||
re.compile(r'<meta[^>]+name=["\']?from["\']?[^>]+content=["\']([^"\']+)["\']', re.IGNORECASE),
|
||||
re.compile(r"From:\s*(.+)", re.IGNORECASE),
|
||||
],
|
||||
"to": [
|
||||
re.compile(r'<meta[^>]+name=["\']?to["\']?[^>]+content=["\']([^"\']+)["\']', re.IGNORECASE),
|
||||
re.compile(r"To:\s*(.+)", re.IGNORECASE),
|
||||
],
|
||||
"date": [
|
||||
re.compile(r'<meta[^>]+name=["\']?date["\']?[^>]+content=["\']([^"\']+)["\']', re.IGNORECASE),
|
||||
re.compile(r"Date:\s*(.+)", re.IGNORECASE),
|
||||
re.compile(r"Sent:\s*(.+)", re.IGNORECASE),
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
def _extract_metadata(raw_html: str, text: str) -> dict:
|
||||
"""Extract Subject/From/To/Date from raw HTML or plain text."""
|
||||
metadata: dict[str, str] = {}
|
||||
for field, patterns in _META_PATTERNS.items():
|
||||
for pat in patterns:
|
||||
m = pat.search(raw_html) or pat.search(text)
|
||||
if m:
|
||||
metadata[field] = m.group(1).strip()
|
||||
break
|
||||
return metadata
|
||||
|
||||
|
||||
def _split_thread(text: str) -> str:
|
||||
"""Return only the latest message in a threaded email."""
|
||||
earliest_pos: int | None = None
|
||||
for pat in _THREAD_PATTERNS:
|
||||
m = pat.search(text)
|
||||
if m and (earliest_pos is None or m.start() < earliest_pos):
|
||||
earliest_pos = m.start()
|
||||
|
||||
if earliest_pos is not None and earliest_pos > 0:
|
||||
return text[:earliest_pos].strip()
|
||||
return text.strip()
|
||||
|
||||
|
||||
def preprocess_email_html(raw_content: str) -> PreprocessResult:
|
||||
"""Strip HTML, extract metadata, split thread from an email HTML file."""
|
||||
try:
|
||||
from bs4 import BeautifulSoup # lazy import — optional dep
|
||||
except ImportError as exc:
|
||||
raise ImportError(
|
||||
"beautifulsoup4 is required for email_html preprocessing. "
|
||||
"Install it with: pip install beautifulsoup4"
|
||||
) from exc
|
||||
|
||||
# Parse with lxml if available, fall back to html.parser
|
||||
try:
|
||||
soup = BeautifulSoup(raw_content, "lxml")
|
||||
except Exception:
|
||||
soup = BeautifulSoup(raw_content, "html.parser")
|
||||
|
||||
# Remove noise tags
|
||||
for tag in soup(["style", "script", "head", "noscript"]):
|
||||
tag.decompose()
|
||||
|
||||
clean_text = soup.get_text(separator="\n")
|
||||
# Collapse excessive blank lines
|
||||
clean_text = re.sub(r"\n{3,}", "\n\n", clean_text).strip()
|
||||
|
||||
metadata = _extract_metadata(raw_content, clean_text)
|
||||
latest_message = _split_thread(clean_text)
|
||||
|
||||
return PreprocessResult(
|
||||
content_type="email_html",
|
||||
clean_text=latest_message,
|
||||
metadata=metadata,
|
||||
)
|
||||
@@ -296,6 +296,7 @@ class LocalAgentConfig(Base):
|
||||
directory_paths: Mapped[list] = mapped_column(JSON, nullable=False, default=list)
|
||||
data_types: Mapped[list] = mapped_column(JSON, nullable=False, default=list)
|
||||
prompt_template: Mapped[str] = mapped_column(Text, nullable=False, default="")
|
||||
agent_config: Mapped[dict | None] = mapped_column(JSON, nullable=True)
|
||||
file_extensions: Mapped[list] = mapped_column(JSON, nullable=False, default=list)
|
||||
schedule_cron: Mapped[str] = mapped_column(String(100), nullable=False, default="0 */6 * * *")
|
||||
enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
|
||||
|
||||
@@ -273,6 +273,27 @@ class WsFloatingDomain(BaseModel):
|
||||
domain: WsDomain
|
||||
|
||||
|
||||
# ── Agent Config V2 ───────────────────────────────────────────────────
|
||||
|
||||
|
||||
class ContentTypeConfig(BaseModel):
|
||||
"""Per-type extraction config produced by the journey chatbot."""
|
||||
|
||||
id: str
|
||||
label: str = ""
|
||||
detection_hint: str = ""
|
||||
preprocessing: str = "generic" # handler name: "email_html", "plain_text", ...
|
||||
extraction_prompt: str
|
||||
|
||||
|
||||
class AgentConfig(BaseModel):
|
||||
"""Structured agent configuration (replaces freeform prompt_template)."""
|
||||
|
||||
content_types: list[ContentTypeConfig] = []
|
||||
global_rules: list[str] = []
|
||||
data_types: list[str] = []
|
||||
|
||||
|
||||
# ── Agent Catalog ─────────────────────────────────────────────────────
|
||||
|
||||
class AgentCatalogItem(BaseModel):
|
||||
|
||||
@@ -32,4 +32,8 @@ google-auth-oauthlib>=1.2.0
|
||||
google-auth-httplib2>=0.2.0
|
||||
msal>=1.28.0
|
||||
cryptography>=42.0.0
|
||||
langfuse>=2.0.0
|
||||
beautifulsoup4>=4.12.0
|
||||
lxml>=5.0.0
|
||||
PyYAML>=6.0.0
|
||||
ruff>=0.8.0
|
||||
|
||||
@@ -6,26 +6,21 @@ a per-test session, and a FastAPI ``TestClient`` wired to use it.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
import uuid
|
||||
from collections.abc import AsyncGenerator, Generator
|
||||
from unittest.mock import patch
|
||||
|
||||
import boto3
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
from fastapi.testclient import TestClient
|
||||
from jose import jwt
|
||||
from moto import mock_aws
|
||||
from sqlalchemy import StaticPool, event
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||
|
||||
from app.config.settings import settings
|
||||
from app.db import Base, get_session
|
||||
from app.main import app
|
||||
from app.models import Plugin, Subscription, User
|
||||
from app.models import Subscription, User
|
||||
|
||||
# ── Fixed test user IDs (one per tier) ───────────────────────────────
|
||||
|
||||
@@ -109,79 +104,6 @@ def client(db_session: AsyncSession) -> Generator[TestClient, None, None]: # n
|
||||
app.dependency_overrides.pop(get_session, None)
|
||||
|
||||
|
||||
# ── Seed data helpers ────────────────────────────────────────────────
|
||||
|
||||
_SEED_PLUGINS = [
|
||||
Plugin(
|
||||
id="plugin-github-sync",
|
||||
name="GitHub Sync",
|
||||
description="Sync tasks with GitHub Issues and pull requests.",
|
||||
version="1.0.0",
|
||||
author_name="Adiuva",
|
||||
category="productivity",
|
||||
price_cents=0,
|
||||
permissions=json.dumps(["read:tasks", "write:tasks"]),
|
||||
status="approved",
|
||||
s3_package_key="plugins/plugin-github-sync/1.0.0/package.zip",
|
||||
install_count=0,
|
||||
avg_rating=0.0,
|
||||
),
|
||||
Plugin(
|
||||
id="plugin-slack-notify",
|
||||
name="Slack Notifier",
|
||||
description="Post task and timeline updates to Slack channels.",
|
||||
version="1.2.0",
|
||||
author_name="Adiuva",
|
||||
category="communication",
|
||||
price_cents=499,
|
||||
permissions=json.dumps(["read:tasks", "read:timelines"]),
|
||||
status="approved",
|
||||
s3_package_key="plugins/plugin-slack-notify/1.2.0/package.zip",
|
||||
install_count=0,
|
||||
avg_rating=0.0,
|
||||
),
|
||||
Plugin(
|
||||
id="plugin-time-tracker",
|
||||
name="Time Tracker",
|
||||
description="Track time spent on tasks with automatic reporting.",
|
||||
version="0.9.1",
|
||||
author_name="Third Party",
|
||||
category="productivity",
|
||||
price_cents=999,
|
||||
permissions=json.dumps(["read:tasks", "write:tasks"]),
|
||||
status="approved",
|
||||
s3_package_key="plugins/plugin-time-tracker/0.9.1/package.zip",
|
||||
install_count=0,
|
||||
avg_rating=0.0,
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def seed_plugins(db_session: AsyncSession) -> list[Plugin]:
|
||||
"""Insert the 3 default approved plugins and return them."""
|
||||
plugins = []
|
||||
for template in _SEED_PLUGINS:
|
||||
p = Plugin(
|
||||
id=template.id,
|
||||
name=template.name,
|
||||
description=template.description,
|
||||
version=template.version,
|
||||
author_name=template.author_name,
|
||||
category=template.category,
|
||||
price_cents=template.price_cents,
|
||||
permissions=template.permissions,
|
||||
status=template.status,
|
||||
s3_package_key=template.s3_package_key,
|
||||
install_count=template.install_count,
|
||||
avg_rating=template.avg_rating,
|
||||
)
|
||||
db_session.add(p)
|
||||
plugins.append(p)
|
||||
await db_session.commit()
|
||||
return plugins
|
||||
|
||||
|
||||
# ── JWT helpers ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@@ -212,24 +134,21 @@ def auth_header(tier: str = "power", user_id: str | None = None) -> dict[str, st
|
||||
return {"Authorization": f"Bearer {make_jwt(tier, user_id)}"}
|
||||
|
||||
|
||||
# ── S3 mock fixture ──────────────────────────────────────────────────
|
||||
# ── CLI options ───────────────────────────────────────────────────────
|
||||
|
||||
S3_TEST_BUCKET = "test-bucket"
|
||||
S3_TEST_REGION = "us-east-1"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def s3_bucket():
|
||||
"""Create a mocked S3 bucket via moto and patch BlobStore settings."""
|
||||
with mock_aws():
|
||||
os.environ.setdefault("AWS_ACCESS_KEY_ID", "testing")
|
||||
os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "testing")
|
||||
os.environ.setdefault("AWS_DEFAULT_REGION", S3_TEST_REGION)
|
||||
client = boto3.client("s3", region_name=S3_TEST_REGION)
|
||||
client.create_bucket(Bucket=S3_TEST_BUCKET)
|
||||
with patch("app.storage.blob_store.settings") as mock_settings:
|
||||
mock_settings.S3_BUCKET = S3_TEST_BUCKET
|
||||
mock_settings.S3_REGION = S3_TEST_REGION
|
||||
mock_settings.AWS_ACCESS_KEY_ID = "testing"
|
||||
mock_settings.AWS_SECRET_ACCESS_KEY = "testing"
|
||||
yield S3_TEST_BUCKET
|
||||
def pytest_addoption(parser):
|
||||
parser.addoption(
|
||||
"--preprocess-dir",
|
||||
default=None,
|
||||
help="Override fixture folder for preprocessor tests (must contain cases.yaml + data/)",
|
||||
)
|
||||
parser.addoption(
|
||||
"--runner-dir",
|
||||
default=None,
|
||||
help="Override fixture folder for agent_runner_v2 eval tests (must contain cases.yaml + data/)",
|
||||
)
|
||||
parser.addoption(
|
||||
"--journey-dir",
|
||||
default=None,
|
||||
help="Override fixture folder for journey_v2 eval tests (must contain cases.yaml + data/)",
|
||||
)
|
||||
|
||||
86
tests/fixtures/agent_runner_v2/cases.yaml
vendored
Normal file
86
tests/fixtures/agent_runner_v2/cases.yaml
vendored
Normal file
@@ -0,0 +1,86 @@
|
||||
# Agent Runner V2 — eval test cases (Step 2, requires real LLM)
|
||||
#
|
||||
# Each case drives one parametrized `test_eval_runner` invocation.
|
||||
#
|
||||
# Keys
|
||||
# ----
|
||||
# id: str unique identifier shown in pytest output
|
||||
# description: str human-readable label
|
||||
# file: str filename inside data/
|
||||
# file_path: str path reported to the executor (affects project-matching via filename)
|
||||
# projects: [alpha|beta] symbolic project names resolved by the test helper
|
||||
#
|
||||
# Optional pre-existing records (dedup tests)
|
||||
# existing_tasks: list of {id, title, status, priority}
|
||||
# existing_notes: list of {id, title, content}
|
||||
# existing_timelines: list of {id, title, date}
|
||||
#
|
||||
# Assertions (one or more)
|
||||
# expect_insert: <table> at least 1 insert row in this table (tasks|notes|timelines)
|
||||
# expect_no_insert: true zero inserts in any table
|
||||
# expect_project_id: <id> any insert must carry this projectId
|
||||
# expect_dedup: true task inserts == 0 OR task updates >= 1 (dedup check)
|
||||
#
|
||||
# Langfuse
|
||||
# score_name: str observation score name
|
||||
|
||||
- id: "2.1"
|
||||
description: "Action email → create_task"
|
||||
file: email_action.html
|
||||
file_path: /emails/ProjectAlpha_action.html
|
||||
projects: [alpha, beta]
|
||||
expect_insert: tasks
|
||||
score_name: runner.email_to_task
|
||||
|
||||
- id: "2.2"
|
||||
description: "Informational email → create_note"
|
||||
file: email_info.html
|
||||
file_path: /emails/ProjectAlpha_info.html
|
||||
projects: [alpha, beta]
|
||||
expect_insert: notes
|
||||
score_name: runner.email_to_note
|
||||
|
||||
- id: "2.3"
|
||||
description: "Email with meeting date → create_timeline"
|
||||
file: email_date.html
|
||||
file_path: /emails/ProjectAlpha_kickoff.html
|
||||
projects: [alpha, beta]
|
||||
expect_insert: timelines
|
||||
score_name: runner.email_to_timeline
|
||||
|
||||
- id: "2.4"
|
||||
description: "Filename contains project name → correct project assigned"
|
||||
file: email_action.html
|
||||
file_path: /emails/ProjectAlpha_report.html
|
||||
projects: [alpha, beta]
|
||||
expect_project_id: proj-alpha
|
||||
score_name: runner.project_filename
|
||||
|
||||
- id: "2.5"
|
||||
description: "Email body mentions project → correct project assigned"
|
||||
file: email_action.html
|
||||
file_path: /emails/email_001.html
|
||||
projects: [alpha, beta]
|
||||
expect_project_id: proj-alpha
|
||||
score_name: runner.project_content
|
||||
|
||||
- id: "2.6"
|
||||
description: "Newsletter + global rule no-project → no creates"
|
||||
file: email_no_project.html
|
||||
file_path: /emails/newsletter.html
|
||||
projects: [alpha, beta]
|
||||
expect_no_insert: true
|
||||
score_name: runner.no_project
|
||||
|
||||
- id: "2.7"
|
||||
description: "Existing task with same title → dedup (update not create)"
|
||||
file: email_action.html
|
||||
file_path: /emails/ProjectAlpha_followup.html
|
||||
projects: [alpha]
|
||||
existing_tasks:
|
||||
- id: task-existing
|
||||
title: Fix the login bug
|
||||
status: todo
|
||||
priority: medium
|
||||
expect_dedup: true
|
||||
score_name: runner.dedup
|
||||
7
tests/fixtures/agent_runner_v2/data/email_action.html
vendored
Normal file
7
tests/fixtures/agent_runner_v2/data/email_action.html
vendored
Normal file
@@ -0,0 +1,7 @@
|
||||
<html><head></head><body>
|
||||
<p><b>From:</b> boss@company.com</p>
|
||||
<p><b>To:</b> dev@company.com</p>
|
||||
<p><b>Subject:</b> Fix the login bug</p>
|
||||
<p><b>Date:</b> 2026-04-07</p>
|
||||
<p>Hi,<br>Please fix the login bug in Project Alpha by Friday. High priority!</p>
|
||||
</body></html>
|
||||
5
tests/fixtures/agent_runner_v2/data/email_date.html
vendored
Normal file
5
tests/fixtures/agent_runner_v2/data/email_date.html
vendored
Normal file
@@ -0,0 +1,5 @@
|
||||
<html><head></head><body>
|
||||
<p><b>From:</b> pm@company.com</p>
|
||||
<p><b>Subject:</b> Project Alpha kick-off meeting</p>
|
||||
<p>The kick-off meeting for Project Alpha is scheduled for 2026-04-15 at 10:00.</p>
|
||||
</body></html>
|
||||
7
tests/fixtures/agent_runner_v2/data/email_info.html
vendored
Normal file
7
tests/fixtures/agent_runner_v2/data/email_info.html
vendored
Normal file
@@ -0,0 +1,7 @@
|
||||
<html><head></head><body>
|
||||
<p><b>From:</b> pm@company.com</p>
|
||||
<p><b>To:</b> team@company.com</p>
|
||||
<p><b>Subject:</b> FYI: New policy for Project Alpha</p>
|
||||
<p>Just a heads-up that starting next week all code reviews must be done
|
||||
within 24 hours for Project Alpha. No action needed from you now.</p>
|
||||
</body></html>
|
||||
5
tests/fixtures/agent_runner_v2/data/email_no_project.html
vendored
Normal file
5
tests/fixtures/agent_runner_v2/data/email_no_project.html
vendored
Normal file
@@ -0,0 +1,5 @@
|
||||
<html><head></head><body>
|
||||
<p><b>From:</b> newsletter@ads.com</p>
|
||||
<p><b>Subject:</b> Weekly newsletter</p>
|
||||
<p>Check out our latest deals on electronics!</p>
|
||||
</body></html>
|
||||
87
tests/fixtures/journey_v2/cases.yaml
vendored
Normal file
87
tests/fixtures/journey_v2/cases.yaml
vendored
Normal file
@@ -0,0 +1,87 @@
|
||||
# Journey V2 eval test cases — Step 4
|
||||
#
|
||||
# Each case simulates a complete journey session:
|
||||
# 1. handle_journey_start is called with directory + data_types
|
||||
# 2. handle_journey_message is called for each entry in user_messages
|
||||
# 3. Assertions are evaluated on the final reply
|
||||
#
|
||||
# directory_files: list of {path, content_file} — content_file is relative to data/
|
||||
#
|
||||
# Assertion keys:
|
||||
# expect_question: true → first reply must contain "?"
|
||||
# expect_done: true → final reply must have done=True
|
||||
# expect_valid_config: true → agent_config must be parseable as AgentConfig with content_types > 0
|
||||
# expect_content_type_id: <str> → AgentConfig.content_types must contain an entry with this id
|
||||
# expect_extraction_contains: <str> → first content_type extraction_prompt must contain this word
|
||||
# expect_global_rules: true → AgentConfig.global_rules must be non-empty
|
||||
|
||||
- id: "4.1"
|
||||
description: "Journey start explores directory, first reply contains a question"
|
||||
directory: "/test/emails"
|
||||
data_types: ["tasks", "notes", "timelines"]
|
||||
directory_files:
|
||||
- path: "/test/emails/outlook_export_2024.html"
|
||||
content_file: "email_action.html"
|
||||
user_messages: []
|
||||
score_name: "journey.start"
|
||||
expect_question: true
|
||||
|
||||
- id: "4.2"
|
||||
description: "Full 3-turn conversation produces a valid AgentConfig JSON"
|
||||
directory: "/test/emails"
|
||||
data_types: ["tasks", "notes", "timelines"]
|
||||
directory_files:
|
||||
- path: "/test/emails/email_backup.html"
|
||||
content_file: "email_action.html"
|
||||
user_messages:
|
||||
- "These are email exports from Outlook in HTML format"
|
||||
- "Create tasks for emails with direct action requests, notes for informational emails"
|
||||
- "Yes, that looks correct. No other rules."
|
||||
score_name: "journey.valid_json"
|
||||
expect_done: true
|
||||
expect_valid_config: true
|
||||
|
||||
- id: "4.3"
|
||||
description: "Journey detects email_html content type from directory exploration"
|
||||
directory: "/test/emails"
|
||||
data_types: ["tasks", "notes"]
|
||||
directory_files:
|
||||
- path: "/test/emails/message.html"
|
||||
content_file: "email_action.html"
|
||||
user_messages:
|
||||
- "HTML email backups from my mail client, exported from Outlook"
|
||||
- "Create tasks from emails that contain assignments or direct action items"
|
||||
- "Correct, no other rules needed"
|
||||
score_name: "journey.detect_email"
|
||||
expect_done: true
|
||||
expect_content_type_id: "email_html"
|
||||
|
||||
- id: "4.4"
|
||||
description: "Custom user rule (only notes, no tasks) reflected in extraction_prompt"
|
||||
directory: "/test/emails"
|
||||
data_types: ["notes"]
|
||||
directory_files:
|
||||
- path: "/test/emails/email.html"
|
||||
content_file: "email_info.html"
|
||||
user_messages:
|
||||
- "HTML emails from my work inbox"
|
||||
- "Create only notes from all emails — I do not want tasks or timelines to be created"
|
||||
- "Yes, exactly"
|
||||
score_name: "journey.custom_rules"
|
||||
expect_done: true
|
||||
expect_extraction_contains: "note"
|
||||
|
||||
- id: "4.5"
|
||||
description: "Global rule (no project = no entity) appears in AgentConfig.global_rules"
|
||||
directory: "/test/emails"
|
||||
data_types: ["tasks", "notes"]
|
||||
directory_files:
|
||||
- path: "/test/emails/email.html"
|
||||
content_file: "email_action.html"
|
||||
user_messages:
|
||||
- "Email backups from Outlook"
|
||||
- "Create tasks from action request emails, notes from informational emails"
|
||||
- "If the email cannot be matched to any project, do not create any entity at all"
|
||||
score_name: "journey.global_rules"
|
||||
expect_done: true
|
||||
expect_global_rules: true
|
||||
23
tests/fixtures/journey_v2/data/email_action.html
vendored
Normal file
23
tests/fixtures/journey_v2/data/email_action.html
vendored
Normal file
@@ -0,0 +1,23 @@
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<title>Email: Fix the login bug</title>
|
||||
<style>body { font-family: Arial; } .header { color: #666; }</style>
|
||||
</head>
|
||||
<body>
|
||||
<div class="header">
|
||||
<p><strong>From:</strong> boss@company.com</p>
|
||||
<p><strong>To:</strong> dev@company.com</p>
|
||||
<p><strong>Subject:</strong> Fix the login bug</p>
|
||||
<p><strong>Date:</strong> Mon, 7 Apr 2026 09:15:00 +0000</p>
|
||||
</div>
|
||||
<div class="body">
|
||||
<p>Hi,</p>
|
||||
<p>Please fix the login bug in Project Alpha as soon as possible.
|
||||
Users are reporting that they can't log in with their Google accounts.
|
||||
This is blocking the whole team. Please resolve it by Friday.</p>
|
||||
<p>Thanks,<br>Boss</p>
|
||||
</div>
|
||||
</body>
|
||||
</html>
|
||||
23
tests/fixtures/journey_v2/data/email_info.html
vendored
Normal file
23
tests/fixtures/journey_v2/data/email_info.html
vendored
Normal file
@@ -0,0 +1,23 @@
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<title>Email: New policy update</title>
|
||||
<style>body { font-family: Arial; }</style>
|
||||
</head>
|
||||
<body>
|
||||
<div class="header">
|
||||
<p><strong>From:</strong> hr@company.com</p>
|
||||
<p><strong>To:</strong> all@company.com</p>
|
||||
<p><strong>Subject:</strong> FYI: New remote work policy effective May 1</p>
|
||||
<p><strong>Date:</strong> Tue, 8 Apr 2026 10:00:00 +0000</p>
|
||||
</div>
|
||||
<div class="body">
|
||||
<p>Hi everyone,</p>
|
||||
<p>Just a heads-up that starting May 1, 2026 the company will be moving to
|
||||
a hybrid work model. You will be expected to come into the office at least
|
||||
two days per week. More details will follow in the employee handbook.</p>
|
||||
<p>Best,<br>HR Team</p>
|
||||
</div>
|
||||
</body>
|
||||
</html>
|
||||
68
tests/fixtures/preprocessors/cases.yaml
vendored
Normal file
68
tests/fixtures/preprocessors/cases.yaml
vendored
Normal file
@@ -0,0 +1,68 @@
|
||||
# Preprocessor test cases
|
||||
#
|
||||
# detect: <expected_type> → chiama detect_content_type(filename, content)
|
||||
# process: <content_type> → chiama preprocess(content_type, content)
|
||||
#
|
||||
# Sorgente: file: <nome in data/> oppure generate: binary_noise
|
||||
#
|
||||
# Assertions piatte (solo per process):
|
||||
# no_html: true clean_text senza tag HTML
|
||||
# min_chars: N len(clean_text) >= N
|
||||
# ratio_lt: F len(clean) / len(raw) < F
|
||||
# has_meta: [k, ...] chiavi presenti in metadata
|
||||
# contains: str | [str] substring(s) presenti in clean_text
|
||||
# excludes: str | [str] substring(s) assenti da clean_text
|
||||
# content_type: str result.content_type == questo valore
|
||||
|
||||
- id: "1.1"
|
||||
file: email_action.html
|
||||
detect: email_html
|
||||
|
||||
- id: "1.2"
|
||||
file: generic_page.html
|
||||
detect: generic_html
|
||||
|
||||
- id: "1.3"
|
||||
file: notes.txt
|
||||
detect: plain_text
|
||||
|
||||
- id: "1.4"
|
||||
file: archive.xyz
|
||||
generate: binary_noise
|
||||
detect: unknown
|
||||
|
||||
- id: "1.5"
|
||||
file: email_action.html
|
||||
process: email_html
|
||||
no_html: true
|
||||
min_chars: 50
|
||||
ratio_lt: 0.8
|
||||
|
||||
- id: "1.6"
|
||||
file: email_action.html
|
||||
process: email_html
|
||||
has_meta: [subject, from]
|
||||
|
||||
- id: "1.7"
|
||||
file: email_thread.html
|
||||
process: email_html
|
||||
contains: "Sure, I'll handle the deploy"
|
||||
excludes: "Let's plan the deploy"
|
||||
|
||||
- id: "1.8"
|
||||
file: email_single.html
|
||||
process: email_html
|
||||
contains: "deploy is done"
|
||||
|
||||
- id: "1.9"
|
||||
file: email_heavy.html
|
||||
process: email_html
|
||||
no_html: true
|
||||
min_chars: 30
|
||||
excludes: [border-collapse, font-size]
|
||||
|
||||
- id: "1.10"
|
||||
file: fallback.txt
|
||||
process: unknown
|
||||
min_chars: 1
|
||||
content_type: unknown
|
||||
25
tests/fixtures/preprocessors/data/email_action.html
vendored
Normal file
25
tests/fixtures/preprocessors/data/email_action.html
vendored
Normal file
@@ -0,0 +1,25 @@
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<title>Fix the login bug</title>
|
||||
<style>
|
||||
body { font-family: Arial, sans-serif; color: #333; margin: 0; padding: 20px; }
|
||||
.header { background: #f5f5f5; padding: 10px; border-bottom: 1px solid #ddd; }
|
||||
.body { padding: 20px; }
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<div class="header">
|
||||
<p><strong>From:</strong> boss@company.com</p>
|
||||
<p><strong>To:</strong> dev@company.com</p>
|
||||
<p><strong>Subject:</strong> Fix the login bug</p>
|
||||
<p><strong>Date:</strong> Mon, 7 Apr 2026 09:00:00 +0200</p>
|
||||
</div>
|
||||
<div class="body">
|
||||
<p>Hi,</p>
|
||||
<p>Please fix the login bug by Friday. It is blocking the release.</p>
|
||||
<p>Priority: high. Let me know if you need anything.</p>
|
||||
<p>Thanks,<br>Boss</p>
|
||||
</div>
|
||||
</body>
|
||||
</html>
|
||||
49
tests/fixtures/preprocessors/data/email_heavy.html
vendored
Normal file
49
tests/fixtures/preprocessors/data/email_heavy.html
vendored
Normal file
@@ -0,0 +1,49 @@
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<style>
|
||||
table { border-collapse: collapse; width: 100%; max-width: 600px; margin: 0 auto; }
|
||||
td { padding: 8px 12px; border: 1px solid #dddddd; font-size: 12px; color: #444444; }
|
||||
.header-row { background-color: #003366; color: #ffffff; font-weight: bold; }
|
||||
.label-col { background-color: #f0f0f0; width: 80px; font-weight: bold; }
|
||||
.footer-row { font-size: 10px; color: #999999; text-align: center; }
|
||||
</style>
|
||||
</head>
|
||||
<body bgcolor="#eeeeee">
|
||||
<center>
|
||||
<table cellpadding="0" cellspacing="0">
|
||||
<tr class="header-row">
|
||||
<td colspan="2">Company Internal Update</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td class="label-col">From:</td>
|
||||
<td>newsletter@corp.com</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td class="label-col">Subject:</td>
|
||||
<td>Q1 Results Update</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td class="label-col">Date:</td>
|
||||
<td>Apr 7, 2026</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td colspan="2">
|
||||
<table width="100%" cellpadding="10">
|
||||
<tr>
|
||||
<td>
|
||||
<p style="font-size:14px; font-weight:bold;">Dear Team,</p>
|
||||
<p>Q1 results are in. Revenue up 15% year-over-year.</p>
|
||||
<p>Please review the attached report and share any feedback by EOW.</p>
|
||||
</td>
|
||||
</tr>
|
||||
</table>
|
||||
</td>
|
||||
</tr>
|
||||
<tr class="footer-row">
|
||||
<td colspan="2">Confidential — do not forward outside the company.</td>
|
||||
</tr>
|
||||
</table>
|
||||
</center>
|
||||
</body>
|
||||
</html>
|
||||
8
tests/fixtures/preprocessors/data/email_single.html
vendored
Normal file
8
tests/fixtures/preprocessors/data/email_single.html
vendored
Normal file
@@ -0,0 +1,8 @@
|
||||
<!DOCTYPE html>
|
||||
<html><body>
|
||||
<p><strong>From:</strong> alice@co.com</p>
|
||||
<p><strong>To:</strong> team@co.com</p>
|
||||
<p><strong>Subject:</strong> Quick update</p>
|
||||
<p><strong>Date:</strong> Tue, 7 Apr 2026 10:30:00 +0200</p>
|
||||
<p>The deploy is done. Everything looks good. No issues so far.</p>
|
||||
</body></html>
|
||||
24
tests/fixtures/preprocessors/data/email_thread.html
vendored
Normal file
24
tests/fixtures/preprocessors/data/email_thread.html
vendored
Normal file
@@ -0,0 +1,24 @@
|
||||
<!DOCTYPE html>
|
||||
<html><body>
|
||||
<div class="message-latest">
|
||||
<p><strong>From:</strong> alice@co.com</p>
|
||||
<p><strong>Subject:</strong> Re: Re: Deploy plan</p>
|
||||
<p>Sure, I'll handle the deploy.</p>
|
||||
</div>
|
||||
|
||||
<p>On Mon, Apr 6, 2026 at 3:00 PM, Bob <bob@co.com> wrote:</p>
|
||||
<blockquote>
|
||||
<p>From: bob@co.com</p>
|
||||
<p>Can you handle the deploy?</p>
|
||||
<p>On Sun, Apr 5, 2026 at 1:00 PM, Alice <alice@co.com> wrote:</p>
|
||||
<blockquote>
|
||||
<p>From: alice@co.com</p>
|
||||
<p>Let's plan the deploy for Monday.</p>
|
||||
<p>On Sat, Apr 4, 2026 at 11:00 AM, Charlie <charlie@co.com> wrote:</p>
|
||||
<blockquote>
|
||||
<p>From: charlie@co.com</p>
|
||||
<p>We need to schedule the deploy. What day works?</p>
|
||||
</blockquote>
|
||||
</blockquote>
|
||||
</blockquote>
|
||||
</body></html>
|
||||
3
tests/fixtures/preprocessors/data/fallback.txt
vendored
Normal file
3
tests/fixtures/preprocessors/data/fallback.txt
vendored
Normal file
@@ -0,0 +1,3 @@
|
||||
random text content without any structure
|
||||
line two with some words
|
||||
line three and more content here
|
||||
35
tests/fixtures/preprocessors/data/generic_page.html
vendored
Normal file
35
tests/fixtures/preprocessors/data/generic_page.html
vendored
Normal file
@@ -0,0 +1,35 @@
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<title>My Web App</title>
|
||||
<link rel="stylesheet" href="styles.css">
|
||||
</head>
|
||||
<body>
|
||||
<nav>
|
||||
<a href="/">Home</a>
|
||||
<a href="/about">About</a>
|
||||
<a href="/contact">Contact</a>
|
||||
</nav>
|
||||
<main>
|
||||
<header>
|
||||
<h1>Welcome to My App</h1>
|
||||
</header>
|
||||
<article>
|
||||
<p>This is a generic web page with no email headers.</p>
|
||||
<p>It has navigation, main content, and a footer.</p>
|
||||
</article>
|
||||
<section>
|
||||
<h2>Features</h2>
|
||||
<ul>
|
||||
<li>Fast</li>
|
||||
<li>Reliable</li>
|
||||
<li>Secure</li>
|
||||
</ul>
|
||||
</section>
|
||||
</main>
|
||||
<footer>
|
||||
<p>© 2026 My App</p>
|
||||
</footer>
|
||||
</body>
|
||||
</html>
|
||||
15
tests/fixtures/preprocessors/data/notes.txt
vendored
Normal file
15
tests/fixtures/preprocessors/data/notes.txt
vendored
Normal file
@@ -0,0 +1,15 @@
|
||||
Meeting notes - April 7, 2026
|
||||
|
||||
Attendees: Alice, Bob, Charlie
|
||||
|
||||
Discussion points:
|
||||
- Deploy scheduled for Friday
|
||||
- Bug fix for login must be completed by Thursday
|
||||
- Review Q1 numbers before EOW
|
||||
|
||||
Action items:
|
||||
- Alice: fix login bug
|
||||
- Bob: prepare deploy checklist
|
||||
- Charlie: send Q1 report
|
||||
|
||||
Next meeting: April 14, 2026
|
||||
432
tests/test_agent_runner_v2.py
Normal file
432
tests/test_agent_runner_v2.py
Normal file
@@ -0,0 +1,432 @@
|
||||
"""Tests for Local Agent V2 runner (Step 2).
|
||||
|
||||
Covers the unified per-file flow:
|
||||
Phase A — detect + preprocess (Python, zero LLM)
|
||||
Phase B — single LLM call with tools (classify + extract + create)
|
||||
|
||||
Fixture-based eval tests (2.1–2.7)
|
||||
-----------------------------------
|
||||
Cases are defined in tests/fixtures/agent_runner_v2/cases.yaml.
|
||||
Email HTML files live in tests/fixtures/agent_runner_v2/data/.
|
||||
Use --runner-dir to point at a custom folder (same structure required).
|
||||
|
||||
Unit tests (no LLM)
|
||||
--------------------
|
||||
2.8 items_created count → items_created == N create_* calls
|
||||
2.9 Device offline → status=error
|
||||
2.10 Empty file → items_processed=0, status=success
|
||||
|
||||
Run:
|
||||
pytest tests/test_agent_runner_v2.py -v
|
||||
pytest tests/test_agent_runner_v2.py -v -k "2_9 or 2_10 or 2_8" # unit only
|
||||
pytest tests/test_agent_runner_v2.py -v -k "eval" # LLM evals only
|
||||
pytest tests/test_agent_runner_v2.py -v --runner-dir /path/to/dir # custom fixtures
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from contextlib import nullcontext
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
import yaml
|
||||
|
||||
from app.core.agent_runner import (
|
||||
_format_metadata,
|
||||
_format_projects,
|
||||
_get_extraction_rules,
|
||||
_get_no_match_behavior,
|
||||
_is_overdue,
|
||||
run_local_agent,
|
||||
)
|
||||
from app.core.device_manager import DeviceConnectionManager
|
||||
from app.core.langfuse_client import get_langfuse
|
||||
from app.models import AgentRunLog, LocalAgentConfig
|
||||
from tests.conftest import TEST_USER_IDS
|
||||
|
||||
# ── Constants ─────────────────────────────────────────────────────────────
|
||||
|
||||
_USER_ID = TEST_USER_IDS["power"]
|
||||
|
||||
_DEFAULT_FIXTURE_DIR = Path(__file__).parent / "fixtures" / "agent_runner_v2"
|
||||
|
||||
_AGENT_CONFIG = {
|
||||
"content_types": [
|
||||
{
|
||||
"id": "email_html",
|
||||
"label": "Email HTML",
|
||||
"detection_hint": "HTML file with From/To/Subject headers",
|
||||
"preprocessing": "email_html",
|
||||
"extraction_prompt": (
|
||||
"If the email contains a direct action request or task assignment → create a task. "
|
||||
"If the email contains informational content, updates, or FYI → create a note. "
|
||||
"If the email mentions a specific date for a meeting or deadline → create a timeline entry."
|
||||
),
|
||||
}
|
||||
],
|
||||
"global_rules": [
|
||||
"Se il file non è riconducibile a nessun progetto, non creare alcuna entità."
|
||||
],
|
||||
"data_types": ["tasks", "notes", "timelines"],
|
||||
}
|
||||
|
||||
# Canonical project definitions, referenced symbolically in cases.yaml.
|
||||
_PROJECTS: dict[str, dict] = {
|
||||
"alpha": {"id": "proj-alpha", "name": "Project Alpha", "status": "active"},
|
||||
"beta": {"id": "proj-beta", "name": "Project Beta", "status": "active"},
|
||||
}
|
||||
|
||||
|
||||
# ── Fixture loading ───────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _fixtures_dir(config) -> Path:
|
||||
override = config.getoption("--runner-dir")
|
||||
return Path(override) if override else _DEFAULT_FIXTURE_DIR
|
||||
|
||||
|
||||
def _load_cases(config) -> list[dict]:
|
||||
return yaml.safe_load(
|
||||
(_fixtures_dir(config) / "cases.yaml").read_text(encoding="utf-8")
|
||||
)
|
||||
|
||||
|
||||
def _read_case_file(case: dict, data_dir: Path) -> str:
|
||||
return (data_dir / case["file"]).read_text(encoding="utf-8")
|
||||
|
||||
|
||||
def _resolve_projects(entries: list[str | dict]) -> list[dict]:
|
||||
"""Resolve project list from YAML: symbolic names and/or inline dicts."""
|
||||
result = []
|
||||
for entry in entries:
|
||||
if isinstance(entry, str):
|
||||
if entry in _PROJECTS:
|
||||
result.append(_PROJECTS[entry])
|
||||
elif isinstance(entry, dict):
|
||||
result.append(entry)
|
||||
return result
|
||||
|
||||
|
||||
# ── pytest_generate_tests — parametrize eval tests from YAML ─────────────
|
||||
|
||||
|
||||
def pytest_generate_tests(metafunc):
|
||||
if "runner_case" not in metafunc.fixturenames:
|
||||
return
|
||||
cases = _load_cases(metafunc.config)
|
||||
metafunc.parametrize("runner_case", cases, ids=[c["id"] for c in cases])
|
||||
|
||||
|
||||
# ── Test helpers ──────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _make_config(
|
||||
agent_config: dict | None = None,
|
||||
directory: str = "/emails",
|
||||
device_id: str = "dev-001",
|
||||
) -> LocalAgentConfig:
|
||||
return LocalAgentConfig(
|
||||
id=str(uuid.uuid4()),
|
||||
user_id=_USER_ID,
|
||||
device_id=device_id,
|
||||
name="Test V2 Agent",
|
||||
directory_paths=[directory],
|
||||
data_types=["tasks", "notes", "timelines"],
|
||||
prompt_template="",
|
||||
agent_config=agent_config or _AGENT_CONFIG,
|
||||
file_extensions=[".html", ".eml"],
|
||||
schedule_cron="0 */6 * * *",
|
||||
enabled=True,
|
||||
last_run_at=None,
|
||||
)
|
||||
|
||||
|
||||
def _make_run_log(agent_id: str) -> AgentRunLog:
|
||||
return AgentRunLog(
|
||||
id=str(uuid.uuid4()),
|
||||
agent_id=agent_id,
|
||||
agent_type="local",
|
||||
user_id=_USER_ID,
|
||||
status="running",
|
||||
started_at=datetime.now(timezone.utc),
|
||||
)
|
||||
|
||||
|
||||
def _make_manager(online: bool = True) -> DeviceConnectionManager:
|
||||
mgr = DeviceConnectionManager()
|
||||
if online:
|
||||
ws = MagicMock()
|
||||
ws.send_text = AsyncMock()
|
||||
mgr.register(_USER_ID, "dev-001", ws)
|
||||
return mgr
|
||||
|
||||
|
||||
def _make_executor(
|
||||
file_path: str,
|
||||
file_content: str,
|
||||
projects: list[dict] | None = None,
|
||||
existing_tasks: list[dict] | None = None,
|
||||
existing_notes: list[dict] | None = None,
|
||||
existing_timelines: list[dict] | None = None,
|
||||
) -> tuple[Any, list[dict]]:
|
||||
"""Return (async_executor, captured_calls).
|
||||
|
||||
The executor handles all ``execute_on_client`` payloads:
|
||||
directory listing, file reading, project/entity fetching, and CRUD.
|
||||
"""
|
||||
calls: list[dict] = []
|
||||
_projects = projects if projects is not None else list(_PROJECTS.values())
|
||||
|
||||
async def _executor(payload: dict) -> dict:
|
||||
action = payload.get("action", "")
|
||||
table = payload.get("table", "")
|
||||
data = payload.get("data") or {}
|
||||
calls.append({"action": action, "table": table, "data": data})
|
||||
|
||||
if action == "list_directory":
|
||||
return {"entries": [{"type": "file", "path": file_path}]}
|
||||
|
||||
if action == "get_file_metadata":
|
||||
return {"modifiedAt": None}
|
||||
|
||||
if action == "read_file_content":
|
||||
return {"content": file_content}
|
||||
|
||||
if action == "select":
|
||||
if table == "projects":
|
||||
return {"rows": _projects}
|
||||
if table == "tasks":
|
||||
return {"rows": existing_tasks or []}
|
||||
if table == "notes":
|
||||
return {"rows": existing_notes or []}
|
||||
if table == "timelines":
|
||||
return {"rows": existing_timelines or []}
|
||||
return {"rows": []}
|
||||
|
||||
if action == "insert":
|
||||
return {"row": {"id": str(uuid.uuid4()), **data}}
|
||||
|
||||
if action == "update":
|
||||
return {"success": True}
|
||||
|
||||
return {}
|
||||
|
||||
return _executor, calls
|
||||
|
||||
|
||||
# ── Unit: helper functions ────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_format_projects_empty():
|
||||
assert "(no projects" in _format_projects([])
|
||||
|
||||
|
||||
def test_format_projects_with_data():
|
||||
result = _format_projects([_PROJECTS["alpha"]])
|
||||
assert "proj-alpha" in result
|
||||
assert "Project Alpha" in result
|
||||
|
||||
|
||||
def test_format_metadata_empty():
|
||||
assert _format_metadata({}) == ""
|
||||
|
||||
|
||||
def test_format_metadata_email():
|
||||
meta = {"subject": "Fix bug", "from": "boss@co.com", "date": "2026-04-07"}
|
||||
result = _format_metadata(meta)
|
||||
assert "Fix bug" in result
|
||||
assert "boss@co.com" in result
|
||||
|
||||
|
||||
def test_get_extraction_rules_match():
|
||||
rules = _get_extraction_rules(_AGENT_CONFIG, "email_html")
|
||||
assert "task" in rules.lower()
|
||||
|
||||
|
||||
def test_get_extraction_rules_fallback():
|
||||
rules = _get_extraction_rules(_AGENT_CONFIG, "plain_text")
|
||||
assert "extract" in rules.lower()
|
||||
|
||||
|
||||
def test_get_no_match_behavior_from_global_rules():
|
||||
behavior = _get_no_match_behavior(_AGENT_CONFIG)
|
||||
assert behavior # non-empty
|
||||
|
||||
|
||||
def test_get_no_match_behavior_default():
|
||||
behavior = _get_no_match_behavior({})
|
||||
assert "project" in behavior.lower()
|
||||
|
||||
|
||||
# ── Unit: 2.9 — device offline ───────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_2_9_device_offline():
|
||||
"""2.9 No device online → status=error, no executor created."""
|
||||
config = _make_config()
|
||||
run_log = _make_run_log(config.id)
|
||||
mgr = _make_manager(online=False)
|
||||
|
||||
with patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock) as mock_fin:
|
||||
await run_local_agent(_USER_ID, config, run_log, mgr)
|
||||
|
||||
_, kwargs = mock_fin.call_args
|
||||
assert kwargs["status"] == "error"
|
||||
assert any("not connected" in e for e in kwargs.get("errors", []))
|
||||
|
||||
|
||||
# ── Unit: 2.10 — empty file ──────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_2_10_empty_file():
|
||||
"""2.10 File with empty content → skipped, items_processed=0, success."""
|
||||
config = _make_config()
|
||||
run_log = _make_run_log(config.id)
|
||||
mgr = _make_manager()
|
||||
|
||||
executor, calls = _make_executor(
|
||||
file_path="/emails/empty.html",
|
||||
file_content="",
|
||||
projects=[_PROJECTS["alpha"]],
|
||||
)
|
||||
|
||||
with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \
|
||||
patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock) as mock_fin:
|
||||
await run_local_agent(_USER_ID, config, run_log, mgr)
|
||||
|
||||
_, kwargs = mock_fin.call_args
|
||||
assert kwargs["items_processed"] == 0
|
||||
assert kwargs["status"] == "success"
|
||||
assert kwargs["items_created"] == 0
|
||||
|
||||
|
||||
# ── Unit: 2.8 — items_created count ─────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_2_8_items_created_count():
|
||||
"""2.8 items_created == number of create_* tool calls per run."""
|
||||
config = _make_config()
|
||||
run_log = _make_run_log(config.id)
|
||||
mgr = _make_manager()
|
||||
|
||||
executor, _calls = _make_executor(
|
||||
file_path="/emails/action.html",
|
||||
file_content="<html><body><p>Fix the login bug in Project Alpha.</p></body></html>",
|
||||
projects=[_PROJECTS["alpha"]],
|
||||
)
|
||||
|
||||
async def mock_run_agent(*, _tool_calls_out=None, **kw) -> str:
|
||||
if _tool_calls_out is not None:
|
||||
_tool_calls_out.extend(["create_task", "create_note", "update_task"])
|
||||
return "Done."
|
||||
|
||||
with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \
|
||||
patch("app.core.agent_runner._run_agent_with_tools", side_effect=mock_run_agent), \
|
||||
patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock) as mock_fin:
|
||||
await run_local_agent(_USER_ID, config, run_log, mgr)
|
||||
|
||||
_, kwargs = mock_fin.call_args
|
||||
# Only create_task + create_note count (not update_task).
|
||||
assert kwargs["items_created"] == 2
|
||||
assert kwargs["items_processed"] == 1
|
||||
|
||||
|
||||
# ── Eval: 2.1–2.7 — fixture-driven, real LLM + Langfuse scoring ──────────
|
||||
#
|
||||
# Cases loaded from tests/fixtures/agent_runner_v2/cases.yaml.
|
||||
# Supported assertions (from YAML):
|
||||
# expect_insert: <table> → at least 1 insert in that table
|
||||
# expect_no_insert: true → zero inserts in any table
|
||||
# expect_project_id: <id> → any insert carries this projectId
|
||||
# expect_dedup: true → task inserts == 0 OR task updates >= 1
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.eval
|
||||
async def test_eval_runner(runner_case, pytestconfig):
|
||||
"""Parametrized eval test — one invocation per YAML case."""
|
||||
case: dict = runner_case
|
||||
data_dir = _fixtures_dir(pytestconfig) / "data"
|
||||
file_content = _read_case_file(case, data_dir)
|
||||
projects = _resolve_projects(case.get("projects", []))
|
||||
|
||||
config = _make_config()
|
||||
run_log = _make_run_log(config.id)
|
||||
mgr = _make_manager()
|
||||
|
||||
executor, calls = _make_executor(
|
||||
file_path=case["file_path"],
|
||||
file_content=file_content,
|
||||
projects=projects,
|
||||
existing_tasks=case.get("existing_tasks"),
|
||||
existing_notes=case.get("existing_notes"),
|
||||
existing_timelines=case.get("existing_timelines"),
|
||||
)
|
||||
|
||||
lf = get_langfuse()
|
||||
obs_ctx = lf.start_as_current_observation(
|
||||
name=f"eval-runner-{case['id']}-{case.get('score_name', 'unknown').replace('.', '-')}",
|
||||
metadata={"step": "2", "case_id": case["id"]},
|
||||
) if lf else nullcontext()
|
||||
|
||||
with obs_ctx as obs:
|
||||
with patch("app.core.agent_runner._make_agent_executor", return_value=executor), \
|
||||
patch("app.core.agent_runner._finalize_run", new_callable=AsyncMock) as mock_fin:
|
||||
await run_local_agent(_USER_ID, config, run_log, mgr)
|
||||
|
||||
_, kwargs = mock_fin.call_args
|
||||
inserts = [c for c in calls if c["action"] == "insert"]
|
||||
score, comment = _evaluate_case(case, calls, kwargs)
|
||||
|
||||
if obs is not None:
|
||||
obs.score(
|
||||
name=case.get("score_name", f"runner.case_{case['id']}"),
|
||||
value=score,
|
||||
comment=comment,
|
||||
)
|
||||
|
||||
if lf:
|
||||
lf.flush()
|
||||
|
||||
assert score == 1.0, f"[{case['id']}] {case.get('description', '')} — {comment}"
|
||||
|
||||
|
||||
def _evaluate_case(case: dict, calls: list[dict], finalize_kwargs: dict) -> tuple[float, str]:
|
||||
"""Return (score, comment) for a YAML case given the captured executor calls."""
|
||||
inserts = [c for c in calls if c["action"] == "insert"]
|
||||
|
||||
if case.get("expect_no_insert"):
|
||||
score = 1.0 if len(inserts) == 0 else 0.0
|
||||
return score, f"inserts={len(inserts)} (expected 0)"
|
||||
|
||||
if "expect_insert" in case:
|
||||
tables = case["expect_insert"]
|
||||
if isinstance(tables, str):
|
||||
tables = [tables]
|
||||
missing = [t for t in tables if not any(c["table"] == t for c in inserts)]
|
||||
score = 1.0 if not missing else 0.0
|
||||
counts = {t: sum(1 for c in inserts if c["table"] == t) for t in tables}
|
||||
return score, f"inserts={counts}" + (f" missing={missing}" if missing else "")
|
||||
|
||||
if "expect_project_id" in case:
|
||||
expected_pid = case["expect_project_id"]
|
||||
correct = any(c.get("data", {}).get("projectId") == expected_pid for c in inserts)
|
||||
score = 1.0 if correct else 0.0
|
||||
all_pids = [c.get("data", {}).get("projectId") for c in inserts]
|
||||
return score, f"projectIds={all_pids} (expected {expected_pid!r})"
|
||||
|
||||
if case.get("expect_dedup"):
|
||||
task_creates = [c for c in inserts if c["table"] == "tasks"]
|
||||
task_updates = [c for c in calls if c["action"] == "update" and c["table"] == "tasks"]
|
||||
score = 1.0 if len(task_creates) == 0 or len(task_updates) >= 1 else 0.0
|
||||
return score, f"task_creates={len(task_creates)} task_updates={len(task_updates)}"
|
||||
|
||||
return 0.0, "no assertion defined in case"
|
||||
349
tests/test_journey_v2.py
Normal file
349
tests/test_journey_v2.py
Normal file
@@ -0,0 +1,349 @@
|
||||
"""Tests for Local Agent V2 journey setup (Step 4).
|
||||
|
||||
Covers the chatbot journey that produces a structured AgentConfig JSON
|
||||
instead of a freeform prompt_template string.
|
||||
|
||||
Unit tests (no LLM)
|
||||
--------------------
|
||||
4.6a _extract_agent_config: valid JSON → returns serialised config
|
||||
4.6b _extract_agent_config: invalid JSON → returns None
|
||||
4.6c _extract_agent_config: markers absent → returns None
|
||||
4.6d _extract_agent_config: only START marker → returns None
|
||||
4.6e Session not found → done=True, agent_config=None
|
||||
4.6f Nudge uses AGENT_CONFIG_START/END markers (not old PROMPT_TEMPLATE)
|
||||
|
||||
Eval tests (real LLM + Langfuse scoring)
|
||||
-----------------------------------------
|
||||
Cases are defined in tests/fixtures/journey_v2/cases.yaml.
|
||||
Email HTML files live in tests/fixtures/journey_v2/data/.
|
||||
Use --journey-dir to point at a custom folder (same structure required).
|
||||
|
||||
Run:
|
||||
pytest tests/test_journey_v2.py -v
|
||||
pytest tests/test_journey_v2.py -v -k "4_6" # unit only
|
||||
pytest tests/test_journey_v2.py -v -k "eval" # LLM evals only
|
||||
pytest tests/test_journey_v2.py -v --journey-dir /p # custom fixtures
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from contextlib import nullcontext
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
import yaml
|
||||
|
||||
from app.api.routes.agent_setup import (
|
||||
_CONFIG_END,
|
||||
_CONFIG_START,
|
||||
_MAX_TURNS,
|
||||
_extract_agent_config,
|
||||
_sessions,
|
||||
handle_journey_message,
|
||||
handle_journey_start,
|
||||
)
|
||||
from app.core.langfuse_client import get_langfuse
|
||||
from app.core.ws_context import clear_client_executor, set_client_executor
|
||||
from app.schemas import AgentConfig
|
||||
from tests.conftest import TEST_USER_IDS
|
||||
|
||||
# ── Constants ─────────────────────────────────────────────────────────────
|
||||
|
||||
_USER_ID = TEST_USER_IDS["power"]
|
||||
|
||||
_DEFAULT_FIXTURE_DIR = Path(__file__).parent / "fixtures" / "journey_v2"
|
||||
|
||||
# ── Fixture loading ───────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _fixtures_dir(config) -> Path:
|
||||
override = config.getoption("--journey-dir")
|
||||
return Path(override) if override else _DEFAULT_FIXTURE_DIR
|
||||
|
||||
|
||||
def _load_cases(config) -> list[dict]:
|
||||
return yaml.safe_load(
|
||||
(_fixtures_dir(config) / "cases.yaml").read_text(encoding="utf-8")
|
||||
)
|
||||
|
||||
|
||||
def _read_data_file(filename: str, fixtures_dir: Path) -> str:
|
||||
return (fixtures_dir / "data" / filename).read_text(encoding="utf-8")
|
||||
|
||||
|
||||
# ── pytest_generate_tests ─────────────────────────────────────────────────
|
||||
|
||||
|
||||
def pytest_generate_tests(metafunc):
|
||||
if "journey_case" not in metafunc.fixturenames:
|
||||
return
|
||||
cases = _load_cases(metafunc.config)
|
||||
metafunc.parametrize("journey_case", cases, ids=[c["id"] for c in cases])
|
||||
|
||||
|
||||
# ── Executor builder ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _make_fs_executor(directory_files: list[dict], fixtures_dir: Path):
|
||||
"""Return an async callback that simulates filesystem tool responses.
|
||||
|
||||
Matches the signature expected by ``set_client_executor`` / ``execute_on_client``:
|
||||
receives the full ``payload`` dict and returns a result dict.
|
||||
|
||||
``directory_files`` is a list of ``{path, content_file}`` dicts;
|
||||
``content_file`` is relative to ``fixtures_dir/data/``.
|
||||
"""
|
||||
file_map: dict[str, str] = {
|
||||
entry["path"]: _read_data_file(entry["content_file"], fixtures_dir)
|
||||
for entry in directory_files
|
||||
}
|
||||
|
||||
async def _executor(payload: dict) -> dict:
|
||||
action = payload.get("action", "")
|
||||
data = payload.get("data") or {}
|
||||
|
||||
if action == "list_directory":
|
||||
return {"entries": [
|
||||
{"type": "file", "name": p.split("/")[-1], "path": p}
|
||||
for p in file_map
|
||||
]}
|
||||
|
||||
if action == "read_file_content":
|
||||
path = data.get("path", "")
|
||||
return {"content": file_map.get(path, "")}
|
||||
|
||||
if action == "get_file_metadata":
|
||||
path = data.get("path", "")
|
||||
name = path.split("/")[-1]
|
||||
ext = "." + name.rsplit(".", 1)[-1] if "." in name else ""
|
||||
return {"name": name, "extension": ext, "size": 1024,
|
||||
"createdAt": None, "modifiedAt": None}
|
||||
|
||||
return {}
|
||||
|
||||
return _executor
|
||||
|
||||
|
||||
# ── Journey runner helper ─────────────────────────────────────────────────
|
||||
|
||||
|
||||
async def _run_journey(user_id: str, case: dict, executor) -> dict[str, Any]:
|
||||
"""Drive start + all user_messages for a case. Returns the final reply dict.
|
||||
|
||||
Mirrors ``device_ws._handle_journey_start/message``: sets the client
|
||||
executor (so filesystem tools work) before each handler call.
|
||||
"""
|
||||
session_id = str(uuid.uuid4())
|
||||
try:
|
||||
set_client_executor(executor)
|
||||
reply = await handle_journey_start(user_id, {
|
||||
"agent_type": "local",
|
||||
"directory": case["directory"],
|
||||
"data_types": case["data_types"],
|
||||
"session_id": session_id,
|
||||
})
|
||||
|
||||
for msg in case.get("user_messages", []):
|
||||
if reply.get("done"):
|
||||
break
|
||||
set_client_executor(executor)
|
||||
reply = await handle_journey_message(user_id, {
|
||||
"session_id": reply["session_id"],
|
||||
"message": msg,
|
||||
})
|
||||
finally:
|
||||
clear_client_executor()
|
||||
_sessions.pop(session_id, None)
|
||||
|
||||
return reply
|
||||
|
||||
|
||||
# ── Assertion helper ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _evaluate_case(case: dict, reply: dict) -> tuple[float, str]:
|
||||
"""Return (score, comment) for a journey case given the final reply dict."""
|
||||
if case.get("expect_question"):
|
||||
has_q = "?" in reply.get("message", "")
|
||||
return (1.0 if has_q else 0.0), f"first_reply_has_question={has_q}"
|
||||
|
||||
if case.get("expect_done") and not reply.get("done"):
|
||||
return 0.0, "expected done=True but journey did not complete"
|
||||
|
||||
agent_config_raw = reply.get("agent_config")
|
||||
|
||||
if case.get("expect_valid_config"):
|
||||
if not agent_config_raw:
|
||||
return 0.0, "agent_config is None"
|
||||
try:
|
||||
parsed = AgentConfig.model_validate_json(agent_config_raw)
|
||||
valid = len(parsed.content_types) > 0
|
||||
return (1.0 if valid else 0.0), f"content_types={len(parsed.content_types)}"
|
||||
except Exception as exc:
|
||||
return 0.0, f"parse error: {exc}"
|
||||
|
||||
if case.get("expect_content_type_id"):
|
||||
expected_id = case["expect_content_type_id"]
|
||||
if not agent_config_raw:
|
||||
return 0.0, "agent_config is None"
|
||||
try:
|
||||
parsed = AgentConfig.model_validate_json(agent_config_raw)
|
||||
ids = [ct.id for ct in parsed.content_types]
|
||||
found = expected_id in ids
|
||||
return (1.0 if found else 0.0), f"content_type_ids={ids}, expected={expected_id}"
|
||||
except Exception as exc:
|
||||
return 0.0, f"parse error: {exc}"
|
||||
|
||||
if case.get("expect_extraction_contains"):
|
||||
keyword = case["expect_extraction_contains"].lower()
|
||||
if not agent_config_raw:
|
||||
return 0.0, "agent_config is None"
|
||||
try:
|
||||
parsed = AgentConfig.model_validate_json(agent_config_raw)
|
||||
if not parsed.content_types:
|
||||
return 0.0, "no content_types in config"
|
||||
prompt = parsed.content_types[0].extraction_prompt.lower()
|
||||
found = keyword in prompt
|
||||
return (1.0 if found else 0.0), f"keyword='{keyword}' in extraction_prompt={found}"
|
||||
except Exception as exc:
|
||||
return 0.0, f"parse error: {exc}"
|
||||
|
||||
if case.get("expect_global_rules"):
|
||||
if not agent_config_raw:
|
||||
return 0.0, "agent_config is None"
|
||||
try:
|
||||
parsed = AgentConfig.model_validate_json(agent_config_raw)
|
||||
has_rules = len(parsed.global_rules) > 0
|
||||
return (1.0 if has_rules else 0.0), f"global_rules={parsed.global_rules}"
|
||||
except Exception as exc:
|
||||
return 0.0, f"parse error: {exc}"
|
||||
|
||||
return 1.0, "no specific assertion"
|
||||
|
||||
|
||||
# ── Unit tests ────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_4_6a_extract_valid_json():
|
||||
"""_extract_agent_config: valid JSON between markers → returns serialised config."""
|
||||
config = AgentConfig(
|
||||
content_types=[],
|
||||
global_rules=["No project = no entity"],
|
||||
data_types=["tasks"],
|
||||
)
|
||||
text = f"Some preamble\n{_CONFIG_START}\n{config.model_dump_json()}\n{_CONFIG_END}\nTrailing"
|
||||
result = _extract_agent_config(text)
|
||||
assert result is not None
|
||||
parsed = AgentConfig.model_validate_json(result)
|
||||
assert parsed.global_rules == ["No project = no entity"]
|
||||
|
||||
|
||||
def test_4_6b_extract_invalid_json():
|
||||
"""_extract_agent_config: malformed JSON between markers → returns None."""
|
||||
text = f"{_CONFIG_START}\n{{not: valid json\n{_CONFIG_END}"
|
||||
assert _extract_agent_config(text) is None
|
||||
|
||||
|
||||
def test_4_6c_extract_markers_absent():
|
||||
"""_extract_agent_config: no markers at all → returns None."""
|
||||
assert _extract_agent_config("No markers here at all") is None
|
||||
|
||||
|
||||
def test_4_6d_extract_only_start_marker():
|
||||
"""_extract_agent_config: START without END → returns None."""
|
||||
assert _extract_agent_config(f"text {_CONFIG_START} no end marker") is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_4_6e_session_not_found():
|
||||
"""4.6e Session not found → done=True, agent_config=None, informative message."""
|
||||
reply = await handle_journey_message(_USER_ID, {
|
||||
"session_id": "nonexistent-session-id",
|
||||
"message": "Hello",
|
||||
})
|
||||
assert reply["done"] is True
|
||||
assert reply["agent_config"] is None
|
||||
assert "not found" in reply["message"].lower() or "expired" in reply["message"].lower()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_4_6f_nudge_uses_new_markers():
|
||||
"""4.6f Nudge injected after max turns uses AGENT_CONFIG markers, not PROMPT_TEMPLATE."""
|
||||
session_id = str(uuid.uuid4())
|
||||
captured_histories: list[list[dict]] = []
|
||||
|
||||
async def _mock_llm(system_prompt, history, tools, **kwargs) -> str:
|
||||
captured_histories.append(list(history))
|
||||
# Return plain text — no markers — to trigger the nudge path.
|
||||
return "I still need more information from you."
|
||||
|
||||
from app.api.routes.agent_setup import JourneySession
|
||||
|
||||
fake_session = JourneySession(
|
||||
session_id=session_id,
|
||||
user_id=_USER_ID,
|
||||
agent_type="local",
|
||||
directory="/test",
|
||||
data_types=["tasks"],
|
||||
system_prompt="system",
|
||||
langfuse_prompt=None,
|
||||
)
|
||||
# Fill history to the turn limit so the next message triggers the nudge.
|
||||
for i in range(_MAX_TURNS):
|
||||
fake_session.history.append({"role": "user", "content": f"msg {i}"})
|
||||
fake_session.history.append({"role": "assistant", "content": "ok"})
|
||||
_sessions[session_id] = fake_session
|
||||
|
||||
try:
|
||||
with patch("app.api.routes.agent_setup._call_llm_with_tools", side_effect=_mock_llm):
|
||||
await handle_journey_message(_USER_ID, {
|
||||
"session_id": session_id,
|
||||
"message": "one more message to trigger nudge",
|
||||
})
|
||||
finally:
|
||||
_sessions.pop(session_id, None)
|
||||
|
||||
# Second LLM call receives the nudge appended to history.
|
||||
assert len(captured_histories) >= 2, "Expected ≥ 2 LLM calls (main reply + nudge)"
|
||||
nudge_history = captured_histories[1]
|
||||
user_msgs = " ".join(t["content"] for t in nudge_history if t["role"] == "user")
|
||||
assert _CONFIG_START in user_msgs, f"Nudge must reference {_CONFIG_START}"
|
||||
assert _CONFIG_END in user_msgs, f"Nudge must reference {_CONFIG_END}"
|
||||
assert "PROMPT_TEMPLATE" not in user_msgs, "Old PROMPT_TEMPLATE markers must not appear in nudge"
|
||||
|
||||
|
||||
# ── Eval tests (real LLM + Langfuse) ─────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.eval
|
||||
async def test_eval_journey(journey_case, pytestconfig):
|
||||
"""Parametrized eval test — one invocation per YAML case."""
|
||||
case: dict = journey_case
|
||||
fixtures_dir = _fixtures_dir(pytestconfig)
|
||||
executor = _make_fs_executor(case.get("directory_files", []), fixtures_dir)
|
||||
|
||||
lf = get_langfuse()
|
||||
obs_ctx = lf.start_as_current_observation(
|
||||
name=f"eval-journey-{case['id']}-{case.get('score_name', 'unknown').replace('.', '-')}",
|
||||
metadata={"step": "4", "case_id": case["id"]},
|
||||
) if lf else nullcontext()
|
||||
|
||||
with obs_ctx as obs:
|
||||
reply = await _run_journey(_USER_ID, case, executor)
|
||||
score, comment = _evaluate_case(case, reply)
|
||||
|
||||
if obs is not None:
|
||||
obs.score(
|
||||
name=case.get("score_name", f"journey.case_{case['id']}"),
|
||||
value=score,
|
||||
comment=comment,
|
||||
)
|
||||
|
||||
if lf:
|
||||
lf.flush()
|
||||
|
||||
assert score == 1.0, f"[{case['id']}] {case.get('description', '')} — {comment}"
|
||||
98
tests/test_preprocessors.py
Normal file
98
tests/test_preprocessors.py
Normal file
@@ -0,0 +1,98 @@
|
||||
"""Tests for the preprocessor system (Step 1 — Local Agent V2).
|
||||
|
||||
Run:
|
||||
pytest tests/test_preprocessors.py -v
|
||||
pytest tests/test_preprocessors.py -v --preprocess-dir /path/to/folder
|
||||
|
||||
The folder must contain cases.yaml + data/.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
import yaml
|
||||
|
||||
from app.core.preprocessors import detect_content_type, preprocess
|
||||
|
||||
_DEFAULT_DIR = Path(__file__).parent / "fixtures" / "preprocessors"
|
||||
|
||||
_GENERATORS = {
|
||||
"binary_noise": "some\x00\x01\x02\x03\x04\x05content" * 20,
|
||||
}
|
||||
|
||||
|
||||
def _fixtures_dir(config) -> Path:
|
||||
override = config.getoption("--preprocess-dir")
|
||||
return Path(override) if override else _DEFAULT_DIR
|
||||
|
||||
|
||||
def _load_cases(config) -> list[dict]:
|
||||
return yaml.safe_load((_fixtures_dir(config) / "cases.yaml").read_text(encoding="utf-8"))
|
||||
|
||||
|
||||
def _content(case: dict, data_dir: Path) -> str:
|
||||
if "generate" in case:
|
||||
return _GENERATORS[case["generate"]]
|
||||
return (data_dir / case["file"]).read_text(encoding="utf-8")
|
||||
|
||||
|
||||
# ── parametrize at collection time via pytest hook ────────────────────
|
||||
|
||||
def pytest_generate_tests(metafunc):
|
||||
if "preprocess_case" not in metafunc.fixturenames:
|
||||
return
|
||||
cases = _load_cases(metafunc.config)
|
||||
test_name = metafunc.function.__name__
|
||||
if test_name == "test_detect":
|
||||
subset = [c for c in cases if "detect" in c]
|
||||
else:
|
||||
subset = [c for c in cases if "process" in c]
|
||||
metafunc.parametrize("preprocess_case", subset, ids=[c["id"] for c in subset])
|
||||
|
||||
|
||||
# ── detect ────────────────────────────────────────────────────────────
|
||||
|
||||
def test_detect(preprocess_case, pytestconfig) -> None:
|
||||
case = preprocess_case
|
||||
data_dir = _fixtures_dir(pytestconfig) / "data"
|
||||
raw = _content(case, data_dir)
|
||||
filename = case.get("file", "")
|
||||
ct = detect_content_type(filename, raw)
|
||||
expected = case["detect"]
|
||||
assert ct == expected, f"[{case['id']}] expected {expected!r}, got {ct!r}"
|
||||
|
||||
|
||||
# ── preprocess ────────────────────────────────────────────────────────
|
||||
|
||||
def test_preprocess(preprocess_case, pytestconfig) -> None:
|
||||
case = preprocess_case
|
||||
data_dir = _fixtures_dir(pytestconfig) / "data"
|
||||
raw = _content(case, data_dir)
|
||||
result = preprocess(case["process"], raw)
|
||||
|
||||
if case.get("no_html"):
|
||||
assert not re.search(r"<[^>]+>", result.clean_text), "clean_text contains HTML tags"
|
||||
|
||||
if "min_chars" in case:
|
||||
assert len(result.clean_text) >= case["min_chars"], \
|
||||
f"clean_text too short: {len(result.clean_text)} < {case['min_chars']}"
|
||||
|
||||
if "ratio_lt" in case:
|
||||
ratio = len(result.clean_text) / len(raw)
|
||||
assert ratio < case["ratio_lt"], f"compression ratio {ratio:.2f} >= {case['ratio_lt']}"
|
||||
|
||||
for key in case.get("has_meta", []):
|
||||
assert result.metadata.get(key), f"metadata missing {key!r} (got {result.metadata})"
|
||||
|
||||
for item in ([case["contains"]] if isinstance(case.get("contains"), str) else case.get("contains", [])):
|
||||
assert item in result.clean_text, f"clean_text missing {item!r}"
|
||||
|
||||
for item in ([case["excludes"]] if isinstance(case.get("excludes"), str) else case.get("excludes", [])):
|
||||
assert item not in result.clean_text, f"clean_text contains forbidden {item!r}"
|
||||
|
||||
if "content_type" in case:
|
||||
assert result.content_type == case["content_type"], \
|
||||
f"expected content_type {case['content_type']!r}, got {result.content_type!r}"
|
||||
Reference in New Issue
Block a user