diff --git a/services/batch-agent/app/agent_runner.py b/services/batch-agent/app/agent_runner.py index f9fe00c..d692cde 100644 --- a/services/batch-agent/app/agent_runner.py +++ b/services/batch-agent/app/agent_runner.py @@ -167,12 +167,6 @@ and what you created. """ -def _get_system_prompt(langfuse_name: str, fallback: str) -> str: - """Fetch a managed prompt from Langfuse, falling back to the hardcoded string.""" - managed = tracing.get_prompt(langfuse_name, fallback=None) - return managed if managed is not None else fallback - - # ── LLM tool-calling loop ───────────────────────────────────────────────── @@ -427,9 +421,13 @@ async def _classify_file( if d in _DOMAIN_DESCRIPTIONS ) - system = _get_system_prompt("batch_file_classifier", _STEP1_SYSTEM_PROMPT).format( - domain_definitions=domain_definitions, - projects_list=projects_list, + system = tracing.compile_prompt( + "batch_file_classifier", + fallback=_STEP1_SYSTEM_PROMPT, + variables={ + "domain_definitions": domain_definitions, + "projects_list": projects_list, + }, ) llm = get_llm(callbacks=[langfuse_handler] if langfuse_handler else None) @@ -604,13 +602,15 @@ async def run_local_agent(user_id: str, trigger_data: dict[str, Any], *, langfus existing_context = "\n\n".join(existing_blocks) - system_prompt = _get_system_prompt( - "batch_processing", _PROCESSING_SYSTEM_PROMPT - ).format( - existing_context=existing_context, - project_context=project_context, - data_types=", ".join(domains), - custom_prompt_section=custom_section, + system_prompt = tracing.compile_prompt( + "batch_processing", + fallback=_PROCESSING_SYSTEM_PROMPT, + variables={ + "existing_context": existing_context, + "project_context": project_context, + "data_types": ", ".join(domains), + "custom_prompt_section": custom_section, + }, ) processing_tools = _build_processing_tools(domains) @@ -790,13 +790,15 @@ async def run_cloud_agent(user_id: str, config_id: str, *, langfuse_handler: Any continue items_processed += 1 - processing_prompt = _get_system_prompt( - "batch_cloud_processing", _CLOUD_PROCESSING_PROMPT - ).format( - data_types=", ".join(config.data_types), - project_context="Determine the appropriate project from the message context.", - file_list=f"Message from {config.provider} (id: {msg.id})", - custom_prompt_section=custom_section, + processing_prompt = tracing.compile_prompt( + "batch_cloud_processing", + fallback=_CLOUD_PROCESSING_PROMPT, + variables={ + "data_types": ", ".join(config.data_types), + "project_context": "Determine the appropriate project from the message context.", + "file_list": f"Message from {config.provider} (id: {msg.id})", + "custom_prompt_section": custom_section, + }, ) try: diff --git a/services/batch-agent/app/journey.py b/services/batch-agent/app/journey.py index a0f3655..26151af 100644 --- a/services/batch-agent/app/journey.py +++ b/services/batch-agent/app/journey.py @@ -145,15 +145,17 @@ def _build_system_prompt( if existing_template else "" ) - # Try Langfuse-managed prompt first, fall back to hardcoded template - managed = tracing.get_prompt("journey_system", fallback=None) - template = managed if managed is not None else _SYSTEM_PROMPT_TEMPLATE - return template.format( - directory=directory, - data_types=", ".join(data_types), - template_start=_TEMPLATE_START, - template_end=_TEMPLATE_END, - existing_section=existing_section, + # Use Langfuse compile_prompt ({{variable}} syntax) with Python .format() fallback + return tracing.compile_prompt( + "journey_system", + fallback=_SYSTEM_PROMPT_TEMPLATE, + variables={ + "directory": directory, + "data_types": ", ".join(data_types), + "template_start": _TEMPLATE_START, + "template_end": _TEMPLATE_END, + "existing_section": existing_section, + }, ) diff --git a/services/batch-agent/app/tracing.py b/services/batch-agent/app/tracing.py index 613210a..430edf1 100644 --- a/services/batch-agent/app/tracing.py +++ b/services/batch-agent/app/tracing.py @@ -167,9 +167,9 @@ def get_prompt( fallback: str | None = None, cache_ttl_seconds: int = 300, ) -> str | None: - """Fetch a managed prompt from Langfuse by name. + """Fetch a managed prompt from Langfuse by name (without variable compilation). - Returns the compiled prompt string, or *fallback* if the prompt is not + Returns the raw prompt string, or *fallback* if the prompt is not found or Langfuse is disabled. """ lf = _get_client() @@ -192,6 +192,46 @@ def get_prompt( return fallback +def compile_prompt( + name: str, + *, + fallback: str, + variables: dict[str, str], + version: int | None = None, + label: str | None = None, + cache_ttl_seconds: int = 300, +) -> str: + """Fetch a managed prompt from Langfuse and compile it with ``{{variables}}``. + + If the prompt exists in Langfuse, uses the SDK's ``.compile(**variables)`` + which replaces ``{{key}}`` placeholders. If Langfuse is disabled or the + prompt is not found, falls back to ``fallback.format(**variables)`` (Python + ``{key}`` placeholders). + + This means: + - Langfuse prompts use ``{{variable}}`` syntax. + - Hardcoded fallback strings use Python ``{variable}`` syntax. + """ + lf = _get_client() + if lf is None: + return fallback.format(**variables) + + try: + kwargs: dict[str, Any] = { + "name": name, + "cache_ttl_seconds": cache_ttl_seconds, + } + if version is not None: + kwargs["version"] = version + if label is not None: + kwargs["label"] = label + prompt = lf.get_prompt(**kwargs) + return prompt.compile(**variables) + except Exception as exc: + logger.warning("tracing: compile_prompt(%s) failed, using fallback: %s", name, exc) + return fallback.format(**variables) + + def link_prompt_to_trace( span: Any, prompt_name: str, diff --git a/services/chat/app/tracing.py b/services/chat/app/tracing.py index d115d5b..23a57fd 100644 --- a/services/chat/app/tracing.py +++ b/services/chat/app/tracing.py @@ -167,9 +167,9 @@ def get_prompt( fallback: str | None = None, cache_ttl_seconds: int = 300, ) -> str | None: - """Fetch a managed prompt from Langfuse by name. + """Fetch a managed prompt from Langfuse by name (without variable compilation). - Returns the compiled prompt string, or *fallback* if the prompt is not + Returns the raw prompt string, or *fallback* if the prompt is not found or Langfuse is disabled. """ lf = _get_client() @@ -192,6 +192,46 @@ def get_prompt( return fallback +def compile_prompt( + name: str, + *, + fallback: str, + variables: dict[str, str], + version: int | None = None, + label: str | None = None, + cache_ttl_seconds: int = 300, +) -> str: + """Fetch a managed prompt from Langfuse and compile it with ``{{variables}}``. + + If the prompt exists in Langfuse, uses the SDK's ``.compile(**variables)`` + which replaces ``{{key}}`` placeholders. If Langfuse is disabled or the + prompt is not found, falls back to ``fallback.format(**variables)`` (Python + ``{key}`` placeholders). + + This means: + - Langfuse prompts use ``{{variable}}`` syntax. + - Hardcoded fallback strings use Python ``{variable}`` syntax. + """ + lf = _get_client() + if lf is None: + return fallback.format(**variables) + + try: + kwargs: dict[str, Any] = { + "name": name, + "cache_ttl_seconds": cache_ttl_seconds, + } + if version is not None: + kwargs["version"] = version + if label is not None: + kwargs["label"] = label + prompt = lf.get_prompt(**kwargs) + return prompt.compile(**variables) + except Exception as exc: + logger.warning("tracing: compile_prompt(%s) failed, using fallback: %s", name, exc) + return fallback.format(**variables) + + def link_prompt_to_trace( span: Any, prompt_name: str,