Files
api/V3_MIGRATION_PLAN.md
2026-03-08 21:21:03 +01:00

13 KiB
Raw Blame History

V3 Migration Plan — Multi-Agent AI Productivity App

Incremental migration from current architecture to v3. Each step is self-contained, testable, and backwards-compatible. No BYOK — server manages all LLM keys. Memory encryption: server-side per-user Fernet key (Option A).


Decisions Log

Topic Decision
WS topology Single multiplexed socket (merge chat into device WS)
LLM keys Server-managed only, no user key passthrough
Memory encryption Per-user server-generated Fernet key, encrypted at rest, decrypted in-memory
device_manager Already multi-user correct (keyed by user_id), no structural change

Step 1 — WS Frame Protocol (schemas.py)

Goal: Define the v3 frame vocabulary so all subsequent steps can import it.

Changes:

  • app/schemas.py — Add to WsFrameType enum:
    • home_request, popup_request
    • stream_start, stream_text, stream_block, stream_end
    • popup_domain
    • data_request, data_response, mutation
  • Add Pydantic models:
    • WsHomeRequest(type, message, conversation_history?)
    • WsPopupRequest(type, message, scope: {type, id?})
    • WsStreamStart(type, request_id)
    • WsStreamText(type, request_id, chunk)
    • WsStreamBlock(type, request_id, block_type, data)
    • WsStreamEnd(type, request_id, mutations?)
    • WsPopupDomain(type, request_id, domain)
  • Keep all existing frame types (backward compat).

Files touched: app/schemas.py

Test: Unit test that validates each new model serializes/deserializes correctly.

pytest tests/test_schemas_v3.py

Status:

  • Step 1 complete

Commit: After tests pass, commit with:

git commit -m "step-1: add v3 ws frame protocol (schemas.py)"

Step 2 — Agent Streaming + Tool Result Capture (agent_registry.py, agents/)

Goal: Agents can stream LLM tokens and expose structured tool results.

Changes:

  • app/core/agent_registry.py:
    • Add _tool_loop_stream() to ChatAgent — same logic as _tool_loop() but the final LLM call (when no more tool calls) uses llm.astream() and yields tokens.
    • Add self.tool_results: list[dict] attribute to ChatAgent.__init__().
    • In both _tool_loop and _tool_loop_stream, capture raw execute_on_client results when tools run (store in self.tool_results).
  • app/agents/*.py — Each agent's tools already return text summaries. No change to tools. The raw data capture happens at the _tool_loop level by intercepting ToolMessage content that comes from execute_on_client.

Files touched: app/core/agent_registry.py

Test: Unit test with mocked LLM that verifies _tool_loop_stream() yields tokens and agent.tool_results contains structured data after a tool call.

pytest tests/test_agent_streaming.py

Status:

  • Step 2 complete

Commit: After tests pass, commit with:

git commit -m "step-2: add agent streaming and tool result capture (agent_registry.py)"

Step 3 — Router Refactor (orchestrator.py)

Goal: Orchestrator returns agent name alongside execution, supports streaming.

Changes:

  • app/core/orchestrator.py:
    • Add orchestrate_v3(user_id, message, context, mode) that:
      1. Calls classify_intent() (unchanged) -> agent_name
      2. Instantiates agent via registry
      3. Returns (agent_name, agent_instance) — caller drives execution
    • Add orchestrate_v3_stream(user_id, message, context) -> AsyncGenerator that:
      1. Calls classify_intent() -> agent_name
      2. Calls agent.handle_stream() (uses _tool_loop_stream)
      3. Yields (agent_name, token) tuples — first yield includes agent name for domain detection
    • Keep orchestrate() and orchestrate_stream() unchanged (backward compat for POST /chat).

Files touched: app/core/orchestrator.py

Test: Unit test with mocked LLM and mocked registry that verifies orchestrate_v3_stream yields (agent_name, token) pairs.

pytest tests/test_orchestrator_v3.py

Status:

  • Step 3 complete

Commit: After tests pass, commit with:

git commit -m "step-3: add router refactor with streaming support (orchestrator.py)"

Step 4 — Output Formatting Layer (NEW: output_formatter.py)

Goal: Home and Popup responses diverge at this layer only.

Block Types (from Electron app components)

The LLM outputs a JSON block stream. Each block has a type field that maps to an Electron renderer component. The server validates and forwards these blocks.

Text block — streamed immediately, word-by-word:

{ "type": "text", "content": "Here's your task summary..." }

Chart blocks — buffered until complete, validated, sent as stream_block. Chart types match shadcn/ui Recharts wrappers used in the Electron app:

{ "type": "chart", "chartType": "<type>", "title": "...", "data": [...], "config": {...} }

Supported chartType values:

  • area — Area chart (shadcn AreaChart)
  • bar — Bar chart (shadcn BarChart)
  • line — Line chart (shadcn LineChart)
  • pie — Pie chart (shadcn PieChart)
  • radar — Radar chart (shadcn RadarChart)
  • radial — Radial/gauge chart (shadcn RadialChart)

data is an array of objects with keys matching the chart's dataKey config. config follows the shadcn ChartConfig format: { [dataKey]: { label, color } }.

Entity blocks — server serializes from agent.tool_results (not LLM-generated data):

{ "type": "entity_ref", "entity": "task" }

The server resolves this by looking up the structured data from the agent's tool call results and emitting a stream_block with the full entity data.

Supported entity types (matching Electron component types):

  • task — TaskRow component (TaskItem: id, title, status, priority, assignee, dueDate, projectId, ...)
  • project — Project card (id, name, clientId, status)
  • note — Note card (id, title, createdAt, projectId)
  • checkpoint — Checkpoint card (GanttCheckpoint: id, title, date, projectId, isAiSuggested, isApproved)

Table block — buffered, validated:

{ "type": "table", "headers": ["Col1", "Col2"], "rows": [["val1", "val2"]] }

Timeline block — buffered, validated (renders via GanttChart component):

{ "type": "timeline", "checkpoints": [{ "id": "...", "title": "...", "date": 1234567890 }] }

Changes

  • app/core/output_formatter.py (new file):
    • HomeFormatter:
      • Receives token stream from orchestrator
      • Accumulates tokens into a JSON-aware buffer
      • Detects block boundaries by type field:
        • text -> yields WsStreamText immediately (streams content word-by-word)
        • chart -> buffers until JSON complete, validates chartType against allowed set, yields WsStreamBlock
        • entity_ref -> looks up data from agent.tool_results, serializes full entity, yields WsStreamBlock
        • table -> buffers, validates headers/rows structure, yields WsStreamBlock
        • timeline -> buffers, validates checkpoint objects, yields WsStreamBlock
      • Invalid blocks are logged and skipped (never crash the stream)
    • PopupFormatter:
      • Receives agent_name from orchestrator
      • Maps agent name to domain (deterministic, by code — no LLM):
        • task_agent -> "tasks"
        • checkpoint_agent -> "checkpoints"
        • note_agent -> "notes"
        • project_agent -> "projects"
      • Yields WsPopupDomain immediately
      • Then yields WsStreamText for all tokens (text-only, no blocks)

Files touched: app/core/output_formatter.py (new)

Test: Unit test that feeds a mock token stream through each formatter and asserts correct frame output sequence.

pytest tests/test_output_formatter.py

Status:

  • Step 4 complete

Commit: After tests pass, commit with:

git commit -m "step-4: add output formatting layer (output_formatter.py)"

Step 5 — Unified WS Handler (device_ws.py, chat.py, main.py)

Goal: Single multiplexed WebSocket handles device frames + Home/Popup chat.

Changes:

  • app/api/routes/device_ws.py:
    • Extend _message_loop dispatch to handle home_request and popup_request:
      • On home_request: set ws_context executor, call orchestrate_v3_stream, pipe through HomeFormatter, send frames back on same socket.
      • On popup_request: same, but pipe through PopupFormatter.
      • Wrap both in try/finally to clear ws_context.
    • Each request gets a request_id (UUID) for frame correlation.
    • Concurrent requests from same client are supported (each runs as an async task).
  • app/api/routes/chat.py:
    • Remove chat_stream WS endpoint.
    • Keep POST /chat endpoint unchanged (REST fallback).
  • app/main.py:
    • No change needed (device_ws router already registered).

Files touched: app/api/routes/device_ws.py, app/api/routes/chat.py, app/main.py

Test: Integration test with a WebSocket test client that:

  1. Connects to /api/v1/ws/device
  2. Sends device_hello
  3. Sends home_request -> receives stream_start, stream_text*, stream_end
  4. Sends popup_request -> receives popup_domain, stream_text*, stream_end
  5. Verifies tool_call/tool_result round-trip still works during chat
pytest tests/test_ws_unified.py

Status:

  • Step 5 complete

Commit: After tests pass, commit with:

git commit -m "step-5: unify ws handler (device_ws.py, chat.py)"

Step 6 — Memory Models + Migration (models.py, alembic)

Goal: Database tables for 4-tier memory, with per-user encryption key.

Changes:

  • app/models.py:
    • Add encryption_key column to User model (Fernet key, generated on registration).
    • Add MemoryCore model: id, user_id, key, value_encrypted, updated_at
    • Add MemoryAssociative model: id, user_id, content_encrypted, embedding (Vector(1536)), entity_type, entity_id, updated_at
    • Add MemoryEpisodic model: id, user_id, summary_encrypted, session_id, created_at
    • Add MemoryProactive model: id, user_id, pattern_encrypted, confidence, source, created_at
  • alembic/versions/ — New migration adding the 4 memory tables + user encryption_key column.
  • app/api/routes/auth.py — On user registration, generate and store a Fernet key.

Files touched: app/models.py, alembic/versions/xxx_add_memory_tables.py, app/api/routes/auth.py

Test: Run migration up/down, verify tables exist with correct columns.

alembic upgrade head && alembic downgrade -1 && alembic upgrade head
pytest tests/test_memory_models.py

Status:

  • Step 6 complete

Commit: After tests pass, commit with:

git commit -m "step-6: add memory models and migration (models.py, alembic)"

Step 7 — Memory Middleware (NEW: memory_middleware.py)

Goal: Enrich every Router call with memory context, store interactions after.

Changes:

  • app/core/memory_middleware.py (new file):
    • MemoryMiddleware class with:
      • enrich_context(user_id, message) -> dict (pre-LLM):
        1. Load core memory (user prefs) — always injected
        2. Embed message, search MemoryAssociative via pgvector — top-k relevant
        3. Fetch recent MemoryEpisodic entries — last N sessions
        4. Fetch active MemoryProactive patterns — above confidence threshold
        5. Return merged context dict
      • store_episode(user_id, session_id, message, response) (post-LLM):
        1. Summarize interaction (short LLM call or heuristic)
        2. Encrypt and store in MemoryEpisodic
        3. Embed interaction, encrypt and upsert in MemoryAssociative
      • update_core(user_id, key, value) — explicit preference update
      • All read/write operations encrypt/decrypt using the user's Fernet key from User.encryption_key
  • app/api/routes/device_ws.py — Update home_request and popup_request handlers:
    • Before orchestrator: enriched = await memory.enrich_context(user_id, message)
    • After response complete: await memory.store_episode(user_id, ...)

Files touched: app/core/memory_middleware.py (new), app/api/routes/device_ws.py

Test: Unit test with seeded memory rows that verifies:

  1. enrich_context returns core prefs + associative matches + episodic summaries
  2. store_episode creates encrypted rows that can be decrypted with the user's key
  3. End-to-end WS test: send home_request, verify memory enrichment is passed to orchestrator
pytest tests/test_memory_middleware.py

Status:

  • Step 7 complete

Commit: After tests pass, commit with:

git commit -m "step-7: add memory middleware (memory_middleware.py, device_ws.py)"

Summary

Step Component Effort Depends On
1 WS Frame Protocol Low
2 Agent Streaming Medium Step 1
3 Router Refactor Medium Step 2
4 Output Formatter High Steps 1, 3
5 Unified WS Handler High Steps 14
6 Memory Models Medium
7 Memory Middleware High Steps 5, 6

Steps 15 form the streaming pipeline. Steps 67 form the memory system. Step 6 can run in parallel with Steps 24 (no dependencies).