Files
api/app/api/routes/agent_setup.py
Roberto Musso a85f8fde29 feat(langfuse): propagate user_id and session_id to all traces
- Add hash_user_id() to SHA-256 hash user IDs before sending to Langfuse
- Add langfuse_context() helper wrapping propagate_attributes()
- deep_agent: extract session_id from _debug context, wrap all agent
  runs and classifier with langfuse_context(user_id, session_id)
- agent_runner: add session_id param, pass run_id as session for batch
- agent_setup: wrap journey LLM calls with langfuse_context
- Remove redundant metadata dicts (now handled by propagate_attributes)
2026-04-10 22:44:05 +02:00

514 lines
18 KiB
Python

"""Chatbot Journey — WS-based guided conversation to build an AgentConfig.
The journey is driven entirely through WebSocket frames (no REST endpoints).
The device WS handler dispatches ``journey_start`` and ``journey_message``
frames to the functions exported here.
Journey flow:
1. FE sends ``journey_start`` frame with basic agent info (directory,
data_types, schedule).
2. Server creates an in-memory session, sets up a WS executor so the
setup LLM can use file-system tools, does a first directory scrape,
and sends back a ``journey_reply`` with the first question.
3. FE sends ``journey_message`` frames for each user reply.
4. Server appends the user message, calls the LLM (which may read files
via tools), and sends back a ``journey_reply``.
5. After 3-5 turns the LLM wraps up by emitting an ``AgentConfig`` JSON
block delimited by ``AGENT_CONFIG_START`` / ``AGENT_CONFIG_END``.
6. Server parses and validates the JSON with Pydantic, sends
``journey_reply`` with ``done=True`` and the serialised config.
FE stores it locally.
"""
from __future__ import annotations
import json
import logging
import time
import uuid
from dataclasses import dataclass, field
from typing import Any
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
from app.agents.filesystem_agent import make_directory_tools
from app.core.langfuse_client import compile_prompt, extract_usage, get_langfuse, get_prompt_or_fallback, langfuse_context
from app.core.llm import get_agent_llm, model_for_agent
from app.schemas import AgentConfig
logger = logging.getLogger(__name__)
# ── Session TTL ───────────────────────────────────────────────────────────
_SESSION_TTL_SECONDS: int = 1800 # 30 minutes
# Sentinel strings used to delimit the LLM-produced AgentConfig JSON.
_CONFIG_START = "AGENT_CONFIG_START"
_CONFIG_END = "AGENT_CONFIG_END"
# Minimum turns before we consider nudging the LLM to wrap up.
_MIN_TURNS_BEFORE_NUDGE: int = 3
# Hard cap to avoid infinite loops (safety net, not the primary stopping criterion).
_MAX_TURNS: int = 15
# Max tool-calling steps per LLM invocation.
_MAX_TOOL_STEPS: int = 6
# ── In-memory session store ───────────────────────────────────────────────
@dataclass
class JourneySession:
session_id: str
user_id: str
agent_type: str # "local" | "cloud"
directory: str
data_types: list[str]
history: list[dict[str, Any]] = field(default_factory=list)
system_prompt: str = ""
langfuse_prompt: Any = None
created_at: float = field(default_factory=time.monotonic)
def is_expired(self) -> bool:
return (time.monotonic() - self.created_at) > _SESSION_TTL_SECONDS
# session_id → session
_sessions: dict[str, JourneySession] = {}
def get_journey_session(session_id: str, user_id: str) -> JourneySession | None:
"""Retrieve session; return None on missing, expired, or wrong owner."""
s = _sessions.get(session_id)
if s is None or s.is_expired():
_sessions.pop(session_id, None)
return None
if s.user_id != user_id:
return None
return s
# ── System prompt ─────────────────────────────────────────────────────────
_JOURNEY_SYSTEM_PROMPT = """\
You are a friendly assistant helping a freelancer configure a data-extraction agent.
Your job is to understand what files the user has in their directory and produce a
structured AgentConfig JSON that the extraction agent will use as its instruction set.
You have access to file-system tools to explore the user's directory:
- list_directory: see folder structure and file names
- read_file_content: peek at a file's content
- get_file_metadata: check file size, extension, dates
The user's configured directory is: {directory}
Target data types: {data_types}
## Your process
### Step 1 — Explore the directory
Use list_directory and read_file_content to understand what types of files are present
(HTML emails, plain-text documents, CSVs, etc.).
### Step 2 — Identify content types
For each distinct file type found, decide:
- A short id (e.g. "email_html", "plain_text", "csv")
- Which preprocessing handler to use: "email_html" for HTML emails, "generic" for everything else
- A human-readable label and optional detection_hint
### Step 3 — Ask focused questions (one at a time)
Cover these topics based on what you discovered:
1. How to map content to entity types (task / note / timeline entry)
2. Field mapping rules (e.g. email Subject → task title, filename → note title)
3. Priority or status rules (e.g. "urgent" in subject → high priority)
4. Date extraction (e.g. "by Friday" → dueDate)
5. Exclusion rules (e.g. skip newsletters, skip files with no project match)
### Step 4 — Produce the AgentConfig JSON
Once you are ≥ 90% confident, output the final config between these exact markers
(each on its own line):
{config_start}
{{
"content_types": [
{{
"id": "email_html",
"label": "Email HTML",
"detection_hint": "HTML file with From/To/Subject headers",
"preprocessing": "email_html",
"extraction_prompt": "Detailed extraction instructions for this content type..."
}}
],
"global_rules": [
"If the file cannot be matched to any project, do not create any entity."
],
"data_types": {data_types_json}
}}
{config_end}
## Rules for the extraction_prompt field
- Describe when to create a task vs note vs timeline entry (be specific and concrete)
- Include field mapping rules based on what you found in the directory
- Include priority/status/date rules if applicable
- Do NOT include projectId logic — the runner handles project assignment automatically
- Do NOT mention isAiSuggested — the runner always sets it to 1
## Constraints
- Never ask about projects, projectId, or how to link records to projects
- Never include projectId or project creation logic in the generated config
- Keep asking questions until ≥ 90% confident, then output the JSON immediately
{existing_section}\
Begin by exploring the directory, then ask your first question.\
"""
def _build_system_prompt(
directory: str,
data_types: list[str],
existing_config: str | None = None,
) -> tuple[str, Any]:
"""Return ``(compiled_system_prompt, langfuse_prompt_obj_or_None)``."""
existing_section = (
"\nThe user already has the following AgentConfig — refine it based on their answers:\n"
f"```json\n{existing_config}\n```\n"
if existing_config
else ""
)
template, prompt_obj = get_prompt_or_fallback(
"journey_system", _JOURNEY_SYSTEM_PROMPT
)
compiled = compile_prompt(
template,
prompt_obj,
directory=directory,
data_types=", ".join(data_types),
data_types_json=json.dumps(data_types),
config_start=_CONFIG_START,
config_end=_CONFIG_END,
existing_section=existing_section,
)
return compiled, prompt_obj
# ── AgentConfig extraction ────────────────────────────────────────────────
def _extract_agent_config(text: str) -> str | None:
"""Return validated AgentConfig JSON string from between markers, or None.
Parses the JSON with Pydantic to ensure it conforms to the schema before
returning. Returns None if markers are absent or JSON is invalid.
"""
if _CONFIG_START not in text or _CONFIG_END not in text:
return None
start_idx = text.index(_CONFIG_START) + len(_CONFIG_START)
end_idx = text.index(_CONFIG_END)
raw = text[start_idx:end_idx].strip()
if not raw:
return None
try:
parsed = AgentConfig.model_validate_json(raw)
return parsed.model_dump_json()
except Exception as exc:
logger.warning("agent_setup: failed to parse AgentConfig JSON: %s", exc)
return None
# ── LLM call with tool support ───────────────────────────────────────────
def _as_text(content: Any) -> str:
if content is None:
return ""
if isinstance(content, str):
return content
if isinstance(content, list):
parts: list[str] = []
for item in content:
if isinstance(item, str):
parts.append(item)
elif isinstance(item, dict):
text = item.get("text")
if isinstance(text, str):
parts.append(text)
return "".join(parts)
return str(content)
async def _call_llm_with_tools(
system_prompt: str,
history: list[dict[str, Any]],
tools: list[Any],
*,
user_id: str = "",
session_id: str = "",
langfuse_prompt: Any = None,
) -> str:
"""Build LangChain messages from history and invoke the LLM with tools.
Handles tool-calling loops: if the LLM calls tools, execute them and
continue until a final text response is produced.
"""
lf = get_langfuse()
messages: list[Any] = [SystemMessage(content=system_prompt)]
for turn in history:
if turn["role"] == "user":
messages.append(HumanMessage(content=turn["content"]))
else:
messages.append(AIMessage(content=turn["content"]))
llm = get_agent_llm("setup", temperature=0.4)
llm_with_tools = llm.bind_tools(tools)
tool_map = {tool_def.name: tool_def for tool_def in tools}
_lf_ctx = langfuse_context(user_id=user_id or None, session_id=session_id or None)
_lf_ctx.__enter__()
_span_ctx = (
lf.start_as_current_observation(
as_type="span",
name="journey-setup",
input=history[-1]["content"] if history else "",
)
if lf else None
)
_span = _span_ctx.__enter__() if _span_ctx else None
try:
for step in range(_MAX_TOOL_STEPS):
_gen_ctx = (
lf.start_as_current_observation(
as_type="generation",
name="journey-setup-llm",
model=model_for_agent("setup"),
prompt=langfuse_prompt,
input=messages,
)
if lf else None
)
_gen = _gen_ctx.__enter__() if _gen_ctx else None
response: AIMessage = await llm_with_tools.ainvoke(messages)
if _gen_ctx:
_gen.update(output=_as_text(response.content), usage_details=extract_usage(response))
_gen_ctx.__exit__(None, None, None)
resp_text = _as_text(response.content)
# Guard against empty responses (e.g. model returned finish_reason
# 'error' which LiteLLM maps to 'stop' with empty content).
if not response.tool_calls and not resp_text.strip():
logger.warning(
"agent_setup: journey LLM returned empty response at step %d — retrying",
step,
)
# Drop the empty AIMessage so we don't pollute history, and retry.
continue
messages.append(response)
if not response.tool_calls:
if _span:
_span.update(output=resp_text)
return resp_text
for call in response.tool_calls:
call_name = str(call.get("name", ""))
call_args = call.get("args", {})
logger.info(
"agent_setup: journey tool_call name=%s args=%s",
call_name,
json.dumps(call_args, ensure_ascii=True)[:500],
)
tool_fn = tool_map.get(call_name)
if tool_fn is None:
tool_output = f"Unknown tool: {call_name}"
else:
tool_output = await tool_fn.ainvoke(call_args)
logger.info(
"agent_setup: journey tool_result name=%s output=%s",
call_name,
str(tool_output)[:800],
)
messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
# Fallback: exceeded max steps.
final = await llm.ainvoke(messages)
final_text = _as_text(final.content)
if _span:
_span.update(output=final_text)
return final_text or (
"Sorry, I had trouble processing the files. "
"Could you try again? If the issue persists, the files might be too large for me to analyse."
)
finally:
if _span_ctx:
_span_ctx.__exit__(None, None, None)
_lf_ctx.__exit__(None, None, None)
if lf:
lf.flush()
# ── Journey handlers (called from device_ws.py) ──────────────────────────
async def handle_journey_start(
user_id: str,
frame: dict[str, Any],
) -> dict[str, Any]:
"""Handle a ``journey_start`` WS frame.
Creates a session, runs the setup LLM with directory exploration,
and returns the ``journey_reply`` payload.
"""
agent_type = frame.get("agent_type", "local")
directory = frame.get("directory", "")
data_types = frame.get("data_types", [])
existing_config = frame.get("existing_config")
# Use the session_id provided by the FE so the reply matches the
# listener key; fall back to a generated one if absent.
session_id = frame.get("session_id") or str(uuid.uuid4())
system_prompt, langfuse_prompt = _build_system_prompt(directory, data_types, existing_config)
session = JourneySession(
session_id=session_id,
user_id=user_id,
agent_type=agent_type,
directory=directory,
data_types=data_types,
system_prompt=system_prompt,
langfuse_prompt=langfuse_prompt,
)
# Seed with an initial user message — some providers require at least one
# user/input message to be present.
seed_history: list[dict[str, Any]] = [
{"role": "user", "content": "Hi, I'm ready to set up my agent. Please explore my directory and ask me your first question."},
]
ai_reply = await _call_llm_with_tools(
system_prompt=system_prompt,
history=seed_history,
tools=make_directory_tools(directory),
user_id=user_id,
session_id=session_id,
langfuse_prompt=langfuse_prompt,
)
session.history.extend(seed_history)
session.history.append({"role": "assistant", "content": ai_reply})
_sessions[session_id] = session
logger.info(
"agent_setup: journey session %s started for user %s (directory=%s)",
session_id,
user_id,
directory,
)
# Check if the LLM produced the config on the first turn (unlikely but possible).
agent_config = _extract_agent_config(ai_reply)
done = agent_config is not None
display_message = ai_reply
if done:
display_message = (
ai_reply[: ai_reply.index(_CONFIG_START)].strip()
or "Here is your agent configuration. You can save it or continue refining."
)
_sessions.pop(session_id, None)
return {
"type": "journey_reply",
"session_id": session_id,
"message": display_message,
"done": done,
"agent_config": agent_config,
}
async def handle_journey_message(
user_id: str,
frame: dict[str, Any],
) -> dict[str, Any]:
"""Handle a ``journey_message`` WS frame.
Appends the user message, calls the LLM, and returns the
``journey_reply`` payload.
"""
session_id = frame.get("session_id", "")
message = frame.get("message", "")
session = get_journey_session(session_id, user_id)
if session is None:
return {
"type": "journey_reply",
"session_id": session_id,
"message": "Journey session not found or expired. Please start a new setup.",
"done": True,
"agent_config": None,
}
# Append user turn.
session.history.append({"role": "user", "content": message})
# Call the LLM with tools.
session_tools = make_directory_tools(session.directory)
ai_reply = await _call_llm_with_tools(
system_prompt=session.system_prompt,
history=session.history,
tools=session_tools,
user_id=session.user_id,
session_id=session_id,
langfuse_prompt=session.langfuse_prompt,
)
session.history.append({"role": "assistant", "content": ai_reply})
# Check if the LLM produced the final config.
agent_config = _extract_agent_config(ai_reply)
done = agent_config is not None
# If the LLM didn't produce a config, nudge it once it hits the hard safety cap.
if not done:
turns = sum(1 for t in session.history if t["role"] == "user")
if turns >= _MAX_TURNS:
nudge_content = (
"[System: You have enough information. Please generate the final "
f"AgentConfig JSON now, wrapped in {_CONFIG_START} / {_CONFIG_END} markers.]"
)
session.history.append({"role": "user", "content": nudge_content})
nudge_reply = await _call_llm_with_tools(
system_prompt=session.system_prompt,
history=session.history,
tools=session_tools,
user_id=session.user_id,
session_id=session_id,
langfuse_prompt=session.langfuse_prompt,
)
session.history.append({"role": "assistant", "content": nudge_reply})
agent_config = _extract_agent_config(nudge_reply)
if agent_config is not None:
done = True
ai_reply = nudge_reply
display_message = ai_reply
if done:
display_message = (
ai_reply[: ai_reply.index(_CONFIG_START)].strip()
if _CONFIG_START in ai_reply
else "Here is your agent configuration. You can save it or continue refining."
)
_sessions.pop(session_id, None)
logger.info("agent_setup: journey session %s completed for user %s", session_id, user_id)
return {
"type": "journey_reply",
"session_id": session_id,
"message": display_message,
"done": done,
"agent_config": agent_config,
}