Backend Plan — Adiuva Cloud API
Separate repository. This document defines the FastAPI backend that the Electron app communicates with.
The backend owns: orchestration logic, chat agent intelligence, prompt IP, auth, billing, and backup blob storage. The backend NEVER persists user data. It receives context in requests, uses it for orchestration, and discards it.
Project Structure
adiuva-api/
├── app/
│   ├── __init__.py
│   ├── main.py  # FastAPI entry + CORS + lifespan + router includes
│   ├── core/
│   │   ├── __init__.py
│   │   ├── agent_registry.py  # Base classes + singleton registry
│   │   ├── orchestrator.py  # LLM-based intent router
│   │   ├── execution_plan.py  # Plan builder + cache
│   │   └── plugin_loader.py  # Dynamic agent loading
│   ├── agents/
│   │   ├── __init__.py  # Auto-registers all agents
│   │   ├── task_agent.py
│   │   ├── calendar_agent.py
│   │   ├── email_agent.py
│   │   └── analytics_agent.py
│   ├── api/
│   │   ├── __init__.py
│   │   ├── routes/
│   │   │   ├── __init__.py
│   │   │   ├── chat.py  # POST /chat + WS /chat/stream
│   │   │   ├── plans.py  # GET /plans/playbook
│   │   │   ├── backup.py  # PUT/GET /backup
│   │   │   ├── auth.py  # Register/login/refresh
│   │   │   └── billing.py  # Checkout/webhook/subscription
│   │   └── middleware/
│   │       ├── __init__.py
│   │       ├── auth.py  # JWT validation
│   │       ├── rate_limit.py  # Tier-aware rate limiting
│   │       └── sanitizer.py  # Strip prompt metadata from responses
│   ├── billing/
│   │   ├── __init__.py
│   │   ├── stripe_service.py  # Stripe checkout + webhooks
│   │   └── tier_manager.py  # Feature matrix per tier
│   └── config/
│       ├── __init__.py
│       └── settings.py  # Pydantic BaseSettings (env-based)
├── tests/
│   ├── __init__.py
│   ├── conftest.py  # Fixtures: test client, mock agents, mock LLM
│   ├── test_orchestrator.py
│   ├── test_agents.py
│   ├── test_auth.py
│   └── test_backup.py
├── alembic/  # DB migrations (auth/billing tables only)
│   ├── alembic.ini
│   └── versions/
├── requirements.txt
├── Dockerfile
├── docker-compose.yml  # App + PostgreSQL + Redis (dev)
├── .env.example
└── README.md
Step-by-Step Implementation
Step 1 — Project scaffolding ✅
- Initialize repo with the directory structure above
- Write requirements.txt:
  - fastapi>=0.115.0
  - uvicorn[standard]>=0.34.0
  - langchain>=0.3.0
  - langchain-openai>=0.3.0
  - pydantic>=2.10.0
  - pydantic-settings>=2.0.0
  - python-jose[cryptography]>=3.3.0
  - stripe>=11.0.0
  - boto3>=1.35.0
  - slowapi>=0.1.9
  - sqlalchemy>=2.0.0
  - asyncpg>=0.30.0
  - alembic>=1.14.0
  - bcrypt>=4.2.0
  - python-dotenv>=1.0.0
  - httpx>=0.28.0
  - websockets>=14.0
  - pytest>=8.0.0
  - pytest-asyncio>=0.24.0
- Write app/main.py: FastAPI app with CORS (allow app://, http://localhost:*), lifespan (init DB pool, init agent registry), include all routers under /api/v1
- Write app/config/settings.py: Settings(BaseSettings) with fields: DATABASE_URL, JWT_SECRET, JWT_ALGORITHM (default HS256), STRIPE_SECRET_KEY, STRIPE_WEBHOOK_SECRET, S3_BUCKET, S3_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, OPENAI_API_KEY, CORS_ORIGINS, ENV (dev/prod). With Pydantic v2, BaseSettings is imported from the pydantic-settings package, which is why it is listed in requirements.txt.
- Write Dockerfile: Python 3.12 slim, multi-stage (builder + runtime), non-root user
- Write docker-compose.yml: app, postgres:16, optional redis
- Write .env.example
- Outcome: Runnable FastAPI skeleton (returns 404 on all routes).
Step 2 — Pydantic schemas (API contracts) ✅
- Create app/schemas.py (mirrors src/shared/api-types.ts from Electron repo):
  - ChatRequest: message: str, context: ChatContext, execution_mode: Literal['direct', 'plan']
  - ChatContext: user_profile: dict, relevant_documents: list[str], recent_tasks: list[dict], conversation_history: list[dict]
  - ChatResponse: response: str, actions: list[PlanAction]
  - PlanAction: type: Literal['create_record', 'update_record', 'delete_record', 'index_document', 'send_notification'], table: str | None, data: dict | None
  - ExecutionPlan: agent: str, steps: list[PlanStep]
  - PlanStep: action: str, prompt_template: str | None, variables: dict | None, data_from_step: int | None
  - BackupMetadata: version: int, timestamp: int, checksum: str, chunk_count: int
  - BillingTier: Literal['free', 'pro', 'power', 'team']
  - AuthTokens: access_token: str, refresh_token: str, expires_at: int
  - UserProfile: id: str, email: str, tier: BillingTier
- Outcome: All request/response models defined and validated.
Step 3 — Agent Registry + base classes ✅
- app/core/agent_registry.py:
  - BaseAgent(ABC): user_id: str, shared_memory: dict, vector_store_context: list[str], skills: list[str]
    - Abstract get_name() -> str, get_description() -> str
  - ChatAgent(BaseAgent):
    - Abstract async handle(query: str, context: dict) -> str
    - Abstract get_tools() -> list (LangChain tool definitions)
    - Concrete _tool_loop(llm, messages, tools, max_iter=5) -> str: shared tool-calling loop
  - AgentRegistry (singleton):
    - _agents: dict[str, ChatAgent]
    - register(agent_class): decorator pattern
    - get(name) -> ChatAgent
    - list_agents() -> list[dict]: returns [{name, description}] for orchestrator prompt
    - async call_agent(name, query, context) -> str: for inter-agent calls
- Unit tests: register, get, list, call_agent with mock
- Outcome: Pluggable agent framework.
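The registry described above could look roughly like the following. This is a minimal sketch, not the final design: it uses only the standard library, omits BaseAgent state and get_tools(), and assumes the registry stores one instance per agent class. EchoAgent is a hypothetical agent used only for illustration.

```python
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod


class ChatAgent(ABC):
    """Trimmed stand-in for the plan's ChatAgent base class."""

    @abstractmethod
    def get_name(self) -> str: ...

    @abstractmethod
    def get_description(self) -> str: ...

    @abstractmethod
    async def handle(self, query: str, context: dict) -> str: ...


class AgentRegistry:
    """Singleton: every AgentRegistry() call returns the same object."""

    _instance: AgentRegistry | None = None

    def __new__(cls) -> AgentRegistry:
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._agents = {}
        return cls._instance

    def register(self, agent_class: type[ChatAgent]) -> type[ChatAgent]:
        """Class decorator: instantiate the agent and index it by name."""
        agent = agent_class()  # assumption: one shared instance per agent class
        self._agents[agent.get_name()] = agent
        return agent_class

    def get(self, name: str) -> ChatAgent:
        return self._agents[name]

    def list_agents(self) -> list[dict]:
        return [
            {"name": a.get_name(), "description": a.get_description()}
            for a in self._agents.values()
        ]

    async def call_agent(self, name: str, query: str, context: dict) -> str:
        return await self.get(name).handle(query, context)


registry = AgentRegistry()


@registry.register
class EchoAgent(ChatAgent):  # hypothetical agent, for illustration only
    def get_name(self) -> str:
        return "echo_agent"

    def get_description(self) -> str:
        return "Repeats the query back"

    async def handle(self, query: str, context: dict) -> str:
        return f"echo: {query}"
```

Because registration happens at import time, importing an agent module is enough to make it routable, which is what app/agents/__init__.py relies on.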
Step 4 — Orchestrator
- app/core/orchestrator.py:
  - async classify_intent(message, context, registry) -> str:
    - System prompt: "You are an intent classifier. Given the user message and context, decide which agent to route to. Available agents: {registry.list_agents()}. Respond with just the agent name."
    - Uses gpt-4o-mini via LangChain for low latency
    - Falls back to task_agent if no clear match
  - async route_single(agent_name, message, context) -> ChatResponse:
    - Instantiates agent from registry
    - Calls agent.handle(message, context)
    - Returns response + any actions the agent produced
  - async route_pipeline(agent_names, message, context) -> ChatResponse:
    - Executes agents in sequence
    - Each agent receives {...context, previous_results: [...]}
    - Final synthesis via LLM: "Summarize these agent results into a coherent response"
  - async orchestrate(request: ChatRequest) -> ChatResponse | ExecutionPlan:
    - Main entry point
    - Classifies intent
    - If execution_mode == 'direct': route + return response
    - If execution_mode == 'plan': route + return execution plan with template IDs
  - async orchestrate_stream(request: ChatRequest) -> AsyncGenerator[str, None]:
    - Same as orchestrate but yields tokens for WebSocket streaming
- Integration tests with mocked LLM and mocked agents
- Outcome: Intelligent routing with single-agent and pipeline modes.
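The classification-with-fallback step can be sketched without any LLM dependency. The version below is an illustration under stated assumptions: the signature is simplified (it takes the agent list and an injected llm callable instead of context and registry), and LLMCall and fake_llm are hypothetical stand-ins for the LangChain call.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical stand-in for the LangChain call: (system_prompt, message) -> raw text.
LLMCall = Callable[[str, str], Awaitable[str]]

FALLBACK_AGENT = "task_agent"


def build_system_prompt(agents: list) -> str:
    """Render the classifier prompt with one line per available agent."""
    listing = "\n".join(f"- {a['name']}: {a['description']}" for a in agents)
    return (
        "You are an intent classifier. Given the user message and context, "
        "decide which agent to route to. Available agents:\n"
        f"{listing}\nRespond with just the agent name."
    )


async def classify_intent(message: str, agents: list, llm: LLMCall) -> str:
    """Return a known agent name; anything else falls back to task_agent."""
    known = {a["name"] for a in agents}
    raw = await llm(build_system_prompt(agents), message)
    # Models often add whitespace, quotes, or a trailing period; normalize first.
    candidate = raw.strip().strip("`'\".").lower()
    return candidate if candidate in known else FALLBACK_AGENT


AGENTS = [
    {"name": "task_agent", "description": "Manages tasks"},
    {"name": "calendar_agent", "description": "Calendar management"},
]


async def fake_llm(system_prompt: str, message: str) -> str:
    """Canned reply with the kind of noise a real model may add."""
    return " Calendar_Agent.\n"
```

Validating the reply against the registered names, rather than trusting it, keeps a malformed or hallucinated agent name from reaching route_single.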
Step 5 — Execution Plan generator
- app/core/execution_plan.py:
  - PromptTemplateRegistry: dict of template_id -> prompt_text. Templates are server-side only; the client receives IDs.
  - ExecutionPlanBuilder:
    - add_step(action, params) -> self
    - add_llm_step(template_id, variables) -> self
    - add_data_step(action, data_from_step) -> self
    - build() -> ExecutionPlan: validates step references
  - PlanCache:
    - In-memory LRU (maxsize=1000)
    - cache_plan(key, plan), get_plan(key), get_all_playbooks() -> list[ExecutionPlan]
    - Playbooks are pre-built plans for common operations (e.g., "create task from email", "generate weekly report")
- Outcome: Plans are cacheable as playbooks. Prompt IP never leaves the server.
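As a rough sketch of the builder and cache, the code below uses dataclasses in place of the Pydantic models and an OrderedDict for LRU eviction. Two things are assumptions rather than plan details: the action name "llm" for LLM steps, and the rule that data_from_step must point at an earlier step.

```python
from __future__ import annotations

from collections import OrderedDict
from dataclasses import dataclass, field


@dataclass
class PlanStep:
    action: str
    prompt_template: str | None = None  # holds a template ID, never the prompt text
    variables: dict | None = None
    data_from_step: int | None = None


@dataclass
class ExecutionPlan:
    agent: str
    steps: list[PlanStep] = field(default_factory=list)


class ExecutionPlanBuilder:
    def __init__(self, agent: str) -> None:
        self._agent = agent
        self._steps: list[PlanStep] = []

    def add_llm_step(self, template_id: str, variables: dict) -> ExecutionPlanBuilder:
        self._steps.append(PlanStep("llm", prompt_template=template_id, variables=variables))
        return self

    def add_data_step(self, action: str, data_from_step: int) -> ExecutionPlanBuilder:
        self._steps.append(PlanStep(action, data_from_step=data_from_step))
        return self

    def build(self) -> ExecutionPlan:
        # Assumed rule: a step may only consume output from an earlier step.
        for index, step in enumerate(self._steps):
            ref = step.data_from_step
            if ref is not None and not 0 <= ref < index:
                raise ValueError(f"step {index} references invalid step {ref}")
        return ExecutionPlan(self._agent, list(self._steps))


class PlanCache:
    """In-memory LRU cache: the least recently used plan is evicted first."""

    def __init__(self, maxsize: int = 1000) -> None:
        self._maxsize = maxsize
        self._plans: OrderedDict[str, ExecutionPlan] = OrderedDict()

    def cache_plan(self, key: str, plan: ExecutionPlan) -> None:
        self._plans[key] = plan
        self._plans.move_to_end(key)
        if len(self._plans) > self._maxsize:
            self._plans.popitem(last=False)  # drop the oldest entry

    def get_plan(self, key: str) -> ExecutionPlan | None:
        plan = self._plans.get(key)
        if plan is not None:
            self._plans.move_to_end(key)  # mark as recently used
        return plan
```

A plan built as ExecutionPlanBuilder("email_agent").add_llm_step("tpl_extract", {...}).add_data_step("create_record", 0).build() carries only the ID "tpl_extract" (a made-up ID), so the serialized plan contains no prompt text.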
Step 6 — Chat Agents
- app/agents/task_agent.py (@registry.register):
  - Description: "Manages tasks: create, update, list, suggest"
  - Tools: create_task(title, description, priority, due_date), update_task(id, updates), list_tasks(filters), suggest_tasks(notes_context)
  - System prompt: PM-oriented, validates task structure, infers priority from context
  - handle(): LLM + tool loop via _tool_loop(), returns response text + list of actions performed
- app/agents/calendar_agent.py (@registry.register):
  - Description: "Calendar management: events, conflicts, scheduling"
  - Tools: list_events(date_range), detect_conflicts(events), suggest_reschedule(conflict)
  - Works with event metadata passed in context (never raw calendar data stored)
- app/agents/email_agent.py (@registry.register):
  - Description: "Email analysis: classify, extract actions, draft responses"
  - Tools: classify_email(metadata), extract_action_items(metadata), draft_response(thread_context)
  - Only processes metadata sent by client, never raw email bodies
- app/agents/analytics_agent.py (@registry.register):
  - Description: "Workspace analytics: metrics, reports, trends"
  - Tools: calculate_metrics(task_data), generate_report(period, data), trend_analysis(data_points)
  - Crunches numbers from context, returns structured insights
- app/agents/__init__.py: imports all agent modules to trigger @registry.register decorators
- Unit tests per agent with mocked LLM
- Outcome: Four specialized agents, all registered and tested.
Step 7 — API Routes
7a — Chat endpoint
- app/api/routes/chat.py:
  - POST /api/v1/chat:
    - Request: ChatRequest
    - Calls orchestrate(request) or orchestrate() + build_plan()
    - Response: ChatResponse or ExecutionPlan
  - WebSocket /api/v1/chat/stream:
    - Client sends ChatRequest as first JSON frame
    - Server yields token strings via orchestrate_stream()
    - Final frame: JSON ChatResponse with {"done": true, "response": "...", "actions": [...]}
    - Heartbeat ping every 30s to keep connection alive
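The frame sequence on the streaming socket can be illustrated independently of FastAPI. In this sketch, stream_frames is a hypothetical helper that wraps any async token source; the heartbeat and the actual WebSocket send calls are left out.

```python
import asyncio
import json
from typing import AsyncIterator


async def stream_frames(tokens: AsyncIterator[str], actions: list) -> AsyncIterator[str]:
    """Yield one text frame per token, then a final JSON frame with done=true."""
    parts = []
    async for token in tokens:
        parts.append(token)
        yield token
    # The final frame repeats the full response so the client can reconcile state.
    yield json.dumps({"done": True, "response": "".join(parts), "actions": actions})


async def fake_tokens() -> AsyncIterator[str]:
    """Stand-in for orchestrate_stream(), for illustration only."""
    for token in ["Hel", "lo"]:
        yield token


async def collect_frames() -> list:
    return [frame async for frame in stream_frames(fake_tokens(), [])]
```

One design point worth settling early: token frames here are raw strings, so a client must tell them apart from the final JSON frame. Wrapping tokens in a small JSON envelope would remove that ambiguity, at the cost of a few bytes per frame.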
7b — Plans endpoint
- app/api/routes/plans.py:
  - GET /api/v1/plans/playbook: Returns all playbooks available for the user's tier
  - GET /api/v1/plans/playbook/{plan_id}: Returns a specific plan
7c — Backup endpoint
- app/api/routes/backup.py:
  - PUT /api/v1/backup: Accepts binary blob + metadata headers (X-Backup-Version, X-Backup-Timestamp, X-Backup-Checksum). Stores in S3 keyed by {user_id}/{timestamp}. Enforces tier limits:
    - Free: 0 (no backup)
    - Pro: 5 GB
    - Power: 50 GB
    - Team: unlimited
  - GET /api/v1/backup: Returns latest blob for authenticated user. Supports If-Modified-Since.
  - GET /api/v1/backup/history: Returns list of BackupMetadata (no blobs).
  - DELETE /api/v1/backup/{backup_id}: Delete specific backup.
7d — Auth endpoint
- app/api/routes/auth.py:
  - POST /api/v1/auth/register: {email, password} → bcrypt hash → insert user → return AuthTokens
  - POST /api/v1/auth/login: Validate credentials → return AuthTokens
  - POST /api/v1/auth/refresh: Rotate refresh token → return new AuthTokens
  - GET /api/v1/auth/me: Return UserProfile for current JWT
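Refresh-token rotation can be sketched with an in-memory stand-in for the refresh_tokens table. RefreshTokenStore is a hypothetical class; the 30-day lifetime and the choice of SHA-256 for token_hash are assumptions, not values from this plan.

```python
from __future__ import annotations

import hashlib
import secrets
import time


def _hash(token: str) -> str:
    """Only this digest is stored, so a leaked table does not leak usable tokens."""
    return hashlib.sha256(token.encode()).hexdigest()


class RefreshTokenStore:
    """In-memory stand-in for the refresh_tokens table."""

    def __init__(self, ttl_seconds: int = 30 * 24 * 3600) -> None:  # assumed lifetime
        self._ttl = ttl_seconds
        self._rows: dict[str, tuple[str, float]] = {}  # token_hash -> (user_id, expires_at)

    def issue(self, user_id: str) -> str:
        token = secrets.token_urlsafe(32)
        self._rows[_hash(token)] = (user_id, time.time() + self._ttl)
        return token

    def rotate(self, token: str) -> str:
        """Consume the presented token and return a fresh one for the same user."""
        row = self._rows.pop(_hash(token), None)
        if row is None or row[1] < time.time():
            raise PermissionError("invalid or expired refresh token")
        return self.issue(row[0])
```

Because rotate() deletes the old row before issuing a new token, a replayed refresh token fails, which the route would surface as a 401.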
7e — Billing endpoint
- app/api/routes/billing.py:
  - POST /api/v1/billing/checkout: Creates Stripe checkout session → returns URL
  - POST /api/v1/billing/webhook: Handles Stripe webhooks (subscription lifecycle)
  - GET /api/v1/billing/subscription: Returns current subscription info
  - DELETE /api/v1/billing/subscription: Cancels subscription
- Outcome: Complete REST + WebSocket API.
Step 8 — Middleware
8a — Auth middleware
- app/api/middleware/auth.py:
  - FastAPI dependency: get_current_user(token: str = Depends(oauth2_scheme)) -> UserProfile
  - Validates JWT signature, expiry, extracts user_id and tier
  - Raises 401 on invalid/expired token
  - Exempt routes (the ones marked Auth "No" or "Stripe sig" in the API Contract Summary): /api/v1/auth/register, /api/v1/auth/login, /api/v1/auth/refresh, /api/v1/billing/webhook, /api/v1/health
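To make the three validation steps concrete (signature, expiry, claim extraction), here is a standard-library illustration of HS256 verification. It is a teaching sketch only: in the real dependency a JWT library such as python-jose would do this work. The function names and the use of PermissionError (which the dependency would map to a 401) are assumptions.

```python
from __future__ import annotations

import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _signature(signing_input: str, secret: str) -> str:
    digest = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return _b64url(digest)


def sign_hs256(claims: dict, secret: str) -> str:
    """Build a compact HS256 token: header.payload.signature."""
    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = _b64url(json.dumps(claims).encode())
    return f"{header}.{payload}.{_signature(f'{header}.{payload}', secret)}"


def verify_hs256(token: str, secret: str, now: float | None = None) -> dict:
    """Return the claims, or raise PermissionError (to be mapped to HTTP 401)."""
    try:
        header, payload, signature = token.split(".")
    except ValueError:
        raise PermissionError("malformed token") from None
    expected = _signature(f"{header}.{payload}", secret)
    # Constant-time comparison avoids leaking how many characters matched.
    if not hmac.compare_digest(expected.encode(), signature.encode()):
        raise PermissionError("bad signature")
    claims = json.loads(_b64url_decode(payload))
    if claims.get("exp", 0) <= (time.time() if now is None else now):
        raise PermissionError("token expired")
    return claims
```

The payload is parsed only after the signature check passes, so untrusted JSON is never interpreted before it is authenticated.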
8b — Rate limiter
- app/api/middleware/rate_limit.py:
  - Uses slowapi with Limiter(key_func=get_user_id_from_jwt)
  - Tier-based limits:
    - Free: 20 req/min
    - Pro: 60 req/min
    - Power: 120 req/min
    - Team: 200 req/seat/min
  - Custom 429 response with Retry-After header
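The tier-aware limit and the Retry-After value can be illustrated with a small fixed-window counter. This is a stand-in for what slowapi would provide, not its API: FixedWindowLimiter is a hypothetical class, the window strategy is an assumption, and each Team seat is treated as one user key.

```python
from __future__ import annotations

import math
import time

LIMITS_PER_MINUTE = {"free": 20, "pro": 60, "power": 120, "team": 200}


class FixedWindowLimiter:
    """Counts requests per user within aligned fixed windows."""

    def __init__(self, window_seconds: int = 60) -> None:
        self._window = window_seconds
        self._counters: dict[str, tuple[int, int]] = {}  # user_id -> (window index, count)

    def check(self, user_id: str, tier: str, now: float | None = None) -> int:
        """Return 0 if the request is allowed, else the Retry-After value in seconds."""
        now = time.time() if now is None else now
        window = int(now // self._window)
        seen_window, count = self._counters.get(user_id, (window, 0))
        if seen_window != window:
            count = 0  # a new window started, so the counter resets
        if count >= LIMITS_PER_MINUTE[tier]:
            return max(1, math.ceil((window + 1) * self._window - now))
        self._counters[user_id] = (window, count + 1)
        return 0
```

A non-zero return value maps directly onto the custom 429 response: it is the number of seconds until the current window ends.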
8c — Sanitizer
- app/api/middleware/sanitizer.py:
  - Response middleware that scans response bodies
  - Strips: system prompt fragments, agent internal reasoning, tool schemas, routing metadata
  - Pattern-based detection + exact match against known prompt fingerprints
  - Logs sanitization events for monitoring
- Outcome: Secure, rate-limited API with prompt IP protection.
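A sketch of the sanitizer's core, combining exact fingerprint matches with pattern-based detection. The fingerprint, the regular expression, and the "[redacted]" placeholder are all illustrative assumptions; the real lists would be generated from the server-side prompt templates.

```python
import re

# Hypothetical fingerprint: a distinctive sentence from a system prompt.
PROMPT_FINGERPRINTS = ("You are an intent classifier.",)

# Hypothetical pattern: whole lines that label leaked internals.
LEAK_PATTERNS = (
    re.compile(r"(?im)^[ \t]*(system prompt|internal reasoning)[ \t]*:.*$"),
)

REDACTED = "[redacted]"


def sanitize(text: str) -> tuple:
    """Return (clean_text, hit_count); hit_count feeds the monitoring log."""
    hits = 0
    for fingerprint in PROMPT_FINGERPRINTS:
        occurrences = text.count(fingerprint)
        if occurrences:
            text = text.replace(fingerprint, REDACTED)
            hits += occurrences
    for pattern in LEAK_PATTERNS:
        text, replaced = pattern.subn(REDACTED, text)
        hits += replaced
    return text, hits
```

Returning the hit count alongside the cleaned text lets the middleware log a sanitization event without logging the leaked content itself.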
Step 9 — Billing & Tier management
- app/billing/stripe_service.py:
  - create_checkout_session(user_id, tier) -> str
  - handle_webhook(payload, sig_header) -> None: processes checkout.session.completed, customer.subscription.updated, customer.subscription.deleted, invoice.payment_failed
  - get_subscription(user_id) -> dict | None
  - cancel_subscription(user_id) -> None
- app/billing/tier_manager.py:
  - TierManager:
    - Feature matrix FEATURES, keyed by tier (-1 means unlimited, matching the Team backup limit in Step 7c):
      - 'free': {'agents': 3, 'batch': False, 'providers': 1, 'backup_gb': 0}
      - 'pro': {'agents': -1, 'batch': True, 'providers': -1, 'backup_gb': 5}
      - 'power': {'agents': -1, 'batch': True, 'providers': -1, 'backup_gb': 50, 'byok': True}
      - 'team': {'agents': -1, 'batch': True, 'providers': -1, 'backup_gb': -1, 'sso': True}
    - get_tier(user_id) -> BillingTier
    - check_feature(user_id, feature) -> bool
    - get_rate_limit(tier) -> int
- Outcome: Stripe integration with tier-based feature gating.
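Feature gating over the matrix above can be sketched as two pure functions. Assumptions to note: these take a tier rather than a user_id (the lookup from user to tier is left out), a feature missing from a tier's row counts as disabled, and within_limit is a hypothetical helper for numeric features.

```python
FEATURES = {
    "free": {"agents": 3, "batch": False, "providers": 1, "backup_gb": 0},
    "pro": {"agents": -1, "batch": True, "providers": -1, "backup_gb": 5},
    "power": {"agents": -1, "batch": True, "providers": -1, "backup_gb": 50, "byok": True},
    "team": {"agents": -1, "batch": True, "providers": -1, "backup_gb": -1, "sso": True},
}

UNLIMITED = -1


def check_feature(tier: str, feature: str) -> bool:
    """True if the tier has the feature at all (flag set, or a non-zero allowance)."""
    value = FEATURES[tier].get(feature, False)
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return value
    return value == UNLIMITED or value > 0


def within_limit(tier: str, feature: str, requested: int) -> bool:
    """For numeric features: does the requested amount fit the tier's allowance?"""
    limit = FEATURES[tier].get(feature, 0)
    return limit == UNLIMITED or requested <= limit
```

Keeping the matrix as plain data means the playbook and backup routes can share one source of truth for what each tier allows.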
Step 10 — Database (auth/billing only)
- PostgreSQL schema via Alembic:
  - users: id UUID PK, email UNIQUE, password_hash, tier (default 'free'), stripe_customer_id, created_at, updated_at
  - refresh_tokens: id UUID PK, user_id FK, token_hash, expires_at, created_at
  - subscriptions: id UUID PK, user_id FK, stripe_subscription_id, tier, status, current_period_end, created_at
  - backup_metadata: id UUID PK, user_id FK, s3_key, version, timestamp, checksum, size_bytes, created_at
- Initial Alembic migration
- SQLAlchemy models in app/models.py
- Outcome: Auth and billing persistence. Zero user data stored.
Step 11 — Testing & deployment
- tests/conftest.py: TestClient fixture, mock LLM fixture (AsyncMock returning canned responses), mock agent fixture, test DB (SQLite in-memory for speed)
- tests/test_orchestrator.py: classify_intent routing, single agent, pipeline, plan mode
- tests/test_agents.py: each agent with mocked tools
- tests/test_auth.py: register → login → access protected → refresh → expired token
- tests/test_backup.py: upload → download → history → delete, tier limit enforcement
- Dockerfile optimized for production (gunicorn + uvicorn workers)
- GitHub Actions CI: lint (ruff), test (pytest), build Docker image
- Outcome: Fully tested, deployable backend.
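The mock LLM fixture can be built on unittest.mock.AsyncMock. The sketch below leaves out the pytest fixture decorator so it runs with the standard library alone; make_mock_llm and route are hypothetical names, with route standing in for the code under test.

```python
import asyncio
from unittest.mock import AsyncMock


def make_mock_llm(*replies: str) -> AsyncMock:
    """AsyncMock whose successive awaits return the canned replies in order."""
    return AsyncMock(side_effect=list(replies))


async def route(message: str, llm) -> str:
    """Hypothetical code under test: ask the LLM for an agent name."""
    name = (await llm(message)).strip()
    return name or "task_agent"


def test_route_uses_llm_reply() -> None:
    llm = make_mock_llm("calendar_agent")
    assert asyncio.run(route("move my 3pm", llm)) == "calendar_agent"
    llm.assert_awaited_once_with("move my 3pm")


def test_route_falls_back_on_empty_reply() -> None:
    llm = make_mock_llm("   ")
    assert asyncio.run(route("hello", llm)) == "task_agent"
```

In conftest.py the factory would typically be wrapped in a pytest fixture, so each test gets a fresh mock with its own await history.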
API Contract Summary
| Method | Endpoint | Auth | Request | Response |
|---|---|---|---|---|
| POST | /api/v1/auth/register | No | {email, password} | AuthTokens |
| POST | /api/v1/auth/login | No | {email, password} | AuthTokens |
| POST | /api/v1/auth/refresh | No | {refresh_token} | AuthTokens |
| GET | /api/v1/auth/me | JWT | — | UserProfile |
| POST | /api/v1/chat | JWT | ChatRequest | ChatResponse or ExecutionPlan |
| WS | /api/v1/chat/stream | JWT | ChatRequest (first frame) | Token stream + final JSON |
| GET | /api/v1/plans/playbook | JWT | — | ExecutionPlan[] |
| GET | /api/v1/plans/playbook/:id | JWT | — | ExecutionPlan |
| PUT | /api/v1/backup | JWT | Binary blob + headers | {ok: true} |
| GET | /api/v1/backup | JWT | — | Binary blob |
| GET | /api/v1/backup/history | JWT | — | BackupMetadata[] |
| DELETE | /api/v1/backup/:id | JWT | — | {ok: true} |
| POST | /api/v1/billing/checkout | JWT | {tier} | {checkout_url} |
| POST | /api/v1/billing/webhook | Stripe sig | Stripe event | {ok: true} |
| GET | /api/v1/billing/subscription | JWT | — | Subscription info |
| DELETE | /api/v1/billing/subscription | JWT | — | {ok: true} |
| GET | /api/v1/health | No | — | {status, version} |
Stack
| Layer | Technology |
|---|---|
| Framework | FastAPI + Uvicorn |
| LLM | LangChain + langchain-openai |
| Auth | python-jose + bcrypt + OAuth2 |
| Billing | stripe-python |
| Storage | boto3 (S3) |
| Database | PostgreSQL + SQLAlchemy + Alembic |
| Rate limiting | slowapi |
| Testing | pytest + pytest-asyncio + httpx |
| Deployment | Docker → fly.io / Railway / AWS ECS |
Development Rules
- NEVER persist user data. The DB stores only auth, billing, and backup metadata. User context arrives in requests and is discarded after processing.
- NEVER expose prompts. System prompts are composed server-side from fragments. Responses are sanitized before sending.
- Stateless request handling. No server-side session state. All context comes from the client + JWT.
- Type hints everywhere. All functions have full type annotations.
- Test every agent. Each chat agent has unit tests with mocked LLM responses.
- Structured logging. JSON logs with request ID correlation.
- One step at a time. Implement one numbered step per session. When the step is fully done, mark all its checkboxes as [x] in this file and commit with message "step N complete: <outcome line>".