"""Langfuse tracing & prompt management for the Chat Service. Provides: - ``langfuse`` — singleton Langfuse client (lazy, no-op when keys are missing) - ``create_trace()`` — start a new trace for a chat request - ``get_langfuse_callback()`` — LangChain callback handler for a trace/span - ``get_prompt()`` — fetch a managed prompt from Langfuse by name - ``flush()`` — ensure all events are sent before shutdown All functions gracefully degrade to no-ops when Langfuse is not configured, so the service works identically with or without observability keys. """ from __future__ import annotations import logging from typing import Any from shared.config import settings logger = logging.getLogger(__name__) # ── Lazy singleton ─────────────────────────────────────────────────────── _langfuse_client: Any | None = None _langfuse_disabled: bool = False def _is_configured() -> bool: return bool(settings.LANGFUSE_SECRET_KEY and settings.LANGFUSE_PUBLIC_KEY) def _get_langfuse() -> Any | None: """Return the Langfuse client singleton, or None if not configured.""" global _langfuse_client, _langfuse_disabled if _langfuse_disabled: return None if _langfuse_client is not None: return _langfuse_client if not _is_configured(): _langfuse_disabled = True logger.info("tracing: Langfuse keys not set — tracing disabled") return None try: from langfuse import Langfuse _langfuse_client = Langfuse( secret_key=settings.LANGFUSE_SECRET_KEY, public_key=settings.LANGFUSE_PUBLIC_KEY, host=settings.LANGFUSE_HOST, ) logger.info("tracing: Langfuse client initialised (host=%s)", settings.LANGFUSE_HOST) return _langfuse_client except Exception as exc: _langfuse_disabled = True logger.warning("tracing: failed to initialise Langfuse: %s", exc) return None # ── Trace lifecycle ────────────────────────────────────────────────────── def create_trace( *, name: str, user_id: str, session_id: str | None = None, trace_id: str | None = None, metadata: dict[str, Any] | None = None, tags: list[str] | None = None, ) -> Any | None: """Create a Langfuse trace. Returns the trace object, or None if disabled.""" lf = _get_langfuse() if lf is None: return None try: return lf.trace( id=trace_id, name=name, user_id=user_id, session_id=session_id, metadata=metadata or {}, tags=tags or [], ) except Exception as exc: logger.warning("tracing: create_trace failed: %s", exc) return None # ── LangChain callback handler ────────────────────────────────────────── def get_langfuse_callback( *, trace_id: str | None = None, trace: Any | None = None, span_name: str | None = None, update_parent: bool = True, ) -> Any | None: """Return a ``CallbackHandler`` wired to an existing trace. This handler is passed to LangChain's ``ainvoke`` / ``astream`` as a callback so every LLM generation and tool call is automatically captured as a nested span inside the trace. If both *trace* and *trace_id* are given, *trace* takes precedence. Returns None when Langfuse is disabled. """ lf = _get_langfuse() if lf is None: return None try: from langfuse.callback import CallbackHandler kwargs: dict[str, Any] = { "secret_key": settings.LANGFUSE_SECRET_KEY, "public_key": settings.LANGFUSE_PUBLIC_KEY, "host": settings.LANGFUSE_HOST, "update_parent": update_parent, } if trace is not None: kwargs["trace_id"] = trace.id elif trace_id is not None: kwargs["trace_id"] = trace_id if span_name: kwargs["root_span"] = span_name return CallbackHandler(**kwargs) except Exception as exc: logger.warning("tracing: get_langfuse_callback failed: %s", exc) return None # ── Prompt management ──────────────────────────────────────────────────── def get_prompt( name: str, *, version: int | None = None, label: str | None = None, fallback: str | None = None, cache_ttl_seconds: int = 300, ) -> str | None: """Fetch a managed prompt from Langfuse by name. Returns the compiled prompt string, or *fallback* if the prompt is not found or Langfuse is disabled. Parameters ---------- name : str Prompt name as registered in Langfuse. version : int, optional Pin to a specific version; omit for the latest production version. label : str, optional Fetch by label (e.g. ``"production"``, ``"staging"``). fallback : str, optional Value returned when the prompt cannot be fetched. cache_ttl_seconds : int How long to cache the prompt locally (default 5 min). """ lf = _get_langfuse() if lf is None: return fallback try: kwargs: dict[str, Any] = { "name": name, "cache_ttl_seconds": cache_ttl_seconds, } if version is not None: kwargs["version"] = version if label is not None: kwargs["label"] = label prompt = lf.get_prompt(**kwargs) return prompt.prompt except Exception as exc: logger.warning("tracing: get_prompt(%s) failed: %s", name, exc) return fallback def link_prompt_to_trace( trace: Any, prompt_name: str, *, version: int | None = None, label: str | None = None, ) -> None: """Attach a Langfuse prompt reference to a trace/generation. Call this *after* creating a generation on the trace to associate the prompt that was used. The prompt object is fetched and linked so Langfuse can display prompt→trace associations in the dashboard. """ lf = _get_langfuse() if lf is None or trace is None: return try: kwargs: dict[str, Any] = {"name": prompt_name} if version is not None: kwargs["version"] = version if label is not None: kwargs["label"] = label prompt = lf.get_prompt(**kwargs) trace.update(metadata={"prompt": {"name": prompt_name, "version": prompt.version}}) except Exception as exc: logger.warning("tracing: link_prompt_to_trace(%s) failed: %s", prompt_name, exc) # ── Scoring helper ─────────────────────────────────────────────────────── def score_trace( trace_id: str, name: str, value: float, *, comment: str | None = None, ) -> None: """Post a score to a trace (e.g. user feedback, latency, quality).""" lf = _get_langfuse() if lf is None: return try: lf.score(trace_id=trace_id, name=name, value=value, comment=comment) except Exception as exc: logger.warning("tracing: score_trace failed: %s", exc) # ── Shutdown ───────────────────────────────────────────────────────────── def flush() -> None: """Flush pending Langfuse events. Call this on service shutdown.""" if _langfuse_client is not None: try: _langfuse_client.flush() except Exception as exc: logger.warning("tracing: flush failed: %s", exc) def shutdown() -> None: """Flush and close the Langfuse client.""" global _langfuse_client, _langfuse_disabled if _langfuse_client is not None: try: _langfuse_client.flush() _langfuse_client.shutdown() except Exception as exc: logger.warning("tracing: shutdown failed: %s", exc) _langfuse_client = None _langfuse_disabled = False