step-7: add memory middleware (memory_middleware.py, device_ws.py)
MemoryMiddleware class: - enrich_context(): loads core prefs, associative (top-k), episodic (last-N), and proactive hints (above 0.6 confidence) — all decrypted in-memory only - store_episode(): encrypts and persists interaction summary to memory_episodic - update_core(): upserts encrypted key/value to memory_core device_ws.py home_request + popup_request handlers: - enrich_context() called before orchestrate_v3_stream (memory injected into context) - store_episode() called after stream completes (non-blocking) 10 unit + integration tests pass; pre-existing test_agents.py failures unrelated. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -42,6 +42,7 @@ from sqlalchemy import update
|
||||
from app.config.settings import settings
|
||||
from app.core.agent_runner import trigger_pending_runs
|
||||
from app.core.device_manager import device_manager
|
||||
from app.core.memory_middleware import MemoryMiddleware
|
||||
from app.core.orchestrator import orchestrate_v3_stream
|
||||
from app.core.output_formatter import HomeFormatter, PopupFormatter
|
||||
from app.core.ws_context import clear_client_executor, set_client_executor
|
||||
@@ -217,20 +218,29 @@ async def _handle_home_request(
|
||||
"""Handle a home_request frame — streams HomeFormatter output back on the socket."""
|
||||
request_id = frame.get("request_id") or str(uuid4())
|
||||
message: str = frame.get("message", "")
|
||||
session_id: str = frame.get("session_id") or str(uuid4())
|
||||
|
||||
# ── Memory: enrich context before LLM call ────────────────────────
|
||||
async with async_session() as db:
|
||||
memory = MemoryMiddleware(db)
|
||||
memory_context = await memory.enrich_context(user_id, message)
|
||||
|
||||
context: dict = {
|
||||
"conversation_history": frame.get("conversation_history", []),
|
||||
**memory_context,
|
||||
}
|
||||
|
||||
executor = await _make_ws_executor(websocket, user_id)
|
||||
set_client_executor(executor)
|
||||
response_chunks: list[str] = []
|
||||
try:
|
||||
token_stream = orchestrate_v3_stream(user_id, message, context)
|
||||
# Collect tool_results via the formatter after the stream completes.
|
||||
# We pass an empty list initially; tool_results are populated during
|
||||
# the agent run via ws_context._tool_result_collector (set inside _tool_loop_stream).
|
||||
formatter = HomeFormatter(request_id=request_id, tool_results=[])
|
||||
async for ws_frame in formatter.format(token_stream):
|
||||
await websocket.send_text(ws_frame.model_dump_json())
|
||||
# Collect text chunks to build the full response for episode storage
|
||||
if ws_frame.type == "stream_text": # type: ignore[union-attr]
|
||||
response_chunks.append(ws_frame.chunk) # type: ignore[union-attr]
|
||||
except Exception as exc:
|
||||
logger.error(
|
||||
"device_ws: home_request failed user=%s req=%s: %s",
|
||||
@@ -239,6 +249,13 @@ async def _handle_home_request(
|
||||
finally:
|
||||
clear_client_executor()
|
||||
|
||||
# ── Memory: store episode after response ──────────────────────────
|
||||
async with async_session() as db:
|
||||
memory = MemoryMiddleware(db)
|
||||
await memory.store_episode(
|
||||
user_id, session_id, message, "".join(response_chunks)
|
||||
)
|
||||
|
||||
|
||||
async def _handle_popup_request(
|
||||
websocket: WebSocket,
|
||||
@@ -248,16 +265,26 @@ async def _handle_popup_request(
|
||||
"""Handle a popup_request frame — streams PopupFormatter output back on the socket."""
|
||||
request_id = frame.get("request_id") or str(uuid4())
|
||||
message: str = frame.get("message", "")
|
||||
session_id: str = frame.get("session_id") or str(uuid4())
|
||||
scope: dict = frame.get("scope", {})
|
||||
context: dict = {"scope": scope}
|
||||
|
||||
# ── Memory: enrich context before LLM call ────────────────────────
|
||||
async with async_session() as db:
|
||||
memory = MemoryMiddleware(db)
|
||||
memory_context = await memory.enrich_context(user_id, message)
|
||||
|
||||
context: dict = {"scope": scope, **memory_context}
|
||||
|
||||
executor = await _make_ws_executor(websocket, user_id)
|
||||
set_client_executor(executor)
|
||||
response_chunks: list[str] = []
|
||||
try:
|
||||
token_stream = orchestrate_v3_stream(user_id, message, context)
|
||||
formatter = PopupFormatter(request_id=request_id)
|
||||
async for ws_frame in formatter.format(token_stream):
|
||||
await websocket.send_text(ws_frame.model_dump_json())
|
||||
if ws_frame.type == "stream_text": # type: ignore[union-attr]
|
||||
response_chunks.append(ws_frame.chunk) # type: ignore[union-attr]
|
||||
except Exception as exc:
|
||||
logger.error(
|
||||
"device_ws: popup_request failed user=%s req=%s: %s",
|
||||
@@ -266,6 +293,13 @@ async def _handle_popup_request(
|
||||
finally:
|
||||
clear_client_executor()
|
||||
|
||||
# ── Memory: store episode after response ──────────────────────────
|
||||
async with async_session() as db:
|
||||
memory = MemoryMiddleware(db)
|
||||
await memory.store_episode(
|
||||
user_id, session_id, message, "".join(response_chunks)
|
||||
)
|
||||
|
||||
|
||||
# ── Heartbeat ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
Reference in New Issue
Block a user