diff --git a/services/batch-agent/app/agent_runner.py b/services/batch-agent/app/agent_runner.py index c8c40fa..39980f5 100644 --- a/services/batch-agent/app/agent_runner.py +++ b/services/batch-agent/app/agent_runner.py @@ -193,9 +193,11 @@ async def _run_agent_with_tools( user_message: str, tools: list[Any], max_steps: int, + langfuse_handler: Any | None = None, ) -> str: """Run an LLM agent with tool-calling, returning the final text response.""" - llm = get_llm() + callbacks = [langfuse_handler] if langfuse_handler else None + llm = get_llm(callbacks=callbacks) llm_with_tools = llm.bind_tools(tools) messages: list[Any] = [ SystemMessage(content=system_prompt), @@ -396,6 +398,7 @@ async def _classify_file( file_content: str, projects: list[dict], config_data_types: list[str], + langfuse_handler: Any | None = None, ) -> tuple[str, list[str], str | None]: fallback: tuple[str, list[str], str | None] = ("new", list(config_data_types), None) @@ -422,7 +425,7 @@ async def _classify_file( projects_list=projects_list, ) - llm = get_llm() + llm = get_llm(callbacks=[langfuse_handler] if langfuse_handler else None) try: response = await llm.ainvoke([ SystemMessage(content=system), @@ -458,7 +461,7 @@ async def _classify_file( # ── Local agent runner (two-step per file) ──────────────────────────────── -async def run_local_agent(user_id: str, trigger_data: dict[str, Any]) -> None: +async def run_local_agent(user_id: str, trigger_data: dict[str, Any], *, langfuse_handler: Any | None = None) -> None: """Execute a local directory agent run. In the microservice world, trigger_data is a serialized dict from @@ -552,6 +555,7 @@ async def run_local_agent(user_id: str, trigger_data: dict[str, Any]) -> None: file_content=file_content, projects=projects, config_data_types=data_types, + langfuse_handler=langfuse_handler, ) # Step 2 — resolve project_id, fetch entities, process @@ -610,6 +614,7 @@ async def run_local_agent(user_id: str, trigger_data: dict[str, Any]) -> None: ), tools=processing_tools, max_steps=_MAX_PROCESSING_STEPS, + langfuse_handler=langfuse_handler, ) logger.info( "agent_runner: run=%s file=%r result=%s", @@ -660,7 +665,7 @@ async def run_local_agent(user_id: str, trigger_data: dict[str, Any]) -> None: _CLOUD_DEFAULT_LOOKBACK_DAYS: int = 7 -async def run_cloud_agent(user_id: str, config_id: str) -> None: +async def run_cloud_agent(user_id: str, config_id: str, *, langfuse_handler: Any | None = None) -> None: """Execute a cloud connector agent run. Loads the CloudAgentConfig from DB, decrypts OAuth tokens, fetches @@ -789,6 +794,7 @@ async def run_cloud_agent(user_id: str, config_id: str) -> None: user_message=f"Process this message content:\n\n{content_text[:8000]}", tools=processing_tools, max_steps=_MAX_PROCESSING_STEPS, + langfuse_handler=langfuse_handler, ) except Exception as exc: errors.append(f"LLM processing error for message {msg.id!r}: {exc}") diff --git a/services/batch-agent/app/journey.py b/services/batch-agent/app/journey.py index 5f18922..4f26b2e 100644 --- a/services/batch-agent/app/journey.py +++ b/services/batch-agent/app/journey.py @@ -190,6 +190,7 @@ async def _call_llm_with_tools( system_prompt: str, history: list[dict[str, Any]], tools: list[Any], + langfuse_handler: Any | None = None, ) -> str: """Build LangChain messages from history and invoke the LLM with tools. @@ -203,7 +204,8 @@ async def _call_llm_with_tools( else: messages.append(AIMessage(content=turn["content"])) - llm = get_llm(model=None, temperature=0.4) + callbacks = [langfuse_handler] if langfuse_handler else None + llm = get_llm(model=None, temperature=0.4, callbacks=callbacks) llm_with_tools = llm.bind_tools(tools) tool_map = {tool_def.name: tool_def for tool_def in tools} @@ -247,6 +249,8 @@ async def _call_llm_with_tools( async def handle_journey_start( user_id: str, frame: dict[str, Any], + *, + langfuse_handler: Any | None = None, ) -> dict[str, Any]: """Handle a ``journey_start`` request. @@ -277,6 +281,7 @@ async def handle_journey_start( system_prompt=system_prompt, history=seed_history, tools=list(FILESYSTEM_TOOLS), + langfuse_handler=langfuse_handler, ) session.history.extend(seed_history) @@ -313,6 +318,8 @@ async def handle_journey_start( async def handle_journey_message( user_id: str, frame: dict[str, Any], + *, + langfuse_handler: Any | None = None, ) -> dict[str, Any]: """Handle a ``journey_message`` request. @@ -338,6 +345,7 @@ async def handle_journey_message( system_prompt=session.system_prompt, history=session.history, tools=list(FILESYSTEM_TOOLS), + langfuse_handler=langfuse_handler, ) session.history.append({"role": "assistant", "content": ai_reply}) @@ -358,6 +366,7 @@ async def handle_journey_message( system_prompt=session.system_prompt, history=session.history, tools=list(FILESYSTEM_TOOLS), + langfuse_handler=langfuse_handler, ) session.history.append({"role": "assistant", "content": nudge_reply}) diff --git a/services/batch-agent/app/llm.py b/services/batch-agent/app/llm.py index 929b358..67f8b0b 100644 --- a/services/batch-agent/app/llm.py +++ b/services/batch-agent/app/llm.py @@ -41,6 +41,7 @@ def get_llm( *, model: str | None = None, temperature: float = 0, + callbacks: list | None = None, ) -> ChatOpenAI | ChatLiteLLM: model = model or settings.LLM_MODEL @@ -48,12 +49,13 @@ def get_llm( os.environ.setdefault("GITHUB_COPILOT_TOKEN_DIR", settings.GITHUB_COPILOT_TOKEN_DIR) if "/" in model: - return ChatLiteLLM(model=model, temperature=temperature) + return ChatLiteLLM(model=model, temperature=temperature, callbacks=callbacks) return ChatOpenAI( model=model, temperature=temperature, api_key=_api_key_for_model(model), + callbacks=callbacks, ) diff --git a/services/batch-agent/app/main.py b/services/batch-agent/app/main.py index 52f9a82..ea9105e 100644 --- a/services/batch-agent/app/main.py +++ b/services/batch-agent/app/main.py @@ -29,6 +29,10 @@ logger = logging.getLogger(__name__) @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: + # Initialise Langfuse tracing (no-op if keys are missing) + from app.tracing import init_langfuse + init_langfuse() + logger.info("batch-agent: starting Redis consumer") task = asyncio.create_task(start_consumer()) yield @@ -37,6 +41,16 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: await task except asyncio.CancelledError: pass + + from app.tracing import shutdown as shutdown_langfuse + shutdown_langfuse() + + from shared.db import engine + await engine.dispose() + + from shared.redis import redis_client + await redis_client.aclose() + logger.info("batch-agent: Redis consumer stopped") diff --git a/services/batch-agent/app/redis_consumer.py b/services/batch-agent/app/redis_consumer.py index d0947d9..e1967c8 100644 --- a/services/batch-agent/app/redis_consumer.py +++ b/services/batch-agent/app/redis_consumer.py @@ -17,6 +17,7 @@ from typing import Any from shared.redis import redis_client, batch_request_channel, ws_out_channel +import app.tracing as tracing from app.ws_context import set_current_user, clear_current_user logger = logging.getLogger(__name__) @@ -32,15 +33,27 @@ async def _handle_journey_start(user_id: str, data: dict[str, Any]) -> None: """Handle a journey_start request from WS Gateway.""" from app.journey import handle_journey_start + session_id = data.get("session_id", "") set_current_user(user_id) try: - reply = await handle_journey_start(user_id, data) - await _publish_to_user(user_id, reply) + with tracing.trace_span( + name="journey_start", + user_id=user_id, + session_id=session_id, + input=data.get("directory", ""), + metadata={"data_types": data.get("data_types", [])}, + tags=["journey"], + ) as span: + langfuse_handler = tracing.get_langfuse_callback() + reply = await handle_journey_start(user_id, data, langfuse_handler=langfuse_handler) + span.update(output=reply.get("message", "")[:500]) + await _publish_to_user(user_id, reply) + tracing.flush() except Exception as exc: logger.error("batch-agent: journey_start failed user=%s: %s", user_id, exc) await _publish_to_user(user_id, { "type": "journey_reply", - "session_id": data.get("session_id", ""), + "session_id": session_id, "message": f"Journey setup failed: {exc}", "done": True, "prompt_template": None, @@ -53,15 +66,26 @@ async def _handle_journey_message(user_id: str, data: dict[str, Any]) -> None: """Handle a journey_message from WS Gateway.""" from app.journey import handle_journey_message + session_id = data.get("session_id", "") set_current_user(user_id) try: - reply = await handle_journey_message(user_id, data) - await _publish_to_user(user_id, reply) + with tracing.trace_span( + name="journey_message", + user_id=user_id, + session_id=session_id, + input=data.get("message", "")[:200], + tags=["journey"], + ) as span: + langfuse_handler = tracing.get_langfuse_callback() + reply = await handle_journey_message(user_id, data, langfuse_handler=langfuse_handler) + span.update(output=reply.get("message", "")[:500]) + await _publish_to_user(user_id, reply) + tracing.flush() except Exception as exc: logger.error("batch-agent: journey_message failed user=%s: %s", user_id, exc) await _publish_to_user(user_id, { "type": "journey_reply", - "session_id": data.get("session_id", ""), + "session_id": session_id, "message": f"Journey processing failed: {exc}", "done": True, "prompt_template": None, @@ -74,15 +98,28 @@ async def _handle_agent_trigger(user_id: str, data: dict[str, Any]) -> None: """Handle an agent_trigger request from the REST route (forwarded via Redis).""" from app.agent_runner import run_local_agent + run_context = data.get("run_context", {}) + agent_id = run_context.get("agent_id", "") set_current_user(user_id) try: - await run_local_agent(user_id, data) + with tracing.trace_span( + name="agent_trigger", + user_id=user_id, + trace_id=run_context.get("run_id"), + input={"agent_id": agent_id, "directory": data.get("directory", "")}, + metadata={"data_types": data.get("data_types", [])}, + tags=["batch", "agent_run"], + ) as span: + langfuse_handler = tracing.get_langfuse_callback() + await run_local_agent(user_id, data, langfuse_handler=langfuse_handler) + span.update(output={"status": "completed"}) + tracing.flush() except Exception as exc: logger.error("batch-agent: agent_trigger failed user=%s: %s", user_id, exc) await _publish_to_user(user_id, { "type": "run_complete", "status": "error", - "run_context": data.get("run_context", {}), + "run_context": run_context, }) finally: clear_current_user() diff --git a/services/batch-agent/app/tracing.py b/services/batch-agent/app/tracing.py new file mode 100644 index 0000000..613210a --- /dev/null +++ b/services/batch-agent/app/tracing.py @@ -0,0 +1,264 @@ +"""Langfuse tracing & prompt management for the Batch Agent Service (v4 SDK). + +Provides: +- ``init_langfuse()`` — initialise the singleton client at startup +- ``trace_span()`` — context manager that creates a trace + span +- ``get_langfuse_callback()`` — LangChain callback handler (auto-inherits trace) +- ``get_prompt()`` — fetch a managed prompt from Langfuse by name +- ``flush()`` / ``shutdown()`` — lifecycle management + +All functions gracefully degrade to no-ops when Langfuse is not configured, +so the service works identically with or without observability keys. + +Requires ``langfuse >= 3.0.0`` (v4 / "Fast Preview" SDK). +""" + +from __future__ import annotations + +import logging +from contextlib import contextmanager +from typing import Any + +from shared.config import settings + +logger = logging.getLogger(__name__) + +# ── State ──────────────────────────────────────────────────────────────── + +_initialised: bool = False +_disabled: bool = False + + +def _is_configured() -> bool: + return bool(settings.LANGFUSE_SECRET_KEY and settings.LANGFUSE_PUBLIC_KEY) + + +def init_langfuse() -> None: + """Initialise the Langfuse singleton. Call once at startup.""" + global _initialised, _disabled + + if _initialised or _disabled: + return + + if not _is_configured(): + _disabled = True + logger.info("tracing: Langfuse keys not set — tracing disabled") + return + + try: + from langfuse import Langfuse + + Langfuse( + secret_key=settings.LANGFUSE_SECRET_KEY, + public_key=settings.LANGFUSE_PUBLIC_KEY, + host=settings.LANGFUSE_HOST, + ) + _initialised = True + logger.info("tracing: Langfuse client initialised (host=%s)", settings.LANGFUSE_HOST) + except Exception as exc: + _disabled = True + logger.warning("tracing: failed to initialise Langfuse: %s", exc) + + +def _get_client() -> Any | None: + """Return the singleton Langfuse client, or *None* if disabled.""" + if _disabled: + return None + if not _initialised: + init_langfuse() + if _disabled: + return None + try: + from langfuse import get_client + return get_client() + except Exception: + return None + + +# ── Null span (no-op when Langfuse is disabled) ───────────────────────── + + +class _NullSpan: + """Drop-in replacement when Langfuse is disabled.""" + + def update(self, **_: Any) -> None: ... + def set_trace_io(self, **_: Any) -> None: ... + def score_trace(self, **_: Any) -> None: ... + + +# ── Trace context manager ─────────────────────────────────────────────── + + +@contextmanager +def trace_span( + *, + name: str, + user_id: str, + session_id: str | None = None, + trace_id: str | None = None, + input: Any = None, + metadata: dict[str, Any] | None = None, + tags: list[str] | None = None, +): + """Context manager that creates a Langfuse trace/span. + + Yields the span object (or a ``_NullSpan`` if Langfuse is disabled). + A ``CallbackHandler`` created inside this block auto-inherits the trace + context, so there is no need to pass trace IDs manually. + """ + lf = _get_client() + if lf is None: + yield _NullSpan() + return + + try: + from langfuse import Langfuse, propagate_attributes + + trace_ctx: dict[str, str] = {} + if trace_id is not None: + trace_ctx["trace_id"] = Langfuse.create_trace_id(seed=trace_id) + + with lf.start_as_current_observation( + as_type="span", + name=name, + input=input, + metadata=metadata or {}, + **({"trace_context": trace_ctx} if trace_ctx else {}), + ) as span: + with propagate_attributes( + user_id=user_id, + session_id=session_id, + tags=tags or [], + ): + yield span + except Exception as exc: + logger.warning("tracing: trace_span(%s) failed: %s", name, exc) + yield _NullSpan() + + +# ── LangChain callback handler ────────────────────────────────────────── + + +def get_langfuse_callback() -> Any | None: + """Return a LangChain ``CallbackHandler`` that auto-inherits the current trace. + + Must be called inside a ``trace_span()`` block for proper linking. + Returns *None* when Langfuse is disabled. + """ + if _disabled and not _initialised: + return None + + try: + from langfuse.langchain import CallbackHandler + return CallbackHandler() + except Exception as exc: + logger.warning("tracing: get_langfuse_callback failed: %s", exc) + return None + + +# ── Prompt management ──────────────────────────────────────────────────── + + +def get_prompt( + name: str, + *, + version: int | None = None, + label: str | None = None, + fallback: str | None = None, + cache_ttl_seconds: int = 300, +) -> str | None: + """Fetch a managed prompt from Langfuse by name. + + Returns the compiled prompt string, or *fallback* if the prompt is not + found or Langfuse is disabled. + """ + lf = _get_client() + if lf is None: + return fallback + + try: + kwargs: dict[str, Any] = { + "name": name, + "cache_ttl_seconds": cache_ttl_seconds, + } + if version is not None: + kwargs["version"] = version + if label is not None: + kwargs["label"] = label + prompt = lf.get_prompt(**kwargs) + return prompt.prompt + except Exception as exc: + logger.warning("tracing: get_prompt(%s) failed: %s", name, exc) + return fallback + + +def link_prompt_to_trace( + span: Any, + prompt_name: str, + *, + version: int | None = None, + label: str | None = None, +) -> None: + """Attach prompt metadata to a span/trace.""" + lf = _get_client() + if lf is None or isinstance(span, _NullSpan): + return + + try: + kwargs: dict[str, Any] = {"name": prompt_name} + if version is not None: + kwargs["version"] = version + if label is not None: + kwargs["label"] = label + prompt = lf.get_prompt(**kwargs) + span.update(metadata={"prompt": {"name": prompt_name, "version": prompt.version}}) + except Exception as exc: + logger.warning("tracing: link_prompt_to_trace(%s) failed: %s", prompt_name, exc) + + +# ── Scoring helper ─────────────────────────────────────────────────────── + + +def score_trace( + trace_id: str, + name: str, + value: float, + *, + comment: str | None = None, +) -> None: + """Post a score to a trace (e.g. user feedback, latency, quality).""" + lf = _get_client() + if lf is None: + return + + try: + lf.create_score(trace_id=trace_id, name=name, value=value, comment=comment) + except Exception as exc: + logger.warning("tracing: score_trace failed: %s", exc) + + +# ── Shutdown ───────────────────────────────────────────────────────────── + + +def flush() -> None: + """Flush pending Langfuse events.""" + lf = _get_client() + if lf is not None: + try: + lf.flush() + except Exception as exc: + logger.warning("tracing: flush failed: %s", exc) + + +def shutdown() -> None: + """Flush and close the Langfuse client.""" + global _initialised, _disabled + lf = _get_client() + if lf is not None: + try: + lf.flush() + lf.shutdown() + except Exception as exc: + logger.warning("tracing: shutdown failed: %s", exc) + _initialised = False + _disabled = False diff --git a/services/batch-agent/requirements.txt b/services/batch-agent/requirements.txt index 42e9b67..1c61ea5 100644 --- a/services/batch-agent/requirements.txt +++ b/services/batch-agent/requirements.txt @@ -14,6 +14,7 @@ langchain-litellm>=0.3.0 litellm>=1.50.0 openai>=1.50.0 httpx>=0.27.0 +langfuse>=3.0.0 croniter>=2.0.0 google-api-python-client>=2.130.0 google-auth>=2.30.0