- New app/core/langfuse_client.py: lazy singleton client, get_prompt_or_fallback() helper (returns raw template + prompt obj for linking), extract_usage() for token counts. No-ops when LANGFUSE_* env vars are not set. - deep_agent.py: home-agent and floating-agent runs wrapped in spans; each ainvoke wrapped in a generation with model/input/output/usage; prompts fetched from Langfuse (adiuva-home-agent, adiuva-floating-agent, adiuva-floating-classifier) with hardcoded fallback. - agent_runner.py: step1-classifier and step2-processor LLM calls traced; batch agent _run_agent_with_tools spans + generations; cloud-processor included. Prompts: adiuva-step1-classifier, adiuva-step2-processor, adiuva-cloud-processor. - agent_setup.py: journey-setup span + generation per ainvoke; prompt_obj stored on JourneySession and reused across turns. Prompt: journey_system. - settings.py: LANGFUSE_SECRET_KEY, LANGFUSE_PUBLIC_KEY, LANGFUSE_HOST added. - .env.example: Langfuse section with EU/US/self-hosted host comments. - requirements.txt: langfuse>=2.0.0. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
115 lines
3.7 KiB
Python
115 lines
3.7 KiB
Python
"""Langfuse observability — singleton client and prompt helpers.
|
|
|
|
If LANGFUSE_SECRET_KEY / LANGFUSE_PUBLIC_KEY are not set,
|
|
all helpers are no-ops so the app works without Langfuse configured.
|
|
|
|
Usage
|
|
-----
|
|
Tracing::
|
|
|
|
from app.core.langfuse_client import get_langfuse
|
|
|
|
lf = get_langfuse()
|
|
if lf:
|
|
with lf.start_as_current_observation(as_type="span", name="my-agent") as span:
|
|
span.update(input=user_message)
|
|
# ... do work ...
|
|
span.update(output=result)
|
|
lf.flush()
|
|
|
|
Prompt management::
|
|
|
|
from app.core.langfuse_client import get_prompt_or_fallback
|
|
|
|
text, prompt_obj = get_prompt_or_fallback("home_system", FALLBACK_PROMPT)
|
|
# Use text as the system prompt; pass prompt_obj to generations for linking.
|
|
|
|
Linking a prompt to a generation::
|
|
|
|
with lf.start_as_current_observation(
|
|
as_type="generation",
|
|
name="llm-call",
|
|
model="gpt-4o",
|
|
prompt=prompt_obj, # links generation → prompt version in the UI
|
|
input=messages,
|
|
) as gen:
|
|
response = await llm.ainvoke(messages)
|
|
gen.update(output=response.content, usage=extract_usage(response))
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import Any
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Cached client handed out by get_langfuse(); stays None until a client is
# successfully constructed (and is reset to None if construction fails).
_client: Any = None
# Flipped to True by get_langfuse() on its first call, so client setup is
# attempted at most once and later calls just return the cached ``_client``.
_initialized: bool = False
|
|
|
|
|
|
def get_langfuse() -> Any | None:
    """Return the shared Langfuse client, building it on the first call.

    Setup is attempted exactly once. When either ``LANGFUSE_SECRET_KEY`` or
    ``LANGFUSE_PUBLIC_KEY`` is empty, or when constructing the client raises,
    the cached value stays ``None`` and every later call returns ``None``
    without retrying.

    Returns:
        The cached ``Langfuse`` instance, or ``None`` when Langfuse is not
        configured or could not be initialized.
    """
    global _client, _initialized

    if not _initialized:
        # Mark setup as done up front so a failure is not retried per call.
        _initialized = True

        # Imported here rather than at module top to avoid a circular import.
        from app.config.settings import settings

        has_credentials = (
            settings.LANGFUSE_SECRET_KEY and settings.LANGFUSE_PUBLIC_KEY
        )
        if not has_credentials:
            logger.debug("langfuse: not configured — observability disabled")
            return None

        try:
            from langfuse import Langfuse

            client_kwargs = {
                "secret_key": settings.LANGFUSE_SECRET_KEY,
                "public_key": settings.LANGFUSE_PUBLIC_KEY,
                "host": settings.LANGFUSE_HOST,
            }
            _client = Langfuse(**client_kwargs)
            logger.info(
                "langfuse: client initialized host=%s", settings.LANGFUSE_HOST
            )
        except Exception as exc:
            # Observability is best-effort: log and carry on without a client.
            logger.warning("langfuse: failed to initialize: %s", exc)
            _client = None

    return _client
|
|
|
|
|
|
def get_prompt_or_fallback(name: str, fallback: str) -> tuple[str, Any]:
    """Look up the ``production``-labelled prompt ``name`` in Langfuse.

    Args:
        name: Name of the prompt to fetch.
        fallback: Template text to use when Langfuse is unavailable, the
            fetch raises, or the fetched object carries no string template.

    Returns:
        A ``(prompt_text, prompt_obj)`` pair.

        * ``prompt_text`` is the raw, unsubstituted template string; callers
          are expected to fill in variables themselves (e.g. ``.format()``).
        * ``prompt_obj`` is the object returned by the Langfuse client, or
          ``None`` when there is no client or the fetch failed. Hand it to a
          generation observation to link that generation to the prompt
          version in the Langfuse UI.
    """
    client = get_langfuse()
    if client is None:
        return fallback, None

    try:
        prompt_obj = client.get_prompt(name, label="production", fallback=fallback)
        # Text prompts expose their raw template as a ``str`` on ``.prompt``;
        # anything else (attribute missing, non-string value) uses the fallback.
        template = getattr(prompt_obj, "prompt", None)
        if not isinstance(template, str):
            template = fallback
    except Exception as exc:
        logger.warning("langfuse: get_prompt %r failed: %s — using fallback", name, exc)
        return fallback, None
    else:
        return template, prompt_obj
|
|
|
|
|
|
def extract_usage(response: Any) -> dict[str, int]:
    """Extract token usage from a LangChain AI message into Langfuse format.

    Reads ``response.usage_metadata`` and maps its ``input_tokens`` /
    ``output_tokens`` / ``total_tokens`` entries onto the ``input`` /
    ``output`` / ``total`` keys.

    This helper runs on the tracing path, so it never raises on odd
    metadata: a counter that is missing, ``None`` or not convertible to
    ``int`` is treated as absent instead of failing the surrounding LLM call.

    Args:
        response: Any object; only its optional ``usage_metadata`` attribute
            is inspected.

    Returns:
        ``{"input": int, "output": int, "total": int}``, or an empty dict when
        the response has no usable (non-empty, mapping-like) usage metadata.
        When no total is reported, ``total`` is ``input + output``.
    """
    # Function-scope import so this block does not depend on the module's
    # top-level import list.
    from collections.abc import Mapping

    meta = getattr(response, "usage_metadata", None)
    if not meta or not isinstance(meta, Mapping):
        return {}

    def _count(key):
        """Return ``meta[key]`` as an int, or ``None`` if absent/unusable."""
        value = meta.get(key)
        if value is None:
            return None
        try:
            return int(value)
        except (TypeError, ValueError):
            return None

    input_tokens = _count("input_tokens") or 0
    output_tokens = _count("output_tokens") or 0
    total_tokens = _count("total_tokens")
    if total_tokens is None:
        # No total reported: derive it rather than reporting a misleading 0.
        total_tokens = input_tokens + output_tokens

    return {
        "input": input_tokens,
        "output": output_tokens,
        "total": total_tokens,
    }
|