refactor: replace orchestrator with LangGraph deep-agent supervisors
- Add app/core/deep_agent.py with Home and Floating supervisor graphs using LangGraph create_react_agent (hierarchical pattern) - Strip ChatAgent classes from all 4 agent files, keep @tool functions - Rewrite output_formatter.py for event-based (token/tool_end/mutations) stream - Update device_ws.py to use run_home_stream/run_floating_stream - Rewrite chat.py REST route to use run_home - Add update_core_memory tool to both supervisors - Add langgraph>=0.3.0 to requirements.txt - Remove orchestrator.py, execution_plan.py, agent_registry.py, plans.py - Remove PlanAction, PlanStep, ExecutionPlan, execution_mode from schemas - Update all affected tests to match new API - Remove 6 deprecated test files for deleted modules - Clean up stale docstrings referencing removed orchestrator
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
"""Import all agent modules to trigger @registry.register decorators."""
|
||||
"""Agent tool modules — imported by deep_agent.py to build sub-agent graphs."""
|
||||
|
||||
from app.agents import timeline_agent, note_agent, project_agent, task_agent
|
||||
|
||||
|
||||
@@ -1,31 +1,14 @@
|
||||
"""Note agent — Markdown note management (list, get, create, update, delete)."""
|
||||
"""Note agent — tool definitions for Markdown note CRUD."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Any
|
||||
|
||||
from langchain_core.messages import HumanMessage, SystemMessage
|
||||
from langchain_core.tools import tool
|
||||
|
||||
from app.core.agent_registry import ChatAgent, registry
|
||||
from app.core.llm import embed, get_llm
|
||||
from app.core.llm import embed
|
||||
from app.core.ws_context import execute_on_client
|
||||
|
||||
_SYSTEM_PROMPT = (
|
||||
"You are a note-taking assistant. You help users create, retrieve, update,\n"
|
||||
"and delete Markdown notes in their workspace.\n\n"
|
||||
"Rules:\n"
|
||||
" - content is always Markdown; preserve formatting when updating\n"
|
||||
" - project_id is optional; link a note to a project when mentioned\n"
|
||||
" - When updating, call get_note first if you need to read existing content\n"
|
||||
" before appending or replacing sections\n"
|
||||
" - list_notes without project_id returns all notes; scope with project_id\n"
|
||||
" when the user is working within a specific project\n"
|
||||
" - Do not fabricate note content — reflect what the user provides or what\n"
|
||||
" is already in the note (retrieved via get_note)."
|
||||
)
|
||||
|
||||
|
||||
@tool
|
||||
async def list_notes(project_id: str = "") -> str:
|
||||
@@ -122,23 +105,4 @@ async def delete_note(note_id: str) -> str:
|
||||
return f"Note {note_id} deleted."
|
||||
|
||||
|
||||
@registry.register
|
||||
class NoteAgent(ChatAgent):
|
||||
def get_name(self) -> str:
|
||||
return "note_agent"
|
||||
|
||||
def get_description(self) -> str:
|
||||
return "Manages notes: list, get, create, update, delete"
|
||||
|
||||
def get_tools(self) -> list[Any]:
|
||||
return [list_notes, get_note, create_note, update_note, delete_note]
|
||||
|
||||
async def handle(self, query: str, context: dict[str, Any]) -> str:
|
||||
llm = get_llm()
|
||||
messages = [
|
||||
SystemMessage(content=_SYSTEM_PROMPT),
|
||||
HumanMessage(
|
||||
content=f"User query: {query}\nContext: {json.dumps(context)[:1000]}"
|
||||
),
|
||||
]
|
||||
return await self._tool_loop(llm, messages, self.get_tools())
|
||||
|
||||
@@ -1,33 +1,13 @@
|
||||
"""Project agent — full lifecycle management (list, get, create, update, archive, delete)."""
|
||||
"""Project agent — tool definitions for project lifecycle CRUD."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Any
|
||||
|
||||
from langchain_core.messages import HumanMessage, SystemMessage
|
||||
from langchain_core.tools import tool
|
||||
|
||||
from app.core.agent_registry import ChatAgent, registry
|
||||
from app.core.llm import get_llm
|
||||
from app.core.ws_context import execute_on_client
|
||||
|
||||
_SYSTEM_PROMPT = (
|
||||
"You are a project management assistant. You help users create, find,\n"
|
||||
"update, and archive projects in their workspace.\n\n"
|
||||
"Rules:\n"
|
||||
" - status must be one of: active, archived\n"
|
||||
" - client_id is optional; link to a client only when explicitly mentioned\n"
|
||||
" - ai_summary is populated only when the user asks for a project summary;\n"
|
||||
" derive it from context data — do not fabricate content\n"
|
||||
" - Use list_projects for scoped queries; list_all_projects only when the\n"
|
||||
" user wants a complete cross-client view including archived projects\n"
|
||||
" - get_project requires a project UUID; resolve the ID first by calling\n"
|
||||
" list_projects if you only have a project name\n"
|
||||
" - Prefer archiving (update_project status=archived) over deletion;\n"
|
||||
" only call delete_project when the user explicitly confirms deletion."
|
||||
)
|
||||
|
||||
|
||||
@tool
|
||||
async def list_projects(
|
||||
@@ -137,30 +117,4 @@ async def delete_project(project_id: str) -> str:
|
||||
return f"Project {project_id} permanently deleted."
|
||||
|
||||
|
||||
@registry.register
|
||||
class ProjectAgent(ChatAgent):
|
||||
def get_name(self) -> str:
|
||||
return "project_agent"
|
||||
|
||||
def get_description(self) -> str:
|
||||
return "Manages projects: list, get, create, update, archive, delete"
|
||||
|
||||
def get_tools(self) -> list[Any]:
|
||||
return [
|
||||
list_projects,
|
||||
list_all_projects,
|
||||
get_project,
|
||||
create_project,
|
||||
update_project,
|
||||
delete_project,
|
||||
]
|
||||
|
||||
async def handle(self, query: str, context: dict[str, Any]) -> str:
|
||||
llm = get_llm()
|
||||
messages = [
|
||||
SystemMessage(content=_SYSTEM_PROMPT),
|
||||
HumanMessage(
|
||||
content=f"User query: {query}\nContext: {json.dumps(context)[:1000]}"
|
||||
),
|
||||
]
|
||||
return await self._tool_loop(llm, messages, self.get_tools())
|
||||
|
||||
@@ -1,35 +1,14 @@
|
||||
"""Task agent — full CRUD for tasks and task comments."""
|
||||
"""Task agent — tool definitions for task and task comment CRUD."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
from langchain_core.messages import HumanMessage, SystemMessage
|
||||
from langchain_core.tools import tool
|
||||
|
||||
from app.core.agent_registry import ChatAgent, registry
|
||||
from app.core.llm import get_llm
|
||||
from app.core.ws_context import execute_on_client
|
||||
|
||||
_SYSTEM_PROMPT = (
|
||||
"You are a task management assistant for a project workspace.\n"
|
||||
"You create, update, list, and track tasks and their comments.\n\n"
|
||||
"Rules:\n"
|
||||
" - status must be one of: todo, in_progress, done\n"
|
||||
" - priority must be one of: high, medium, low\n"
|
||||
" - due_date is a Unix timestamp in milliseconds; convert human dates\n"
|
||||
" - assignees is a JSON-encoded array of strings (e.g. '[\"Alice\",\"Bob\"]')\n"
|
||||
" - project_id is optional; link to a project when the user mentions one\n"
|
||||
" - is_ai_suggested: 1 only when proactively proposing a task the user\n"
|
||||
" did not explicitly request; 0 otherwise\n"
|
||||
" - is_approved defaults to 0; set to 1 only when the user confirms\n"
|
||||
" - Use list_tasks_due_today for 'what's due today' queries\n"
|
||||
" - For update_task, use -1 for integer fields you do not want to change\n"
|
||||
" - Always confirm the action in plain, user-friendly language."
|
||||
)
|
||||
|
||||
|
||||
# ── Task tools ────────────────────────────────────────────────────────
|
||||
|
||||
@@ -220,35 +199,4 @@ async def delete_task_comment(comment_id: str) -> str:
|
||||
return f"Comment {comment_id} deleted."
|
||||
|
||||
|
||||
# ── Agent ─────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@registry.register
|
||||
class TaskAgent(ChatAgent):
|
||||
def get_name(self) -> str:
|
||||
return "task_agent"
|
||||
|
||||
def get_description(self) -> str:
|
||||
return "Manages tasks and comments: list, create, update, delete, due-today, comments"
|
||||
|
||||
def get_tools(self) -> list[Any]:
|
||||
return [
|
||||
list_tasks,
|
||||
create_task,
|
||||
update_task,
|
||||
delete_task,
|
||||
list_tasks_due_today,
|
||||
list_task_comments,
|
||||
add_task_comment,
|
||||
delete_task_comment,
|
||||
]
|
||||
|
||||
async def handle(self, query: str, context: dict[str, Any]) -> str:
|
||||
llm = get_llm()
|
||||
messages = [
|
||||
SystemMessage(content=_SYSTEM_PROMPT),
|
||||
HumanMessage(
|
||||
content=f"User query: {query}\nContext: {json.dumps(context)[:1000]}"
|
||||
),
|
||||
]
|
||||
return await self._tool_loop(llm, messages, self.get_tools())
|
||||
|
||||
@@ -1,30 +1,13 @@
|
||||
"""Timeline agent — project milestone management (list, create, update, delete)."""
|
||||
"""Timeline agent — tool definitions for project milestone CRUD."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Any
|
||||
|
||||
from langchain_core.messages import HumanMessage, SystemMessage
|
||||
from langchain_core.tools import tool
|
||||
|
||||
from app.core.agent_registry import ChatAgent, registry
|
||||
from app.core.llm import get_llm
|
||||
from app.core.ws_context import execute_on_client
|
||||
|
||||
_SYSTEM_PROMPT = (
|
||||
"You are a project timeline assistant. Timelines are milestone dates that\n"
|
||||
"track progress on a project — they are not calendar events.\n\n"
|
||||
"Rules:\n"
|
||||
" - project_id is REQUIRED for every create; confirm with the user if unknown\n"
|
||||
" - date is a Unix timestamp in milliseconds; convert human-readable dates\n"
|
||||
" - is_ai_suggested: 1 when proactively proposing a timeline, 0 otherwise\n"
|
||||
" - is_approved: 0 until the user explicitly confirms; then 1\n"
|
||||
" - For update_timeline, use -1 for integer fields you do not want to change\n"
|
||||
" - Listing without a project_id returns all timelines across projects\n"
|
||||
" - Always echo the title and formatted date in your confirmation."
|
||||
)
|
||||
|
||||
|
||||
@tool
|
||||
async def list_timelines(project_id: str = "") -> str:
|
||||
@@ -106,23 +89,4 @@ async def delete_timeline(timeline_id: str) -> str:
|
||||
return f"Timeline {timeline_id} deleted."
|
||||
|
||||
|
||||
@registry.register
|
||||
class TimelineAgent(ChatAgent):
|
||||
def get_name(self) -> str:
|
||||
return "timeline_agent"
|
||||
|
||||
def get_description(self) -> str:
|
||||
return "Manages project timelines (milestones): list, create, update, delete"
|
||||
|
||||
def get_tools(self) -> list[Any]:
|
||||
return [list_timelines, create_timeline, update_timeline, delete_timeline]
|
||||
|
||||
async def handle(self, query: str, context: dict[str, Any]) -> str:
|
||||
llm = get_llm()
|
||||
messages = [
|
||||
SystemMessage(content=_SYSTEM_PROMPT),
|
||||
HumanMessage(
|
||||
content=f"User query: {query}\nContext: {json.dumps(context)[:1000]}"
|
||||
),
|
||||
]
|
||||
return await self._tool_loop(llm, messages, self.get_tools())
|
||||
|
||||
@@ -9,8 +9,10 @@ from fastapi import APIRouter, Depends
|
||||
from fastapi.responses import JSONResponse
|
||||
|
||||
from app.api.deps import get_current_user
|
||||
from app.core.orchestrator import orchestrate
|
||||
from app.schemas import ChatRequest, UserProfile
|
||||
from app.core.deep_agent import run_home
|
||||
from app.core.memory_middleware import MemoryMiddleware
|
||||
from app.db import async_session
|
||||
from app.schemas import ChatRequest, ChatResponse, UserProfile
|
||||
|
||||
router = APIRouter(prefix="/chat", tags=["chat"])
|
||||
|
||||
@@ -20,10 +22,21 @@ async def chat(
|
||||
body: ChatRequest,
|
||||
current_user: UserProfile = Depends(get_current_user),
|
||||
) -> JSONResponse:
|
||||
"""Route a chat message through the orchestrator.
|
||||
"""Route a chat message through the Home deep agent (non-streaming)."""
|
||||
async with async_session() as db:
|
||||
memory = MemoryMiddleware(db)
|
||||
memory_context = await memory.enrich_context(current_user.id, body.message)
|
||||
|
||||
Returns ``ChatResponse`` for ``execution_mode='direct'``,
|
||||
or ``ExecutionPlan`` for ``execution_mode='plan'``.
|
||||
"""
|
||||
result = await orchestrate(body)
|
||||
context = {
|
||||
**body.context.model_dump(),
|
||||
**memory_context,
|
||||
}
|
||||
|
||||
response_text = await run_home(
|
||||
user_id=current_user.id,
|
||||
message=body.message,
|
||||
context=context,
|
||||
db_session_factory=async_session,
|
||||
)
|
||||
result = ChatResponse(response=response_text)
|
||||
return JSONResponse(content=result.model_dump())
|
||||
|
||||
@@ -43,7 +43,7 @@ from app.config.settings import settings
|
||||
from app.core.agent_runner import trigger_pending_runs
|
||||
from app.core.device_manager import device_manager
|
||||
from app.core.memory_middleware import MemoryMiddleware
|
||||
from app.core.orchestrator import orchestrate_v3_stream
|
||||
from app.core.deep_agent import run_home_stream, run_floating_stream
|
||||
from app.core.output_formatter import HomeFormatter, FloatingFormatter
|
||||
from app.core.ws_context import clear_client_executor, set_client_executor
|
||||
from app.db import async_session
|
||||
@@ -204,9 +204,17 @@ async def _make_ws_executor(websocket: WebSocket, user_id: str):
|
||||
"""Return a callback that sends tool_call frames and awaits tool_result."""
|
||||
async def _executor(payload: dict) -> dict:
|
||||
payload["type"] = WsFrameType.tool_call
|
||||
call_id = payload["id"]
|
||||
logger.info("ws_executor: sending tool_call id=%s action=%s", call_id, payload.get("action"))
|
||||
await websocket.send_text(json.dumps(payload))
|
||||
future = device_manager.create_pending_call(user_id, payload["id"])
|
||||
return await future
|
||||
future = device_manager.create_pending_call(user_id, call_id)
|
||||
result = await future
|
||||
logger.info("ws_executor: tool_result id=%s result_type=%s result_keys=%s",
|
||||
call_id, type(result).__name__,
|
||||
list(result.keys()) if isinstance(result, dict) else "N/A")
|
||||
if result is None:
|
||||
logger.error("ws_executor: future resolved to None for call_id=%s user=%s", call_id, user_id)
|
||||
return result
|
||||
return _executor
|
||||
|
||||
|
||||
@@ -233,21 +241,13 @@ async def _handle_home_request(
|
||||
executor = await _make_ws_executor(websocket, user_id)
|
||||
set_client_executor(executor)
|
||||
response_chunks: list[str] = []
|
||||
agent_holder: list = []
|
||||
try:
|
||||
token_stream = orchestrate_v3_stream(
|
||||
user_id, message, context, agent_holder=agent_holder
|
||||
event_stream = run_home_stream(
|
||||
user_id, message, context, db_session_factory=async_session
|
||||
)
|
||||
formatter = HomeFormatter(request_id=request_id, tool_results=[])
|
||||
async for ws_frame in formatter.format(token_stream):
|
||||
# Inject mutations from agent tool_results into stream_end
|
||||
if ws_frame.type == "stream_end" and agent_holder: # type: ignore[union-attr]
|
||||
ws_frame.mutations = [ # type: ignore[union-attr]
|
||||
{"action": r["action"], "table": r["table"], "data": r["data"]}
|
||||
for r in getattr(agent_holder[0], "tool_results", [])
|
||||
]
|
||||
formatter = HomeFormatter(request_id=request_id)
|
||||
async for ws_frame in formatter.format(event_stream):
|
||||
await websocket.send_text(ws_frame.model_dump_json())
|
||||
# Collect text chunks to build the full response for episode storage
|
||||
if ws_frame.type == "stream_text": # type: ignore[union-attr]
|
||||
response_chunks.append(ws_frame.chunk) # type: ignore[union-attr]
|
||||
except Exception as exc:
|
||||
@@ -287,18 +287,13 @@ async def _handle_floating_request(
|
||||
executor = await _make_ws_executor(websocket, user_id)
|
||||
set_client_executor(executor)
|
||||
response_chunks: list[str] = []
|
||||
agent_holder: list = []
|
||||
try:
|
||||
token_stream = orchestrate_v3_stream(
|
||||
user_id, message, context, agent_holder=agent_holder
|
||||
event_stream = run_floating_stream(
|
||||
user_id, message, context, scope=scope,
|
||||
db_session_factory=async_session,
|
||||
)
|
||||
formatter = FloatingFormatter(request_id=request_id)
|
||||
async for ws_frame in formatter.format(token_stream):
|
||||
if ws_frame.type == "stream_end" and agent_holder: # type: ignore[union-attr]
|
||||
ws_frame.mutations = [ # type: ignore[union-attr]
|
||||
{"action": r["action"], "table": r["table"], "data": r["data"]}
|
||||
for r in getattr(agent_holder[0], "tool_results", [])
|
||||
]
|
||||
async for ws_frame in formatter.format(event_stream):
|
||||
await websocket.send_text(ws_frame.model_dump_json())
|
||||
if ws_frame.type == "stream_text": # type: ignore[union-attr]
|
||||
response_chunks.append(ws_frame.chunk) # type: ignore[union-attr]
|
||||
|
||||
@@ -1,37 +0,0 @@
|
||||
"""Plans routes: GET /plans/playbook and GET /plans/playbook/{plan_id}."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
|
||||
from app.api.deps import get_current_user
|
||||
from app.core.execution_plan import plan_cache
|
||||
from app.schemas import ExecutionPlan, UserProfile
|
||||
|
||||
router = APIRouter(prefix="/plans", tags=["plans"])
|
||||
|
||||
|
||||
@router.get("/playbook", response_model=list[ExecutionPlan])
|
||||
async def list_playbooks(
|
||||
current_user: UserProfile = Depends(get_current_user),
|
||||
) -> list[ExecutionPlan]:
|
||||
"""Return all cached execution plan playbooks for the authenticated user.
|
||||
|
||||
TODO(Step11): filter by tier — power+ plans gated behind batch_builder feature.
|
||||
"""
|
||||
return plan_cache.get_all_playbooks()
|
||||
|
||||
|
||||
@router.get("/playbook/{plan_id}", response_model=ExecutionPlan)
|
||||
async def get_playbook(
|
||||
plan_id: str,
|
||||
current_user: UserProfile = Depends(get_current_user),
|
||||
) -> ExecutionPlan:
|
||||
"""Return a specific execution plan playbook by ID."""
|
||||
plan = plan_cache.get_plan(plan_id)
|
||||
if plan is None:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail=f"Plan not found: {plan_id}",
|
||||
)
|
||||
return plan
|
||||
@@ -1,217 +0,0 @@
|
||||
"""Agent Registry — base classes and singleton registry for chat agents."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from collections.abc import AsyncGenerator
|
||||
from typing import Any
|
||||
|
||||
|
||||
class BaseAgent(ABC):
|
||||
"""Common base for all agents."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
user_id: str = "",
|
||||
shared_memory: dict[str, Any] | None = None,
|
||||
vector_store_context: list[str] | None = None,
|
||||
) -> None:
|
||||
self.user_id = user_id
|
||||
self.shared_memory: dict[str, Any] = shared_memory or {}
|
||||
self.vector_store_context: list[str] = vector_store_context or []
|
||||
|
||||
@abstractmethod
|
||||
def get_name(self) -> str: ...
|
||||
|
||||
@abstractmethod
|
||||
def get_description(self) -> str: ...
|
||||
|
||||
@property
|
||||
def skills(self) -> list[str]:
|
||||
"""Override in subclasses to advertise capabilities."""
|
||||
return []
|
||||
|
||||
|
||||
class ChatAgent(BaseAgent):
|
||||
"""Base class for LLM-powered chat agents."""
|
||||
|
||||
def __init__(self, **kwargs: Any) -> None:
|
||||
super().__init__(**kwargs)
|
||||
# Populated by _tool_loop / _tool_loop_stream with raw execute_on_client results.
|
||||
self.tool_results: list[dict] = []
|
||||
|
||||
@abstractmethod
|
||||
async def handle(self, query: str, context: dict[str, Any]) -> str:
|
||||
"""Process a user query and return a text response."""
|
||||
...
|
||||
|
||||
async def handle_stream(
|
||||
self, query: str, context: dict[str, Any]
|
||||
) -> AsyncGenerator[str, None]:
|
||||
"""Streaming variant of handle().
|
||||
|
||||
Default: calls handle() and yields the full response as one chunk.
|
||||
Override in subclasses for true token-level streaming via _tool_loop_stream.
|
||||
"""
|
||||
yield await self.handle(query, context)
|
||||
|
||||
@abstractmethod
|
||||
def get_tools(self) -> list[Any]:
|
||||
"""Return LangChain tool definitions available to this agent."""
|
||||
...
|
||||
|
||||
async def _tool_loop(
|
||||
self,
|
||||
llm: Any,
|
||||
messages: list[Any],
|
||||
tools: list[Any],
|
||||
max_iter: int = 5,
|
||||
) -> str:
|
||||
"""Shared tool-calling loop.
|
||||
|
||||
Binds *tools* to *llm*, invokes iteratively until the model stops
|
||||
requesting tool calls or *max_iter* is reached, and returns the
|
||||
final text response. Captures raw execute_on_client results in
|
||||
``self.tool_results``.
|
||||
"""
|
||||
from langchain_core.messages import AIMessage, ToolMessage
|
||||
|
||||
from app.core.ws_context import clear_tool_result_collector, set_tool_result_collector
|
||||
|
||||
collector: list[dict] = []
|
||||
set_tool_result_collector(collector)
|
||||
try:
|
||||
llm_with_tools = llm.bind_tools(tools) if tools else llm
|
||||
|
||||
for _ in range(max_iter):
|
||||
response: AIMessage = await llm_with_tools.ainvoke(messages)
|
||||
messages.append(response)
|
||||
|
||||
if not response.tool_calls:
|
||||
return str(response.content)
|
||||
|
||||
# Execute each requested tool call
|
||||
tool_map = {t.name: t for t in tools}
|
||||
for call in response.tool_calls:
|
||||
tool_fn = tool_map.get(call["name"])
|
||||
if tool_fn is None:
|
||||
result = f"Unknown tool: {call['name']}"
|
||||
else:
|
||||
result = await tool_fn.ainvoke(call["args"])
|
||||
messages.append(
|
||||
ToolMessage(content=str(result), tool_call_id=call["id"])
|
||||
)
|
||||
|
||||
# Exhausted iterations — ask model for a final answer without tools
|
||||
response = await llm.ainvoke(messages)
|
||||
return str(response.content)
|
||||
finally:
|
||||
clear_tool_result_collector()
|
||||
self.tool_results = collector
|
||||
|
||||
async def _tool_loop_stream(
|
||||
self,
|
||||
llm: Any,
|
||||
messages: list[Any],
|
||||
tools: list[Any],
|
||||
max_iter: int = 5,
|
||||
) -> AsyncGenerator[str, None]:
|
||||
"""Streaming variant of ``_tool_loop``.
|
||||
|
||||
Behaves identically for tool-calling iterations (uses ainvoke to parse
|
||||
tool calls). For the final response — when the model produces no further
|
||||
tool calls — switches to ``llm.astream()`` and yields text tokens.
|
||||
Captures raw execute_on_client results in ``self.tool_results``.
|
||||
"""
|
||||
from langchain_core.messages import AIMessage, ToolMessage
|
||||
|
||||
from app.core.ws_context import clear_tool_result_collector, set_tool_result_collector
|
||||
|
||||
collector: list[dict] = []
|
||||
set_tool_result_collector(collector)
|
||||
try:
|
||||
llm_with_tools = llm.bind_tools(tools) if tools else llm
|
||||
|
||||
for _ in range(max_iter):
|
||||
response: AIMessage = await llm_with_tools.ainvoke(messages)
|
||||
|
||||
if not response.tool_calls:
|
||||
# Stream the final answer — don't keep the ainvoke result.
|
||||
async for chunk in llm.astream(messages):
|
||||
if chunk.content:
|
||||
yield str(chunk.content)
|
||||
return
|
||||
|
||||
messages.append(response)
|
||||
|
||||
# Execute each requested tool call
|
||||
tool_map = {t.name: t for t in tools}
|
||||
for call in response.tool_calls:
|
||||
tool_fn = tool_map.get(call["name"])
|
||||
if tool_fn is None:
|
||||
result = f"Unknown tool: {call['name']}"
|
||||
else:
|
||||
result = await tool_fn.ainvoke(call["args"])
|
||||
messages.append(
|
||||
ToolMessage(content=str(result), tool_call_id=call["id"])
|
||||
)
|
||||
|
||||
# Exhausted iterations — stream a final answer without tools
|
||||
async for chunk in llm.astream(messages):
|
||||
if chunk.content:
|
||||
yield str(chunk.content)
|
||||
finally:
|
||||
clear_tool_result_collector()
|
||||
self.tool_results = collector
|
||||
|
||||
|
||||
class AgentRegistry:
|
||||
"""Singleton registry for ChatAgent subclasses."""
|
||||
|
||||
_instance: AgentRegistry | None = None
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._agents: dict[str, type[ChatAgent]] = {}
|
||||
|
||||
def __new__(cls) -> AgentRegistry:
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
cls._instance._agents = {}
|
||||
return cls._instance
|
||||
|
||||
# ── public API ───────────────────────────────────────────────────
|
||||
|
||||
def register(self, agent_class: type[ChatAgent]) -> type[ChatAgent]:
|
||||
"""Class decorator — registers an agent by its name."""
|
||||
instance = agent_class()
|
||||
name = instance.get_name()
|
||||
self._agents[name] = agent_class
|
||||
return agent_class
|
||||
|
||||
def get(self, name: str) -> ChatAgent:
|
||||
"""Return a fresh instance of the named agent."""
|
||||
cls = self._agents.get(name)
|
||||
if cls is None:
|
||||
raise KeyError(f"Agent not found: {name}")
|
||||
return cls()
|
||||
|
||||
def list_agents(self) -> list[dict[str, str]]:
|
||||
"""Return ``[{name, description}]`` for the orchestrator prompt."""
|
||||
result: list[dict[str, str]] = []
|
||||
for cls in self._agents.values():
|
||||
inst = cls()
|
||||
result.append(
|
||||
{"name": inst.get_name(), "description": inst.get_description()}
|
||||
)
|
||||
return result
|
||||
|
||||
async def call_agent(
|
||||
self, name: str, query: str, context: dict[str, Any]
|
||||
) -> str:
|
||||
"""Instantiate the named agent and call its ``handle`` method."""
|
||||
agent = self.get(name)
|
||||
return await agent.handle(query, context)
|
||||
|
||||
|
||||
# Module-level singleton
|
||||
registry = AgentRegistry()
|
||||
@@ -1,4 +1,4 @@
|
||||
"""Agent run orchestrator.
|
||||
"""Agent run manager.
|
||||
|
||||
Drives two agent types:
|
||||
|
||||
|
||||
429
app/core/deep_agent.py
Normal file
429
app/core/deep_agent.py
Normal file
@@ -0,0 +1,429 @@
|
||||
"""Deep Agent — LangGraph hierarchical supervisors for home and floating modes.
|
||||
|
||||
Two supervisor graphs (both ``create_react_agent``):
|
||||
* **HomeSupervisor** — gathers data from multiple domains, presents
|
||||
structured overview with tool-result blocks.
|
||||
* **FloatingSupervisor** — focused, scoped assistant for a single entity/domain.
|
||||
|
||||
Each supervisor delegates to four sub-agent tools, each a compiled
|
||||
``create_react_agent`` wrapping the domain CRUD tools (task, project, note,
|
||||
timeline). The sub-agents talk to Electron via ``execute_on_client``.
|
||||
|
||||
Streaming uses ``astream(stream_mode=["messages", "updates"])`` so that
|
||||
callers can sniff:
|
||||
* ``("messages", (token, metadata))`` — text tokens for streaming
|
||||
* ``("updates", ...)`` — tool call results for mutations
|
||||
|
||||
An ``update_core_memory`` tool is available to both supervisors for
|
||||
persisting user preferences mid-conversation (MemGPT-style).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import Any, AsyncGenerator
|
||||
|
||||
from langchain_core.messages import AIMessageChunk, HumanMessage
|
||||
from langchain_core.tools import tool
|
||||
from langgraph.prebuilt import create_react_agent
|
||||
|
||||
from app.core.llm import get_llm
|
||||
from app.core.ws_context import (
|
||||
clear_tool_result_collector,
|
||||
set_tool_result_collector,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ── Sub-agent tool imports ────────────────────────────────────────────
|
||||
|
||||
from app.agents.task_agent import ( # noqa: E402
|
||||
add_task_comment,
|
||||
create_task,
|
||||
delete_task,
|
||||
delete_task_comment,
|
||||
list_task_comments,
|
||||
list_tasks,
|
||||
list_tasks_due_today,
|
||||
update_task,
|
||||
)
|
||||
from app.agents.note_agent import ( # noqa: E402
|
||||
create_note,
|
||||
delete_note,
|
||||
get_note,
|
||||
list_notes,
|
||||
update_note,
|
||||
)
|
||||
from app.agents.project_agent import ( # noqa: E402
|
||||
create_project,
|
||||
delete_project,
|
||||
get_project,
|
||||
list_all_projects,
|
||||
list_projects,
|
||||
update_project,
|
||||
)
|
||||
from app.agents.timeline_agent import ( # noqa: E402
|
||||
create_timeline,
|
||||
delete_timeline,
|
||||
list_timelines,
|
||||
update_timeline,
|
||||
)
|
||||
|
||||
# ── Sub-agent definitions ─────────────────────────────────────────────
|
||||
|
||||
_TASK_TOOLS = [
|
||||
list_tasks,
|
||||
create_task,
|
||||
update_task,
|
||||
delete_task,
|
||||
list_tasks_due_today,
|
||||
list_task_comments,
|
||||
add_task_comment,
|
||||
delete_task_comment,
|
||||
]
|
||||
|
||||
_NOTE_TOOLS = [list_notes, get_note, create_note, update_note, delete_note]
|
||||
|
||||
_PROJECT_TOOLS = [
|
||||
list_projects,
|
||||
list_all_projects,
|
||||
get_project,
|
||||
create_project,
|
||||
update_project,
|
||||
delete_project,
|
||||
]
|
||||
|
||||
_TIMELINE_TOOLS = [list_timelines, create_timeline, update_timeline, delete_timeline]
|
||||
|
||||
|
||||
def _build_subagent_tool(
|
||||
name: str,
|
||||
description: str,
|
||||
system_prompt: str,
|
||||
tools: list,
|
||||
):
|
||||
"""Build a compiled sub-agent graph and wrap it as a LangChain tool."""
|
||||
subgraph = create_react_agent(
|
||||
model=get_llm(),
|
||||
tools=tools,
|
||||
prompt=system_prompt,
|
||||
name=name,
|
||||
)
|
||||
|
||||
@tool(name=name, description=description)
|
||||
async def _run(query: str) -> str:
|
||||
result = await subgraph.ainvoke(
|
||||
{"messages": [HumanMessage(content=query)]}
|
||||
)
|
||||
messages = result["messages"]
|
||||
# Return the last AI message content
|
||||
for msg in reversed(messages):
|
||||
if hasattr(msg, "content") and msg.content and not getattr(msg, "tool_calls", None):
|
||||
return str(msg.content)
|
||||
return "No response from sub-agent."
|
||||
|
||||
return _run
|
||||
|
||||
|
||||
def _make_subagent_tools() -> list:
|
||||
"""Create the four sub-agent tools for the supervisor."""
|
||||
return [
|
||||
_build_subagent_tool(
|
||||
name="task_agent",
|
||||
description=(
|
||||
"Manages tasks and comments: list, create, update, delete, "
|
||||
"due-today, comments. Delegate task-related queries here."
|
||||
),
|
||||
system_prompt=(
|
||||
"You are a task management assistant. You create, update, list, "
|
||||
"and track tasks and their comments.\n\n"
|
||||
"Rules:\n"
|
||||
" - status must be one of: todo, in_progress, done\n"
|
||||
" - priority must be one of: high, medium, low\n"
|
||||
" - due_date is a Unix timestamp in milliseconds\n"
|
||||
" - assignees is a JSON-encoded array of strings\n"
|
||||
" - is_approved defaults to 0; set to 1 only when the user confirms\n"
|
||||
" - For update_task, use -1 for integer fields you do not want to change\n"
|
||||
" - Always confirm the action in plain, user-friendly language."
|
||||
),
|
||||
tools=_TASK_TOOLS,
|
||||
),
|
||||
_build_subagent_tool(
|
||||
name="note_agent",
|
||||
description=(
|
||||
"Manages notes: list, get, create, update, delete. "
|
||||
"Delegate note-related queries here."
|
||||
),
|
||||
system_prompt=(
|
||||
"You are a note-taking assistant. You help users create, retrieve, "
|
||||
"update, and delete Markdown notes in their workspace.\n\n"
|
||||
"Rules:\n"
|
||||
" - content is always Markdown; preserve formatting when updating\n"
|
||||
" - When updating, call get_note first if you need to read existing "
|
||||
"content before appending or replacing sections\n"
|
||||
" - Do not fabricate note content."
|
||||
),
|
||||
tools=_NOTE_TOOLS,
|
||||
),
|
||||
_build_subagent_tool(
|
||||
name="project_agent",
|
||||
description=(
|
||||
"Manages projects: list, get, create, update, archive, delete. "
|
||||
"Delegate project-related queries here."
|
||||
),
|
||||
system_prompt=(
|
||||
"You are a project management assistant. You help users create, "
|
||||
"find, update, and archive projects.\n\n"
|
||||
"Rules:\n"
|
||||
" - status must be one of: active, archived\n"
|
||||
" - Prefer archiving over deletion\n"
|
||||
" - ai_summary is populated only when the user asks for a summary."
|
||||
),
|
||||
tools=_PROJECT_TOOLS,
|
||||
),
|
||||
_build_subagent_tool(
|
||||
name="timeline_agent",
|
||||
description=(
|
||||
"Manages project timelines (milestones): list, create, update, "
|
||||
"delete. Delegate timeline/milestone queries here."
|
||||
),
|
||||
system_prompt=(
|
||||
"You are a project timeline assistant. Timelines are milestone "
|
||||
"dates that track progress on a project.\n\n"
|
||||
"Rules:\n"
|
||||
" - project_id is REQUIRED for every create\n"
|
||||
" - date is a Unix timestamp in milliseconds\n"
|
||||
" - For update_timeline, use -1 for integer fields you do not "
|
||||
"want to change."
|
||||
),
|
||||
tools=_TIMELINE_TOOLS,
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
# ── Update core memory tool ──────────────────────────────────────────
|
||||
|
||||
def _make_update_core_memory_tool(user_id: str, db_session_factory):
|
||||
"""Create a tool that persists a key/value preference in core memory."""
|
||||
|
||||
@tool
|
||||
async def update_core_memory(key: str, value: str) -> str:
|
||||
"""Save a user preference or fact to long-term core memory.
|
||||
key: short label for the memory (e.g. 'preferred_language', 'timezone')
|
||||
value: the value to remember
|
||||
Use this when the user states a preference or fact worth remembering.
|
||||
"""
|
||||
from app.core.memory_middleware import MemoryMiddleware
|
||||
|
||||
async with db_session_factory() as db:
|
||||
memory = MemoryMiddleware(db)
|
||||
await memory.update_core(user_id, key, value)
|
||||
return f"Remembered: {key} = {value}"
|
||||
|
||||
return update_core_memory
|
||||
|
||||
|
||||
# ── System prompts ────────────────────────────────────────────────────
|
||||
|
||||
_HOME_SYSTEM = (
|
||||
"You are Adiuva, a smart workspace assistant on the Home dashboard.\n"
|
||||
"Your job is to help the user by gathering data from their workspace and "
|
||||
"presenting a comprehensive overview.\n\n"
|
||||
"You have sub-agent tools (task_agent, note_agent, project_agent, "
|
||||
"timeline_agent) that can query and mutate workspace data. Delegate to "
|
||||
"the appropriate sub-agent(s) based on the user's request. You can call "
|
||||
"multiple sub-agents if needed.\n\n"
|
||||
"You also have an update_core_memory tool — use it when the user states "
|
||||
"a preference or important fact worth remembering long-term.\n\n"
|
||||
"After gathering data, synthesize a clear, helpful response for the user.\n\n"
|
||||
"Memory context:\n{memory_context}"
|
||||
)
|
||||
|
||||
_FLOATING_SYSTEM = (
|
||||
"You are Adiuva, a focused workspace assistant in the floating panel.\n"
|
||||
"The user is currently working in the '{scope_type}' section"
|
||||
"{scope_detail}.\n\n"
|
||||
"You have sub-agent tools (task_agent, note_agent, project_agent, "
|
||||
"timeline_agent) that can query and mutate workspace data. Focus your "
|
||||
"help on the user's current scope, but you can use other sub-agents "
|
||||
"if the request requires it.\n\n"
|
||||
"You also have an update_core_memory tool — use it when the user states "
|
||||
"a preference or important fact worth remembering long-term.\n\n"
|
||||
"Provide direct, conversational responses.\n\n"
|
||||
"Memory context:\n{memory_context}"
|
||||
)
|
||||
|
||||
|
||||
def _format_memory_context(memory: dict[str, Any]) -> str:
|
||||
"""Format the memory dict into a readable string for the system prompt."""
|
||||
if not memory:
|
||||
return "(no memory available)"
|
||||
parts = []
|
||||
if memory.get("core_memory"):
|
||||
parts.append("Preferences: " + json.dumps(memory["core_memory"]))
|
||||
if memory.get("associative_memory"):
|
||||
parts.append("Related memories: " + "; ".join(memory["associative_memory"][:3]))
|
||||
if memory.get("episodic_memory"):
|
||||
parts.append("Recent sessions: " + "; ".join(memory["episodic_memory"][:3]))
|
||||
if memory.get("proactive_hints"):
|
||||
parts.append("Patterns: " + "; ".join(memory["proactive_hints"][:3]))
|
||||
return "\n".join(parts) if parts else "(no memory available)"
|
||||
|
||||
|
||||
# ── Graph builders ────────────────────────────────────────────────────
|
||||
|
||||
def build_home_graph(
|
||||
user_id: str,
|
||||
memory_context: dict[str, Any],
|
||||
db_session_factory,
|
||||
):
|
||||
"""Build the Home supervisor graph."""
|
||||
subagent_tools = _make_subagent_tools()
|
||||
memory_tool = _make_update_core_memory_tool(user_id, db_session_factory)
|
||||
all_tools = subagent_tools + [memory_tool]
|
||||
|
||||
prompt = _HOME_SYSTEM.format(
|
||||
memory_context=_format_memory_context(memory_context),
|
||||
)
|
||||
|
||||
return create_react_agent(
|
||||
model=get_llm(),
|
||||
tools=all_tools,
|
||||
prompt=prompt,
|
||||
name="home_supervisor",
|
||||
)
|
||||
|
||||
|
||||
def build_floating_graph(
|
||||
user_id: str,
|
||||
memory_context: dict[str, Any],
|
||||
scope: dict[str, Any],
|
||||
db_session_factory,
|
||||
):
|
||||
"""Build the Floating supervisor graph."""
|
||||
subagent_tools = _make_subagent_tools()
|
||||
memory_tool = _make_update_core_memory_tool(user_id, db_session_factory)
|
||||
all_tools = subagent_tools + [memory_tool]
|
||||
|
||||
scope_type = scope.get("type", "general")
|
||||
scope_id = scope.get("id")
|
||||
scope_detail = f" (id: {scope_id})" if scope_id else ""
|
||||
|
||||
prompt = _FLOATING_SYSTEM.format(
|
||||
scope_type=scope_type,
|
||||
scope_detail=scope_detail,
|
||||
memory_context=_format_memory_context(memory_context),
|
||||
)
|
||||
|
||||
return create_react_agent(
|
||||
model=get_llm(),
|
||||
tools=all_tools,
|
||||
prompt=prompt,
|
||||
name="floating_supervisor",
|
||||
)
|
||||
|
||||
|
||||
# ── Stream event type ────────────────────────────────────────────────
|
||||
|
||||
# Events yielded by run_*_stream:
|
||||
# ("token", str) — text token for streaming
|
||||
# ("tool_start", dict) — {"name": "task_agent", "args": {...}}
|
||||
# ("tool_end", dict) — {"name": "task_agent", "result": "..."}
|
||||
|
||||
|
||||
# ── Stream runners ────────────────────────────────────────────────────
|
||||
|
||||
async def _run_graph_stream(
|
||||
graph,
|
||||
message: str,
|
||||
) -> AsyncGenerator[tuple[str, Any], None]:
|
||||
"""Run a supervisor graph with streaming, yielding event tuples.
|
||||
|
||||
Uses ``stream_mode=["messages", "updates"]`` to get both token-level
|
||||
streaming and update events for tool calls.
|
||||
"""
|
||||
inputs = {"messages": [HumanMessage(content=message)]}
|
||||
|
||||
collector: list[dict] = []
|
||||
set_tool_result_collector(collector)
|
||||
try:
|
||||
async for stream_mode, chunk in graph.astream(
|
||||
inputs,
|
||||
stream_mode=["messages", "updates"],
|
||||
):
|
||||
if stream_mode == "messages":
|
||||
msg, metadata = chunk
|
||||
# Only yield tokens from the supervisor's final response
|
||||
# (not from sub-agent internal LLM calls)
|
||||
if (
|
||||
isinstance(msg, AIMessageChunk)
|
||||
and msg.content
|
||||
and not msg.tool_calls
|
||||
and metadata.get("langgraph_node") == "agent"
|
||||
):
|
||||
yield ("token", str(msg.content))
|
||||
|
||||
elif stream_mode == "updates":
|
||||
# Updates is a dict of {node_name: state_update}
|
||||
if not isinstance(chunk, dict):
|
||||
continue
|
||||
for node_name, state_update in chunk.items():
|
||||
if node_name != "tools":
|
||||
continue
|
||||
# Tool node executed — extract tool call results
|
||||
tool_messages = state_update.get("messages", [])
|
||||
for tool_msg in tool_messages:
|
||||
if hasattr(tool_msg, "name") and hasattr(tool_msg, "content"):
|
||||
yield (
|
||||
"tool_end",
|
||||
{"name": tool_msg.name, "result": str(tool_msg.content)},
|
||||
)
|
||||
finally:
|
||||
clear_tool_result_collector()
|
||||
|
||||
# Yield the collected mutations so callers can attach them to stream_end
|
||||
yield ("mutations", collector)
|
||||
|
||||
|
||||
async def run_home_stream(
|
||||
user_id: str,
|
||||
message: str,
|
||||
context: dict[str, Any],
|
||||
db_session_factory,
|
||||
) -> AsyncGenerator[tuple[str, Any], None]:
|
||||
"""Run the Home supervisor and yield streaming events."""
|
||||
graph = build_home_graph(user_id, context, db_session_factory)
|
||||
async for event in _run_graph_stream(graph, message):
|
||||
yield event
|
||||
|
||||
|
||||
async def run_floating_stream(
|
||||
user_id: str,
|
||||
message: str,
|
||||
context: dict[str, Any],
|
||||
scope: dict[str, Any],
|
||||
db_session_factory,
|
||||
) -> AsyncGenerator[tuple[str, Any], None]:
|
||||
"""Run the Floating supervisor and yield streaming events."""
|
||||
graph = build_floating_graph(user_id, context, scope, db_session_factory)
|
||||
async for event in _run_graph_stream(graph, message):
|
||||
yield event
|
||||
|
||||
|
||||
async def run_home(
|
||||
user_id: str,
|
||||
message: str,
|
||||
context: dict[str, Any],
|
||||
db_session_factory,
|
||||
) -> str:
|
||||
"""Run the Home supervisor (non-streaming) and return full response text."""
|
||||
graph = build_home_graph(user_id, context, db_session_factory)
|
||||
result = await graph.ainvoke(
|
||||
{"messages": [HumanMessage(content=message)]}
|
||||
)
|
||||
messages = result["messages"]
|
||||
for msg in reversed(messages):
|
||||
if hasattr(msg, "content") and msg.content and not getattr(msg, "tool_calls", None):
|
||||
return str(msg.content)
|
||||
return ""
|
||||
@@ -1,222 +0,0 @@
|
||||
"""Execution Plan generator — builder, template registry, and LRU plan cache."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections import OrderedDict
|
||||
from typing import Any
|
||||
|
||||
from app.schemas import ExecutionPlan, PlanStep
|
||||
|
||||
|
||||
# ── Prompt Template Registry ──────────────────────────────────────────
|
||||
|
||||
|
||||
class PromptTemplateRegistry:
|
||||
"""Server-side store mapping template IDs to prompt text.
|
||||
|
||||
Clients only ever receive template IDs (e.g. ``"tpl_task_agent_default"``).
|
||||
The actual prompt text is resolved here on the server, keeping prompt IP
|
||||
out of API responses.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._templates: dict[str, str] = {}
|
||||
|
||||
def register(self, template_id: str, prompt_text: str) -> None:
|
||||
self._templates[template_id] = prompt_text
|
||||
|
||||
def get(self, template_id: str) -> str:
|
||||
"""Resolve a template ID to its prompt text.
|
||||
|
||||
Raises ``KeyError`` if the template is not registered.
|
||||
"""
|
||||
text = self._templates.get(template_id)
|
||||
if text is None:
|
||||
raise KeyError(f"Template not found: {template_id!r}")
|
||||
return text
|
||||
|
||||
def has(self, template_id: str) -> bool:
|
||||
return template_id in self._templates
|
||||
|
||||
def list_ids(self) -> list[str]:
|
||||
"""Return all registered template IDs (never the text)."""
|
||||
return list(self._templates.keys())
|
||||
|
||||
|
||||
# ── Execution Plan Builder ────────────────────────────────────────────
|
||||
|
||||
|
||||
class ExecutionPlanBuilder:
|
||||
"""Fluent builder for ``ExecutionPlan`` objects.
|
||||
|
||||
Example::
|
||||
|
||||
plan = (
|
||||
ExecutionPlanBuilder("task_agent")
|
||||
.add_llm_step("tpl_task_agent_default", {"message": user_msg})
|
||||
.add_data_step("create_record", data_from_step=0)
|
||||
.build()
|
||||
)
|
||||
"""
|
||||
|
||||
def __init__(self, agent: str) -> None:
|
||||
self._agent = agent
|
||||
self._steps: list[PlanStep] = []
|
||||
|
||||
# ── step adders ──────────────────────────────────────────────────
|
||||
|
||||
def add_step(
|
||||
self, action: str, params: dict[str, Any] | None = None
|
||||
) -> ExecutionPlanBuilder:
|
||||
"""Append a generic action step with optional parameters."""
|
||||
self._steps.append(PlanStep(action=action, variables=params))
|
||||
return self
|
||||
|
||||
def add_llm_step(
|
||||
self, template_id: str, variables: dict[str, Any] | None = None
|
||||
) -> ExecutionPlanBuilder:
|
||||
"""Append an LLM step referencing a server-side template by ID."""
|
||||
self._steps.append(
|
||||
PlanStep(action="llm", prompt_template=template_id, variables=variables)
|
||||
)
|
||||
return self
|
||||
|
||||
def add_data_step(self, action: str, data_from_step: int) -> ExecutionPlanBuilder:
|
||||
"""Append a step whose input comes from the output of an earlier step."""
|
||||
self._steps.append(PlanStep(action=action, data_from_step=data_from_step))
|
||||
return self
|
||||
|
||||
# ── build ────────────────────────────────────────────────────────
|
||||
|
||||
def build(self) -> ExecutionPlan:
|
||||
"""Validate step references and return the ``ExecutionPlan``.
|
||||
|
||||
Raises ``ValueError`` if any ``data_from_step`` references a
|
||||
non-existent or future step index.
|
||||
"""
|
||||
for i, step in enumerate(self._steps):
|
||||
if step.data_from_step is not None:
|
||||
if not (0 <= step.data_from_step < i):
|
||||
raise ValueError(
|
||||
f"Step {i}: data_from_step={step.data_from_step} must "
|
||||
f"reference a preceding step index in range 0..{i - 1}"
|
||||
)
|
||||
return ExecutionPlan(agent=self._agent, steps=list(self._steps))
|
||||
|
||||
|
||||
# ── Plan Cache (LRU) ──────────────────────────────────────────────────
|
||||
|
||||
|
||||
class PlanCache:
|
||||
"""In-memory LRU cache for ``ExecutionPlan`` objects.
|
||||
|
||||
Plans stored here are accessible as playbooks via ``get_all_playbooks()``.
|
||||
The cache also serves as a runtime memoisation layer so that repeated
|
||||
identical intent classifications can skip re-building the plan.
|
||||
"""
|
||||
|
||||
def __init__(self, maxsize: int = 1000) -> None:
|
||||
self._maxsize = maxsize
|
||||
self._cache: OrderedDict[str, ExecutionPlan] = OrderedDict()
|
||||
|
||||
def cache_plan(self, key: str, plan: ExecutionPlan) -> None:
|
||||
"""Store *plan* under *key*, evicting the LRU entry if at capacity."""
|
||||
if key in self._cache:
|
||||
del self._cache[key] # remove so re-insertion places it at the end
|
||||
elif len(self._cache) >= self._maxsize:
|
||||
self._cache.popitem(last=False) # evict least-recently-used
|
||||
self._cache[key] = plan
|
||||
|
||||
def get_plan(self, key: str) -> ExecutionPlan | None:
|
||||
"""Return the cached plan for *key*, or ``None`` if not present.
|
||||
|
||||
Accessing a plan marks it as most-recently used.
|
||||
"""
|
||||
if key not in self._cache:
|
||||
return None
|
||||
self._cache.move_to_end(key)
|
||||
return self._cache[key]
|
||||
|
||||
def get_all_playbooks(self) -> list[ExecutionPlan]:
|
||||
"""Return all cached plans (most-recently used last)."""
|
||||
return list(self._cache.values())
|
||||
|
||||
|
||||
# ── Module-level singletons ───────────────────────────────────────────
|
||||
|
||||
template_registry = PromptTemplateRegistry()
|
||||
plan_cache = PlanCache()
|
||||
|
||||
|
||||
def _register_builtin_templates() -> None:
|
||||
"""Register the built-in server-side prompt templates.
|
||||
|
||||
These strings never leave the server. Clients only receive the IDs.
|
||||
"""
|
||||
_tpls: dict[str, str] = {
|
||||
"tpl_task_agent_default": (
|
||||
"You are a task management assistant. Help the user create, update, "
|
||||
"list, and track tasks. Use correct status values (todo, in_progress, "
|
||||
"done) and priority values (high, medium, low) from the workspace model."
|
||||
),
|
||||
"tpl_timeline_agent_default": (
|
||||
"You are a project timeline assistant. Help the user create and manage "
|
||||
"milestone timelines on their projects. Every timeline requires a "
|
||||
"project_id and a date expressed as a Unix timestamp in milliseconds."
|
||||
),
|
||||
"tpl_project_agent_default": (
|
||||
"You are a project management assistant. Help the user create, find, "
|
||||
"update, and archive projects. Projects have a name, an optional client, "
|
||||
"and a status of either active or archived."
|
||||
),
|
||||
"tpl_note_agent_default": (
|
||||
"You are a note-taking assistant. Help the user create, retrieve, update, "
|
||||
"and delete Markdown notes. Notes can optionally be linked to a project."
|
||||
),
|
||||
"tpl_task_extract_from_project": (
|
||||
"Extract all actionable tasks from the provided project context. "
|
||||
"Return a structured list of tasks, each with a title, inferred priority "
|
||||
"(high, medium, or low), suggested status (todo), and a due_date in "
|
||||
"milliseconds where a deadline can be inferred."
|
||||
),
|
||||
"tpl_note_weekly_summary": (
|
||||
"Generate a weekly project summary note from the provided workspace data. "
|
||||
"Include: tasks completed this week, tasks due soon, active projects, "
|
||||
"and upcoming timelines. Format the output as clean Markdown."
|
||||
),
|
||||
}
|
||||
for tid, text in _tpls.items():
|
||||
template_registry.register(tid, text)
|
||||
|
||||
|
||||
def _load_playbooks() -> None:
|
||||
"""Pre-build and cache the built-in playbooks."""
|
||||
playbooks: list[tuple[str, ExecutionPlan]] = [
|
||||
(
|
||||
"create_tasks_from_project",
|
||||
ExecutionPlanBuilder("project_agent")
|
||||
.add_llm_step(
|
||||
"tpl_task_extract_from_project",
|
||||
{"source": "project_context"},
|
||||
)
|
||||
.add_data_step("create_record", data_from_step=0)
|
||||
.build(),
|
||||
),
|
||||
(
|
||||
"generate_weekly_note",
|
||||
ExecutionPlanBuilder("note_agent")
|
||||
.add_llm_step(
|
||||
"tpl_note_weekly_summary",
|
||||
{"period": "last_7_days"},
|
||||
)
|
||||
.add_data_step("create_record", data_from_step=0)
|
||||
.build(),
|
||||
),
|
||||
]
|
||||
for key, plan in playbooks:
|
||||
plan_cache.cache_plan(key, plan)
|
||||
|
||||
|
||||
# Initialise on module load
|
||||
_register_builtin_templates()
|
||||
_load_playbooks()
|
||||
@@ -1,6 +1,6 @@
|
||||
"""LLM factory — centralised model instantiation via LiteLLM.
|
||||
|
||||
Every agent and the orchestrator call ``get_llm()`` or ``get_router_llm()``
|
||||
Every agent and the deep-agent supervisors call ``get_llm()`` or ``get_router_llm()``
|
||||
instead of directly constructing a provider-specific class. The model string
|
||||
follows the `LiteLLM model naming convention
|
||||
<https://docs.litellm.ai/docs/providers>`_:
|
||||
|
||||
@@ -43,7 +43,7 @@ _PROACTIVE_CONFIDENCE_THRESHOLD = 0.6
|
||||
|
||||
|
||||
class MemoryMiddleware:
|
||||
"""Enrich orchestrator context with memory and persist interactions after."""
|
||||
"""Enrich agent context with memory and persist interactions after."""
|
||||
|
||||
def __init__(self, db: AsyncSession) -> None:
|
||||
self._db = db
|
||||
@@ -51,7 +51,7 @@ class MemoryMiddleware:
|
||||
# ── Public API ────────────────────────────────────────────────────────────
|
||||
|
||||
async def enrich_context(self, user_id: str, message: str) -> dict[str, Any]:
|
||||
"""Build memory context dict to inject into the orchestrator before LLM call.
|
||||
"""Build memory context dict to inject into the agent before LLM call.
|
||||
|
||||
Returns a dict with keys:
|
||||
core_memory — {key: plaintext_value, ...}
|
||||
|
||||
@@ -1,210 +0,0 @@
|
||||
"""Orchestrator — LLM-based intent router and agent pipeline."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Any, AsyncGenerator
|
||||
|
||||
from langchain_core.messages import HumanMessage, SystemMessage
|
||||
|
||||
from app.core.agent_registry import AgentRegistry, ChatAgent
|
||||
from app.core.llm import get_router_llm
|
||||
from app.core.agent_registry import registry as _default_registry
|
||||
from app.schemas import ChatRequest, ChatResponse, ExecutionPlan
|
||||
|
||||
_FALLBACK_AGENT = "task_agent"
|
||||
|
||||
_CLASSIFY_SYSTEM = (
|
||||
"You are an intent classifier. Given the user message and context, decide "
|
||||
"which agent to route to.\n"
|
||||
"Available agents: {agents}\n"
|
||||
"Respond with just the agent name, nothing else."
|
||||
)
|
||||
|
||||
_SYNTHESIZE_HUMAN = (
|
||||
"Combine the following agent results into one coherent response.\n\n"
|
||||
"Agent results:\n{results}\n\n"
|
||||
"Original message: {message}"
|
||||
)
|
||||
|
||||
|
||||
def _make_llm():
|
||||
return get_router_llm()
|
||||
|
||||
|
||||
async def classify_intent(
|
||||
message: str,
|
||||
context: dict[str, Any],
|
||||
reg: AgentRegistry,
|
||||
) -> str:
|
||||
"""Use gpt-4o-mini to classify intent and return the matching agent name.
|
||||
|
||||
Falls back to ``task_agent`` when the registry is empty or the model
|
||||
returns a name that is not registered.
|
||||
"""
|
||||
agents = reg.list_agents()
|
||||
if not agents:
|
||||
return _FALLBACK_AGENT
|
||||
|
||||
system = _CLASSIFY_SYSTEM.format(agents=json.dumps(agents))
|
||||
# Truncate context to keep the classification prompt short
|
||||
human = f"Message: {message}\nContext summary: {json.dumps(context)[:500]}"
|
||||
|
||||
llm = _make_llm()
|
||||
response = await llm.ainvoke(
|
||||
[SystemMessage(content=system), HumanMessage(content=human)]
|
||||
)
|
||||
|
||||
agent_name = str(response.content).strip().lower()
|
||||
known = {a["name"] for a in agents}
|
||||
return agent_name if agent_name in known else _FALLBACK_AGENT
|
||||
|
||||
|
||||
async def route_single(
|
||||
agent_name: str,
|
||||
message: str,
|
||||
context: dict[str, Any],
|
||||
reg: AgentRegistry,
|
||||
) -> ChatResponse:
|
||||
"""Route to a single agent and wrap the result in a ``ChatResponse``."""
|
||||
response_text = await reg.call_agent(agent_name, message, context)
|
||||
return ChatResponse(response=response_text)
|
||||
|
||||
|
||||
async def route_pipeline(
|
||||
agent_names: list[str],
|
||||
message: str,
|
||||
context: dict[str, Any],
|
||||
reg: AgentRegistry,
|
||||
) -> ChatResponse:
|
||||
"""Execute agents sequentially; each agent receives previous results in context.
|
||||
|
||||
A final LLM synthesis call merges all results into one coherent response.
|
||||
"""
|
||||
previous_results: list[str] = []
|
||||
|
||||
for agent_name in agent_names:
|
||||
ctx = {**context, "previous_results": list(previous_results)}
|
||||
result = await reg.call_agent(agent_name, message, ctx)
|
||||
previous_results.append(result)
|
||||
|
||||
results_str = "\n\n".join(
|
||||
f"[{name}]: {res}" for name, res in zip(agent_names, previous_results)
|
||||
)
|
||||
human = _SYNTHESIZE_HUMAN.format(results=results_str, message=message)
|
||||
llm = _make_llm()
|
||||
synthesis = await llm.ainvoke([HumanMessage(content=human)])
|
||||
return ChatResponse(response=str(synthesis.content))
|
||||
|
||||
|
||||
def _build_plan(agent_name: str, message: str) -> ExecutionPlan:
|
||||
"""Build an ``ExecutionPlan`` for the resolved agent.
|
||||
|
||||
Uses ``ExecutionPlanBuilder`` with the server-side template registry.
|
||||
If a default template exists for the agent, an LLM step is emitted;
|
||||
otherwise a plain ``handle`` action step is used.
|
||||
"""
|
||||
from app.core.execution_plan import ExecutionPlanBuilder, template_registry
|
||||
|
||||
template_id = f"tpl_{agent_name}_default"
|
||||
builder = ExecutionPlanBuilder(agent_name)
|
||||
if template_registry.has(template_id):
|
||||
builder.add_llm_step(template_id, {"message": message})
|
||||
else:
|
||||
builder.add_step("handle", {"message": message})
|
||||
return builder.build()
|
||||
|
||||
|
||||
async def orchestrate(
|
||||
request: ChatRequest,
|
||||
reg: AgentRegistry | None = None,
|
||||
) -> ChatResponse | ExecutionPlan:
|
||||
"""Main orchestration entry point.
|
||||
|
||||
* Classifies the user's intent to select an agent.
|
||||
* ``execution_mode == 'direct'``: routes to the agent and returns a
|
||||
``ChatResponse``.
|
||||
* ``execution_mode == 'plan'``: returns an ``ExecutionPlan`` with the
|
||||
resolved agent and a template-ID-only step (prompt IP stays server-side).
|
||||
"""
|
||||
if reg is None:
|
||||
reg = _default_registry
|
||||
|
||||
context = request.context.model_dump()
|
||||
agent_name = await classify_intent(request.message, context, reg)
|
||||
|
||||
if request.execution_mode == "direct":
|
||||
return await route_single(agent_name, request.message, context, reg)
|
||||
|
||||
# plan mode — return plan, do not execute
|
||||
return _build_plan(agent_name, request.message)
|
||||
|
||||
|
||||
async def orchestrate_v3(
|
||||
user_id: str,
|
||||
message: str,
|
||||
context: dict[str, Any],
|
||||
reg: AgentRegistry | None = None,
|
||||
) -> tuple[str, ChatAgent]:
|
||||
"""v3 orchestration — returns (agent_name, agent_instance); caller drives execution.
|
||||
|
||||
Classifies intent and instantiates the matching agent. The caller is responsible
|
||||
for invoking handle(), handle_stream(), or _tool_loop_stream() as needed.
|
||||
"""
|
||||
if reg is None:
|
||||
reg = _default_registry
|
||||
agent_name = await classify_intent(message, context, reg)
|
||||
return agent_name, reg.get(agent_name)
|
||||
|
||||
|
||||
async def orchestrate_v3_stream(
|
||||
user_id: str,
|
||||
message: str,
|
||||
context: dict[str, Any],
|
||||
reg: AgentRegistry | None = None,
|
||||
agent_holder: list | None = None,
|
||||
) -> AsyncGenerator[tuple[str, str], None]:
|
||||
"""v3 streaming orchestration — yields (agent_name, token) pairs.
|
||||
|
||||
The first yield always carries the agent_name with an empty token so that
|
||||
callers (e.g. FloatingFormatter) can detect the routing domain before any text
|
||||
tokens arrive.
|
||||
|
||||
If *agent_holder* is provided (a list), the agent instance is appended so
|
||||
callers can access ``agent.tool_results`` after the stream completes.
|
||||
"""
|
||||
if reg is None:
|
||||
reg = _default_registry
|
||||
agent_name = await classify_intent(message, context, reg)
|
||||
agent = reg.get(agent_name)
|
||||
if agent_holder is not None:
|
||||
agent_holder.append(agent)
|
||||
yield agent_name, "" # domain signal — no token yet
|
||||
async for token in agent.handle_stream(message, context):
|
||||
yield agent_name, token
|
||||
|
||||
|
||||
async def orchestrate_stream(
|
||||
request: ChatRequest,
|
||||
reg: AgentRegistry | None = None,
|
||||
) -> AsyncGenerator[str, None]:
|
||||
"""Streaming orchestration — yields plain text chunks only.
|
||||
|
||||
The WebSocket handler in ``app/api/routes/chat.py`` is responsible for
|
||||
wrapping each chunk in a ``text_chunk`` frame and sending the final
|
||||
``final`` frame once the generator is exhausted.
|
||||
|
||||
Agents do not yet support token-level streaming; the full response is
|
||||
fetched first (which may involve multiple WS round-trips for tool calls),
|
||||
then emitted in fixed-size chunks.
|
||||
"""
|
||||
if reg is None:
|
||||
reg = _default_registry
|
||||
|
||||
context = request.context.model_dump()
|
||||
agent_name = await classify_intent(request.message, context, reg)
|
||||
response_text = await reg.call_agent(agent_name, request.message, context)
|
||||
|
||||
chunk_size = 50
|
||||
for i in range(0, len(response_text), chunk_size):
|
||||
yield response_text[i : i + chunk_size]
|
||||
@@ -1,12 +1,23 @@
|
||||
"""Output Formatter — transforms orchestrator token streams into WS frame sequences.
|
||||
"""Output Formatter — transforms deep-agent event streams into WS frame sequences.
|
||||
|
||||
HomeFormatter: produces stream_start, stream_text / stream_block, stream_end
|
||||
FloatingFormatter: produces floating_domain, stream_text, stream_end
|
||||
Consumes ``(event_type, data)`` tuples yielded by ``deep_agent.run_*_stream()``:
|
||||
* ``("token", str)`` — supervisor text token
|
||||
* ``("tool_end", dict)`` — sub-agent finished: ``{name, result}``
|
||||
* ``("mutations", list)`` — collected CRUD mutations for ``stream_end``
|
||||
|
||||
HomeFormatter:
|
||||
* Sniffs ``tool_end`` events → emits ``WsStreamBlock`` (entity_ref with raw data)
|
||||
* Streams text tokens → emits ``WsStreamText``
|
||||
* Attaches mutations → injects into ``WsStreamEnd``
|
||||
|
||||
FloatingFormatter:
|
||||
* Sniffs first ``tool_end`` name → emits ``WsFloatingDomain``
|
||||
* Streams text tokens → emits ``WsStreamText``
|
||||
* Attaches mutations → injects into ``WsStreamEnd``
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from collections.abc import AsyncGenerator
|
||||
from typing import Any
|
||||
@@ -21,10 +32,7 @@ from app.schemas import (
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Valid chart types (matching shadcn/ui Recharts wrappers in Electron)
|
||||
_VALID_CHART_TYPES = {"area", "bar", "line", "pie", "radar", "radial"}
|
||||
|
||||
# Map agent name → floating domain
|
||||
# Map sub-agent tool name → floating domain / entity type
|
||||
_AGENT_DOMAIN: dict[str, str] = {
|
||||
"task_agent": "tasks",
|
||||
"timeline_agent": "timelines",
|
||||
@@ -36,180 +44,74 @@ WsFrame = WsStreamStart | WsStreamText | WsStreamBlock | WsStreamEnd | WsFloatin
|
||||
|
||||
|
||||
class HomeFormatter:
|
||||
"""Parses a token stream from orchestrate_v3_stream and yields WS frames.
|
||||
"""Consumes a deep-agent event stream and yields WS frames for the Home view.
|
||||
|
||||
The LLM is expected to output a newline-delimited sequence of JSON objects,
|
||||
each with a ``type`` field:
|
||||
- ``text`` → yields WsStreamText immediately (word-by-word)
|
||||
- ``chart`` → buffers full JSON, validates, yields WsStreamBlock
|
||||
- ``entity_ref`` → resolves from tool_results, yields WsStreamBlock
|
||||
- ``table`` → buffers full JSON, validates, yields WsStreamBlock
|
||||
- ``timeline`` → buffers full JSON, validates, yields WsStreamBlock
|
||||
|
||||
Invalid or unknown blocks are logged and skipped — stream never crashes.
|
||||
"""
|
||||
|
||||
def __init__(self, request_id: str, tool_results: list[dict]) -> None:
|
||||
self.request_id = request_id
|
||||
self.tool_results = tool_results
|
||||
|
||||
async def format(
|
||||
self,
|
||||
token_stream: AsyncGenerator[tuple[str, str], None],
|
||||
) -> AsyncGenerator[WsFrame, None]:
|
||||
yield WsStreamStart(request_id=self.request_id)
|
||||
|
||||
buffer = ""
|
||||
async for _agent_name, token in token_stream:
|
||||
if not token:
|
||||
continue
|
||||
buffer += token
|
||||
# Flush any complete JSON objects from the buffer
|
||||
async for frame in self._flush_complete_objects(buffer):
|
||||
buffer = "" # reset after flush
|
||||
yield frame
|
||||
break # only one flush per iteration; rest accumulates
|
||||
|
||||
# Flush any remaining content
|
||||
if buffer.strip():
|
||||
async for frame in self._flush_complete_objects(buffer, final=True):
|
||||
yield frame
|
||||
|
||||
yield WsStreamEnd(request_id=self.request_id)
|
||||
|
||||
async def _flush_complete_objects(
|
||||
self, text: str, final: bool = False
|
||||
) -> AsyncGenerator[WsFrame, None]:
|
||||
"""Try to parse and yield all complete JSON objects from *text*.
|
||||
|
||||
Yields nothing if text is incomplete JSON (unless *final* is True,
|
||||
in which case remaining text is emitted as plain stream_text).
|
||||
"""
|
||||
remaining = text.strip()
|
||||
while remaining:
|
||||
# Fast path: plain text (not JSON)
|
||||
if not remaining.startswith("{"):
|
||||
# Yield as plain text chunk
|
||||
newline_idx = remaining.find("\n")
|
||||
if newline_idx == -1:
|
||||
if final:
|
||||
yield WsStreamText(request_id=self.request_id, chunk=remaining)
|
||||
remaining = ""
|
||||
else:
|
||||
return # accumulate more
|
||||
else:
|
||||
line = remaining[:newline_idx].strip()
|
||||
remaining = remaining[newline_idx + 1:].strip()
|
||||
if line:
|
||||
yield WsStreamText(request_id=self.request_id, chunk=line)
|
||||
continue
|
||||
|
||||
# Try to decode a JSON object
|
||||
try:
|
||||
obj, end_idx = _try_parse_json(remaining)
|
||||
except ValueError:
|
||||
if final:
|
||||
# Emit as raw text if we can't parse
|
||||
yield WsStreamText(request_id=self.request_id, chunk=remaining)
|
||||
remaining = ""
|
||||
return
|
||||
|
||||
if obj is None:
|
||||
if final:
|
||||
yield WsStreamText(request_id=self.request_id, chunk=remaining)
|
||||
remaining = ""
|
||||
return # incomplete — need more tokens
|
||||
|
||||
remaining = remaining[end_idx:].strip()
|
||||
block_type = obj.get("type")
|
||||
|
||||
frame = self._dispatch_block(obj, block_type)
|
||||
if frame is not None:
|
||||
yield frame
|
||||
|
||||
def _dispatch_block(self, obj: dict, block_type: str | None) -> WsFrame | None:
|
||||
if block_type == "text":
|
||||
content = obj.get("content", "")
|
||||
if content:
|
||||
return WsStreamText(request_id=self.request_id, chunk=str(content))
|
||||
return None
|
||||
|
||||
if block_type == "chart":
|
||||
chart_type = obj.get("chartType")
|
||||
if chart_type not in _VALID_CHART_TYPES:
|
||||
logger.warning("HomeFormatter: invalid chartType=%r — skipping", chart_type)
|
||||
return None
|
||||
if not isinstance(obj.get("data"), list):
|
||||
logger.warning("HomeFormatter: chart missing data array — skipping")
|
||||
return None
|
||||
return WsStreamBlock(
|
||||
request_id=self.request_id,
|
||||
block_type="chart",
|
||||
data=obj,
|
||||
)
|
||||
|
||||
if block_type == "entity_ref":
|
||||
entity = obj.get("entity")
|
||||
resolved = self._resolve_entity(entity)
|
||||
if resolved is None:
|
||||
logger.warning("HomeFormatter: entity_ref %r not found in tool_results — skipping", entity)
|
||||
return None
|
||||
return WsStreamBlock(
|
||||
request_id=self.request_id,
|
||||
block_type="entity_ref",
|
||||
data={"entity": entity, "items": resolved},
|
||||
)
|
||||
|
||||
if block_type == "table":
|
||||
if not isinstance(obj.get("headers"), list) or not isinstance(obj.get("rows"), list):
|
||||
logger.warning("HomeFormatter: table missing headers/rows — skipping")
|
||||
return None
|
||||
return WsStreamBlock(
|
||||
request_id=self.request_id,
|
||||
block_type="table",
|
||||
data=obj,
|
||||
)
|
||||
|
||||
if block_type == "timeline":
|
||||
if not isinstance(obj.get("timelines"), list):
|
||||
logger.warning("HomeFormatter: timeline missing timelines — skipping")
|
||||
return None
|
||||
return WsStreamBlock(
|
||||
request_id=self.request_id,
|
||||
block_type="timeline",
|
||||
data=obj,
|
||||
)
|
||||
|
||||
logger.warning("HomeFormatter: unknown block type=%r — skipping", block_type)
|
||||
return None
|
||||
|
||||
def _resolve_entity(self, entity: str | None) -> list[dict] | None:
|
||||
"""Find matching items in tool_results by entity type."""
|
||||
if not entity:
|
||||
return None
|
||||
matches = [r for r in self.tool_results if r.get("entity") == entity]
|
||||
return matches if matches else None
|
||||
|
||||
|
||||
class FloatingFormatter:
|
||||
"""Parses a token stream from orchestrate_v3_stream and yields WS frames.
|
||||
|
||||
Emits floating_domain immediately (from agent_name), then streams all tokens
|
||||
as plain stream_text — no block parsing for floating context.
|
||||
``tool_end`` events from sub-agents are emitted as ``WsStreamBlock``
|
||||
(entity_ref) so the client can render structured data. Text tokens are
|
||||
forwarded as ``WsStreamText``. Mutations are attached to ``WsStreamEnd``.
|
||||
"""
|
||||
|
||||
def __init__(self, request_id: str) -> None:
|
||||
self.request_id = request_id
|
||||
self._mutations: list[dict] = []
|
||||
|
||||
async def format(
|
||||
self,
|
||||
token_stream: AsyncGenerator[tuple[str, str], None],
|
||||
event_stream: AsyncGenerator[tuple[str, Any], None],
|
||||
) -> AsyncGenerator[WsFrame, None]:
|
||||
yield WsStreamStart(request_id=self.request_id)
|
||||
|
||||
async for event_type, data in event_stream:
|
||||
if event_type == "token":
|
||||
if data:
|
||||
yield WsStreamText(request_id=self.request_id, chunk=data)
|
||||
|
||||
elif event_type == "tool_end":
|
||||
# Sub-agent finished — emit its result as an entity_ref block
|
||||
name = data.get("name", "")
|
||||
entity = _AGENT_DOMAIN.get(name)
|
||||
if entity:
|
||||
yield WsStreamBlock(
|
||||
request_id=self.request_id,
|
||||
block_type="entity_ref",
|
||||
data={"entity": entity, "result": data.get("result", "")},
|
||||
)
|
||||
|
||||
elif event_type == "mutations":
|
||||
self._mutations = data or []
|
||||
|
||||
yield WsStreamEnd(
|
||||
request_id=self.request_id,
|
||||
mutations=[
|
||||
{"action": m["action"], "table": m["table"], "data": m["data"]}
|
||||
for m in self._mutations
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
class FloatingFormatter:
|
||||
"""Consumes a deep-agent event stream and yields WS frames for the Floating view.
|
||||
|
||||
Sniffs the first ``tool_end`` event name to derive the domain (e.g.
|
||||
``task_agent`` → ``"tasks"``), then streams text tokens as plain
|
||||
``WsStreamText``. No block parsing for floating context.
|
||||
"""
|
||||
|
||||
def __init__(self, request_id: str) -> None:
|
||||
self.request_id = request_id
|
||||
self._mutations: list[dict] = []
|
||||
|
||||
async def format(
|
||||
self,
|
||||
event_stream: AsyncGenerator[tuple[str, Any], None],
|
||||
) -> AsyncGenerator[WsFrame, None]:
|
||||
domain_sent = False
|
||||
|
||||
async for agent_name, token in token_stream:
|
||||
if not domain_sent:
|
||||
domain = _AGENT_DOMAIN.get(agent_name, "tasks")
|
||||
async for event_type, data in event_stream:
|
||||
if event_type == "tool_end" and not domain_sent:
|
||||
# Sniff domain from the first sub-agent that completes
|
||||
name = data.get("name", "")
|
||||
domain = _AGENT_DOMAIN.get(name, "tasks")
|
||||
yield WsFloatingDomain(
|
||||
request_id=self.request_id,
|
||||
domain=domain, # type: ignore[arg-type]
|
||||
@@ -217,28 +119,33 @@ class FloatingFormatter:
|
||||
yield WsStreamStart(request_id=self.request_id)
|
||||
domain_sent = True
|
||||
|
||||
if token:
|
||||
yield WsStreamText(request_id=self.request_id, chunk=token)
|
||||
elif event_type == "token":
|
||||
if not domain_sent:
|
||||
# First token arrived before any tool_end — default domain
|
||||
yield WsFloatingDomain(
|
||||
request_id=self.request_id,
|
||||
domain="tasks", # type: ignore[arg-type]
|
||||
)
|
||||
yield WsStreamStart(request_id=self.request_id)
|
||||
domain_sent = True
|
||||
if data:
|
||||
yield WsStreamText(request_id=self.request_id, chunk=data)
|
||||
|
||||
yield WsStreamEnd(request_id=self.request_id)
|
||||
elif event_type == "mutations":
|
||||
self._mutations = data or []
|
||||
|
||||
# If no events triggered domain_sent (edge case), still emit structure
|
||||
if not domain_sent:
|
||||
yield WsFloatingDomain(
|
||||
request_id=self.request_id,
|
||||
domain="tasks", # type: ignore[arg-type]
|
||||
)
|
||||
yield WsStreamStart(request_id=self.request_id)
|
||||
|
||||
# ── helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def _try_parse_json(text: str) -> tuple[dict[str, Any] | None, int]:
|
||||
"""Attempt to parse the first complete JSON object from *text*.
|
||||
|
||||
Returns ``(parsed_dict, end_index)`` on success, ``(None, 0)`` when the
|
||||
object is incomplete, and raises ``ValueError`` when text is not JSON.
|
||||
"""
|
||||
decoder = json.JSONDecoder()
|
||||
try:
|
||||
obj, end_idx = decoder.raw_decode(text)
|
||||
if not isinstance(obj, dict):
|
||||
raise ValueError("Expected JSON object")
|
||||
return obj, end_idx
|
||||
except json.JSONDecodeError as exc:
|
||||
# Incomplete JSON — need more tokens
|
||||
if "Unterminated" in str(exc) or exc.pos == len(text):
|
||||
return None, 0
|
||||
raise ValueError(str(exc)) from exc
|
||||
yield WsStreamEnd(
|
||||
request_id=self.request_id,
|
||||
mutations=[
|
||||
{"action": m["action"], "table": m["table"], "data": m["data"]}
|
||||
for m in self._mutations
|
||||
],
|
||||
)
|
||||
|
||||
@@ -7,18 +7,21 @@ The callback sends a `tool_call` WS frame and awaits the `tool_result`.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from contextvars import ContextVar
|
||||
from typing import Any, Callable, Coroutine
|
||||
from uuid import uuid4
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Holds the execute callback for the current WS session.
|
||||
# Set by the chat WS handler before the orchestrator runs; cleared after.
|
||||
# Set by the chat WS handler before the deep agent runs; cleared after.
|
||||
_client_executor: ContextVar[Callable[[dict], Coroutine[Any, Any, dict]]] = ContextVar(
|
||||
"_client_executor"
|
||||
)
|
||||
|
||||
# Optional collector that captures raw execute_on_client results.
|
||||
# Set by _tool_loop / _tool_loop_stream to populate ChatAgent.tool_results.
|
||||
# Set by the deep agent tool loop to capture CRUD mutations.
|
||||
_tool_result_collector: ContextVar[list[dict] | None] = ContextVar(
|
||||
"_tool_result_collector", default=None
|
||||
)
|
||||
@@ -81,7 +84,12 @@ async def execute_on_client(
|
||||
if limit is not None:
|
||||
payload["limit"] = limit
|
||||
|
||||
logger.info("execute_on_client: sending payload action=%s table=%s id=%s", action, table, payload["id"])
|
||||
result = await callback(payload)
|
||||
if result is None:
|
||||
logger.error("execute_on_client: callback returned None for action=%s table=%s id=%s", action, table, payload["id"])
|
||||
else:
|
||||
logger.info("execute_on_client: got result type=%s keys=%s", type(result).__name__, list(result.keys()) if isinstance(result, dict) else "N/A")
|
||||
collector = _tool_result_collector.get(None)
|
||||
if collector is not None:
|
||||
collector.append({
|
||||
|
||||
@@ -18,10 +18,7 @@ from app.config.settings import settings
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
# Startup: initialise DB connection pool and agent registry
|
||||
from app.core.agent_registry import registry # noqa: F401 — triggers module load
|
||||
import app.agents # noqa: F401 — triggers @registry.register decorators
|
||||
|
||||
# Startup: initialise DB connection pool
|
||||
yield
|
||||
|
||||
# Shutdown: dispose SQLAlchemy connection pool
|
||||
@@ -51,11 +48,10 @@ def create_app() -> FastAPI:
|
||||
app.add_middleware(SanitizerMiddleware)
|
||||
app.add_middleware(TierRateLimitMiddleware)
|
||||
|
||||
from app.api.routes import agent_setup, agents, auth, backup, billing, chat, device_ws, plans, plugins, storage, vectors
|
||||
from app.api.routes import agent_setup, agents, auth, backup, billing, chat, device_ws, plugins, storage, vectors
|
||||
|
||||
app.include_router(auth.router, prefix="/api/v1")
|
||||
app.include_router(chat.router, prefix="/api/v1")
|
||||
app.include_router(plans.router, prefix="/api/v1")
|
||||
app.include_router(storage.router, prefix="/api/v1")
|
||||
app.include_router(vectors.router, prefix="/api/v1")
|
||||
app.include_router(backup.router, prefix="/api/v1")
|
||||
|
||||
@@ -41,41 +41,13 @@ class ChatContext(BaseModel):
|
||||
conversation_history: list[dict[str, Any]] = Field(default_factory=list)
|
||||
|
||||
|
||||
class PlanAction(BaseModel):
|
||||
type: Literal[
|
||||
"create_record",
|
||||
"update_record",
|
||||
"delete_record",
|
||||
"index_document",
|
||||
"send_notification",
|
||||
]
|
||||
table: str | None = None
|
||||
data: dict[str, Any] | None = None
|
||||
|
||||
|
||||
class ChatRequest(BaseModel):
|
||||
message: str
|
||||
context: ChatContext = Field(default_factory=ChatContext)
|
||||
execution_mode: Literal["direct", "plan"] = "direct"
|
||||
|
||||
|
||||
class ChatResponse(BaseModel):
|
||||
response: str
|
||||
actions: list[PlanAction] = Field(default_factory=list)
|
||||
|
||||
|
||||
# ── Execution Plans ──────────────────────────────────────────────────
|
||||
|
||||
class PlanStep(BaseModel):
|
||||
action: str
|
||||
prompt_template: str | None = None
|
||||
variables: dict[str, Any] | None = None
|
||||
data_from_step: int | None = None
|
||||
|
||||
|
||||
class ExecutionPlan(BaseModel):
|
||||
agent: str
|
||||
steps: list[PlanStep] = Field(default_factory=list)
|
||||
|
||||
|
||||
# ── Backup ───────────────────────────────────────────────────────────
|
||||
|
||||
Reference in New Issue
Block a user