feat(langfuse): propagate user_id and session_id to all traces

- Add hash_user_id() to SHA-256 hash user IDs before sending to Langfuse
- Add langfuse_context() helper wrapping propagate_attributes()
- deep_agent: extract session_id from _debug context, wrap all agent
  runs and classifier with langfuse_context(user_id, session_id)
- agent_runner: add session_id param, pass run_id as session for batch
- agent_setup: wrap journey LLM calls with langfuse_context
- Remove redundant metadata dicts (now handled by propagate_attributes)
This commit is contained in:
Roberto Musso
2026-04-10 22:38:02 +02:00
parent 90500a3462
commit a85f8fde29
5 changed files with 104 additions and 23 deletions

View File

@@ -32,7 +32,7 @@ from typing import Any
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
from app.agents.filesystem_agent import make_directory_tools from app.agents.filesystem_agent import make_directory_tools
from app.core.langfuse_client import compile_prompt, extract_usage, get_langfuse, get_prompt_or_fallback from app.core.langfuse_client import compile_prompt, extract_usage, get_langfuse, get_prompt_or_fallback, langfuse_context
from app.core.llm import get_agent_llm, model_for_agent from app.core.llm import get_agent_llm, model_for_agent
from app.schemas import AgentConfig from app.schemas import AgentConfig
@@ -260,11 +260,13 @@ async def _call_llm_with_tools(
llm_with_tools = llm.bind_tools(tools) llm_with_tools = llm.bind_tools(tools)
tool_map = {tool_def.name: tool_def for tool_def in tools} tool_map = {tool_def.name: tool_def for tool_def in tools}
_lf_ctx = langfuse_context(user_id=user_id or None, session_id=session_id or None)
_lf_ctx.__enter__()
_span_ctx = ( _span_ctx = (
lf.start_as_current_observation( lf.start_as_current_observation(
as_type="span", as_type="span",
name="journey-setup", name="journey-setup",
metadata={"user_id": user_id or None, "session_id": session_id or None},
input=history[-1]["content"] if history else "", input=history[-1]["content"] if history else "",
) )
if lf else None if lf else None
@@ -286,7 +288,7 @@ async def _call_llm_with_tools(
_gen = _gen_ctx.__enter__() if _gen_ctx else None _gen = _gen_ctx.__enter__() if _gen_ctx else None
response: AIMessage = await llm_with_tools.ainvoke(messages) response: AIMessage = await llm_with_tools.ainvoke(messages)
if _gen_ctx: if _gen_ctx:
_gen.update(output=_as_text(response.content), usage=extract_usage(response)) _gen.update(output=_as_text(response.content), usage_details=extract_usage(response))
_gen_ctx.__exit__(None, None, None) _gen_ctx.__exit__(None, None, None)
resp_text = _as_text(response.content) resp_text = _as_text(response.content)
@@ -342,6 +344,7 @@ async def _call_llm_with_tools(
finally: finally:
if _span_ctx: if _span_ctx:
_span_ctx.__exit__(None, None, None) _span_ctx.__exit__(None, None, None)
_lf_ctx.__exit__(None, None, None)
if lf: if lf:
lf.flush() lf.flush()

View File

@@ -12,9 +12,12 @@ in backend agent-config tables.
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import logging
import uuid import uuid
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
logger = logging.getLogger(__name__)
from fastapi import APIRouter, Depends, HTTPException, status from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import func, select from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
@@ -182,7 +185,6 @@ async def trigger_agent_run(
if body.last_run_at if body.last_run_at
else None else None
) )
config = LocalAgentConfig( config = LocalAgentConfig(
id=str(uuid.uuid4()), id=str(uuid.uuid4()),
user_id=current_user.id, user_id=current_user.id,

View File

@@ -44,7 +44,7 @@ from app.agents.project_agent import PROJECT_TOOLS
from app.agents.task_agent import TASK_TOOLS from app.agents.task_agent import TASK_TOOLS
from app.agents.timeline_agent import TIMELINE_TOOLS from app.agents.timeline_agent import TIMELINE_TOOLS
from app.core.device_manager import DeviceConnectionManager from app.core.device_manager import DeviceConnectionManager
from app.core.langfuse_client import compile_prompt, extract_usage, get_langfuse, get_prompt_or_fallback from app.core.langfuse_client import compile_prompt, extract_usage, get_langfuse, get_prompt_or_fallback, langfuse_context
from app.core.llm import get_agent_llm, model_for_agent from app.core.llm import get_agent_llm, model_for_agent
from app.core.preprocessors import detect_content_type, preprocess from app.core.preprocessors import detect_content_type, preprocess
from app.core.ws_context import clear_client_executor, execute_on_client, set_client_executor from app.core.ws_context import clear_client_executor, execute_on_client, set_client_executor
@@ -227,6 +227,7 @@ async def _run_agent_with_tools(
tools: list[Any], tools: list[Any],
max_steps: int, max_steps: int,
user_id: str = "", user_id: str = "",
session_id: str = "",
langfuse_prompt: Any = None, langfuse_prompt: Any = None,
agent_name: str = "batch-agent", agent_name: str = "batch-agent",
_tool_calls_out: list[str] | None = None, _tool_calls_out: list[str] | None = None,
@@ -246,6 +247,9 @@ async def _run_agent_with_tools(
tool_map = {tool_def.name: tool_def for tool_def in tools} tool_map = {tool_def.name: tool_def for tool_def in tools}
_lf_ctx = langfuse_context(user_id=user_id or None, session_id=session_id or None)
_lf_ctx.__enter__()
_span_ctx = ( _span_ctx = (
lf.start_as_current_observation( lf.start_as_current_observation(
as_type="span", as_type="span",
@@ -272,7 +276,7 @@ async def _run_agent_with_tools(
_gen = _gen_ctx.__enter__() if _gen_ctx else None _gen = _gen_ctx.__enter__() if _gen_ctx else None
response: AIMessage = await llm_with_tools.ainvoke(messages) response: AIMessage = await llm_with_tools.ainvoke(messages)
if _gen_ctx: if _gen_ctx:
_gen.update(output=_as_text(response.content), usage=extract_usage(response)) _gen.update(output=_as_text(response.content), usage_details=extract_usage(response))
_gen_ctx.__exit__(None, None, None) _gen_ctx.__exit__(None, None, None)
messages.append(response) messages.append(response)
@@ -317,6 +321,7 @@ async def _run_agent_with_tools(
finally: finally:
if _span_ctx: if _span_ctx:
_span_ctx.__exit__(None, None, None) _span_ctx.__exit__(None, None, None)
_lf_ctx.__exit__(None, None, None)
if lf: if lf:
lf.flush() lf.flush()
@@ -385,7 +390,8 @@ async def _scan_directories(
for file_path in all_files: for file_path in all_files:
try: try:
meta = await execute_on_client(action="get_file_metadata", data={"path": file_path}) meta = await execute_on_client(action="get_file_metadata", data={"path": file_path})
modified_at = meta.get("modifiedAt") # FE sends snake_case keys on the wire (toSnakeCase transform)
modified_at = meta.get("modified_at") or meta.get("modifiedAt")
if modified_at is None: if modified_at is None:
filtered.append(file_path) filtered.append(file_path)
continue continue
@@ -606,7 +612,6 @@ async def run_local_agent(
try: try:
# ── Code: scan directories ─────────────────────────────────── # ── Code: scan directories ───────────────────────────────────
logger.info("agent_runner: run=%s scanning directories user=%s", run_id, user_id)
file_paths = await _scan_directories( file_paths = await _scan_directories(
paths=config.directory_paths, paths=config.directory_paths,
extensions=config.file_extensions or [], extensions=config.file_extensions or [],
@@ -685,6 +690,7 @@ async def run_local_agent(
tools=processing_tools, tools=processing_tools,
max_steps=_MAX_PROCESSING_STEPS, max_steps=_MAX_PROCESSING_STEPS,
user_id=user_id, user_id=user_id,
session_id=run_id,
langfuse_prompt=prompt_obj, langfuse_prompt=prompt_obj,
agent_name="unified-processor", agent_name="unified-processor",
_tool_calls_out=file_tool_calls, _tool_calls_out=file_tool_calls,
@@ -916,6 +922,7 @@ async def run_cloud_agent(
tools=processing_tools, tools=processing_tools,
max_steps=_MAX_PROCESSING_STEPS, max_steps=_MAX_PROCESSING_STEPS,
user_id=user_id, user_id=user_id,
session_id=run_id,
langfuse_prompt=cloud_prompt_obj, langfuse_prompt=cloud_prompt_obj,
agent_name="cloud-processor", agent_name="cloud-processor",
) )

View File

@@ -16,7 +16,7 @@ from app.agents.note_agent import NOTE_TOOLS
from app.agents.project_agent import PROJECT_TOOLS from app.agents.project_agent import PROJECT_TOOLS
from app.agents.task_agent import TASK_TOOLS from app.agents.task_agent import TASK_TOOLS
from app.agents.timeline_agent import TIMELINE_TOOLS from app.agents.timeline_agent import TIMELINE_TOOLS
from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback, langfuse_context
from app.core.llm import get_agent_llm, model_for_agent from app.core.llm import get_agent_llm, model_for_agent
from app.core.memory_middleware import MemoryMiddleware from app.core.memory_middleware import MemoryMiddleware
from app.core.ws_context import clear_tool_result_collector, execute_on_client, set_tool_result_collector from app.core.ws_context import clear_tool_result_collector, execute_on_client, set_tool_result_collector
@@ -148,6 +148,15 @@ def _trace_id_from_context(context: dict[str, Any]) -> str | None:
return None return None
def _session_id_from_context(context: dict[str, Any]) -> str | None:
debug = context.get("_debug")
if isinstance(debug, dict):
session_id = debug.get("session_id")
if isinstance(session_id, str) and session_id:
return session_id
return None
def _context_for_model(context: dict[str, Any]) -> dict[str, Any]: def _context_for_model(context: dict[str, Any]) -> dict[str, Any]:
sanitized = dict(context) sanitized = dict(context)
sanitized.pop("_debug", None) sanitized.pop("_debug", None)
@@ -550,18 +559,25 @@ async def _infer_floating_domain(message: str, context: dict[str, Any]) -> dict[
_, classifier_prompt_obj = get_prompt_or_fallback( _, classifier_prompt_obj = get_prompt_or_fallback(
"floating_domain_classifier", _FLOATING_DOMAIN_CLASSIFIER_PROMPT "floating_domain_classifier", _FLOATING_DOMAIN_CLASSIFIER_PROMPT
) )
if lf:
with lf.start_as_current_observation( # Extract user/session from context for Langfuse attribution
as_type="generation", _debug = context.get("_debug") if isinstance(context, dict) else None
name="floating-classifier", _lf_user = (_debug or {}).get("user_id") if isinstance(_debug, dict) else None
model=model_for_agent("classifier"), _lf_session = (_debug or {}).get("session_id") if isinstance(_debug, dict) else None
prompt=classifier_prompt_obj,
input=classifier_messages, with langfuse_context(user_id=_lf_user, session_id=_lf_session):
) as gen: if lf:
with lf.start_as_current_observation(
as_type="generation",
name="floating-classifier",
model=model_for_agent("classifier"),
prompt=classifier_prompt_obj,
input=classifier_messages,
) as gen:
response = await llm.ainvoke(classifier_messages)
gen.update(output=_as_text(response.content), usage_details=extract_usage(response))
else:
response = await llm.ainvoke(classifier_messages) response = await llm.ainvoke(classifier_messages)
gen.update(output=_as_text(response.content), usage=extract_usage(response))
else:
response = await llm.ainvoke(classifier_messages)
parsed = _parse_json_object(_as_text(response.content)) parsed = _parse_json_object(_as_text(response.content))
if parsed is not None: if parsed is not None:
domain = _normalize_domain_payload(parsed, project_id) domain = _normalize_domain_payload(parsed, project_id)
@@ -590,6 +606,7 @@ async def _run_single_agent(
agent_name: str = "agent", agent_name: str = "agent",
) -> str: ) -> str:
trace_id = _trace_id_from_context(context) trace_id = _trace_id_from_context(context)
session_id = _session_id_from_context(context)
lf = get_langfuse() lf = get_langfuse()
llm = get_agent_llm(agent_name) llm = get_agent_llm(agent_name)
tools = _all_tools_for_user(user_id, trace_id) tools = _all_tools_for_user(user_id, trace_id)
@@ -610,6 +627,9 @@ async def _run_single_agent(
collected: list[dict[str, Any]] = [] collected: list[dict[str, Any]] = []
set_tool_result_collector(collected) set_tool_result_collector(collected)
_lf_ctx = langfuse_context(user_id=user_id, session_id=session_id)
_lf_ctx.__enter__()
_span_ctx = ( _span_ctx = (
lf.start_as_current_observation( lf.start_as_current_observation(
as_type="span", as_type="span",
@@ -636,7 +656,7 @@ async def _run_single_agent(
_gen = _gen_ctx.__enter__() if _gen_ctx else None _gen = _gen_ctx.__enter__() if _gen_ctx else None
response: AIMessage = await llm_with_tools.ainvoke(messages) response: AIMessage = await llm_with_tools.ainvoke(messages)
if _gen_ctx: if _gen_ctx:
_gen.update(output=_as_text(response.content), usage=extract_usage(response)) _gen.update(output=_as_text(response.content), usage_details=extract_usage(response))
_gen_ctx.__exit__(None, None, None) _gen_ctx.__exit__(None, None, None)
messages.append(response) messages.append(response)
@@ -698,6 +718,7 @@ async def _run_single_agent(
clear_tool_result_collector() clear_tool_result_collector()
if _span_ctx: if _span_ctx:
_span_ctx.__exit__(None, None, None) _span_ctx.__exit__(None, None, None)
_lf_ctx.__exit__(None, None, None)
if lf: if lf:
lf.flush() lf.flush()
@@ -713,6 +734,7 @@ async def _run_single_agent_stream(
agent_name: str = "agent", agent_name: str = "agent",
) -> AsyncGenerator[tuple[str, Any], None]: ) -> AsyncGenerator[tuple[str, Any], None]:
trace_id = _trace_id_from_context(context) trace_id = _trace_id_from_context(context)
session_id = _session_id_from_context(context)
lf = get_langfuse() lf = get_langfuse()
llm = get_agent_llm(agent_name) llm = get_agent_llm(agent_name)
tools = _all_tools_for_user(user_id, trace_id) tools = _all_tools_for_user(user_id, trace_id)
@@ -734,6 +756,9 @@ async def _run_single_agent_stream(
collected: list[dict[str, Any]] = [] collected: list[dict[str, Any]] = []
set_tool_result_collector(collected) set_tool_result_collector(collected)
_lf_ctx = langfuse_context(user_id=user_id, session_id=session_id)
_lf_ctx.__enter__()
_span_ctx = ( _span_ctx = (
lf.start_as_current_observation( lf.start_as_current_observation(
as_type="span", as_type="span",
@@ -761,7 +786,7 @@ async def _run_single_agent_stream(
_gen = _gen_ctx.__enter__() if _gen_ctx else None _gen = _gen_ctx.__enter__() if _gen_ctx else None
response: AIMessage = await llm_with_tools.ainvoke(messages) response: AIMessage = await llm_with_tools.ainvoke(messages)
if _gen_ctx: if _gen_ctx:
_gen.update(output=_as_text(response.content), usage=extract_usage(response)) _gen.update(output=_as_text(response.content), usage_details=extract_usage(response))
_gen_ctx.__exit__(None, None, None) _gen_ctx.__exit__(None, None, None)
messages.append(response) messages.append(response)
@@ -841,6 +866,7 @@ async def _run_single_agent_stream(
clear_tool_result_collector() clear_tool_result_collector()
if _span_ctx: if _span_ctx:
_span_ctx.__exit__(None, None, None) _span_ctx.__exit__(None, None, None)
_lf_ctx.__exit__(None, None, None)
if lf: if lf:
lf.flush() lf.flush()

View File

@@ -39,8 +39,10 @@ Linking a prompt to a generation::
from __future__ import annotations from __future__ import annotations
import hashlib
import logging import logging
from typing import Any from contextlib import contextmanager
from typing import Any, Generator
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -145,3 +147,44 @@ def extract_usage(response: Any) -> dict[str, int]:
"output": int(meta.get("output_tokens", 0)), "output": int(meta.get("output_tokens", 0)),
"total": int(meta.get("total_tokens", 0)), "total": int(meta.get("total_tokens", 0)),
} }
def hash_user_id(user_id: str) -> str:
"""Return a SHA-256 hash of *user_id* for use as Langfuse ``user_id``.
This avoids sending raw database UUIDs to external observability services
while still providing a stable, deterministic identifier for per-user
metrics in the Langfuse dashboard.
"""
return hashlib.sha256(user_id.encode()).hexdigest()
@contextmanager
def langfuse_context(
user_id: str | None = None,
session_id: str | None = None,
) -> Generator[None, None, None]:
"""Propagate ``user_id`` (hashed) and ``session_id`` to all Langfuse observations.
No-op when Langfuse is not configured or parameters are empty.
"""
lf = get_langfuse()
if lf is None or (not user_id and not session_id):
yield
return
try:
from langfuse import propagate_attributes
except ImportError:
logger.debug("langfuse: propagate_attributes not available — skipping context")
yield
return
attrs: dict[str, str] = {}
if user_id:
attrs["user_id"] = hash_user_id(user_id)
if session_id:
attrs["session_id"] = session_id
with propagate_attributes(**attrs):
yield