"""Orchestrator — LLM-based intent router and agent pipeline.""" from __future__ import annotations import json from typing import Any, AsyncGenerator from langchain_core.messages import HumanMessage, SystemMessage from langchain_openai import ChatOpenAI from app.config.settings import settings from app.core.agent_registry import AgentRegistry from app.core.agent_registry import registry as _default_registry from app.schemas import ChatRequest, ChatResponse, ExecutionPlan, PlanStep _FALLBACK_AGENT = "task_agent" _CLASSIFY_SYSTEM = ( "You are an intent classifier. Given the user message and context, decide " "which agent to route to.\n" "Available agents: {agents}\n" "Respond with just the agent name, nothing else." ) _SYNTHESIZE_HUMAN = ( "Combine the following agent results into one coherent response.\n\n" "Agent results:\n{results}\n\n" "Original message: {message}" ) def _make_llm(model: str = "gpt-4o-mini") -> ChatOpenAI: return ChatOpenAI(model=model, temperature=0, api_key=settings.OPENAI_API_KEY) async def classify_intent( message: str, context: dict[str, Any], reg: AgentRegistry, ) -> str: """Use gpt-4o-mini to classify intent and return the matching agent name. Falls back to ``task_agent`` when the registry is empty or the model returns a name that is not registered. """ agents = reg.list_agents() if not agents: return _FALLBACK_AGENT system = _CLASSIFY_SYSTEM.format(agents=json.dumps(agents)) # Truncate context to keep the classification prompt short human = f"Message: {message}\nContext summary: {json.dumps(context)[:500]}" llm = _make_llm() response = await llm.ainvoke( [SystemMessage(content=system), HumanMessage(content=human)] ) agent_name = str(response.content).strip().lower() known = {a["name"] for a in agents} return agent_name if agent_name in known else _FALLBACK_AGENT async def route_single( agent_name: str, message: str, context: dict[str, Any], reg: AgentRegistry, ) -> ChatResponse: """Route to a single agent and wrap the result in a ``ChatResponse``.""" response_text = await reg.call_agent(agent_name, message, context) return ChatResponse(response=response_text) async def route_pipeline( agent_names: list[str], message: str, context: dict[str, Any], reg: AgentRegistry, ) -> ChatResponse: """Execute agents sequentially; each agent receives previous results in context. A final LLM synthesis call merges all results into one coherent response. """ previous_results: list[str] = [] for agent_name in agent_names: ctx = {**context, "previous_results": list(previous_results)} result = await reg.call_agent(agent_name, message, ctx) previous_results.append(result) results_str = "\n\n".join( f"[{name}]: {res}" for name, res in zip(agent_names, previous_results) ) human = _SYNTHESIZE_HUMAN.format(results=results_str, message=message) llm = _make_llm() synthesis = await llm.ainvoke([HumanMessage(content=human)]) return ChatResponse(response=str(synthesis.content)) def _build_plan(agent_name: str, message: str) -> ExecutionPlan: """Build a minimal ``ExecutionPlan`` for the resolved agent. The full ``ExecutionPlanBuilder`` (with template registry and caching) is implemented in Step 5. This function produces the single-step baseline plan that the orchestrator returns in ``'plan'`` mode. """ return ExecutionPlan( agent=agent_name, steps=[ PlanStep( action="handle", prompt_template=f"tpl_{agent_name}_default", variables={"message": message}, ) ], ) async def orchestrate( request: ChatRequest, reg: AgentRegistry | None = None, ) -> ChatResponse | ExecutionPlan: """Main orchestration entry point. * Classifies the user's intent to select an agent. * ``execution_mode == 'direct'``: routes to the agent and returns a ``ChatResponse``. * ``execution_mode == 'plan'``: returns an ``ExecutionPlan`` with the resolved agent and a template-ID-only step (prompt IP stays server-side). """ if reg is None: reg = _default_registry context = request.context.model_dump() agent_name = await classify_intent(request.message, context, reg) if request.execution_mode == "direct": return await route_single(agent_name, request.message, context, reg) # plan mode — return plan, do not execute return _build_plan(agent_name, request.message) async def orchestrate_stream( request: ChatRequest, reg: AgentRegistry | None = None, ) -> AsyncGenerator[str, None]: """Streaming orchestration — yields text chunks then a final JSON frame. The final frame is a JSON object: ``{"done": true, "response": "...", "actions": []}``. Agents do not yet support token-level streaming; the full response is fetched first, then emitted in fixed-size chunks. Token-level streaming will be wired in Step 6 when agents expose ``astream()``. """ if reg is None: reg = _default_registry context = request.context.model_dump() agent_name = await classify_intent(request.message, context, reg) response_text = await reg.call_agent(agent_name, request.message, context) chunk_size = 50 for i in range(0, len(response_text), chunk_size): yield response_text[i : i + chunk_size] final = ChatResponse(response=response_text) yield json.dumps({"done": True, **final.model_dump()})