Files
api/BACKEND_PLAN.md
2026-03-01 23:42:33 +01:00

18 KiB

Backend Plan — Adiuva Cloud API

Separate repository. This document defines the FastAPI backend that the Electron app communicates with.

The backend owns: orchestration logic, chat agent intelligence, prompt IP, auth, billing, and backup blob storage. The backend NEVER persists user data. It receives context in requests, uses it for orchestration, and discards it.


Project Structure

adiuva-backend/
├── app/
│   ├── __init__.py
│   ├── main.py                    # FastAPI entry + CORS + lifespan + router includes
│   ├── core/
│   │   ├── __init__.py
│   │   ├── agent_registry.py      # Base classes + singleton registry
│   │   ├── orchestrator.py        # LLM-based intent router
│   │   ├── execution_plan.py      # Plan builder + cache
│   │   └── plugin_loader.py       # Dynamic agent loading
│   ├── agents/
│   │   ├── __init__.py            # Auto-registers all agents
│   │   ├── task_agent.py
│   │   ├── calendar_agent.py
│   │   ├── email_agent.py
│   │   └── analytics_agent.py
│   ├── api/
│   │   ├── __init__.py
│   │   ├── routes/
│   │   │   ├── __init__.py
│   │   │   ├── chat.py            # POST /chat + WS /chat/stream
│   │   │   ├── plans.py           # GET /plans/playbook
│   │   │   ├── backup.py          # PUT/GET /backup
│   │   │   ├── auth.py            # Register/login/refresh
│   │   │   └── billing.py         # Checkout/webhook/subscription
│   │   └── middleware/
│   │       ├── __init__.py
│   │       ├── auth.py            # JWT validation
│   │       ├── rate_limit.py      # Tier-aware rate limiting
│   │       └── sanitizer.py       # Strip prompt metadata from responses
│   ├── billing/
│   │   ├── __init__.py
│   │   ├── stripe_service.py      # Stripe checkout + webhooks
│   │   └── tier_manager.py        # Feature matrix per tier
│   └── config/
│       ├── __init__.py
│       └── settings.py            # Pydantic BaseSettings (env-based)
├── tests/
│   ├── __init__.py
│   ├── conftest.py                # Fixtures: test client, mock agents, mock LLM
│   ├── test_orchestrator.py
│   ├── test_agents.py
│   ├── test_auth.py
│   └── test_backup.py
├── alembic/                       # DB migrations (auth/billing tables only)
│   ├── alembic.ini
│   └── versions/
├── requirements.txt
├── Dockerfile
├── docker-compose.yml             # App + PostgreSQL + Redis (dev)
├── .env.example
└── README.md

Step-by-Step Implementation

Step 1 — Project scaffolding

  • Initialize repo with the directory structure above
  • Write requirements.txt:
    fastapi>=0.115.0
    uvicorn[standard]>=0.34.0
    langchain>=0.3.0
    langchain-openai>=0.3.0
    pydantic>=2.10.0
    python-jose[cryptography]>=3.3.0
    stripe>=11.0.0
    boto3>=1.35.0
    slowapi>=0.1.9
    sqlalchemy>=2.0.0
    asyncpg>=0.30.0
    alembic>=1.14.0
    bcrypt>=4.2.0
    python-dotenv>=1.0.0
    httpx>=0.28.0
    websockets>=14.0
    pytest>=8.0.0
    pytest-asyncio>=0.24.0
    
  • Write app/main.py: FastAPI app with CORS (allow app://, http://localhost:*), lifespan (init DB pool, init agent registry), include all routers under /api/v1
  • Write app/config/settings.py: Settings(BaseSettings) with fields: DATABASE_URL, JWT_SECRET, JWT_ALGORITHM (default HS256), STRIPE_SECRET_KEY, STRIPE_WEBHOOK_SECRET, S3_BUCKET, S3_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, OPENAI_API_KEY, CORS_ORIGINS, ENV (dev/prod)
  • Write Dockerfile: Python 3.12 slim, multi-stage (builder + runtime), non-root user
  • Write docker-compose.yml: app, postgres:16, optional redis
  • Write .env.example
  • Outcome: Runnable FastAPI skeleton (returns 404 on all routes).

Step 2 — Pydantic schemas (API contracts)

  • Create app/schemas.py (mirrors src/shared/api-types.ts from Electron repo):
    • ChatRequest: message: str, context: ChatContext, execution_mode: Literal['direct', 'plan']
    • ChatContext: user_profile: dict, relevant_documents: list[str], recent_tasks: list[dict], conversation_history: list[dict]
    • ChatResponse: response: str, actions: list[PlanAction]
    • PlanAction: type: Literal['create_record', 'update_record', 'delete_record', 'index_document', 'send_notification'], table: str | None, data: dict | None
    • ExecutionPlan: agent: str, steps: list[PlanStep]
    • PlanStep: action: str, prompt_template: str | None, variables: dict | None, data_from_step: int | None
    • BackupMetadata: version: int, timestamp: int, checksum: str, chunk_count: int
    • BillingTier: Literal['free', 'pro', 'power', 'team']
    • AuthTokens: access_token: str, refresh_token: str, expires_at: int
    • UserProfile: id: str, email: str, tier: BillingTier
  • Outcome: All request/response models defined and validated.

Step 3 — Agent Registry + base classes

  • app/core/agent_registry.py:
    • BaseAgent(ABC):
      • user_id: str, shared_memory: dict, vector_store_context: list[str], skills: list[str]
      • Abstract get_name() -> str, get_description() -> str
    • ChatAgent(BaseAgent):
      • Abstract async handle(query: str, context: dict) -> str
      • Abstract get_tools() -> list (LangChain tool definitions)
      • Concrete _tool_loop(llm, messages, tools, max_iter=5) -> str — shared tool-calling loop
    • AgentRegistry (singleton):
      • _agents: dict[str, ChatAgent]
      • register(agent_class) — decorator pattern
      • get(name) -> ChatAgent
      • list_agents() -> list[dict] — returns [{name, description}] for orchestrator prompt
      • async call_agent(name, query, context) -> str — for inter-agent calls
  • Unit tests: register, get, list, call_agent with mock
  • Outcome: Pluggable agent framework.

Step 4 — Orchestrator

  • app/core/orchestrator.py:
    • async classify_intent(message, context, registry) -> str:
      • System prompt: "You are an intent classifier. Given the user message and context, decide which agent to route to. Available agents: {registry.list_agents()}. Respond with just the agent name."
      • Uses gpt-4o-mini via LangChain for low latency
      • Falls back to task_agent if no clear match
    • async route_single(agent_name, message, context) -> ChatResponse:
      • Instantiates agent from registry
      • Calls agent.handle(message, context)
      • Returns response + any actions the agent produced
    • async route_pipeline(agent_names, message, context) -> ChatResponse:
      • Executes agents in sequence
      • Each agent receives {...context, previous_results: [...]}
      • Final synthesis via LLM: "Summarize these agent results into a coherent response"
    • async orchestrate(request: ChatRequest) -> ChatResponse | ExecutionPlan:
      • Main entry point
      • Classifies intent
      • If execution_mode == 'direct': route + return response
      • If execution_mode == 'plan': route + return execution plan with template IDs
    • async orchestrate_stream(request: ChatRequest) -> AsyncGenerator[str, None]:
      • Same as orchestrate but yields tokens for WebSocket streaming
  • Integration tests with mocked LLM and mocked agents
  • Outcome: Intelligent routing with single-agent and pipeline modes.

Step 5 — Execution Plan generator

  • app/core/execution_plan.py:
    • PromptTemplateRegistry: dict of template_id -> prompt_text. Templates are server-side only — client receives IDs.
    • ExecutionPlanBuilder:
      • add_step(action, params) -> self
      • add_llm_step(template_id, variables) -> self
      • add_data_step(action, data_from_step) -> self
      • build() -> ExecutionPlan — validates step references
    • PlanCache:
      • In-memory LRU (maxsize=1000)
      • cache_plan(key, plan), get_plan(key), get_all_playbooks() -> list[ExecutionPlan]
      • Playbooks are pre-built plans for common operations (e.g., "create task from email", "generate weekly report")
  • Outcome: Plans are cacheable as playbooks. Prompt IP never leaves the server.

Step 6 — Chat Agents

  • app/agents/task_agent.py@registry.register:
    • Description: "Manages tasks: create, update, list, suggest"
    • Tools: create_task(title, description, priority, due_date), update_task(id, updates), list_tasks(filters), suggest_tasks(notes_context)
    • System prompt: PM-oriented, validates task structure, infers priority from context
    • handle(): LLM + tool loop via _tool_loop(), returns response text + list of actions performed
  • app/agents/calendar_agent.py@registry.register:
    • Description: "Calendar management: events, conflicts, scheduling"
    • Tools: list_events(date_range), detect_conflicts(events), suggest_reschedule(conflict)
    • Works with event metadata passed in context (never raw calendar data stored)
  • app/agents/email_agent.py@registry.register:
    • Description: "Email analysis: classify, extract actions, draft responses"
    • Tools: classify_email(metadata), extract_action_items(metadata), draft_response(thread_context)
    • Only processes metadata sent by client — never raw email bodies
  • app/agents/analytics_agent.py@registry.register:
    • Description: "Workspace analytics: metrics, reports, trends"
    • Tools: calculate_metrics(task_data), generate_report(period, data), trend_analysis(data_points)
    • Crunches numbers from context, returns structured insights
  • app/agents/__init__.py: imports all agent modules to trigger @registry.register decorators
  • Unit tests per agent with mocked LLM
  • Outcome: Four specialized agents, all registered and tested.

Step 7 — API Routes

7a — Chat endpoint

  • app/api/routes/chat.py:
    • POST /api/v1/chat:
      • Request: ChatRequest
      • Calls orchestrate(request) or orchestrate() + build_plan()
      • Response: ChatResponse or ExecutionPlan
    • WebSocket /api/v1/chat/stream:
      • Client sends ChatRequest as first JSON frame
      • Server yields token strings via orchestrate_stream()
      • Final frame: JSON ChatResponse with {"done": true, "response": "...", "actions": [...]}
      • Heartbeat ping every 30s to keep connection alive

7b — Plans endpoint

  • app/api/routes/plans.py:
    • GET /api/v1/plans/playbook: Returns all playbooks available for the user's tier
    • GET /api/v1/plans/playbook/{plan_id}: Returns a specific plan

7c — Backup endpoint

  • app/api/routes/backup.py:
    • PUT /api/v1/backup: Accepts binary blob + metadata headers (X-Backup-Version, X-Backup-Timestamp, X-Backup-Checksum). Stores in S3 keyed by {user_id}/{timestamp}. Enforces tier limits:
      • Free: 0 (no backup)
      • Pro: 5 GB
      • Power: 50 GB
      • Team: unlimited
    • GET /api/v1/backup: Returns latest blob for authenticated user. Supports If-Modified-Since.
    • GET /api/v1/backup/history: Returns list of BackupMetadata (no blobs).
    • DELETE /api/v1/backup/{backup_id}: Delete specific backup.

7d — Auth endpoint

  • app/api/routes/auth.py:
    • POST /api/v1/auth/register: {email, password} → bcrypt hash → insert user → return AuthTokens
    • POST /api/v1/auth/login: Validate credentials → return AuthTokens
    • POST /api/v1/auth/refresh: Rotate refresh token → return new AuthTokens
    • GET /api/v1/auth/me: Return UserProfile for current JWT

7e — Billing endpoint

  • app/api/routes/billing.py:

    • POST /api/v1/billing/checkout: Creates Stripe checkout session → returns URL
    • POST /api/v1/billing/webhook: Handles Stripe webhooks (subscription lifecycle)
    • GET /api/v1/billing/subscription: Returns current subscription info
    • DELETE /api/v1/billing/subscription: Cancels subscription
  • Outcome: Complete REST + WebSocket API.

Step 8 — Middleware

8a — Auth middleware

  • app/api/middleware/auth.py:
    • FastAPI dependency: get_current_user(token: str = Depends(oauth2_scheme)) -> UserProfile
    • Validates JWT signature, expiry, extracts user_id and tier
    • Raises 401 on invalid/expired token
    • Exempt routes: /api/v1/auth/register, /api/v1/auth/login, /api/v1/billing/webhook

8b — Rate limiter

  • app/api/middleware/rate_limit.py:
    • Uses slowapi with Limiter(key_func=get_user_id_from_jwt)
    • Tier-based limits:
      • Free: 20 req/min
      • Pro: 60 req/min
      • Power: 120 req/min
      • Team: 200 req/seat/min
    • Custom 429 response with Retry-After header

8c — Sanitizer

  • app/api/middleware/sanitizer.py:

    • Response middleware that scans response bodies
    • Strips: system prompt fragments, agent internal reasoning, tool schemas, routing metadata
    • Pattern-based detection + exact match against known prompt fingerprints
    • Logs sanitization events for monitoring
  • Outcome: Secure, rate-limited API with prompt IP protection.

Step 9 — Billing & Tier management

  • app/billing/stripe_service.py:
    • create_checkout_session(user_id, tier) -> str
    • handle_webhook(payload, sig_header) -> None: processes checkout.session.completed, customer.subscription.updated, customer.subscription.deleted, invoice.payment_failed
    • get_subscription(user_id) -> dict | None
    • cancel_subscription(user_id) -> None
  • app/billing/tier_manager.py:
    • TierManager:
      • Feature matrix:
        FEATURES = {
            'free':  {'agents': 3, 'batch': False, 'providers': 1, 'backup_gb': 0},
            'pro':   {'agents': -1, 'batch': True, 'providers': -1, 'backup_gb': 5},
            'power': {'agents': -1, 'batch': True, 'providers': -1, 'backup_gb': 50, 'byok': True},
            'team':  {'agents': -1, 'batch': True, 'providers': -1, 'backup_gb': -1, 'sso': True},
        }
        
      • get_tier(user_id) -> BillingTier
      • check_feature(user_id, feature) -> bool
      • get_rate_limit(tier) -> int
  • Outcome: Stripe integration with tier-based feature gating.

Step 10 — Database (auth/billing only)

  • PostgreSQL schema via Alembic:
    • users: id UUID PK, email UNIQUE, password_hash, tier (default 'free'), stripe_customer_id, created_at, updated_at
    • refresh_tokens: id UUID PK, user_id FK, token_hash, expires_at, created_at
    • subscriptions: id UUID PK, user_id FK, stripe_subscription_id, tier, status, current_period_end, created_at
    • backup_metadata: id UUID PK, user_id FK, s3_key, version, timestamp, checksum, size_bytes, created_at
  • Initial Alembic migration
  • SQLAlchemy models in app/models.py
  • Outcome: Auth and billing persistence. Zero user data stored.

Step 11 — Testing & deployment

  • tests/conftest.py: TestClient fixture, mock LLM fixture (AsyncMock returning canned responses), mock agent fixture, test DB (SQLite in-memory for speed)
  • tests/test_orchestrator.py: classify_intent routing, single agent, pipeline, plan mode
  • tests/test_agents.py: each agent with mocked tools
  • tests/test_auth.py: register → login → access protected → refresh → expired token
  • tests/test_backup.py: upload → download → history → delete, tier limit enforcement
  • Dockerfile optimized for production (gunicorn + uvicorn workers)
  • GitHub Actions CI: lint (ruff), test (pytest), build Docker image
  • Outcome: Fully tested, deployable backend.

API Contract Summary

Method Endpoint Auth Request Response
POST /api/v1/auth/register No {email, password} AuthTokens
POST /api/v1/auth/login No {email, password} AuthTokens
POST /api/v1/auth/refresh No {refresh_token} AuthTokens
GET /api/v1/auth/me JWT UserProfile
POST /api/v1/chat JWT ChatRequest ChatResponse | ExecutionPlan
WS /api/v1/chat/stream JWT ChatRequest (first frame) Token stream + final JSON
GET /api/v1/plans/playbook JWT ExecutionPlan[]
GET /api/v1/plans/playbook/:id JWT ExecutionPlan
PUT /api/v1/backup JWT Binary blob + headers {ok: true}
GET /api/v1/backup JWT Binary blob
GET /api/v1/backup/history JWT BackupMetadata[]
DELETE /api/v1/backup/:id JWT {ok: true}
POST /api/v1/billing/checkout JWT {tier} {checkout_url}
POST /api/v1/billing/webhook Stripe sig Stripe event {ok: true}
GET /api/v1/billing/subscription JWT Subscription info
DELETE /api/v1/billing/subscription JWT {ok: true}
GET /api/v1/health No {status, version}

Stack

Layer Technology
Framework FastAPI + Uvicorn
LLM LangChain + langchain-openai
Auth PyJWT + bcrypt + OAuth2
Billing stripe-python
Storage boto3 (S3)
Database PostgreSQL + SQLAlchemy + Alembic
Rate limiting slowapi
Testing pytest + pytest-asyncio + httpx
Deployment Docker → fly.io / Railway / AWS ECS

Development Rules

  1. NEVER persist user data. The DB stores only auth, billing, and backup metadata. User context arrives in requests and is discarded after processing.
  2. NEVER expose prompts. System prompts are composed server-side from fragments. Responses are sanitized before sending.
  3. Stateless request handling. No server-side session state. All context comes from the client + JWT.
  4. Type hints everywhere. All functions have full type annotations.
  5. Test every agent. Each chat agent has unit tests with mocked LLM responses.
  6. Structured logging. JSON logs with request ID correlation.