Files
adiuva-api/BACKEND_PLAN.md
2026-03-03 15:14:04 +01:00

30 KiB

Backend Plan — Adiuva Cloud API

Separate repository. This document defines the FastAPI backend that the Electron app communicates with.

The backend owns: orchestration logic, chat agent intelligence, prompt IP, auth, billing, E2E backup blob storage, cloud storage (encrypted blobs), cloud vector store, and plugin marketplace. The backend NEVER persists user data in plaintext. Cloud storage blobs are E2E encrypted before upload — the backend only verifies integrity, never decrypts.


Project Structure

adiuva-api/
├── app/
│   ├── __init__.py
│   ├── main.py                    # FastAPI entry + CORS + lifespan + router includes
│   ├── core/
│   │   ├── __init__.py
│   │   ├── agent_registry.py      # Base classes + singleton registry
│   │   ├── orchestrator.py        # LLM-based intent router
│   │   ├── execution_plan.py      # Plan builder + cache
│   │   └── plugin_loader.py       # Dynamic agent loading
│   ├── agents/                    # Chat agents (proprietary logic + prompts)
│   │   ├── __init__.py            # Auto-registers all agents
│   │   ├── task_agent.py
│   │   ├── calendar_agent.py
│   │   ├── email_agent.py
│   │   └── analytics_agent.py
│   ├── api/
│   │   ├── __init__.py
│   │   ├── routes/
│   │   │   ├── __init__.py
│   │   │   ├── chat.py            # POST /chat + WS /chat/stream
│   │   │   ├── plans.py           # GET /plans/playbook
│   │   │   ├── storage.py         # CRUD cloud storage (E2E encrypted blobs)
│   │   │   ├── vectors.py         # Upsert/search cloud vector store
│   │   │   ├── backup.py          # PUT/GET /backup
│   │   │   ├── plugins.py         # Plugin marketplace
│   │   │   ├── auth.py            # Register/login/refresh
│   │   │   └── billing.py         # Checkout/webhook/subscription
│   │   └── middleware/
│   │       ├── __init__.py
│   │       ├── auth.py            # JWT validation
│   │       ├── rate_limit.py      # Tier-aware rate limiting
│   │       └── sanitizer.py       # Strip prompt metadata from responses
│   ├── storage/
│   │   ├── __init__.py
│   │   ├── blob_store.py          # S3 for E2E encrypted blobs
│   │   ├── vector_store.py        # Cloud vector store (Pinecone/Qdrant)
│   │   └── encryption.py          # Integrity verification only — NO decryption
│   ├── marketplace/
│   │   ├── __init__.py
│   │   ├── plugin_registry.py     # Plugin catalog (metadata, versions, ratings)
│   │   ├── plugin_review.py       # Review queue + approval workflow
│   │   └── revenue_share.py       # 70/30 split tracking with Stripe Connect
│   ├── billing/
│   │   ├── __init__.py
│   │   ├── stripe_service.py      # Stripe checkout + webhooks
│   │   └── tier_manager.py        # Feature matrix per tier
│   └── config/
│       ├── __init__.py
│       └── settings.py            # Pydantic BaseSettings (env-based)
├── tests/
│   ├── __init__.py
│   ├── conftest.py                # Fixtures: test client, mock agents, mock LLM
│   ├── test_orchestrator.py
│   ├── test_agents.py
│   ├── test_auth.py
│   ├── test_backup.py
│   ├── test_storage.py
│   └── test_plugins.py
├── alembic/                       # DB migrations (auth/billing/marketplace tables only)
│   ├── alembic.ini
│   └── versions/
├── requirements.txt
├── Dockerfile
├── docker-compose.yml             # App + PostgreSQL + Redis (dev)
├── .env.example
└── README.md

Step-by-Step Implementation

Step 1 — Project scaffolding

  • Initialize repo with the directory structure above
  • Write requirements.txt:
    fastapi>=0.115.0
    uvicorn[standard]>=0.34.0
    langchain>=0.3.0
    langchain-openai>=0.3.0
    pydantic>=2.10.0
    python-jose[cryptography]>=3.3.0
    stripe>=11.0.0
    boto3>=1.35.0
    slowapi>=0.1.9
    sqlalchemy>=2.0.0
    asyncpg>=0.30.0
    alembic>=1.14.0
    bcrypt>=4.2.0
    python-dotenv>=1.0.0
    httpx>=0.28.0
    websockets>=14.0
    pytest>=8.0.0
    pytest-asyncio>=0.24.0
    
  • Write app/main.py: FastAPI app with CORS (allow app://, http://localhost:*), lifespan (init DB pool, init agent registry), include all routers under /api/v1
  • Write app/config/settings.py: Settings(BaseSettings) with fields: DATABASE_URL, JWT_SECRET, JWT_ALGORITHM (default HS256), STRIPE_SECRET_KEY, STRIPE_WEBHOOK_SECRET, S3_BUCKET, S3_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, OPENAI_API_KEY, CORS_ORIGINS, ENV (dev/prod), PINECONE_API_KEY, PINECONE_INDEX, QDRANT_URL, QDRANT_API_KEY
  • Write Dockerfile: Python 3.12 slim, multi-stage (builder + runtime), non-root user
  • Write docker-compose.yml: app, postgres:16, optional redis
  • Write .env.example
  • Outcome: Runnable FastAPI skeleton (returns 404 on all routes).

Step 2 — Pydantic schemas (API contracts)

  • Create app/schemas.py (mirrors src/shared/api-types.ts from Electron repo):
    • ChatRequest: message: str, context: ChatContext, execution_mode: Literal['direct', 'plan']
    • ChatContext: user_profile: dict, relevant_documents: list[str], recent_tasks: list[dict], conversation_history: list[dict]
    • ChatResponse: response: str, actions: list[PlanAction]
    • PlanAction: type: Literal['create_record', 'update_record', 'delete_record', 'index_document', 'send_notification', 'call_agent'], table: str | None, data: dict | None, agent: str | None
    • ExecutionPlan: agent: str, steps: list[PlanStep]
    • PlanStep: action: str, prompt_template: str | None, variables: dict | None, data_from_step: int | None
    • BackupMetadata: version: int, timestamp: int, checksum: str, chunk_count: int
    • BillingTier: Literal['free', 'pro', 'power', 'team']
    • AuthTokens: access_token: str, refresh_token: str, expires_at: int
    • UserProfile: id: str, email: str, tier: BillingTier
    • StorageRecord: id: str, user_id: str, table: str, blob: bytes, checksum: str, created_at: int, updated_at: int — blob is always E2E encrypted by client
    • StorageRecordCreate: table: str, blob: bytes, checksum: str
    • StorageRecordUpdate: blob: bytes, checksum: str
    • VectorUpsertRequest: vectors: list[VectorItem]
    • VectorItem: id: str, blob: bytes, checksum: str — vector + metadata encrypted by client
    • VectorSearchRequest: query_blob: bytes, top_k: int = 10
    • VectorSearchResponse: results: list[VectorSearchResult]
    • VectorSearchResult: id: str, score: float, blob: bytes
    • PluginManifest: id: str, name: str, description: str, version: str, author: str, permissions: list[str], category: str, price_cents: int = 0
    • PluginListResponse: plugins: list[PluginManifest], total: int, page: int
    • PluginInstallRequest: plugin_id: str
  • Outcome: All request/response models defined and validated.

Step 3 — Agent Registry + base classes

  • app/core/agent_registry.py:
    • BaseAgent(ABC):
      • user_id: str, shared_memory: dict, vector_store_context: list[str], skills: list[str]
      • Abstract get_name() -> str, get_description() -> str
    • ChatAgent(BaseAgent):
      • Abstract async handle(query: str, context: dict) -> str
      • Abstract get_tools() -> list (LangChain tool definitions)
      • Concrete _tool_loop(llm, messages, tools, max_iter=5) -> str — shared tool-calling loop
    • AgentRegistry (singleton):
      • _agents: dict[str, ChatAgent]
      • register(agent_class) — decorator pattern
      • get(name) -> ChatAgent
      • list_agents() -> list[dict] — returns [{name, description}] for orchestrator prompt
      • async call_agent(name, query, context) -> str — for inter-agent calls
  • Unit tests: register, get, list, call_agent with mock
  • Outcome: Pluggable agent framework.

Step 4 — Orchestrator

  • app/core/orchestrator.py:
    • async classify_intent(message, context, registry) -> str:
      • System prompt: "You are an intent classifier. Given the user message and context, decide which agent to route to. Available agents: {registry.list_agents()}. Respond with just the agent name."
      • Uses gpt-4o-mini via LangChain for low latency
      • Falls back to task_agent if no clear match
    • async route_single(agent_name, message, context) -> ChatResponse:
      • Instantiates agent from registry
      • Calls agent.handle(message, context)
      • Returns response + any actions the agent produced
    • async route_pipeline(agent_names, message, context) -> ChatResponse:
      • Executes agents in sequence
      • Each agent receives {...context, previous_results: [...]}
      • Final synthesis via LLM: "Summarize these agent results into a coherent response"
    • async orchestrate(request: ChatRequest) -> ChatResponse | ExecutionPlan:
      • Main entry point
      • Context is transparent to orchestrator — data may originate from local or cloud storage on the client side
      • Classifies intent
      • If execution_mode == 'direct': route + return response
      • If execution_mode == 'plan': route + return execution plan with template IDs
    • async orchestrate_stream(request: ChatRequest) -> AsyncGenerator[str, None]:
      • Same as orchestrate but yields tokens for WebSocket streaming
  • Integration tests with mocked LLM and mocked agents
  • Outcome: Intelligent routing with single-agent and pipeline modes.

Step 5 — Execution Plan generator

  • app/core/execution_plan.py:
    • PromptTemplateRegistry: dict of template_id -> prompt_text. Templates are server-side only — client receives IDs.
    • ExecutionPlanBuilder:
      • add_step(action, params) -> self
      • add_llm_step(template_id, variables) -> self
      • add_data_step(action, data_from_step) -> self
      • build() -> ExecutionPlan — validates step references
    • PlanCache:
      • In-memory LRU (maxsize=1000)
      • cache_plan(key, plan), get_plan(key), get_all_playbooks() -> list[ExecutionPlan]
      • Playbooks are pre-built plans for common operations (e.g., "create task from email", "generate weekly report")
  • Outcome: Plans are cacheable as playbooks. Prompt IP never leaves the server.

Step 6 — Chat Agents

  • app/agents/task_agent.py@registry.register:
    • Description: "Manages tasks and comments: list, create, update, delete, due-today, comments"
    • Tools (8): list_tasks(project_id, status, search, order_by), create_task(title, description, status, priority, assignees, due_date, project_id, is_ai_suggested, is_approved), update_task(task_id, ...), delete_task(task_id), list_tasks_due_today(), list_task_comments(task_id), add_task_comment(task_id, author, content), delete_task_comment(comment_id)
    • status: todo|in_progress|done; priority: high|medium|low; assignees: JSON-encoded string; due_date: ms timestamp
    • Accepts flexible context; sentinel -1 for optional integer update fields
  • app/agents/checkpoint_agent.py@registry.register:
    • Description: "Manages project checkpoints (milestones): list, create, update, delete"
    • Tools (4): list_checkpoints(project_id), create_checkpoint(project_id, title, date, is_ai_suggested, is_approved), update_checkpoint(checkpoint_id, ...), delete_checkpoint(checkpoint_id)
    • project_id is required for create; date is a ms timestamp; supports AI-suggestion + approval workflow
  • app/agents/project_agent.py@registry.register:
    • Description: "Manages projects: list, get, create, update, archive, delete"
    • Tools (6): list_projects(client_id, include_archived), list_all_projects(), get_project(project_id), create_project(name, client_id), update_project(project_id, ...), delete_project(project_id)
    • status: active|archived; prefers archive over deletion (docstring guard on delete)
  • app/agents/note_agent.py@registry.register:
    • Description: "Manages notes: list, get, create, update, delete"
    • Tools (5): list_notes(project_id), get_note(note_id), create_note(title, content, project_id), update_note(note_id, ...), delete_note(note_id)
    • content is Markdown; get_note should be called before update to preserve existing content
  • app/agents/__init__.py: imports all four agent modules to trigger @registry.register decorators
  • Unit tests per agent with mocked LLM (registration, names, tool counts, handle(), direct tool invocation)
  • Outcome: Four domain-specific agents matching the UI data model (Tasks, Checkpoints, Projects, Notes), all registered and tested.

Step 7 — Storage Layer

  • app/storage/blob_store.py:
    • BlobStore: async upload, async download, async delete (idempotent), async list_keys
    • Keys: {user_id}/{table}/{record_id} — backend never inspects blob content
    • boto3 S3 with SSE-S3 at-rest encryption; client checksum stored in S3 object metadata
  • app/storage/vector_store.py:
    • VectorStore: async upsert, async search, async delete
    • Pinecone (default, namespace=user_id) or Qdrant (user_id payload filter) — runtime-configurable
    • 32-dim SHA-256-derived float vector; blob stored as base64 in metadata/payload
    • ANN on encrypted data: known accuracy trade-off, documented
  • app/storage/encryption.py:
    • verify_checksum(blob, checksum) -> bool — SHA-256 + hmac.compare_digest (constant-time)
    • reject_if_tampered(blob, checksum) — raises HTTP 400 on mismatch
    • Backend NEVER holds decryption keys
  • app/schemas.py: added StorageRecord*, VectorItem, VectorUpsertRequest, VectorSearch*, Plugin* schemas
  • app/config/settings.py: added PINECONE_API_KEY, PINECONE_INDEX, QDRANT_URL, QDRANT_API_KEY
  • requirements.txt: added moto[s3], pinecone, qdrant-client
  • 37 unit tests covering encryption, BlobStore (moto), VectorStore Pinecone, VectorStore Qdrant
  • Outcome: Cloud storage layer that handles E2E encrypted blobs without ever accessing plaintext.

Step 8 — API Routes

8a — Chat endpoint

  • app/api/routes/chat.py:
    • POST /api/v1/chat:
      • Request: ChatRequest
      • Calls orchestrate(request) or orchestrate() + build_plan()
      • Response: ChatResponse or ExecutionPlan
    • WebSocket /api/v1/chat/stream:
      • Client sends ChatRequest as first JSON frame
      • Server yields token strings via orchestrate_stream()
      • Final frame: JSON ChatResponse with {"done": true, "response": "...", "actions": [...]}
      • Heartbeat ping every 30s to keep connection alive

8b — Plans endpoint

  • app/api/routes/plans.py:
    • GET /api/v1/plans/playbook: Returns all playbooks available for the user's tier
    • GET /api/v1/plans/playbook/{plan_id}: Returns a specific plan

8c — Storage endpoint (cloud records)

  • app/api/routes/storage.py:
    • POST /api/v1/storage/records: Create encrypted record
      • Request: StorageRecordCreate
      • Verifies checksum, stores blob in S3, inserts metadata row in PostgreSQL
      • Response: {id: str, created_at: int}
    • GET /api/v1/storage/records: List record metadata (no blobs)
      • Query params: table: str, page: int, limit: int
      • Response: list[{id, table, checksum, created_at, updated_at}]
    • GET /api/v1/storage/records/{id}: Download encrypted blob
      • Response: blob bytes + X-Checksum header
    • PUT /api/v1/storage/records/{id}: Update encrypted blob
      • Request: StorageRecordUpdate
    • DELETE /api/v1/storage/records/{id}: Delete record + S3 blob
    • All routes enforce tier cloud_storage_gb quota via TierManager.check_quota(user_id)

8d — Vectors endpoint (cloud vector store)

  • app/api/routes/vectors.py:
    • POST /api/v1/storage/vectors/upsert:
      • Request: VectorUpsertRequest
      • Verifies checksums, delegates to VectorStore.upsert()
      • Response: {upserted: int}
    • POST /api/v1/storage/vectors/search:
      • Request: VectorSearchRequest
      • Delegates to VectorStore.search()
      • Response: VectorSearchResponse
    • DELETE /api/v1/storage/vectors:
      • Request: {ids: list[str]}

8e — Backup endpoint

  • app/api/routes/backup.py:
    • PUT /api/v1/backup: Accepts binary blob + metadata headers (X-Backup-Version, X-Backup-Timestamp, X-Backup-Checksum). Stores in S3 keyed by {user_id}/{timestamp}. Enforces tier limits:
      • Free: 0 (no backup)
      • Pro: 5 GB
      • Power: 25 GB
      • Team: unlimited
    • GET /api/v1/backup: Returns latest blob for authenticated user. Supports If-Modified-Since.
    • GET /api/v1/backup/history: Returns list of BackupMetadata (no blobs).
    • DELETE /api/v1/backup/{backup_id}: Delete specific backup.

8f — Plugins endpoint

  • app/api/routes/plugins.py:
    • GET /api/v1/plugins:
      • Query params: category: str | None, q: str | None, page: int, sort: Literal['rating', 'installs', 'newest']
      • Response: PluginListResponse
      • Available from Power tier and above
    • GET /api/v1/plugins/{id}:
      • Response: PluginManifest + ratings + install count
    • POST /api/v1/plugins/{id}/install:
      • Request: PluginInstallRequest
      • Records installation for the user (billing tracking, analytics)
      • If plugin is paid: triggers Stripe Connect charge + revenue split (70% developer, 30% platform)
      • Response: {ok: true, download_url: str} — signed S3 URL for plugin package
    • DELETE /api/v1/plugins/{id}/install:
      • Unregisters installation

8g — Auth endpoint

  • app/api/routes/auth.py:
    • POST /api/v1/auth/register: {email, password} → bcrypt hash → insert user → return AuthTokens
    • POST /api/v1/auth/login: Validate credentials → return AuthTokens
    • POST /api/v1/auth/refresh: Rotate refresh token → return new AuthTokens
    • GET /api/v1/auth/me: Return UserProfile for current JWT

8h — Billing endpoint

  • app/api/routes/billing.py:

    • POST /api/v1/billing/checkout: Creates Stripe checkout session → returns URL
    • POST /api/v1/billing/webhook: Handles Stripe webhooks (subscription lifecycle)
    • GET /api/v1/billing/subscription: Returns current subscription info
    • DELETE /api/v1/billing/subscription: Cancels subscription
  • Outcome: Complete REST + WebSocket API covering orchestration, storage, vectors, backup, marketplace.

Step 9 — Middleware

9a — Auth middleware

  • app/api/middleware/auth.py:
    • FastAPI dependency: get_current_user(token: str = Depends(oauth2_scheme)) -> UserProfile
    • Validates JWT signature, expiry, extracts user_id and tier
    • Raises 401 on invalid/expired token
    • Exempt routes: /api/v1/auth/register, /api/v1/auth/login, /api/v1/billing/webhook

9b — Rate limiter

  • app/api/middleware/rate_limit.py:
    • Uses slowapi with Limiter(key_func=get_user_id_from_jwt)
    • Tier-based limits:
      • Free: 20 req/min
      • Pro: 60 req/min
      • Power: 120 req/min
      • Team: 200 req/seat/min
    • Custom 429 response with Retry-After header

9c — Sanitizer

  • app/api/middleware/sanitizer.py:

    • Response middleware that scans response bodies
    • Strips: system prompt fragments, agent internal reasoning, tool schemas, routing metadata
    • Pattern-based detection + exact match against known prompt fingerprints
    • Logs sanitization events for monitoring
  • Outcome: Secure, rate-limited API with prompt IP protection.

Step 10 — Plugin Marketplace

  • app/marketplace/plugin_registry.py:
    • PluginRegistry:
      • async list_plugins(category, query, page, sort) -> PluginListResponse
      • async get_plugin(plugin_id) -> PluginManifest | None
      • async submit_plugin(manifest: PluginManifest, package_s3_key: str) -> str — returns plugin_id, sets status = 'pending_review'
      • async approve_plugin(plugin_id) -> None — admin only, sets status = 'approved'
      • async reject_plugin(plugin_id, reason: str) -> None
  • app/marketplace/plugin_review.py:
    • ReviewQueue:
      • async get_pending() -> list[dict]
      • async submit_review(plugin_id, reviewer_id, decision, notes) -> None
    • Security checklist enforced before approval: manifest schema valid, permissions are from allowed set, no binary blobs in manifest
  • app/marketplace/revenue_share.py:
    • RevenueShare:
      • async record_install(plugin_id, user_id, amount_cents) -> None
      • async payout_developer(plugin_id, period) -> None — Stripe Connect transfer: 70% to developer
      • async get_earnings(developer_id, period) -> dict
  • Outcome: Plugin marketplace with catalog, review workflow, and revenue split.

Step 11 — Billing & Tier management

  • app/billing/stripe_service.py:
    • create_checkout_session(user_id, tier) -> str
    • handle_webhook(payload, sig_header) -> None: processes checkout.session.completed, customer.subscription.updated, customer.subscription.deleted, invoice.payment_failed
    • get_subscription(user_id) -> dict | None
    • cancel_subscription(user_id) -> None
  • app/billing/tier_manager.py:
    • TierManager:
      • Feature matrix:
        FEATURES = {
            'free':  {
                'agents': 3,
                'batch_active': 2,
                'cloud_storage_gb': 0,
                'backup_gb': 0,
                'providers': 1,
                'batch_builder': False,
                'plugin_marketplace': False,
                'sso': False,
            },
            'pro':   {
                'agents': -1,          # unlimited
                'batch_active': 10,
                'cloud_storage_gb': 5,
                'backup_gb': 5,
                'providers': -1,
                'batch_builder': False,
                'plugin_marketplace': False,
                'sso': False,
            },
            'power': {
                'agents': -1,
                'batch_active': -1,    # unlimited
                'cloud_storage_gb': 25,
                'backup_gb': 25,
                'providers': -1,
                'batch_builder': True,
                'plugin_marketplace': True,
                'sso': False,
            },
            'team':  {
                'agents': -1,
                'batch_active': -1,
                'cloud_storage_gb': -1,
                'backup_gb': -1,
                'providers': -1,
                'batch_builder': True,
                'plugin_marketplace': True,
                'sso': True,
            },
        }
        
      • get_tier(user_id) -> BillingTier
      • check_feature(user_id, feature) -> bool
      • get_rate_limit(tier) -> int
      • check_quota(user_id) -> bool — checks cloud_storage_gb current usage vs limit
  • app/billing/__init__.py: exports stripe_service and tier_manager singletons
  • app/api/routes/billing.py: refactored to delegate to StripeService
  • app/api/routes/storage.py and backup.py: _check_quota now delegates to tier_manager.enforce_quota / enforce_backup_quota
  • Outcome: Stripe integration with tier-based feature gating matching Free/Pro(15€)/Power(29€)/Team(49€/seat).

Step 12 — Database (auth/billing/marketplace only)

  • PostgreSQL schema via Alembic:
    • users: id UUID PK, email UNIQUE, password_hash, tier (default 'free'), stripe_customer_id, created_at, updated_at
    • refresh_tokens: id UUID PK, user_id FK, token_hash, expires_at, created_at
    • subscriptions: id UUID PK, user_id FK, stripe_subscription_id, tier, status, current_period_end, created_at
    • backup_metadata: id UUID PK, user_id FK, s3_key, version, timestamp, checksum, size_bytes, created_at
    • storage_records: id UUID PK, user_id FK, table_name VARCHAR, s3_key, checksum, size_bytes, created_at, updated_at — metadata only, no plaintext
    • plugins: id UUID PK, name, description, version, author_id FK, category, status (pending_review/approved/rejected), price_cents, s3_package_key, install_count, avg_rating, created_at
    • plugin_installations: id UUID PK, plugin_id FK, user_id FK, installed_at
    • plugin_reviews: id UUID PK, plugin_id FK, reviewer_id FK, decision, notes, reviewed_at
    • revenue_events: id UUID PK, plugin_id FK, user_id FK, amount_cents, developer_share_cents, stripe_transfer_id, created_at
  • Initial Alembic migration
  • SQLAlchemy models in app/models.py
  • Outcome: Auth, billing, storage metadata, and marketplace persistence. Zero user data in plaintext.

Step 13 — Testing & deployment

  • tests/conftest.py: TestClient fixture, mock LLM fixture (AsyncMock returning canned responses), mock agent fixture, test DB (SQLite in-memory for speed), mock S3 (moto), mock Pinecone
  • tests/test_orchestrator.py: classify_intent routing, single agent, pipeline, plan mode
  • tests/test_agents.py: each agent with mocked tools
  • tests/test_auth.py: register → login → access protected → refresh → expired token
  • tests/test_backup.py: upload → download → history → delete, tier limit enforcement
  • tests/test_storage.py: create record → list → download → update → delete, checksum rejection, quota enforcement
  • tests/test_plugins.py: list plugins, install, uninstall, revenue event creation, tier gate (free user blocked)
  • Dockerfile optimized for production (gunicorn + uvicorn workers)
  • GitHub Actions CI: lint (ruff), test (pytest), build Docker image
  • Outcome: Fully tested, deployable backend.

API Contract Summary

Method Endpoint Auth Request Response
POST /api/v1/auth/register No {email, password} AuthTokens
POST /api/v1/auth/login No {email, password} AuthTokens
POST /api/v1/auth/refresh No {refresh_token} AuthTokens
GET /api/v1/auth/me JWT UserProfile
POST /api/v1/chat JWT ChatRequest ChatResponse | ExecutionPlan
WS /api/v1/chat/stream JWT ChatRequest (first frame) Token stream + final JSON
GET /api/v1/plans/playbook JWT ExecutionPlan[]
GET /api/v1/plans/playbook/:id JWT ExecutionPlan
POST /api/v1/storage/records JWT StorageRecordCreate {id, created_at}
GET /api/v1/storage/records JWT ?table&page&limit RecordMeta[]
GET /api/v1/storage/records/:id JWT Binary blob
PUT /api/v1/storage/records/:id JWT StorageRecordUpdate {ok: true}
DELETE /api/v1/storage/records/:id JWT {ok: true}
POST /api/v1/storage/vectors/upsert JWT VectorUpsertRequest {upserted: int}
POST /api/v1/storage/vectors/search JWT VectorSearchRequest VectorSearchResponse
DELETE /api/v1/storage/vectors JWT {ids: list[str]} {ok: true}
PUT /api/v1/backup JWT Binary blob + headers {ok: true}
GET /api/v1/backup JWT Binary blob
GET /api/v1/backup/history JWT BackupMetadata[]
DELETE /api/v1/backup/:id JWT {ok: true}
GET /api/v1/plugins JWT ?category&q&page&sort PluginListResponse
GET /api/v1/plugins/:id JWT PluginManifest + stats
POST /api/v1/plugins/:id/install JWT PluginInstallRequest {ok, download_url}
DELETE /api/v1/plugins/:id/install JWT {ok: true}
POST /api/v1/billing/checkout JWT {tier} {checkout_url}
POST /api/v1/billing/webhook Stripe sig Stripe event {ok: true}
GET /api/v1/billing/subscription JWT Subscription info
DELETE /api/v1/billing/subscription JWT {ok: true}
GET /api/v1/health No {status, version}

Stack

Layer Technology
Framework FastAPI + Uvicorn
LLM LangChain + langchain-openai
Auth PyJWT + bcrypt + OAuth2
Billing stripe-python + Stripe Connect
Blob storage boto3 (S3)
Vector store Pinecone or Qdrant (configurable)
Database PostgreSQL + SQLAlchemy + Alembic
Rate limiting slowapi
Testing pytest + pytest-asyncio + httpx + moto (S3 mock)
Deployment Docker → fly.io / Railway / AWS ECS

Development Rules

  1. NEVER persist user data in plaintext. The DB stores only auth, billing, storage metadata, and marketplace data. User context arrives in requests and is discarded. Cloud blobs are E2E encrypted client-side — backend only stores opaque bytes.
  2. NEVER expose prompts. System prompts are composed server-side from fragments. Responses are sanitized before sending. In plan mode, prompt_template fields are reference IDs only.
  3. NEVER decrypt user blobs. app/storage/encryption.py only verifies checksums. No decryption key ever reaches the backend.
  4. Stateless request handling. No server-side session state. All context comes from the client + JWT.
  5. Type hints everywhere. All functions have full type annotations.
  6. Test every agent. Each chat agent has unit tests with mocked LLM responses.
  7. Structured logging. JSON logs with request ID correlation.
  8. Tier gates are enforced server-side. Never trust client-reported tier. Always fetch from DB via TierManager.get_tier(user_id).
  9. One step at a time. Implement one numbered step per session. When the step is fully done, mark all its checkboxes as [x] in this file and commit with message step N complete: <outcome line>.