update memory implementation strategy

This commit is contained in:
2026-03-08 00:47:24 +01:00
parent 45415bb9ee
commit 3b3b3baf25
2 changed files with 290 additions and 107 deletions

View File

@@ -512,113 +512,12 @@ Cloud Agent:
---
## Phase 5 — Shared Memory (Agent KV + Chat WS Fix)
## ~~Phase 5 — Shared Memory~~ (SUPERSEDED)
> **Objective:** Give chat agents persistent memory via a KV store on the Electron client. Agents can `store_memory()` to remember user preferences, patterns, and corrections, and `recall_memories()` to retrieve them. All data lives in Electron's SQLite `agent_memory` table (local-first, never stored server-side). This also requires fixing the chat WS handler to support bidirectional tool calls — currently a critical gap that blocks all agent tools from working over the `/chat/stream` endpoint.
> **This phase has been fully replaced by `V3_MIGRATION_PLAN.md`.**
>
> **Electron Phase 5 plan:** `../adiuva/AI_REFACTOR_PLAN.md` Phase 5 section.
> - Chat WS fix → V3 Step 5 (Unified WS Handler — single multiplexed socket)
> - Agent memory → V3 Steps 67 (Cloud-side MemGPT-style memory in PostgreSQL + pgvector, encrypted at rest with per-user Fernet key)
>
> **Why agent KV matters:** Chat agents are currently stateless — they can't remember "User prefers to-do in lowercase" or "Client X billing cycle is the 15th". With KV memory, agents become learning assistants that improve over time. Users feel the AI "knows them" without any data leaving their device.
>
> **Why the chat WS fix is critical:** The existing `/chat/stream` WS handler (`app/api/routes/chat.py`) never calls `set_client_executor()`. This means `execute_on_client()` raises `RuntimeError` whenever any agent tool tries to call it during a chat session. All 23 tools are broken over chat WS. This must be fixed before memory tools (or any tools) can work.
>
> **New Electron tables** (managed by Electron, accessed by backend via `execute_on_client`):
> - `chat_messages`: `id`, `scope`, `role`, `content`, `error`, `created_at`
> - `agent_memory`: `id`, `agent_name`, `key`, `value`, `scope`, `created_at`, `updated_at` (unique on `agent_name, key, scope`)
### Step 5.1 — Fix chat WS for bidirectional tool calls (PREREQUISITE)
> **This is the highest-priority backend fix.** Without it, zero agent tools work over the chat WS connection.
- [ ] Rewrite `app/api/routes/chat.py` — `chat_stream()` WS handler:
- After auth + accept, receive first frame as `{"type": "chat_request", ...}` (not raw `ChatRequest`)
- Parse frame, extract `message` and `context`
- Set up a local `pending_calls: dict[str, asyncio.Future]` for tool-call round-trips
- Define executor callback:
```python
async def execute_callback(payload: dict) -> dict:
call_id = payload["id"]
fut = asyncio.get_event_loop().create_future()
pending_calls[call_id] = fut
await websocket.send_text(json.dumps({"type": "tool_call", **payload}))
return await asyncio.wait_for(fut, timeout=30.0)
```
- Call `set_client_executor(execute_callback)` before orchestrating
- Run two concurrent tasks:
1. **Receive loop**: dispatches incoming frames — `tool_result` resolves pending Futures, `pong` ignored
2. **Orchestration task**: calls `orchestrate_stream()`, wraps chunks in `{"type": "text_chunk", "text": "..."}` frames, sends `{"type": "final", "response": "..."}` on completion
- Call `clear_client_executor()` in finally block
- Keep heartbeat ping every 30s
- 30s timeout on each `tool_result` — tool returns error string to LLM on timeout
- [ ] Update `orchestrate_stream()` in `app/core/orchestrator.py` if needed:
- Ensure it properly yields text chunks (currently chunks by fixed 50-char slices — consider switching to yielding full response as single chunk for now)
- **Files:** `app/api/routes/chat.py`, `app/core/orchestrator.py`
- **Outcome:** Full bidirectional WS. Tool calls, text streaming, and heartbeats happen concurrently. All 23 existing agent tools now work over chat WS.
### Step 5.2 — Agent memory tools
- [ ] Create `app/agents/tools/memory_tools.py`:
- `create_memory_tools(agent_name: str) -> list[Tool]` — factory function that returns two LangChain `@tool` functions with `agent_name` bound via closure:
- **`store_memory(key: str, value: str, scope: str = "global")`**:
- Calls `execute_on_client(action="select", table="agentMemory", filters={"agentName": agent_name, "key": key, "scope": scope})`
- If row exists: `execute_on_client(action="update", table="agentMemory", data={"id": row["id"], "updates": {"value": value, "updatedAt": <now_ms>}})`
- If not: `execute_on_client(action="insert", table="agentMemory", data={"agentName": agent_name, "key": key, "value": value, "scope": scope})`
- Returns `"Stored memory: [key] = [value]"`
- **`recall_memories(key_pattern: str = None, scope: str = "global", limit: int = 10)`**:
- Calls `execute_on_client(action="select", table="agentMemory", filters={"agentName": agent_name, "scope": scope, "search": key_pattern})`
- Returns formatted list: `"key1: value1\nkey2: value2\n..."` or `"No memories found."`
- Timestamps are Unix milliseconds (consistent with Electron's `Date.now()`)
- Agent name scoping: each agent only sees its own memories (filtered by `agentName`)
- **Files:** `app/agents/tools/memory_tools.py`
- **Outcome:** Two reusable tools any agent can include. Upsert semantics via select-then-insert/update.
### Step 5.3 — Register memory tools on all agents
- [ ] Update `app/agents/task_agent.py`:
- Import `create_memory_tools` from `app/agents/tools/memory_tools`
- Add memory tools to `get_tools()`: `return [list_tasks, create_task, ..., *create_memory_tools("task_agent")]`
- Append to `_SYSTEM_PROMPT`: `"\n\nYou can store important facts about user preferences using store_memory and recall past facts using recall_memories. Store corrections, preferences, and patterns the user shares (e.g. 'User prefers short task titles', 'Default priority is medium'). Always check memories before giving advice."`
- [ ] Update `app/agents/project_agent.py` — same pattern with `create_memory_tools("project_agent")`
- [ ] Update `app/agents/note_agent.py` — same pattern with `create_memory_tools("note_agent")`
- [ ] Update `app/agents/checkpoint_agent.py` — same pattern with `create_memory_tools("checkpoint_agent")`
- **Files:** `app/agents/task_agent.py`, `app/agents/project_agent.py`, `app/agents/note_agent.py`, `app/agents/checkpoint_agent.py`
- **Outcome:** All 4 chat agents can store and recall persistent memories. Each agent's memories are scoped by `agentName`.
### Step 5.4 — Extend ChatContext with agent memories
- [ ] Update `app/schemas.py`:
- Add `agent_memories: list[dict[str, Any]] = Field(default_factory=list)` to `ChatContext`
- These are pre-loaded by Electron (from `agent_memory` table) and included in every request
- [ ] Agent `handle()` methods already receive full `context` dict — memories are visible in `context["agent_memories"]`
- [ ] Agent system prompts reference memories from context: agents see pre-loaded memories AND can call `recall_memories` for targeted lookup
- **Files:** `app/schemas.py`
- **Outcome:** Backend receives pre-loaded memories from Electron. Agents have dual-path access: context injection (passive) + tool call (active).
### Phase 5 — Verification
| # | Scenario | Expected |
|---|---|---|
| 1 | **Chat WS bidirectional** | Connect → send `chat_request` → receive `tool_call` → respond `tool_result` → receive `text_chunk` → `final` |
| 2 | **All existing tools work** | "List my tasks" over chat WS → `tool_call(select, tasks)` → Electron returns rows → LLM responds with real task data |
| 3 | **Store memory** | "Remember that I prefer short task titles" → `store_memory("task_title_preference", "short")` → `tool_call(insert, agentMemory)` → Electron persists |
| 4 | **Recall memory** | New chat session → "How should I name tasks?" → agent sees pre-loaded memory in context or calls `recall_memories` → references stored preference |
| 5 | **Upsert semantics** | Store same key twice → only one row exists with updated value |
| 6 | **Agent scope isolation** | `task_agent` stores memory → `note_agent` cannot see it (filtered by `agentName`) |
| 7 | **Project scope** | Store memory with `scope="project:<uuid>"` → only visible in that project's chat context |
| 8 | **Tool timeout** | Disconnect Electron mid-tool-call → 30s timeout → tool returns error → LLM handles gracefully |
| 9 | **Concurrent tool calls** | Agent calls `list_tasks` then `recall_memories` in sequence → both WS round-trips succeed |
| 10 | **Existing tests pass** | `pytest` — no regressions in agent tools or orchestrator |
### Phase 5 — Step Dependencies
```
Step 5.1 (chat WS fix) ──────────────► Step 5.2 (memory tools) ──► Step 5.3 (register on agents)
──► Step 5.4 (extend ChatContext)
Step 5.1 is the BLOCKER — nothing else works until bidirectional tool calls are wired.
Steps 5.3 and 5.4 can run in parallel after 5.2.
```
---
- **One step at a time.** Mark `[x]` and commit with `step N.N complete: <outcome>`.
> The on-device KV approach (Electron SQLite `agent_memory` table) is no longer the target architecture.
> See `V3_MIGRATION_PLAN.md` for the current plan.

284
V3_MIGRATION_PLAN.md Normal file
View File

@@ -0,0 +1,284 @@
# V3 Migration Plan — Multi-Agent AI Productivity App
> Incremental migration from current architecture to v3.
> Each step is self-contained, testable, and backwards-compatible.
> No BYOK — server manages all LLM keys.
> Memory encryption: server-side per-user Fernet key (Option A).
---
## Decisions Log
| Topic | Decision |
|---|---|
| WS topology | Single multiplexed socket (merge chat into device WS) |
| LLM keys | Server-managed only, no user key passthrough |
| Memory encryption | Per-user server-generated Fernet key, encrypted at rest, decrypted in-memory |
| device_manager | Already multi-user correct (keyed by user_id), no structural change |
---
## Step 1 — WS Frame Protocol (schemas.py)
**Goal**: Define the v3 frame vocabulary so all subsequent steps can import it.
**Changes**:
- `app/schemas.py` — Add to `WsFrameType` enum:
- `home_request`, `popup_request`
- `stream_start`, `stream_text`, `stream_block`, `stream_end`
- `popup_domain`
- `data_request`, `data_response`, `mutation`
- Add Pydantic models:
- `WsHomeRequest(type, message, conversation_history?)`
- `WsPopupRequest(type, message, scope: {type, id?})`
- `WsStreamStart(type, request_id)`
- `WsStreamText(type, request_id, chunk)`
- `WsStreamBlock(type, request_id, block_type, data)`
- `WsStreamEnd(type, request_id, mutations?)`
- `WsPopupDomain(type, request_id, domain)`
- Keep all existing frame types (backward compat).
**Files touched**: `app/schemas.py`
**Test**: Unit test that validates each new model serializes/deserializes correctly.
```
pytest tests/test_schemas_v3.py
```
---
## Step 2 — Agent Streaming + Tool Result Capture (agent_registry.py, agents/)
**Goal**: Agents can stream LLM tokens and expose structured tool results.
**Changes**:
- `app/core/agent_registry.py`:
- Add `_tool_loop_stream()` to `ChatAgent` — same logic as `_tool_loop()` but the **final** LLM call (when no more tool calls) uses `llm.astream()` and yields tokens.
- Add `self.tool_results: list[dict]` attribute to `ChatAgent.__init__()`.
- In both `_tool_loop` and `_tool_loop_stream`, capture raw `execute_on_client` results when tools run (store in `self.tool_results`).
- `app/agents/*.py` — Each agent's tools already return text summaries. No change to tools. The raw data capture happens at the `_tool_loop` level by intercepting `ToolMessage` content that comes from `execute_on_client`.
**Files touched**: `app/core/agent_registry.py`
**Test**: Unit test with mocked LLM that verifies `_tool_loop_stream()` yields tokens and `agent.tool_results` contains structured data after a tool call.
```
pytest tests/test_agent_streaming.py
```
---
## Step 3 — Router Refactor (orchestrator.py)
**Goal**: Orchestrator returns agent name alongside execution, supports streaming.
**Changes**:
- `app/core/orchestrator.py`:
- Add `orchestrate_v3(user_id, message, context, mode)` that:
1. Calls `classify_intent()` (unchanged) -> `agent_name`
2. Instantiates agent via registry
3. Returns `(agent_name, agent_instance)` — caller drives execution
- Add `orchestrate_v3_stream(user_id, message, context)` -> `AsyncGenerator` that:
1. Calls `classify_intent()` -> `agent_name`
2. Calls `agent.handle_stream()` (uses `_tool_loop_stream`)
3. Yields `(agent_name, token)` tuples — first yield includes agent name for domain detection
- Keep `orchestrate()` and `orchestrate_stream()` unchanged (backward compat for POST /chat).
**Files touched**: `app/core/orchestrator.py`
**Test**: Unit test with mocked LLM and mocked registry that verifies `orchestrate_v3_stream` yields `(agent_name, token)` pairs.
```
pytest tests/test_orchestrator_v3.py
```
---
## Step 4 — Output Formatting Layer (NEW: output_formatter.py)
**Goal**: Home and Popup responses diverge at this layer only.
### Block Types (from Electron app components)
The LLM outputs a JSON block stream. Each block has a `type` field that maps to
an Electron renderer component. The server validates and forwards these blocks.
**Text block** — streamed immediately, word-by-word:
```json
{ "type": "text", "content": "Here's your task summary..." }
```
**Chart blocks** — buffered until complete, validated, sent as `stream_block`.
Chart types match shadcn/ui Recharts wrappers used in the Electron app:
```json
{ "type": "chart", "chartType": "<type>", "title": "...", "data": [...], "config": {...} }
```
Supported `chartType` values:
- `area` — Area chart (shadcn AreaChart)
- `bar` — Bar chart (shadcn BarChart)
- `line` — Line chart (shadcn LineChart)
- `pie` — Pie chart (shadcn PieChart)
- `radar` — Radar chart (shadcn RadarChart)
- `radial` — Radial/gauge chart (shadcn RadialChart)
`data` is an array of objects with keys matching the chart's dataKey config.
`config` follows the shadcn ChartConfig format: `{ [dataKey]: { label, color } }`.
**Entity blocks** — server serializes from `agent.tool_results` (not LLM-generated data):
```json
{ "type": "entity_ref", "entity": "task" }
```
The server resolves this by looking up the structured data from the agent's
tool call results and emitting a `stream_block` with the full entity data.
Supported entity types (matching Electron component types):
- `task` — TaskRow component (`TaskItem`: id, title, status, priority, assignee, dueDate, projectId, ...)
- `project` — Project card (id, name, clientId, status)
- `note` — Note card (id, title, createdAt, projectId)
- `checkpoint` — Checkpoint card (GanttCheckpoint: id, title, date, projectId, isAiSuggested, isApproved)
**Table block** — buffered, validated:
```json
{ "type": "table", "headers": ["Col1", "Col2"], "rows": [["val1", "val2"]] }
```
**Timeline block** — buffered, validated (renders via GanttChart component):
```json
{ "type": "timeline", "checkpoints": [{ "id": "...", "title": "...", "date": 1234567890 }] }
```
### Changes
- `app/core/output_formatter.py` (new file):
- `HomeFormatter`:
- Receives token stream from orchestrator
- Accumulates tokens into a JSON-aware buffer
- Detects block boundaries by `type` field:
- `text` -> yields `WsStreamText` immediately (streams content word-by-word)
- `chart` -> buffers until JSON complete, validates `chartType` against allowed set, yields `WsStreamBlock`
- `entity_ref` -> looks up data from `agent.tool_results`, serializes full entity, yields `WsStreamBlock`
- `table` -> buffers, validates headers/rows structure, yields `WsStreamBlock`
- `timeline` -> buffers, validates checkpoint objects, yields `WsStreamBlock`
- Invalid blocks are logged and skipped (never crash the stream)
- `PopupFormatter`:
- Receives `agent_name` from orchestrator
- Maps agent name to domain (deterministic, by code — no LLM):
- `task_agent` -> `"tasks"`
- `checkpoint_agent` -> `"checkpoints"`
- `note_agent` -> `"notes"`
- `project_agent` -> `"projects"`
- Yields `WsPopupDomain` immediately
- Then yields `WsStreamText` for all tokens (text-only, no blocks)
**Files touched**: `app/core/output_formatter.py` (new)
**Test**: Unit test that feeds a mock token stream through each formatter and asserts correct frame output sequence.
```
pytest tests/test_output_formatter.py
```
---
## Step 5 — Unified WS Handler (device_ws.py, chat.py, main.py)
**Goal**: Single multiplexed WebSocket handles device frames + Home/Popup chat.
**Changes**:
- `app/api/routes/device_ws.py`:
- Extend `_message_loop` dispatch to handle `home_request` and `popup_request`:
- On `home_request`: set `ws_context` executor, call `orchestrate_v3_stream`, pipe through `HomeFormatter`, send frames back on same socket.
- On `popup_request`: same, but pipe through `PopupFormatter`.
- Wrap both in try/finally to clear `ws_context`.
- Each request gets a `request_id` (UUID) for frame correlation.
- Concurrent requests from same client are supported (each runs as an async task).
- `app/api/routes/chat.py`:
- Remove `chat_stream` WS endpoint.
- Keep `POST /chat` endpoint unchanged (REST fallback).
- `app/main.py`:
- No change needed (device_ws router already registered).
**Files touched**: `app/api/routes/device_ws.py`, `app/api/routes/chat.py`, `app/main.py`
**Test**: Integration test with a WebSocket test client that:
1. Connects to `/api/v1/ws/device`
2. Sends `device_hello`
3. Sends `home_request` -> receives `stream_start`, `stream_text`*, `stream_end`
4. Sends `popup_request` -> receives `popup_domain`, `stream_text`*, `stream_end`
5. Verifies `tool_call`/`tool_result` round-trip still works during chat
```
pytest tests/test_ws_unified.py
```
---
## Step 6 — Memory Models + Migration (models.py, alembic)
**Goal**: Database tables for 4-tier memory, with per-user encryption key.
**Changes**:
- `app/models.py`:
- Add `encryption_key` column to `User` model (Fernet key, generated on registration).
- Add `MemoryCore` model: `id, user_id, key, value_encrypted, updated_at`
- Add `MemoryAssociative` model: `id, user_id, content_encrypted, embedding (Vector(1536)), entity_type, entity_id, updated_at`
- Add `MemoryEpisodic` model: `id, user_id, summary_encrypted, session_id, created_at`
- Add `MemoryProactive` model: `id, user_id, pattern_encrypted, confidence, source, created_at`
- `alembic/versions/` — New migration adding the 4 memory tables + user encryption_key column.
- `app/api/routes/auth.py` — On user registration, generate and store a Fernet key.
**Files touched**: `app/models.py`, `alembic/versions/xxx_add_memory_tables.py`, `app/api/routes/auth.py`
**Test**: Run migration up/down, verify tables exist with correct columns.
```
alembic upgrade head && alembic downgrade -1 && alembic upgrade head
pytest tests/test_memory_models.py
```
---
## Step 7 — Memory Middleware (NEW: memory_middleware.py)
**Goal**: Enrich every Router call with memory context, store interactions after.
**Changes**:
- `app/core/memory_middleware.py` (new file):
- `MemoryMiddleware` class with:
- `enrich_context(user_id, message) -> dict` (pre-LLM):
1. Load core memory (user prefs) — always injected
2. Embed `message`, search `MemoryAssociative` via pgvector — top-k relevant
3. Fetch recent `MemoryEpisodic` entries — last N sessions
4. Fetch active `MemoryProactive` patterns — above confidence threshold
5. Return merged context dict
- `store_episode(user_id, session_id, message, response)` (post-LLM):
1. Summarize interaction (short LLM call or heuristic)
2. Encrypt and store in `MemoryEpisodic`
3. Embed interaction, encrypt and upsert in `MemoryAssociative`
- `update_core(user_id, key, value)` — explicit preference update
- All read/write operations encrypt/decrypt using the user's Fernet key from `User.encryption_key`
- `app/api/routes/device_ws.py` — Update `home_request` and `popup_request` handlers:
- Before orchestrator: `enriched = await memory.enrich_context(user_id, message)`
- After response complete: `await memory.store_episode(user_id, ...)`
**Files touched**: `app/core/memory_middleware.py` (new), `app/api/routes/device_ws.py`
**Test**: Unit test with seeded memory rows that verifies:
1. `enrich_context` returns core prefs + associative matches + episodic summaries
2. `store_episode` creates encrypted rows that can be decrypted with the user's key
3. End-to-end WS test: send `home_request`, verify memory enrichment is passed to orchestrator
```
pytest tests/test_memory_middleware.py
```
---
## Summary
| Step | Component | Effort | Depends On |
|------|-----------|--------|------------|
| 1 | WS Frame Protocol | Low | — |
| 2 | Agent Streaming | Medium | Step 1 |
| 3 | Router Refactor | Medium | Step 2 |
| 4 | Output Formatter | High | Steps 1, 3 |
| 5 | Unified WS Handler | High | Steps 14 |
| 6 | Memory Models | Medium | — |
| 7 | Memory Middleware | High | Steps 5, 6 |
Steps 15 form the streaming pipeline. Steps 67 form the memory system.
Step 6 can run in parallel with Steps 24 (no dependencies).