diff --git a/.env.example b/.env.example index fd3b5f9..98945d4 100644 --- a/.env.example +++ b/.env.example @@ -39,6 +39,13 @@ QDRANT_URL= QDRANT_API_KEY= # For local Qdrant (homelab): QDRANT_URL=http://qdrant:6333 +# ── Langfuse (leave empty to disable observability) ─────────────────────────── +LANGFUSE_SECRET_KEY= +LANGFUSE_PUBLIC_KEY= +# LANGFUSE_HOST=https://cloud.langfuse.com # EU (default) +# LANGFUSE_HOST=https://us.cloud.langfuse.com # US +# LANGFUSE_HOST=http://localhost:3000 # Self-hosted + # ── CORS ────────────────────────────────────────────────────────────────────── # Comma-separated list parsed by Settings (override default if needed) # CORS_ORIGINS=["app://.","http://localhost:3000"] diff --git a/app/api/routes/agent_setup.py b/app/api/routes/agent_setup.py index 2052d0b..0af3ff2 100644 --- a/app/api/routes/agent_setup.py +++ b/app/api/routes/agent_setup.py @@ -31,6 +31,8 @@ from typing import Any from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage from app.agents.filesystem_agent import FILESYSTEM_TOOLS +from app.config.settings import settings +from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback from app.core.llm import get_llm logger = logging.getLogger(__name__) @@ -62,6 +64,7 @@ class JourneySession: data_types: list[str] history: list[dict[str, Any]] = field(default_factory=list) system_prompt: str = "" + langfuse_prompt: Any = None created_at: float = field(default_factory=time.monotonic) def is_expired(self) -> bool: @@ -146,20 +149,25 @@ def _build_system_prompt( directory: str, data_types: list[str], existing_template: str | None = None, -) -> str: +) -> tuple[str, Any]: + """Return ``(compiled_system_prompt, langfuse_prompt_obj_or_None)``.""" existing_section = ( f"\nThe user already has the following prompt_template — refine it based on their answers:\n" f"---\n{existing_template}\n---\n" if existing_template else "" ) - return _SYSTEM_PROMPT_TEMPLATE.format( + 
template, prompt_obj = get_prompt_or_fallback( + "journey_system", _SYSTEM_PROMPT_TEMPLATE + ) + compiled = template.format( directory=directory, data_types=", ".join(data_types), template_start=_TEMPLATE_START, template_end=_TEMPLATE_END, existing_section=existing_section, ) + return compiled, prompt_obj # ── Template extraction ─────────────────────────────────────────────────── @@ -199,12 +207,17 @@ async def _call_llm_with_tools( system_prompt: str, history: list[dict[str, Any]], tools: list[Any], + *, + user_id: str = "", + session_id: str = "", + langfuse_prompt: Any = None, ) -> str: """Build LangChain messages from history and invoke the LLM with tools. Handles tool-calling loops: if the LLM calls tools, execute them and continue until a final text response is produced. """ + lf = get_langfuse() messages: list[Any] = [SystemMessage(content=system_prompt)] for turn in history: if turn["role"] == "user": @@ -216,38 +229,76 @@ async def _call_llm_with_tools( llm_with_tools = llm.bind_tools(tools) tool_map = {tool_def.name: tool_def for tool_def in tools} - for _ in range(_MAX_TOOL_STEPS): - response: AIMessage = await llm_with_tools.ainvoke(messages) - messages.append(response) + _span_ctx = ( + lf.start_as_current_observation( + as_type="span", + name="journey-setup", + user_id=user_id or None, + session_id=session_id or None, + input=history[-1]["content"] if history else "", + ) + if lf else None + ) + _span = _span_ctx.__enter__() if _span_ctx else None - if not response.tool_calls: - return _as_text(response.content) - - for call in response.tool_calls: - call_name = str(call.get("name", "")) - call_args = call.get("args", {}) - logger.info( - "agent_setup: journey tool_call name=%s args=%s", - call_name, - json.dumps(call_args, ensure_ascii=True)[:500], + try: + for _ in range(_MAX_TOOL_STEPS): + _gen_ctx = ( + lf.start_as_current_observation( + as_type="generation", + name="journey-setup-llm", + model=settings.LLM_MODEL, + prompt=langfuse_prompt, + 
input=messages, + ) + if lf else None ) + _gen = _gen_ctx.__enter__() if _gen_ctx else None + response: AIMessage = await llm_with_tools.ainvoke(messages) + if _gen_ctx: + _gen.update(output=_as_text(response.content), usage=extract_usage(response)) + _gen_ctx.__exit__(None, None, None) - tool_fn = tool_map.get(call_name) - if tool_fn is None: - tool_output = f"Unknown tool: {call_name}" - else: - tool_output = await tool_fn.ainvoke(call_args) + messages.append(response) - logger.info( - "agent_setup: journey tool_result name=%s output=%s", - call_name, - str(tool_output)[:800], - ) - messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"])) + if not response.tool_calls: + if _span: + _span.update(output=_as_text(response.content)) + return _as_text(response.content) - # Fallback: exceeded max steps. - final = await llm.ainvoke(messages) - return _as_text(final.content) + for call in response.tool_calls: + call_name = str(call.get("name", "")) + call_args = call.get("args", {}) + logger.info( + "agent_setup: journey tool_call name=%s args=%s", + call_name, + json.dumps(call_args, ensure_ascii=True)[:500], + ) + + tool_fn = tool_map.get(call_name) + if tool_fn is None: + tool_output = f"Unknown tool: {call_name}" + else: + tool_output = await tool_fn.ainvoke(call_args) + + logger.info( + "agent_setup: journey tool_result name=%s output=%s", + call_name, + str(tool_output)[:800], + ) + messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"])) + + # Fallback: exceeded max steps. 
+ final = await llm.ainvoke(messages) + final_text = _as_text(final.content) + if _span: + _span.update(output=final_text) + return final_text + finally: + if _span_ctx: + _span_ctx.__exit__(None, None, None) + if lf: + lf.flush() # ── Journey handlers (called from device_ws.py) ────────────────────────── @@ -270,7 +321,7 @@ async def handle_journey_start( # Use the session_id provided by the FE so the reply matches the # listener key; fall back to a generated one if absent. session_id = frame.get("session_id") or str(uuid.uuid4()) - system_prompt = _build_system_prompt(directory, data_types, existing_template) + system_prompt, langfuse_prompt = _build_system_prompt(directory, data_types, existing_template) session = JourneySession( session_id=session_id, @@ -279,6 +330,7 @@ async def handle_journey_start( directory=directory, data_types=data_types, system_prompt=system_prompt, + langfuse_prompt=langfuse_prompt, ) # The LLM will explore the directory using FILESYSTEM_TOOLS via the @@ -292,6 +344,9 @@ async def handle_journey_start( system_prompt=system_prompt, history=seed_history, tools=list(FILESYSTEM_TOOLS), + user_id=user_id, + session_id=session_id, + langfuse_prompt=langfuse_prompt, ) session.history.extend(seed_history) @@ -356,6 +411,9 @@ async def handle_journey_message( system_prompt=session.system_prompt, history=session.history, tools=list(FILESYSTEM_TOOLS), + user_id=session.user_id, + session_id=session_id, + langfuse_prompt=session.langfuse_prompt, ) session.history.append({"role": "assistant", "content": ai_reply}) @@ -379,6 +437,9 @@ async def handle_journey_message( system_prompt=session.system_prompt, history=session.history, tools=list(FILESYSTEM_TOOLS), + user_id=session.user_id, + session_id=session_id, + langfuse_prompt=session.langfuse_prompt, ) session.history.append({"role": "assistant", "content": nudge_reply}) diff --git a/app/config/settings.py b/app/config/settings.py index 796cdad..88b4de8 100644 --- a/app/config/settings.py +++ 
b/app/config/settings.py @@ -52,6 +52,10 @@ class Settings(BaseSettings): CORS_ORIGINS: list[str] = ["app://.", "http://localhost:3000", "http://localhost:5173"] + LANGFUSE_SECRET_KEY: str = "" + LANGFUSE_PUBLIC_KEY: str = "" + LANGFUSE_HOST: str = "https://cloud.langfuse.com" + ENV: Literal["dev", "prod"] = "dev" model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8") diff --git a/app/core/agent_runner.py b/app/core/agent_runner.py index c11324e..03cf8a3 100644 --- a/app/core/agent_runner.py +++ b/app/core/agent_runner.py @@ -42,7 +42,9 @@ from app.agents.note_agent import NOTE_TOOLS from app.agents.project_agent import PROJECT_TOOLS from app.agents.task_agent import TASK_TOOLS from app.agents.timeline_agent import TIMELINE_TOOLS +from app.config.settings import settings from app.core.device_manager import DeviceConnectionManager +from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback from app.core.llm import get_llm from app.core.ws_context import clear_client_executor, execute_on_client, set_client_executor from app.db import async_session @@ -268,8 +270,12 @@ async def _run_agent_with_tools( user_message: str, tools: list[Any], max_steps: int, + user_id: str = "", + langfuse_prompt: Any = None, + agent_name: str = "batch-agent", ) -> str: """Run an LLM agent with tool-calling, returning the final text response.""" + lf = get_langfuse() llm = get_llm() llm_with_tools = llm.bind_tools(tools) messages: list[Any] = [ @@ -279,38 +285,76 @@ async def _run_agent_with_tools( tool_map = {tool_def.name: tool_def for tool_def in tools} - for _ in range(max_steps): - response: AIMessage = await llm_with_tools.ainvoke(messages) - messages.append(response) + _span_ctx = ( + lf.start_as_current_observation( + as_type="span", + name=agent_name, + user_id=user_id or None, + input=user_message, + ) + if lf else None + ) + _span = _span_ctx.__enter__() if _span_ctx else None - if not response.tool_calls: - return 
_as_text(response.content) - - for call in response.tool_calls: - call_id = str(call.get("id", "")) - call_name = str(call.get("name", "")) - call_args = call.get("args", {}) - logger.info( - "agent_runner: tool_call name=%s args=%s", - call_name, - json.dumps(call_args, ensure_ascii=True)[:800], + try: + for _ in range(max_steps): + _gen_ctx = ( + lf.start_as_current_observation( + as_type="generation", + name=f"{agent_name}-llm", + model=settings.LLM_MODEL, + prompt=langfuse_prompt, + input=messages, + ) + if lf else None ) + _gen = _gen_ctx.__enter__() if _gen_ctx else None + response: AIMessage = await llm_with_tools.ainvoke(messages) + if _gen_ctx: + _gen.update(output=_as_text(response.content), usage=extract_usage(response)) + _gen_ctx.__exit__(None, None, None) - tool_fn = tool_map.get(call_name) - if tool_fn is None: - tool_output = f"Unknown tool: {call_name}" - else: - tool_output = await tool_fn.ainvoke(call_args) + messages.append(response) - logger.info( - "agent_runner: tool_result name=%s output=%s", - call_name, - str(tool_output)[:200], - ) - messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"])) + if not response.tool_calls: + final_text = _as_text(response.content) + if _span: + _span.update(output=final_text) + return final_text - final = await llm.ainvoke(messages) - return _as_text(final.content) + for call in response.tool_calls: + call_id = str(call.get("id", "")) + call_name = str(call.get("name", "")) + call_args = call.get("args", {}) + logger.info( + "agent_runner: tool_call name=%s args=%s", + call_name, + json.dumps(call_args, ensure_ascii=True)[:800], + ) + + tool_fn = tool_map.get(call_name) + if tool_fn is None: + tool_output = f"Unknown tool: {call_name}" + else: + tool_output = await tool_fn.ainvoke(call_args) + + logger.info( + "agent_runner: tool_result name=%s output=%s", + call_name, + str(tool_output)[:200], + ) + messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"])) + + 
final = await llm.ainvoke(messages) + final_text = _as_text(final.content) + if _span: + _span.update(output=final_text) + return final_text + finally: + if _span_ctx: + _span_ctx.__exit__(None, None, None) + if lf: + lf.flush() # ── Tool list builder ───────────────────────────────────────────────────── @@ -515,17 +559,33 @@ async def _classify_file( if d in _DOMAIN_DESCRIPTIONS ) - system = _STEP1_SYSTEM_PROMPT.format( + step1_template, step1_prompt_obj = get_prompt_or_fallback( + "batch_file_classifier", _STEP1_SYSTEM_PROMPT + ) + system = step1_template.format( domain_definitions=domain_definitions, projects_list=projects_list, ) + lf = get_langfuse() llm = get_llm() + classifier_messages = [ + SystemMessage(content=system), + HumanMessage(content=f"File: {file_path}\n\nContent:\n{file_content[:4000]}"), + ] try: - response = await llm.ainvoke([ - SystemMessage(content=system), - HumanMessage(content=f"File: {file_path}\n\nContent:\n{file_content[:4000]}"), - ]) + if lf: + with lf.start_as_current_observation( + as_type="generation", + name="step1-classifier", + model=settings.LLM_ROUTER_MODEL, + prompt=step1_prompt_obj, + input=classifier_messages, + ) as gen: + response = await llm.ainvoke(classifier_messages) + gen.update(output=_as_text(response.content), usage=extract_usage(response)) + else: + response = await llm.ainvoke(classifier_messages) raw = _as_text(response.content).strip() # Strip markdown fences if the model wraps the JSON. 
if raw.startswith("```"): @@ -713,7 +773,10 @@ async def run_local_agent( existing_context = "\n\n".join(existing_blocks) - system_prompt = _PROCESSING_SYSTEM_PROMPT.format( + step2_template, step2_prompt_obj = get_prompt_or_fallback( + "batch_processing", _PROCESSING_SYSTEM_PROMPT + ) + system_prompt = step2_template.format( existing_context=existing_context, project_context=project_context, data_types=", ".join(domains), @@ -730,6 +793,9 @@ async def run_local_agent( ), tools=processing_tools, max_steps=_MAX_PROCESSING_STEPS, + user_id=user_id, + langfuse_prompt=step2_prompt_obj, + agent_name="step2-processor", ) logger.info( "agent_runner: run=%s file=%r result=%s", @@ -928,7 +994,10 @@ async def run_cloud_agent( continue items_processed += 1 - processing_prompt = _CLOUD_PROCESSING_PROMPT.format( + cloud_template, cloud_prompt_obj = get_prompt_or_fallback( + "batch_cloud_processing", _CLOUD_PROCESSING_PROMPT + ) + processing_prompt = cloud_template.format( data_types=", ".join(config.data_types), project_context="Determine the appropriate project from the message context.", file_list=f"Message from {config.provider} (id: {msg.id})", @@ -941,6 +1010,9 @@ async def run_cloud_agent( user_message=f"Process this message content:\n\n{content_text[:8000]}", tools=processing_tools, max_steps=_MAX_PROCESSING_STEPS, + user_id=user_id, + langfuse_prompt=cloud_prompt_obj, + agent_name="cloud-processor", ) except Exception as exc: errors.append(f"LLM processing error for message {msg.id!r}: {exc}") diff --git a/app/core/deep_agent.py b/app/core/deep_agent.py index 0e490a5..4f6aa32 100644 --- a/app/core/deep_agent.py +++ b/app/core/deep_agent.py @@ -16,7 +16,9 @@ from app.agents.note_agent import NOTE_TOOLS from app.agents.project_agent import PROJECT_TOOLS from app.agents.task_agent import TASK_TOOLS from app.agents.timeline_agent import TIMELINE_TOOLS +from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback from app.core.llm import get_llm 
+from app.config.settings import settings from app.core.memory_middleware import MemoryMiddleware from app.core.ws_context import clear_tool_result_collector, execute_on_client, set_tool_result_collector from app.db import async_session @@ -536,17 +538,31 @@ async def _infer_floating_domain(message: str, context: dict[str, Any]) -> dict[ try: llm = get_llm() - response = await llm.ainvoke( - [ - SystemMessage(content=_FLOATING_DOMAIN_CLASSIFIER_SYSTEM), - HumanMessage( - content=( - f"Message:\n{message}\n\n" - f"Context:\n{json.dumps(classifier_context, ensure_ascii=True)}" - ) - ), - ] + classifier_messages = [ + SystemMessage(content=_FLOATING_DOMAIN_CLASSIFIER_SYSTEM), + HumanMessage( + content=( + f"Message:\n{message}\n\n" + f"Context:\n{json.dumps(classifier_context, ensure_ascii=True)}" + ) + ), + ] + lf = get_langfuse() + _, classifier_prompt_obj = get_prompt_or_fallback( + "floating_domain_classifier", _FLOATING_DOMAIN_CLASSIFIER_SYSTEM ) + if lf: + with lf.start_as_current_observation( + as_type="generation", + name="floating-classifier", + model=settings.LLM_MODEL, + prompt=classifier_prompt_obj, + input=classifier_messages, + ) as gen: + response = await llm.ainvoke(classifier_messages) + gen.update(output=_as_text(response.content), usage=extract_usage(response)) + else: + response = await llm.ainvoke(classifier_messages) parsed = _parse_json_object(_as_text(response.content)) if parsed is not None: domain = _normalize_domain_payload(parsed, project_id) @@ -571,8 +587,11 @@ async def _run_single_agent( message: str, context: dict[str, Any], max_steps: int = 6, + langfuse_prompt: Any = None, + agent_name: str = "agent", ) -> str: trace_id = _trace_id_from_context(context) + lf = get_langfuse() llm = get_llm() tools = _all_tools_for_user(user_id, trace_id) model_context = _context_for_model(context) @@ -591,9 +610,37 @@ async def _run_single_agent( tool_calls_count = 0 collected: list[dict[str, Any]] = [] set_tool_result_collector(collected) + + 
_span_ctx = ( + lf.start_as_current_observation( + as_type="span", + name=agent_name, + user_id=user_id, + session_id=trace_id, + input=message, + ) + if lf else None + ) + _span = _span_ctx.__enter__() if _span_ctx else None + try: for _ in range(max_steps): + _gen_ctx = ( + lf.start_as_current_observation( + as_type="generation", + name=f"{agent_name}-llm", + model=settings.LLM_MODEL, + prompt=langfuse_prompt, + input=messages, + ) + if lf else None + ) + _gen = _gen_ctx.__enter__() if _gen_ctx else None response: AIMessage = await llm_with_tools.ainvoke(messages) + if _gen_ctx: + _gen.update(output=_as_text(response.content), usage=extract_usage(response)) + _gen_ctx.__exit__(None, None, None) + messages.append(response) if not response.tool_calls: @@ -605,6 +652,8 @@ async def _run_single_agent( tool_calls_count, len(final_text), ) + if _span: + _span.update(output=final_text) return final_text tool_map = {tool_def.name: tool_def for tool_def in tools} @@ -644,9 +693,15 @@ async def _run_single_agent( tool_calls_count, len(final_text), ) + if _span: + _span.update(output=final_text) return final_text finally: clear_tool_result_collector() + if _span_ctx: + _span_ctx.__exit__(None, None, None) + if lf: + lf.flush() async def _run_single_agent_stream( @@ -656,8 +711,11 @@ async def _run_single_agent_stream( message: str, context: dict[str, Any], max_steps: int = 6, + langfuse_prompt: Any = None, + agent_name: str = "agent", ) -> AsyncGenerator[tuple[str, Any], None]: trace_id = _trace_id_from_context(context) + lf = get_langfuse() llm = get_llm() tools = _all_tools_for_user(user_id, trace_id) model_context = _context_for_model(context) @@ -677,9 +735,38 @@ async def _run_single_agent_stream( streamed_chars = 0 collected: list[dict[str, Any]] = [] set_tool_result_collector(collected) + + _span_ctx = ( + lf.start_as_current_observation( + as_type="span", + name=f"{agent_name}-stream", + user_id=user_id, + session_id=trace_id, + input=message, + ) + if lf else None 
+ ) + _span = _span_ctx.__enter__() if _span_ctx else None + streamed_text: list[str] = [] + try: for _ in range(max_steps): + _gen_ctx = ( + lf.start_as_current_observation( + as_type="generation", + name=f"{agent_name}-llm", + model=settings.LLM_MODEL, + prompt=langfuse_prompt, + input=messages, + ) + if lf else None + ) + _gen = _gen_ctx.__enter__() if _gen_ctx else None response: AIMessage = await llm_with_tools.ainvoke(messages) + if _gen_ctx: + _gen.update(output=_as_text(response.content), usage=extract_usage(response)) + _gen_ctx.__exit__(None, None, None) + messages.append(response) if not response.tool_calls: @@ -688,6 +775,7 @@ async def _run_single_agent_stream( token = _as_text(getattr(chunk, "content", "")) if token: streamed_chars += len(token) + streamed_text.append(token) emitted_any = True yield "token", token @@ -696,6 +784,7 @@ async def _run_single_agent_stream( fallback_text = _as_text(response.content) if fallback_text: streamed_chars += len(fallback_text) + streamed_text.append(fallback_text) yield "token", fallback_text logger.info( "deep_agent: run_single_agent_stream_end trace=%s user=%s tool_calls=%d response_chars=%d", @@ -704,6 +793,8 @@ async def _run_single_agent_stream( tool_calls_count, streamed_chars, ) + if _span: + _span.update(output="".join(streamed_text)) return tool_map = {tool_def.name: tool_def for tool_def in tools} @@ -738,6 +829,7 @@ async def _run_single_agent_stream( token = _as_text(getattr(chunk, "content", "")) if token: streamed_chars += len(token) + streamed_text.append(token) yield "token", token logger.info( "deep_agent: run_single_agent_stream_end trace=%s user=%s tool_calls=%d response_chars=%d fallback=1", @@ -746,17 +838,28 @@ async def _run_single_agent_stream( tool_calls_count, streamed_chars, ) + if _span: + _span.update(output="".join(streamed_text)) finally: clear_tool_result_collector() + if _span_ctx: + _span_ctx.__exit__(None, None, None) + if lf: + lf.flush() async def run_home(user_id: str, 
message: str, context: dict[str, Any]) -> str: prepared_context = await _prepare_context(message, context) + system_prompt, langfuse_prompt = get_prompt_or_fallback( + "home_system", _HOME_SINGLE_AGENT_SYSTEM + ) response = await _run_single_agent( user_id=user_id, - system_prompt=_HOME_SINGLE_AGENT_SYSTEM, + system_prompt=system_prompt, message=message, context=prepared_context, + langfuse_prompt=langfuse_prompt, + agent_name="home-agent", ) return _normalize_tagged_list_lines(response, message) @@ -764,11 +867,16 @@ async def run_home(user_id: str, message: str, context: dict[str, Any]) -> str: async def run_floating(user_id: str, message: str, context: dict[str, Any]) -> tuple[str, dict[str, str | None]]: prepared_context = await _prepare_context(message, context) domain = await _infer_floating_domain(message, prepared_context) + system_prompt, langfuse_prompt = get_prompt_or_fallback( + "floating_system", _FLOATING_SINGLE_AGENT_SYSTEM + ) response = await _run_single_agent( user_id=user_id, - system_prompt=_FLOATING_SINGLE_AGENT_SYSTEM, + system_prompt=system_prompt, message=message, context=prepared_context, + langfuse_prompt=langfuse_prompt, + agent_name="floating-agent", ) sanitized = _strip_floating_markup(response) if not sanitized and response: @@ -782,12 +890,17 @@ async def run_home_stream( context: dict[str, Any], ) -> AsyncGenerator[tuple[str, Any], None]: prepared_context = await _prepare_context(message, context) + system_prompt, langfuse_prompt = get_prompt_or_fallback( + "home_system", _HOME_SINGLE_AGENT_SYSTEM + ) text_chunks: list[str] = [] async for event in _run_single_agent_stream( user_id=user_id, - system_prompt=_HOME_SINGLE_AGENT_SYSTEM, + system_prompt=system_prompt, message=message, context=prepared_context, + langfuse_prompt=langfuse_prompt, + agent_name="home-agent", ): event_type, data = event if event_type != "token": @@ -809,14 +922,19 @@ async def run_floating_stream( domain = await _infer_floating_domain(message, prepared_context) 
yield "floating_domain", domain + system_prompt, langfuse_prompt = get_prompt_or_fallback( + "floating_system", _FLOATING_SINGLE_AGENT_SYSTEM + ) sanitizer = _FloatingStreamSanitizer() emitted_sanitized = False raw_chunks: list[str] = [] async for event in _run_single_agent_stream( user_id=user_id, - system_prompt=_FLOATING_SINGLE_AGENT_SYSTEM, + system_prompt=system_prompt, message=message, context=prepared_context, + langfuse_prompt=langfuse_prompt, + agent_name="floating-agent", ): event_type, data = event if event_type != "token": diff --git a/app/core/langfuse_client.py b/app/core/langfuse_client.py new file mode 100644 index 0000000..745f649 --- /dev/null +++ b/app/core/langfuse_client.py @@ -0,0 +1,114 @@ +"""Langfuse observability — singleton client and prompt helpers. + +If LANGFUSE_SECRET_KEY / LANGFUSE_PUBLIC_KEY are not set, +all helpers are no-ops so the app works without Langfuse configured. + +Usage +----- +Tracing:: + + from app.core.langfuse_client import get_langfuse + + lf = get_langfuse() + if lf: + with lf.start_as_current_observation(as_type="span", name="my-agent") as span: + span.update(input=user_message) + # ... do work ... + span.update(output=result) + lf.flush() + +Prompt management:: + + from app.core.langfuse_client import get_prompt_or_fallback + + text, prompt_obj = get_prompt_or_fallback("home_system", FALLBACK_PROMPT) + # Use text as the system prompt; pass prompt_obj to generations for linking. 
+ +Linking a prompt to a generation:: + + with lf.start_as_current_observation( + as_type="generation", + name="llm-call", + model="gpt-4o", + prompt=prompt_obj, # links generation → prompt version in the UI + input=messages, + ) as gen: + response = await llm.ainvoke(messages) + gen.update(output=response.content, usage=extract_usage(response)) +""" + +from __future__ import annotations + +import logging +from typing import Any + +logger = logging.getLogger(__name__) + +_client: Any = None +_initialized: bool = False + + +def get_langfuse() -> Any | None: + """Return the Langfuse singleton, or ``None`` when not configured.""" + global _client, _initialized + if _initialized: + return _client + _initialized = True + + from app.config.settings import settings # local import to avoid circular deps + + if not settings.LANGFUSE_SECRET_KEY or not settings.LANGFUSE_PUBLIC_KEY: + logger.debug("langfuse: not configured — observability disabled") + return None + + try: + from langfuse import Langfuse + + _client = Langfuse( + secret_key=settings.LANGFUSE_SECRET_KEY, + public_key=settings.LANGFUSE_PUBLIC_KEY, + host=settings.LANGFUSE_HOST, + ) + logger.info("langfuse: client initialized host=%s", settings.LANGFUSE_HOST) + except Exception as exc: + logger.warning("langfuse: failed to initialize: %s", exc) + _client = None + + return _client + + +def get_prompt_or_fallback(name: str, fallback: str) -> tuple[str, Any]: + """Fetch a text prompt from Langfuse; fall back to ``fallback`` on any error. + + Returns ``(prompt_text, prompt_obj_or_None)``. + + * ``prompt_text`` — the raw template string (variables not yet substituted). + Callers perform variable substitution with Python's ``.format()``. + * ``prompt_obj`` — the Langfuse prompt object, or ``None`` when Langfuse is + unavailable / the fetch failed. Pass this to generation observations so + Langfuse links the generation to the exact prompt version in the UI.
+ """ + lf = get_langfuse() + if lf is None: + return fallback, None + + try: + prompt = lf.get_prompt(name, label="production", fallback=fallback) + # For text-type prompts .prompt holds the raw template string. + raw = prompt.prompt if hasattr(prompt, "prompt") and isinstance(prompt.prompt, str) else fallback + return raw, prompt + except Exception as exc: + logger.warning("langfuse: get_prompt %r failed: %s — using fallback", name, exc) + return fallback, None + + +def extract_usage(response: Any) -> dict[str, int]: + """Extract token usage from a LangChain AI message into Langfuse format.""" + meta = getattr(response, "usage_metadata", None) + if not meta: + return {} + return { + "input": int(meta.get("input_tokens", 0)), + "output": int(meta.get("output_tokens", 0)), + "total": int(meta.get("total_tokens", 0)), + } diff --git a/requirements.txt b/requirements.txt index ea10f59..023fe42 100644 --- a/requirements.txt +++ b/requirements.txt @@ -32,4 +32,5 @@ google-auth-oauthlib>=1.2.0 google-auth-httplib2>=0.2.0 msal>=1.28.0 cryptography>=42.0.0 +langfuse>=2.0.0 ruff>=0.8.0