"""Chatbot Journey — guided conversation to build an agent prompt_template. Adapted for Batch Agent Service: imports from app.agents.filesystem_agent and app.llm instead of monolith paths. Session state is in-memory (could be moved to Redis for horizontal scaling in the future). Journey flow: 1. Redis consumer dispatches ``journey_start`` with basic agent config. 2. Server creates an in-memory session, runs the setup LLM with file-system tools to explore the directory, returns first question. 3. ``journey_message`` frames drive the conversation. 4. After 3-5 turns the LLM emits PROMPT_TEMPLATE_START / _END block. 5. Server parses the block and returns ``journey_reply`` with ``done=True``. """ from __future__ import annotations import json import logging import time import uuid from dataclasses import dataclass, field from typing import Any from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage from app.agents.filesystem_agent import FILESYSTEM_TOOLS from app.llm import get_llm import app.tracing as tracing logger = logging.getLogger(__name__) # ── Session TTL ─────────────────────────────────────────────────────────── _SESSION_TTL_SECONDS: int = 1800 # 30 minutes # Sentinel strings used to delimit the LLM-produced prompt_template. 
_TEMPLATE_START = "PROMPT_TEMPLATE_START"
_TEMPLATE_END = "PROMPT_TEMPLATE_END"

_MIN_TURNS_BEFORE_NUDGE: int = 3
_MAX_TURNS: int = 15
_MAX_TOOL_STEPS: int = 6

# Shown to the user when the final reply has no text before the template block.
_DEFAULT_DONE_MESSAGE = (
    "Here is your agent configuration. You can save it or continue refining."
)


# ── In-memory session store ───────────────────────────────────────────────


@dataclass
class JourneySession:
    """State of one guided conversation, kept in process memory."""

    session_id: str
    user_id: str
    agent_type: str  # "local" | "cloud"
    directory: str
    data_types: list[str]
    # Conversation turns as {"role": "user" | "assistant", "content": ...} dicts.
    history: list[dict[str, Any]] = field(default_factory=list)
    system_prompt: str = ""
    # Taken from the monotonic clock, so it is only comparable to time.monotonic().
    created_at: float = field(default_factory=time.monotonic)

    def is_expired(self) -> bool:
        """Return True once the session is older than ``_SESSION_TTL_SECONDS``."""
        return (time.monotonic() - self.created_at) > _SESSION_TTL_SECONDS


# session_id → session
_sessions: dict[str, JourneySession] = {}


def get_journey_session(session_id: str, user_id: str) -> JourneySession | None:
    """Retrieve session; return None on missing, expired, or wrong owner.

    An expired session is removed from the store as a side effect. A session
    owned by a different user is left untouched.
    """
    s = _sessions.get(session_id)
    if s is None or s.is_expired():
        _sessions.pop(session_id, None)
        return None
    if s.user_id != user_id:
        return None
    return s


def _purge_expired_sessions() -> None:
    """Drop every expired session so abandoned journeys do not accumulate."""
    # Collect first: the dict must not change size while it is being iterated.
    expired_ids = [sid for sid, sess in _sessions.items() if sess.is_expired()]
    for sid in expired_ids:
        _sessions.pop(sid, None)


def _claim_session_id(requested_id: Any, user_id: str) -> Any:
    """Return the session id to use for a new journey.

    The caller-supplied id is honoured unless it is empty or currently maps to
    a session owned by a different user; in those cases a fresh UUID4 string
    is returned so one user can never overwrite another user's session.
    """
    if not requested_id:
        return str(uuid.uuid4())
    current = _sessions.get(requested_id)
    if current is not None and current.user_id != user_id:
        logger.warning(
            "journey: session id %s is owned by another user; issuing a new id for user %s",
            requested_id,
            user_id,
        )
        return str(uuid.uuid4())
    return requested_id


# ── System prompt builder ─────────────────────────────────────────────────

_SYSTEM_PROMPT_TEMPLATE = """\
You are a friendly assistant helping a freelancer configure a data-extraction agent.
Your job is to understand exactly what data the user wants to extract from their
local directory and produce a detailed prompt_template that a separate AI will use
as its instruction set.

The extraction agent already has this base behaviour built in:
- Reads each file using file-system tools.
- Creates records (tasks, notes, timelines, projects) via CRUD tools.
- Sets isAiSuggested=1 on every new record.
- Only extracts data explicitly present in the files — it never invents information.
The user's custom prompt is appended AFTER this base behaviour, so focus on what
to look for and how to map it — not on the general extraction mechanics.

You have access to file-system tools to explore the user's directory:
- list_directory: to see folder structure
- read_file_content: to peek at file contents
- get_file_metadata: to check file info

The user's configured directory is: {directory}
Target data types: {data_types}

IMPORTANT — project assignment is handled automatically by the main agent runner
before the custom prompt is ever used. You MUST NOT ask the user about projects,
projectId, or how to link records to projects. Never include projectId logic or
project creation instructions in the generated prompt_template.

Start by exploring the directory to understand its structure. Then ask concise,
focused questions one at a time. Cover these topics (not necessarily in this order):
1. The type and format of the source content (confirmed by your exploration).
2. How fields should be mapped (e.g. filename → task title).
3. Priority or status rules (e.g. "urgent" keyword → high priority).
4. Any special handling, date extraction, or exclusions.

Once you reach 90% confidence, output the final prompt_template between these
exact markers on their own lines:
{template_start}
{template_end}

The prompt_template must be a self-contained instruction for an AI that reads files
and must perform CRUD operations using tools to create records. It should specify:
- What entity types to create (tasks, notes, timelines) — never projects.
- How to map file content to record fields (camelCase: title, status, priority,
  dueDate, content, etc.) — never include projectId.
- That isAiSuggested must be set to 1 on every new record.
- Concrete examples of mappings based on what you discovered in the directory.
{existing_section}\

Keep asking clarifying questions until you are at least 90% confident you have
enough information to generate an accurate prompt_template. Once you reach that
confidence level, stop asking and produce the final template immediately.

Begin by exploring the directory, then ask your first question.\
"""


def _build_system_prompt(
    directory: str,
    data_types: list[str],
    existing_template: str | None = None,
) -> str:
    """Render the system prompt for a journey.

    Args:
        directory: Directory the setup LLM is told to explore.
        data_types: Target data types; joined with ", " in the prompt.
        existing_template: Optional template to refine. When given, it is
            quoted in an extra section of the prompt.

    Returns:
        The formatted prompt. The prompt returned by
        ``tracing.get_prompt("journey_system", ...)`` is used when available
        and formattable; otherwise the hardcoded template is used.
    """
    existing_section = (
        f"\nThe user already has the following prompt_template — refine it based on their answers:\n"
        f"---\n{existing_template}\n---\n"
        if existing_template
        else ""
    )
    fields = {
        "directory": directory,
        "data_types": ", ".join(data_types),
        "template_start": _TEMPLATE_START,
        "template_end": _TEMPLATE_END,
        "existing_section": existing_section,
    }

    # Try Langfuse-managed prompt first, fall back to hardcoded template.
    managed = tracing.get_prompt("journey_system", fallback=None)
    if managed is not None:
        try:
            return managed.format(**fields)
        except (AttributeError, IndexError, KeyError, ValueError):
            # An externally edited prompt with unknown placeholders or stray
            # braces must not take the whole journey down.
            logger.warning(
                "journey: managed prompt 'journey_system' could not be formatted; "
                "using built-in template",
                exc_info=True,
            )
    return _SYSTEM_PROMPT_TEMPLATE.format(**fields)


# ── Template extraction ───────────────────────────────────────────────────


def _extract_template(text: str) -> str | None:
    """Return the text between PROMPT_TEMPLATE_START and PROMPT_TEMPLATE_END, or None.

    The end marker is only looked for *after* the first start marker, so an
    end marker mentioned earlier in the reply does not hide a valid block.
    None is also returned when the block is empty or whitespace-only.
    """
    _, start_marker, after_start = text.partition(_TEMPLATE_START)
    if not start_marker:
        return None
    body, end_marker, _ = after_start.partition(_TEMPLATE_END)
    if not end_marker:
        return None
    return body.strip() or None


def _display_message(ai_reply: str) -> str:
    """Return the user-facing text of a final reply.

    This is whatever precedes the start marker, or a default sentence when the
    reply has no text before the template block.
    """
    preamble = ai_reply.partition(_TEMPLATE_START)[0].strip()
    return preamble or _DEFAULT_DONE_MESSAGE


# ── LLM call with tool support ───────────────────────────────────────────


def _as_text(content: Any) -> str:
    """Flatten message content into a plain string.

    ``None`` becomes ``""``; a string is returned as is; for a list, string
    items and the string ``"text"`` value of dict items are concatenated and
    everything else is skipped; any other value goes through ``str()``.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts: list[str] = []
        for item in content:
            if isinstance(item, str):
                parts.append(item)
            elif isinstance(item, dict):
                text = item.get("text")
                if isinstance(text, str):
                    parts.append(text)
        return "".join(parts)
    return str(content)


async def _run_tool_call(tool_map: dict[str, Any], call: dict[str, Any]) -> str:
    """Execute one tool call requested by the LLM and return its output as text.

    Unknown tool names and tool failures are reported back as text instead of
    raising, so the model gets a chance to react to the error.
    """
    call_name = str(call.get("name", ""))
    call_args = call.get("args", {})
    logger.info(
        "journey: tool_call name=%s args=%s",
        call_name,
        # default=str: logging must never fail on a non-JSON argument value.
        json.dumps(call_args, ensure_ascii=True, default=str)[:500],
    )

    tool_fn = tool_map.get(call_name)
    if tool_fn is None:
        tool_output = f"Unknown tool: {call_name}"
    else:
        try:
            tool_output = str(await tool_fn.ainvoke(call_args))
        except Exception as exc:  # noqa: BLE001 - tool boundary; error goes to the LLM
            logger.warning("journey: tool %s failed", call_name, exc_info=True)
            tool_output = f"Tool {call_name} failed: {exc}"

    logger.info(
        "journey: tool_result name=%s output=%s",
        call_name,
        tool_output[:800],
    )
    return tool_output


async def _call_llm_with_tools(
    system_prompt: str,
    history: list[dict[str, Any]],
    tools: list[Any],
    langfuse_handler: Any | None = None,
) -> str:
    """Build LangChain messages from history and invoke the LLM with tools.

    Handles tool-calling loops: if the LLM calls tools, execute them and
    continue until a final text response is produced. After
    ``_MAX_TOOL_STEPS`` rounds that still request tools, the LLM is invoked
    once more without tools bound and that answer is returned.

    Args:
        system_prompt: Content of the leading system message.
        history: Turns with ``"role"`` and ``"content"`` keys; role ``"user"``
            becomes a human message, any other role an AI message.
        tools: Tool objects exposing ``name`` and ``ainvoke``.
        langfuse_handler: Optional callback handler passed to ``get_llm``.

    Returns:
        The text of the final LLM response.
    """
    messages: list[Any] = [SystemMessage(content=system_prompt)]
    for turn in history:
        message_cls = HumanMessage if turn["role"] == "user" else AIMessage
        messages.append(message_cls(content=turn["content"]))

    callbacks = [langfuse_handler] if langfuse_handler else None
    llm = get_llm(model=None, temperature=0.4, callbacks=callbacks)
    llm_with_tools = llm.bind_tools(tools)
    tool_map = {tool_def.name: tool_def for tool_def in tools}

    for _ in range(_MAX_TOOL_STEPS):
        response: AIMessage = await llm_with_tools.ainvoke(messages)
        messages.append(response)

        if not response.tool_calls:
            return _as_text(response.content)

        for call in response.tool_calls:
            tool_output = await _run_tool_call(tool_map, call)
            messages.append(ToolMessage(content=tool_output, tool_call_id=call["id"]))

    # Fallback: exceeded max tool steps.
    final = await llm.ainvoke(messages)
    return _as_text(final.content)


# ── Journey handlers (called from redis_consumer) ────────────────────────


async def handle_journey_start(
    user_id: str,
    frame: dict[str, Any],
    *,
    langfuse_handler: Any | None = None,
) -> dict[str, Any]:
    """Handle a ``journey_start`` request.

    Creates a session, runs the setup LLM with directory exploration, and
    returns the ``journey_reply`` payload.

    Args:
        user_id: Owner of the new session.
        frame: Request payload. Keys read: ``agent_type`` (default
            ``"local"``), ``directory``, ``data_types``, ``existing_template``
            and ``session_id`` (generated when absent, or when the id belongs
            to another user's session).
        langfuse_handler: Optional callback handler forwarded to the LLM call.

    Returns:
        A dict with ``type``, ``session_id``, ``message``, ``done`` and
        ``prompt_template``. The session is kept only while ``done`` is False.
    """
    agent_type = frame.get("agent_type", "local")
    directory = frame.get("directory", "")
    # `or []` also covers an explicit null in the frame.
    data_types = frame.get("data_types") or []
    existing_template = frame.get("existing_template")

    # Sessions are otherwise only evicted when looked up again, so abandoned
    # ones would stay in memory forever.
    _purge_expired_sessions()
    session_id = _claim_session_id(frame.get("session_id"), user_id)

    system_prompt = _build_system_prompt(directory, data_types, existing_template)
    session = JourneySession(
        session_id=session_id,
        user_id=user_id,
        agent_type=agent_type,
        directory=directory,
        data_types=data_types,
        system_prompt=system_prompt,
    )

    seed_history: list[dict[str, Any]] = [
        {
            "role": "user",
            "content": "Hi, I'm ready to set up my agent. Please explore my directory and ask me your first question.",
        },
    ]

    ai_reply = await _call_llm_with_tools(
        system_prompt=system_prompt,
        history=seed_history,
        tools=list(FILESYSTEM_TOOLS),
        langfuse_handler=langfuse_handler,
    )

    session.history.extend(seed_history)
    session.history.append({"role": "assistant", "content": ai_reply})

    logger.info(
        "journey: session %s started for user %s (directory=%s)",
        session_id,
        user_id,
        directory,
    )

    prompt_template = _extract_template(ai_reply)
    done = prompt_template is not None

    if done:
        display_message = _display_message(ai_reply)
        # Finished in one shot: make sure no session is left under this id.
        _sessions.pop(session_id, None)
    else:
        display_message = ai_reply
        _sessions[session_id] = session

    return {
        "type": "journey_reply",
        "session_id": session_id,
        "message": display_message,
        "done": done,
        "prompt_template": prompt_template,
    }


async def handle_journey_message(
    user_id: str,
    frame: dict[str, Any],
    *,
    langfuse_handler: Any | None = None,
) -> dict[str, Any]:
    """Handle a ``journey_message`` request.

    Appends the user message, calls the LLM, and returns the
    ``journey_reply`` payload. Once the session holds ``_MAX_TURNS`` user
    turns without a template, one extra instruction asking for the final
    template is sent to the LLM.

    Args:
        user_id: Must match the owner of the session.
        frame: Request payload. Keys read: ``session_id`` and ``message``.
        langfuse_handler: Optional callback handler forwarded to the LLM call.

    Returns:
        A dict with ``type``, ``session_id``, ``message``, ``done`` and
        ``prompt_template``. For a missing, expired or foreign session,
        ``done`` is True and ``prompt_template`` is None.
    """
    session_id = frame.get("session_id", "")
    message = frame.get("message", "")

    session = get_journey_session(session_id, user_id)
    if session is None:
        return {
            "type": "journey_reply",
            "session_id": session_id,
            "message": "Journey session not found or expired. Please start a new setup.",
            "done": True,
            "prompt_template": None,
        }

    session.history.append({"role": "user", "content": message})

    ai_reply = await _call_llm_with_tools(
        system_prompt=session.system_prompt,
        history=session.history,
        tools=list(FILESYSTEM_TOOLS),
        langfuse_handler=langfuse_handler,
    )

    session.history.append({"role": "assistant", "content": ai_reply})

    prompt_template = _extract_template(ai_reply)
    done = prompt_template is not None

    if not done:
        turns = sum(1 for t in session.history if t["role"] == "user")
        if turns >= _MAX_TURNS:
            nudge_content = (
                "[System: You have enough information. Please generate the final "
                f"prompt_template now, wrapped in {_TEMPLATE_START} / {_TEMPLATE_END} markers.]"
            )
            session.history.append({"role": "user", "content": nudge_content})
            nudge_reply = await _call_llm_with_tools(
                system_prompt=session.system_prompt,
                history=session.history,
                tools=list(FILESYSTEM_TOOLS),
                langfuse_handler=langfuse_handler,
            )
            session.history.append({"role": "assistant", "content": nudge_reply})
            prompt_template = _extract_template(nudge_reply)
            if prompt_template is not None:
                done = True
                ai_reply = nudge_reply

    display_message = ai_reply
    if done:
        # Same fallback as journey_start: never hand back an empty message.
        display_message = _display_message(ai_reply)
        _sessions.pop(session_id, None)
        logger.info("journey: session %s completed for user %s", session_id, user_id)

    return {
        "type": "journey_reply",
        "session_id": session_id,
        "message": display_message,
        "done": done,
        "prompt_template": prompt_template,
    }