# AI Refactor Plan — Adiuva Backend

> **Objective:** Transform backend tools from JSON-action-descriptor-returning functions into real bidirectional executors. Each tool sends structured CRUD operations to the Electron client via WebSocket, receives real data back, and returns meaningful results to the LLM. The LLM reasons about actual user data instead of serialized action payloads.
>
> **Electron app:** Lives at `../adiuva/`. See `../adiuva/AI_REFACTOR_PLAN.md`.
>
> **Protocol:** Execute steps sequentially. Each step is atomic and committable. Mark `[x]` when done.

---

## Architecture — Before vs After

### Before (current)

```
LLM calls list_tasks(status="todo")
  → tool returns: '{"action":"list","table":"tasks","filters":{"status":"todo"}}'
  → _tool_loop feeds that JSON string as ToolMessage to LLM
  → LLM sees a descriptor, NOT real data — cannot reason about tasks
  → Final response: generic "Here are your tasks" (no actual task data)
  → Action descriptors sent in final WS frame for Electron to execute post-response
```

### After (target)

```
LLM calls list_tasks(status="todo")
  → tool calls execute_on_client(action="select", table="tasks", filters={status:"todo"})
  → WS frame sent to Electron: {type:"tool_call", id:"abc", action:"select", table:"tasks", filters:{status:"todo"}}
  → Electron runs: db.select().from(tasks).where(eq(tasks.status, "todo")).all()
  → WS frame back: {type:"tool_result", id:"abc", rows:[{id:"1",title:"Buy milk",...}, ...]}
  → tool returns: "Found 3 tasks: 1. Buy milk (high, due tomorrow) 2. ..."
  → _tool_loop feeds that as ToolMessage to LLM
  → LLM sees REAL data — can reason, count, compare, summarize
```

---

## WS Protocol — Typed Frames

| Direction | `type` | Payload |
|---|---|---|
| Client → Server | `chat_request` | `{ message: str, context: ChatContext }` |
| Server → Client | `text_chunk` | `{ text: str }` |
| Server → Client | `tool_call` | `{ id: str, action: str, table?: str, data?: dict, filters?: dict, vector?: list[float], limit?: int }` |
| Client → Server | `tool_result` | `{ id: str, row?: dict, rows?: list[dict], results?: list[dict], deleted?: bool, ok?: bool, error?: str }` |
| Server → Client | `final` | `{ response: str }` |
| Server → Client | `ping` | `{}` |

**Actions:**

| `action` | What Electron does (Drizzle) | `tool_result` shape |
|---|---|---|
| `select` | `db.select().from(table).where(filters)` | `{ rows: [...] }` |
| `get` | `db.select().from(table).where(id=...).get()` | `{ row: {...} or null }` |
| `insert` | `db.insert(table).values({id: uuid(), ...data}).returning().get()` | `{ row: {...} }` |
| `update` | `db.update(table).set(updates).where(id=...).returning().get()` | `{ row: {...} }` |
| `delete` | `db.delete(table).where(id=...).run()` | `{ deleted: true }` |
| `vector_upsert` | LanceDB upsert with pre-computed vector | `{ ok: true }` |
| `vector_search` | LanceDB search by vector | `{ results: [{id, content, score}...] }` |

**Electron generates IDs + timestamps.** Backend tools never send `id` or `createdAt` in `insert` data — Electron adds `id: uuid()`, `createdAt: Date.now()`, `updatedAt: Date.now()`.

---

## SQLite Schema Reference (Electron's local database)

Tools must use **camelCase** field names (Drizzle maps them to snake_case internally):

| Table | Columns |
|---|---|
| `tasks` | id, projectId, title, description, status (todo\|in_progress\|done), priority (high\|medium\|low), assignee (JSON array string), dueDate (ms), isAiSuggested (0\|1), isApproved (0\|1), createdAt (ms) |
| `projects` | id, clientId, name, status (active\|archived), aiSummary, createdAt (ms) |
| `checkpoints` | id, projectId (required), title, date (ms), isAiSuggested (0\|1), isApproved (0\|1), createdAt (ms) |
| `notes` | id, projectId, title, content (markdown), createdAt (ms), updatedAt (ms) |
| `taskComments` | id, taskId, author, content, createdAt (ms) |
| `clients` | id, parentId, name, industry, createdAt (ms) |

---

## Phase B — Backend Changes

### Step B.1 — WS context + frame types

- [x] Create `app/core/ws_context.py` (~25 lines):
  - `_client_executor: ContextVar[Callable]` — holds the async callback for the current WS session
  - `async def execute_on_client(action, table=None, data=None, filters=None, vector=None, limit=None) -> dict`:
    - Reads callback from ContextVar
    - Builds `tool_call` payload: `{id: str(uuid4()), action, table, data, filters, vector, limit}` (omits None fields)
    - Calls `await callback(payload)` — which sends the WS frame and waits for `tool_result`
    - Returns the result dict
  - `def set_client_executor(fn)` / `def clear_client_executor()` — ContextVar management
- [x] Add to `app/schemas.py`:
  - `WsFrameType(str, Enum)`: `chat_request`, `text_chunk`, `tool_call`, `tool_result`, `final`, `ping`
  - `WsToolCall(BaseModel)`: `type`, `id`, `action`, `table?`, `data?`, `filters?`, `vector?`, `limit?`
  - `WsToolResult(BaseModel)`: `type`, `id`, `row?`, `rows?`, `results?`, `deleted?`, `ok?`, `error?`
  - `WsTextChunk(BaseModel)`: `type`, `text`
  - `WsFinal(BaseModel)`: `type`, `response`
- **Files:** `app/core/ws_context.py`, `app/schemas.py`
- **Outcome:** Any tool can `await execute_on_client(...)` to query/mutate the user's local DB.

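
The bullets for `app/core/ws_context.py` can be sketched roughly as follows. This is a minimal illustration, not the committed module: the `Executor` alias, the `RuntimeError` raised when no callback is bound, and the fake executor used in the demo are assumptions; only the function names and payload fields come from the plan.

```python
import asyncio
import uuid
from contextvars import ContextVar
from typing import Awaitable, Callable, Optional

# Assumed alias: an async callback that sends one tool_call and returns the tool_result.
Executor = Callable[[dict], Awaitable[dict]]

_client_executor: ContextVar[Optional[Executor]] = ContextVar("client_executor", default=None)


def set_client_executor(fn: Executor) -> None:
    """Bind the callback for the current WS session."""
    _client_executor.set(fn)


def clear_client_executor() -> None:
    """Unbind the callback once the session ends."""
    _client_executor.set(None)


async def execute_on_client(action, table=None, data=None, filters=None,
                            vector=None, limit=None) -> dict:
    callback = _client_executor.get()
    if callback is None:
        # Assumption: fail loudly when a tool runs outside a WS session.
        raise RuntimeError("execute_on_client() called outside a WS session")
    optional = {"table": table, "data": data, "filters": filters,
                "vector": vector, "limit": limit}
    payload = {"id": str(uuid.uuid4()), "action": action}
    # Omit None fields so the frame only carries what the tool supplied.
    payload.update({k: v for k, v in optional.items() if v is not None})
    return await callback(payload)


async def _ws_context_demo() -> dict:
    sent = []

    async def fake_executor(payload: dict) -> dict:
        # Stands in for the real WS round-trip to Electron.
        sent.append(payload)
        return {"id": payload["id"], "rows": [{"id": "1", "title": "Buy milk"}]}

    set_client_executor(fake_executor)
    try:
        result = await execute_on_client("select", table="tasks", filters={"status": "todo"})
    finally:
        clear_client_executor()
    return {"sent": sent[0], "result": result}


demo = asyncio.run(_ws_context_demo())
print(sorted(demo["sent"]))
```

Because the callback lives in a `ContextVar`, each WS session sees only its own executor, so tools need no extra parameter to find the right socket.
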
### Step B.2 — Rewrite all 23 tools to use `execute_on_client()`

- [x] Each tool: same `@tool` decorator, same parameters, same docstring. Replace `return json.dumps({...})` body with:
  1. Call `result = await execute_on_client(action=..., table=..., data/filters=...)`
  2. Return human-readable string with confirmation + key data from `result`

- [x] **`app/agents/task_agent.py` (8 tools):**
  - `list_tasks(project_id, status, search, order_by)`:
    ```python
    # Empty arguments become None so unset filters are not sent as "".
    result = await execute_on_client(action="select", table="tasks", filters={
        "projectId": project_id or None,
        "status": status or None,
        "search": search or None,
        "orderBy": order_by or None,
    })
    rows = result.get("rows", [])
    if not rows:
        return "No tasks found matching the given filters."
    # One line per task, including the id so the LLM can reference it in follow-up calls.
    lines = [f"- {r['title']} (status: {r['status']}, priority: {r['priority']}, id: {r['id']})" for r in rows]
    return f"Found {len(rows)} task(s):\n" + "\n".join(lines)
    ```
  - `create_task(title, ...)`:
    ```python
    # No id or timestamps here: Electron generates them on insert.
    result = await execute_on_client(action="insert", table="tasks", data={
        "title": title, "description": description or None, "status": status,
        "priority": priority, "assignee": assignees, "dueDate": due_date or None,
        "projectId": project_id or None, "isAiSuggested": is_ai_suggested, "isApproved": is_approved,
    })
    row = result["row"]
    return f"Task created: '{row['title']}' (id: {row['id']}, status: {row['status']}, priority: {row['priority']})"
    ```
  - `update_task(task_id, ...)`: build updates dict (same logic as now) → `execute_on_client(action="update", table="tasks", data={"id": task_id, "updates": updates})` → return "Task updated: {title}"
  - `delete_task(task_id)`: `execute_on_client(action="delete", table="tasks", data={"id": task_id})` → return "Task deleted"
  - `list_tasks_due_today()`: calculate today's start/end ms → `execute_on_client(action="select", table="tasks", filters={"dueDateFrom": start, "dueDateTo": end})` → format + return
  - `list_task_comments(task_id)`: `execute_on_client(action="select", table="taskComments", filters={"taskId": task_id})` → format + return
  - `add_task_comment(task_id, author, content)`: `execute_on_client(action="insert", table="taskComments", data={...})` → return confirmation
  - `delete_task_comment(comment_id)`: `execute_on_client(action="delete", table="taskComments", data={"id": comment_id})` → return confirmation

- [x] **`app/agents/project_agent.py` (6 tools):**
  - `list_projects(client_id, include_archived)`: `execute_on_client(action="select", table="projects", filters={clientId, includeArchived})` → format + return
  - `list_all_projects()`: `execute_on_client(action="select", table="projects")` → format + return
  - `get_project(project_id)`: `execute_on_client(action="get", table="projects", data={"id": project_id})` → return project details or "not found"
  - `create_project(name, client_id)`: `execute_on_client(action="insert", table="projects", data={name, clientId})` → return confirmation + id
  - `update_project(project_id, ...)`: build updates → `execute_on_client(action="update", ...)` → return confirmation
  - `delete_project(project_id)`: `execute_on_client(action="delete", ...)` → return confirmation

- [x] **`app/agents/checkpoint_agent.py` (4 tools):**
  - `list_checkpoints(project_id)`: `execute_on_client(action="select", table="checkpoints", filters={projectId})` → format + return
  - `create_checkpoint(project_id, title, date, ...)`: `execute_on_client(action="insert", table="checkpoints", data={...})` → return confirmation + id
  - `update_checkpoint(checkpoint_id, ...)`: build updates → `execute_on_client(action="update", ...)` → return confirmation
  - `delete_checkpoint(checkpoint_id)`: `execute_on_client(action="delete", ...)` → return confirmation

- [x] **`app/agents/note_agent.py` (5 tools):**
  - `list_notes(project_id)`: `execute_on_client(action="select", table="notes", filters={projectId})` → format + return
  - `get_note(note_id)`: `execute_on_client(action="get", table="notes", data={"id": note_id})` → return full content or "not found"
  - `create_note(title, content, project_id)`: `execute_on_client(action="insert", table="notes", data={...})` → then `execute_on_client(action="vector_upsert", data={id, projectId, content}, vector=await embed(content))` → return confirmation
  - `update_note(note_id, ...)`: build updates → `execute_on_client(action="update", ...)` → then vector_upsert for updated content → return confirmation
  - `delete_note(note_id)`: `execute_on_client(action="delete", ...)` → return confirmation

- **Files:** `app/agents/task_agent.py`, `app/agents/project_agent.py`, `app/agents/checkpoint_agent.py`, `app/agents/note_agent.py`
- **Outcome:** All 23 tools query real user data via WS. LLM sees actual rows, not action descriptors.

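
For `list_tasks_due_today()`, the "today's start/end ms" step might look like the sketch below. The helper name `day_bounds_ms` is hypothetical, and two points are assumptions the plan does not settle: which timezone defines "today", and whether `dueDateTo` is an inclusive bound.

```python
from datetime import datetime, time, timedelta, timezone
from typing import Tuple


def day_bounds_ms(now: datetime) -> Tuple[int, int]:
    """Return (start, end) of the calendar day containing `now`, in epoch milliseconds."""
    # Midnight of the same calendar day, in the same timezone as `now`.
    start = datetime.combine(now.date(), time.min, tzinfo=now.tzinfo)
    end = start + timedelta(days=1)
    # Assumption: dueDateTo is inclusive, so use the last millisecond of the day.
    return int(start.timestamp() * 1000), int(end.timestamp() * 1000) - 1


start_ms, end_ms = day_bounds_ms(datetime(2024, 1, 15, 13, 30, tzinfo=timezone.utc))
filters = {"dueDateFrom": start_ms, "dueDateTo": end_ms}
print(filters)
```

Passing a timezone-aware `now` keeps the result independent of the server's local timezone, which matters because the tasks live on the user's machine rather than on the backend.
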
### Step B.3 — Bidirectional WebSocket handler

- [x] Refactor `app/api/routes/chat.py` WS endpoint:
  - After auth + accept + receive `chat_request`:
    1. Create `execute_on_client` callback closure capturing the websocket:
       ```python
       # Defined inside the WS endpoint; assumes `asyncio` and `json` are imported at module level.
       # Futures for in-flight tool calls, keyed by tool_call id.
       pending_calls: dict[str, asyncio.Future] = {}

       async def on_client_result(frame: dict):
           """Called when a tool_result frame arrives from Electron."""
           fut = pending_calls.pop(frame["id"], None)
           if fut and not fut.done():
               fut.set_result(frame)

       async def execute_callback(payload: dict) -> dict:
           """Send tool_call to Electron, wait for tool_result."""
           call_id = payload["id"]
           # get_running_loop() is the supported way to reach the loop from a coroutine.
           fut = asyncio.get_running_loop().create_future()
           pending_calls[call_id] = fut
           try:
               await websocket.send_text(json.dumps({"type": "tool_call", **payload}))
               return await asyncio.wait_for(fut, timeout=30.0)
           finally:
               # Drop the entry on timeout or send failure so it cannot leak.
               pending_calls.pop(call_id, None)
       ```
    2. Set `client_executor` ContextVar with `execute_callback`
    3. Run orchestrator in a task — it calls agents, agents call tools, tools call `execute_on_client()` which goes through the callback
    4. In parallel, run a message receive loop that dispatches incoming frames:
       - `tool_result` → `on_client_result(frame)`
       - `ping` → ignore
    5. Orchestrator yields `text_chunk` frames → send to client
    6. Send `final` frame when done
    7. Clear ContextVar
  - Keep heartbeat ping every 30s
  - 30s timeout on `tool_result` — if Electron doesn't respond, `asyncio.wait_for` raises `TimeoutError` and the tool returns an error string to the LLM
- **Files:** `app/api/routes/chat.py`
- **Outcome:** Full bidirectional WS. Tool calls and text streaming happen concurrently on the same connection.

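
The round-trip in Step B.3 can be exercised without a real socket. The sketch below is illustrative only: `FakeWebSocket`, `run_session`, and the error string are hypothetical stand-ins, and the timeouts are shortened so the demo finishes quickly. It shows the receive loop resolving a pending future, and the timeout path turning into an error string for the LLM.

```python
import asyncio
import json


class FakeWebSocket:
    """Stand-in for the real socket; answers each tool_call unless `silent`."""

    def __init__(self, silent: bool = False):
        self.silent = silent
        self.inbox = asyncio.Queue()

    async def send_text(self, text: str) -> None:
        frame = json.loads(text)
        if frame["type"] == "tool_call" and not self.silent:
            reply = {"type": "tool_result", "id": frame["id"], "rows": [{"title": "Buy milk"}]}
            await self.inbox.put(json.dumps(reply))

    async def receive_text(self) -> str:
        return await self.inbox.get()


async def run_session(silent: bool, timeout: float) -> str:
    websocket = FakeWebSocket(silent)  # created inside the running event loop
    pending = {}

    async def execute_callback(payload: dict) -> dict:
        fut = asyncio.get_running_loop().create_future()
        pending[payload["id"]] = fut
        try:
            await websocket.send_text(json.dumps({"type": "tool_call", **payload}))
            return await asyncio.wait_for(fut, timeout=timeout)
        finally:
            pending.pop(payload["id"], None)

    async def receive_loop() -> None:
        # Runs alongside the tool call and resolves the matching future.
        while True:
            frame = json.loads(await websocket.receive_text())
            fut = pending.get(frame.get("id"))
            if frame.get("type") == "tool_result" and fut and not fut.done():
                fut.set_result(frame)

    receiver = asyncio.create_task(receive_loop())
    try:
        result = await execute_callback({"id": "abc", "action": "select", "table": "tasks"})
        return f"Found {len(result['rows'])} task(s)"
    except asyncio.TimeoutError:
        # What a tool would hand back to the LLM when Electron stays silent.
        return "Error: the desktop app did not respond in time."
    finally:
        receiver.cancel()


ok = asyncio.run(run_session(silent=False, timeout=1.0))
timed_out = asyncio.run(run_session(silent=True, timeout=0.05))
print(ok)
print(timed_out)
```

Running the receive loop as its own task is what lets a tool block on its future while other frames keep flowing on the same connection.
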
### Step B.4 — `_tool_loop` — no changes needed

- [x] Verify `app/core/agent_registry.py` works unchanged:
  - `_tool_loop` calls `tool_fn.ainvoke(args)` → tool awaits `execute_on_client()` (WS round-trip) → returns string → `ToolMessage(content=string)` → LLM sees real data
  - The async WS round-trip happens inside each tool. `_tool_loop` just sees an awaited tool returning a string — same as before, different content.
- **No code changes.** Just verify + add a log line for tool execution times if desired.

### Step B.5 — Orchestrator cleanup

- [x] Update `app/core/orchestrator.py`:
  - `orchestrate_stream()`: remove `"actions": []` from final frame. Final becomes: `{"done": true, "response": "..."}`
  - No other changes — `classify_intent` → `call_agent` → chunk response → final frame
- **Files:** `app/core/orchestrator.py`
- **Outcome:** Clean final frame. No more action descriptors in the protocol.

### Step B.6 — Add `/vectors/embed` endpoint

- [x] Add to `app/api/routes/vectors.py`:
  - `POST /api/v1/storage/vectors/embed`:
    - Request: `{ text: str }`
    - Response: `{ vector: list[float] }` (1536-dim from `text-embedding-3-small`)
    - Auth required (JWT)
  - Used by:
    - Backend tools: `note_agent` calls this before `vector_upsert`
    - Electron: `vectordb.ts` calls this for note embedding on create/update
- **Files:** `app/api/routes/vectors.py`
- **Outcome:** Single embedding endpoint. Both backend tools and Electron can generate vectors.

---

## Verification

| What to test | How |
|---|---|
| **Read flow** | "List my tasks" → `list_tasks` → `tool_call{select, tasks}` → Electron returns rows → LLM describes real tasks |
| **Write flow** | "Create a task called Buy milk" → `create_task` → `tool_call{insert, tasks, data:{title:"Buy milk"}}` → Electron inserts + returns row → tool confirms with id |
| **Multi-tool** | "How many todo tasks do I have?" → `list_tasks(status=todo)` → LLM counts actual rows → "You have 3 todo tasks" |
| **Vector search** | "Find notes about deployment" → tool embeds → `tool_call{vector_search, vector:[...]}` → Electron searches LanceDB → returns matching notes |
| **Vector upsert** | "Create a note about..." → insert note → vector_upsert with embedding → both SQLite + LanceDB updated |
| **Tool timeout** | Disconnect Electron mid-conversation → 30s timeout → tool returns error → LLM handles gracefully |
| **Concurrent calls** | Agent calls 2 tools in sequence → each does WS round-trip → both succeed → LLM sees both results |
| **_tool_loop max iter** | Verify 5-iteration limit still works → after 5 tool calls, LLM forced to answer without tools |

---

## Execution Notes

- **Phase 1 is the critical path.** Auth + backend client + drizzle executor + orchestrator refactor must land first.
- **Steps 1.1–1.4 are additive** — existing app keeps working until Step 1.5 swaps the orchestrator.
- **Step 2.1 is the point of no return** — after removing LangChain, there's no local AI fallback.
- **Phase B (backend changes) must land before Phase 1.3–1.5** — Electron needs the bidirectional WS to talk to.
- **Phase 3 and Phase 4 are independent** — can be parallelized after Phase 2.

---

## Phase 3 — Agent System: Config, Orchestration & Cloud Connectors

> **Objective:** Backend manages all agent configuration, scheduling, orchestration, and cloud data fetching. Two agent types: **Local Directory Agent** (backend triggers Electron to read files, then AI analyzes) and **Cloud Connector Agent** (backend fetches Gmail/Teams data directly, AI analyzes, pushes results to Electron via WS tool_call). All extracted items use existing WS tool infrastructure to insert into Electron's local DB with `is_ai_suggested=True`.
>
> **Electron Phase 3 plan:** `../adiuva/AI_REFACTOR_PLAN.md` Phase 3 section.

### Architecture

```
Local Agent:
  Scheduler/manual trigger ──► check device online ──► WS agent_run → Electron
    ──► Electron reads files ──► WS agent_data → Backend
    ──► Backend AI (prompt_template + file content) ──► WS tool_call(insert) → Electron
    ──► Electron persists with isAiSuggested=1

Cloud Agent:
  Scheduler/manual trigger ──► Backend fetches Gmail/Teams (OAuth) ──► Backend AI analyzes
    ──► check device online ──► WS tool_call(insert) → Electron ──► Electron persists
```

**New WS frame types:**

| Direction | `type` | Payload |
|---|---|---|
| Server → Client | `agent_run` | `{ run_id, agent_id, config: { paths, file_extensions, prompt_template, data_types } }` |
| Client → Server | `agent_data` | `{ run_id, files: [{ path, name, content, metadata }] }` |
| Client → Server | `agent_complete` | `{ run_id, files_read, errors }` |
| Client → Server | `device_hello` | `{ device_id, agent_ids }` |

### Step 3.1 — Agent config tables

- [x] Add to `app/models.py`:
  - **`LocalAgentConfig`**:
    - `id` UUID PK
    - `user_id` FK → users
    - `device_id` str — identifies which Electron install this config belongs to
    - `name` str
    - `directory_paths` JSON — list of absolute paths on the device
    - `data_types` JSON — which tables to extract to: `["tasks", "notes", "checkpoints", "projects"]`
    - `prompt_template` text — user-configured via Chatbot Journey
    - `file_extensions` JSON — e.g. `[".eml", ".txt", ".pdf", ".md"]`
    - `schedule_cron` str — e.g. `"0 */6 * * *"` (every 6h)
    - `enabled` bool (default True)
    - `last_run_at` datetime nullable
    - `created_at`, `updated_at` timestamps
  - **`CloudAgentConfig`**:
    - `id` UUID PK
    - `user_id` FK → users
    - `provider` str — enum: `gmail`, `teams`, `outlook`
    - `name` str
    - `data_types` JSON — same format as local
    - `prompt_template` text
    - `oauth_token_encrypted` text — Fernet-encrypted OAuth2 credentials
    - `schedule_cron` str
    - `enabled` bool (default True)
    - `last_run_at` datetime nullable
    - `filter_config` JSON — provider-specific: `{ labels: [], date_range: {from, to}, senders: [] }`
    - `created_at`, `updated_at` timestamps
  - **`AgentRunLog`**:
    - `id` UUID PK
    - `agent_id` str — references LocalAgentConfig.id or CloudAgentConfig.id
    - `agent_type` str — `local` or `cloud`
    - `user_id` FK → users
    - `status` str — `running`, `success`, `error`, `partial`
    - `items_processed` int (default 0)
    - `items_created` int (default 0)
    - `errors` JSON — list of error strings
    - `started_at` datetime
    - `completed_at` datetime nullable
- [x] Add Pydantic schemas to `app/schemas.py`:
  - `LocalAgentConfigCreate`, `LocalAgentConfigUpdate`, `LocalAgentConfigResponse`
  - `CloudAgentConfigCreate`, `CloudAgentConfigUpdate`, `CloudAgentConfigResponse`
  - `AgentRunLogResponse`
  - `AgentCatalogItem` — `{ type, name, description, config_schema }`
  - `WsAgentRun`, `WsAgentData`, `WsAgentComplete`, `WsDeviceHello`
- [x] Generate Alembic migration
- **Files:** `app/models.py`, `app/schemas.py`, `alembic/versions/`
- **Outcome:** Agent config and run tracking tables in PostgreSQL.

### Step 3.2 — Agent CRUD API routes

- [ ] Create `app/api/routes/agents.py`:
  - `GET /api/v1/agents/catalog` — returns hardcoded agent type catalog:
    - `local_directory`: "Watches local directories, extracts data from files using AI"
    - `gmail`: "Scans Gmail inbox, extracts tasks/notes from emails"
    - `teams`: "Monitors Teams messages, extracts action items"
    - `outlook`: "Scans Outlook inbox, extracts tasks/notes"
  - `GET /api/v1/agents/local` — list user's local agent configs
  - `POST /api/v1/agents/local` — create local agent config
    - Body: `{ name, device_id, directory_paths, data_types, prompt_template, file_extensions, schedule_cron }`
    - Tier check: count enabled agents ≤ `batch_active` limit
  - `PUT /api/v1/agents/local/{id}` — update config (ownership check)
  - `DELETE /api/v1/agents/local/{id}` — delete config + associated run logs
  - `GET /api/v1/agents/cloud` — list user's cloud agent configs
  - `POST /api/v1/agents/cloud` — create cloud connector config
    - Body: `{ provider, name, data_types, prompt_template, oauth_token_encrypted, schedule_cron, filter_config }`
    - Tier check: same `batch_active` limit (local + cloud count together)
  - `PUT /api/v1/agents/cloud/{id}` — update config
  - `DELETE /api/v1/agents/cloud/{id}` — delete config + run logs
  - `GET /api/v1/agents/runs` — query params: `agent_id`, `page`, `limit` → paginated run logs
  - `POST /api/v1/agents/{id}/run` — manual trigger (dispatches to agent runner)
  - All routes require JWT auth; ownership enforced on all mutations
- [ ] Register router in `app/main.py`
- **Files:** `app/api/routes/agents.py`, `app/main.py`
- **Outcome:** Full CRUD for agent configs with tier-gated creation limits.

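
The shared tier check could be as small as the sketch below. The function and mapping names are hypothetical; the limits (free=2, pro=10) are taken from the Phase 3 verification table, and reading the rule as "the count after creation must stay within the limit" is an assumption.

```python
# Limits from the Phase 3 verification table (free=2, pro=10).
BATCH_ACTIVE_LIMITS = {"free": 2, "pro": 10}


def can_enable_another_agent(tier: str, enabled_local: int, enabled_cloud: int) -> bool:
    """True if one more enabled agent still fits under the tier's batch_active limit."""
    limit = BATCH_ACTIVE_LIMITS[tier]
    # Local and cloud agents share one budget, so they are counted together.
    return enabled_local + enabled_cloud + 1 <= limit


print(can_enable_another_agent("free", enabled_local=1, enabled_cloud=0))
print(can_enable_another_agent("free", enabled_local=1, enabled_cloud=1))
```

Keeping the check in one helper means both `POST /api/v1/agents/local` and `POST /api/v1/agents/cloud` apply the same combined count.
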
### Step 3.3 — Device WS endpoint

- [ ] Create `app/api/routes/device_ws.py`:
  - `WebSocket /api/v1/ws/device?token=<jwt>` — persistent connection from Electron
  - On connect:
    - Authenticate JWT
    - Receive `device_hello` frame → extract `device_id`, `agent_ids`
    - Store connection in `DeviceConnectionManager` (in-memory dict: `user_id → { ws, device_id }`)
    - Check for overdue agent runs → trigger them immediately
  - Message loop:
    - `agent_data` → route to active agent run handler
    - `agent_complete` → finalize agent run
    - `tool_result` → route to pending tool call (same pattern as chat WS)
    - `pong` → heartbeat ack
  - On disconnect:
    - Remove from `DeviceConnectionManager`
    - Mark any in-progress agent runs as `error` with "device disconnected"
  - Heartbeat: send `ping` every 30s, disconnect if no `pong` within 10s
- [ ] Create `app/core/device_manager.py`:
  - `DeviceConnectionManager` (singleton):
    - `register(user_id, device_id, ws)` — stores active connection
    - `unregister(user_id)` — removes connection
    - `get_ws(user_id) -> WebSocket | None` — returns active WS if device is online
    - `is_online(user_id, device_id=None) -> bool` — optionally checks specific device
    - `send_frame(user_id, frame: dict)` — sends JSON frame to device
- **Files:** `app/api/routes/device_ws.py`, `app/core/device_manager.py`, `app/main.py`
- **Outcome:** Backend maintains persistent WS connections to Electron devices for agent triggers.

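
A rough sketch of `DeviceConnectionManager`, following the method list above. The `ConnectionError` on a missing device and the `RecordingSocket` test double are assumptions for illustration; the real class would hold a FastAPI `WebSocket` rather than `Any`.

```python
import asyncio
import json
from typing import Any, Dict, Optional


class DeviceConnectionManager:
    """In-memory registry: user_id -> {"ws": ..., "device_id": ...}."""

    def __init__(self) -> None:
        self._connections: Dict[str, Dict[str, Any]] = {}

    def register(self, user_id: str, device_id: str, ws: Any) -> None:
        self._connections[user_id] = {"ws": ws, "device_id": device_id}

    def unregister(self, user_id: str) -> None:
        self._connections.pop(user_id, None)

    def get_ws(self, user_id: str) -> Optional[Any]:
        entry = self._connections.get(user_id)
        return entry["ws"] if entry else None

    def is_online(self, user_id: str, device_id: Optional[str] = None) -> bool:
        entry = self._connections.get(user_id)
        if entry is None:
            return False
        # With no device_id given, any connected device counts as online.
        return device_id is None or entry["device_id"] == device_id

    async def send_frame(self, user_id: str, frame: dict) -> None:
        ws = self.get_ws(user_id)
        if ws is None:
            # Assumption: sending to an offline user is an error, not a no-op.
            raise ConnectionError(f"user {user_id} has no connected device")
        await ws.send_text(json.dumps(frame))


class RecordingSocket:
    """Test double that records what would be sent over the wire."""

    def __init__(self) -> None:
        self.sent = []

    async def send_text(self, text: str) -> None:
        self.sent.append(json.loads(text))


manager = DeviceConnectionManager()
fake_ws = RecordingSocket()
manager.register("user-1", "device-A", fake_ws)
asyncio.run(manager.send_frame("user-1", {"type": "ping"}))
print(manager.is_online("user-1", "device-B"))
```

Keying the dict by `user_id` alone, as the plan describes, means a second device connecting for the same user replaces the first entry.
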
### Step 3.4 — Agent run orchestrator

- [ ] Create `app/core/agent_runner.py`:
  - `async run_local_agent(user_id, config: LocalAgentConfig, device_mgr: DeviceConnectionManager)`:
    1. Check device is online with matching `device_id` → abort if offline
    2. Create `AgentRunLog` with `status=running`
    3. Send `WsAgentRun` frame to Electron with config (paths, extensions, prompt)
    4. Await `WsAgentData` frames — collect file contents
    5. Await `WsAgentComplete` frame — Electron signals done reading
    6. For each file: call LLM with `prompt_template` + file content → extract structured items
    7. For each extracted item: send `WsToolCall(insert, table, data)` to Electron → await `WsToolResult`
       - All inserts mark the row as AI-suggested and unapproved (`is_ai_suggested=True, is_approved=False`; stored as `isAiSuggested=1`, `isApproved=0`)
    8. Update `AgentRunLog`: `status=success`, `items_processed`, `items_created`
  - `async run_cloud_agent(user_id, config: CloudAgentConfig, device_mgr: DeviceConnectionManager)`:
    1. Check device is online → abort if offline (results must push to Electron)
    2. Create `AgentRunLog` with `status=running`
    3. Decrypt OAuth credentials from `config.oauth_token_encrypted`
    4. Fetch data from cloud provider (Step 3.6):
       - Gmail: `google-api-python-client` + `filter_config` label/date filters
       - Teams: `msgraph-sdk` + channel/date filters
       - Outlook: `msgraph-sdk` + folder/date filters
    5. For each item: call LLM with `prompt_template` + email/message content → extract structured items
    6. For each extracted item: send `WsToolCall(insert)` to Electron → await `WsToolResult`
    7. Update `AgentRunLog`
  - `async trigger_pending_runs(user_id, device_id, device_mgr)`:
    - Called when Electron connects (after `device_hello`)
    - Queries all enabled agent configs where `last_run_at + schedule_interval < now()`
    - For local agents: only triggers if `config.device_id == device_id`
    - For cloud agents: triggers regardless of device (any connected device can receive results)
    - Executes runs sequentially (one at a time to avoid overwhelming the WS)
  - Error handling: on any failure, update `AgentRunLog` with `status=error` + error details
- **Files:** `app/core/agent_runner.py`
- **Outcome:** Backend drives all agent execution — both local (via WS file request) and cloud (direct API calls).

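
The selection rules in `trigger_pending_runs` can be illustrated with a pure function. Everything named here is hypothetical: `AgentConfig` is a trimmed-down stand-in for the two config models, `interval` assumes the cron expression has already been reduced to a fixed period, and treating never-run agents as due is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional


@dataclass
class AgentConfig:
    """Trimmed-down stand-in for LocalAgentConfig / CloudAgentConfig."""
    id: str
    agent_type: str                  # "local" or "cloud"
    interval: timedelta              # assumption: derived from schedule_cron
    last_run_at: Optional[datetime]
    enabled: bool = True
    device_id: Optional[str] = None  # only meaningful for local agents


def is_overdue(cfg: AgentConfig, now: datetime) -> bool:
    if not cfg.enabled:
        return False
    if cfg.last_run_at is None:      # assumption: never-run agents are due
        return True
    return cfg.last_run_at + cfg.interval < now


def pending_for_device(configs: List[AgentConfig], device_id: str, now: datetime) -> List[AgentConfig]:
    """Overdue configs this device may run: its own local agents plus any cloud agent."""
    return [
        c for c in configs
        if is_overdue(c, now) and (c.agent_type == "cloud" or c.device_id == device_id)
    ]


now = datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc)
six_hours = timedelta(hours=6)
configs = [
    AgentConfig("local-A", "local", six_hours, now - timedelta(hours=7), device_id="A"),
    AgentConfig("local-B", "local", six_hours, now - timedelta(hours=7), device_id="B"),
    AgentConfig("cloud-1", "cloud", six_hours, None),
    AgentConfig("cloud-2", "cloud", six_hours, now - timedelta(hours=1)),
]
due_ids = [c.id for c in pending_for_device(configs, "A", now)]
print(due_ids)
```

Returning a list keeps the caller free to run the configs one at a time, matching the sequential execution the plan asks for.
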
### Step 3.5 — Chatbot Journey endpoint

- [ ] Create `app/api/routes/agent_setup.py`:
  - `POST /api/v1/agents/journey/start`:
    - Body: `{ agent_type: "local"|"cloud", data_types: ["tasks", "notes", ...] }`
    - Creates a journey session (in-memory or Redis-backed)
    - Returns first AI message: contextual question based on agent type
      - Local: "What kind of files are in the directories you want to monitor? (emails, documents, logs, etc.)"
      - Cloud: "What kind of emails/messages should I look for? (client communications, invoices, meeting notes, etc.)"
    - Response: `{ session_id, message, done: false }`
  - `POST /api/v1/agents/journey/message`:
    - Body: `{ session_id, message }`
    - AI processes user's answer, asks follow-up questions (max 5 turns)
    - System prompt: "You are configuring a data extraction agent for a freelancer. Ask about file format, what data to extract (tasks, notes, checkpoints), naming conventions, priority rules, and any special mapping. After 3-5 questions, generate a detailed prompt_template."
    - When AI determines enough context: `{ session_id, message: "Here's your configuration...", done: true, prompt_template: "..." }`
    - The `prompt_template` is a structured instruction for the extraction LLM (e.g. "Extract tasks from email. Subject becomes task title. If body contains 'urgent' or 'ASAP', set priority to 'high'. Extract due dates if mentioned.")
- **Files:** `app/api/routes/agent_setup.py`, `app/main.py`
- **Outcome:** Users configure AI prompts through guided conversation, not manual text editing.

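
The turn handling behind `POST /api/v1/agents/journey/message` might be structured like the sketch below. `JourneySession`, `handle_message`, and the two callables standing in for the LLM are hypothetical, and reading "max 5 turns" as "at most five user answers" is an assumption.

```python
import uuid
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional

MAX_TURNS = 5  # "max 5 turns" from the step above


@dataclass
class JourneySession:
    agent_type: str
    data_types: List[str]
    session_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    answers: List[str] = field(default_factory=list)


def handle_message(
    session: JourneySession,
    message: str,
    ask_follow_up: Callable[[JourneySession], Optional[str]],  # hypothetical LLM hook
    build_template: Callable[[JourneySession], str],           # hypothetical LLM hook
) -> Dict[str, object]:
    """Record one user answer and either ask again or finish with a prompt_template."""
    session.answers.append(message)
    # Once the turn budget is spent, stop asking even if the model wants more.
    question = None if len(session.answers) >= MAX_TURNS else ask_follow_up(session)
    if question is None:
        return {
            "session_id": session.session_id,
            "message": "Here's your configuration...",
            "done": True,
            "prompt_template": build_template(session),
        }
    return {"session_id": session.session_id, "message": question, "done": False}


session = JourneySession("local", ["tasks", "notes"])
replies = [
    handle_message(
        session,
        f"answer {i}",
        lambda s: f"Question {len(s.answers) + 1}?",            # stub that always asks again
        lambda s: "Extract tasks from: " + "; ".join(s.answers),
    )
    for i in range(1, MAX_TURNS + 1)
]
print([r["done"] for r in replies])
```

Enforcing the cap outside the model keeps the journey bounded even when the LLM never decides it has enough context.
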
### Step 3.6 — Cloud provider integrations

- [ ] Create `app/integrations/gmail.py`:
  - `GmailClient`:
    - `__init__(oauth_token)` — initializes Google API client
    - `async fetch_messages(filter_config, since: datetime) -> list[EmailMessage]`
    - `EmailMessage`: `{ id, subject, sender, body_text, date, labels }`
    - Handles token refresh via Google OAuth2 refresh flow
    - Respects `filter_config.labels`, `filter_config.date_range`, `filter_config.senders`
- [ ] Create `app/integrations/ms_graph.py`:
  - `MSGraphClient`:
    - `__init__(oauth_token)` — initializes MS Graph client
    - `async fetch_emails(filter_config, since: datetime) -> list[EmailMessage]` (Outlook)
    - `async fetch_messages(filter_config, since: datetime) -> list[ChatMessage]` (Teams)
    - `ChatMessage`: `{ id, content, sender, channel, date }`
    - Handles token refresh via MSAL
- [ ] Create `app/integrations/__init__.py` — factory: `get_provider(provider_name) -> GmailClient | MSGraphClient`
- **Dependencies:** `google-api-python-client`, `google-auth-oauthlib`, `msgraph-sdk`, `msal`
- **Files:** `app/integrations/gmail.py`, `app/integrations/ms_graph.py`, `app/integrations/__init__.py`
- **Outcome:** Backend can fetch emails/messages from Gmail, Outlook, and Teams.

### Step 3.7 — Agent scheduler

- [ ] Create `app/core/agent_scheduler.py`:
  - Uses `APScheduler` (or simple asyncio loop) to check agent schedules
  - Every 60s: query enabled agents where `last_run_at + cron_interval < now()`
  - For each due agent:
    - Check if user's device is online via `DeviceConnectionManager`
    - If online: dispatch to `agent_runner`
    - If offline: skip (will trigger on next `device_hello`)
  - Locks: use PostgreSQL advisory locks to prevent duplicate runs in multi-instance deployments
- [ ] Integrate with FastAPI lifespan (start scheduler on app startup, shutdown gracefully)
- **Dependencies:** `apscheduler>=4.0`
- **Files:** `app/core/agent_scheduler.py`, `app/main.py`
- **Outcome:** Agents run automatically on their configured schedules.

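
If the "simple asyncio loop" option is chosen over APScheduler, the polling logic could look like this sketch. The three injected callables (`find_due`, `is_online`, `dispatch`) are hypothetical seams standing in for the database query, `DeviceConnectionManager.is_online`, and the agent runner; advisory locking is left out.

```python
import asyncio
from typing import Awaitable, Callable, List


async def scheduler_loop(
    find_due: Callable[[], Awaitable[List[dict]]],
    is_online: Callable[[str], bool],
    dispatch: Callable[[dict], Awaitable[None]],
    stop: asyncio.Event,
    poll_seconds: float = 60.0,
) -> None:
    """Poll for due agents until `stop` is set; agents of offline users are skipped."""
    while not stop.is_set():
        for agent in await find_due():
            if is_online(agent["user_id"]):
                await dispatch(agent)
            # Offline: skip; the run is picked up on the next device_hello.
        try:
            # Sleep until the next poll, but wake immediately on shutdown.
            await asyncio.wait_for(stop.wait(), timeout=poll_seconds)
        except asyncio.TimeoutError:
            pass


async def _scheduler_demo() -> List[str]:
    stop = asyncio.Event()
    dispatched: List[str] = []

    async def find_due() -> List[dict]:
        return [
            {"id": "a1", "user_id": "online-user"},
            {"id": "a2", "user_id": "offline-user"},
        ]

    async def dispatch(agent: dict) -> None:
        dispatched.append(agent["id"])
        stop.set()  # end the demo after the first dispatch

    await scheduler_loop(find_due, lambda uid: uid == "online-user", dispatch, stop, poll_seconds=0.01)
    return dispatched


dispatched_ids = asyncio.run(_scheduler_demo())
print(dispatched_ids)
```

Waiting on the `stop` event instead of calling `asyncio.sleep` lets the FastAPI lifespan shutdown end the loop without waiting out a full poll interval.
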
### Step 3.8 — OAuth flow endpoints

- [ ] Create `app/api/routes/oauth.py`:
  - `GET /api/v1/oauth/{provider}/authorize` — returns OAuth authorization URL
    - Gmail: Google OAuth2 with `gmail.readonly` scope
    - Outlook/Teams: MS identity platform with `Mail.Read`, `ChannelMessage.Read.All` scopes
  - `GET /api/v1/oauth/{provider}/callback` — handles OAuth redirect
    - Exchanges auth code for access + refresh tokens
    - Encrypts tokens with Fernet (server-side key from settings)
    - Returns encrypted token blob for storage in `CloudAgentConfig.oauth_token_encrypted`
  - `POST /api/v1/oauth/{provider}/refresh` — refresh expired OAuth token
- **Files:** `app/api/routes/oauth.py`, `app/main.py`
- **Outcome:** Users can connect Gmail/Teams/Outlook accounts securely.

---

### Phase 3 — Verification

| # | Scenario | Expected |
|---|---|---|
| 1 | **Agent CRUD** | Create/read/update/delete local and cloud configs; tier limits enforced (free=2, pro=10) |
| 2 | **WS device connect** | Electron connects → `device_hello` → backend stores connection → triggers overdue runs |
| 3 | **Local agent run** | Backend sends `agent_run` → Electron reads files → `agent_data` → backend AI extracts → `tool_call(insert)` → Electron persists with `isAiSuggested=1` |
| 4 | **Cloud agent run** | Backend fetches Gmail → AI extracts tasks → `tool_call(insert)` → Electron persists |
| 5 | **Device binding** | Local agent config with `device_id=A` only triggers when device A is connected |
| 6 | **Chatbot Journey** | Start journey → 3-5 Q&A turns → produces valid `prompt_template` |
| 7 | **Schedule** | Agent with `schedule_cron="0 */6 * * *"` runs every 6h when device is online |
| 8 | **Offline resilience** | Device offline → runs skipped → device reconnects → overdue runs trigger immediately |
| 9 | **OAuth flow** | Gmail authorize → callback → token encrypted → stored in config → fetch emails works |

### Phase 3 — New Dependencies

| Package | Purpose |
|---|---|
| `google-api-python-client` | Gmail API access |
| `google-auth-oauthlib` | Gmail OAuth2 flow |
| `msgraph-sdk` | Outlook + Teams API access |
| `msal` | MS identity platform auth |
| `apscheduler>=4.0` | Agent scheduling |
| `cryptography` (Fernet) | OAuth token encryption at rest |

- **One step at a time.** Mark `[x]` and commit with `step N.N complete: <outcome>`.