Compare commits
5 Commits
feature/mi
...
3cc32569d9
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3cc32569d9 | ||
|
|
bf445ac2ce | ||
|
|
a2d6d689e4 | ||
|
|
aa8bcbf0d8 | ||
|
|
1ce1d492b0 |
@@ -39,6 +39,13 @@ QDRANT_URL=
|
|||||||
QDRANT_API_KEY=
|
QDRANT_API_KEY=
|
||||||
# For local Qdrant (homelab): QDRANT_URL=http://qdrant:6333
|
# For local Qdrant (homelab): QDRANT_URL=http://qdrant:6333
|
||||||
|
|
||||||
|
# ── Langfuse (leave empty to disable observability) ───────────────────────────
|
||||||
|
LANGFUSE_SECRET_KEY=
|
||||||
|
LANGFUSE_PUBLIC_KEY=
|
||||||
|
# LANGFUSE_HOST=https://cloud.langfuse.com # EU (default)
|
||||||
|
# LANGFUSE_HOST=https://us.cloud.langfuse.com # US
|
||||||
|
# LANGFUSE_HOST=http://localhost:3000 # Self-hosted
|
||||||
|
|
||||||
# ── CORS ──────────────────────────────────────────────────────────────────────
|
# ── CORS ──────────────────────────────────────────────────────────────────────
|
||||||
# Comma-separated list parsed by Settings (override default if needed)
|
# Comma-separated list parsed by Settings (override default if needed)
|
||||||
# CORS_ORIGINS=["app://.","http://localhost:3000"]
|
# CORS_ORIGINS=["app://.","http://localhost:3000"]
|
||||||
|
|||||||
@@ -31,6 +31,8 @@ from typing import Any
|
|||||||
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
|
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
|
||||||
|
|
||||||
from app.agents.filesystem_agent import FILESYSTEM_TOOLS
|
from app.agents.filesystem_agent import FILESYSTEM_TOOLS
|
||||||
|
from app.config.settings import settings
|
||||||
|
from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback
|
||||||
from app.core.llm import get_llm
|
from app.core.llm import get_llm
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@@ -62,6 +64,7 @@ class JourneySession:
|
|||||||
data_types: list[str]
|
data_types: list[str]
|
||||||
history: list[dict[str, Any]] = field(default_factory=list)
|
history: list[dict[str, Any]] = field(default_factory=list)
|
||||||
system_prompt: str = ""
|
system_prompt: str = ""
|
||||||
|
langfuse_prompt: Any = None
|
||||||
created_at: float = field(default_factory=time.monotonic)
|
created_at: float = field(default_factory=time.monotonic)
|
||||||
|
|
||||||
def is_expired(self) -> bool:
|
def is_expired(self) -> bool:
|
||||||
@@ -85,7 +88,7 @@ def get_journey_session(session_id: str, user_id: str) -> JourneySession | None:
|
|||||||
|
|
||||||
# ── System prompt builder ─────────────────────────────────────────────────
|
# ── System prompt builder ─────────────────────────────────────────────────
|
||||||
|
|
||||||
_SYSTEM_PROMPT_TEMPLATE = """\
|
_JOURNEY_SYSTEM_PROMPT = """\
|
||||||
You are a friendly assistant helping a freelancer configure a data-extraction agent.
|
You are a friendly assistant helping a freelancer configure a data-extraction agent.
|
||||||
Your job is to understand exactly what data the user wants to extract from their
|
Your job is to understand exactly what data the user wants to extract from their
|
||||||
local directory and produce a detailed prompt_template that a separate AI will use
|
local directory and produce a detailed prompt_template that a separate AI will use
|
||||||
@@ -146,20 +149,25 @@ def _build_system_prompt(
|
|||||||
directory: str,
|
directory: str,
|
||||||
data_types: list[str],
|
data_types: list[str],
|
||||||
existing_template: str | None = None,
|
existing_template: str | None = None,
|
||||||
) -> str:
|
) -> tuple[str, Any]:
|
||||||
|
"""Return ``(compiled_system_prompt, langfuse_prompt_obj_or_None)``."""
|
||||||
existing_section = (
|
existing_section = (
|
||||||
f"\nThe user already has the following prompt_template — refine it based on their answers:\n"
|
f"\nThe user already has the following prompt_template — refine it based on their answers:\n"
|
||||||
f"---\n{existing_template}\n---\n"
|
f"---\n{existing_template}\n---\n"
|
||||||
if existing_template
|
if existing_template
|
||||||
else ""
|
else ""
|
||||||
)
|
)
|
||||||
return _SYSTEM_PROMPT_TEMPLATE.format(
|
template, prompt_obj = get_prompt_or_fallback(
|
||||||
|
"journey_system", _JOURNEY_SYSTEM_PROMPT
|
||||||
|
)
|
||||||
|
compiled = template.format(
|
||||||
directory=directory,
|
directory=directory,
|
||||||
data_types=", ".join(data_types),
|
data_types=", ".join(data_types),
|
||||||
template_start=_TEMPLATE_START,
|
template_start=_TEMPLATE_START,
|
||||||
template_end=_TEMPLATE_END,
|
template_end=_TEMPLATE_END,
|
||||||
existing_section=existing_section,
|
existing_section=existing_section,
|
||||||
)
|
)
|
||||||
|
return compiled, prompt_obj
|
||||||
|
|
||||||
|
|
||||||
# ── Template extraction ───────────────────────────────────────────────────
|
# ── Template extraction ───────────────────────────────────────────────────
|
||||||
@@ -199,12 +207,17 @@ async def _call_llm_with_tools(
|
|||||||
system_prompt: str,
|
system_prompt: str,
|
||||||
history: list[dict[str, Any]],
|
history: list[dict[str, Any]],
|
||||||
tools: list[Any],
|
tools: list[Any],
|
||||||
|
*,
|
||||||
|
user_id: str = "",
|
||||||
|
session_id: str = "",
|
||||||
|
langfuse_prompt: Any = None,
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Build LangChain messages from history and invoke the LLM with tools.
|
"""Build LangChain messages from history and invoke the LLM with tools.
|
||||||
|
|
||||||
Handles tool-calling loops: if the LLM calls tools, execute them and
|
Handles tool-calling loops: if the LLM calls tools, execute them and
|
||||||
continue until a final text response is produced.
|
continue until a final text response is produced.
|
||||||
"""
|
"""
|
||||||
|
lf = get_langfuse()
|
||||||
messages: list[Any] = [SystemMessage(content=system_prompt)]
|
messages: list[Any] = [SystemMessage(content=system_prompt)]
|
||||||
for turn in history:
|
for turn in history:
|
||||||
if turn["role"] == "user":
|
if turn["role"] == "user":
|
||||||
@@ -216,11 +229,41 @@ async def _call_llm_with_tools(
|
|||||||
llm_with_tools = llm.bind_tools(tools)
|
llm_with_tools = llm.bind_tools(tools)
|
||||||
tool_map = {tool_def.name: tool_def for tool_def in tools}
|
tool_map = {tool_def.name: tool_def for tool_def in tools}
|
||||||
|
|
||||||
|
_span_ctx = (
|
||||||
|
lf.start_as_current_observation(
|
||||||
|
as_type="span",
|
||||||
|
name="journey-setup",
|
||||||
|
user_id=user_id or None,
|
||||||
|
session_id=session_id or None,
|
||||||
|
input=history[-1]["content"] if history else "",
|
||||||
|
)
|
||||||
|
if lf else None
|
||||||
|
)
|
||||||
|
_span = _span_ctx.__enter__() if _span_ctx else None
|
||||||
|
|
||||||
|
try:
|
||||||
for _ in range(_MAX_TOOL_STEPS):
|
for _ in range(_MAX_TOOL_STEPS):
|
||||||
|
_gen_ctx = (
|
||||||
|
lf.start_as_current_observation(
|
||||||
|
as_type="generation",
|
||||||
|
name="journey-setup-llm",
|
||||||
|
model=settings.LLM_MODEL,
|
||||||
|
prompt=langfuse_prompt,
|
||||||
|
input=messages,
|
||||||
|
)
|
||||||
|
if lf else None
|
||||||
|
)
|
||||||
|
_gen = _gen_ctx.__enter__() if _gen_ctx else None
|
||||||
response: AIMessage = await llm_with_tools.ainvoke(messages)
|
response: AIMessage = await llm_with_tools.ainvoke(messages)
|
||||||
|
if _gen_ctx:
|
||||||
|
_gen.update(output=_as_text(response.content), usage=extract_usage(response))
|
||||||
|
_gen_ctx.__exit__(None, None, None)
|
||||||
|
|
||||||
messages.append(response)
|
messages.append(response)
|
||||||
|
|
||||||
if not response.tool_calls:
|
if not response.tool_calls:
|
||||||
|
if _span:
|
||||||
|
_span.update(output=_as_text(response.content))
|
||||||
return _as_text(response.content)
|
return _as_text(response.content)
|
||||||
|
|
||||||
for call in response.tool_calls:
|
for call in response.tool_calls:
|
||||||
@@ -247,7 +290,15 @@ async def _call_llm_with_tools(
|
|||||||
|
|
||||||
# Fallback: exceeded max steps.
|
# Fallback: exceeded max steps.
|
||||||
final = await llm.ainvoke(messages)
|
final = await llm.ainvoke(messages)
|
||||||
return _as_text(final.content)
|
final_text = _as_text(final.content)
|
||||||
|
if _span:
|
||||||
|
_span.update(output=final_text)
|
||||||
|
return final_text
|
||||||
|
finally:
|
||||||
|
if _span_ctx:
|
||||||
|
_span_ctx.__exit__(None, None, None)
|
||||||
|
if lf:
|
||||||
|
lf.flush()
|
||||||
|
|
||||||
|
|
||||||
# ── Journey handlers (called from device_ws.py) ──────────────────────────
|
# ── Journey handlers (called from device_ws.py) ──────────────────────────
|
||||||
@@ -270,7 +321,7 @@ async def handle_journey_start(
|
|||||||
# Use the session_id provided by the FE so the reply matches the
|
# Use the session_id provided by the FE so the reply matches the
|
||||||
# listener key; fall back to a generated one if absent.
|
# listener key; fall back to a generated one if absent.
|
||||||
session_id = frame.get("session_id") or str(uuid.uuid4())
|
session_id = frame.get("session_id") or str(uuid.uuid4())
|
||||||
system_prompt = _build_system_prompt(directory, data_types, existing_template)
|
system_prompt, langfuse_prompt = _build_system_prompt(directory, data_types, existing_template)
|
||||||
|
|
||||||
session = JourneySession(
|
session = JourneySession(
|
||||||
session_id=session_id,
|
session_id=session_id,
|
||||||
@@ -279,6 +330,7 @@ async def handle_journey_start(
|
|||||||
directory=directory,
|
directory=directory,
|
||||||
data_types=data_types,
|
data_types=data_types,
|
||||||
system_prompt=system_prompt,
|
system_prompt=system_prompt,
|
||||||
|
langfuse_prompt=langfuse_prompt,
|
||||||
)
|
)
|
||||||
|
|
||||||
# The LLM will explore the directory using FILESYSTEM_TOOLS via the
|
# The LLM will explore the directory using FILESYSTEM_TOOLS via the
|
||||||
@@ -292,6 +344,9 @@ async def handle_journey_start(
|
|||||||
system_prompt=system_prompt,
|
system_prompt=system_prompt,
|
||||||
history=seed_history,
|
history=seed_history,
|
||||||
tools=list(FILESYSTEM_TOOLS),
|
tools=list(FILESYSTEM_TOOLS),
|
||||||
|
user_id=user_id,
|
||||||
|
session_id=session_id,
|
||||||
|
langfuse_prompt=langfuse_prompt,
|
||||||
)
|
)
|
||||||
|
|
||||||
session.history.extend(seed_history)
|
session.history.extend(seed_history)
|
||||||
@@ -356,6 +411,9 @@ async def handle_journey_message(
|
|||||||
system_prompt=session.system_prompt,
|
system_prompt=session.system_prompt,
|
||||||
history=session.history,
|
history=session.history,
|
||||||
tools=list(FILESYSTEM_TOOLS),
|
tools=list(FILESYSTEM_TOOLS),
|
||||||
|
user_id=session.user_id,
|
||||||
|
session_id=session_id,
|
||||||
|
langfuse_prompt=session.langfuse_prompt,
|
||||||
)
|
)
|
||||||
|
|
||||||
session.history.append({"role": "assistant", "content": ai_reply})
|
session.history.append({"role": "assistant", "content": ai_reply})
|
||||||
@@ -379,6 +437,9 @@ async def handle_journey_message(
|
|||||||
system_prompt=session.system_prompt,
|
system_prompt=session.system_prompt,
|
||||||
history=session.history,
|
history=session.history,
|
||||||
tools=list(FILESYSTEM_TOOLS),
|
tools=list(FILESYSTEM_TOOLS),
|
||||||
|
user_id=session.user_id,
|
||||||
|
session_id=session_id,
|
||||||
|
langfuse_prompt=session.langfuse_prompt,
|
||||||
)
|
)
|
||||||
session.history.append({"role": "assistant", "content": nudge_reply})
|
session.history.append({"role": "assistant", "content": nudge_reply})
|
||||||
|
|
||||||
|
|||||||
@@ -52,6 +52,10 @@ class Settings(BaseSettings):
|
|||||||
|
|
||||||
CORS_ORIGINS: list[str] = ["app://.", "http://localhost:3000", "http://localhost:5173"]
|
CORS_ORIGINS: list[str] = ["app://.", "http://localhost:3000", "http://localhost:5173"]
|
||||||
|
|
||||||
|
LANGFUSE_SECRET_KEY: str = ""
|
||||||
|
LANGFUSE_PUBLIC_KEY: str = ""
|
||||||
|
LANGFUSE_HOST: str = "https://cloud.langfuse.com"
|
||||||
|
|
||||||
ENV: Literal["dev", "prod"] = "dev"
|
ENV: Literal["dev", "prod"] = "dev"
|
||||||
|
|
||||||
model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")
|
model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")
|
||||||
|
|||||||
@@ -42,7 +42,9 @@ from app.agents.note_agent import NOTE_TOOLS
|
|||||||
from app.agents.project_agent import PROJECT_TOOLS
|
from app.agents.project_agent import PROJECT_TOOLS
|
||||||
from app.agents.task_agent import TASK_TOOLS
|
from app.agents.task_agent import TASK_TOOLS
|
||||||
from app.agents.timeline_agent import TIMELINE_TOOLS
|
from app.agents.timeline_agent import TIMELINE_TOOLS
|
||||||
|
from app.config.settings import settings
|
||||||
from app.core.device_manager import DeviceConnectionManager
|
from app.core.device_manager import DeviceConnectionManager
|
||||||
|
from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback
|
||||||
from app.core.llm import get_llm
|
from app.core.llm import get_llm
|
||||||
from app.core.ws_context import clear_client_executor, execute_on_client, set_client_executor
|
from app.core.ws_context import clear_client_executor, execute_on_client, set_client_executor
|
||||||
from app.db import async_session
|
from app.db import async_session
|
||||||
@@ -100,7 +102,7 @@ _DOMAIN_DESCRIPTIONS: dict[str, str] = {
|
|||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
_STEP1_SYSTEM_PROMPT = """\
|
_BATCH_FILE_CLASSIFIER_PROMPT = """\
|
||||||
You are a file classifier for a freelance project management tool.
|
You are a file classifier for a freelance project management tool.
|
||||||
|
|
||||||
Your job is to match a file to an existing project and identify which data domains to extract.
|
Your job is to match a file to an existing project and identify which data domains to extract.
|
||||||
@@ -131,7 +133,7 @@ Respond ONLY with a JSON object — no markdown, no explanation:
|
|||||||
|
|
||||||
# ── Step 2: Processing prompt ─────────────────────────────────────────────
|
# ── Step 2: Processing prompt ─────────────────────────────────────────────
|
||||||
|
|
||||||
_PROCESSING_SYSTEM_PROMPT = """\
|
_BATCH_PROCESSING_PROMPT = """\
|
||||||
You are a data extraction assistant for a freelance project management tool.
|
You are a data extraction assistant for a freelance project management tool.
|
||||||
|
|
||||||
Your task: extract structured data from the file content and persist it using the available tools.
|
Your task: extract structured data from the file content and persist it using the available tools.
|
||||||
@@ -160,7 +162,7 @@ Domains to extract: {data_types}
|
|||||||
|
|
||||||
# ── Cloud processing prompt (kept separate for cloud agent) ───────────────
|
# ── Cloud processing prompt (kept separate for cloud agent) ───────────────
|
||||||
|
|
||||||
_CLOUD_PROCESSING_PROMPT = """\
|
_BATCH_CLOUD_PROCESSING_PROMPT = """\
|
||||||
You are a data extraction and management assistant for a freelance project
|
You are a data extraction and management assistant for a freelance project
|
||||||
management tool.
|
management tool.
|
||||||
|
|
||||||
@@ -268,8 +270,12 @@ async def _run_agent_with_tools(
|
|||||||
user_message: str,
|
user_message: str,
|
||||||
tools: list[Any],
|
tools: list[Any],
|
||||||
max_steps: int,
|
max_steps: int,
|
||||||
|
user_id: str = "",
|
||||||
|
langfuse_prompt: Any = None,
|
||||||
|
agent_name: str = "batch-agent",
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Run an LLM agent with tool-calling, returning the final text response."""
|
"""Run an LLM agent with tool-calling, returning the final text response."""
|
||||||
|
lf = get_langfuse()
|
||||||
llm = get_llm()
|
llm = get_llm()
|
||||||
llm_with_tools = llm.bind_tools(tools)
|
llm_with_tools = llm.bind_tools(tools)
|
||||||
messages: list[Any] = [
|
messages: list[Any] = [
|
||||||
@@ -279,12 +285,42 @@ async def _run_agent_with_tools(
|
|||||||
|
|
||||||
tool_map = {tool_def.name: tool_def for tool_def in tools}
|
tool_map = {tool_def.name: tool_def for tool_def in tools}
|
||||||
|
|
||||||
|
_span_ctx = (
|
||||||
|
lf.start_as_current_observation(
|
||||||
|
as_type="span",
|
||||||
|
name=agent_name,
|
||||||
|
user_id=user_id or None,
|
||||||
|
input=user_message,
|
||||||
|
)
|
||||||
|
if lf else None
|
||||||
|
)
|
||||||
|
_span = _span_ctx.__enter__() if _span_ctx else None
|
||||||
|
|
||||||
|
try:
|
||||||
for _ in range(max_steps):
|
for _ in range(max_steps):
|
||||||
|
_gen_ctx = (
|
||||||
|
lf.start_as_current_observation(
|
||||||
|
as_type="generation",
|
||||||
|
name=f"{agent_name}-llm",
|
||||||
|
model=settings.LLM_MODEL,
|
||||||
|
prompt=langfuse_prompt,
|
||||||
|
input=messages,
|
||||||
|
)
|
||||||
|
if lf else None
|
||||||
|
)
|
||||||
|
_gen = _gen_ctx.__enter__() if _gen_ctx else None
|
||||||
response: AIMessage = await llm_with_tools.ainvoke(messages)
|
response: AIMessage = await llm_with_tools.ainvoke(messages)
|
||||||
|
if _gen_ctx:
|
||||||
|
_gen.update(output=_as_text(response.content), usage=extract_usage(response))
|
||||||
|
_gen_ctx.__exit__(None, None, None)
|
||||||
|
|
||||||
messages.append(response)
|
messages.append(response)
|
||||||
|
|
||||||
if not response.tool_calls:
|
if not response.tool_calls:
|
||||||
return _as_text(response.content)
|
final_text = _as_text(response.content)
|
||||||
|
if _span:
|
||||||
|
_span.update(output=final_text)
|
||||||
|
return final_text
|
||||||
|
|
||||||
for call in response.tool_calls:
|
for call in response.tool_calls:
|
||||||
call_id = str(call.get("id", ""))
|
call_id = str(call.get("id", ""))
|
||||||
@@ -310,7 +346,15 @@ async def _run_agent_with_tools(
|
|||||||
messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
|
messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
|
||||||
|
|
||||||
final = await llm.ainvoke(messages)
|
final = await llm.ainvoke(messages)
|
||||||
return _as_text(final.content)
|
final_text = _as_text(final.content)
|
||||||
|
if _span:
|
||||||
|
_span.update(output=final_text)
|
||||||
|
return final_text
|
||||||
|
finally:
|
||||||
|
if _span_ctx:
|
||||||
|
_span_ctx.__exit__(None, None, None)
|
||||||
|
if lf:
|
||||||
|
lf.flush()
|
||||||
|
|
||||||
|
|
||||||
# ── Tool list builder ─────────────────────────────────────────────────────
|
# ── Tool list builder ─────────────────────────────────────────────────────
|
||||||
@@ -515,17 +559,33 @@ async def _classify_file(
|
|||||||
if d in _DOMAIN_DESCRIPTIONS
|
if d in _DOMAIN_DESCRIPTIONS
|
||||||
)
|
)
|
||||||
|
|
||||||
system = _STEP1_SYSTEM_PROMPT.format(
|
step1_template, step1_prompt_obj = get_prompt_or_fallback(
|
||||||
|
"batch_file_classifier", _BATCH_FILE_CLASSIFIER_PROMPT
|
||||||
|
)
|
||||||
|
system = step1_template.format(
|
||||||
domain_definitions=domain_definitions,
|
domain_definitions=domain_definitions,
|
||||||
projects_list=projects_list,
|
projects_list=projects_list,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
lf = get_langfuse()
|
||||||
llm = get_llm()
|
llm = get_llm()
|
||||||
try:
|
classifier_messages = [
|
||||||
response = await llm.ainvoke([
|
|
||||||
SystemMessage(content=system),
|
SystemMessage(content=system),
|
||||||
HumanMessage(content=f"File: {file_path}\n\nContent:\n{file_content[:4000]}"),
|
HumanMessage(content=f"File: {file_path}\n\nContent:\n{file_content[:4000]}"),
|
||||||
])
|
]
|
||||||
|
try:
|
||||||
|
if lf:
|
||||||
|
with lf.start_as_current_observation(
|
||||||
|
as_type="generation",
|
||||||
|
name="step1-classifier",
|
||||||
|
model=settings.LLM_ROUTER_MODEL,
|
||||||
|
prompt=step1_prompt_obj,
|
||||||
|
input=classifier_messages,
|
||||||
|
) as gen:
|
||||||
|
response = await llm.ainvoke(classifier_messages)
|
||||||
|
gen.update(output=_as_text(response.content), usage=extract_usage(response))
|
||||||
|
else:
|
||||||
|
response = await llm.ainvoke(classifier_messages)
|
||||||
raw = _as_text(response.content).strip()
|
raw = _as_text(response.content).strip()
|
||||||
# Strip markdown fences if the model wraps the JSON.
|
# Strip markdown fences if the model wraps the JSON.
|
||||||
if raw.startswith("```"):
|
if raw.startswith("```"):
|
||||||
@@ -713,7 +773,10 @@ async def run_local_agent(
|
|||||||
|
|
||||||
existing_context = "\n\n".join(existing_blocks)
|
existing_context = "\n\n".join(existing_blocks)
|
||||||
|
|
||||||
system_prompt = _PROCESSING_SYSTEM_PROMPT.format(
|
step2_template, step2_prompt_obj = get_prompt_or_fallback(
|
||||||
|
"batch_processing", _BATCH_PROCESSING_PROMPT
|
||||||
|
)
|
||||||
|
system_prompt = step2_template.format(
|
||||||
existing_context=existing_context,
|
existing_context=existing_context,
|
||||||
project_context=project_context,
|
project_context=project_context,
|
||||||
data_types=", ".join(domains),
|
data_types=", ".join(domains),
|
||||||
@@ -730,6 +793,9 @@ async def run_local_agent(
|
|||||||
),
|
),
|
||||||
tools=processing_tools,
|
tools=processing_tools,
|
||||||
max_steps=_MAX_PROCESSING_STEPS,
|
max_steps=_MAX_PROCESSING_STEPS,
|
||||||
|
user_id=user_id,
|
||||||
|
langfuse_prompt=step2_prompt_obj,
|
||||||
|
agent_name="step2-processor",
|
||||||
)
|
)
|
||||||
logger.info(
|
logger.info(
|
||||||
"agent_runner: run=%s file=%r result=%s",
|
"agent_runner: run=%s file=%r result=%s",
|
||||||
@@ -928,7 +994,10 @@ async def run_cloud_agent(
|
|||||||
continue
|
continue
|
||||||
items_processed += 1
|
items_processed += 1
|
||||||
|
|
||||||
processing_prompt = _CLOUD_PROCESSING_PROMPT.format(
|
cloud_template, cloud_prompt_obj = get_prompt_or_fallback(
|
||||||
|
"batch_cloud_processing", _BATCH_CLOUD_PROCESSING_PROMPT
|
||||||
|
)
|
||||||
|
processing_prompt = cloud_template.format(
|
||||||
data_types=", ".join(config.data_types),
|
data_types=", ".join(config.data_types),
|
||||||
project_context="Determine the appropriate project from the message context.",
|
project_context="Determine the appropriate project from the message context.",
|
||||||
file_list=f"Message from {config.provider} (id: {msg.id})",
|
file_list=f"Message from {config.provider} (id: {msg.id})",
|
||||||
@@ -941,6 +1010,9 @@ async def run_cloud_agent(
|
|||||||
user_message=f"Process this message content:\n\n{content_text[:8000]}",
|
user_message=f"Process this message content:\n\n{content_text[:8000]}",
|
||||||
tools=processing_tools,
|
tools=processing_tools,
|
||||||
max_steps=_MAX_PROCESSING_STEPS,
|
max_steps=_MAX_PROCESSING_STEPS,
|
||||||
|
user_id=user_id,
|
||||||
|
langfuse_prompt=cloud_prompt_obj,
|
||||||
|
agent_name="cloud-processor",
|
||||||
)
|
)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
errors.append(f"LLM processing error for message {msg.id!r}: {exc}")
|
errors.append(f"LLM processing error for message {msg.id!r}: {exc}")
|
||||||
|
|||||||
@@ -16,7 +16,9 @@ from app.agents.note_agent import NOTE_TOOLS
|
|||||||
from app.agents.project_agent import PROJECT_TOOLS
|
from app.agents.project_agent import PROJECT_TOOLS
|
||||||
from app.agents.task_agent import TASK_TOOLS
|
from app.agents.task_agent import TASK_TOOLS
|
||||||
from app.agents.timeline_agent import TIMELINE_TOOLS
|
from app.agents.timeline_agent import TIMELINE_TOOLS
|
||||||
|
from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback
|
||||||
from app.core.llm import get_llm
|
from app.core.llm import get_llm
|
||||||
|
from app.config.settings import settings
|
||||||
from app.core.memory_middleware import MemoryMiddleware
|
from app.core.memory_middleware import MemoryMiddleware
|
||||||
from app.core.ws_context import clear_tool_result_collector, execute_on_client, set_tool_result_collector
|
from app.core.ws_context import clear_tool_result_collector, execute_on_client, set_tool_result_collector
|
||||||
from app.db import async_session
|
from app.db import async_session
|
||||||
@@ -26,7 +28,7 @@ logger = logging.getLogger(__name__)
|
|||||||
FloatingDomainType = Literal["task", "timeline", "project", "node"]
|
FloatingDomainType = Literal["task", "timeline", "project", "node"]
|
||||||
FloatingDomainSection = Literal["task", "timeline", "note"]
|
FloatingDomainSection = Literal["task", "timeline", "note"]
|
||||||
|
|
||||||
_HOME_SINGLE_AGENT_SYSTEM = (
|
_HOME_SYSTEM_PROMPT = (
|
||||||
"You are the home assistant with direct access to all tools: tasks, projects, notes, timelines, and memory tools. "
|
"You are the home assistant with direct access to all tools: tasks, projects, notes, timelines, and memory tools. "
|
||||||
"Always use tools for factual data retrieval before answering. "
|
"Always use tools for factual data retrieval before answering. "
|
||||||
"When the user asks to remember, forget, or update what you know about them, use memory tools. "
|
"When the user asks to remember, forget, or update what you know about them, use memory tools. "
|
||||||
@@ -39,7 +41,7 @@ _HOME_SINGLE_AGENT_SYSTEM = (
|
|||||||
"For upcoming tasks, after tag lines add a short recommendation based on due date and priority."
|
"For upcoming tasks, after tag lines add a short recommendation based on due date and priority."
|
||||||
)
|
)
|
||||||
|
|
||||||
_FLOATING_SINGLE_AGENT_SYSTEM = (
|
_FLOATING_SYSTEM_PROMPT = (
|
||||||
"You are the floating assistant with direct access to all tools: tasks, projects, notes, timelines, and memory tools. "
|
"You are the floating assistant with direct access to all tools: tasks, projects, notes, timelines, and memory tools. "
|
||||||
"Stay focused on the floating scope in context.scope and answer concisely. "
|
"Stay focused on the floating scope in context.scope and answer concisely. "
|
||||||
"Return plain text only. Do not output XML/HTML-like tags such as <task>, <project>, <note>, <timeline>, or any bracketed id tag wrappers. "
|
"Return plain text only. Do not output XML/HTML-like tags such as <task>, <project>, <note>, <timeline>, or any bracketed id tag wrappers. "
|
||||||
@@ -48,7 +50,7 @@ _FLOATING_SINGLE_AGENT_SYSTEM = (
|
|||||||
"If context.context.resolved_project_id exists, use it as project_id for scoped list calls. "
|
"If context.context.resolved_project_id exists, use it as project_id for scoped list calls. "
|
||||||
)
|
)
|
||||||
|
|
||||||
_FLOATING_DOMAIN_CLASSIFIER_SYSTEM = (
|
_FLOATING_DOMAIN_CLASSIFIER_PROMPT = (
|
||||||
"You are a strict domain classifier for websocket floating requests. "
|
"You are a strict domain classifier for websocket floating requests. "
|
||||||
"Return ONLY a JSON object with keys: type, id, section. "
|
"Return ONLY a JSON object with keys: type, id, section. "
|
||||||
"Allowed type values: task, timeline, project, node. "
|
"Allowed type values: task, timeline, project, node. "
|
||||||
@@ -536,9 +538,8 @@ async def _infer_floating_domain(message: str, context: dict[str, Any]) -> dict[
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
llm = get_llm()
|
llm = get_llm()
|
||||||
response = await llm.ainvoke(
|
classifier_messages = [
|
||||||
[
|
SystemMessage(content=_FLOATING_DOMAIN_CLASSIFIER_PROMPT),
|
||||||
SystemMessage(content=_FLOATING_DOMAIN_CLASSIFIER_SYSTEM),
|
|
||||||
HumanMessage(
|
HumanMessage(
|
||||||
content=(
|
content=(
|
||||||
f"Message:\n{message}\n\n"
|
f"Message:\n{message}\n\n"
|
||||||
@@ -546,7 +547,22 @@ async def _infer_floating_domain(message: str, context: dict[str, Any]) -> dict[
|
|||||||
)
|
)
|
||||||
),
|
),
|
||||||
]
|
]
|
||||||
|
lf = get_langfuse()
|
||||||
|
_, classifier_prompt_obj = get_prompt_or_fallback(
|
||||||
|
"floating_domain_classifier", _FLOATING_DOMAIN_CLASSIFIER_PROMPT
|
||||||
)
|
)
|
||||||
|
if lf:
|
||||||
|
with lf.start_as_current_observation(
|
||||||
|
as_type="generation",
|
||||||
|
name="floating-classifier",
|
||||||
|
model=settings.LLM_MODEL,
|
||||||
|
prompt=classifier_prompt_obj,
|
||||||
|
input=classifier_messages,
|
||||||
|
) as gen:
|
||||||
|
response = await llm.ainvoke(classifier_messages)
|
||||||
|
gen.update(output=_as_text(response.content), usage=extract_usage(response))
|
||||||
|
else:
|
||||||
|
response = await llm.ainvoke(classifier_messages)
|
||||||
parsed = _parse_json_object(_as_text(response.content))
|
parsed = _parse_json_object(_as_text(response.content))
|
||||||
if parsed is not None:
|
if parsed is not None:
|
||||||
domain = _normalize_domain_payload(parsed, project_id)
|
domain = _normalize_domain_payload(parsed, project_id)
|
||||||
@@ -571,8 +587,11 @@ async def _run_single_agent(
|
|||||||
message: str,
|
message: str,
|
||||||
context: dict[str, Any],
|
context: dict[str, Any],
|
||||||
max_steps: int = 6,
|
max_steps: int = 6,
|
||||||
|
langfuse_prompt: Any = None,
|
||||||
|
agent_name: str = "agent",
|
||||||
) -> str:
|
) -> str:
|
||||||
trace_id = _trace_id_from_context(context)
|
trace_id = _trace_id_from_context(context)
|
||||||
|
lf = get_langfuse()
|
||||||
llm = get_llm()
|
llm = get_llm()
|
||||||
tools = _all_tools_for_user(user_id, trace_id)
|
tools = _all_tools_for_user(user_id, trace_id)
|
||||||
model_context = _context_for_model(context)
|
model_context = _context_for_model(context)
|
||||||
@@ -591,9 +610,37 @@ async def _run_single_agent(
|
|||||||
tool_calls_count = 0
|
tool_calls_count = 0
|
||||||
collected: list[dict[str, Any]] = []
|
collected: list[dict[str, Any]] = []
|
||||||
set_tool_result_collector(collected)
|
set_tool_result_collector(collected)
|
||||||
|
|
||||||
|
_span_ctx = (
|
||||||
|
lf.start_as_current_observation(
|
||||||
|
as_type="span",
|
||||||
|
name=agent_name,
|
||||||
|
user_id=user_id,
|
||||||
|
session_id=trace_id,
|
||||||
|
input=message,
|
||||||
|
)
|
||||||
|
if lf else None
|
||||||
|
)
|
||||||
|
_span = _span_ctx.__enter__() if _span_ctx else None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
for _ in range(max_steps):
|
for _ in range(max_steps):
|
||||||
|
_gen_ctx = (
|
||||||
|
lf.start_as_current_observation(
|
||||||
|
as_type="generation",
|
||||||
|
name=f"{agent_name}-llm",
|
||||||
|
model=settings.LLM_MODEL,
|
||||||
|
prompt=langfuse_prompt,
|
||||||
|
input=messages,
|
||||||
|
)
|
||||||
|
if lf else None
|
||||||
|
)
|
||||||
|
_gen = _gen_ctx.__enter__() if _gen_ctx else None
|
||||||
response: AIMessage = await llm_with_tools.ainvoke(messages)
|
response: AIMessage = await llm_with_tools.ainvoke(messages)
|
||||||
|
if _gen_ctx:
|
||||||
|
_gen.update(output=_as_text(response.content), usage=extract_usage(response))
|
||||||
|
_gen_ctx.__exit__(None, None, None)
|
||||||
|
|
||||||
messages.append(response)
|
messages.append(response)
|
||||||
|
|
||||||
if not response.tool_calls:
|
if not response.tool_calls:
|
||||||
@@ -605,6 +652,8 @@ async def _run_single_agent(
|
|||||||
tool_calls_count,
|
tool_calls_count,
|
||||||
len(final_text),
|
len(final_text),
|
||||||
)
|
)
|
||||||
|
if _span:
|
||||||
|
_span.update(output=final_text)
|
||||||
return final_text
|
return final_text
|
||||||
|
|
||||||
tool_map = {tool_def.name: tool_def for tool_def in tools}
|
tool_map = {tool_def.name: tool_def for tool_def in tools}
|
||||||
@@ -644,9 +693,15 @@ async def _run_single_agent(
|
|||||||
tool_calls_count,
|
tool_calls_count,
|
||||||
len(final_text),
|
len(final_text),
|
||||||
)
|
)
|
||||||
|
if _span:
|
||||||
|
_span.update(output=final_text)
|
||||||
return final_text
|
return final_text
|
||||||
finally:
|
finally:
|
||||||
clear_tool_result_collector()
|
clear_tool_result_collector()
|
||||||
|
if _span_ctx:
|
||||||
|
_span_ctx.__exit__(None, None, None)
|
||||||
|
if lf:
|
||||||
|
lf.flush()
|
||||||
|
|
||||||
|
|
||||||
async def _run_single_agent_stream(
|
async def _run_single_agent_stream(
|
||||||
@@ -656,8 +711,11 @@ async def _run_single_agent_stream(
|
|||||||
message: str,
|
message: str,
|
||||||
context: dict[str, Any],
|
context: dict[str, Any],
|
||||||
max_steps: int = 6,
|
max_steps: int = 6,
|
||||||
|
langfuse_prompt: Any = None,
|
||||||
|
agent_name: str = "agent",
|
||||||
) -> AsyncGenerator[tuple[str, Any], None]:
|
) -> AsyncGenerator[tuple[str, Any], None]:
|
||||||
trace_id = _trace_id_from_context(context)
|
trace_id = _trace_id_from_context(context)
|
||||||
|
lf = get_langfuse()
|
||||||
llm = get_llm()
|
llm = get_llm()
|
||||||
tools = _all_tools_for_user(user_id, trace_id)
|
tools = _all_tools_for_user(user_id, trace_id)
|
||||||
model_context = _context_for_model(context)
|
model_context = _context_for_model(context)
|
||||||
@@ -677,9 +735,38 @@ async def _run_single_agent_stream(
|
|||||||
streamed_chars = 0
|
streamed_chars = 0
|
||||||
collected: list[dict[str, Any]] = []
|
collected: list[dict[str, Any]] = []
|
||||||
set_tool_result_collector(collected)
|
set_tool_result_collector(collected)
|
||||||
|
|
||||||
|
_span_ctx = (
|
||||||
|
lf.start_as_current_observation(
|
||||||
|
as_type="span",
|
||||||
|
name=f"{agent_name}-stream",
|
||||||
|
user_id=user_id,
|
||||||
|
session_id=trace_id,
|
||||||
|
input=message,
|
||||||
|
)
|
||||||
|
if lf else None
|
||||||
|
)
|
||||||
|
_span = _span_ctx.__enter__() if _span_ctx else None
|
||||||
|
streamed_text: list[str] = []
|
||||||
|
|
||||||
try:
|
try:
|
||||||
for _ in range(max_steps):
|
for _ in range(max_steps):
|
||||||
|
_gen_ctx = (
|
||||||
|
lf.start_as_current_observation(
|
||||||
|
as_type="generation",
|
||||||
|
name=f"{agent_name}-llm",
|
||||||
|
model=settings.LLM_MODEL,
|
||||||
|
prompt=langfuse_prompt,
|
||||||
|
input=messages,
|
||||||
|
)
|
||||||
|
if lf else None
|
||||||
|
)
|
||||||
|
_gen = _gen_ctx.__enter__() if _gen_ctx else None
|
||||||
response: AIMessage = await llm_with_tools.ainvoke(messages)
|
response: AIMessage = await llm_with_tools.ainvoke(messages)
|
||||||
|
if _gen_ctx:
|
||||||
|
_gen.update(output=_as_text(response.content), usage=extract_usage(response))
|
||||||
|
_gen_ctx.__exit__(None, None, None)
|
||||||
|
|
||||||
messages.append(response)
|
messages.append(response)
|
||||||
|
|
||||||
if not response.tool_calls:
|
if not response.tool_calls:
|
||||||
@@ -688,6 +775,7 @@ async def _run_single_agent_stream(
|
|||||||
token = _as_text(getattr(chunk, "content", ""))
|
token = _as_text(getattr(chunk, "content", ""))
|
||||||
if token:
|
if token:
|
||||||
streamed_chars += len(token)
|
streamed_chars += len(token)
|
||||||
|
streamed_text.append(token)
|
||||||
emitted_any = True
|
emitted_any = True
|
||||||
yield "token", token
|
yield "token", token
|
||||||
|
|
||||||
@@ -696,6 +784,7 @@ async def _run_single_agent_stream(
|
|||||||
fallback_text = _as_text(response.content)
|
fallback_text = _as_text(response.content)
|
||||||
if fallback_text:
|
if fallback_text:
|
||||||
streamed_chars += len(fallback_text)
|
streamed_chars += len(fallback_text)
|
||||||
|
streamed_text.append(fallback_text)
|
||||||
yield "token", fallback_text
|
yield "token", fallback_text
|
||||||
logger.info(
|
logger.info(
|
||||||
"deep_agent: run_single_agent_stream_end trace=%s user=%s tool_calls=%d response_chars=%d",
|
"deep_agent: run_single_agent_stream_end trace=%s user=%s tool_calls=%d response_chars=%d",
|
||||||
@@ -704,6 +793,8 @@ async def _run_single_agent_stream(
|
|||||||
tool_calls_count,
|
tool_calls_count,
|
||||||
streamed_chars,
|
streamed_chars,
|
||||||
)
|
)
|
||||||
|
if _span:
|
||||||
|
_span.update(output="".join(streamed_text))
|
||||||
return
|
return
|
||||||
|
|
||||||
tool_map = {tool_def.name: tool_def for tool_def in tools}
|
tool_map = {tool_def.name: tool_def for tool_def in tools}
|
||||||
@@ -738,6 +829,7 @@ async def _run_single_agent_stream(
|
|||||||
token = _as_text(getattr(chunk, "content", ""))
|
token = _as_text(getattr(chunk, "content", ""))
|
||||||
if token:
|
if token:
|
||||||
streamed_chars += len(token)
|
streamed_chars += len(token)
|
||||||
|
streamed_text.append(token)
|
||||||
yield "token", token
|
yield "token", token
|
||||||
logger.info(
|
logger.info(
|
||||||
"deep_agent: run_single_agent_stream_end trace=%s user=%s tool_calls=%d response_chars=%d fallback=1",
|
"deep_agent: run_single_agent_stream_end trace=%s user=%s tool_calls=%d response_chars=%d fallback=1",
|
||||||
@@ -746,17 +838,28 @@ async def _run_single_agent_stream(
|
|||||||
tool_calls_count,
|
tool_calls_count,
|
||||||
streamed_chars,
|
streamed_chars,
|
||||||
)
|
)
|
||||||
|
if _span:
|
||||||
|
_span.update(output="".join(streamed_text))
|
||||||
finally:
|
finally:
|
||||||
clear_tool_result_collector()
|
clear_tool_result_collector()
|
||||||
|
if _span_ctx:
|
||||||
|
_span_ctx.__exit__(None, None, None)
|
||||||
|
if lf:
|
||||||
|
lf.flush()
|
||||||
|
|
||||||
|
|
||||||
async def run_home(user_id: str, message: str, context: dict[str, Any]) -> str:
|
async def run_home(user_id: str, message: str, context: dict[str, Any]) -> str:
|
||||||
prepared_context = await _prepare_context(message, context)
|
prepared_context = await _prepare_context(message, context)
|
||||||
|
system_prompt, langfuse_prompt = get_prompt_or_fallback(
|
||||||
|
"home_system", _HOME_SYSTEM_PROMPT
|
||||||
|
)
|
||||||
response = await _run_single_agent(
|
response = await _run_single_agent(
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
system_prompt=_HOME_SINGLE_AGENT_SYSTEM,
|
system_prompt=system_prompt,
|
||||||
message=message,
|
message=message,
|
||||||
context=prepared_context,
|
context=prepared_context,
|
||||||
|
langfuse_prompt=langfuse_prompt,
|
||||||
|
agent_name="home-agent",
|
||||||
)
|
)
|
||||||
return _normalize_tagged_list_lines(response, message)
|
return _normalize_tagged_list_lines(response, message)
|
||||||
|
|
||||||
@@ -764,11 +867,16 @@ async def run_home(user_id: str, message: str, context: dict[str, Any]) -> str:
|
|||||||
async def run_floating(user_id: str, message: str, context: dict[str, Any]) -> tuple[str, dict[str, str | None]]:
|
async def run_floating(user_id: str, message: str, context: dict[str, Any]) -> tuple[str, dict[str, str | None]]:
|
||||||
prepared_context = await _prepare_context(message, context)
|
prepared_context = await _prepare_context(message, context)
|
||||||
domain = await _infer_floating_domain(message, prepared_context)
|
domain = await _infer_floating_domain(message, prepared_context)
|
||||||
|
system_prompt, langfuse_prompt = get_prompt_or_fallback(
|
||||||
|
"floating_system", _FLOATING_SYSTEM_PROMPT
|
||||||
|
)
|
||||||
response = await _run_single_agent(
|
response = await _run_single_agent(
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
system_prompt=_FLOATING_SINGLE_AGENT_SYSTEM,
|
system_prompt=system_prompt,
|
||||||
message=message,
|
message=message,
|
||||||
context=prepared_context,
|
context=prepared_context,
|
||||||
|
langfuse_prompt=langfuse_prompt,
|
||||||
|
agent_name="floating-agent",
|
||||||
)
|
)
|
||||||
sanitized = _strip_floating_markup(response)
|
sanitized = _strip_floating_markup(response)
|
||||||
if not sanitized and response:
|
if not sanitized and response:
|
||||||
@@ -782,12 +890,17 @@ async def run_home_stream(
|
|||||||
context: dict[str, Any],
|
context: dict[str, Any],
|
||||||
) -> AsyncGenerator[tuple[str, Any], None]:
|
) -> AsyncGenerator[tuple[str, Any], None]:
|
||||||
prepared_context = await _prepare_context(message, context)
|
prepared_context = await _prepare_context(message, context)
|
||||||
|
system_prompt, langfuse_prompt = get_prompt_or_fallback(
|
||||||
|
"home_system", _HOME_SYSTEM_PROMPT
|
||||||
|
)
|
||||||
text_chunks: list[str] = []
|
text_chunks: list[str] = []
|
||||||
async for event in _run_single_agent_stream(
|
async for event in _run_single_agent_stream(
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
system_prompt=_HOME_SINGLE_AGENT_SYSTEM,
|
system_prompt=system_prompt,
|
||||||
message=message,
|
message=message,
|
||||||
context=prepared_context,
|
context=prepared_context,
|
||||||
|
langfuse_prompt=langfuse_prompt,
|
||||||
|
agent_name="home-agent",
|
||||||
):
|
):
|
||||||
event_type, data = event
|
event_type, data = event
|
||||||
if event_type != "token":
|
if event_type != "token":
|
||||||
@@ -809,14 +922,19 @@ async def run_floating_stream(
|
|||||||
domain = await _infer_floating_domain(message, prepared_context)
|
domain = await _infer_floating_domain(message, prepared_context)
|
||||||
yield "floating_domain", domain
|
yield "floating_domain", domain
|
||||||
|
|
||||||
|
system_prompt, langfuse_prompt = get_prompt_or_fallback(
|
||||||
|
"floating_system", _FLOATING_SYSTEM_PROMPT
|
||||||
|
)
|
||||||
sanitizer = _FloatingStreamSanitizer()
|
sanitizer = _FloatingStreamSanitizer()
|
||||||
emitted_sanitized = False
|
emitted_sanitized = False
|
||||||
raw_chunks: list[str] = []
|
raw_chunks: list[str] = []
|
||||||
async for event in _run_single_agent_stream(
|
async for event in _run_single_agent_stream(
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
system_prompt=_FLOATING_SINGLE_AGENT_SYSTEM,
|
system_prompt=system_prompt,
|
||||||
message=message,
|
message=message,
|
||||||
context=prepared_context,
|
context=prepared_context,
|
||||||
|
langfuse_prompt=langfuse_prompt,
|
||||||
|
agent_name="floating-agent",
|
||||||
):
|
):
|
||||||
event_type, data = event
|
event_type, data = event
|
||||||
if event_type != "token":
|
if event_type != "token":
|
||||||
|
|||||||
114
app/core/langfuse_client.py
Normal file
114
app/core/langfuse_client.py
Normal file
@@ -0,0 +1,114 @@
|
|||||||
|
"""Langfuse observability — singleton client and prompt helpers.
|
||||||
|
|
||||||
|
If LANGFUSE_SECRET_KEY / LANGFUSE_PUBLIC_KEY are not set,
|
||||||
|
all helpers are no-ops so the app works without Langfuse configured.
|
||||||
|
|
||||||
|
Usage
|
||||||
|
-----
|
||||||
|
Tracing::
|
||||||
|
|
||||||
|
from app.core.langfuse_client import get_langfuse
|
||||||
|
|
||||||
|
lf = get_langfuse()
|
||||||
|
if lf:
|
||||||
|
with lf.start_as_current_observation(as_type="span", name="my-agent") as span:
|
||||||
|
span.update(input=user_message)
|
||||||
|
# ... do work ...
|
||||||
|
span.update(output=result)
|
||||||
|
lf.flush()
|
||||||
|
|
||||||
|
Prompt management::
|
||||||
|
|
||||||
|
from app.core.langfuse_client import get_prompt_or_fallback
|
||||||
|
|
||||||
|
text, prompt_obj = get_prompt_or_fallback("home_system", FALLBACK_PROMPT)
|
||||||
|
# Use text as the system prompt; pass prompt_obj to generations for linking.
|
||||||
|
|
||||||
|
Linking a prompt to a generation::
|
||||||
|
|
||||||
|
with lf.start_as_current_observation(
|
||||||
|
as_type="generation",
|
||||||
|
name="llm-call",
|
||||||
|
model="gpt-4o",
|
||||||
|
prompt=prompt_obj, # links generation → prompt version in the UI
|
||||||
|
input=messages,
|
||||||
|
) as gen:
|
||||||
|
response = await llm.ainvoke(messages)
|
||||||
|
gen.update(output=response.content, usage=extract_usage(response))
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_client: Any = None
|
||||||
|
_initialized: bool = False
|
||||||
|
|
||||||
|
|
||||||
|
def get_langfuse() -> Any | None:
    """Return the shared Langfuse client, creating it on first use.

    The result of the first call is remembered for every later call, whether
    that result is a client or ``None``.

    Returns:
        The Langfuse client, or ``None`` when either key is missing from the
        settings or the client could not be constructed.
    """
    global _client, _initialized

    if not _initialized:
        # Mark the attempt before doing any work so it is made only once.
        _initialized = True

        from app.config.settings import settings  # local import to avoid circular deps

        secret_key = settings.LANGFUSE_SECRET_KEY
        public_key = settings.LANGFUSE_PUBLIC_KEY
        if not (secret_key and public_key):
            logger.debug("langfuse: not configured — observability disabled")
            return None

        try:
            from langfuse import Langfuse

            _client = Langfuse(
                secret_key=secret_key,
                public_key=public_key,
                host=settings.LANGFUSE_HOST,
            )
            logger.info("langfuse: client initialized host=%s", settings.LANGFUSE_HOST)
        except Exception as exc:
            # Observability is best-effort: a broken SDK/config must not break the app.
            logger.warning("langfuse: failed to initialize: %s", exc)
            _client = None

    return _client
|
||||||
|
|
||||||
|
|
||||||
|
def get_prompt_or_fallback(name: str, fallback: str) -> tuple[str, Any]:
    """Fetch a text prompt from Langfuse; fall back to ``fallback`` on any error.

    Args:
        name: Name of the prompt in Langfuse (the ``production`` label is used).
        fallback: Prompt text to use when Langfuse cannot supply one.

    Returns:
        ``(prompt_text, prompt_obj_or_None)``.

        * ``prompt_text`` — the raw template string (variables not yet
          substituted), or ``fallback``.
        * ``prompt_obj`` — the Langfuse prompt object, or ``None`` whenever
          ``fallback`` is returned (Langfuse not configured, fetch failed,
          Langfuse served its own fallback stand-in, or the prompt is not a
          text prompt).  Pass this to generation observations so Langfuse
          links the generation to the exact prompt version in the UI.
    """
    lf = get_langfuse()
    if lf is None:
        return fallback, None

    try:
        prompt = lf.get_prompt(name, label="production", fallback=fallback)

        # With ``fallback=`` supplied the SDK does not raise on a failed fetch;
        # it returns a stand-in flagged ``is_fallback``.  That object is not a
        # real prompt version, so it must not be linked to generations.
        if getattr(prompt, "is_fallback", False) is True:
            logger.info("langfuse: prompt %r unavailable — using fallback", name)
            return fallback, None

        # For text-type prompts .prompt holds the raw template string.
        raw = getattr(prompt, "prompt", None)
        if not isinstance(raw, str):
            # Not a text prompt: its content is not used, so do not link it.
            return fallback, None
        return raw, prompt
    except Exception as exc:
        logger.warning("langfuse: get_prompt %r failed: %s — using fallback", name, exc)
        return fallback, None
|
||||||
|
|
||||||
|
|
||||||
|
def extract_usage(response: Any) -> dict[str, int]:
    """Extract token usage from a LangChain AI message into Langfuse format.

    Args:
        response: Any object; its ``usage_metadata`` attribute (a mapping with
            ``input_tokens`` / ``output_tokens`` / ``total_tokens``) is read
            when present.

    Returns:
        ``{"input": ..., "output": ..., "total": ...}``, or an empty dict when
        the response carries no usage metadata.  Missing or ``None`` counts
        are reported as 0; a missing total is derived from input + output.
    """
    meta = getattr(response, "usage_metadata", None)
    if not meta:
        return {}

    def _count(key: str) -> int:
        # ``or 0`` covers both an absent key and an explicit ``None`` value.
        return int(meta.get(key) or 0)

    input_tokens = _count("input_tokens")
    output_tokens = _count("output_tokens")
    # Derive the total when it is not reported instead of reporting 0.
    total_tokens = _count("total_tokens") or input_tokens + output_tokens

    return {
        "input": input_tokens,
        "output": output_tokens,
        "total": total_tokens,
    }
|
||||||
104
app/core/preprocessors/__init__.py
Normal file
104
app/core/preprocessors/__init__.py
Normal file
@@ -0,0 +1,104 @@
|
|||||||
|
"""Preprocessor registry: detect content type and dispatch to handlers.
|
||||||
|
|
||||||
|
Public API
|
||||||
|
----------
|
||||||
|
detect_content_type(filename, raw_content) -> str
|
||||||
|
Heuristic detection based on file extension and content patterns.
|
||||||
|
|
||||||
|
preprocess(content_type, raw_content) -> PreprocessResult
|
||||||
|
Dispatch to the appropriate handler.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import re
|
||||||
|
|
||||||
|
from app.core.preprocessors.base import PreprocessResult
|
||||||
|
|
||||||
|
# ── Heuristics ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
# Patterns that strongly suggest an email HTML file.
# The leading ``\b`` stops ordinary words that merely end in a header name
# ("Update:", "Photo:") from being mistaken for email headers.
_EMAIL_SIGNALS = re.compile(
    r"\b(Subject:|From:|To:|Date:|Sent:|MIME-Version:|Content-Type:\s*text/html)",
    re.IGNORECASE,
)

# Patterns that suggest a generic HTML page (not an email).
# No longer consulted by detect_content_type (an HTML-like file without email
# signals is "generic_html" either way); kept for any external importers.
_GENERIC_HTML_SIGNALS = re.compile(
    r"<(nav|main|header|footer|article|section)\b",
    re.IGNORECASE,
)


def detect_content_type(filename: str, raw_content: str) -> str:
    """Return a content-type string for the given file.

    Supported types: ``"email_html"``, ``"generic_html"``,
    ``"plain_text"``, ``"unknown"``.

    Args:
        filename: File name; only the text after the last ``.`` is used.
        raw_content: File content as text; only the first 4096 characters
            are inspected.

    Returns:
        * ``"plain_text"`` for ``.txt`` files.
        * ``"email_html"`` for HTML-like files (``html``, ``htm``, ``eml``,
          ``mhtml``, ``mht``) and extension-less files whose content shows
          email header signals.
        * ``"generic_html"`` for HTML-like files without email signals.
        * ``"unknown"`` for everything else.
    """
    ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""

    if ext == "txt":
        return "plain_text"

    if ext in ("html", "htm", "eml", "mhtml", "mht"):
        # Prefer email detection over generic HTML.
        if _EMAIL_SIGNALS.search(raw_content[:4096]):
            return "email_html"
        return "generic_html"

    # Extension-less files that carry email headers.
    if not ext and _EMAIL_SIGNALS.search(raw_content[:4096]):
        return "email_html"

    # NOTE(review): the earlier binary-content sniffing returned "unknown" on
    # every path, so it had no effect on the result and was removed.  If
    # readable text with an unrecognised extension should be "plain_text",
    # that decision belongs here.
    return "unknown"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Generic fallback handler ──────────────────────────────────────────
|
||||||
|
|
||||||
|
def _preprocess_generic(raw_content: str, content_type: str) -> PreprocessResult:
    """Reduce *raw_content* to text and wrap it in a ``PreprocessResult``.

    Markup is removed with BeautifulSoup when it is importable, otherwise with
    a simple tag-stripping regex.  Runs of three or more newlines are collapsed
    to a blank line, and the given *content_type* is passed through unchanged
    with empty metadata.
    """
    try:
        from bs4 import BeautifulSoup
    except ImportError:
        # No BeautifulSoup — strip tags with a simple regex
        stripped = re.sub(r"<[^>]+>", "", raw_content)
    else:
        stripped = BeautifulSoup(raw_content, "html.parser").get_text(separator="\n")

    collapsed = re.sub(r"\n{3,}", "\n\n", stripped)
    return PreprocessResult(
        content_type=content_type,
        clean_text=collapsed.strip(),
        metadata={},
    )
|
||||||
|
|
||||||
|
|
||||||
|
# ── Dispatch ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def preprocess(content_type: str, raw_content: str) -> PreprocessResult:
    """Run the handler that matches *content_type* on *raw_content*.

    ``"email_html"`` goes to the email handler; every other content type is
    handled by the generic fallback.
    """
    if content_type != "email_html":
        return _preprocess_generic(raw_content, content_type)

    # Imported on demand so the email handler is only loaded when it is used.
    from app.core.preprocessors.email_html import preprocess_email_html

    return preprocess_email_html(raw_content)
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = ["detect_content_type", "preprocess", "PreprocessResult"]
|
||||||
25
app/core/preprocessors/base.py
Normal file
25
app/core/preprocessors/base.py
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
"""Base types for the preprocessor system."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
class PreprocessResult:
    """Value object returned by every preprocessor handler.

    Fields
    ------
    content_type:
        Content type the result was produced for
        (e.g. ``"email_html"``, ``"plain_text"``).
    clean_text:
        Human-readable text with markup/binary noise removed.
    metadata:
        Extracted metadata; the keys depend on the handler.  Common keys are
        ``subject``, ``from``, ``to``, ``date`` and ``filename``.  Defaults to
        a fresh empty dict for each instance.
    """

    content_type: str
    clean_text: str
    metadata: dict = field(default_factory=dict)
|
||||||
111
app/core/preprocessors/email_html.py
Normal file
111
app/core/preprocessors/email_html.py
Normal file
@@ -0,0 +1,111 @@
|
|||||||
|
"""Preprocessor for email HTML files.
|
||||||
|
|
||||||
|
Handles:
|
||||||
|
- HTML stripping via BeautifulSoup
|
||||||
|
- Metadata extraction (Subject, From, To, Date)
|
||||||
|
- Thread splitting — isolates the latest reply
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import re
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
from app.core.preprocessors.base import PreprocessResult
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# ── Thread split markers ──────────────────────────────────────────────
|
||||||
|
|
||||||
|
# Matches patterns like:
|
||||||
|
# "On Mon, Apr 7, 2026 at 10:00 AM, Alice <alice@co.com> wrote:"
|
||||||
|
# "-----Original Message-----"
|
||||||
|
# "> " (plain-text quote prefix)
|
||||||
|
_THREAD_PATTERNS = [
|
||||||
|
re.compile(r"^On\s+.+wrote\s*:", re.IGNORECASE | re.MULTILINE),
|
||||||
|
re.compile(r"^-{3,}\s*(original message|forwarded message)\s*-{3,}", re.IGNORECASE | re.MULTILINE),
|
||||||
|
re.compile(r"^>{1,}\s+\S", re.MULTILINE),
|
||||||
|
re.compile(r"^From:\s+.+\nSent:\s+", re.IGNORECASE | re.MULTILINE),
|
||||||
|
]
|
||||||
|
|
||||||
|
# ── Metadata patterns (applied on raw HTML / plain fallback) ──────────
|
||||||
|
|
||||||
|
_META_PATTERNS: dict[str, list[re.Pattern]] = {
|
||||||
|
"subject": [
|
||||||
|
re.compile(r"<title>(.+?)</title>", re.IGNORECASE | re.DOTALL),
|
||||||
|
re.compile(r"Subject:\s*(.+)", re.IGNORECASE),
|
||||||
|
],
|
||||||
|
"from": [
|
||||||
|
re.compile(r'<meta[^>]+name=["\']?from["\']?[^>]+content=["\']([^"\']+)["\']', re.IGNORECASE),
|
||||||
|
re.compile(r"From:\s*(.+)", re.IGNORECASE),
|
||||||
|
],
|
||||||
|
"to": [
|
||||||
|
re.compile(r'<meta[^>]+name=["\']?to["\']?[^>]+content=["\']([^"\']+)["\']', re.IGNORECASE),
|
||||||
|
re.compile(r"To:\s*(.+)", re.IGNORECASE),
|
||||||
|
],
|
||||||
|
"date": [
|
||||||
|
re.compile(r'<meta[^>]+name=["\']?date["\']?[^>]+content=["\']([^"\']+)["\']', re.IGNORECASE),
|
||||||
|
re.compile(r"Date:\s*(.+)", re.IGNORECASE),
|
||||||
|
re.compile(r"Sent:\s*(.+)", re.IGNORECASE),
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_metadata(raw_html: str, text: str) -> dict:
|
||||||
|
"""Extract Subject/From/To/Date from raw HTML or plain text."""
|
||||||
|
metadata: dict[str, str] = {}
|
||||||
|
for field, patterns in _META_PATTERNS.items():
|
||||||
|
for pat in patterns:
|
||||||
|
m = pat.search(raw_html) or pat.search(text)
|
||||||
|
if m:
|
||||||
|
metadata[field] = m.group(1).strip()
|
||||||
|
break
|
||||||
|
return metadata
|
||||||
|
|
||||||
|
|
||||||
|
def _split_thread(text: str) -> str:
    """Cut *text* at the first quoted-reply marker and return what precedes it.

    Each pattern in ``_THREAD_PATTERNS`` is searched and the smallest match
    offset becomes the cut point.  When no pattern matches, or the first
    marker sits at offset 0, the whole text is returned.  The result is
    stripped of surrounding whitespace in every case.
    """
    marker_offsets = [
        found.start()
        for found in (pattern.search(text) for pattern in _THREAD_PATTERNS)
        if found
    ]
    cut = min(marker_offsets, default=None)

    # ``cut`` is falsy both for "no marker" (None) and "marker at the very
    # start" (0); in either case there is nothing useful to slice off.
    if not cut:
        return text.strip()
    return text[:cut].strip()
|
||||||
|
|
||||||
|
|
||||||
|
def preprocess_email_html(raw_content: str) -> PreprocessResult:
    """Turn an email HTML document into clean text plus header metadata.

    Steps: parse the markup, drop ``style`` / ``script`` / ``head`` /
    ``noscript`` elements, flatten the rest to text, collapse runs of blank
    lines, extract metadata, and keep only the text before the first
    quoted-reply marker.

    Raises:
        ImportError: if ``beautifulsoup4`` is not installed.
    """
    try:
        from bs4 import BeautifulSoup  # lazy import — optional dep
    except ImportError as exc:
        raise ImportError(
            "beautifulsoup4 is required for email_html preprocessing. "
            "Install it with: pip install beautifulsoup4"
        ) from exc

    # Try the lxml parser first; on any failure fall back to html.parser.
    try:
        document = BeautifulSoup(raw_content, "lxml")
    except Exception:
        document = BeautifulSoup(raw_content, "html.parser")

    # Drop elements whose content is never part of the readable message.
    for noise in document.find_all(["style", "script", "head", "noscript"]):
        noise.decompose()

    flattened = document.get_text(separator="\n")
    # Collapse excessive blank lines.
    body_text = re.sub(r"\n{3,}", "\n\n", flattened).strip()

    # Metadata is read from the full text, before the thread is cut.
    headers = _extract_metadata(raw_content, body_text)
    newest_reply = _split_thread(body_text)

    return PreprocessResult(
        content_type="email_html",
        clean_text=newest_reply,
        metadata=headers,
    )
|
||||||
@@ -32,4 +32,8 @@ google-auth-oauthlib>=1.2.0
|
|||||||
google-auth-httplib2>=0.2.0
|
google-auth-httplib2>=0.2.0
|
||||||
msal>=1.28.0
|
msal>=1.28.0
|
||||||
cryptography>=42.0.0
|
cryptography>=42.0.0
|
||||||
|
langfuse>=2.0.0
|
||||||
|
beautifulsoup4>=4.12.0
|
||||||
|
lxml>=5.0.0
|
||||||
|
PyYAML>=6.0.0
|
||||||
ruff>=0.8.0
|
ruff>=0.8.0
|
||||||
|
|||||||
121
tests/fixtures/preprocessors/cases.yaml
vendored
Normal file
121
tests/fixtures/preprocessors/cases.yaml
vendored
Normal file
@@ -0,0 +1,121 @@
|
|||||||
|
# Preprocessor test cases — Step 1 (Local Agent V2)
|
||||||
|
#
|
||||||
|
# Schema per caso:
|
||||||
|
# id: "1.N"
|
||||||
|
# description: str
|
||||||
|
# score_name: str # nome score inviato a Langfuse
|
||||||
|
#
|
||||||
|
# Sorgente contenuto (una delle due):
|
||||||
|
# file: <nome file in data/> # letto come testo UTF-8
|
||||||
|
# generate: binary_noise # contenuto generato dal runner (per test binari)
|
||||||
|
#
|
||||||
|
# Per op=detect:
|
||||||
|
# op: detect
|
||||||
|
# input_filename: str # filename passato a detect_content_type
|
||||||
|
# expected_content_type: str
|
||||||
|
#
|
||||||
|
# Per op=preprocess:
|
||||||
|
# op: preprocess
|
||||||
|
# input_content_type: str # content_type passato a preprocess()
|
||||||
|
# assertions:
|
||||||
|
# no_html_tags: bool
|
||||||
|
# min_length: int
|
||||||
|
# compression_ratio_lt: float # len(clean) / len(raw) < soglia
|
||||||
|
# metadata_keys: [str, ...] # chiavi che devono essere in metadata
|
||||||
|
# contains: str | [str, ...] # substring(s) presenti in clean_text
|
||||||
|
# not_contains: str | [str, ...] # substring(s) assenti da clean_text
|
||||||
|
# content_type: str # valore atteso di result.content_type
|
||||||
|
|
||||||
|
cases:
|
||||||
|
|
||||||
|
# ── Detection tests ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
- id: "1.1"
|
||||||
|
description: "Detect email HTML"
|
||||||
|
score_name: preprocess.detect_email
|
||||||
|
file: email_action.html
|
||||||
|
op: detect
|
||||||
|
input_filename: email_export.html
|
||||||
|
expected_content_type: email_html
|
||||||
|
|
||||||
|
- id: "1.2"
|
||||||
|
description: "Detect generic HTML"
|
||||||
|
score_name: preprocess.detect_generic
|
||||||
|
file: generic_page.html
|
||||||
|
op: detect
|
||||||
|
input_filename: index.html
|
||||||
|
expected_content_type: generic_html
|
||||||
|
|
||||||
|
- id: "1.3"
|
||||||
|
description: "Detect plain text"
|
||||||
|
score_name: preprocess.detect_text
|
||||||
|
file: notes.txt
|
||||||
|
op: detect
|
||||||
|
input_filename: notes.txt
|
||||||
|
expected_content_type: plain_text
|
||||||
|
|
||||||
|
- id: "1.4"
|
||||||
|
description: "Detect unknown (binary-like content)"
|
||||||
|
score_name: preprocess.detect_unknown
|
||||||
|
generate: binary_noise
|
||||||
|
op: detect
|
||||||
|
input_filename: archive.xyz
|
||||||
|
expected_content_type: unknown
|
||||||
|
|
||||||
|
# ── Preprocess tests ───────────────────────────────────────────────
|
||||||
|
|
||||||
|
- id: "1.5"
|
||||||
|
description: "Email: strip HTML tags"
|
||||||
|
file: email_action.html
|
||||||
|
op: preprocess
|
||||||
|
input_content_type: email_html
|
||||||
|
assertions:
|
||||||
|
no_html_tags: true
|
||||||
|
min_length: 50
|
||||||
|
compression_ratio_lt: 0.8
|
||||||
|
|
||||||
|
- id: "1.6"
|
||||||
|
description: "Email: extract metadata (Subject + From)"
|
||||||
|
file: email_action.html
|
||||||
|
op: preprocess
|
||||||
|
input_content_type: email_html
|
||||||
|
assertions:
|
||||||
|
metadata_keys: [subject, from]
|
||||||
|
|
||||||
|
- id: "1.7"
|
||||||
|
description: "Email: split thread — solo ultimo messaggio"
|
||||||
|
file: email_thread.html
|
||||||
|
op: preprocess
|
||||||
|
input_content_type: email_html
|
||||||
|
assertions:
|
||||||
|
contains: "Sure, I'll handle the deploy"
|
||||||
|
not_contains: "Let's plan the deploy"
|
||||||
|
|
||||||
|
- id: "1.8"
|
||||||
|
description: "Email: singolo messaggio senza thread"
|
||||||
|
file: email_single.html
|
||||||
|
op: preprocess
|
||||||
|
input_content_type: email_html
|
||||||
|
assertions:
|
||||||
|
contains: "deploy is done"
|
||||||
|
|
||||||
|
- id: "1.9"
|
||||||
|
description: "Email: HTML pesante con table layout"
|
||||||
|
file: email_heavy.html
|
||||||
|
op: preprocess
|
||||||
|
input_content_type: email_html
|
||||||
|
assertions:
|
||||||
|
no_html_tags: true
|
||||||
|
min_length: 30
|
||||||
|
not_contains:
|
||||||
|
- "border-collapse"
|
||||||
|
- "font-size"
|
||||||
|
|
||||||
|
- id: "1.10"
|
||||||
|
description: "Fallback: file sconosciuto → testo restituito"
|
||||||
|
file: fallback.txt
|
||||||
|
op: preprocess
|
||||||
|
input_content_type: unknown
|
||||||
|
assertions:
|
||||||
|
min_length: 1
|
||||||
|
content_type: unknown
|
||||||
25
tests/fixtures/preprocessors/data/email_action.html
vendored
Normal file
25
tests/fixtures/preprocessors/data/email_action.html
vendored
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
<!DOCTYPE html>
|
||||||
|
<html>
|
||||||
|
<head>
|
||||||
|
<title>Fix the login bug</title>
|
||||||
|
<style>
|
||||||
|
body { font-family: Arial, sans-serif; color: #333; margin: 0; padding: 20px; }
|
||||||
|
.header { background: #f5f5f5; padding: 10px; border-bottom: 1px solid #ddd; }
|
||||||
|
.body { padding: 20px; }
|
||||||
|
</style>
|
||||||
|
</head>
|
||||||
|
<body>
|
||||||
|
<div class="header">
|
||||||
|
<p><strong>From:</strong> boss@company.com</p>
|
||||||
|
<p><strong>To:</strong> dev@company.com</p>
|
||||||
|
<p><strong>Subject:</strong> Fix the login bug</p>
|
||||||
|
<p><strong>Date:</strong> Mon, 7 Apr 2026 09:00:00 +0200</p>
|
||||||
|
</div>
|
||||||
|
<div class="body">
|
||||||
|
<p>Hi,</p>
|
||||||
|
<p>Please fix the login bug by Friday. It is blocking the release.</p>
|
||||||
|
<p>Priority: high. Let me know if you need anything.</p>
|
||||||
|
<p>Thanks,<br>Boss</p>
|
||||||
|
</div>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
49
tests/fixtures/preprocessors/data/email_heavy.html
vendored
Normal file
49
tests/fixtures/preprocessors/data/email_heavy.html
vendored
Normal file
@@ -0,0 +1,49 @@
|
|||||||
|
<!DOCTYPE html>
|
||||||
|
<html>
|
||||||
|
<head>
|
||||||
|
<style>
|
||||||
|
table { border-collapse: collapse; width: 100%; max-width: 600px; margin: 0 auto; }
|
||||||
|
td { padding: 8px 12px; border: 1px solid #dddddd; font-size: 12px; color: #444444; }
|
||||||
|
.header-row { background-color: #003366; color: #ffffff; font-weight: bold; }
|
||||||
|
.label-col { background-color: #f0f0f0; width: 80px; font-weight: bold; }
|
||||||
|
.footer-row { font-size: 10px; color: #999999; text-align: center; }
|
||||||
|
</style>
|
||||||
|
</head>
|
||||||
|
<body bgcolor="#eeeeee">
|
||||||
|
<center>
|
||||||
|
<table cellpadding="0" cellspacing="0">
|
||||||
|
<tr class="header-row">
|
||||||
|
<td colspan="2">Company Internal Update</td>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<td class="label-col">From:</td>
|
||||||
|
<td>newsletter@corp.com</td>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<td class="label-col">Subject:</td>
|
||||||
|
<td>Q1 Results Update</td>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<td class="label-col">Date:</td>
|
||||||
|
<td>Apr 7, 2026</td>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<td colspan="2">
|
||||||
|
<table width="100%" cellpadding="10">
|
||||||
|
<tr>
|
||||||
|
<td>
|
||||||
|
<p style="font-size:14px; font-weight:bold;">Dear Team,</p>
|
||||||
|
<p>Q1 results are in. Revenue up 15% year-over-year.</p>
|
||||||
|
<p>Please review the attached report and share any feedback by EOW.</p>
|
||||||
|
</td>
|
||||||
|
</tr>
|
||||||
|
</table>
|
||||||
|
</td>
|
||||||
|
</tr>
|
||||||
|
<tr class="footer-row">
|
||||||
|
<td colspan="2">Confidential — do not forward outside the company.</td>
|
||||||
|
</tr>
|
||||||
|
</table>
|
||||||
|
</center>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
8
tests/fixtures/preprocessors/data/email_single.html
vendored
Normal file
8
tests/fixtures/preprocessors/data/email_single.html
vendored
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
<!DOCTYPE html>
|
||||||
|
<html><body>
|
||||||
|
<p><strong>From:</strong> alice@co.com</p>
|
||||||
|
<p><strong>To:</strong> team@co.com</p>
|
||||||
|
<p><strong>Subject:</strong> Quick update</p>
|
||||||
|
<p><strong>Date:</strong> Tue, 7 Apr 2026 10:30:00 +0200</p>
|
||||||
|
<p>The deploy is done. Everything looks good. No issues so far.</p>
|
||||||
|
</body></html>
|
||||||
24
tests/fixtures/preprocessors/data/email_thread.html
vendored
Normal file
24
tests/fixtures/preprocessors/data/email_thread.html
vendored
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
<!DOCTYPE html>
|
||||||
|
<html><body>
|
||||||
|
<div class="message-latest">
|
||||||
|
<p><strong>From:</strong> alice@co.com</p>
|
||||||
|
<p><strong>Subject:</strong> Re: Re: Deploy plan</p>
|
||||||
|
<p>Sure, I'll handle the deploy.</p>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<p>On Mon, Apr 6, 2026 at 3:00 PM, Bob <bob@co.com> wrote:</p>
|
||||||
|
<blockquote>
|
||||||
|
<p>From: bob@co.com</p>
|
||||||
|
<p>Can you handle the deploy?</p>
|
||||||
|
<p>On Sun, Apr 5, 2026 at 1:00 PM, Alice <alice@co.com> wrote:</p>
|
||||||
|
<blockquote>
|
||||||
|
<p>From: alice@co.com</p>
|
||||||
|
<p>Let's plan the deploy for Monday.</p>
|
||||||
|
<p>On Sat, Apr 4, 2026 at 11:00 AM, Charlie <charlie@co.com> wrote:</p>
|
||||||
|
<blockquote>
|
||||||
|
<p>From: charlie@co.com</p>
|
||||||
|
<p>We need to schedule the deploy. What day works?</p>
|
||||||
|
</blockquote>
|
||||||
|
</blockquote>
|
||||||
|
</blockquote>
|
||||||
|
</body></html>
|
||||||
3
tests/fixtures/preprocessors/data/fallback.txt
vendored
Normal file
3
tests/fixtures/preprocessors/data/fallback.txt
vendored
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
random text content without any structure
|
||||||
|
line two with some words
|
||||||
|
line three and more content here
|
||||||
35
tests/fixtures/preprocessors/data/generic_page.html
vendored
Normal file
35
tests/fixtures/preprocessors/data/generic_page.html
vendored
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
<!DOCTYPE html>
|
||||||
|
<html lang="en">
|
||||||
|
<head>
|
||||||
|
<meta charset="UTF-8">
|
||||||
|
<title>My Web App</title>
|
||||||
|
<link rel="stylesheet" href="styles.css">
|
||||||
|
</head>
|
||||||
|
<body>
|
||||||
|
<nav>
|
||||||
|
<a href="/">Home</a>
|
||||||
|
<a href="/about">About</a>
|
||||||
|
<a href="/contact">Contact</a>
|
||||||
|
</nav>
|
||||||
|
<main>
|
||||||
|
<header>
|
||||||
|
<h1>Welcome to My App</h1>
|
||||||
|
</header>
|
||||||
|
<article>
|
||||||
|
<p>This is a generic web page with no email headers.</p>
|
||||||
|
<p>It has navigation, main content, and a footer.</p>
|
||||||
|
</article>
|
||||||
|
<section>
|
||||||
|
<h2>Features</h2>
|
||||||
|
<ul>
|
||||||
|
<li>Fast</li>
|
||||||
|
<li>Reliable</li>
|
||||||
|
<li>Secure</li>
|
||||||
|
</ul>
|
||||||
|
</section>
|
||||||
|
</main>
|
||||||
|
<footer>
|
||||||
|
<p>© 2026 My App</p>
|
||||||
|
</footer>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
15
tests/fixtures/preprocessors/data/notes.txt
vendored
Normal file
15
tests/fixtures/preprocessors/data/notes.txt
vendored
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
Meeting notes - April 7, 2026
|
||||||
|
|
||||||
|
Attendees: Alice, Bob, Charlie
|
||||||
|
|
||||||
|
Discussion points:
|
||||||
|
- Deploy scheduled for Friday
|
||||||
|
- Bug fix for login must be completed by Thursday
|
||||||
|
- Review Q1 numbers before EOW
|
||||||
|
|
||||||
|
Action items:
|
||||||
|
- Alice: fix login bug
|
||||||
|
- Bob: prepare deploy checklist
|
||||||
|
- Charlie: send Q1 report
|
||||||
|
|
||||||
|
Next meeting: April 14, 2026
|
||||||
171
tests/test_preprocessors.py
Normal file
171
tests/test_preprocessors.py
Normal file
@@ -0,0 +1,171 @@
|
|||||||
|
"""Tests for the preprocessor system (Step 1 — Local Agent V2).
|
||||||
|
|
||||||
|
Fixtures are driven by:
|
||||||
|
tests/fixtures/preprocessors/cases.yaml — test case definitions
|
||||||
|
tests/fixtures/preprocessors/data/ — input files (HTML, txt, ...)
|
||||||
|
|
||||||
|
Run:
|
||||||
|
pytest tests/test_preprocessors.py -v
|
||||||
|
|
||||||
|
# Only detection tests
|
||||||
|
pytest tests/test_preprocessors.py -v -k detect
|
||||||
|
|
||||||
|
# Only preprocess tests
|
||||||
|
pytest tests/test_preprocessors.py -v -k preprocess
|
||||||
|
|
||||||
|
Langfuse scores are sent when LANGFUSE_SECRET_KEY / LANGFUSE_PUBLIC_KEY are set.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import re
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import yaml
|
||||||
|
|
||||||
|
from app.core.langfuse_client import get_langfuse
|
||||||
|
from app.core.preprocessors import detect_content_type, preprocess
|
||||||
|
|
||||||
|
# ── Paths ──────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
# All fixture locations are derived from this test module's own directory.
_FIXTURES_DIR = Path(__file__).parent.joinpath("fixtures", "preprocessors")

# Input files (HTML, txt, ...) referenced by the ``file`` key of a case.
_DATA_DIR = _FIXTURES_DIR.joinpath("data")

# YAML file holding the test case definitions.
_CASES_FILE = _FIXTURES_DIR.joinpath("cases.yaml")
|
||||||
|
|
||||||
|
# ── Content generators ─────────────────────────────────────────────────
|
||||||
|
|
||||||
|
# Synthetic inputs, looked up by the ``generate`` key of a case.
_GENERATORS: dict[str, str] = dict(
    # High ratio of non-printable chars → triggers "unknown" heuristic
    binary_noise="".join(["some", "\x00\x01\x02\x03\x04\x05", "content"]) * 20,
)
|
||||||
|
|
||||||
|
|
||||||
|
def _load_cases() -> list[dict]:
    """Return the value stored under the ``cases`` key of the YAML cases file.

    The file at ``_CASES_FILE`` is opened as UTF-8 text and parsed with
    ``yaml.safe_load``.
    """
    with open(_CASES_FILE, encoding="utf-8") as handle:
        document = yaml.safe_load(handle)
    return document["cases"]
|
||||||
|
|
||||||
|
|
||||||
|
def _read_content(case: dict) -> str:
    """Return the raw input text for one test case.

    A case carrying a ``generate`` key takes its content from the
    ``_GENERATORS`` table; every other case is read as UTF-8 text from
    ``_DATA_DIR / case["file"]``.

    Raises:
        ValueError: if ``case["generate"]`` is not a key of ``_GENERATORS``.
    """
    # File-backed case: nothing to generate, just read the fixture.
    if "generate" not in case:
        return (_DATA_DIR / case["file"]).read_text(encoding="utf-8")

    generator_name = case["generate"]
    if generator_name in _GENERATORS:
        return _GENERATORS[generator_name]
    raise ValueError(f"Unknown generator '{generator_name}' in case {case['id']}")
|
||||||
|
|
||||||
|
|
||||||
|
# ── Langfuse helper ───────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _lf_score(score_name: str, value: float, comment: str = "") -> None:
    """Send one numeric score to Langfuse, if a client is available.

    Does nothing when ``get_langfuse()`` returns a falsy value. Otherwise a
    trace named ``eval-<score_name>`` is created, the score is attached to
    that trace, and the client is flushed.

    Args:
        score_name: Name of the score; also used to build the trace name.
        value: Score value, sent with ``data_type="NUMERIC"``.
        comment: Free-text comment stored with the score.
    """
    client = get_langfuse()
    if not client:
        return

    eval_trace = client.trace(name=f"eval-{score_name}")
    client.score(
        trace_id=eval_trace.id,
        name=score_name,
        value=value,
        data_type="NUMERIC",
        comment=comment,
    )
    client.flush()
|
||||||
|
|
||||||
|
|
||||||
|
# ── Assertion engine ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _run_assertions(assertions: dict[str, Any], result: Any, raw: str) -> list[str]:
    """Evaluate the checks declared for a YAML case and collect the failures.

    Recognised keys of *assertions*, checked in this order:

    * ``no_html_tags`` – when truthy, ``result.clean_text`` must not match
      the pattern ``<[^>]+>``.
    * ``min_length`` – minimum ``len(result.clean_text)``.
    * ``compression_ratio_lt`` – ``len(result.clean_text) / len(raw)`` must be
      strictly below this value; skipped when *raw* is empty.
    * ``metadata_keys`` – each key must map to a truthy value in
      ``result.metadata``.
    * ``contains`` / ``not_contains`` – a string or a list of strings that
      must / must not occur in ``result.clean_text``.
    * ``content_type`` – expected ``result.content_type``.

    Keys that are absent are skipped, and attributes of *result* are only
    read by the checks that are actually declared.

    Returns:
        One message per failed check, in the order above; an empty list when
        every declared check passes.
    """

    def as_items(spec: Any) -> Any:
        # A bare string stands for a one-element list.
        return [spec] if isinstance(spec, str) else spec

    problems: list[str] = []

    if assertions.get("no_html_tags") and re.search(r"<[^>]+>", result.clean_text):
        problems.append("clean_text still contains HTML tags")

    minimum = assertions.get("min_length")
    if minimum is not None and len(result.clean_text) < minimum:
        problems.append(
            f"clean_text too short: {len(result.clean_text)} < {minimum}"
        )

    limit = assertions.get("compression_ratio_lt")
    if limit is not None and len(raw) > 0:
        ratio = len(result.clean_text) / len(raw)
        if ratio >= limit:
            problems.append(f"compression ratio {ratio:.2f} >= {limit}")

    problems.extend(
        f"metadata missing key '{key}' (got {result.metadata})"
        for key in assertions.get("metadata_keys", [])
        if not result.metadata.get(key)
    )

    required = assertions.get("contains")
    if required:
        problems.extend(
            f"clean_text missing expected substring: {piece!r}"
            for piece in as_items(required)
            if piece not in result.clean_text
        )

    forbidden = assertions.get("not_contains")
    if forbidden:
        problems.extend(
            f"clean_text contains forbidden substring: {piece!r}"
            for piece in as_items(forbidden)
            if piece in result.clean_text
        )

    wanted_type = assertions.get("content_type")
    if wanted_type and result.content_type != wanted_type:
        problems.append(
            f"content_type mismatch: expected {wanted_type!r}, got {result.content_type!r}"
        )

    return problems
|
||||||
|
|
||||||
|
|
||||||
|
# ── Parametrized: detect ──────────────────────────────────────────────
|
||||||
|
|
||||||
|
# Cases whose ``op`` is "detect"; each one becomes a parametrized test below.
_detect_cases = [entry for entry in _load_cases() if entry["op"] == "detect"]


@pytest.mark.parametrize(
    "case",
    _detect_cases,
    ids=[entry["id"] for entry in _detect_cases],
)
def test_detect(case: dict) -> None:
    """Check that ``detect_content_type`` yields the case's expected type.

    The outcome is also reported through ``_lf_score`` as 1.0 (match) or
    0.0 (mismatch) under the case's ``score_name``.
    """
    content = _read_content(case)
    detected = detect_content_type(case["input_filename"], content)
    wanted = case["expected_content_type"]
    matched = detected == wanted

    # Report the score before asserting so that mismatches are recorded too.
    _lf_score(
        case["score_name"],
        1.0 if matched else 0.0,
        f"got={detected}, expected={wanted}",
    )

    assert matched, (
        f"[{case['id']}] {case['description']}: "
        f"expected content_type={wanted!r}, got {detected!r}"
    )
|
||||||
|
|
||||||
|
|
||||||
|
# ── Parametrized: preprocess ──────────────────────────────────────────
|
||||||
|
|
||||||
|
# Cases whose ``op`` is "preprocess"; each one becomes a parametrized test below.
_preprocess_cases = [entry for entry in _load_cases() if entry["op"] == "preprocess"]


@pytest.mark.parametrize(
    "case",
    _preprocess_cases,
    ids=[entry["id"] for entry in _preprocess_cases],
)
def test_preprocess(case: dict) -> None:
    """Run ``preprocess`` on the case input and apply its declared assertions.

    A case without an ``assertions`` entry is checked against an empty set
    of assertions. All failure messages are reported together.
    """
    raw = _read_content(case)
    result = preprocess(case["input_content_type"], raw)
    failures = _run_assertions(case.get("assertions", {}), result, raw)

    assert not failures, (
        f"[{case['id']}] {case['description']} — {len(failures)} assertion(s) failed:\n"
        + "\n".join(f" • {message}" for message in failures)
    )
|
||||||
Reference in New Issue
Block a user