- Replaced direct instantiation of ChatOpenAI with a centralized get_llm function in CheckpointAgent, NoteAgent, ProjectAgent, and TaskAgent.
- Introduced a new llm.py module to handle LLM model instantiation and API key management.
- Updated settings.py to include LLM_MODEL and LLM_ROUTER_MODEL configurations.
- Modified orchestrator.py to use get_router_llm for intent classification.
- Updated requirements.txt to include litellm for LLM management.
- Adjusted tests to mock get_llm instead of ChatOpenAI directly.
169 lines
5.5 KiB
Python
169 lines
5.5 KiB
Python
"""Orchestrator — LLM-based intent router and agent pipeline."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import Any, AsyncGenerator
|
|
|
|
from langchain_core.messages import HumanMessage, SystemMessage
|
|
|
|
from app.core.agent_registry import AgentRegistry
|
|
from app.core.llm import get_router_llm
|
|
from app.core.agent_registry import registry as _default_registry
|
|
from app.schemas import ChatRequest, ChatResponse, ExecutionPlan
|
|
|
|
# Agent name returned by ``classify_intent`` when the registry is empty or the
# model's answer is not a registered agent name.
_FALLBACK_AGENT = "task_agent"

# System prompt for intent classification; ``{agents}`` is filled in with
# ``str.format`` before the prompt is sent.
_CLASSIFY_SYSTEM = (
    "You are an intent classifier. "
    "Given the user message and context, decide which agent to route to.\n"
    "Available agents: {agents}\n"
    "Respond with just the agent name, nothing else."
)

# Human prompt for the pipeline synthesis call; ``{results}`` and ``{message}``
# are filled in with ``str.format``.
_SYNTHESIZE_HUMAN = (
    "Combine the following agent results into one coherent response.\n"
    "\n"
    "Agent results:\n"
    "{results}\n"
    "\n"
    "Original message: {message}"
)
|
|
|
|
|
|
def _make_llm():
    """Return the LLM used by this module.

    Delegates to ``get_router_llm()``; both intent classification and
    pipeline synthesis obtain their model through this single function.
    """
    router_llm = get_router_llm()
    return router_llm
|
|
|
|
|
|
async def classify_intent(
    message: str,
    context: dict[str, Any],
    reg: AgentRegistry,
) -> str:
    """Ask the router LLM which registered agent should handle *message*.

    The model is whatever ``_make_llm()`` returns; it is prompted with the
    JSON listing of registered agents and asked to answer with a bare agent
    name.

    Args:
        message: Raw user message to classify.
        context: Request context. Only the first 500 characters of its JSON
            dump are sent to the model.
        reg: Registry whose ``list_agents()`` entries expose a ``"name"`` key.

    Returns:
        The registered agent name selected by the model. Falls back to
        ``task_agent`` when the registry is empty or the model's reply does
        not correspond to a registered name.
    """
    agents = reg.list_agents()
    if not agents:
        return _FALLBACK_AGENT

    system = _CLASSIFY_SYSTEM.format(agents=json.dumps(agents))
    # Truncate context to keep the classification prompt short.
    # ``default=str`` stops a non-JSON-native value from aborting routing; the
    # summary is only prompt text, so a lossy rendering is acceptable.
    # NOTE(review): callers build ``context`` with ``model_dump()``, which may
    # carry such values (e.g. datetimes) - confirm against the schema.
    summary = json.dumps(context, default=str)[:500]
    human = f"Message: {message}\nContext summary: {summary}"

    llm = _make_llm()
    response = await llm.ainvoke(
        [SystemMessage(content=system), HumanMessage(content=human)]
    )

    raw = str(response.content).strip().lower()
    # Models frequently wrap the name in quotes/backticks or append a full
    # stop; try the reply as-is first, then with that decoration removed.
    candidates = (raw, raw.strip("\"'`. \t\r\n"))
    names = [a["name"] for a in agents]

    for candidate in candidates:
        if candidate in names:
            return candidate
        # The reply was lower-cased, so compare case-insensitively and hand
        # back the spelling the registry actually knows.
        for name in names:
            if str(name).lower() == candidate:
                return name
    return _FALLBACK_AGENT
|
|
|
|
|
|
async def route_single(
    agent_name: str,
    message: str,
    context: dict[str, Any],
    reg: AgentRegistry,
) -> ChatResponse:
    """Invoke one agent through the registry and return its reply.

    The value returned by ``reg.call_agent`` becomes the ``response`` field
    of a new ``ChatResponse``.
    """
    return ChatResponse(
        response=await reg.call_agent(agent_name, message, context)
    )
|
|
|
|
|
|
async def route_pipeline(
    agent_names: list[str],
    message: str,
    context: dict[str, Any],
    reg: AgentRegistry,
) -> ChatResponse:
    """Run the named agents in order, then merge their outputs with the LLM.

    Every agent is called with the original *message* and a copy of
    *context* extended by ``previous_results`` - the outputs of the agents
    that ran before it. After the last agent, one LLM call combines all
    outputs into the text of the returned ``ChatResponse``.
    """
    outputs: list[str] = []
    for current in agent_names:
        # Pass a snapshot so each agent gets its own list of earlier results.
        step_context = dict(context, previous_results=outputs.copy())
        outputs.append(await reg.call_agent(current, message, step_context))

    labelled = [f"[{name}]: {res}" for name, res in zip(agent_names, outputs)]
    prompt = _SYNTHESIZE_HUMAN.format(
        results="\n\n".join(labelled),
        message=message,
    )
    merged = await _make_llm().ainvoke([HumanMessage(content=prompt)])
    return ChatResponse(response=str(merged.content))
|
|
|
|
|
|
def _build_plan(agent_name: str, message: str) -> ExecutionPlan:
    """Create the ``ExecutionPlan`` for an already-resolved agent.

    The plan holds a single step carrying ``{"message": message}``:

    * an LLM step referencing ``tpl_<agent_name>_default`` when the
      server-side template registry has that template;
    * otherwise a plain ``handle`` action step.
    """
    # Imported at call time rather than at module import.
    # NOTE(review): presumably to avoid a circular import - confirm.
    from app.core.execution_plan import ExecutionPlanBuilder, template_registry

    default_template = f"tpl_{agent_name}_default"
    plan = ExecutionPlanBuilder(agent_name)
    payload = {"message": message}

    if not template_registry.has(default_template):
        plan.add_step("handle", payload)
    else:
        plan.add_llm_step(default_template, payload)
    return plan.build()
|
|
|
|
|
|
async def orchestrate(
    request: ChatRequest,
    reg: AgentRegistry | None = None,
) -> ChatResponse | ExecutionPlan:
    """Classify the request, then either execute it or describe it.

    The agent is chosen by ``classify_intent``. What happens next depends on
    ``request.execution_mode``:

    * ``'direct'`` - the agent is called and its ``ChatResponse`` returned.
    * anything else (plan mode) - an ``ExecutionPlan`` for the resolved
      agent is returned and nothing is executed.

    When *reg* is ``None`` the module's default registry is used.
    """
    registry = _default_registry if reg is None else reg

    ctx = request.context.model_dump()
    chosen = await classify_intent(request.message, ctx, registry)

    if request.execution_mode != "direct":
        # Plan mode: hand back the plan only, never run the agent here.
        return _build_plan(chosen, request.message)

    return await route_single(chosen, request.message, ctx, registry)
|
|
|
|
|
|
async def orchestrate_stream(
    request: ChatRequest,
    reg: AgentRegistry | None = None,
) -> AsyncGenerator[str, None]:
    """Stream an agent reply as text chunks followed by one JSON frame.

    The agent is selected with ``classify_intent`` and called once; its
    complete reply is then yielded in slices of at most 50 characters.
    The last item yielded is a JSON object:
    ``{"done": true, "response": "...", "actions": []}``.

    This is not token-level streaming: the whole reply is fetched before the
    first chunk is emitted. Token-level streaming will be wired in Step 6
    when agents expose ``astream()``.

    When *reg* is ``None`` the module's default registry is used.
    """
    active = _default_registry if reg is None else reg

    ctx = request.context.model_dump()
    chosen = await classify_intent(request.message, ctx, active)
    full_text = await active.call_agent(chosen, request.message, ctx)

    step = 50
    total = len(full_text)
    offset = 0
    while offset < total:
        yield full_text[offset : offset + step]
        offset += step

    # Final frame: the "done" marker first, then the ChatResponse fields.
    closing = {"done": True}
    closing.update(ChatResponse(response=full_text).model_dump())
    yield json.dumps(closing)
|