From 55500cc818cf5830dcc38f280783fd51be601ed9 Mon Sep 17 00:00:00 2001 From: Roberto Musso Date: Mon, 23 Mar 2026 22:30:36 +0100 Subject: [PATCH] feat(batch-agent): add Langfuse prompt management - _get_system_prompt helper: fetches managed prompts from Langfuse with hardcoded fallback (same pattern as chat service) - journey.py: journey_system prompt manageable via Langfuse - agent_runner.py: batch_file_classifier, batch_processing, batch_cloud_processing prompts all manageable via Langfuse - redis_consumer.py: link_prompt_to_trace for all three handlers --- services/batch-agent/app/agent_runner.py | 17 ++++++++++++++--- services/batch-agent/app/journey.py | 6 +++++- services/batch-agent/app/redis_consumer.py | 3 +++ 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/services/batch-agent/app/agent_runner.py b/services/batch-agent/app/agent_runner.py index 39980f5..f9fe00c 100644 --- a/services/batch-agent/app/agent_runner.py +++ b/services/batch-agent/app/agent_runner.py @@ -28,6 +28,7 @@ from app.agents.task_agent import TASK_TOOLS from app.agents.timeline_agent import TIMELINE_TOOLS from app.llm import get_llm from app.ws_context import execute_on_client, set_current_user, clear_current_user +import app.tracing as tracing from shared.db import async_session from shared.models import AgentRunLog, CloudAgentConfig, LocalAgentConfig from shared.redis import redis_client, ws_out_channel @@ -166,6 +167,12 @@ and what you created. """ +def _get_system_prompt(langfuse_name: str, fallback: str) -> str: + """Fetch a managed prompt from Langfuse, falling back to the hardcoded string.""" + managed = tracing.get_prompt(langfuse_name, fallback=None) + return managed if managed is not None else fallback + + # ── LLM tool-calling loop ───────────────────────────────────────────────── @@ -420,7 +427,7 @@ async def _classify_file( if d in _DOMAIN_DESCRIPTIONS ) - system = _STEP1_SYSTEM_PROMPT.format( + system = _get_system_prompt("batch_file_classifier", _STEP1_SYSTEM_PROMPT).format( domain_definitions=domain_definitions, projects_list=projects_list, ) @@ -597,7 +604,9 @@ async def run_local_agent(user_id: str, trigger_data: dict[str, Any], *, langfus existing_context = "\n\n".join(existing_blocks) - system_prompt = _PROCESSING_SYSTEM_PROMPT.format( + system_prompt = _get_system_prompt( + "batch_processing", _PROCESSING_SYSTEM_PROMPT + ).format( existing_context=existing_context, project_context=project_context, data_types=", ".join(domains), @@ -781,7 +790,9 @@ async def run_cloud_agent(user_id: str, config_id: str, *, langfuse_handler: Any continue items_processed += 1 - processing_prompt = _CLOUD_PROCESSING_PROMPT.format( + processing_prompt = _get_system_prompt( + "batch_cloud_processing", _CLOUD_PROCESSING_PROMPT + ).format( data_types=", ".join(config.data_types), project_context="Determine the appropriate project from the message context.", file_list=f"Message from {config.provider} (id: {msg.id})", diff --git a/services/batch-agent/app/journey.py b/services/batch-agent/app/journey.py index 4f26b2e..a0f3655 100644 --- a/services/batch-agent/app/journey.py +++ b/services/batch-agent/app/journey.py @@ -26,6 +26,7 @@ from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, Tool from app.agents.filesystem_agent import FILESYSTEM_TOOLS from app.llm import get_llm +import app.tracing as tracing logger = logging.getLogger(__name__) @@ -144,7 +145,10 @@ def _build_system_prompt( if existing_template else "" ) - return _SYSTEM_PROMPT_TEMPLATE.format( + # Try Langfuse-managed prompt first, fall back to hardcoded template + managed = tracing.get_prompt("journey_system", fallback=None) + template = managed if managed is not None else _SYSTEM_PROMPT_TEMPLATE + return template.format( directory=directory, data_types=", ".join(data_types), template_start=_TEMPLATE_START, diff --git a/services/batch-agent/app/redis_consumer.py b/services/batch-agent/app/redis_consumer.py index e1967c8..12b1c5a 100644 --- a/services/batch-agent/app/redis_consumer.py +++ b/services/batch-agent/app/redis_consumer.py @@ -46,6 +46,7 @@ async def _handle_journey_start(user_id: str, data: dict[str, Any]) -> None: ) as span: langfuse_handler = tracing.get_langfuse_callback() reply = await handle_journey_start(user_id, data, langfuse_handler=langfuse_handler) + tracing.link_prompt_to_trace(span, "journey_system") span.update(output=reply.get("message", "")[:500]) await _publish_to_user(user_id, reply) tracing.flush() @@ -78,6 +79,7 @@ async def _handle_journey_message(user_id: str, data: dict[str, Any]) -> None: ) as span: langfuse_handler = tracing.get_langfuse_callback() reply = await handle_journey_message(user_id, data, langfuse_handler=langfuse_handler) + tracing.link_prompt_to_trace(span, "journey_system") span.update(output=reply.get("message", "")[:500]) await _publish_to_user(user_id, reply) tracing.flush() @@ -112,6 +114,7 @@ async def _handle_agent_trigger(user_id: str, data: dict[str, Any]) -> None: ) as span: langfuse_handler = tracing.get_langfuse_callback() await run_local_agent(user_id, data, langfuse_handler=langfuse_handler) + tracing.link_prompt_to_trace(span, "batch_processing") span.update(output={"status": "completed"}) tracing.flush() except Exception as exc: