diff --git a/services/chat/app/deep_agent.py b/services/chat/app/deep_agent.py index 6f7254d..1472b15 100644 --- a/services/chat/app/deep_agent.py +++ b/services/chat/app/deep_agent.py @@ -23,6 +23,7 @@ from app.agents.timeline_agent import TIMELINE_TOOLS from app.llm import get_llm from app.memory_middleware import MemoryMiddleware from app.ws_context import clear_tool_result_collector, execute_on_client, set_tool_result_collector +from app import tracing from shared.db import async_session logger = logging.getLogger(__name__) @@ -566,6 +567,19 @@ async def _infer_floating_domain(message: str, context: dict[str, Any]) -> dict[ return _infer_floating_domain_rule_based(message, context) +def _get_system_prompt(langfuse_name: str, fallback: str) -> str: + """Fetch a managed prompt from Langfuse, falling back to the hardcoded string.""" + managed = tracing.get_prompt(langfuse_name, fallback=None) + return managed if managed is not None else fallback + + +def _build_callbacks(langfuse_handler: Any | None) -> list[Any] | None: + """Return a callbacks list if a Langfuse handler is available.""" + if langfuse_handler is None: + return None + return [langfuse_handler] + + async def _run_single_agent( *, user_id: str, @@ -573,9 +587,11 @@ async def _run_single_agent( message: str, context: dict[str, Any], max_steps: int = 6, + langfuse_handler: Any | None = None, ) -> str: trace_id = _trace_id_from_context(context) - llm = get_llm() + callbacks = _build_callbacks(langfuse_handler) + llm = get_llm(callbacks=callbacks) tools = _all_tools_for_user(user_id, trace_id) model_context = _context_for_model(context) logger.info("deep_agent: run_single_agent_start trace=%s user=%s", trace_id or "-", user_id) @@ -658,9 +674,11 @@ async def _run_single_agent_stream( message: str, context: dict[str, Any], max_steps: int = 6, + langfuse_handler: Any | None = None, ) -> AsyncGenerator[tuple[str, Any], None]: trace_id = _trace_id_from_context(context) - llm = get_llm() + callbacks = _build_callbacks(langfuse_handler) + llm = get_llm(callbacks=callbacks) tools = _all_tools_for_user(user_id, trace_id) model_context = _context_for_model(context) logger.info("deep_agent: run_single_agent_stream_start trace=%s user=%s", trace_id or "-", user_id) @@ -751,25 +769,29 @@ async def _run_single_agent_stream( clear_tool_result_collector() -async def run_home(user_id: str, message: str, context: dict[str, Any]) -> str: +async def run_home(user_id: str, message: str, context: dict[str, Any], *, langfuse_handler: Any | None = None) -> str: prepared_context = await _prepare_context(message, context) + system_prompt = _get_system_prompt("home_system", _HOME_SINGLE_AGENT_SYSTEM) response = await _run_single_agent( user_id=user_id, - system_prompt=_HOME_SINGLE_AGENT_SYSTEM, + system_prompt=system_prompt, message=message, context=prepared_context, + langfuse_handler=langfuse_handler, ) return _normalize_tagged_list_lines(response, message) -async def run_floating(user_id: str, message: str, context: dict[str, Any]) -> tuple[str, dict[str, str | None]]: +async def run_floating(user_id: str, message: str, context: dict[str, Any], *, langfuse_handler: Any | None = None) -> tuple[str, dict[str, str | None]]: prepared_context = await _prepare_context(message, context) domain = await _infer_floating_domain(message, prepared_context) + system_prompt = _get_system_prompt("floating_system", _FLOATING_SINGLE_AGENT_SYSTEM) response = await _run_single_agent( user_id=user_id, - system_prompt=_FLOATING_SINGLE_AGENT_SYSTEM, + system_prompt=system_prompt, message=message, context=prepared_context, + langfuse_handler=langfuse_handler, ) sanitized = _strip_floating_markup(response) if not sanitized and response: @@ -781,14 +803,18 @@ async def run_home_stream( user_id: str, message: str, context: dict[str, Any], + *, + langfuse_handler: Any | None = None, ) -> AsyncGenerator[tuple[str, Any], None]: prepared_context = await _prepare_context(message, context) + system_prompt = _get_system_prompt("home_system", _HOME_SINGLE_AGENT_SYSTEM) text_chunks: list[str] = [] async for event in _run_single_agent_stream( user_id=user_id, - system_prompt=_HOME_SINGLE_AGENT_SYSTEM, + system_prompt=system_prompt, message=message, context=prepared_context, + langfuse_handler=langfuse_handler, ): event_type, data = event if event_type != "token": @@ -805,19 +831,23 @@ async def run_floating_stream( user_id: str, message: str, context: dict[str, Any], + *, + langfuse_handler: Any | None = None, ) -> AsyncGenerator[tuple[str, Any], None]: prepared_context = await _prepare_context(message, context) domain = await _infer_floating_domain(message, prepared_context) yield "floating_domain", domain + system_prompt = _get_system_prompt("floating_system", _FLOATING_SINGLE_AGENT_SYSTEM) sanitizer = _FloatingStreamSanitizer() emitted_sanitized = False raw_chunks: list[str] = [] async for event in _run_single_agent_stream( user_id=user_id, - system_prompt=_FLOATING_SINGLE_AGENT_SYSTEM, + system_prompt=system_prompt, message=message, context=prepared_context, + langfuse_handler=langfuse_handler, ): event_type, data = event if event_type != "token": diff --git a/services/chat/app/llm.py b/services/chat/app/llm.py index f3178b9..0a6cdce 100644 --- a/services/chat/app/llm.py +++ b/services/chat/app/llm.py @@ -42,6 +42,7 @@ def get_llm( *, model: str | None = None, temperature: float = 0, + callbacks: list | None = None, ) -> ChatOpenAI | ChatLiteLLM: model = model or settings.LLM_MODEL @@ -49,22 +50,16 @@ def get_llm( os.environ.setdefault("GITHUB_COPILOT_TOKEN_DIR", settings.GITHUB_COPILOT_TOKEN_DIR) if "/" in model: - return ChatLiteLLM(model=model, temperature=temperature) + return ChatLiteLLM(model=model, temperature=temperature, callbacks=callbacks) return ChatOpenAI( model=model, temperature=temperature, api_key=_api_key_for_model(model), + callbacks=callbacks, ) -def get_router_llm( - *, - temperature: float = 0, -) -> ChatOpenAI | ChatLiteLLM: - return get_llm(model=settings.LLM_ROUTER_MODEL, temperature=temperature) - - async def embed(text: str) -> list[float]: model = settings.LLM_EMBED_MODEL diff --git a/services/chat/app/main.py b/services/chat/app/main.py index 7044573..1ac3bad 100644 --- a/services/chat/app/main.py +++ b/services/chat/app/main.py @@ -6,8 +6,15 @@ streams responses back via Redis pub/sub to WS Gateway. Owns: memory_core, memory_associative, memory_episodic, memory_proactive tables. """ +import sys from contextlib import asynccontextmanager import logging +from pathlib import Path + +# Ensure the repo root is on sys.path so "shared" is importable in local dev. +_repo_root = str(Path(__file__).resolve().parents[3]) +if _repo_root not in sys.path: + sys.path.insert(0, _repo_root) from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware @@ -31,6 +38,10 @@ async def lifespan(app: FastAPI): yield consumer_task.cancel() + from app.tracing import shutdown as shutdown_langfuse + + shutdown_langfuse() + from shared.db import engine await engine.dispose() diff --git a/services/chat/app/redis_consumer.py b/services/chat/app/redis_consumer.py index d2579ce..70689d2 100644 --- a/services/chat/app/redis_consumer.py +++ b/services/chat/app/redis_consumer.py @@ -18,6 +18,7 @@ from app.deep_agent import run_floating_stream, run_home_stream from app.memory_middleware import MemoryMiddleware from app.output_formatter import StreamFormatter from app.ws_context import clear_current_user, set_current_user +from app import tracing logger = logging.getLogger(__name__) @@ -84,6 +85,19 @@ async def _handle_home_request(user_id: str, frame: dict) -> None: user_id, request_id, message[:200], ) + # Create Langfuse trace + trace = tracing.create_trace( + name="home_request", + user_id=user_id, + session_id=session_id, + trace_id=request_id, + metadata={"message_preview": message[:200]}, + tags=["home"], + ) + langfuse_handler = tracing.get_langfuse_callback( + trace=trace, span_name="home_agent", + ) + # Enrich with memory context async with async_session() as db: memory = MemoryMiddleware(db) @@ -101,7 +115,7 @@ async def _handle_home_request(user_id: str, frame: dict) -> None: set_current_user(user_id) response_chunks: list[str] = [] try: - event_stream = run_home_stream(user_id, message, context) + event_stream = run_home_stream(user_id, message, context, langfuse_handler=langfuse_handler) formatter = StreamFormatter(request_id=request_id) async for ws_frame in formatter.format(event_stream): await _publish_frame(user_id, ws_frame.model_dump_json()) @@ -112,6 +126,13 @@ async def _handle_home_request(user_id: str, frame: dict) -> None: finally: clear_current_user() + # Link prompt and flush trace + if trace is not None: + tracing.link_prompt_to_trace(trace, "home_system") + response_text = "".join(response_chunks) + trace.update(output=response_text[:500] if response_text else None) + tracing.flush() + # Store episode async with async_session() as db: memory = MemoryMiddleware(db) @@ -133,6 +154,19 @@ async def _handle_floating_request(user_id: str, frame: dict) -> None: user_id, request_id, json.dumps(scope)[:200], message[:200], ) + # Create Langfuse trace + trace = tracing.create_trace( + name="floating_request", + user_id=user_id, + session_id=session_id, + trace_id=request_id, + metadata={"message_preview": message[:200], "scope": scope}, + tags=["floating"], + ) + langfuse_handler = tracing.get_langfuse_callback( + trace=trace, span_name="floating_agent", + ) + # Enrich with memory context async with async_session() as db: memory = MemoryMiddleware(db) @@ -150,7 +184,7 @@ async def _handle_floating_request(user_id: str, frame: dict) -> None: set_current_user(user_id) response_chunks: list[str] = [] try: - event_stream = run_floating_stream(user_id, message, context) + event_stream = run_floating_stream(user_id, message, context, langfuse_handler=langfuse_handler) formatter = StreamFormatter(request_id=request_id) async for ws_frame in formatter.format(event_stream): await _publish_frame(user_id, ws_frame.model_dump_json()) @@ -161,6 +195,13 @@ async def _handle_floating_request(user_id: str, frame: dict) -> None: finally: clear_current_user() + # Link prompt and flush trace + if trace is not None: + tracing.link_prompt_to_trace(trace, "floating_system") + response_text = "".join(response_chunks) + trace.update(output=response_text[:500] if response_text else None) + tracing.flush() + # Store episode async with async_session() as db: memory = MemoryMiddleware(db) diff --git a/services/chat/app/tracing.py b/services/chat/app/tracing.py new file mode 100644 index 0000000..aa95d28 --- /dev/null +++ b/services/chat/app/tracing.py @@ -0,0 +1,261 @@ +"""Langfuse tracing & prompt management for the Chat Service. + +Provides: +- ``langfuse`` — singleton Langfuse client (lazy, no-op when keys are missing) +- ``create_trace()`` — start a new trace for a chat request +- ``get_langfuse_callback()`` — LangChain callback handler for a trace/span +- ``get_prompt()`` — fetch a managed prompt from Langfuse by name +- ``flush()`` — ensure all events are sent before shutdown + +All functions gracefully degrade to no-ops when Langfuse is not configured, +so the service works identically with or without observability keys. +""" + +from __future__ import annotations + +import logging +from typing import Any + +from shared.config import settings + +logger = logging.getLogger(__name__) + +# ── Lazy singleton ─────────────────────────────────────────────────────── + +_langfuse_client: Any | None = None +_langfuse_disabled: bool = False + + +def _is_configured() -> bool: + return bool(settings.LANGFUSE_SECRET_KEY and settings.LANGFUSE_PUBLIC_KEY) + + +def _get_langfuse() -> Any | None: + """Return the Langfuse client singleton, or None if not configured.""" + global _langfuse_client, _langfuse_disabled + + if _langfuse_disabled: + return None + + if _langfuse_client is not None: + return _langfuse_client + + if not _is_configured(): + _langfuse_disabled = True + logger.info("tracing: Langfuse keys not set — tracing disabled") + return None + + try: + from langfuse import Langfuse + + _langfuse_client = Langfuse( + secret_key=settings.LANGFUSE_SECRET_KEY, + public_key=settings.LANGFUSE_PUBLIC_KEY, + host=settings.LANGFUSE_HOST, + ) + logger.info("tracing: Langfuse client initialised (host=%s)", settings.LANGFUSE_HOST) + return _langfuse_client + except Exception as exc: + _langfuse_disabled = True + logger.warning("tracing: failed to initialise Langfuse: %s", exc) + return None + + +# ── Trace lifecycle ────────────────────────────────────────────────────── + + +def create_trace( + *, + name: str, + user_id: str, + session_id: str | None = None, + trace_id: str | None = None, + metadata: dict[str, Any] | None = None, + tags: list[str] | None = None, +) -> Any | None: + """Create a Langfuse trace. Returns the trace object, or None if disabled.""" + lf = _get_langfuse() + if lf is None: + return None + + try: + return lf.trace( + id=trace_id, + name=name, + user_id=user_id, + session_id=session_id, + metadata=metadata or {}, + tags=tags or [], + ) + except Exception as exc: + logger.warning("tracing: create_trace failed: %s", exc) + return None + + +# ── LangChain callback handler ────────────────────────────────────────── + + +def get_langfuse_callback( + *, + trace_id: str | None = None, + trace: Any | None = None, + span_name: str | None = None, + update_parent: bool = True, +) -> Any | None: + """Return a ``CallbackHandler`` wired to an existing trace. + + This handler is passed to LangChain's ``ainvoke`` / ``astream`` as a + callback so every LLM generation and tool call is automatically + captured as a nested span inside the trace. + + If both *trace* and *trace_id* are given, *trace* takes precedence. + Returns None when Langfuse is disabled. + """ + lf = _get_langfuse() + if lf is None: + return None + + try: + from langfuse.callback import CallbackHandler + + kwargs: dict[str, Any] = { + "secret_key": settings.LANGFUSE_SECRET_KEY, + "public_key": settings.LANGFUSE_PUBLIC_KEY, + "host": settings.LANGFUSE_HOST, + "update_parent": update_parent, + } + if trace is not None: + kwargs["trace_id"] = trace.id + elif trace_id is not None: + kwargs["trace_id"] = trace_id + if span_name: + kwargs["root_span"] = span_name + + return CallbackHandler(**kwargs) + except Exception as exc: + logger.warning("tracing: get_langfuse_callback failed: %s", exc) + return None + + +# ── Prompt management ──────────────────────────────────────────────────── + + +def get_prompt( + name: str, + *, + version: int | None = None, + label: str | None = None, + fallback: str | None = None, + cache_ttl_seconds: int = 300, +) -> str | None: + """Fetch a managed prompt from Langfuse by name. + + Returns the compiled prompt string, or *fallback* if the prompt is not + found or Langfuse is disabled. + + Parameters + ---------- + name : str + Prompt name as registered in Langfuse. + version : int, optional + Pin to a specific version; omit for the latest production version. + label : str, optional + Fetch by label (e.g. ``"production"``, ``"staging"``). + fallback : str, optional + Value returned when the prompt cannot be fetched. + cache_ttl_seconds : int + How long to cache the prompt locally (default 5 min). + """ + lf = _get_langfuse() + if lf is None: + return fallback + + try: + kwargs: dict[str, Any] = { + "name": name, + "cache_ttl_seconds": cache_ttl_seconds, + } + if version is not None: + kwargs["version"] = version + if label is not None: + kwargs["label"] = label + prompt = lf.get_prompt(**kwargs) + return prompt.prompt + except Exception as exc: + logger.warning("tracing: get_prompt(%s) failed: %s", name, exc) + return fallback + + +def link_prompt_to_trace( + trace: Any, + prompt_name: str, + *, + version: int | None = None, + label: str | None = None, +) -> None: + """Attach a Langfuse prompt reference to a trace/generation. + + Call this *after* creating a generation on the trace to associate the + prompt that was used. The prompt object is fetched and linked so + Langfuse can display prompt→trace associations in the dashboard. + """ + lf = _get_langfuse() + if lf is None or trace is None: + return + + try: + kwargs: dict[str, Any] = {"name": prompt_name} + if version is not None: + kwargs["version"] = version + if label is not None: + kwargs["label"] = label + prompt = lf.get_prompt(**kwargs) + trace.update(metadata={"prompt": {"name": prompt_name, "version": prompt.version}}) + except Exception as exc: + logger.warning("tracing: link_prompt_to_trace(%s) failed: %s", prompt_name, exc) + + +# ── Scoring helper ─────────────────────────────────────────────────────── + + +def score_trace( + trace_id: str, + name: str, + value: float, + *, + comment: str | None = None, +) -> None: + """Post a score to a trace (e.g. user feedback, latency, quality).""" + lf = _get_langfuse() + if lf is None: + return + + try: + lf.score(trace_id=trace_id, name=name, value=value, comment=comment) + except Exception as exc: + logger.warning("tracing: score_trace failed: %s", exc) + + +# ── Shutdown ───────────────────────────────────────────────────────────── + + +def flush() -> None: + """Flush pending Langfuse events. Call this on service shutdown.""" + if _langfuse_client is not None: + try: + _langfuse_client.flush() + except Exception as exc: + logger.warning("tracing: flush failed: %s", exc) + + +def shutdown() -> None: + """Flush and close the Langfuse client.""" + global _langfuse_client, _langfuse_disabled + if _langfuse_client is not None: + try: + _langfuse_client.flush() + _langfuse_client.shutdown() + except Exception as exc: + logger.warning("tracing: shutdown failed: %s", exc) + _langfuse_client = None + _langfuse_disabled = False diff --git a/services/chat/requirements.txt b/services/chat/requirements.txt index 80e1958..21d884c 100644 --- a/services/chat/requirements.txt +++ b/services/chat/requirements.txt @@ -14,3 +14,4 @@ langchain-litellm>=0.3.0 litellm>=1.50.0 openai>=1.50.0 httpx>=0.27.0 +langfuse>=2.0.0 diff --git a/shared/config.py b/shared/config.py index db4b77e..dd41ee2 100644 --- a/shared/config.py +++ b/shared/config.py @@ -76,6 +76,11 @@ class Settings(BaseSettings): MS_TENANT_ID: str = "common" OAUTH_ENCRYPTION_KEY: str = "" + # ── Langfuse (observability) ───────────────────────────────────── + LANGFUSE_SECRET_KEY: str = "" + LANGFUSE_PUBLIC_KEY: str = "" + LANGFUSE_HOST: str = "https://cloud.langfuse.com" + # ── CORS ───────────────────────────────────────────────────────── CORS_ORIGINS: list[str] = ["app://.", "http://localhost:3000", "http://localhost:5173"]