diff --git a/AI_REFACTOR_PLAN.md b/AI_REFACTOR_PLAN.md
index 12fe505..ac46d5e 100644
--- a/AI_REFACTOR_PLAN.md
+++ b/AI_REFACTOR_PLAN.md
@@ -512,113 +512,12 @@ Cloud Agent:
 
 ---
 
-## Phase 5 — Shared Memory (Agent KV + Chat WS Fix)
+## ~~Phase 5 — Shared Memory~~ (SUPERSEDED)
 
-> **Objective:** Give chat agents persistent memory via a KV store on the Electron client. Agents can `store_memory()` to remember user preferences, patterns, and corrections, and `recall_memories()` to retrieve them. All data lives in Electron's SQLite `agent_memory` table (local-first, never stored server-side). This also requires fixing the chat WS handler to support bidirectional tool calls — currently a critical gap that blocks all agent tools from working over the `/chat/stream` endpoint.
+> **This phase has been fully replaced by `V3_MIGRATION_PLAN.md`.**
 >
-> **Electron Phase 5 plan:** `../adiuva/AI_REFACTOR_PLAN.md` Phase 5 section.
+> - Chat WS fix → V3 Step 5 (Unified WS Handler — single multiplexed socket)
+> - Agent memory → V3 Steps 6–7 (Cloud-side MemGPT-style memory in PostgreSQL + pgvector, encrypted at rest with per-user Fernet key)
 >
-> **Why agent KV matters:** Chat agents are currently stateless — they can't remember "User prefers to-do in lowercase" or "Client X billing cycle is the 15th". With KV memory, agents become learning assistants that improve over time. Users feel the AI "knows them" without any data leaving their device.
->
-> **Why the chat WS fix is critical:** The existing `/chat/stream` WS handler (`app/api/routes/chat.py`) never calls `set_client_executor()`. This means `execute_on_client()` raises `RuntimeError` whenever any agent tool tries to call it during a chat session. All 23 tools are broken over chat WS. This must be fixed before memory tools (or any tools) can work.
->
-> **New Electron tables** (managed by Electron, accessed by backend via `execute_on_client`):
-> - `chat_messages`: `id`, `scope`, `role`, `content`, `error`, `created_at`
-> - `agent_memory`: `id`, `agent_name`, `key`, `value`, `scope`, `created_at`, `updated_at` (unique on `agent_name, key, scope`)
-
-### Step 5.1 — Fix chat WS for bidirectional tool calls (PREREQUISITE)
-
-> **This is the highest-priority backend fix.** Without it, zero agent tools work over the chat WS connection.
-
-- [ ] Rewrite `app/api/routes/chat.py` — `chat_stream()` WS handler:
-  - After auth + accept, receive first frame as `{"type": "chat_request", ...}` (not raw `ChatRequest`)
-  - Parse frame, extract `message` and `context`
-  - Set up a local `pending_calls: dict[str, asyncio.Future]` for tool-call round-trips
-  - Define executor callback:
-    ```python
-    async def execute_callback(payload: dict) -> dict:
-        call_id = payload["id"]
-        fut = asyncio.get_event_loop().create_future()
-        pending_calls[call_id] = fut
-        await websocket.send_text(json.dumps({"type": "tool_call", **payload}))
-        return await asyncio.wait_for(fut, timeout=30.0)
-    ```
-  - Call `set_client_executor(execute_callback)` before orchestrating
-  - Run two concurrent tasks:
-    1. **Receive loop**: dispatches incoming frames — `tool_result` resolves pending Futures, `pong` ignored
-    2. **Orchestration task**: calls `orchestrate_stream()`, wraps chunks in `{"type": "text_chunk", "text": "..."}` frames, sends `{"type": "final", "response": "..."}` on completion
-  - Call `clear_client_executor()` in finally block
-  - Keep heartbeat ping every 30s
-  - 30s timeout on each `tool_result` — tool returns error string to LLM on timeout
-- [ ] Update `orchestrate_stream()` in `app/core/orchestrator.py` if needed:
-  - Ensure it properly yields text chunks (currently chunks by fixed 50-char slices — consider switching to yielding full response as single chunk for now)
-- **Files:** `app/api/routes/chat.py`, `app/core/orchestrator.py`
-- **Outcome:** Full bidirectional WS. Tool calls, text streaming, and heartbeats happen concurrently. All 23 existing agent tools now work over chat WS.
-
-### Step 5.2 — Agent memory tools
-
-- [ ] Create `app/agents/tools/memory_tools.py`:
-  - `create_memory_tools(agent_name: str) -> list[Tool]` — factory function that returns two LangChain `@tool` functions with `agent_name` bound via closure:
-  - **`store_memory(key: str, value: str, scope: str = "global")`**:
-    - Calls `execute_on_client(action="select", table="agentMemory", filters={"agentName": agent_name, "key": key, "scope": scope})`
-    - If row exists: `execute_on_client(action="update", table="agentMemory", data={"id": row["id"], "updates": {"value": value, "updatedAt": }})`
-    - If not: `execute_on_client(action="insert", table="agentMemory", data={"agentName": agent_name, "key": key, "value": value, "scope": scope})`
-    - Returns `"Stored memory: [key] = [value]"`
-  - **`recall_memories(key_pattern: str = None, scope: str = "global", limit: int = 10)`**:
-    - Calls `execute_on_client(action="select", table="agentMemory", filters={"agentName": agent_name, "scope": scope, "search": key_pattern})`
-    - Returns formatted list: `"key1: value1\nkey2: value2\n..."` or `"No memories found."`
-  - Timestamps are Unix milliseconds (consistent with Electron's `Date.now()`)
-  - Agent name scoping: each agent only sees its own memories (filtered by `agentName`)
-- **Files:** `app/agents/tools/memory_tools.py`
-- **Outcome:** Two reusable tools any agent can include. Upsert semantics via select-then-insert/update.
-
-### Step 5.3 — Register memory tools on all agents
-
-- [ ] Update `app/agents/task_agent.py`:
-  - Import `create_memory_tools` from `app/agents/tools/memory_tools`
-  - Add memory tools to `get_tools()`: `return [list_tasks, create_task, ..., *create_memory_tools("task_agent")]`
-  - Append to `_SYSTEM_PROMPT`: `"\n\nYou can store important facts about user preferences using store_memory and recall past facts using recall_memories. Store corrections, preferences, and patterns the user shares (e.g. 'User prefers short task titles', 'Default priority is medium'). Always check memories before giving advice."`
-- [ ] Update `app/agents/project_agent.py` — same pattern with `create_memory_tools("project_agent")`
-- [ ] Update `app/agents/note_agent.py` — same pattern with `create_memory_tools("note_agent")`
-- [ ] Update `app/agents/checkpoint_agent.py` — same pattern with `create_memory_tools("checkpoint_agent")`
-- **Files:** `app/agents/task_agent.py`, `app/agents/project_agent.py`, `app/agents/note_agent.py`, `app/agents/checkpoint_agent.py`
-- **Outcome:** All 4 chat agents can store and recall persistent memories. Each agent's memories are scoped by `agentName`.
-
-### Step 5.4 — Extend ChatContext with agent memories
-
-- [ ] Update `app/schemas.py`:
-  - Add `agent_memories: list[dict[str, Any]] = Field(default_factory=list)` to `ChatContext`
-  - These are pre-loaded by Electron (from `agent_memory` table) and included in every request
-- [ ] Agent `handle()` methods already receive full `context` dict — memories are visible in `context["agent_memories"]`
-- [ ] Agent system prompts reference memories from context: agents see pre-loaded memories AND can call `recall_memories` for targeted lookup
-- **Files:** `app/schemas.py`
-- **Outcome:** Backend receives pre-loaded memories from Electron. Agents have dual-path access: context injection (passive) + tool call (active).
-
-### Phase 5 — Verification
-
-| # | Scenario | Expected |
-|---|---|---|
-| 1 | **Chat WS bidirectional** | Connect → send `chat_request` → receive `tool_call` → respond `tool_result` → receive `text_chunk` → `final` |
-| 2 | **All existing tools work** | "List my tasks" over chat WS → `tool_call(select, tasks)` → Electron returns rows → LLM responds with real task data |
-| 3 | **Store memory** | "Remember that I prefer short task titles" → `store_memory("task_title_preference", "short")` → `tool_call(insert, agentMemory)` → Electron persists |
-| 4 | **Recall memory** | New chat session → "How should I name tasks?" → agent sees pre-loaded memory in context or calls `recall_memories` → references stored preference |
-| 5 | **Upsert semantics** | Store same key twice → only one row exists with updated value |
-| 6 | **Agent scope isolation** | `task_agent` stores memory → `note_agent` cannot see it (filtered by `agentName`) |
-| 7 | **Project scope** | Store memory with `scope="project:"` → only visible in that project's chat context |
-| 8 | **Tool timeout** | Disconnect Electron mid-tool-call → 30s timeout → tool returns error → LLM handles gracefully |
-| 9 | **Concurrent tool calls** | Agent calls `list_tasks` then `recall_memories` in sequence → both WS round-trips succeed |
-| 10 | **Existing tests pass** | `pytest` — no regressions in agent tools or orchestrator |
-
-### Phase 5 — Step Dependencies
-
-```
-Step 5.1 (chat WS fix) ──────────────► Step 5.2 (memory tools) ──► Step 5.3 (register on agents)
-                                                               ──► Step 5.4 (extend ChatContext)
-
-Step 5.1 is the BLOCKER — nothing else works until bidirectional tool calls are wired.
-Steps 5.3 and 5.4 can run in parallel after 5.2.
-```
-
---- 
-
-- **One step at a time.** Mark `[x]` and commit with `step N.N complete: `.
\ No newline at end of file
+> The on-device KV approach (Electron SQLite `agent_memory` table) is no longer the target architecture.
+> See `V3_MIGRATION_PLAN.md` for the current plan.
\ No newline at end of file
diff --git a/V3_MIGRATION_PLAN.md b/V3_MIGRATION_PLAN.md
new file mode 100644
index 0000000..c8b565f
--- /dev/null
+++ b/V3_MIGRATION_PLAN.md
@@ -0,0 +1,284 @@
+# V3 Migration Plan — Multi-Agent AI Productivity App
+
+> Incremental migration from current architecture to v3.
+> Each step is self-contained, testable, and backwards-compatible.
+> No BYOK — server manages all LLM keys.
+> Memory encryption: server-side per-user Fernet key (Option A).
+
+---
+
+## Decisions Log
+
+| Topic | Decision |
+|---|---|
+| WS topology | Single multiplexed socket (merge chat into device WS) |
+| LLM keys | Server-managed only, no user key passthrough |
+| Memory encryption | Per-user server-generated Fernet key, encrypted at rest, decrypted in-memory |
+| device_manager | Already multi-user correct (keyed by user_id), no structural change |
+
+---
+
+## Step 1 — WS Frame Protocol (schemas.py)
+
+**Goal**: Define the v3 frame vocabulary so all subsequent steps can import it.
+
+**Changes**:
+- `app/schemas.py` — Add to `WsFrameType` enum:
+  - `home_request`, `popup_request`
+  - `stream_start`, `stream_text`, `stream_block`, `stream_end`
+  - `popup_domain`
+  - `data_request`, `data_response`, `mutation`
+- Add Pydantic models:
+  - `WsHomeRequest(type, message, conversation_history?)`
+  - `WsPopupRequest(type, message, scope: {type, id?})`
+  - `WsStreamStart(type, request_id)`
+  - `WsStreamText(type, request_id, chunk)`
+  - `WsStreamBlock(type, request_id, block_type, data)`
+  - `WsStreamEnd(type, request_id, mutations?)`
+  - `WsPopupDomain(type, request_id, domain)`
+- Keep all existing frame types (backward compat).
+
+**Files touched**: `app/schemas.py`
+
+**Test**: Unit test that validates each new model serializes/deserializes correctly.
+```
+pytest tests/test_schemas_v3.py
+```
+
+---
+
+## Step 2 — Agent Streaming + Tool Result Capture (agent_registry.py, agents/)
+
+**Goal**: Agents can stream LLM tokens and expose structured tool results.
+
+**Changes**:
+- `app/core/agent_registry.py`:
+  - Add `_tool_loop_stream()` to `ChatAgent` — same logic as `_tool_loop()` but the **final** LLM call (when no more tool calls) uses `llm.astream()` and yields tokens.
+  - Add `self.tool_results: list[dict]` attribute to `ChatAgent.__init__()`.
+  - In both `_tool_loop` and `_tool_loop_stream`, capture raw `execute_on_client` results when tools run (store in `self.tool_results`).
+- `app/agents/*.py` — Each agent's tools already return text summaries. No change to tools. The raw data capture happens at the `_tool_loop` level by intercepting `ToolMessage` content that comes from `execute_on_client`.
+
+**Files touched**: `app/core/agent_registry.py`
+
+**Test**: Unit test with mocked LLM that verifies `_tool_loop_stream()` yields tokens and `agent.tool_results` contains structured data after a tool call.
+```
+pytest tests/test_agent_streaming.py
+```
+
+---
+
+## Step 3 — Router Refactor (orchestrator.py)
+
+**Goal**: Orchestrator returns agent name alongside execution, supports streaming.
+
+**Changes**:
+- `app/core/orchestrator.py`:
+  - Add `orchestrate_v3(user_id, message, context, mode)` that:
+    1. Calls `classify_intent()` (unchanged) -> `agent_name`
+    2. Instantiates agent via registry
+    3. Returns `(agent_name, agent_instance)` — caller drives execution
+  - Add `orchestrate_v3_stream(user_id, message, context)` -> `AsyncGenerator` that:
+    1. Calls `classify_intent()` -> `agent_name`
+    2. Calls `agent.handle_stream()` (uses `_tool_loop_stream`)
+    3. Yields `(agent_name, token)` tuples — first yield includes agent name for domain detection
+  - Keep `orchestrate()` and `orchestrate_stream()` unchanged (backward compat for POST /chat).
+
+**Files touched**: `app/core/orchestrator.py`
+
+**Test**: Unit test with mocked LLM and mocked registry that verifies `orchestrate_v3_stream` yields `(agent_name, token)` pairs.
+```
+pytest tests/test_orchestrator_v3.py
+```
+
+---
+
+## Step 4 — Output Formatting Layer (NEW: output_formatter.py)
+
+**Goal**: Home and Popup responses diverge at this layer only.
+
+### Block Types (from Electron app components)
+
+The LLM outputs a JSON block stream. Each block has a `type` field that maps to
+an Electron renderer component. The server validates and forwards these blocks.
+
+**Text block** — streamed immediately, word-by-word:
+```json
+{ "type": "text", "content": "Here's your task summary..." }
+```
+
+**Chart blocks** — buffered until complete, validated, sent as `stream_block`.
+Chart types match shadcn/ui Recharts wrappers used in the Electron app:
+```json
+{ "type": "chart", "chartType": "", "title": "...", "data": [...], "config": {...} }
+```
+Supported `chartType` values:
+- `area` — Area chart (shadcn AreaChart)
+- `bar` — Bar chart (shadcn BarChart)
+- `line` — Line chart (shadcn LineChart)
+- `pie` — Pie chart (shadcn PieChart)
+- `radar` — Radar chart (shadcn RadarChart)
+- `radial` — Radial/gauge chart (shadcn RadialChart)
+
+`data` is an array of objects with keys matching the chart's dataKey config.
+`config` follows the shadcn ChartConfig format: `{ [dataKey]: { label, color } }`.
+
+**Entity blocks** — server serializes from `agent.tool_results` (not LLM-generated data):
+```json
+{ "type": "entity_ref", "entity": "task" }
+```
+The server resolves this by looking up the structured data from the agent's
+tool call results and emitting a `stream_block` with the full entity data.
+
+Supported entity types (matching Electron component types):
+- `task` — TaskRow component (`TaskItem`: id, title, status, priority, assignee, dueDate, projectId, ...)
+- `project` — Project card (id, name, clientId, status)
+- `note` — Note card (id, title, createdAt, projectId)
+- `checkpoint` — Checkpoint card (GanttCheckpoint: id, title, date, projectId, isAiSuggested, isApproved)
+
+**Table block** — buffered, validated:
+```json
+{ "type": "table", "headers": ["Col1", "Col2"], "rows": [["val1", "val2"]] }
+```
+
+**Timeline block** — buffered, validated (renders via GanttChart component):
+```json
+{ "type": "timeline", "checkpoints": [{ "id": "...", "title": "...", "date": 1234567890 }] }
+```
+
+### Changes
+
+- `app/core/output_formatter.py` (new file):
+  - `HomeFormatter`:
+    - Receives token stream from orchestrator
+    - Accumulates tokens into a JSON-aware buffer
+    - Detects block boundaries by `type` field:
+      - `text` -> yields `WsStreamText` immediately (streams content word-by-word)
+      - `chart` -> buffers until JSON complete, validates `chartType` against allowed set, yields `WsStreamBlock`
+      - `entity_ref` -> looks up data from `agent.tool_results`, serializes full entity, yields `WsStreamBlock`
+      - `table` -> buffers, validates headers/rows structure, yields `WsStreamBlock`
+      - `timeline` -> buffers, validates checkpoint objects, yields `WsStreamBlock`
+    - Invalid blocks are logged and skipped (never crash the stream)
+  - `PopupFormatter`:
+    - Receives `agent_name` from orchestrator
+    - Maps agent name to domain (deterministic, by code — no LLM):
+      - `task_agent` -> `"tasks"`
+      - `checkpoint_agent` -> `"checkpoints"`
+      - `note_agent` -> `"notes"`
+      - `project_agent` -> `"projects"`
+    - Yields `WsPopupDomain` immediately
+    - Then yields `WsStreamText` for all tokens (text-only, no blocks)
+
+**Files touched**: `app/core/output_formatter.py` (new)
+
+**Test**: Unit test that feeds a mock token stream through each formatter and asserts correct frame output sequence.
+```
+pytest tests/test_output_formatter.py
+```
+
+---
+
+## Step 5 — Unified WS Handler (device_ws.py, chat.py, main.py)
+
+**Goal**: Single multiplexed WebSocket handles device frames + Home/Popup chat.
+
+**Changes**:
+- `app/api/routes/device_ws.py`:
+  - Extend `_message_loop` dispatch to handle `home_request` and `popup_request`:
+    - On `home_request`: set `ws_context` executor, call `orchestrate_v3_stream`, pipe through `HomeFormatter`, send frames back on same socket.
+    - On `popup_request`: same, but pipe through `PopupFormatter`.
+    - Wrap both in try/finally to clear `ws_context`.
+  - Each request gets a `request_id` (UUID) for frame correlation.
+  - Concurrent requests from same client are supported (each runs as an async task).
+- `app/api/routes/chat.py`:
+  - Remove `chat_stream` WS endpoint.
+  - Keep `POST /chat` endpoint unchanged (REST fallback).
+- `app/main.py`:
+  - No change needed (device_ws router already registered).
+
+**Files touched**: `app/api/routes/device_ws.py`, `app/api/routes/chat.py`, `app/main.py`
+
+**Test**: Integration test with a WebSocket test client that:
+1. Connects to `/api/v1/ws/device`
+2. Sends `device_hello`
+3. Sends `home_request` -> receives `stream_start`, `stream_text`*, `stream_end`
+4. Sends `popup_request` -> receives `popup_domain`, `stream_text`*, `stream_end`
+5. Verifies `tool_call`/`tool_result` round-trip still works during chat
+```
+pytest tests/test_ws_unified.py
+```
+
+---
+
+## Step 6 — Memory Models + Migration (models.py, alembic)
+
+**Goal**: Database tables for 4-tier memory, with per-user encryption key.
+
+**Changes**:
+- `app/models.py`:
+  - Add `encryption_key` column to `User` model (Fernet key, generated on registration).
+  - Add `MemoryCore` model: `id, user_id, key, value_encrypted, updated_at`
+  - Add `MemoryAssociative` model: `id, user_id, content_encrypted, embedding (Vector(1536)), entity_type, entity_id, updated_at`
+  - Add `MemoryEpisodic` model: `id, user_id, summary_encrypted, session_id, created_at`
+  - Add `MemoryProactive` model: `id, user_id, pattern_encrypted, confidence, source, created_at`
+- `alembic/versions/` — New migration adding the 4 memory tables + user encryption_key column.
+- `app/api/routes/auth.py` — On user registration, generate and store a Fernet key.
+
+**Files touched**: `app/models.py`, `alembic/versions/xxx_add_memory_tables.py`, `app/api/routes/auth.py`
+
+**Test**: Run migration up/down, verify tables exist with correct columns.
+```
+alembic upgrade head && alembic downgrade -1 && alembic upgrade head
+pytest tests/test_memory_models.py
+```
+
+---
+
+## Step 7 — Memory Middleware (NEW: memory_middleware.py)
+
+**Goal**: Enrich every Router call with memory context, store interactions after.
+
+**Changes**:
+- `app/core/memory_middleware.py` (new file):
+  - `MemoryMiddleware` class with:
+    - `enrich_context(user_id, message) -> dict` (pre-LLM):
+      1. Load core memory (user prefs) — always injected
+      2. Embed `message`, search `MemoryAssociative` via pgvector — top-k relevant
+      3. Fetch recent `MemoryEpisodic` entries — last N sessions
+      4. Fetch active `MemoryProactive` patterns — above confidence threshold
+      5. Return merged context dict
+    - `store_episode(user_id, session_id, message, response)` (post-LLM):
+      1. Summarize interaction (short LLM call or heuristic)
+      2. Encrypt and store in `MemoryEpisodic`
+      3. Embed interaction, encrypt and upsert in `MemoryAssociative`
+    - `update_core(user_id, key, value)` — explicit preference update
+    - All read/write operations encrypt/decrypt using the user's Fernet key from `User.encryption_key`
+- `app/api/routes/device_ws.py` — Update `home_request` and `popup_request` handlers:
+  - Before orchestrator: `enriched = await memory.enrich_context(user_id, message)`
+  - After response complete: `await memory.store_episode(user_id, ...)`
+
+**Files touched**: `app/core/memory_middleware.py` (new), `app/api/routes/device_ws.py`
+
+**Test**: Unit test with seeded memory rows that verifies:
+1. `enrich_context` returns core prefs + associative matches + episodic summaries
+2. `store_episode` creates encrypted rows that can be decrypted with the user's key
+3. End-to-end WS test: send `home_request`, verify memory enrichment is passed to orchestrator
+```
+pytest tests/test_memory_middleware.py
+```
+
+---
+
+## Summary
+
+| Step | Component | Effort | Depends On |
+|------|-----------|--------|------------|
+| 1 | WS Frame Protocol | Low | — |
+| 2 | Agent Streaming | Medium | Step 1 |
+| 3 | Router Refactor | Medium | Step 2 |
+| 4 | Output Formatter | High | Steps 1, 3 |
+| 5 | Unified WS Handler | High | Steps 1–4 |
+| 6 | Memory Models | Medium | — |
+| 7 | Memory Middleware | High | Steps 5, 6 |
+
+Steps 1–5 form the streaming pipeline. Steps 6–7 form the memory system.
+Step 6 can run in parallel with Steps 2–4 (no dependencies).
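
Reviewer note (not part of the diff): the "Depends On" column of the Summary table can be checked mechanically. The sketch below is only an illustration. It assumes the dependencies exactly as the table lists them and uses Python's standard `graphlib` module; the names `DEPENDS_ON` and `parallel_waves` are hypothetical and do not appear in either plan file.

```python
from graphlib import TopologicalSorter

# "Depends On" column of the Summary table, keyed by step number.
# Assumption: "Steps 1–4" for Step 5 means steps 1, 2, 3 and 4.
DEPENDS_ON = {
    1: set(),
    2: {1},
    3: {2},
    4: {1, 3},
    5: {1, 2, 3, 4},
    6: set(),
    7: {5, 6},
}


def parallel_waves(deps):
    """Group steps into waves; every step in a wave has all its dependencies met."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the table were circular
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # steps whose dependencies are done
        waves.append(ready)
        sorter.done(*ready)  # mark the whole wave finished to unlock the next one
    return waves


if __name__ == "__main__":
    for number, wave in enumerate(parallel_waves(DEPENDS_ON), start=1):
        print(f"wave {number}: steps {wave}")
```

Under these assumptions Step 6 lands in the first wave next to Step 1, since it has no prerequisites. It only has to finish before Step 7 starts, which is consistent with the plan's remark that it can run alongside Steps 2–4.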