Backend Plan — Adiuva Cloud API
Separate repository. This document defines the FastAPI backend that the Electron app communicates with.
The backend owns: orchestration logic, chat agent intelligence, prompt IP, auth, billing, E2E backup blob storage, cloud storage (encrypted blobs), cloud vector store, and plugin marketplace. The backend NEVER persists user data in plaintext. Cloud storage blobs are E2E encrypted before upload — the backend only verifies integrity, never decrypts.
Project Structure
adiuva-api/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI entry + CORS + lifespan + router includes
│ ├── core/
│ │ ├── __init__.py
│ │ ├── agent_registry.py # Base classes + singleton registry
│ │ ├── orchestrator.py # LLM-based intent router
│ │ ├── execution_plan.py # Plan builder + cache
│ │ └── plugin_loader.py # Dynamic agent loading
│ ├── agents/ # Chat agents (proprietary logic + prompts)
│ │ ├── __init__.py # Auto-registers all agents
│ │ ├── task_agent.py
│ │ ├── checkpoint_agent.py
│ │ ├── project_agent.py
│ │ └── note_agent.py
│ ├── api/
│ │ ├── __init__.py
│ │ ├── routes/
│ │ │ ├── __init__.py
│ │ │ ├── chat.py # POST /chat + WS /chat/stream
│ │ │ ├── plans.py # GET /plans/playbook
│ │ │ ├── storage.py # CRUD cloud storage (E2E encrypted blobs)
│ │ │ ├── vectors.py # Upsert/search cloud vector store
│ │ │ ├── backup.py # PUT/GET /backup
│ │ │ ├── plugins.py # Plugin marketplace
│ │ │ ├── auth.py # Register/login/refresh
│ │ │ └── billing.py # Checkout/webhook/subscription
│ │ └── middleware/
│ │ ├── __init__.py
│ │ ├── auth.py # JWT validation
│ │ ├── rate_limit.py # Tier-aware rate limiting
│ │ └── sanitizer.py # Strip prompt metadata from responses
│ ├── storage/
│ │ ├── __init__.py
│ │ ├── blob_store.py # S3 for E2E encrypted blobs
│ │ ├── vector_store.py # Cloud vector store (Pinecone/Qdrant)
│ │ └── encryption.py # Integrity verification only — NO decryption
│ ├── marketplace/
│ │ ├── __init__.py
│ │ ├── plugin_registry.py # Plugin catalog (metadata, versions, ratings)
│ │ ├── plugin_review.py # Review queue + approval workflow
│ │ └── revenue_share.py # 70/30 split tracking with Stripe Connect
│ ├── billing/
│ │ ├── __init__.py
│ │ ├── stripe_service.py # Stripe checkout + webhooks
│ │ └── tier_manager.py # Feature matrix per tier
│ └── config/
│ ├── __init__.py
│ └── settings.py # Pydantic BaseSettings (env-based)
├── tests/
│ ├── __init__.py
│ ├── conftest.py # Fixtures: test client, mock agents, mock LLM
│ ├── test_orchestrator.py
│ ├── test_agents.py
│ ├── test_auth.py
│ ├── test_backup.py
│ ├── test_storage.py
│ └── test_plugins.py
├── alembic/ # DB migrations (auth/billing/marketplace tables only)
│ ├── alembic.ini
│ └── versions/
├── requirements.txt
├── Dockerfile
├── docker-compose.yml # App + PostgreSQL + Redis (dev)
├── .env.example
└── README.md
Step-by-Step Implementation
Step 1 — Project scaffolding ✅
- Initialize the repo with the directory structure above
- Write `requirements.txt`: `fastapi>=0.115.0`, `uvicorn[standard]>=0.34.0`, `langchain>=0.3.0`, `langchain-openai>=0.3.0`, `pydantic>=2.10.0`, `pydantic-settings` (Pydantic v2 moved `BaseSettings` into this separate package), `python-jose[cryptography]>=3.3.0`, `stripe>=11.0.0`, `boto3>=1.35.0`, `slowapi>=0.1.9`, `sqlalchemy>=2.0.0`, `asyncpg>=0.30.0`, `alembic>=1.14.0`, `bcrypt>=4.2.0`, `python-dotenv>=1.0.0`, `httpx>=0.28.0`, `websockets>=14.0`, `pytest>=8.0.0`, `pytest-asyncio>=0.24.0`
- Write `app/main.py`: FastAPI app with CORS (allow `app://` and `http://localhost:*`; the port wildcard has to go through `allow_origin_regex`, because `allow_origins` only matches exact origins or `"*"`), lifespan (init DB pool, init agent registry), all routers included under `/api/v1`
- Write `app/config/settings.py`: `Settings(BaseSettings)` with fields `DATABASE_URL`, `JWT_SECRET`, `JWT_ALGORITHM` (default `HS256`), `STRIPE_SECRET_KEY`, `STRIPE_WEBHOOK_SECRET`, `S3_BUCKET`, `S3_REGION`, `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `OPENAI_API_KEY`, `CORS_ORIGINS`, `ENV` (`dev`/`prod`), `PINECONE_API_KEY`, `PINECONE_INDEX`, `QDRANT_URL`, `QDRANT_API_KEY`
- Write `Dockerfile`: Python 3.12 slim, multi-stage (builder + runtime), non-root user
- Write `docker-compose.yml`: app, `postgres:16`, optional Redis
- Write `.env.example`
- Outcome: Runnable FastAPI skeleton (returns 404 on all routes).
Step 2 — Pydantic schemas (API contracts) ✅
- Create `app/schemas.py` (mirrors `src/shared/api-types.ts` in the Electron repo):
  - `ChatRequest`: `message: str`, `context: ChatContext`, `execution_mode: Literal['direct', 'plan']`
  - `ChatContext`: `user_profile: dict`, `relevant_documents: list[str]`, `recent_tasks: list[dict]`, `conversation_history: list[dict]`
  - `ChatResponse`: `response: str`, `actions: list[PlanAction]`
  - `PlanAction`: `type: Literal['create_record', 'update_record', 'delete_record', 'index_document', 'send_notification', 'call_agent']`, `table: str | None`, `data: dict | None`, `agent: str | None`
  - `ExecutionPlan`: `agent: str`, `steps: list[PlanStep]`
  - `PlanStep`: `action: str`, `prompt_template: str | None`, `variables: dict | None`, `data_from_step: int | None`
  - `BackupMetadata`: `version: int`, `timestamp: int`, `checksum: str`, `chunk_count: int`
  - `BillingTier`: `Literal['free', 'pro', 'power', 'team']`
  - `AuthTokens`: `access_token: str`, `refresh_token: str`, `expires_at: int`
  - `UserProfile`: `id: str`, `email: str`, `tier: BillingTier`
  - `StorageRecord`: `id: str`, `user_id: str`, `table: str`, `blob: bytes`, `checksum: str`, `created_at: int`, `updated_at: int` (the blob is always E2E encrypted by the client)
  - `StorageRecordCreate`: `table: str`, `blob: bytes`, `checksum: str`
  - `StorageRecordUpdate`: `blob: bytes`, `checksum: str`
  - `VectorUpsertRequest`: `vectors: list[VectorItem]`
  - `VectorItem`: `id: str`, `blob: bytes`, `checksum: str` (vector + metadata encrypted by the client)
  - `VectorSearchRequest`: `query_blob: bytes`, `top_k: int = 10`
  - `VectorSearchResponse`: `results: list[VectorSearchResult]`
  - `VectorSearchResult`: `id: str`, `score: float`, `blob: bytes`
  - `PluginManifest`: `id: str`, `name: str`, `description: str`, `version: str`, `author: str`, `permissions: list[str]`, `category: str`, `price_cents: int = 0`
  - `PluginListResponse`: `plugins: list[PluginManifest]`, `total: int`, `page: int`
  - `PluginInstallRequest`: `plugin_id: str`
- Outcome: All request/response models defined and validated.
Step 3 — Agent Registry + base classes ✅
`app/core/agent_registry.py`:
- `BaseAgent(ABC)`: `user_id: str`, `shared_memory: dict`, `vector_store_context: list[str]`, `skills: list[str]`
  - Abstract: `get_name() -> str`, `get_description() -> str`
- `ChatAgent(BaseAgent)`:
  - Abstract: `async handle(query: str, context: dict) -> str`
  - Abstract: `get_tools() -> list` (LangChain tool definitions)
  - Concrete: `_tool_loop(llm, messages, tools, max_iter=5) -> str`, the shared tool-calling loop
- `AgentRegistry` (singleton):
  - `_agents: dict[str, type[ChatAgent]]` (agent classes, instantiated per request)
  - `register(agent_class)`: decorator pattern
  - `get(name) -> ChatAgent`
  - `list_agents() -> list[dict]`: returns `[{name, description}]` for the orchestrator prompt
  - `async call_agent(name, query, context) -> str`: for inter-agent calls
- Unit tests: register, get, list, call_agent with mock
- Outcome: Pluggable agent framework.
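Below is a minimal, stdlib-only sketch of the decorator-based registry described above. It makes several assumptions. `get_name()` and `get_description()` are classmethods, so `list_agents()` can describe agents without instantiating them. `BaseAgent` is trimmed down to `user_id`. `call_agent()` reads the user ID from a hypothetical `context["user_id"]` key. `EchoAgent` is a test double, not one of the real agents.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any


class ChatAgent(ABC):
    """Trimmed stand-in for ChatAgent; the other BaseAgent fields are omitted."""

    def __init__(self, user_id: str = "") -> None:
        self.user_id = user_id

    @classmethod
    @abstractmethod
    def get_name(cls) -> str: ...

    @classmethod
    @abstractmethod
    def get_description(cls) -> str: ...

    @abstractmethod
    async def handle(self, query: str, context: dict[str, Any]) -> str: ...


class AgentRegistry:
    _instance: "AgentRegistry | None" = None
    _agents: dict[str, type[ChatAgent]]

    def __new__(cls) -> "AgentRegistry":
        # Singleton: every AgentRegistry() call returns the same object.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._agents = {}
        return cls._instance

    def register(self, agent_class: type[ChatAgent]) -> type[ChatAgent]:
        # Class decorator: store the class under its name and return it unchanged.
        self._agents[agent_class.get_name()] = agent_class
        return agent_class

    def get(self, name: str, user_id: str = "") -> ChatAgent:
        # A fresh instance per call, so per-user state never leaks between requests.
        return self._agents[name](user_id)

    def list_agents(self) -> list[dict[str, str]]:
        return [{"name": n, "description": c.get_description()} for n, c in self._agents.items()]

    async def call_agent(self, name: str, query: str, context: dict[str, Any]) -> str:
        return await self.get(name, context.get("user_id", "")).handle(query, context)


registry = AgentRegistry()


@registry.register
class EchoAgent(ChatAgent):
    @classmethod
    def get_name(cls) -> str:
        return "echo_agent"

    @classmethod
    def get_description(cls) -> str:
        return "Repeats the query (test double)"

    async def handle(self, query: str, context: dict[str, Any]) -> str:
        return f"echo: {query}"


print(registry.list_agents())
print(asyncio.run(registry.call_agent("echo_agent", "hi", {"user_id": "u1"})))  # echo: hi
```

Storing classes rather than instances keeps the registry free of per-user state, which matches `route_single` instantiating the agent for each request.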
Step 4 — Orchestrator ✅
`app/core/orchestrator.py`:
- `async classify_intent(message, context, registry) -> str`:
  - System prompt: "You are an intent classifier. Given the user message and context, decide which agent to route to. Available agents: {registry.list_agents()}. Respond with just the agent name."
  - Uses `gpt-4o-mini` via LangChain for low latency
  - Falls back to `task_agent` if there is no clear match
- `async route_single(agent_name, message, context) -> ChatResponse`:
  - Instantiates the agent from the registry
  - Calls `agent.handle(message, context)`
  - Returns the response plus any actions the agent produced
- `async route_pipeline(agent_names, message, context) -> ChatResponse`:
  - Executes agents in sequence
  - Each agent receives `{...context, previous_results: [...]}`
  - Final synthesis via LLM: "Summarize these agent results into a coherent response"
- `async orchestrate(request: ChatRequest) -> ChatResponse | ExecutionPlan`:
  - Main entry point
  - Context is opaque to the orchestrator: on the client side, the data may come from local or cloud storage
  - Classifies intent
  - If `execution_mode == 'direct'`: route and return the response
  - If `execution_mode == 'plan'`: route and return an execution plan with template IDs
- `async orchestrate_stream(request: ChatRequest) -> AsyncGenerator[str, None]`:
  - Same as `orchestrate`, but yields tokens for WebSocket streaming
- Integration tests with mocked LLM and mocked agents
- Outcome: Intelligent routing with single-agent and pipeline modes.
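The `task_agent` fallback implies a post-processing step between the classifier's raw reply and the registry lookup. Here is a stdlib sketch of that step. It assumes the reply is free text that should contain exactly one registered agent name. `resolve_agent_name` is a hypothetical helper, not part of the plan's API.

```python
import re

FALLBACK_AGENT = "task_agent"


def resolve_agent_name(llm_reply: str, known_agents: list[str], fallback: str = FALLBACK_AGENT) -> str:
    """Map the classifier's raw reply onto a registered agent name.

    The prompt asks for just the agent name, but models sometimes add quotes,
    punctuation, or a short sentence, so normalise before matching.
    """
    cleaned = llm_reply.strip().strip("`'\".").lower()
    if cleaned in known_agents:
        return cleaned
    # Otherwise accept the reply only if it mentions exactly one known agent.
    lowered = llm_reply.lower()
    mentioned = [name for name in known_agents if re.search(rf"\b{re.escape(name)}\b", lowered)]
    return mentioned[0] if len(mentioned) == 1 else fallback


agents = ["task_agent", "checkpoint_agent", "project_agent", "note_agent"]
print(resolve_agent_name("`Project_Agent`.", agents))              # project_agent
print(resolve_agent_name("I'd route this to note_agent", agents))  # note_agent
print(resolve_agent_name("weather_agent", agents))                 # task_agent
```

If a reply names two agents, the helper falls back rather than guessing. Pipeline routing would then have to be requested explicitly instead of being inferred from an ambiguous reply.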
Step 5 — Execution Plan generator ✅
`app/core/execution_plan.py`:
- `PromptTemplateRegistry`: dict of `template_id -> prompt_text`. Templates are server-side only; the client receives only the IDs.
- `ExecutionPlanBuilder`:
  - `add_step(action, params) -> self`
  - `add_llm_step(template_id, variables) -> self`
  - `add_data_step(action, data_from_step) -> self`
  - `build() -> ExecutionPlan`: validates step references
- `PlanCache`:
  - In-memory LRU (`maxsize=1000`)
  - `cache_plan(key, plan)`, `get_plan(key)`, `get_all_playbooks() -> list[ExecutionPlan]`
  - Playbooks are pre-built plans for common operations (e.g., "create task from email", "generate weekly report")
- Outcome: Plans are cacheable as playbooks. Prompt IP never leaves the server.
Step 6 — Chat Agents ✅
`app/agents/task_agent.py` (`@registry.register`):
- Description: "Manages tasks and comments: list, create, update, delete, due-today, comments"
- Tools (8): `list_tasks(project_id, status, search, order_by)`, `create_task(title, description, status, priority, assignees, due_date, project_id, is_ai_suggested, is_approved)`, `update_task(task_id, ...)`, `delete_task(task_id)`, `list_tasks_due_today()`, `list_task_comments(task_id)`, `add_task_comment(task_id, author, content)`, `delete_task_comment(comment_id)`
- `status`: `todo|in_progress|done`; `priority`: `high|medium|low`; `assignees`: JSON-encoded string; `due_date`: ms timestamp
- Accepts flexible context; the sentinel `-1` marks optional integer update fields as not provided
`app/agents/checkpoint_agent.py` (`@registry.register`):
- Description: "Manages project checkpoints (milestones): list, create, update, delete"
- Tools (4): `list_checkpoints(project_id)`, `create_checkpoint(project_id, title, date, is_ai_suggested, is_approved)`, `update_checkpoint(checkpoint_id, ...)`, `delete_checkpoint(checkpoint_id)`
- `project_id` is required for create; `date` is a ms timestamp; supports the AI-suggestion + approval workflow
`app/agents/project_agent.py` (`@registry.register`):
- Description: "Manages projects: list, get, create, update, archive, delete"
- Tools (6): `list_projects(client_id, include_archived)`, `list_all_projects()`, `get_project(project_id)`, `create_project(name, client_id)`, `update_project(project_id, ...)`, `delete_project(project_id)`
- `status`: `active|archived`; archiving is preferred over deletion (docstring guard on `delete_project`)
`app/agents/note_agent.py` (`@registry.register`):
- Description: "Manages notes: list, get, create, update, delete"
- Tools (5): `list_notes(project_id)`, `get_note(note_id)`, `create_note(title, content, project_id)`, `update_note(note_id, ...)`, `delete_note(note_id)`
- Content is Markdown; call `get_note` before updating to preserve existing content
`app/agents/__init__.py`: imports all four agent modules to trigger the `@registry.register` decorators
- Unit tests per agent with mocked LLM (registration, names, tool counts, `handle()`, direct tool invocation)
- Outcome: Four domain-specific agents matching the UI data model (Tasks, Checkpoints, Projects, Notes), all registered and tested.
Step 7 — Storage Layer ✅
`app/storage/blob_store.py`: `BlobStore` with `async upload`, `async download`, `async delete` (idempotent), `async list_keys`
- Keys: `{user_id}/{table}/{record_id}`; the backend never inspects blob content
- boto3 S3 with SSE-S3 at-rest encryption; the client checksum is stored in S3 object metadata
`app/storage/vector_store.py`: `VectorStore` with `async upsert`, `async search`, `async delete`
- Pinecone (default, `namespace=user_id`) or Qdrant (`user_id` payload filter), configurable at runtime
- 32-dim SHA-256-derived float vector; the blob is stored as base64 in metadata/payload
- Known, documented trade-off: the index vector comes from a hash rather than a plaintext embedding, so ANN scores over the encrypted data do not reflect semantic similarity
`app/storage/encryption.py`:
- `verify_checksum(blob, checksum) -> bool`: SHA-256 + `hmac.compare_digest` (constant-time)
- `reject_if_tampered(blob, checksum)`: raises HTTP 400 on mismatch
- The backend NEVER holds decryption keys
Also updated in this step:
- `app/schemas.py`: added `StorageRecord*`, `VectorItem`, `VectorUpsertRequest`, `VectorSearch*`, `Plugin*` schemas
- `app/config/settings.py`: added `PINECONE_API_KEY`, `PINECONE_INDEX`, `QDRANT_URL`, `QDRANT_API_KEY`
- `requirements.txt`: added `moto[s3]`, `pinecone`, `qdrant-client`
- 37 unit tests covering encryption, `BlobStore` (moto), `VectorStore` (Pinecone), `VectorStore` (Qdrant)
- Outcome: Cloud storage layer that handles E2E encrypted blobs without ever accessing plaintext.
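Here is a stdlib sketch of the two hash-based pieces of this step. `verify_checksum` assumes the client sends a hex-encoded SHA-256 digest. It compares bytes rather than `str`, because `hmac.compare_digest` raises `TypeError` when given non-ASCII strings. The byte-to-float mapping in `derive_index_vector` is an assumption: the plan only says "32-dim SHA-256-derived", and a SHA-256 digest is exactly 32 bytes.

```python
import hashlib
import hmac


def verify_checksum(blob: bytes, checksum: str) -> bool:
    """True if `checksum` is the hex SHA-256 digest of `blob`; constant-time comparison."""
    expected = hashlib.sha256(blob).hexdigest().encode("ascii")
    provided = checksum.strip().lower().encode("utf-8")
    return hmac.compare_digest(expected, provided)


def derive_index_vector(blob: bytes) -> list[float]:
    """Map the 32 digest bytes of SHA-256(blob) to 32 floats in [-1.0, 1.0].

    Identical blobs map to identical vectors; any other pair is effectively
    unrelated, which is why similarity scores carry no semantic meaning.
    """
    return [(b - 127.5) / 127.5 for b in hashlib.sha256(blob).digest()]


blob = b"\x8f\x02ciphertext-from-client"
checksum = hashlib.sha256(blob).hexdigest()
print(verify_checksum(blob, checksum))         # True
print(verify_checksum(blob + b"!", checksum))  # False
print(len(derive_index_vector(blob)))          # 32
```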
Step 8 — API Routes ✅
8a — Chat endpoint
`app/api/routes/chat.py`:
- `POST /api/v1/chat`:
  - Request: `ChatRequest`
  - Calls `orchestrate(request)`, or `orchestrate()` + `build_plan()`
  - Response: `ChatResponse` or `ExecutionPlan`
- `WebSocket /api/v1/chat/stream`:
  - Client sends `ChatRequest` as the first JSON frame
  - Server yields token strings via `orchestrate_stream()`
  - Final frame: JSON `ChatResponse` with `{"done": true, "response": "...", "actions": [...]}`
  - Heartbeat ping every 30s to keep the connection alive
8b — Plans endpoint
`app/api/routes/plans.py`:
- `GET /api/v1/plans/playbook`: returns all playbooks available for the user's tier
- `GET /api/v1/plans/playbook/{plan_id}`: returns a specific plan
8c — Storage endpoint (cloud records)
`app/api/routes/storage.py`:
- `POST /api/v1/storage/records`: create an encrypted record
  - Request: `StorageRecordCreate`
  - Verifies the checksum, stores the blob in S3, inserts a metadata row in PostgreSQL
  - Response: `{id: str, created_at: int}`
- `GET /api/v1/storage/records`: list record metadata (no blobs)
  - Query params: `table: str`, `page: int`, `limit: int`
  - Response: `list[{id, table, checksum, created_at, updated_at}]`
- `GET /api/v1/storage/records/{id}`: download an encrypted blob
  - Response: blob bytes + `X-Checksum` header
- `PUT /api/v1/storage/records/{id}`: update an encrypted blob
  - Request: `StorageRecordUpdate`
- `DELETE /api/v1/storage/records/{id}`: delete the record and its S3 blob
- All routes enforce the tier's `cloud_storage_gb` quota via `TierManager.check_quota(user_id)`
8d — Vectors endpoint (cloud vector store)
`app/api/routes/vectors.py`:
- `POST /api/v1/storage/vectors/upsert`:
  - Request: `VectorUpsertRequest`
  - Verifies checksums, delegates to `VectorStore.upsert()`
  - Response: `{upserted: int}`
- `POST /api/v1/storage/vectors/search`:
  - Request: `VectorSearchRequest`
  - Delegates to `VectorStore.search()`
  - Response: `VectorSearchResponse`
- `DELETE /api/v1/storage/vectors`:
  - Request: `{ids: list[str]}`
8e — Backup endpoint
`app/api/routes/backup.py`:
- `PUT /api/v1/backup`: accepts a binary blob plus metadata headers (`X-Backup-Version`, `X-Backup-Timestamp`, `X-Backup-Checksum`). Stores it in S3 keyed by `{user_id}/{timestamp}`. Enforces tier limits:
  - Free: 0 (no backup)
  - Pro: 5 GB
  - Power: 25 GB
  - Team: unlimited
- `GET /api/v1/backup`: returns the latest blob for the authenticated user. Supports `If-Modified-Since`.
- `GET /api/v1/backup/history`: returns a list of `BackupMetadata` (no blobs).
- `DELETE /api/v1/backup/{backup_id}`: deletes a specific backup.
8f — Plugins endpoint
`app/api/routes/plugins.py`:
- `GET /api/v1/plugins`:
  - Query params: `category: str | None`, `q: str | None`, `page: int`, `sort: Literal['rating', 'installs', 'newest']`
  - Response: `PluginListResponse`
  - Available from the Power tier and above
- `GET /api/v1/plugins/{id}`:
  - Response: `PluginManifest` + ratings + install count
- `POST /api/v1/plugins/{id}/install`:
  - Request: `PluginInstallRequest`
  - Records the installation for the user (billing tracking, analytics)
  - If the plugin is paid: triggers a Stripe Connect charge + revenue split (70% developer, 30% platform)
  - Response: `{ok: true, download_url: str}`, where `download_url` is a signed S3 URL for the plugin package
- `DELETE /api/v1/plugins/{id}/install`:
  - Unregisters the installation
8g — Auth endpoint
`app/api/routes/auth.py`:
- `POST /api/v1/auth/register`: `{email, password}` → bcrypt hash → insert user → return `AuthTokens`
- `POST /api/v1/auth/login`: validate credentials → return `AuthTokens`
- `POST /api/v1/auth/refresh`: rotate the refresh token → return new `AuthTokens`
- `GET /api/v1/auth/me`: return `UserProfile` for the current JWT
8h — Billing endpoint
`app/api/routes/billing.py`:
- `POST /api/v1/billing/checkout`: creates a Stripe Checkout session → returns its URL
- `POST /api/v1/billing/webhook`: handles Stripe webhooks (subscription lifecycle)
- `GET /api/v1/billing/subscription`: returns current subscription info
- `DELETE /api/v1/billing/subscription`: cancels the subscription
- Outcome: Complete REST + WebSocket API covering orchestration, storage, vectors, backup, marketplace, auth, and billing.
Step 9 — Middleware
9a — Auth middleware
`app/api/middleware/auth.py`:
- FastAPI dependency: `get_current_user(token: str = Depends(oauth2_scheme)) -> UserProfile`
- Validates the JWT signature and expiry, extracts `user_id` and `tier`
- Raises `401` on an invalid or expired token
- Exempt routes: `/api/v1/auth/register`, `/api/v1/auth/login`, `/api/v1/auth/refresh`, `/api/v1/billing/webhook`, `/api/v1/health`
9b — Rate limiter
`app/api/middleware/rate_limit.py`:
- Uses `slowapi` with `Limiter(key_func=get_user_id_from_jwt)`
- Tier-based limits:
  - Free: 20 req/min
  - Pro: 60 req/min
  - Power: 120 req/min
  - Team: 200 req/seat/min
- Custom 429 response with a `Retry-After` header
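`slowapi` does the counting in production. This stdlib sketch only pins down the intended semantics: a 60-second sliding window per user, tier-specific limits, and a `Retry-After` value for the 429 response. Two details are assumptions: each Team user is treated as one seat, so the per-seat limit becomes a per-user limit, and `SlidingWindowLimiter` is a hypothetical class.

```python
import math
import time
from collections import defaultdict, deque
from typing import Callable

# Requests per minute from the plan; Team is per seat, modelled here as per user.
TIER_LIMITS_PER_MIN = {"free": 20, "pro": 60, "power": 120, "team": 200}
WINDOW_S = 60.0


class SlidingWindowLimiter:
    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._hits: defaultdict[str, deque[float]] = defaultdict(deque)

    def check(self, user_id: str, tier: str) -> tuple[bool, int]:
        """Return (allowed, retry_after_seconds); retry_after is 0 when allowed."""
        now = self._clock()
        hits = self._hits[user_id]
        # Drop timestamps that have slid out of the 60-second window.
        while hits and now - hits[0] >= WINDOW_S:
            hits.popleft()
        if len(hits) >= TIER_LIMITS_PER_MIN[tier]:
            # The oldest hit leaves the window first; round up for the Retry-After header.
            return False, max(1, math.ceil(WINDOW_S - (now - hits[0])))
        hits.append(now)
        return True, 0


fake_now = [0.0]
limiter = SlidingWindowLimiter(clock=lambda: fake_now[0])
results = [limiter.check("u1", "free") for _ in range(21)]
print(results[-1])  # (False, 60): the 21st request inside one minute is rejected
fake_now[0] = 60.0
print(limiter.check("u1", "free"))  # (True, 0): the first 20 hits have left the window
```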
9c — Sanitizer
`app/api/middleware/sanitizer.py`:
- Response middleware that scans response bodies
- Strips system prompt fragments, agent internal reasoning, tool schemas, and routing metadata
- Pattern-based detection + exact match against known prompt fingerprints
- Logs sanitization events for monitoring
- Outcome: Secure, rate-limited API with prompt IP protection.
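Here is a stdlib sketch of the two detection modes named above. It assumes a "fingerprint" is the SHA-256 of a whitespace- and case-normalised line of a server-side prompt. The regular expressions are placeholders: a `routing:`-style metadata line and a hypothetical `<reasoning>` block. The real patterns would be tuned to the actual prompts and agent output.

```python
import hashlib
import logging
import re

log = logging.getLogger("sanitizer")


def _fingerprint(text: str) -> str:
    # Normalise case and whitespace so trivial reformatting does not evade the match.
    return hashlib.sha256(" ".join(text.lower().split()).encode("utf-8")).hexdigest()


def build_fingerprints(prompts: list[str]) -> set[str]:
    """Fingerprint every non-blank line of every server-side prompt."""
    return {_fingerprint(line) for prompt in prompts for line in prompt.splitlines() if line.strip()}


# Placeholder patterns; the real list would be tuned to the actual prompts and agent output.
BLOCK_PATTERNS = [re.compile(r"<reasoning>.*?</reasoning>\s*", re.DOTALL)]
LINE_PATTERNS = [re.compile(r"^\s*(system|routing|tool_schema)\s*:", re.IGNORECASE)]


def sanitize(body: str, fingerprints: set[str]) -> str:
    for pattern in BLOCK_PATTERNS:
        body, removed = pattern.subn("", body)
        if removed:
            log.info("sanitizer: stripped %d block(s) matching %s", removed, pattern.pattern)
    kept = []
    for line in body.splitlines():
        leaked = bool(line.strip()) and _fingerprint(line) in fingerprints
        if leaked or any(p.search(line) for p in LINE_PATTERNS):
            log.info("sanitizer: dropped a line")  # log the event, never the leaked text
            continue
        kept.append(line)
    return "\n".join(kept)


fps = build_fingerprints(["You are an intent classifier.\nRespond with just the agent name."])
reply = "Sure, here you go.\n  you are an INTENT classifier. \nrouting: task_agent\n<reasoning>pick task</reasoning>Done."
print(sanitize(reply, fps))  # prints "Sure, here you go." and "Done." on two lines
```

Line-level fingerprints only catch verbatim leaks, up to case and whitespace. Paraphrased leaks would need the pattern-based path or a fuzzier matcher.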
Step 10 — Plugin Marketplace ✅
`app/marketplace/plugin_registry.py`: `PluginRegistry`
- `async list_plugins(category, query, page, sort) -> PluginListResponse`
- `async get_plugin(plugin_id) -> PluginManifest | None`
- `async submit_plugin(manifest: PluginManifest, package_s3_key: str) -> str`: returns `plugin_id`, sets `status = 'pending_review'`
- `async approve_plugin(plugin_id) -> None`: admin only, sets `status = 'approved'`
- `async reject_plugin(plugin_id, reason: str) -> None`
`app/marketplace/plugin_review.py`: `ReviewQueue`
- `async get_pending() -> list[dict]`
- `async submit_review(plugin_id, reviewer_id, decision, notes) -> None`
- Security checklist enforced before approval: manifest schema valid, permissions from the allowed set, no binary blobs in the manifest
`app/marketplace/revenue_share.py`: `RevenueShare`
- `async record_install(plugin_id, user_id, amount_cents) -> None`
- `async payout_developer(plugin_id, period) -> None`: Stripe Connect transfer of 70% to the developer
- `async get_earnings(developer_id, period) -> dict`
- Outcome: Plugin marketplace with catalog, review workflow, and revenue split.
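Splitting money in integer cents needs an explicit rounding rule. This sketch computes `developer_share_cents` per revenue event, matching the `revenue_events` columns in Step 12. It floors the developer's 70% and gives the remainder to the platform, so the two shares always sum to the charge. The rounding rule is an assumption, and Stripe processing fees are not modelled.

```python
DEVELOPER_PCT = 70  # 70% developer / 30% platform


def split_revenue(amount_cents: int) -> tuple[int, int]:
    """Return (developer_cents, platform_cents) for one paid install."""
    if amount_cents < 0:
        raise ValueError("amount_cents must be non-negative")
    developer = amount_cents * DEVELOPER_PCT // 100  # floor; integer maths avoids float rounding
    return developer, amount_cents - developer


def developer_payout(events: list[dict[str, int]]) -> int:
    """Sum the stored per-event developer shares for one payout period."""
    return sum(e["developer_share_cents"] for e in events)


print(split_revenue(999))   # (699, 300)
print(split_revenue(1500))  # (1050, 450)
```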
Step 11 — Billing & Tier management ✅
`app/billing/stripe_service.py`:
- `create_checkout_session(user_id, tier) -> str`
- `handle_webhook(payload, sig_header) -> None`: processes `checkout.session.completed`, `customer.subscription.updated`, `customer.subscription.deleted`, `invoice.payment_failed`
- `get_subscription(user_id) -> dict | None`
- `cancel_subscription(user_id) -> None`
`app/billing/tier_manager.py`: `TierManager`
- Feature matrix (`FEATURES`; `-1` = unlimited):

| Feature | free | pro | power | team |
|---|---|---|---|---|
| agents | 3 | -1 | -1 | -1 |
| batch_active | 2 | 10 | -1 | -1 |
| cloud_storage_gb | 0 | 5 | 25 | -1 |
| backup_gb | 0 | 5 | 25 | -1 |
| providers | 1 | -1 | -1 | -1 |
| batch_builder | False | False | True | True |
| plugin_marketplace | False | False | True | True |
| sso | False | False | False | True |

- `get_tier(user_id) -> BillingTier`
- `check_feature(user_id, feature) -> bool`
- `get_rate_limit(tier) -> int`
- `check_quota(user_id) -> bool`: checks current `cloud_storage_gb` usage against the limit
- `app/billing/__init__.py`: exports the `stripe_service` and `tier_manager` singletons
- `app/api/routes/billing.py`: refactored to delegate to `StripeService`
- `app/api/routes/storage.py` and `backup.py`: `_check_quota` now delegates to `tier_manager.enforce_quota` / `enforce_backup_quota`
- Outcome: Stripe integration with tier-based feature gating matching Free / Pro (15€) / Power (29€) / Team (49€/seat).
Step 12 — Database (auth/billing/marketplace only)
- PostgreSQL schema via Alembic:
  - `users`: `id UUID PK`, `email UNIQUE`, `password_hash`, `tier` (default `'free'`), `stripe_customer_id`, `created_at`, `updated_at`
  - `refresh_tokens`: `id UUID PK`, `user_id FK`, `token_hash`, `expires_at`, `created_at`
  - `subscriptions`: `id UUID PK`, `user_id FK`, `stripe_subscription_id`, `tier`, `status`, `current_period_end`, `created_at`
  - `backup_metadata`: `id UUID PK`, `user_id FK`, `s3_key`, `version`, `timestamp`, `checksum`, `size_bytes`, `created_at`
  - `storage_records`: `id UUID PK`, `user_id FK`, `table_name VARCHAR`, `s3_key`, `checksum`, `size_bytes`, `created_at`, `updated_at` (metadata only, no plaintext)
  - `plugins`: `id UUID PK`, `name`, `description`, `version`, `author_id FK`, `category`, `status` (`pending_review`/`approved`/`rejected`), `price_cents`, `s3_package_key`, `install_count`, `avg_rating`, `created_at`
  - `plugin_installations`: `id UUID PK`, `plugin_id FK`, `user_id FK`, `installed_at`
  - `plugin_reviews`: `id UUID PK`, `plugin_id FK`, `reviewer_id FK`, `decision`, `notes`, `reviewed_at`
  - `revenue_events`: `id UUID PK`, `plugin_id FK`, `user_id FK`, `amount_cents`, `developer_share_cents`, `stripe_transfer_id`, `created_at`
- Initial Alembic migration
- SQLAlchemy models in `app/models.py`
- Outcome: Auth, billing, storage metadata, and marketplace persistence. Zero user data in plaintext.
Step 13 — Testing & deployment ✅
- `tests/conftest.py`: TestClient fixture, mock LLM fixture (`AsyncMock` returning canned responses), mock agent fixture, test DB (SQLite in-memory for speed), mock S3 (moto), mock Pinecone
- `tests/test_orchestrator.py`: `classify_intent` routing, single agent, pipeline, plan mode
- `tests/test_agents.py`: each agent with mocked tools
- `tests/test_auth.py`: register → login → access protected route → refresh → expired token
- `tests/test_backup.py`: upload → download → history → delete, tier limit enforcement
- `tests/test_storage.py`: create record → list → download → update → delete, checksum rejection, quota enforcement
- `tests/test_plugins.py`: list plugins, install, uninstall, revenue event creation, tier gate (free user blocked)
- `Dockerfile` optimized for production (gunicorn + uvicorn workers)
- GitHub Actions CI: lint (ruff), test (pytest), build Docker image
- Outcome: Fully tested, deployable backend.
API Contract Summary
| Method | Endpoint | Auth | Request | Response |
|---|---|---|---|---|
| POST | /api/v1/auth/register | No | {email, password} | AuthTokens |
| POST | /api/v1/auth/login | No | {email, password} | AuthTokens |
| POST | /api/v1/auth/refresh | No | {refresh_token} | AuthTokens |
| GET | /api/v1/auth/me | JWT | — | UserProfile |
| POST | /api/v1/chat | JWT | ChatRequest | ChatResponse or ExecutionPlan |
| WS | /api/v1/chat/stream | JWT | ChatRequest (first frame) | Token stream + final JSON |
| GET | /api/v1/plans/playbook | JWT | — | ExecutionPlan[] |
| GET | /api/v1/plans/playbook/{id} | JWT | — | ExecutionPlan |
| POST | /api/v1/storage/records | JWT | StorageRecordCreate | {id, created_at} |
| GET | /api/v1/storage/records | JWT | ?table&page&limit | RecordMeta[] |
| GET | /api/v1/storage/records/{id} | JWT | — | Binary blob |
| PUT | /api/v1/storage/records/{id} | JWT | StorageRecordUpdate | {ok: true} |
| DELETE | /api/v1/storage/records/{id} | JWT | — | {ok: true} |
| POST | /api/v1/storage/vectors/upsert | JWT | VectorUpsertRequest | {upserted: int} |
| POST | /api/v1/storage/vectors/search | JWT | VectorSearchRequest | VectorSearchResponse |
| DELETE | /api/v1/storage/vectors | JWT | {ids: list[str]} | {ok: true} |
| PUT | /api/v1/backup | JWT | Binary blob + headers | {ok: true} |
| GET | /api/v1/backup | JWT | — | Binary blob |
| GET | /api/v1/backup/history | JWT | — | BackupMetadata[] |
| DELETE | /api/v1/backup/{id} | JWT | — | {ok: true} |
| GET | /api/v1/plugins | JWT | ?category&q&page&sort | PluginListResponse |
| GET | /api/v1/plugins/{id} | JWT | — | PluginManifest + stats |
| POST | /api/v1/plugins/{id}/install | JWT | PluginInstallRequest | {ok, download_url} |
| DELETE | /api/v1/plugins/{id}/install | JWT | — | {ok: true} |
| POST | /api/v1/billing/checkout | JWT | {tier} | {checkout_url} |
| POST | /api/v1/billing/webhook | Stripe sig | Stripe event | {ok: true} |
| GET | /api/v1/billing/subscription | JWT | — | Subscription info |
| DELETE | /api/v1/billing/subscription | JWT | — | {ok: true} |
| GET | /api/v1/health | No | — | {status, version} |
| GET | /api/v1/agents/catalog | JWT | — | AgentCatalogItem[] |
| GET | /api/v1/agents/local | JWT | — | LocalAgentConfigResponse[] |
| POST | /api/v1/agents/local | JWT | LocalAgentConfigCreate | LocalAgentConfigResponse |
| PUT | /api/v1/agents/local/{id} | JWT | LocalAgentConfigUpdate | LocalAgentConfigResponse |
| DELETE | /api/v1/agents/local/{id} | JWT | — | {ok: true} |
| GET | /api/v1/agents/cloud | JWT | — | CloudAgentConfigResponse[] |
| POST | /api/v1/agents/cloud | JWT | CloudAgentConfigCreate | CloudAgentConfigResponse |
| PUT | /api/v1/agents/cloud/{id} | JWT | CloudAgentConfigUpdate | CloudAgentConfigResponse |
| DELETE | /api/v1/agents/cloud/{id} | JWT | — | {ok: true} |
| GET | /api/v1/agents/runs | JWT | ?agent_id&page&limit | AgentRunLogResponse[] |
| POST | /api/v1/agents/{id}/run | JWT | — | {ok: true, run_id} |
| POST | /api/v1/agents/journey/start | JWT | {agent_type, data_types} | {session_id, message, done} |
| POST | /api/v1/agents/journey/message | JWT | {session_id, message} | {session_id, message, done, prompt_template?} |
| GET | /api/v1/oauth/{provider}/authorize | JWT | — | {authorization_url} |
| GET | /api/v1/oauth/{provider}/callback | — | OAuth code | {encrypted_token} |
| POST | /api/v1/oauth/{provider}/refresh | JWT | — | {encrypted_token} |
| WS | /api/v1/ws/device | JWT | device_hello (first frame) | Agent trigger + tool_call frames |
| GET | /api/v1/memory/core | JWT | — | Core memory entries |
| GET | /api/v1/memory/associative | JWT | — | Associative memories |
| GET | /api/v1/memory/episodic | JWT | — | Episodic summaries |
| GET | /api/v1/memory/proactive | JWT | — | Proactive patterns |
| DELETE | /api/v1/memory/{type}/{id} | JWT | — | {ok: true} |
Stack
| Layer | Technology |
|---|---|
| Framework | FastAPI + Uvicorn |
| LLM | LangChain + langchain-openai |
| Auth | PyJWT + bcrypt + OAuth2 |
| Billing | stripe-python + Stripe Connect |
| Blob storage | boto3 (S3) |
| Vector store | Pinecone or Qdrant (configurable) |
| Database | PostgreSQL + SQLAlchemy + Alembic |
| Rate limiting | slowapi |
| Cloud integrations | google-api-python-client, msgraph-sdk, msal |
| Agent scheduling | APScheduler |
| Testing | pytest + pytest-asyncio + httpx + moto (S3 mock) |
| Deployment | Docker → fly.io / Railway / AWS ECS |
Phase 3 — New Files
| File | Purpose |
|---|---|
| `app/models.py` | Add `LocalAgentConfig`, `CloudAgentConfig`, `AgentRunLog` models |
| `app/schemas.py` | Add agent config schemas + WS agent frame types |
| `app/api/routes/agents.py` | Agent CRUD endpoints (catalog, local, cloud, runs, manual trigger) |
| `app/api/routes/agent_setup.py` | Chatbot Journey endpoints (start + message) |
| `app/api/routes/device_ws.py` | Persistent device WS endpoint (`/api/v1/ws/device`) |
| `app/api/routes/oauth.py` | OAuth authorize/callback for Gmail, Teams, Outlook |
| `app/core/agent_runner.py` | Agent run orchestration: local (WS file request) + cloud (API fetch) |
| `app/core/device_manager.py` | `DeviceConnectionManager`: tracks active Electron WS connections |
| `app/core/agent_scheduler.py` | Periodic scheduler for agent cron triggers |
| `app/integrations/gmail.py` | Gmail API client (fetch messages with filters) |
| `app/integrations/ms_graph.py` | MS Graph client for Outlook emails + Teams messages |
| `app/integrations/__init__.py` | Provider factory |
Full Phase 3 step-by-step plan: see the Phase 3 section of `AI_REFACTOR_PLAN.md`.
Architecture v2 — Integration Phases
Reference: `architecture-v2.md` (local-first topology, BYOK LLM keys, MemGPT-style memory middleware, popup scoping with navigation directives, Batch Agent).
These phases build on the completed Steps 1–13 and Phase 3 (3.1–3.6). Phase 5 of `AI_REFACTOR_PLAN.md` (on-device KV memory) is superseded by the cloud-side memory middleware below.
Step 14 — Fix chat WS for bidirectional tool calls (V2.0.1)
Blocker: `chat_stream()` never calls `set_client_executor()`, so all 23 agent tools fail during chat WS sessions.
- Rewrite `chat_stream()` in `app/api/routes/chat.py`:
  - `pending_calls: dict[str, asyncio.Future]` for tool-call round-trips
  - Concurrent receive loop (dispatches `tool_result` frames and resolves the matching futures) alongside the orchestration task
  - `set_client_executor()` before orchestrating, `clear_client_executor()` in `finally`
  - Parse the first frame as `{"type": "chat_request", ...}`
  - Send `{"type": "text_chunk", "text": "..."}` frames, then `{"type": "final", "response": "..."}`
  - Heartbeat ping every 30s; 30s timeout on each `tool_result`
- Tests: verify all 23 tools work over chat WS
- Files: `app/api/routes/chat.py`, `app/core/orchestrator.py`
- Outcome: Full bidirectional chat WS. All agent tools now work over `/chat/stream`.
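Here is a stdlib sketch of the `pending_calls` round-trip. The server sends a tool call and parks an `asyncio.Future` under a fresh call ID. The receive loop resolves that future when the matching `tool_result` frame arrives. The `tool_call` frame shape and the `call_id`/`result` field names are assumptions, since the plan only names `tool_result`. A queue and a fake client stand in for the WebSocket.

```python
import asyncio
import uuid
from typing import Any, Awaitable, Callable

SendFrame = Callable[[dict[str, Any]], Awaitable[None]]


class ToolCallBridge:
    """Correlates server-initiated tool calls with the client's tool_result frames."""

    def __init__(self, send: SendFrame, timeout_s: float = 30.0) -> None:
        self._send = send
        self._timeout_s = timeout_s
        self.pending_calls: dict[str, asyncio.Future[Any]] = {}

    async def call_tool(self, name: str, args: dict[str, Any]) -> Any:
        call_id = uuid.uuid4().hex
        future: asyncio.Future[Any] = asyncio.get_running_loop().create_future()
        self.pending_calls[call_id] = future
        try:
            await self._send({"type": "tool_call", "call_id": call_id, "name": name, "args": args})
            # Raises asyncio.TimeoutError if the client never answers.
            return await asyncio.wait_for(future, self._timeout_s)
        finally:
            self.pending_calls.pop(call_id, None)  # no leaked futures on success, error, or timeout

    def on_frame(self, frame: dict[str, Any]) -> None:
        """Called by the receive loop for every incoming frame."""
        if frame.get("type") != "tool_result":
            return
        future = self.pending_calls.get(frame.get("call_id", ""))
        if future is not None and not future.done():
            future.set_result(frame.get("result"))


async def demo() -> Any:
    outbox: asyncio.Queue[dict[str, Any]] = asyncio.Queue()
    bridge = ToolCallBridge(send=outbox.put)

    async def fake_client() -> None:
        # Stand-in for the Electron side: run the tool locally, answer with tool_result.
        frame = await outbox.get()
        bridge.on_frame({"type": "tool_result", "call_id": frame["call_id"], "result": {"due_today": 3}})

    client = asyncio.create_task(fake_client())
    result = await bridge.call_tool("list_tasks_due_today", {})
    await client
    return result


print(asyncio.run(demo()))  # {'due_today': 3}
```

Because `on_frame` is synchronous and only resolves futures, the receive loop never blocks on tool execution. That is what lets it keep handling heartbeats while the orchestration task waits.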
Step 15 — Agent scheduler + OAuth endpoints (V2.0.2)
- `app/core/agent_scheduler.py`: APScheduler, 60s check loop, PostgreSQL advisory locks for multi-instance deployments
- `app/api/routes/oauth.py`: `GET /oauth/{provider}/authorize`, `GET /oauth/{provider}/callback`, `POST /oauth/{provider}/refresh`
  - Gmail: `gmail.readonly` scope
  - Outlook/Teams: `Mail.Read`, `ChannelMessage.Read.All` scopes
  - Encrypts tokens with Fernet and returns the encrypted blob for `CloudAgentConfig.oauth_token_encrypted`
- Integrate the scheduler with the FastAPI lifespan (start on startup, shut down gracefully)
- Dependencies: `apscheduler>=4.0`
- Files: `app/core/agent_scheduler.py` (new), `app/api/routes/oauth.py` (new), `app/main.py`
- Outcome: Agents run on cron schedules. OAuth flow for Gmail/Teams/Outlook.
Step 16 — BYOK: API key passthrough in LLM factory (V2.1.1)
- Add an `api_key: str | None = None` parameter to `get_llm()`, `get_router_llm()`, `embed()`
- When provided, use the BYOK key instead of the server fallback from `_api_key_for_model()`
- Add Cerebras support: `_api_key_for_model()` handles the `cerebras/` prefix
- The key is never persisted and never logged
- Files: `app/core/llm.py`, `app/config/settings.py`
- Outcome: LLM factory accepts per-request API keys with server-side fallback.
Step 17 — BYOK: Thread key through request lifecycle (V2.1.2)
- ContextVar: `_request_api_key: ContextVar[str | None]` in `app/core/llm.py`
- `get_llm()` reads from the ContextVar when no explicit `api_key` param is given
- `ChatRequest` schema: add `api_key: str | None = None`
- WS handlers set the ContextVar from the incoming frame's `api_key` field
- Fallback: if there is no BYOK key → server-side key (backward compat + Batch Agent)
- Files: `app/schemas.py`, `app/core/llm.py`, `app/api/routes/chat.py`, `app/core/orchestrator.py`
- Outcome: BYOK key flows from request → orchestrator → agent → LLM. Never stored.
Step 18 — pgvector + memory DB tables (V2.2.1)
- Add `pgvector` to `requirements.txt`
- New SQLAlchemy models in `app/models.py`:
  - `CoreMemory`: id, user_id, key, value, created_at, updated_at
  - `AssociativeEmbedding`: id, user_id, entity_type, entity_id, label, embedding (pgvector `Vector`), metadata_json, created_at
  - `EpisodicSummary`: id, user_id, session_id, summary, key_entities, created_at
  - `ProactivePattern`: id, user_id, pattern_type, description, confidence, last_detected_at, created_at
- Alembic migration with `CREATE EXTENSION IF NOT EXISTS vector`
- Files: `app/models.py`, `requirements.txt`, `alembic/versions/`
- Outcome: Memory tables in PostgreSQL with pgvector support.
Step 19 — Memory service layer (V2.2.2)
- Create `app/core/memory.py` with a `MemoryService` class:
  - `load_core_memory(user_id)`, `write_core_memory(user_id, key, value)` (upsert)
  - `search_associative(user_id, query_embedding, top_k=5)` (pgvector similarity)
  - `write_associative(user_id, entity_type, entity_id, label, embedding, metadata)`
  - `get_recent_episodic(user_id, limit=3)`, `write_episodic(user_id, session_id, summary, key_entities)`
  - `get_proactive_patterns(user_id)`, `write_proactive_pattern(user_id, ...)`
  - `delete_memory(user_id, memory_type, memory_id)`: user review/delete
- Uses async SQLAlchemy sessions from `app/db.py`
- Files: `app/core/memory.py` (new)
- Outcome: Complete CRUD + similarity search for all 4 memory types.
Step 20 — Memory middleware wrapper (V2.2.3)
- Create `app/core/memory_middleware.py`:
  - `enrich_with_memory(user_id, message, context)`:
    - Load core memory (always injected)
    - Embed user message → pgvector similarity search on associative memory
    - Load recent episodic summaries
    - Load proactive patterns
    - Return enriched context
  - `post_process_memory(user_id, message, response, context)`:
    - LLM decides what to remember (semi-autonomous)
    - Write core memory for preferences
    - Write associative memory for entity relationships
    - Compress session into an episodic summary when the conversation ends
- Files: `app/core/memory_middleware.py` (new)
- Outcome: Memory wraps every orchestrator call: enrich before, learn after.
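A sketch of `enrich_with_memory`, following the step order above. It differs from the planned signature in one way: it takes the memory service and an embedding function as extra parameters. That makes it testable without a database or embedding model. `MemorySource`, `embed` and the keys of the returned dict are hypothetical. The loads are awaited one after another on purpose. A single SQLAlchemy `AsyncSession` must not be used by concurrent tasks, so running them under `asyncio.gather` on one session would be unsafe.

```python
from typing import Any, Awaitable, Callable, Protocol


class MemorySource(Protocol):
    """Hypothetical async subset of MemoryService used by the middleware."""

    async def load_core_memory(self, user_id: str) -> dict[str, str]: ...
    async def search_associative(
        self, user_id: str, query_embedding: list[float], top_k: int = 5
    ) -> list[Any]: ...
    async def get_recent_episodic(self, user_id: str, limit: int = 3) -> list[Any]: ...
    async def get_proactive_patterns(self, user_id: str) -> list[Any]: ...


async def enrich_with_memory(
    memory: MemorySource,
    embed: Callable[[str], Awaitable[list[float]]],
    user_id: str,
    message: str,
    context: dict[str, Any],
) -> dict[str, Any]:
    """Return a copy of `context` with all four memory types attached."""
    core = await memory.load_core_memory(user_id)  # always injected
    query_embedding = await embed(message)
    associative = await memory.search_associative(user_id, query_embedding, top_k=5)
    episodic = await memory.get_recent_episodic(user_id, limit=3)
    patterns = await memory.get_proactive_patterns(user_id)
    # Build a new dict so the client-supplied context is not mutated.
    return {
        **context,
        "core_memory": core,
        "associative_memory": associative,
        "episodic_summaries": episodic,
        "proactive_patterns": patterns,
    }
```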
Step 21 — Integrate memory into orchestrator (V2.2.4)
- Modify `orchestrate()` / `orchestrate_stream()`:
  - Before `classify_intent`: call `enrich_with_memory()`
  - After agent response: call `post_process_memory()`
- Add `memory_write` tool to the Router's system prompt
- Files: `app/core/orchestrator.py`
- Outcome: All chat interactions are memory-enriched. Router can explicitly write memories.
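A sketch of the wrapping order inside `orchestrate()`. The router, agents and memory hooks are passed in as plain async callables. This signature is hypothetical, chosen so the ordering can be checked without an LLM or database. `orchestrate_stream()` would follow the same order around its streaming loop.

```python
from typing import Any, Awaitable, Callable

Context = dict[str, Any]


async def orchestrate(
    user_id: str,
    message: str,
    context: Context,
    enrich_with_memory: Callable[[str, str, Context], Awaitable[Context]],
    classify_intent: Callable[[str, Context], Awaitable[str]],
    agents: dict[str, Callable[[str, Context], Awaitable[str]]],
    post_process_memory: Callable[[str, str, str, Context], Awaitable[None]],
) -> str:
    # 1. Enrich before routing, so the Router sees the user's memories.
    enriched = await enrich_with_memory(user_id, message, context)
    # 2. Route, then run the selected agent with the enriched context.
    intent = await classify_intent(message, enriched)
    response = await agents[intent](message, enriched)
    # 3. Learn after: the memory hook sees both the message and the agent's response.
    await post_process_memory(user_id, message, response, enriched)
    return response
```

Awaiting `post_process_memory()` inline keeps the sketch simple, but it adds the memory step to response latency. Deferring it until after the response has been sent is an alternative the plan leaves open.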
Step 22 — Memory management API (V2.2.5)
- Create `app/api/routes/memory.py`:
  - `GET /api/v1/memory/core`: list core memories
  - `GET /api/v1/memory/associative`: list associative memories
  - `GET /api/v1/memory/episodic`: list episodic summaries
  - `GET /api/v1/memory/proactive`: list proactive patterns
  - `DELETE /api/v1/memory/{type}/{id}`: user deletes a memory
- Register router in `app/main.py`
- Files: `app/api/routes/memory.py` (new), `app/main.py`
- Outcome: Users can review and delete their memories (semi-autonomous model).
Step 23 — Scope context + structured response (V2.3.1)
- Update `ChatRequest`: add `source: Literal["home", "popup"] = "home"`, `scope: dict | None = None`
- New response schemas in `app/schemas.py`:
  - `AiResponse`: response (text + ui_directive + data), navigation, mutations, context
  - `NavigationDirective`: action, target, filter
  - `MutationCommand`: action, data
  - `ResponseContext`: scope_changed, new_scope
- Used when `source == "popup"` or navigation is needed; `ChatResponse` kept for backward compat
- Files: `app/schemas.py`
- Outcome: Popup can receive navigation directives and scoped responses.
Step 24 — Enhanced Router capabilities (V2.3.2)
- Update orchestrator system prompt + tool set:
  - `ask_user_clarification`: return a clarification question; the WS handler waits for the next user message
  - `render_ui_directive`: specify UI rendering (task_card, chart, diagram)
  - `cross_entity_resolve`: include a navigation directive when scope crosses entities
- Files: `app/core/orchestrator.py`
- Outcome: Router can clarify, render rich UI, and navigate across entities.
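A sketch of the three Router tools, and of how a tool call could map onto response fields. The plan does not fix a tool-definition format. The sketch assumes OpenAI-style function-calling schemas, where a tool call's arguments arrive as a JSON string. The following are hypothetical:
- `ROUTER_TOOLS` and `apply_tool_call`
- the argument names (`question`, `kind`, `target`, `filter`)
- the `awaiting_user` flag
- the `"navigate"` action value

```python
import json
from typing import Any

UI_KINDS = ["task_card", "chart", "diagram"]


def _tool(name: str, description: str, properties: dict[str, Any], required: list[str]) -> dict[str, Any]:
    # OpenAI-style function tool schema (an assumed format).
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {"type": "object", "properties": properties, "required": required},
        },
    }


ROUTER_TOOLS = [
    _tool("ask_user_clarification", "Ask the user a question before acting.",
          {"question": {"type": "string"}}, ["question"]),
    _tool("render_ui_directive", "Ask the client to render rich UI.",
          {"kind": {"type": "string", "enum": UI_KINDS}, "data": {"type": "object"}}, ["kind"]),
    _tool("cross_entity_resolve", "Navigate when the answer lives in another entity.",
          {"target": {"type": "string"}, "filter": {"type": "object"}}, ["target"]),
]


def apply_tool_call(name: str, arguments: str) -> dict[str, Any]:
    """Turn one Router tool call into a partial AiResponse-shaped dict (hypothetical shape)."""
    args = json.loads(arguments)
    if name == "ask_user_clarification":
        # The WS handler sends this and waits for the user's next message.
        return {"response": {"text": args["question"]}, "awaiting_user": True}
    if name == "render_ui_directive":
        if args["kind"] not in UI_KINDS:
            raise ValueError(f"unsupported ui directive: {args['kind']}")
        return {"response": {"text": "", "ui_directive": args["kind"], "data": args.get("data")}}
    if name == "cross_entity_resolve":
        return {"navigation": {"action": "navigate", "target": args["target"], "filter": args.get("filter")}}
    raise ValueError(f"unknown tool: {name}")
```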
Step 25 — WS protocol evolution (V2.3.3)
- Add to `WsFrameType`: `user_request`, `data_request`, `data_response`, `ai_response`, `mutation`
  - `user_request` = enhanced `chat_request` with source, scope, api_key
  - `ai_response` = structured response with navigation + mutations + context
- Server auto-detects client frame format for backward compat
- Files: `app/schemas.py`, `app/api/routes/chat.py`
- Outcome: v2 WS protocol with full backward compatibility.
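One way to implement the format auto-detection, assuming every client frame carries a `type` field. A v1 `chat_request` frame is upgraded to the `user_request` shape, filling in the `ChatRequest` defaults for any missing fields. The detected version is returned so the handler can answer in the format the client understands. `normalize_client_frame` and the enum member names are hypothetical; the frame type strings come from the list above.

```python
from enum import Enum
from typing import Any


class WsFrameType(str, Enum):
    CHAT_REQUEST = "chat_request"  # v1 frame, still accepted
    USER_REQUEST = "user_request"  # v2: chat_request + source, scope, api_key
    DATA_REQUEST = "data_request"
    DATA_RESPONSE = "data_response"
    AI_RESPONSE = "ai_response"
    MUTATION = "mutation"


def normalize_client_frame(frame: dict[str, Any]) -> tuple[int, dict[str, Any]]:
    """Return (protocol_version, normalized_frame); unknown types raise ValueError."""
    frame_type = WsFrameType(frame["type"])
    if frame_type is WsFrameType.CHAT_REQUEST:
        # Defaults first, then the client's own fields, then the upgraded type.
        upgraded = {
            "source": "home",
            "scope": None,
            "api_key": None,
            **frame,
            "type": WsFrameType.USER_REQUEST.value,
        }
        return 1, upgraded
    return 2, dict(frame)
```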
Step 26 — Batch agent implementation (V2.4.1)
- Create `app/agents/batch_agent.py`, a background agent (not a `ChatAgent`):
  - `pattern_detection`: analyze episodic summaries for recurring patterns
  - `memory_consolidation`: merge redundant episodic summaries
  - `suggestion_generation`: create proactive pattern entries
  - `overdue_detection`: request task data from Electron via device WS
- Uses server-side LLM key (not BYOK; it runs without a user request)
- Requires device online for entity data access
- Files: `app/agents/batch_agent.py` (new)
- Outcome: Background agent that learns patterns and generates proactive suggestions.
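The plan does not say how `pattern_detection` finds patterns; an LLM pass over episodic summaries is one reading. A cheap deterministic pre-filter is another possibility: a frequency count over `key_entities`, sketched below. This is a hypothetical heuristic, not the planned implementation. `detect_recurring_entities`, `PatternCandidate`, the `"recurring_entity"` pattern type, the `min_sessions` threshold and the confidence formula are all assumptions. The output fields map onto the `ProactivePattern` columns.

```python
from collections import Counter
from dataclasses import dataclass


@dataclass
class EpisodicSummary:
    session_id: str
    summary: str
    key_entities: list[str]


@dataclass
class PatternCandidate:
    pattern_type: str
    description: str
    confidence: float


def detect_recurring_entities(
    summaries: list[EpisodicSummary], min_sessions: int = 3
) -> list[PatternCandidate]:
    """Flag entities that appear in at least `min_sessions` summaries.

    confidence = fraction of the analyzed summaries that mention the entity.
    """
    if not summaries:
        return []
    # Count each entity at most once per summary, so one chatty session cannot dominate.
    counts = Counter(e for s in summaries for e in set(s.key_entities))
    total = len(summaries)
    return [
        PatternCandidate(
            pattern_type="recurring_entity",
            description=f"'{entity}' came up in {n} of the last {total} sessions",
            confidence=n / total,
        )
        for entity, n in counts.most_common()
        if n >= min_sessions
    ]
```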
Step 27 — Batch agent scheduling + proactive surfacing (V2.4.2)
- Integrate with agent scheduler from Step 15
- Default: every 6h per user, only when device online
- Proactive patterns surfaced via memory middleware in Router context
- Files: `app/core/agent_scheduler.py`, `app/core/memory_middleware.py`
- Outcome: Batch runs automatically. Suggestions appear in chat responses.
Step 28 — E2E memory encryption + tests (V2.5)
- Application-level Fernet encryption for all memory table writes
- Encryption key derived from user passphrase, sent with requests
- `tests/test_byok.py`: key threading, Cerebras model string, fallback
- `tests/test_memory.py`: all 4 memory types, pgvector search, middleware
- `tests/test_popup.py`: scope, navigation directives, cross-entity
- `tests/test_batch_agent.py`: pattern detection, consolidation
- Files: `app/core/memory.py`, `app/storage/encryption.py`, `tests/`
- Outcome: Fully tested, encrypted memory system.
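The plan does not name a key-derivation function. Below is a minimal sketch, assuming PBKDF2-HMAC-SHA256 from the standard library with a per-user random salt. It produces a 32-byte key encoded as URL-safe base64, which is the format `cryptography.fernet.Fernet(key)` expects. The Fernet encryption itself needs the `cryptography` package and is left out. `derive_fernet_key`, `new_salt` and the iteration count are assumptions.

```python
import base64
import hashlib
import os

PBKDF2_ITERATIONS = 600_000  # assumed work factor; tune to the latency budget


def derive_fernet_key(passphrase: str, salt: bytes, iterations: int = PBKDF2_ITERATIONS) -> bytes:
    """Derive a Fernet-compatible key (32 random-looking bytes, URL-safe base64)."""
    raw = hashlib.pbkdf2_hmac("sha256", passphrase.encode("utf-8"), salt, iterations, dklen=32)
    return base64.urlsafe_b64encode(raw)


def new_salt() -> bytes:
    # Per-user random salt; not secret, but it must be stored to re-derive the key.
    return os.urandom(16)
```

The same passphrase and salt always yield the same key, which is what lets later requests decrypt earlier writes. Like the BYOK key, the passphrase and derived key would live only for the duration of a request and never be logged.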
Development Rules
- NEVER persist user data in plaintext. The DB stores only auth, billing, storage metadata, and marketplace data. User context arrives in requests and is discarded. Cloud blobs are E2E encrypted client-side — backend only stores opaque bytes.
- NEVER expose prompts. System prompts are composed server-side from fragments. Responses are sanitized before sending. In plan mode, `prompt_template` fields are reference IDs only.
- NEVER decrypt user blobs. `app/storage/encryption.py` only verifies checksums. No decryption key ever reaches the backend.
- Stateless request handling. No server-side session state. All context comes from the client + JWT.
- Type hints everywhere. All functions have full type annotations.
- Test every agent. Each chat agent has unit tests with mocked LLM responses.
- Structured logging. JSON logs with request ID correlation.
- Tier gates are enforced server-side. Never trust client-reported tier. Always fetch from DB via `TierManager.get_tier(user_id)`.
- One step at a time. Implement one numbered step per session. When the step is fully done, mark all its checkboxes as `[x]` in this file and commit with message `step N complete: <outcome line>`.