From 68955d2fc21b80970ccd804eb0d0ba9889a0897b Mon Sep 17 00:00:00 2001 From: roberto Date: Mon, 2 Mar 2026 13:03:54 +0100 Subject: [PATCH] step 4 complete: intelligent routing with single-agent and pipeline modes Co-Authored-By: Claude Sonnet 4.6 --- BACKEND_PLAN.md | 259 ++++++++++++++++++++++----- app/core/orchestrator.py | 170 ++++++++++++++++++ tests/test_orchestrator.py | 348 +++++++++++++++++++++++++++++++++++++ 3 files changed, 735 insertions(+), 42 deletions(-) create mode 100644 app/core/orchestrator.py create mode 100644 tests/test_orchestrator.py diff --git a/BACKEND_PLAN.md b/BACKEND_PLAN.md index be8be32..8424e3c 100644 --- a/BACKEND_PLAN.md +++ b/BACKEND_PLAN.md @@ -2,8 +2,8 @@ > **Separate repository.** This document defines the FastAPI backend that the Electron app communicates with. > -> The backend owns: orchestration logic, chat agent intelligence, prompt IP, auth, billing, and backup blob storage. -> The backend NEVER persists user data. It receives context in requests, uses it for orchestration, and discards it. +> The backend owns: orchestration logic, chat agent intelligence, prompt IP, auth, billing, E2E backup blob storage, cloud storage (encrypted blobs), cloud vector store, and plugin marketplace. +> The backend NEVER persists user data in plaintext. Cloud storage blobs are E2E encrypted before upload — the backend only verifies integrity, never decrypts. 
--- @@ -20,7 +20,7 @@ adiuva-api/ │ │ ├── orchestrator.py # LLM-based intent router │ │ ├── execution_plan.py # Plan builder + cache │ │ └── plugin_loader.py # Dynamic agent loading -│ ├── agents/ +│ ├── agents/ # Chat agents (proprietary logic + prompts) │ │ ├── __init__.py # Auto-registers all agents │ │ ├── task_agent.py │ │ ├── calendar_agent.py @@ -32,7 +32,10 @@ adiuva-api/ │ │ │ ├── __init__.py │ │ │ ├── chat.py # POST /chat + WS /chat/stream │ │ │ ├── plans.py # GET /plans/playbook +│ │ │ ├── storage.py # CRUD cloud storage (E2E encrypted blobs) +│ │ │ ├── vectors.py # Upsert/search cloud vector store │ │ │ ├── backup.py # PUT/GET /backup +│ │ │ ├── plugins.py # Plugin marketplace │ │ │ ├── auth.py # Register/login/refresh │ │ │ └── billing.py # Checkout/webhook/subscription │ │ └── middleware/ @@ -40,6 +43,16 @@ adiuva-api/ │ │ ├── auth.py # JWT validation │ │ ├── rate_limit.py # Tier-aware rate limiting │ │ └── sanitizer.py # Strip prompt metadata from responses +│ ├── storage/ +│ │ ├── __init__.py +│ │ ├── blob_store.py # S3 for E2E encrypted blobs +│ │ ├── vector_store.py # Cloud vector store (Pinecone/Qdrant) +│ │ └── encryption.py # Integrity verification only — NO decryption +│ ├── marketplace/ +│ │ ├── __init__.py +│ │ ├── plugin_registry.py # Plugin catalog (metadata, versions, ratings) +│ │ ├── plugin_review.py # Review queue + approval workflow +│ │ └── revenue_share.py # 70/30 split tracking with Stripe Connect │ ├── billing/ │ │ ├── __init__.py │ │ ├── stripe_service.py # Stripe checkout + webhooks @@ -53,8 +66,10 @@ adiuva-api/ │ ├── test_orchestrator.py │ ├── test_agents.py │ ├── test_auth.py -│ └── test_backup.py -├── alembic/ # DB migrations (auth/billing tables only) +│ ├── test_backup.py +│ ├── test_storage.py +│ └── test_plugins.py +├── alembic/ # DB migrations (auth/billing/marketplace tables only) │ ├── alembic.ini │ └── versions/ ├── requirements.txt @@ -92,7 +107,7 @@ adiuva-api/ pytest-asyncio>=0.24.0 ``` - [x] Write `app/main.py`: 
FastAPI app with CORS (allow `app://`, `http://localhost:*`), lifespan (init DB pool, init agent registry), include all routers under `/api/v1` -- [x] Write `app/config/settings.py`: `Settings(BaseSettings)` with fields: `DATABASE_URL`, `JWT_SECRET`, `JWT_ALGORITHM` (default HS256), `STRIPE_SECRET_KEY`, `STRIPE_WEBHOOK_SECRET`, `S3_BUCKET`, `S3_REGION`, `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `OPENAI_API_KEY`, `CORS_ORIGINS`, `ENV` (dev/prod) +- [x] Write `app/config/settings.py`: `Settings(BaseSettings)` with fields: `DATABASE_URL`, `JWT_SECRET`, `JWT_ALGORITHM` (default HS256), `STRIPE_SECRET_KEY`, `STRIPE_WEBHOOK_SECRET`, `S3_BUCKET`, `S3_REGION`, `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `OPENAI_API_KEY`, `CORS_ORIGINS`, `ENV` (dev/prod), `PINECONE_API_KEY`, `PINECONE_INDEX`, `QDRANT_URL`, `QDRANT_API_KEY` - [x] Write `Dockerfile`: Python 3.12 slim, multi-stage (builder + runtime), non-root user - [x] Write `docker-compose.yml`: app, postgres:16, optional redis - [x] Write `.env.example` @@ -103,13 +118,24 @@ adiuva-api/ - `ChatRequest`: `message: str`, `context: ChatContext`, `execution_mode: Literal['direct', 'plan']` - `ChatContext`: `user_profile: dict`, `relevant_documents: list[str]`, `recent_tasks: list[dict]`, `conversation_history: list[dict]` - `ChatResponse`: `response: str`, `actions: list[PlanAction]` - - `PlanAction`: `type: Literal['create_record', 'update_record', 'delete_record', 'index_document', 'send_notification']`, `table: str | None`, `data: dict | None` + - `PlanAction`: `type: Literal['create_record', 'update_record', 'delete_record', 'index_document', 'send_notification', 'call_agent']`, `table: str | None`, `data: dict | None`, `agent: str | None` - `ExecutionPlan`: `agent: str`, `steps: list[PlanStep]` - `PlanStep`: `action: str`, `prompt_template: str | None`, `variables: dict | None`, `data_from_step: int | None` - `BackupMetadata`: `version: int`, `timestamp: int`, `checksum: str`, `chunk_count: int` - `BillingTier`: 
`Literal['free', 'pro', 'power', 'team']` - `AuthTokens`: `access_token: str`, `refresh_token: str`, `expires_at: int` - `UserProfile`: `id: str`, `email: str`, `tier: BillingTier` + - `StorageRecord`: `id: str`, `user_id: str`, `table: str`, `blob: bytes`, `checksum: str`, `created_at: int`, `updated_at: int` — blob is always E2E encrypted by client + - `StorageRecordCreate`: `table: str`, `blob: bytes`, `checksum: str` + - `StorageRecordUpdate`: `blob: bytes`, `checksum: str` + - `VectorUpsertRequest`: `vectors: list[VectorItem]` + - `VectorItem`: `id: str`, `blob: bytes`, `checksum: str` — vector + metadata encrypted by client + - `VectorSearchRequest`: `query_blob: bytes`, `top_k: int = 10` + - `VectorSearchResponse`: `results: list[VectorSearchResult]` + - `VectorSearchResult`: `id: str`, `score: float`, `blob: bytes` + - `PluginManifest`: `id: str`, `name: str`, `description: str`, `version: str`, `author: str`, `permissions: list[str]`, `category: str`, `price_cents: int = 0` + - `PluginListResponse`: `plugins: list[PluginManifest]`, `total: int`, `page: int` + - `PluginInstallRequest`: `plugin_id: str` - **Outcome:** All request/response models defined and validated. ### Step 3 — Agent Registry + base classes ✅ @@ -130,8 +156,8 @@ adiuva-api/ - [x] Unit tests: register, get, list, call_agent with mock - **Outcome:** Pluggable agent framework. -### Step 4 — Orchestrator -- [ ] `app/core/orchestrator.py`: +### Step 4 — Orchestrator ✅ +- [x] `app/core/orchestrator.py`: - `async classify_intent(message, context, registry) -> str`: - System prompt: "You are an intent classifier. Given the user message and context, decide which agent to route to. Available agents: {registry.list_agents()}. Respond with just the agent name." 
- Uses gpt-4o-mini via LangChain for low latency @@ -146,12 +172,13 @@ adiuva-api/ - Final synthesis via LLM: "Summarize these agent results into a coherent response" - `async orchestrate(request: ChatRequest) -> ChatResponse | ExecutionPlan`: - Main entry point + - The orchestrator is agnostic to where context comes from — data may originate from local or cloud storage on the client side - Classifies intent - If `execution_mode == 'direct'`: route + return response - If `execution_mode == 'plan'`: route + return execution plan with template IDs - `async orchestrate_stream(request: ChatRequest) -> AsyncGenerator[str, None]`: - Same as orchestrate but yields tokens for WebSocket streaming -- [ ] Integration tests with mocked LLM and mocked agents +- [x] Integration tests with mocked LLM and mocked agents - **Outcome:** Intelligent routing with single-agent and pipeline modes. ### Step 5 — Execution Plan generator @@ -174,6 +201,7 @@ adiuva-api/ - Tools: `create_task(title, description, priority, due_date)`, `update_task(id, updates)`, `list_tasks(filters)`, `suggest_tasks(notes_context)` - System prompt: PM-oriented, validates task structure, infers priority from context - `handle()`: LLM + tool loop via `_tool_loop()`, returns response text + list of actions performed + - Accepts flexible context: mandatory fields `user_profile` + `message`, all other fields (from batch/plugin output) are optional - [ ] `app/agents/calendar_agent.py` — `@registry.register`: - Description: "Calendar management: events, conflicts, scheduling" - Tools: `list_events(date_range)`, `detect_conflicts(events)`, `suggest_reschedule(conflict)` @@ -190,9 +218,32 @@ adiuva-api/ - [ ] Unit tests per agent with mocked LLM - **Outcome:** Four specialized agents, all registered and tested. 
-### Step 7 — API Routes +### Step 7 — Storage Layer +- [ ] `app/storage/blob_store.py`: + - `BlobStore`: + - `async upload(user_id, table, record_id, blob: bytes, checksum: str) -> str` — returns S3 key + - `async download(user_id, s3_key) -> bytes` + - `async delete(user_id, s3_key) -> None` + - `async list_keys(user_id, table) -> list[str]` + - Keys structured as `{user_id}/{table}/{record_id}` — backend never inspects blob content + - Uses boto3 S3 with server-side encryption at rest (SSE-S3) as extra layer +- [ ] `app/storage/vector_store.py`: + - `VectorStore`: + - `async upsert(user_id, vectors: list[VectorItem]) -> None` — vectors are pre-encrypted blobs + - `async search(user_id, query_blob: bytes, top_k: int) -> list[VectorSearchResult]` + - `async delete(user_id, vector_ids: list[str]) -> None` + - Wraps Pinecone (default) or Qdrant — configurable via settings + - Namespace per `user_id` for isolation + - Note: because vectors are E2E encrypted by client, ANN search is on the encrypted representation — semantic search accuracy is a known trade-off when users choose cloud vectors +- [ ] `app/storage/encryption.py`: + - `verify_checksum(blob: bytes, checksum: str) -> bool` — SHA-256 checksum comparison, integrity check only (unkeyed digest, not an HMAC — the signature takes no key) + - `reject_if_tampered(blob, checksum)` — raises `400` if mismatch + - Backend NEVER holds decryption keys — all crypto is client-side +- **Outcome:** Cloud storage layer that handles E2E encrypted blobs without ever accessing plaintext. 
-#### 7a — Chat endpoint +### Step 8 — API Routes + +#### 8a — Chat endpoint - [ ] `app/api/routes/chat.py`: - `POST /api/v1/chat`: - Request: `ChatRequest` @@ -204,48 +255,93 @@ adiuva-api/ - Final frame: JSON `ChatResponse` with `{"done": true, "response": "...", "actions": [...]}` - Heartbeat ping every 30s to keep connection alive -#### 7b — Plans endpoint +#### 8b — Plans endpoint - [ ] `app/api/routes/plans.py`: - `GET /api/v1/plans/playbook`: Returns all playbooks available for the user's tier - `GET /api/v1/plans/playbook/{plan_id}`: Returns a specific plan -#### 7c — Backup endpoint +#### 8c — Storage endpoint (cloud records) +- [ ] `app/api/routes/storage.py`: + - `POST /api/v1/storage/records`: Create encrypted record + - Request: `StorageRecordCreate` + - Verifies checksum, stores blob in S3, inserts metadata row in PostgreSQL + - Response: `{id: str, created_at: int}` + - `GET /api/v1/storage/records`: List record metadata (no blobs) + - Query params: `table: str`, `page: int`, `limit: int` + - Response: `list[{id, table, checksum, created_at, updated_at}]` + - `GET /api/v1/storage/records/{id}`: Download encrypted blob + - Response: blob bytes + `X-Checksum` header + - `PUT /api/v1/storage/records/{id}`: Update encrypted blob + - Request: `StorageRecordUpdate` + - `DELETE /api/v1/storage/records/{id}`: Delete record + S3 blob + - All routes enforce tier cloud_storage_gb quota via `TierManager.check_quota(user_id)` + +#### 8d — Vectors endpoint (cloud vector store) +- [ ] `app/api/routes/vectors.py`: + - `POST /api/v1/storage/vectors/upsert`: + - Request: `VectorUpsertRequest` + - Verifies checksums, delegates to `VectorStore.upsert()` + - Response: `{upserted: int}` + - `POST /api/v1/storage/vectors/search`: + - Request: `VectorSearchRequest` + - Delegates to `VectorStore.search()` + - Response: `VectorSearchResponse` + - `DELETE /api/v1/storage/vectors`: + - Request: `{ids: list[str]}` + +#### 8e — Backup endpoint - [ ] `app/api/routes/backup.py`: - 
`PUT /api/v1/backup`: Accepts binary blob + metadata headers (`X-Backup-Version`, `X-Backup-Timestamp`, `X-Backup-Checksum`). Stores in S3 keyed by `{user_id}/{timestamp}`. Enforces tier limits: - Free: 0 (no backup) - Pro: 5 GB - - Power: 50 GB + - Power: 25 GB - Team: unlimited - `GET /api/v1/backup`: Returns latest blob for authenticated user. Supports `If-Modified-Since`. - `GET /api/v1/backup/history`: Returns list of `BackupMetadata` (no blobs). - `DELETE /api/v1/backup/{backup_id}`: Delete specific backup. -#### 7d — Auth endpoint +#### 8f — Plugins endpoint +- [ ] `app/api/routes/plugins.py`: + - `GET /api/v1/plugins`: + - Query params: `category: str | None`, `q: str | None`, `page: int`, `sort: Literal['rating', 'installs', 'newest']` + - Response: `PluginListResponse` + - Available from Power tier and above + - `GET /api/v1/plugins/{id}`: + - Response: `PluginManifest` + ratings + install count + - `POST /api/v1/plugins/{id}/install`: + - Request: `PluginInstallRequest` + - Records installation for the user (billing tracking, analytics) + - If plugin is paid: triggers Stripe Connect charge + revenue split (70% developer, 30% platform) + - Response: `{ok: true, download_url: str}` — signed S3 URL for plugin package + - `DELETE /api/v1/plugins/{id}/install`: + - Unregisters installation + +#### 8g — Auth endpoint - [ ] `app/api/routes/auth.py`: - `POST /api/v1/auth/register`: `{email, password}` → bcrypt hash → insert user → return `AuthTokens` - `POST /api/v1/auth/login`: Validate credentials → return `AuthTokens` - `POST /api/v1/auth/refresh`: Rotate refresh token → return new `AuthTokens` - `GET /api/v1/auth/me`: Return `UserProfile` for current JWT -#### 7e — Billing endpoint +#### 8h — Billing endpoint - [ ] `app/api/routes/billing.py`: - `POST /api/v1/billing/checkout`: Creates Stripe checkout session → returns URL - `POST /api/v1/billing/webhook`: Handles Stripe webhooks (subscription lifecycle) - `GET /api/v1/billing/subscription`: Returns current 
subscription info - `DELETE /api/v1/billing/subscription`: Cancels subscription -- **Outcome:** Complete REST + WebSocket API. +- **Outcome:** Complete REST + WebSocket API covering orchestration, storage, vectors, backup, marketplace. -### Step 8 — Middleware +### Step 9 — Middleware -#### 8a — Auth middleware +#### 9a — Auth middleware - [ ] `app/api/middleware/auth.py`: - FastAPI dependency: `get_current_user(token: str = Depends(oauth2_scheme)) -> UserProfile` - Validates JWT signature, expiry, extracts `user_id` and `tier` - Raises `401` on invalid/expired token - Exempt routes: `/api/v1/auth/register`, `/api/v1/auth/login`, `/api/v1/billing/webhook` -#### 8b — Rate limiter +#### 9b — Rate limiter - [ ] `app/api/middleware/rate_limit.py`: - Uses `slowapi` with `Limiter(key_func=get_user_id_from_jwt)` - Tier-based limits: @@ -255,7 +351,7 @@ adiuva-api/ - Team: 200 req/seat/min - Custom 429 response with `Retry-After` header -#### 8c — Sanitizer +#### 9c — Sanitizer - [ ] `app/api/middleware/sanitizer.py`: - Response middleware that scans response bodies - Strips: system prompt fragments, agent internal reasoning, tool schemas, routing metadata @@ -264,7 +360,27 @@ adiuva-api/ - **Outcome:** Secure, rate-limited API with prompt IP protection. 
-### Step 9 — Billing & Tier management +### Step 10 — Plugin Marketplace +- [ ] `app/marketplace/plugin_registry.py`: + - `PluginRegistry`: + - `async list_plugins(category, query, page, sort) -> PluginListResponse` + - `async get_plugin(plugin_id) -> PluginManifest | None` + - `async submit_plugin(manifest: PluginManifest, package_s3_key: str) -> str` — returns plugin_id, sets status = 'pending_review' + - `async approve_plugin(plugin_id) -> None` — admin only, sets status = 'approved' + - `async reject_plugin(plugin_id, reason: str) -> None` +- [ ] `app/marketplace/plugin_review.py`: + - `ReviewQueue`: + - `async get_pending() -> list[dict]` + - `async submit_review(plugin_id, reviewer_id, decision, notes) -> None` + - Security checklist enforced before approval: manifest schema valid, permissions are from allowed set, no binary blobs in manifest +- [ ] `app/marketplace/revenue_share.py`: + - `RevenueShare`: + - `async record_install(plugin_id, user_id, amount_cents) -> None` + - `async payout_developer(plugin_id, period) -> None` — Stripe Connect transfer: 70% to developer + - `async get_earnings(developer_id, period) -> dict` +- **Outcome:** Plugin marketplace with catalog, review workflow, and revenue split. 
+ +### Step 11 — Billing & Tier management - [ ] `app/billing/stripe_service.py`: - `create_checkout_session(user_id, tier) -> str` - `handle_webhook(payload, sig_header) -> None`: processes `checkout.session.completed`, `customer.subscription.updated`, `customer.subscription.deleted`, `invoice.payment_failed` @@ -275,33 +391,77 @@ adiuva-api/ - Feature matrix: ```python FEATURES = { - 'free': {'agents': 3, 'batch': False, 'providers': 1, 'backup_gb': 0}, - 'pro': {'agents': -1, 'batch': True, 'providers': -1, 'backup_gb': 5}, - 'power': {'agents': -1, 'batch': True, 'providers': -1, 'backup_gb': 50, 'byok': True}, - 'team': {'agents': -1, 'batch': True, 'providers': -1, 'backup_gb': -1, 'sso': True}, + 'free': { + 'agents': 3, + 'batch_active': 2, + 'cloud_storage_gb': 0, + 'backup_gb': 0, + 'providers': 1, + 'batch_builder': False, + 'plugin_marketplace': False, + 'sso': False, + }, + 'pro': { + 'agents': -1, # unlimited + 'batch_active': 10, + 'cloud_storage_gb': 5, + 'backup_gb': 5, + 'providers': -1, + 'batch_builder': False, + 'plugin_marketplace': False, + 'sso': False, + }, + 'power': { + 'agents': -1, + 'batch_active': -1, # unlimited + 'cloud_storage_gb': 25, + 'backup_gb': 25, + 'providers': -1, + 'batch_builder': True, + 'plugin_marketplace': True, + 'sso': False, + }, + 'team': { + 'agents': -1, + 'batch_active': -1, + 'cloud_storage_gb': -1, + 'backup_gb': -1, + 'providers': -1, + 'batch_builder': True, + 'plugin_marketplace': True, + 'sso': True, + }, } ``` - `get_tier(user_id) -> BillingTier` - `check_feature(user_id, feature) -> bool` - `get_rate_limit(tier) -> int` -- **Outcome:** Stripe integration with tier-based feature gating. + - `check_quota(user_id) -> bool` — checks cloud_storage_gb current usage vs limit +- **Outcome:** Stripe integration with tier-based feature gating matching Free/Pro(15€)/Power(29€)/Team(49€/seat). 
-### Step 10 — Database (auth/billing only) +### Step 12 — Database (auth/billing/marketplace only) - [ ] PostgreSQL schema via Alembic: - `users`: `id UUID PK`, `email UNIQUE`, `password_hash`, `tier` (default 'free'), `stripe_customer_id`, `created_at`, `updated_at` - `refresh_tokens`: `id UUID PK`, `user_id FK`, `token_hash`, `expires_at`, `created_at` - `subscriptions`: `id UUID PK`, `user_id FK`, `stripe_subscription_id`, `tier`, `status`, `current_period_end`, `created_at` - `backup_metadata`: `id UUID PK`, `user_id FK`, `s3_key`, `version`, `timestamp`, `checksum`, `size_bytes`, `created_at` + - `storage_records`: `id UUID PK`, `user_id FK`, `table_name VARCHAR`, `s3_key`, `checksum`, `size_bytes`, `created_at`, `updated_at` — metadata only, no plaintext + - `plugins`: `id UUID PK`, `name`, `description`, `version`, `author_id FK`, `category`, `status` (pending_review/approved/rejected), `price_cents`, `s3_package_key`, `install_count`, `avg_rating`, `created_at` + - `plugin_installations`: `id UUID PK`, `plugin_id FK`, `user_id FK`, `installed_at` + - `plugin_reviews`: `id UUID PK`, `plugin_id FK`, `reviewer_id FK`, `decision`, `notes`, `reviewed_at` + - `revenue_events`: `id UUID PK`, `plugin_id FK`, `user_id FK`, `amount_cents`, `developer_share_cents`, `stripe_transfer_id`, `created_at` - [ ] Initial Alembic migration - [ ] SQLAlchemy models in `app/models.py` -- **Outcome:** Auth and billing persistence. Zero user data stored. +- **Outcome:** Auth, billing, storage metadata, and marketplace persistence. Zero user data in plaintext. 
-### Step 11 — Testing & deployment -- [ ] `tests/conftest.py`: TestClient fixture, mock LLM fixture (`AsyncMock` returning canned responses), mock agent fixture, test DB (SQLite in-memory for speed) +### Step 13 — Testing & deployment +- [ ] `tests/conftest.py`: TestClient fixture, mock LLM fixture (`AsyncMock` returning canned responses), mock agent fixture, test DB (SQLite in-memory for speed), mock S3 (moto), mock Pinecone - [ ] `tests/test_orchestrator.py`: classify_intent routing, single agent, pipeline, plan mode - [ ] `tests/test_agents.py`: each agent with mocked tools - [ ] `tests/test_auth.py`: register → login → access protected → refresh → expired token - [ ] `tests/test_backup.py`: upload → download → history → delete, tier limit enforcement +- [ ] `tests/test_storage.py`: create record → list → download → update → delete, checksum rejection, quota enforcement +- [ ] `tests/test_plugins.py`: list plugins, install, uninstall, revenue event creation, tier gate (free user blocked) - [ ] `Dockerfile` optimized for production (gunicorn + uvicorn workers) - [ ] GitHub Actions CI: lint (ruff), test (pytest), build Docker image - **Outcome:** Fully tested, deployable backend. 
@@ -320,10 +480,22 @@ adiuva-api/ | WS | `/api/v1/chat/stream` | JWT | `ChatRequest` (first frame) | Token stream + final JSON | | GET | `/api/v1/plans/playbook` | JWT | — | `ExecutionPlan[]` | | GET | `/api/v1/plans/playbook/:id` | JWT | — | `ExecutionPlan` | +| POST | `/api/v1/storage/records` | JWT | `StorageRecordCreate` | `{id, created_at}` | +| GET | `/api/v1/storage/records` | JWT | `?table&page&limit` | `RecordMeta[]` | +| GET | `/api/v1/storage/records/:id` | JWT | — | Binary blob | +| PUT | `/api/v1/storage/records/:id` | JWT | `StorageRecordUpdate` | `{ok: true}` | +| DELETE | `/api/v1/storage/records/:id` | JWT | — | `{ok: true}` | +| POST | `/api/v1/storage/vectors/upsert` | JWT | `VectorUpsertRequest` | `{upserted: int}` | +| POST | `/api/v1/storage/vectors/search` | JWT | `VectorSearchRequest` | `VectorSearchResponse` | +| DELETE | `/api/v1/storage/vectors` | JWT | `{ids: list[str]}` | `{ok: true}` | | PUT | `/api/v1/backup` | JWT | Binary blob + headers | `{ok: true}` | | GET | `/api/v1/backup` | JWT | — | Binary blob | | GET | `/api/v1/backup/history` | JWT | — | `BackupMetadata[]` | | DELETE | `/api/v1/backup/:id` | JWT | — | `{ok: true}` | +| GET | `/api/v1/plugins` | JWT | `?category&q&page&sort` | `PluginListResponse` | +| GET | `/api/v1/plugins/:id` | JWT | — | `PluginManifest` + stats | +| POST | `/api/v1/plugins/:id/install` | JWT | `PluginInstallRequest` | `{ok, download_url}` | +| DELETE | `/api/v1/plugins/:id/install` | JWT | — | `{ok: true}` | | POST | `/api/v1/billing/checkout` | JWT | `{tier}` | `{checkout_url}` | | POST | `/api/v1/billing/webhook` | Stripe sig | Stripe event | `{ok: true}` | | GET | `/api/v1/billing/subscription` | JWT | — | Subscription info | @@ -339,21 +511,24 @@ adiuva-api/ | Framework | FastAPI + Uvicorn | | LLM | LangChain + langchain-openai | | Auth | PyJWT + bcrypt + OAuth2 | -| Billing | stripe-python | -| Storage | boto3 (S3) | +| Billing | stripe-python + Stripe Connect | +| Blob storage | boto3 (S3) | +| 
Vector store | Pinecone or Qdrant (configurable) | | Database | PostgreSQL + SQLAlchemy + Alembic | | Rate limiting | slowapi | -| Testing | pytest + pytest-asyncio + httpx | +| Testing | pytest + pytest-asyncio + httpx + moto (S3 mock) | | Deployment | Docker → fly.io / Railway / AWS ECS | --- ## Development Rules -1. **NEVER persist user data.** The DB stores only auth, billing, and backup metadata. User context arrives in requests and is discarded after processing. -2. **NEVER expose prompts.** System prompts are composed server-side from fragments. Responses are sanitized before sending. -3. **Stateless request handling.** No server-side session state. All context comes from the client + JWT. -4. **Type hints everywhere.** All functions have full type annotations. -5. **Test every agent.** Each chat agent has unit tests with mocked LLM responses. -6. **Structured logging.** JSON logs with request ID correlation. -7. **One step at a time.** Implement one numbered step per session. When the step is fully done, mark all its checkboxes as `[x]` in this file and commit with message `step N complete: `. +1. **NEVER persist user data in plaintext.** The DB stores only auth, billing, storage metadata, and marketplace data. User context arrives in requests and is discarded. Cloud blobs are E2E encrypted client-side — backend only stores opaque bytes. +2. **NEVER expose prompts.** System prompts are composed server-side from fragments. Responses are sanitized before sending. In plan mode, `prompt_template` fields are reference IDs only. +3. **NEVER decrypt user blobs.** `app/storage/encryption.py` only verifies checksums. No decryption key ever reaches the backend. +4. **Stateless request handling.** No server-side session state. All context comes from the client + JWT. +5. **Type hints everywhere.** All functions have full type annotations. +6. **Test every agent.** Each chat agent has unit tests with mocked LLM responses. +7. 
**Structured logging.** JSON logs with request ID correlation. +8. **Tier gates are enforced server-side.** Never trust client-reported tier. Always fetch from DB via `TierManager.get_tier(user_id)`. +9. **One step at a time.** Implement one numbered step per session. When the step is fully done, mark all its checkboxes as `[x]` in this file and commit with message `step N complete: `. diff --git a/app/core/orchestrator.py b/app/core/orchestrator.py new file mode 100644 index 0000000..82e8f6c --- /dev/null +++ b/app/core/orchestrator.py @@ -0,0 +1,170 @@ +"""Orchestrator — LLM-based intent router and agent pipeline.""" + +from __future__ import annotations + +import json +from typing import Any, AsyncGenerator + +from langchain_core.messages import HumanMessage, SystemMessage +from langchain_openai import ChatOpenAI + +from app.config.settings import settings +from app.core.agent_registry import AgentRegistry +from app.core.agent_registry import registry as _default_registry +from app.schemas import ChatRequest, ChatResponse, ExecutionPlan, PlanStep + +_FALLBACK_AGENT = "task_agent" + +_CLASSIFY_SYSTEM = ( + "You are an intent classifier. Given the user message and context, decide " + "which agent to route to.\n" + "Available agents: {agents}\n" + "Respond with just the agent name, nothing else." +) + +_SYNTHESIZE_HUMAN = ( + "Combine the following agent results into one coherent response.\n\n" + "Agent results:\n{results}\n\n" + "Original message: {message}" +) + + +def _make_llm(model: str = "gpt-4o-mini") -> ChatOpenAI: + return ChatOpenAI(model=model, temperature=0, api_key=settings.OPENAI_API_KEY) + + +async def classify_intent( + message: str, + context: dict[str, Any], + reg: AgentRegistry, +) -> str: + """Use gpt-4o-mini to classify intent and return the matching agent name. + + Falls back to ``task_agent`` when the registry is empty or the model + returns a name that is not registered. 
+ """ + agents = reg.list_agents() + if not agents: + return _FALLBACK_AGENT + + system = _CLASSIFY_SYSTEM.format(agents=json.dumps(agents)) + # Truncate context to keep the classification prompt short + human = f"Message: {message}\nContext summary: {json.dumps(context)[:500]}" + + llm = _make_llm() + response = await llm.ainvoke( + [SystemMessage(content=system), HumanMessage(content=human)] + ) + + agent_name = str(response.content).strip().lower() + known = {a["name"] for a in agents} + return agent_name if agent_name in known else _FALLBACK_AGENT + + +async def route_single( + agent_name: str, + message: str, + context: dict[str, Any], + reg: AgentRegistry, +) -> ChatResponse: + """Route to a single agent and wrap the result in a ``ChatResponse``.""" + response_text = await reg.call_agent(agent_name, message, context) + return ChatResponse(response=response_text) + + +async def route_pipeline( + agent_names: list[str], + message: str, + context: dict[str, Any], + reg: AgentRegistry, +) -> ChatResponse: + """Execute agents sequentially; each agent receives previous results in context. + + A final LLM synthesis call merges all results into one coherent response. + """ + previous_results: list[str] = [] + + for agent_name in agent_names: + ctx = {**context, "previous_results": list(previous_results)} + result = await reg.call_agent(agent_name, message, ctx) + previous_results.append(result) + + results_str = "\n\n".join( + f"[{name}]: {res}" for name, res in zip(agent_names, previous_results) + ) + human = _SYNTHESIZE_HUMAN.format(results=results_str, message=message) + llm = _make_llm() + synthesis = await llm.ainvoke([HumanMessage(content=human)]) + return ChatResponse(response=str(synthesis.content)) + + +def _build_plan(agent_name: str, message: str) -> ExecutionPlan: + """Build a minimal ``ExecutionPlan`` for the resolved agent. + + The full ``ExecutionPlanBuilder`` (with template registry and caching) is + implemented in Step 5. 
This function produces the single-step baseline + plan that the orchestrator returns in ``'plan'`` mode. + """ + return ExecutionPlan( + agent=agent_name, + steps=[ + PlanStep( + action="handle", + prompt_template=f"tpl_{agent_name}_default", + variables={"message": message}, + ) + ], + ) + + +async def orchestrate( + request: ChatRequest, + reg: AgentRegistry | None = None, +) -> ChatResponse | ExecutionPlan: + """Main orchestration entry point. + + * Classifies the user's intent to select an agent. + * ``execution_mode == 'direct'``: routes to the agent and returns a + ``ChatResponse``. + * ``execution_mode == 'plan'``: returns an ``ExecutionPlan`` with the + resolved agent and a template-ID-only step (prompt IP stays server-side). + """ + if reg is None: + reg = _default_registry + + context = request.context.model_dump() + agent_name = await classify_intent(request.message, context, reg) + + if request.execution_mode == "direct": + return await route_single(agent_name, request.message, context, reg) + + # plan mode — return plan, do not execute + return _build_plan(agent_name, request.message) + + +async def orchestrate_stream( + request: ChatRequest, + reg: AgentRegistry | None = None, +) -> AsyncGenerator[str, None]: + """Streaming orchestration — yields text chunks then a final JSON frame. + + The final frame is a JSON object: + ``{"done": true, "response": "...", "actions": []}``. + + Agents do not yet support token-level streaming; the full response is + fetched first, then emitted in fixed-size chunks. Token-level streaming + will be wired in Step 6 when agents expose ``astream()``. 
+ """ + if reg is None: + reg = _default_registry + + context = request.context.model_dump() + agent_name = await classify_intent(request.message, context, reg) + response_text = await reg.call_agent(agent_name, request.message, context) + + chunk_size = 50 + for i in range(0, len(response_text), chunk_size): + yield response_text[i : i + chunk_size] + + final = ChatResponse(response=response_text) + yield json.dumps({"done": True, **final.model_dump()}) diff --git a/tests/test_orchestrator.py b/tests/test_orchestrator.py new file mode 100644 index 0000000..4432e33 --- /dev/null +++ b/tests/test_orchestrator.py @@ -0,0 +1,348 @@ +"""Integration tests for the orchestrator module.""" + +from __future__ import annotations + +import json +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from app.core.agent_registry import AgentRegistry, ChatAgent +from app.core.orchestrator import ( + classify_intent, + orchestrate, + orchestrate_stream, + route_pipeline, + route_single, +) +from app.schemas import ChatContext, ChatRequest, ChatResponse, ExecutionPlan + + +# ── Stub agents ────────────────────────────────────────────────────── + + +class _TaskAgent(ChatAgent): + def get_name(self) -> str: + return "task_agent" + + def get_description(self) -> str: + return "Manages tasks: create, update, list, suggest" + + def get_tools(self) -> list[Any]: + return [] + + async def handle(self, query: str, context: dict[str, Any]) -> str: + return f"task: {query}" + + +class _CalendarAgent(ChatAgent): + def get_name(self) -> str: + return "calendar_agent" + + def get_description(self) -> str: + return "Calendar management: events, conflicts, scheduling" + + def get_tools(self) -> list[Any]: + return [] + + async def handle(self, query: str, context: dict[str, Any]) -> str: + return f"calendar: {query}" + + +# ── Helpers ────────────────────────────────────────────────────────── + + +def _mock_llm(response_text: str) -> MagicMock: + 
"""Return a mock LLM that always produces *response_text*.""" + msg = MagicMock() + msg.content = response_text + llm = MagicMock() + llm.ainvoke = AsyncMock(return_value=msg) + return llm + + +# ── Fixtures ───────────────────────────────────────────────────────── + + +@pytest.fixture(autouse=True) +def _fresh_registry(): + """Reset the AgentRegistry singleton between tests.""" + AgentRegistry._instance = None + yield + AgentRegistry._instance = None + + +@pytest.fixture() +def reg() -> AgentRegistry: + r = AgentRegistry() + r.register(_TaskAgent) + r.register(_CalendarAgent) + return r + + +# ── classify_intent ─────────────────────────────────────────────────── + + +class TestClassifyIntent: + @pytest.mark.asyncio + async def test_routes_to_known_agent(self, reg: AgentRegistry) -> None: + with patch("app.core.orchestrator.ChatOpenAI") as mock_cls: + mock_cls.return_value = _mock_llm("task_agent") + result = await classify_intent("add a task", {}, reg) + assert result == "task_agent" + + @pytest.mark.asyncio + async def test_routes_to_calendar_agent(self, reg: AgentRegistry) -> None: + with patch("app.core.orchestrator.ChatOpenAI") as mock_cls: + mock_cls.return_value = _mock_llm("calendar_agent") + result = await classify_intent("schedule a meeting", {}, reg) + assert result == "calendar_agent" + + @pytest.mark.asyncio + async def test_falls_back_on_unknown_name(self, reg: AgentRegistry) -> None: + with patch("app.core.orchestrator.ChatOpenAI") as mock_cls: + mock_cls.return_value = _mock_llm("nonexistent_agent") + result = await classify_intent("do something", {}, reg) + assert result == "task_agent" + + @pytest.mark.asyncio + async def test_empty_registry_returns_fallback_without_llm_call(self) -> None: + empty_reg = AgentRegistry() + # No LLM should be instantiated — early return path + with patch("app.core.orchestrator.ChatOpenAI") as mock_cls: + result = await classify_intent("anything", {}, empty_reg) + mock_cls.assert_not_called() + assert result == 
"task_agent" + + @pytest.mark.asyncio + async def test_whitespace_stripped_from_response(self, reg: AgentRegistry) -> None: + with patch("app.core.orchestrator.ChatOpenAI") as mock_cls: + mock_cls.return_value = _mock_llm(" task_agent \n") + result = await classify_intent("create task", {}, reg) + assert result == "task_agent" + + +# ── route_single ───────────────────────────────────────────────────── + + +class TestRouteSingle: + @pytest.mark.asyncio + async def test_returns_chat_response(self, reg: AgentRegistry) -> None: + result = await route_single("task_agent", "create a task", {}, reg) + assert isinstance(result, ChatResponse) + + @pytest.mark.asyncio + async def test_response_contains_agent_output(self, reg: AgentRegistry) -> None: + result = await route_single("task_agent", "create a task", {}, reg) + assert result.response == "task: create a task" + + @pytest.mark.asyncio + async def test_unknown_agent_raises_key_error(self, reg: AgentRegistry) -> None: + with pytest.raises(KeyError): + await route_single("nonexistent", "hello", {}, reg) + + @pytest.mark.asyncio + async def test_actions_default_empty(self, reg: AgentRegistry) -> None: + result = await route_single("task_agent", "hi", {}, reg) + assert result.actions == [] + + +# ── route_pipeline ──────────────────────────────────────────────────── + + +class TestRoutePipeline: + @pytest.mark.asyncio + async def test_returns_chat_response(self, reg: AgentRegistry) -> None: + with patch("app.core.orchestrator.ChatOpenAI") as mock_cls: + mock_cls.return_value = _mock_llm("synthesized result") + result = await route_pipeline( + ["task_agent", "calendar_agent"], "plan my week", {}, reg + ) + assert isinstance(result, ChatResponse) + + @pytest.mark.asyncio + async def test_response_is_synthesis_output(self, reg: AgentRegistry) -> None: + with patch("app.core.orchestrator.ChatOpenAI") as mock_cls: + mock_cls.return_value = _mock_llm("synthesized result") + result = await route_pipeline( + ["task_agent", 
"calendar_agent"], "plan my week", {}, reg + ) + assert result.response == "synthesized result" + + @pytest.mark.asyncio + async def test_passes_previous_results_to_subsequent_agents( + self, reg: AgentRegistry + ) -> None: + """Each agent after the first should receive prior outputs in context.""" + received_contexts: list[dict[str, Any]] = [] + + class _CapturingAgent(ChatAgent): + def get_name(self) -> str: + return "capture" + + def get_description(self) -> str: + return "captures context for testing" + + def get_tools(self) -> list[Any]: + return [] + + async def handle(self, query: str, context: dict[str, Any]) -> str: + received_contexts.append(dict(context)) + return "captured" + + reg.register(_CapturingAgent) + + with patch("app.core.orchestrator.ChatOpenAI") as mock_cls: + mock_cls.return_value = _mock_llm("done") + await route_pipeline(["task_agent", "capture"], "hi", {}, reg) + + # The second agent (capture) must have received previous results + assert len(received_contexts) == 1 + assert "previous_results" in received_contexts[0] + assert received_contexts[0]["previous_results"] == ["task: hi"] + + @pytest.mark.asyncio + async def test_single_agent_pipeline(self, reg: AgentRegistry) -> None: + with patch("app.core.orchestrator.ChatOpenAI") as mock_cls: + mock_cls.return_value = _mock_llm("single result") + result = await route_pipeline(["task_agent"], "one agent", {}, reg) + assert result.response == "single result" + + +# ── orchestrate ─────────────────────────────────────────────────────── + + +class TestOrchestrate: + @pytest.mark.asyncio + async def test_direct_mode_returns_chat_response( + self, reg: AgentRegistry + ) -> None: + with patch("app.core.orchestrator.ChatOpenAI") as mock_cls: + mock_cls.return_value = _mock_llm("task_agent") + request = ChatRequest(message="add a task", execution_mode="direct") + result = await orchestrate(request, reg) + assert isinstance(result, ChatResponse) + + @pytest.mark.asyncio + async def 
test_direct_mode_response_content(self, reg: AgentRegistry) -> None: + with patch("app.core.orchestrator.ChatOpenAI") as mock_cls: + mock_cls.return_value = _mock_llm("task_agent") + request = ChatRequest(message="add a task", execution_mode="direct") + result = await orchestrate(request, reg) + assert isinstance(result, ChatResponse) + assert result.response == "task: add a task" + + @pytest.mark.asyncio + async def test_plan_mode_returns_execution_plan( + self, reg: AgentRegistry + ) -> None: + with patch("app.core.orchestrator.ChatOpenAI") as mock_cls: + mock_cls.return_value = _mock_llm("task_agent") + request = ChatRequest(message="plan my tasks", execution_mode="plan") + result = await orchestrate(request, reg) + assert isinstance(result, ExecutionPlan) + + @pytest.mark.asyncio + async def test_plan_mode_agent_matches_classified( + self, reg: AgentRegistry + ) -> None: + with patch("app.core.orchestrator.ChatOpenAI") as mock_cls: + mock_cls.return_value = _mock_llm("calendar_agent") + request = ChatRequest( + message="schedule something", execution_mode="plan" + ) + result = await orchestrate(request, reg) + assert isinstance(result, ExecutionPlan) + assert result.agent == "calendar_agent" + + @pytest.mark.asyncio + async def test_plan_mode_has_steps(self, reg: AgentRegistry) -> None: + with patch("app.core.orchestrator.ChatOpenAI") as mock_cls: + mock_cls.return_value = _mock_llm("task_agent") + request = ChatRequest(message="plan tasks", execution_mode="plan") + result = await orchestrate(request, reg) + assert isinstance(result, ExecutionPlan) + assert len(result.steps) >= 1 + + @pytest.mark.asyncio + async def test_plan_mode_template_id_contains_agent_name( + self, reg: AgentRegistry + ) -> None: + with patch("app.core.orchestrator.ChatOpenAI") as mock_cls: + mock_cls.return_value = _mock_llm("task_agent") + request = ChatRequest(message="plan tasks", execution_mode="plan") + result = await orchestrate(request, reg) + assert isinstance(result, 
ExecutionPlan) + assert result.steps[0].prompt_template is not None + assert "task_agent" in result.steps[0].prompt_template + + @pytest.mark.asyncio + async def test_default_execution_mode_is_direct( + self, reg: AgentRegistry + ) -> None: + with patch("app.core.orchestrator.ChatOpenAI") as mock_cls: + mock_cls.return_value = _mock_llm("task_agent") + # execution_mode defaults to "direct" + request = ChatRequest(message="help me") + result = await orchestrate(request, reg) + assert isinstance(result, ChatResponse) + + +# ── orchestrate_stream ──────────────────────────────────────────────── + + +class TestOrchestrateStream: + @pytest.mark.asyncio + async def test_yields_at_least_one_chunk(self, reg: AgentRegistry) -> None: + with patch("app.core.orchestrator.ChatOpenAI") as mock_cls: + mock_cls.return_value = _mock_llm("task_agent") + request = ChatRequest(message="add a task", execution_mode="direct") + chunks = [chunk async for chunk in orchestrate_stream(request, reg)] + assert len(chunks) >= 1 + + @pytest.mark.asyncio + async def test_last_chunk_is_final_json_frame( + self, reg: AgentRegistry + ) -> None: + with patch("app.core.orchestrator.ChatOpenAI") as mock_cls: + mock_cls.return_value = _mock_llm("task_agent") + request = ChatRequest(message="add a task", execution_mode="direct") + chunks = [chunk async for chunk in orchestrate_stream(request, reg)] + + last = json.loads(chunks[-1]) + assert last["done"] is True + assert "response" in last + assert "actions" in last + + @pytest.mark.asyncio + async def test_final_frame_response_matches_agent_output( + self, reg: AgentRegistry + ) -> None: + with patch("app.core.orchestrator.ChatOpenAI") as mock_cls: + mock_cls.return_value = _mock_llm("task_agent") + request = ChatRequest(message="create a task", execution_mode="direct") + chunks = [chunk async for chunk in orchestrate_stream(request, reg)] + + final = json.loads(chunks[-1]) + assert final["response"] == "task: create a task" + + @pytest.mark.asyncio + 
async def test_text_chunks_before_final_frame( + self, reg: AgentRegistry + ) -> None: + with patch("app.core.orchestrator.ChatOpenAI") as mock_cls: + mock_cls.return_value = _mock_llm("task_agent") + request = ChatRequest( + message="x" * 200, execution_mode="direct" + ) # long enough to produce multiple chunks + chunks = [chunk async for chunk in orchestrate_stream(request, reg)] + + # All but the last chunk should be plain text (not valid final JSON) + non_final = chunks[:-1] + for chunk in non_final: + try: + parsed = json.loads(chunk) + assert parsed.get("done") is not True + except json.JSONDecodeError: + pass # plain text chunk — expected