From a85f8fde2900f6f6d90412980def9ac019bccb88 Mon Sep 17 00:00:00 2001 From: Roberto Musso Date: Fri, 10 Apr 2026 22:38:02 +0200 Subject: [PATCH] feat(langfuse): propagate user_id and session_id to all traces - Add hash_user_id() to SHA-256 hash user IDs before sending to Langfuse - Add langfuse_context() helper wrapping propagate_attributes() - deep_agent: extract session_id from _debug context, wrap all agent runs and classifier with langfuse_context(user_id, session_id) - agent_runner: add session_id param, pass run_id as session for batch - agent_setup: wrap journey LLM calls with langfuse_context - Remove redundant metadata dicts (now handled by propagate_attributes) - Pass token usage to generation update() as usage_details instead of usage - agent_runner: read modified_at (snake_case) from file metadata, falling back to modifiedAt; drop the per-run directory-scan info log - agents: add module-level logger --- app/api/routes/agent_setup.py | 9 ++++-- app/api/routes/agents.py | 4 ++- app/core/agent_runner.py | 15 +++++++--- app/core/deep_agent.py | 54 ++++++++++++++++++++++++++--------- app/core/langfuse_client.py | 45 ++++++++++++++++++++++++++++- 5 files changed, 104 insertions(+), 23 deletions(-) diff --git a/app/api/routes/agent_setup.py b/app/api/routes/agent_setup.py index d833632..7ff4e74 100644 --- a/app/api/routes/agent_setup.py +++ b/app/api/routes/agent_setup.py @@ -32,7 +32,7 @@ from typing import Any from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage from app.agents.filesystem_agent import make_directory_tools -from app.core.langfuse_client import compile_prompt, extract_usage, get_langfuse, get_prompt_or_fallback +from app.core.langfuse_client import compile_prompt, extract_usage, get_langfuse, get_prompt_or_fallback, langfuse_context from app.core.llm import get_agent_llm, model_for_agent from app.schemas import AgentConfig @@ -260,11 +260,13 @@ async def _call_llm_with_tools( llm_with_tools = llm.bind_tools(tools) tool_map = {tool_def.name: tool_def for tool_def in tools} + _lf_ctx = langfuse_context(user_id=user_id or None, session_id=session_id or None) + _lf_ctx.__enter__() + _span_ctx = ( lf.start_as_current_observation( as_type="span", 
name="journey-setup", - metadata={"user_id": user_id or None, "session_id": session_id or None}, input=history[-1]["content"] if history else "", ) if lf else None @@ -286,7 +288,7 @@ async def _call_llm_with_tools( _gen = _gen_ctx.__enter__() if _gen_ctx else None response: AIMessage = await llm_with_tools.ainvoke(messages) if _gen_ctx: - _gen.update(output=_as_text(response.content), usage=extract_usage(response)) + _gen.update(output=_as_text(response.content), usage_details=extract_usage(response)) _gen_ctx.__exit__(None, None, None) resp_text = _as_text(response.content) @@ -342,6 +344,7 @@ async def _call_llm_with_tools( finally: if _span_ctx: _span_ctx.__exit__(None, None, None) + _lf_ctx.__exit__(None, None, None) if lf: lf.flush() diff --git a/app/api/routes/agents.py b/app/api/routes/agents.py index 0a66a65..8a9d24d 100644 --- a/app/api/routes/agents.py +++ b/app/api/routes/agents.py @@ -12,9 +12,12 @@ in backend agent-config tables. from __future__ import annotations import asyncio +import logging import uuid from datetime import datetime, timedelta, timezone +logger = logging.getLogger(__name__) + from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession @@ -182,7 +185,6 @@ async def trigger_agent_run( if body.last_run_at else None ) - config = LocalAgentConfig( id=str(uuid.uuid4()), user_id=current_user.id, diff --git a/app/core/agent_runner.py b/app/core/agent_runner.py index a91d1da..9fda3c7 100644 --- a/app/core/agent_runner.py +++ b/app/core/agent_runner.py @@ -44,7 +44,7 @@ from app.agents.project_agent import PROJECT_TOOLS from app.agents.task_agent import TASK_TOOLS from app.agents.timeline_agent import TIMELINE_TOOLS from app.core.device_manager import DeviceConnectionManager -from app.core.langfuse_client import compile_prompt, extract_usage, get_langfuse, get_prompt_or_fallback +from app.core.langfuse_client import compile_prompt, extract_usage, 
get_langfuse, get_prompt_or_fallback, langfuse_context from app.core.llm import get_agent_llm, model_for_agent from app.core.preprocessors import detect_content_type, preprocess from app.core.ws_context import clear_client_executor, execute_on_client, set_client_executor @@ -227,6 +227,7 @@ async def _run_agent_with_tools( tools: list[Any], max_steps: int, user_id: str = "", + session_id: str = "", langfuse_prompt: Any = None, agent_name: str = "batch-agent", _tool_calls_out: list[str] | None = None, @@ -246,6 +247,9 @@ async def _run_agent_with_tools( tool_map = {tool_def.name: tool_def for tool_def in tools} + _lf_ctx = langfuse_context(user_id=user_id or None, session_id=session_id or None) + _lf_ctx.__enter__() + _span_ctx = ( lf.start_as_current_observation( as_type="span", @@ -272,7 +276,7 @@ async def _run_agent_with_tools( _gen = _gen_ctx.__enter__() if _gen_ctx else None response: AIMessage = await llm_with_tools.ainvoke(messages) if _gen_ctx: - _gen.update(output=_as_text(response.content), usage=extract_usage(response)) + _gen.update(output=_as_text(response.content), usage_details=extract_usage(response)) _gen_ctx.__exit__(None, None, None) messages.append(response) @@ -317,6 +321,7 @@ async def _run_agent_with_tools( finally: if _span_ctx: _span_ctx.__exit__(None, None, None) + _lf_ctx.__exit__(None, None, None) if lf: lf.flush() @@ -385,7 +390,8 @@ async def _scan_directories( for file_path in all_files: try: meta = await execute_on_client(action="get_file_metadata", data={"path": file_path}) - modified_at = meta.get("modifiedAt") + # FE sends snake_case keys on the wire (toSnakeCase transform) + modified_at = meta.get("modified_at") or meta.get("modifiedAt") if modified_at is None: filtered.append(file_path) continue @@ -606,7 +612,6 @@ async def run_local_agent( try: # ── Code: scan directories ─────────────────────────────────── - logger.info("agent_runner: run=%s scanning directories user=%s", run_id, user_id) file_paths = await _scan_directories( 
paths=config.directory_paths, extensions=config.file_extensions or [], @@ -685,6 +690,7 @@ async def run_local_agent( tools=processing_tools, max_steps=_MAX_PROCESSING_STEPS, user_id=user_id, + session_id=run_id, langfuse_prompt=prompt_obj, agent_name="unified-processor", _tool_calls_out=file_tool_calls, @@ -916,6 +922,7 @@ async def run_cloud_agent( tools=processing_tools, max_steps=_MAX_PROCESSING_STEPS, user_id=user_id, + session_id=run_id, langfuse_prompt=cloud_prompt_obj, agent_name="cloud-processor", ) diff --git a/app/core/deep_agent.py b/app/core/deep_agent.py index 44a7d1d..e549ef2 100644 --- a/app/core/deep_agent.py +++ b/app/core/deep_agent.py @@ -16,7 +16,7 @@ from app.agents.note_agent import NOTE_TOOLS from app.agents.project_agent import PROJECT_TOOLS from app.agents.task_agent import TASK_TOOLS from app.agents.timeline_agent import TIMELINE_TOOLS -from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback +from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback, langfuse_context from app.core.llm import get_agent_llm, model_for_agent from app.core.memory_middleware import MemoryMiddleware from app.core.ws_context import clear_tool_result_collector, execute_on_client, set_tool_result_collector @@ -148,6 +148,15 @@ def _trace_id_from_context(context: dict[str, Any]) -> str | None: return None +def _session_id_from_context(context: dict[str, Any]) -> str | None: + debug = context.get("_debug") + if isinstance(debug, dict): + session_id = debug.get("session_id") + if isinstance(session_id, str) and session_id: + return session_id + return None + + def _context_for_model(context: dict[str, Any]) -> dict[str, Any]: sanitized = dict(context) sanitized.pop("_debug", None) @@ -550,18 +559,25 @@ async def _infer_floating_domain(message: str, context: dict[str, Any]) -> dict[ _, classifier_prompt_obj = get_prompt_or_fallback( "floating_domain_classifier", _FLOATING_DOMAIN_CLASSIFIER_PROMPT ) - if 
lf: - with lf.start_as_current_observation( - as_type="generation", - name="floating-classifier", - model=model_for_agent("classifier"), - prompt=classifier_prompt_obj, - input=classifier_messages, - ) as gen: + + # Extract user/session from context for Langfuse attribution + _debug = context.get("_debug") if isinstance(context, dict) else None + _lf_user = (_debug or {}).get("user_id") if isinstance(_debug, dict) else None + _lf_session = (_debug or {}).get("session_id") if isinstance(_debug, dict) else None + + with langfuse_context(user_id=_lf_user, session_id=_lf_session): + if lf: + with lf.start_as_current_observation( + as_type="generation", + name="floating-classifier", + model=model_for_agent("classifier"), + prompt=classifier_prompt_obj, + input=classifier_messages, + ) as gen: + response = await llm.ainvoke(classifier_messages) + gen.update(output=_as_text(response.content), usage_details=extract_usage(response)) + else: response = await llm.ainvoke(classifier_messages) - gen.update(output=_as_text(response.content), usage=extract_usage(response)) - else: - response = await llm.ainvoke(classifier_messages) parsed = _parse_json_object(_as_text(response.content)) if parsed is not None: domain = _normalize_domain_payload(parsed, project_id) @@ -590,6 +606,7 @@ async def _run_single_agent( agent_name: str = "agent", ) -> str: trace_id = _trace_id_from_context(context) + session_id = _session_id_from_context(context) lf = get_langfuse() llm = get_agent_llm(agent_name) tools = _all_tools_for_user(user_id, trace_id) @@ -610,6 +627,9 @@ async def _run_single_agent( collected: list[dict[str, Any]] = [] set_tool_result_collector(collected) + _lf_ctx = langfuse_context(user_id=user_id, session_id=session_id) + _lf_ctx.__enter__() + _span_ctx = ( lf.start_as_current_observation( as_type="span", @@ -636,7 +656,7 @@ async def _run_single_agent( _gen = _gen_ctx.__enter__() if _gen_ctx else None response: AIMessage = await llm_with_tools.ainvoke(messages) if _gen_ctx: - 
_gen.update(output=_as_text(response.content), usage=extract_usage(response)) + _gen.update(output=_as_text(response.content), usage_details=extract_usage(response)) _gen_ctx.__exit__(None, None, None) messages.append(response) @@ -698,6 +718,7 @@ async def _run_single_agent( clear_tool_result_collector() if _span_ctx: _span_ctx.__exit__(None, None, None) + _lf_ctx.__exit__(None, None, None) if lf: lf.flush() @@ -713,6 +734,7 @@ async def _run_single_agent_stream( agent_name: str = "agent", ) -> AsyncGenerator[tuple[str, Any], None]: trace_id = _trace_id_from_context(context) + session_id = _session_id_from_context(context) lf = get_langfuse() llm = get_agent_llm(agent_name) tools = _all_tools_for_user(user_id, trace_id) @@ -734,6 +756,9 @@ async def _run_single_agent_stream( collected: list[dict[str, Any]] = [] set_tool_result_collector(collected) + _lf_ctx = langfuse_context(user_id=user_id, session_id=session_id) + _lf_ctx.__enter__() + _span_ctx = ( lf.start_as_current_observation( as_type="span", @@ -761,7 +786,7 @@ async def _run_single_agent_stream( _gen = _gen_ctx.__enter__() if _gen_ctx else None response: AIMessage = await llm_with_tools.ainvoke(messages) if _gen_ctx: - _gen.update(output=_as_text(response.content), usage=extract_usage(response)) + _gen.update(output=_as_text(response.content), usage_details=extract_usage(response)) _gen_ctx.__exit__(None, None, None) messages.append(response) @@ -841,6 +866,7 @@ async def _run_single_agent_stream( clear_tool_result_collector() if _span_ctx: _span_ctx.__exit__(None, None, None) + _lf_ctx.__exit__(None, None, None) if lf: lf.flush() diff --git a/app/core/langfuse_client.py b/app/core/langfuse_client.py index b7f9b37..954b876 100644 --- a/app/core/langfuse_client.py +++ b/app/core/langfuse_client.py @@ -39,8 +39,10 @@ Linking a prompt to a generation:: from __future__ import annotations +import hashlib import logging -from typing import Any +from contextlib import contextmanager +from typing import Any, 
Generator logger = logging.getLogger(__name__) @@ -145,3 +147,44 @@ def extract_usage(response: Any) -> dict[str, int]: "output": int(meta.get("output_tokens", 0)), "total": int(meta.get("total_tokens", 0)), } + + +def hash_user_id(user_id: str) -> str: + """Return a SHA-256 hash of *user_id* for use as Langfuse ``user_id``. + + This avoids sending raw database UUIDs to external observability services + while still providing a stable, deterministic identifier for per-user + metrics in the Langfuse dashboard. + """ + return hashlib.sha256(user_id.encode()).hexdigest() + + +@contextmanager +def langfuse_context( + user_id: str | None = None, + session_id: str | None = None, +) -> Generator[None, None, None]: + """Propagate ``user_id`` (hashed) and ``session_id`` to all Langfuse observations. + + No-op when Langfuse is not configured or parameters are empty. + """ + lf = get_langfuse() + if lf is None or (not user_id and not session_id): + yield + return + + try: + from langfuse import propagate_attributes + except ImportError: + logger.debug("langfuse: propagate_attributes not available — skipping context") + yield + return + + attrs: dict[str, str] = {} + if user_id: + attrs["user_id"] = hash_user_id(user_id) + if session_id: + attrs["session_id"] = session_id + + with propagate_attributes(**attrs): + yield