diff --git a/.env.example b/.env.example index 98945d4..a45f18b 100644 --- a/.env.example +++ b/.env.example @@ -23,21 +23,6 @@ LLM_ROUTER_MODEL=gpt-4o-mini STRIPE_SECRET_KEY= STRIPE_WEBHOOK_SECRET= -# ── AWS / S3 ────────────────────────────────────────────────────────────────── -S3_BUCKET=adiuva -S3_REGION=us-east-1 -S3_ENDPOINT_URL= -AWS_ACCESS_KEY_ID= -AWS_SECRET_ACCESS_KEY= -# For MinIO (homelab): S3_ENDPOINT_URL=http://minio:9000 - -# ── Vector Store ────────────────────────────────────────────────────────────── -# Pinecone is used when PINECONE_API_KEY is set; otherwise falls back to Qdrant. -PINECONE_API_KEY= -PINECONE_INDEX=adiuva -QDRANT_URL= -QDRANT_API_KEY= -# For local Qdrant (homelab): QDRANT_URL=http://qdrant:6333 # ── Langfuse (leave empty to disable observability) ─────────────────────────── LANGFUSE_SECRET_KEY= diff --git a/.gitignore b/.gitignore index b4418da..4e57c0d 100644 --- a/.gitignore +++ b/.gitignore @@ -21,6 +21,7 @@ env/ .pytest_cache/ htmlcov/ .coverage +tests/fixtures/private*/ # Docker *.log diff --git a/README.md b/README.md index 19da6ea..a9bc2fc 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # Adiuva Cloud API -**AI-powered project management backend with E2E encrypted cloud storage, LLM orchestration, and a plugin marketplace.** +**AI-powered project management backend with LLM orchestration and subscription billing.** -Built with FastAPI · Python 3.12 · PostgreSQL · LangChain · Stripe · AWS S3 +Built with FastAPI · Python 3.12 · PostgreSQL · LangChain · Stripe --- @@ -20,9 +20,7 @@ Built with FastAPI · Python 3.12 · PostgreSQL · LangChain · Stripe · AWS S3 - [AI Agent System](#ai-agent-system) - [Orchestration & Execution Plans](#orchestration--execution-plans) - [Middleware](#middleware) -- [Storage Layer](#storage-layer) - [Billing & Tiers](#billing--tiers) -- [Plugin Marketplace](#plugin-marketplace) - [Testing](#testing) - [Project Structure](#project-structure) - [License](#license) @@ -31,15 +29,13 @@ Built with 
FastAPI · Python 3.12 · PostgreSQL · LangChain · Stripe · AWS S3 ## Overview -Adiuva Cloud API is the FastAPI backend that powers the **Adiuva Electron desktop app**. It provides LLM-powered chat orchestration, end-to-end encrypted cloud storage, a vector search engine, an encrypted backup system, a plugin marketplace with revenue sharing, and Stripe-based subscription billing across four tiers. +Adiuva Cloud API is the FastAPI backend that powers the **Adiuva Electron desktop app**. It provides LLM-powered chat orchestration, text embedding generation, and Stripe-based subscription billing across four tiers. ### Design Principles -1. **Never persist user data in plaintext** — the database stores only auth, billing, storage metadata, and marketplace data. All user content is E2E encrypted by the client before reaching the server. -2. **Never expose prompts** — system prompts stay server-side; responses are sanitized to strip any leaked prompt fragments. -3. **Never decrypt user blobs** — the backend performs only checksum verification; no decryption keys ever reach the server. -4. **Stateless request handling** — all context comes from the client and JWT; no server-side session state. -5. **Tier gates enforced server-side** — the server always reads the current tier from the database, never trusting client-reported values. +1. **Never expose prompts** — system prompts stay server-side; responses are sanitized to strip any leaked prompt fragments. +2. **Stateless request handling** — all context comes from the client and JWT; no server-side session state. +3. **Tier gates enforced server-side** — the server always reads the current tier from the database, never trusting client-reported values. 
--- @@ -54,27 +50,26 @@ Adiuva Cloud API is the FastAPI backend that powers the **Adiuva Electron deskto │ ┌──────────────────┐ ┌────────────────────────────┐ │ │ │ Auth Routes │ │ Chat Routes │ │ │ │ Billing Routes │ │ ↓ │ │ - │ │ Storage Routes │ │ Orchestrator (GPT-4o-mini)│ │ - │ │ Backup Routes │ │ ↓ classify intent │ │ - │ │ Plugin Routes │ │ Agent Registry │ │ - │ │ Vector Routes │ │ ↓ │ │ - │ │ Plans Routes │ │ TaskAgent | ProjectAgent │ │ - │ └──────────────────┘ │ NoteAgent | CheckptAgent │ │ + │ │ Agent Routes │ │ Orchestrator (GPT-4o-mini)│ │ + │ │ Device WS │ │ ↓ classify intent │ │ + │ └──────────────────┘ │ Agent Registry │ │ + │ │ ↓ │ │ + │ │ TaskAgent | ProjectAgent │ │ + │ │ NoteAgent | CheckptAgent │ │ │ │ (GPT-4o + LangChain) │ │ │ └────────────────────────────┘ │ └────────────────────────────────────────────────────────┘ - │ │ │ - ┌────────▼───┐ ┌───────▼───────┐ ┌──▼─────────────┐ - │ PostgreSQL │ │ AWS S3 │ │ Pinecone / │ - │ (Auth, │ │ (E2E blobs, │ │ Qdrant │ - │ Billing, │ │ backups) │ │ (Vectors) │ - │ Metadata) │ └───────────────┘ └────────────────┘ + │ + ┌────────▼───┐ + │ PostgreSQL │ + │ (Auth, │ + │ Billing, │ + │ Agents) │ └────────────┘ │ ┌────────▼───┐ │ Stripe │ - │ (Billing, │ - │ Connect) │ + │ (Billing) │ └────────────┘ ``` @@ -85,18 +80,14 @@ Adiuva Cloud API is the FastAPI backend that powers the **Adiuva Electron deskto 1. **LLM-powered orchestration** — GPT-4o-mini classifies user intent and routes to the appropriate domain agent. 2. **4 specialized AI agents** — Tasks (8 tools), Projects (6 tools), Timelines (4 tools), Notes (5 tools), all powered by GPT-4o via LangChain. 3. **Execution plans & playbooks** — Server-side prompt template registry; clients receive only opaque template IDs, never raw prompts. -4. **E2E encrypted cloud storage** — The backend never decrypts user data; SHA-256 checksum verification uses constant-time comparison to prevent timing attacks. -5. 
**Cloud vector store** — Pinecone or Qdrant with user-isolated namespaces and encrypted blob payloads. -6. **Encrypted backup system** — Tiered storage limits with `If-Modified-Since` support for efficient syncing. -7. **Plugin marketplace** — Catalog, admin review/approval workflow, security checklist, and 70/30 revenue sharing via Stripe Connect. -8. **Stripe billing** — Four-tier subscription model (Free / Pro / Power / Team) with checkout sessions and full webhook lifecycle handling. -9. **JWT authentication** — Access + refresh tokens with bcrypt password hashing, SHA-256 token hashing, and automatic rotation. -10. **Prompt IP protection** — Sanitizer middleware strips system prompts, reasoning markers, tool schemas, and agent routing metadata from all chat responses. -11. **Tier-based rate limiting** — Sliding-window per-user limiter scaling from 20 to 200 requests/min by subscription tier. -12. **Zero-trust data model** — User content is never stored in plaintext; the database holds only authentication, billing, and metadata records. -13. **WebSocket streaming** — Real-time chat with 30-second heartbeat keep-alive and chunked text delivery. -14. **Alembic migrations** — Versioned schema management with seed data for the plugin marketplace. -15. **Comprehensive test suite** — In-memory SQLite + moto S3 mocks, per-tier test fixtures, and full API coverage without external dependencies. +4. **Text embeddings** — Generates text-embedding-3-small vectors for local client-side note search. +5. **Stripe billing** — Four-tier subscription model (Free / Pro / Power / Team) with checkout sessions and full webhook lifecycle handling. +6. **JWT authentication** — Access + refresh tokens with bcrypt password hashing, SHA-256 token hashing, and automatic rotation. +7. **Prompt IP protection** — Sanitizer middleware strips system prompts, reasoning markers, tool schemas, and agent routing metadata from all chat responses. +8. 
**Tier-based rate limiting** — Sliding-window per-user limiter scaling from 20 to 200 requests/min by subscription tier. +9. **WebSocket streaming** — Real-time chat with 30-second heartbeat keep-alive and chunked text delivery. +10. **Alembic migrations** — Versioned schema management. +11. **Comprehensive test suite** — In-memory SQLite, per-tier test fixtures, and full API coverage without external dependencies. --- @@ -114,7 +105,6 @@ Adiuva Cloud API is the FastAPI backend that powers the **Adiuva Electron deskto | `pydantic-settings` | ≥ 2.7.0 | Environment-based configuration | | `python-jose[cryptography]` | ≥ 3.3.0 | JWT encoding and decoding | | `stripe` | ≥ 11.0.0 | Billing and payment integration | -| `boto3` | ≥ 1.35.0 | AWS S3 client | | `slowapi` | ≥ 0.1.9 | Rate limiting utilities | | `sqlalchemy` | ≥ 2.0.0 | Async ORM and query builder | | `asyncpg` | ≥ 0.30.0 | PostgreSQL async driver | @@ -124,12 +114,9 @@ Adiuva Cloud API is the FastAPI backend that powers the **Adiuva Electron deskto | `httpx` | ≥ 0.28.0 | Async HTTP client (used in tests) | | `websockets` | ≥ 14.0 | WebSocket protocol support | | `psycopg2-binary` | ≥ 2.9.0 | Synchronous PostgreSQL driver (Alembic) | -| `pinecone` | ≥ 5.0.0 | Pinecone vector store client | -| `qdrant-client` | ≥ 1.7.0 | Qdrant vector store client | | `pytest` | ≥ 8.0.0 | Test framework | | `pytest-asyncio` | ≥ 0.24.0 | Async test support | | `aiosqlite` | ≥ 0.20.0 | In-memory SQLite for tests | -| `moto[s3]` | ≥ 5.0.0 | AWS S3 mock for tests | | `ruff` | ≥ 0.8.0 | Linter and formatter | --- @@ -142,7 +129,6 @@ Adiuva Cloud API is the FastAPI backend that powers the **Adiuva Electron deskto - PostgreSQL 16+ - An OpenAI API key (for LLM features) - Stripe API keys (optional — billing stubs gracefully when unconfigured) -- AWS credentials (optional — needed for S3 storage in production) ### Installation @@ -194,11 +180,6 @@ This starts two services: - **app** — FastAPI server on port `8000` - **db** — PostgreSQL 
16 (Alpine) on port `5432` with a persistent volume and health checks -The compose file also includes optional services for fully local deployments: - -- **minio** — S3-compatible object storage on ports `9000` (API) and `9001` (console) -- **qdrant** — Vector search engine on ports `6333` (HTTP) and `6334` (gRPC) - ### Dockerfile Details The Dockerfile uses a multi-stage build: @@ -216,7 +197,7 @@ gunicorn app.main:app -k uvicorn.workers.UvicornWorker -w 4 --timeout 120 -b 0.0 ## Homelab / Self-Hosted Deployment -You can run the entire stack locally on a homelab with **no cloud dependencies except the LLM provider**. The compose file includes MinIO (S3 replacement) and Qdrant (vector store) out of the box. +You can run the entire stack locally on a homelab with **no cloud dependencies except the LLM provider**. ### 1. Start all services @@ -224,35 +205,14 @@ You can run the entire stack locally on a homelab with **no cloud dependencies e docker compose up -d ``` -This starts PostgreSQL, MinIO, and Qdrant alongside the app. +This starts PostgreSQL alongside the app. -### 2. Create the MinIO bucket - -Open the MinIO console at [http://localhost:9001](http://localhost:9001) (login: `minioadmin` / `minioadmin`) and create a bucket named `adiuva`, or use the CLI: - -```bash -docker compose exec minio mc alias set local http://localhost:9000 minioadmin minioadmin -docker compose exec minio mc mb local/adiuva -``` - -### 3. Configure your `.env` +### 2. 
Configure your `.env` ```bash # Database (uses the compose PostgreSQL) DATABASE_URL=postgresql+asyncpg://postgres:postgres@db:5432/adiuva -# S3 → MinIO -S3_BUCKET=adiuva -S3_REGION=us-east-1 -S3_ENDPOINT_URL=http://minio:9000 -AWS_ACCESS_KEY_ID=minioadmin -AWS_SECRET_ACCESS_KEY=minioadmin - -# Vector store → local Qdrant (leave PINECONE_API_KEY empty) -QDRANT_URL=http://qdrant:6333 -QDRANT_API_KEY= -PINECONE_API_KEY= - # Billing — leave empty to stub (no Stripe needed) STRIPE_SECRET_KEY= STRIPE_WEBHOOK_SECRET= @@ -267,7 +227,7 @@ JWT_SECRET=your-secret-here ENV=dev ``` -### 4. Run migrations +### 3. Run migrations ```bash docker compose exec app alembic upgrade head @@ -278,9 +238,7 @@ docker compose exec app alembic upgrade head | Service | Runs on | Port | Notes | |---|---|---|---| | FastAPI app | Docker | 8000 | API server | -| PostgreSQL | Docker | 5432 | Auth, billing, metadata | -| MinIO | Docker | 9000 / 9001 | S3-compatible blob & backup storage | -| Qdrant | Docker | 6333 / 6334 | Vector search (replaces Pinecone) | +| PostgreSQL | Docker | 5432 | Auth, billing, agents | | Stripe | — | — | Stubbed when keys are empty | | OpenAI / LLM | Cloud | — | Only external dependency | @@ -300,17 +258,7 @@ All variables are loaded from a `.env` file via Pydantic Settings. Source: `app/ | `JWT_ACCESS_TOKEN_EXPIRE_MINUTES` | `int` | `30` | Access token time-to-live | | `JWT_REFRESH_TOKEN_EXPIRE_DAYS` | `int` | `30` | Refresh token time-to-live | | `STRIPE_SECRET_KEY` | `str` | `""` | Stripe API key (empty = stub mode) | -| `STRIPE_WEBHOOK_SECRET` | `str` | `""` | Stripe webhook signature secret | -| `S3_BUCKET` | `str` | `""` | S3 bucket for encrypted blobs and backups | -| `S3_REGION` | `str` | `us-east-1` | AWS region | -| `S3_ENDPOINT_URL` | `str` | `""` | Custom S3 endpoint (e.g. `http://minio:9000` for MinIO). Leave empty for AWS. 
| -| `AWS_ACCESS_KEY_ID` | `str` | `""` | AWS credentials | -| `AWS_SECRET_ACCESS_KEY` | `str` | `""` | AWS credentials | -| `PINECONE_API_KEY` | `str` | `""` | Pinecone API key (if set, Pinecone is used for vectors) | -| `PINECONE_INDEX` | `str` | `adiuva` | Pinecone index name | -| `QDRANT_URL` | `str` | `""` | Qdrant URL (used when Pinecone is not configured) | -| `QDRANT_API_KEY` | `str` | `""` | Qdrant API key | -| `OPENAI_API_KEY` | `str` | `""` | OpenAI key for LLM agent calls | +| `STRIPE_WEBHOOK_SECRET` | `str` | `""` | Stripe webhook signature secret | +| `OPENAI_API_KEY` | `str` | `""` | OpenAI key for LLM agent calls | | `LLM_MODEL` | `str` | `gpt-4o` | LiteLLM model identifier for agents (e.g. `anthropic/claude-3.5-sonnet`, `gemini/gemini-pro`, `ollama/llama3`) | | `LLM_ROUTER_MODEL` | `str` | `gpt-4o-mini` | Lighter model used for intent classification / routing | | `CORS_ORIGINS` | `list[str]` | `["app://.", "http://localhost:3000", "http://localhost:5173"]` | Allowed CORS origins | @@ -342,6 +290,7 @@ All routes are prefixed with `/api/v1`. **27 endpoints** total (25 REST + 1 WebS | Method | Path | Auth | Description | |---|---|---|---| | `POST` | `/api/v1/chat` | JWT | Route message through the orchestrator; returns `ChatResponse` or `ExecutionPlan` depending on execution mode | +| `POST` | `/api/v1/chat/embed` | JWT | Generate a 1536-dim text embedding vector (`text-embedding-3-small`). Used by Electron for local note search. | | `WS` | `/api/v1/chat/stream` | JWT (query param `?token=`) | Streaming chat — first frame is a `ChatRequest`, server yields text chunks, final frame is `{"done": true, "response": "...", "actions": [...]}`. 30-second heartbeat ping. | ### Plans @@ -351,42 +300,6 @@ All routes are prefixed with `/api/v1`. 
**27 endpoints** total (25 REST + 1 WebS | `GET` | `/api/v1/plans/playbook` | JWT | List all cached execution plan playbooks | | `GET` | `/api/v1/plans/playbook/{plan_id}` | JWT | Retrieve a specific playbook by ID | -### Storage (Cloud Records) - -| Method | Path | Auth | Description | -|---|---|---|---| -| `POST` | `/api/v1/storage/records` | JWT | Upload an E2E encrypted record (verifies checksum, enforces storage quota) | -| `GET` | `/api/v1/storage/records` | JWT | List record metadata with pagination (`?table`, `?page`, `?limit`); no blob bytes returned | -| `GET` | `/api/v1/storage/records/{id}` | JWT | Download encrypted blob with `X-Checksum` response header | -| `PUT` | `/api/v1/storage/records/{id}` | JWT | Replace an existing blob (verifies checksum, enforces quota) | -| `DELETE` | `/api/v1/storage/records/{id}` | JWT | Delete a record and its S3 blob | - -### Vectors (Cloud Vector Store) - -| Method | Path | Auth | Description | -|---|---|---|---| -| `POST` | `/api/v1/storage/vectors/upsert` | JWT | Verify checksums and upsert encrypted vectors | -| `POST` | `/api/v1/storage/vectors/search` | JWT | Search user-scoped vector namespace | -| `DELETE` | `/api/v1/storage/vectors` | JWT | Delete vectors by ID list | - -### Backup - -| Method | Path | Auth | Description | -|---|---|---|---| -| `PUT` | `/api/v1/backup` | JWT | Upload encrypted backup blob with custom headers (`X-Backup-Version`, `X-Backup-Timestamp`, `X-Backup-Checksum`). Tier quota enforced. | -| `GET` | `/api/v1/backup` | JWT | Download latest backup blob. Supports `If-Modified-Since`. 
| -| `GET` | `/api/v1/backup/history` | JWT | List backup metadata (no blob content) | -| `DELETE` | `/api/v1/backup/{backup_id}` | JWT | Delete a specific backup | - -### Plugins (Marketplace) - -| Method | Path | Auth | Description | -|---|---|---|---| -| `GET` | `/api/v1/plugins` | JWT (Power+) | Browse the marketplace (`?category`, `?q`, `?page`, `?sort=rating\|installs\|newest`) | -| `GET` | `/api/v1/plugins/{id}` | JWT (Power+) | Plugin detail with install count and ratings | -| `POST` | `/api/v1/plugins/{id}/install` | JWT (Power+) | Install plugin; triggers Stripe Connect revenue split for paid plugins | -| `DELETE` | `/api/v1/plugins/{id}/install` | JWT | Uninstall plugin | - ### Billing | Method | Path | Auth | Description | @@ -400,7 +313,7 @@ All routes are prefixed with `/api/v1`. **27 endpoints** total (25 REST + 1 WebS ## Data Model -9 tables managed by Alembic migrations. Source: `app/models.py` +3 tables managed by Alembic migrations. Source: `app/models.py` ### Tables @@ -409,27 +322,18 @@ All routes are prefixed with `/api/v1`. 
**27 endpoints** total (25 REST + 1 WebS | `users` | `id` (UUID) | `email` (unique), `password_hash`, `tier`, `stripe_customer_id`, timestamps | User accounts | | `refresh_tokens` | `id` (UUID) | `user_id` (FK), `token_hash` (SHA-256, unique), `expires_at` | Hashed refresh tokens for rotation | | `subscriptions` | `id` (UUID) | `user_id` (FK, unique), `stripe_subscription_id`, `tier`, `status`, `current_period_end` | Stripe subscription records | -| `storage_records` | `id` (UUID) | `user_id` (FK), `table_name`, `s3_key`, `checksum`, `size_bytes`, timestamps | S3 blob metadata (no plaintext content) | -| `backup_metadata` | `id` (UUID) | `user_id` (FK), `s3_key`, `version`, `timestamp`, `checksum`, `size_bytes` | Backup manifests | -| `plugins` | `id` (String) | `name`, `description`, `version`, `author_id` (FK), `category`, `price_cents`, `permissions` (JSON), `status`, `s3_package_key`, `install_count`, `avg_rating` | Marketplace plugin catalog | -| `plugin_installations` | `id` (UUID) | `plugin_id` (FK), `user_id` (FK), unique constraint on (`plugin_id`, `user_id`) | Per-user install tracking | -| `plugin_reviews` | `id` (UUID) | `plugin_id` (FK), `reviewer_id` (FK), `decision`, `notes`, `reviewed_at` | Admin review decisions | -| `revenue_events` | `id` (UUID) | `plugin_id` (FK), `user_id` (FK), `amount_cents`, `developer_share_cents`, `stripe_transfer_id` | 70/30 revenue split ledger | ### Enum Types | Enum | Values | |---|---| | `billing_tier` | `free`, `pro`, `power`, `team` | -| `plugin_status` | `pending_review`, `approved`, `rejected` | -| `review_decision` | `approved`, `rejected` | ### Migrations | Version | Description | |---|---| -| `001_initial_schema` | Creates all 9 tables with indexes and foreign key constraints | -| `002_seed_plugins` | Seeds 3 approved plugins: GitHub Sync (free), Slack Notifier (€4.99), Time Tracker (€9.99) | +| `001_initial_schema` | Creates core auth and billing tables with indexes and foreign key constraints | --- @@ -439,7 
+343,7 @@ The agent system uses a registry pattern with LangChain tool-calling agents powe ### Architecture -- **`BaseAgent`** — Abstract base with `user_id`, `shared_memory`, and `vector_store_context`. +- **`BaseAgent`** — Abstract base with `user_id` and `shared_memory`. - **`ChatAgent(BaseAgent)`** — Abstract `handle(query, context)` and `get_tools()` methods, plus a shared `_tool_loop(llm, messages, tools, max_iter=5)` for iterative tool calling. - **`AgentRegistry`** — Singleton registry with `@register` decorator, `get(name)`, `list_agents()`, and `call_agent(name, query, context)`. @@ -554,39 +458,6 @@ Source: `app/api/middleware/sanitizer.py` - Scans JSON response bodies and replaces leaked prompt IP fragments with `[REDACTED]`. - Detects: system prompt openers, agent routing metadata, LangChain tool schemas, internal reasoning markers (``, `[INST]`), and known prompt fingerprints. - Logs sanitization events as `WARNING`. -- Binary responses (storage, backup) are never touched. - ---- - -## Storage Layer - -### Blob Store - -Source: `app/storage/blob_store.py` - -- S3-backed storage for E2E encrypted blobs. -- Object keys follow the pattern: `{user_id}/{table}/{record_id}` -- Server-side SSE-S3 encryption at rest (additional layer on top of client-side E2E encryption). -- Methods: `upload()`, `download()`, `delete()` (idempotent), `list_keys()` -- The backend **never inspects or decrypts blob content**. - -### Vector Store - -Source: `app/storage/vector_store.py` - -- Runtime-configurable: **Pinecone** (when `PINECONE_API_KEY` is set) or **Qdrant** (fallback). -- User isolation: Pinecone uses `namespace=user_id`; Qdrant filters by `user_id` payload field. -- 32-dimensional SHA-256-derived float vectors (deterministic, not semantically meaningful on encrypted data — a documented trade-off for privacy). -- Encrypted blobs are stored as base64 in metadata/payload for verbatim retrieval. 
-- Methods: `upsert()`, `search()`, `delete()` - -### Encryption Utilities - -Source: `app/storage/encryption.py` - -- `verify_checksum(blob, checksum)` — SHA-256 hash comparison using `hmac.compare_digest` (constant-time to prevent timing attacks). -- `reject_if_tampered(blob, checksum)` — Raises HTTP 400 on checksum mismatch. -- **No decryption key ever reaches the backend.** --- @@ -600,11 +471,8 @@ Source: `app/billing/stripe_service.py`, `app/billing/tier_manager.py` |---|---|---|---|---| | AI Agents | 3 | Unlimited | Unlimited | Unlimited | | Batch Active | 2 | 10 | Unlimited | Unlimited | -| Cloud Storage | 0 GB | 5 GB | 25 GB | Unlimited | -| Backup Storage | 0 GB | 5 GB | 25 GB | Unlimited | | LLM Providers | 1 | Unlimited | Unlimited | Unlimited | | Batch Builder | — | — | ✓ | ✓ | -| Plugin Marketplace | — | — | ✓ | ✓ | | SSO | — | — | — | ✓ | | Rate Limit | 20 req/min | 60 req/min | 120 req/min | 200 req/min | @@ -620,47 +488,6 @@ Source: `app/billing/stripe_service.py`, `app/billing/tier_manager.py` - `get_tier(user_id)` — Returns the user's current billing tier. - `check_feature(tier, feature)` — Boolean feature gate check. - `require_feature(tier, feature)` — Raises HTTP 403 if the feature is not available. -- `enforce_quota(user_id, tier)` / `enforce_backup_quota(user_id, tier)` — Raises HTTP 402 if storage limits are exceeded. - ---- - -## Plugin Marketplace - -Source: `app/marketplace/` - -### Plugin Registry - -- PostgreSQL-backed catalog of submitted and approved plugins. -- `list_plugins(db, category, query, page, sort)` — Paginated listing (page size: 20) with optional filtering by category, text search, and sorting by `rating`, `installs`, or `newest`. -- `get_plugin(db, plugin_id)` — Full manifest with install count and ratings. -- `submit_plugin(db, manifest, s3_key)` — Submits a plugin with `pending_review` status. -- `approve_plugin()` / `reject_plugin(reason)` — Admin workflow for plugin approval. 
-- `record_install()` / `record_uninstall()` — Tracks per-user installations and updates install counts. - -### Review Queue - -- Automated security checklist before human review: - - Plugin ID must match `^[a-z0-9-]+$` - - Permissions must be from the allowed set only - - No binary blobs in the manifest -- **Allowed permissions:** `read:tasks`, `write:tasks`, `read:projects`, `write:projects`, `read:notes`, `write:notes`, `read:timelines`, `write:timelines`, `read:calendar`, `write:calendar` -- `get_pending(db)` — Lists plugins awaiting review. -- `submit_review(db, plugin_id, reviewer_id, decision, notes)` — Records the review decision. - -### Revenue Sharing - -- **70% developer / 30% platform** split on all paid plugin sales. -- `record_install(db, plugin_id, user_id, amount_cents)` — Records the revenue event and triggers a Stripe Connect transfer for the developer share. -- `get_earnings(db, developer_id, period)` — Aggregated earnings report for plugin developers. -- Gracefully stubs transfers when Stripe is not configured. - -### Seed Plugins - -| Plugin | Category | Price | -|---|---|---| -| GitHub Sync | Productivity | Free | -| Slack Notifier | Communication | €4.99 | -| Time Tracker | Productivity | €9.99 | --- @@ -682,10 +509,8 @@ pytest -v ### Test Infrastructure - **Database:** Async SQLite in-memory via `aiosqlite` + `StaticPool` — fast, no PostgreSQL needed. -- **S3 mock:** `moto[s3]` with a fixture that patches `BlobStore` settings. - **Auth helpers:** `make_jwt(tier)` and `auth_header(tier)` generate per-tier test tokens. - **Seed data:** Auto-creates one `User` + `Subscription` per tier (free/pro/power/team) before each test. -- **Plugin seeds:** Fixture adds 3 approved plugins for marketplace tests. - **FK enforcement:** SQLite `PRAGMA foreign_keys=ON`. - **No external dependencies** — all tests run fully offline. 
@@ -694,13 +519,6 @@ pytest -v | File | Coverage | |---|---| | `test_auth.py` | Register, login, token access, refresh, expiration | -| `test_orchestrator.py` | Intent classification, single agent routing, pipeline, plan mode | -| `test_agents.py` | Each agent with mocked LLM: registration, tools, handle method | -| `test_storage.py` | Create, list, download, update, delete records; checksum rejection; quota enforcement | -| `test_backup.py` | Upload, download, history, delete; tier-based storage limits | -| `test_plugins.py` | List, install, uninstall, revenue events, tier gate enforcement | -| `test_agent_registry.py` | Registry singleton, registration, lookup, listing | -| `test_execution_plan.py` | Plan builder, template registry, plan cache | | `test_middleware.py` | Rate limiting by tier, sanitizer prompt leak detection | --- @@ -710,7 +528,6 @@ pytest -v ``` adiuva-api/ ├── alembic.ini # Alembic configuration -├── BACKEND_PLAN.md # Architecture & design decisions ├── docker-compose.yml # Docker Compose (app + PostgreSQL) ├── Dockerfile # Multi-stage production build ├── requirements.txt # Python dependencies @@ -719,13 +536,12 @@ adiuva-api/ │ ├── env.py # Alembic environment config │ ├── script.py.mako # Migration template │ └── versions/ -│ ├── 001_initial_schema.py # Tables, indexes, FKs -│ └── 002_seed_plugins.py # Seed marketplace plugins +│ └── 001_initial_schema.py # Tables, indexes, FKs │ ├── app/ # Application source │ ├── main.py # FastAPI app factory, middleware, routes │ ├── db.py # Async SQLAlchemy engine & session -│ ├── models.py # SQLAlchemy ORM models (9 tables) +│ ├── models.py # SQLAlchemy ORM models │ ├── schemas.py # Pydantic request/response schemas │ │ │ ├── config/ @@ -734,53 +550,35 @@ adiuva-api/ │ ├── agents/ # LLM-powered domain agents │ │ ├── task_agent.py # Task & comment CRUD (8 tools) │ │ ├── project_agent.py # Project lifecycle (6 tools) -│ │ ├── timeline_agent.py # Milestones (4 tools) +│ │ ├── timeline_agent.py # Milestones 
(4 tools) │ │ └── note_agent.py # Markdown notes (5 tools) │ │ │ ├── core/ # Orchestration engine │ │ ├── agent_registry.py # BaseAgent, ChatAgent, AgentRegistry │ │ ├── llm.py # LiteLLM factory (get_llm, get_router_llm) -│ │ ├── orchestrator.py # Intent classification & routing -│ │ └── execution_plan.py # Plan builder, templates, cache +│ │ └── deep_agent.py # Deep agent orchestration │ │ │ ├── api/ # HTTP layer │ │ ├── deps.py # Shared FastAPI dependencies │ │ ├── middleware/ -│ │ │ ├── auth.py # JWT validation, live tier lookup │ │ │ ├── rate_limit.py # Sliding-window tier rate limiter │ │ │ └── sanitizer.py # Prompt IP leak protection │ │ └── routes/ │ │ ├── auth.py # Register, login, refresh, me -│ │ ├── chat.py # Chat + WebSocket streaming -│ │ ├── plans.py # Execution plan playbooks -│ │ ├── storage.py # E2E encrypted record CRUD -│ │ ├── vectors.py # Vector upsert, search, delete -│ │ ├── backup.py # Encrypted backup management -│ │ ├── plugins.py # Marketplace browse & install -│ │ └── billing.py # Stripe checkout & webhooks +│ │ ├── chat.py # Chat + embed endpoint +│ │ ├── billing.py # Stripe checkout, webhooks, subscription +│ │ ├── agents.py # Agent catalog, config, runs +│ │ └── device_ws.py # Persistent device WebSocket │ │ -│ ├── storage/ # Storage backends -│ │ ├── blob_store.py # S3 blob storage -│ │ ├── vector_store.py # Pinecone / Qdrant vector store -│ │ └── encryption.py # Checksum verification utilities -│ │ -│ ├── billing/ # Subscription management -│ │ ├── stripe_service.py # Stripe API integration -│ │ └── tier_manager.py # Feature matrix & quota enforcement -│ │ -│ └── marketplace/ # Plugin ecosystem -│ ├── plugin_registry.py # Catalog CRUD & search -│ ├── plugin_review.py # Security checklist & review queue -│ └── revenue_share.py # 70/30 split & Stripe Connect +│ └── billing/ +│ ├── stripe_service.py # Stripe API wrapper +│ └── tier_manager.py # Feature matrix, rate limits │ └── tests/ # Test suite - ├── conftest.py # Fixtures: DB, S3, 
auth, seeds + ├── conftest.py # Fixtures: DB, auth, seeds ├── test_auth.py ├── test_orchestrator.py ├── test_agents.py - ├── test_storage.py - ├── test_backup.py - ├── test_plugins.py ├── test_agent_registry.py ├── test_execution_plan.py └── test_middleware.py diff --git a/alembic/versions/001_initial_schema.py b/alembic/versions/001_initial_schema.py index 462ee59..ea9895b 100644 --- a/alembic/versions/001_initial_schema.py +++ b/alembic/versions/001_initial_schema.py @@ -1,5 +1,4 @@ -"""Initial schema: users, refresh_tokens, subscriptions, storage_records, -backup_metadata, plugins, plugin_installations, plugin_reviews, revenue_events. +"""Initial schema: users, refresh_tokens, subscriptions. Revision ID: 001 Revises: @@ -28,18 +27,6 @@ def upgrade() -> None: EXCEPTION WHEN duplicate_object THEN NULL; END $$; """) - op.execute(""" - DO $$ BEGIN - CREATE TYPE plugin_status AS ENUM ('pending_review', 'approved', 'rejected'); - EXCEPTION WHEN duplicate_object THEN NULL; - END $$; - """) - op.execute(""" - DO $$ BEGIN - CREATE TYPE review_decision AS ENUM ('approved', 'rejected'); - EXCEPTION WHEN duplicate_object THEN NULL; - END $$; - """) # ── users ───────────────────────────────────────────────────────────── op.create_table( @@ -88,122 +75,10 @@ def upgrade() -> None: op.create_index("ix_subscriptions_user_id", "subscriptions", ["user_id"]) op.create_index("ix_subscriptions_stripe_id", "subscriptions", ["stripe_subscription_id"]) - # ── storage_records ─────────────────────────────────────────────────── - op.create_table( - "storage_records", - sa.Column("id", postgresql.UUID(as_uuid=False), nullable=False), - sa.Column("user_id", postgresql.UUID(as_uuid=False), nullable=False), - sa.Column("table_name", sa.String(100), nullable=False), - sa.Column("s3_key", sa.String(500), nullable=False), - sa.Column("checksum", sa.String(64), nullable=False), - sa.Column("size_bytes", sa.Integer, nullable=False), - sa.Column("created_at", sa.DateTime(timezone=True), 
nullable=False, server_default=sa.text("now()")), - sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")), - sa.PrimaryKeyConstraint("id"), - sa.ForeignKeyConstraint(["user_id"], ["users.id"], ondelete="CASCADE"), - ) - op.create_index("ix_storage_records_user_id", "storage_records", ["user_id"]) - - # ── backup_metadata ─────────────────────────────────────────────────── - op.create_table( - "backup_metadata", - sa.Column("id", postgresql.UUID(as_uuid=False), nullable=False), - sa.Column("user_id", postgresql.UUID(as_uuid=False), nullable=False), - sa.Column("s3_key", sa.String(500), nullable=False), - sa.Column("version", sa.Integer, nullable=False), - sa.Column("timestamp", sa.BigInteger, nullable=False), - sa.Column("checksum", sa.String(64), nullable=False), - sa.Column("size_bytes", sa.Integer, nullable=False), - sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")), - sa.PrimaryKeyConstraint("id"), - sa.ForeignKeyConstraint(["user_id"], ["users.id"], ondelete="CASCADE"), - ) - op.create_index("ix_backup_metadata_user_id", "backup_metadata", ["user_id"]) - - # ── plugins ─────────────────────────────────────────────────────────── - op.create_table( - "plugins", - sa.Column("id", sa.String(255), nullable=False), - sa.Column("name", sa.String(255), nullable=False), - sa.Column("description", sa.Text, nullable=False, server_default=""), - sa.Column("version", sa.String(50), nullable=False, server_default="1.0.0"), - sa.Column("author_id", postgresql.UUID(as_uuid=False), nullable=True), - sa.Column("author_name", sa.String(255), nullable=False, server_default=""), - sa.Column("category", sa.String(100), nullable=False, server_default=""), - sa.Column("price_cents", sa.Integer, nullable=False, server_default="0"), - sa.Column("permissions", sa.Text, nullable=False, server_default="[]"), - sa.Column("status", postgresql.ENUM("pending_review", "approved", "rejected", 
name="plugin_status", create_type=False), nullable=False, server_default="pending_review"), - sa.Column("s3_package_key", sa.String(500), nullable=True), - sa.Column("install_count", sa.Integer, nullable=False, server_default="0"), - sa.Column("avg_rating", sa.Float, nullable=False, server_default="0.0"), - sa.Column("rejection_reason", sa.Text, nullable=True), - sa.Column("submitted_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")), - sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")), - sa.PrimaryKeyConstraint("id"), - sa.ForeignKeyConstraint(["author_id"], ["users.id"], ondelete="SET NULL"), - ) - - # ── plugin_installations ────────────────────────────────────────────── - op.create_table( - "plugin_installations", - sa.Column("id", postgresql.UUID(as_uuid=False), nullable=False), - sa.Column("plugin_id", sa.String(255), nullable=False), - sa.Column("user_id", postgresql.UUID(as_uuid=False), nullable=False), - sa.Column("installed_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")), - sa.PrimaryKeyConstraint("id"), - sa.ForeignKeyConstraint(["plugin_id"], ["plugins.id"], ondelete="CASCADE"), - sa.ForeignKeyConstraint(["user_id"], ["users.id"], ondelete="CASCADE"), - sa.UniqueConstraint("plugin_id", "user_id", name="uq_plugin_user"), - ) - op.create_index("ix_plugin_installations_plugin_id", "plugin_installations", ["plugin_id"]) - op.create_index("ix_plugin_installations_user_id", "plugin_installations", ["user_id"]) - - # ── plugin_reviews ──────────────────────────────────────────────────── - op.create_table( - "plugin_reviews", - sa.Column("id", postgresql.UUID(as_uuid=False), nullable=False), - sa.Column("plugin_id", sa.String(255), nullable=False), - sa.Column("reviewer_id", postgresql.UUID(as_uuid=False), nullable=True), - sa.Column("decision", postgresql.ENUM("approved", "rejected", name="review_decision", create_type=False), nullable=False), - 
sa.Column("notes", sa.Text, nullable=True), - sa.Column("reviewed_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")), - sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")), - sa.PrimaryKeyConstraint("id"), - sa.ForeignKeyConstraint(["plugin_id"], ["plugins.id"], ondelete="CASCADE"), - sa.ForeignKeyConstraint(["reviewer_id"], ["users.id"], ondelete="SET NULL"), - ) - op.create_index("ix_plugin_reviews_plugin_id", "plugin_reviews", ["plugin_id"]) - - # ── revenue_events ──────────────────────────────────────────────────── - op.create_table( - "revenue_events", - sa.Column("id", postgresql.UUID(as_uuid=False), nullable=False), - sa.Column("plugin_id", sa.String(255), nullable=False), - sa.Column("user_id", postgresql.UUID(as_uuid=False), nullable=False), - sa.Column("amount_cents", sa.Integer, nullable=False, server_default="0"), - sa.Column("developer_share_cents", sa.Integer, nullable=False, server_default="0"), - sa.Column("stripe_transfer_id", sa.String(255), nullable=True), - sa.Column("paid_at", sa.DateTime(timezone=True), nullable=True), - sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")), - sa.PrimaryKeyConstraint("id"), - sa.ForeignKeyConstraint(["plugin_id"], ["plugins.id"], ondelete="CASCADE"), - sa.ForeignKeyConstraint(["user_id"], ["users.id"], ondelete="CASCADE"), - ) - op.create_index("ix_revenue_events_plugin_id", "revenue_events", ["plugin_id"]) - op.create_index("ix_revenue_events_user_id", "revenue_events", ["user_id"]) - def downgrade() -> None: - op.drop_table("revenue_events") - op.drop_table("plugin_reviews") - op.drop_table("plugin_installations") - op.drop_table("plugins") - op.drop_table("backup_metadata") - op.drop_table("storage_records") op.drop_table("subscriptions") op.drop_table("refresh_tokens") op.drop_table("users") - op.execute("DROP TYPE IF EXISTS review_decision") - op.execute("DROP TYPE IF EXISTS 
plugin_status") op.execute("DROP TYPE IF EXISTS billing_tier") diff --git a/alembic/versions/002_seed_plugins.py b/alembic/versions/002_seed_plugins.py deleted file mode 100644 index e38fcaa..0000000 --- a/alembic/versions/002_seed_plugins.py +++ /dev/null @@ -1,92 +0,0 @@ -"""Seed approved plugins: GitHub Sync, Slack Notifier, Time Tracker. - -Revision ID: 002 -Revises: 001 -Create Date: 2026-03-03 -""" - -from __future__ import annotations - -import json -from datetime import datetime, timezone -from typing import Sequence, Union - -import sqlalchemy as sa -from alembic import op - -revision: str = "002" -down_revision: Union[str, None] = "001" -branch_labels: Union[str, Sequence[str], None] = None -depends_on: Union[str, Sequence[str], None] = None - -_SEED_PLUGINS = [ - { - "id": "plugin-github-sync", - "name": "GitHub Sync", - "description": "Sync tasks with GitHub Issues and pull requests.", - "version": "1.0.0", - "author_name": "Adiuva", - "category": "productivity", - "price_cents": 0, - "permissions": json.dumps(["read:tasks", "write:tasks"]), - "status": "approved", - "s3_package_key": "plugins/plugin-github-sync/1.0.0/package.zip", - "install_count": 0, - "avg_rating": 0.0, - }, - { - "id": "plugin-slack-notify", - "name": "Slack Notifier", - "description": "Post task and timeline updates to Slack channels.", - "version": "1.2.0", - "author_name": "Adiuva", - "category": "communication", - "price_cents": 499, - "permissions": json.dumps(["read:tasks", "read:timelines"]), - "status": "approved", - "s3_package_key": "plugins/plugin-slack-notify/1.2.0/package.zip", - "install_count": 0, - "avg_rating": 0.0, - }, - { - "id": "plugin-time-tracker", - "name": "Time Tracker", - "description": "Track time spent on tasks with automatic reporting.", - "version": "0.9.1", - "author_name": "Third Party", - "category": "productivity", - "price_cents": 999, - "permissions": json.dumps(["read:tasks", "write:tasks"]), - "status": "approved", - "s3_package_key": 
"plugins/plugin-time-tracker/0.9.1/package.zip", - "install_count": 0, - "avg_rating": 0.0, - }, -] - - -def upgrade() -> None: - plugins = sa.table( - "plugins", - sa.column("id", sa.String), - sa.column("name", sa.String), - sa.column("description", sa.Text), - sa.column("version", sa.String), - sa.column("author_name", sa.String), - sa.column("category", sa.String), - sa.column("price_cents", sa.Integer), - sa.column("permissions", sa.Text), - sa.column("status", sa.Enum("pending_review", "approved", "rejected", name="plugin_status")), - sa.column("s3_package_key", sa.String), - sa.column("install_count", sa.Integer), - sa.column("avg_rating", sa.Float), - ) - op.bulk_insert(plugins, _SEED_PLUGINS) - - -def downgrade() -> None: - op.execute( - "DELETE FROM plugins WHERE id IN (" - "'plugin-github-sync', 'plugin-slack-notify', 'plugin-time-tracker'" - ")" - ) diff --git a/app/api/middleware/sanitizer.py b/app/api/middleware/sanitizer.py index 570937f..4dd3531 100644 --- a/app/api/middleware/sanitizer.py +++ b/app/api/middleware/sanitizer.py @@ -8,8 +8,7 @@ that could reveal server-side prompt IP: - Internal reasoning markers (, , [INST], …) - Exact-match known prompt fingerprints -Binary responses (storage blobs, backup data) are never touched — the -middleware only activates for paths under /api/v1/chat. +The middleware only activates for paths under /api/v1/chat. Any sanitisation event is logged as a WARNING with the request path and the names of the fields that were modified. diff --git a/app/api/routes/backup.py b/app/api/routes/backup.py deleted file mode 100644 index 2b8eeae..0000000 --- a/app/api/routes/backup.py +++ /dev/null @@ -1,171 +0,0 @@ -"""Backup routes: upload, download, history, and delete E2E-encrypted backups. - -Blobs are stored in S3 via BlobStore. Backup metadata is persisted in the -PostgreSQL ``backup_metadata`` table. 
- -IMPORTANT: GET /history must be declared BEFORE GET / to avoid FastAPI -treating "history" as a ``{backup_id}`` path parameter. -""" - -from __future__ import annotations - -import uuid -from email.utils import parsedate_to_datetime - -from fastapi import APIRouter, Depends, Header, HTTPException, Request, Response, status -from sqlalchemy import func, select -from sqlalchemy.ext.asyncio import AsyncSession - -from app.api.deps import get_current_user -from app.billing.tier_manager import tier_manager -from app.db import get_session -from app.models import BackupMetadata as BackupMetadataModel -from app.schemas import BackupMetadata, UserProfile -from app.storage.blob_store import BlobStore -from app.storage.encryption import reject_if_tampered - -router = APIRouter(prefix="/backup", tags=["backup"]) - -_blob_store = BlobStore() - - -async def _current_backup_bytes(user_id: str, db: AsyncSession) -> int: - """Return total backup bytes stored by *user_id*.""" - result = await db.execute( - select(func.coalesce(func.sum(BackupMetadataModel.size_bytes), 0)).where( - BackupMetadataModel.user_id == user_id - ) - ) - return int(result.scalar_one()) - - -async def _check_backup_quota( - user: UserProfile, size_bytes: int, db: AsyncSession -) -> None: - """Raise HTTP 402 if the upload would exceed the tier's backup limit.""" - current = await _current_backup_bytes(user.id, db) - tier_manager.enforce_backup_quota( - user.tier, current_bytes=current, additional_bytes=size_bytes - ) - - -@router.put("") -async def upload_backup( - request: Request, - x_backup_version: int = Header(..., alias="X-Backup-Version"), - x_backup_timestamp: int = Header(..., alias="X-Backup-Timestamp"), - x_backup_checksum: str = Header(..., alias="X-Backup-Checksum"), - current_user: UserProfile = Depends(get_current_user), - db: AsyncSession = Depends(get_session), -) -> dict[str, bool]: - """Upload an E2E-encrypted backup blob. 
- - Metadata is passed via custom headers; the raw body is the encrypted blob. - """ - blob = await request.body() - reject_if_tampered(blob, x_backup_checksum) - await _check_backup_quota(current_user, len(blob), db) - - s3_key = await _blob_store.upload( - current_user.id, "backup", str(x_backup_timestamp), blob, x_backup_checksum - ) - - row = BackupMetadataModel( - id=str(uuid.uuid4()), - user_id=current_user.id, - s3_key=s3_key, - version=x_backup_version, - timestamp=x_backup_timestamp, - checksum=x_backup_checksum, - size_bytes=len(blob), - ) - db.add(row) - await db.commit() - - return {"ok": True} - - -@router.get("/history", response_model=list[BackupMetadata]) -async def backup_history( - current_user: UserProfile = Depends(get_current_user), - db: AsyncSession = Depends(get_session), -) -> list[BackupMetadata]: - """Return backup metadata records for the authenticated user (no blob bytes).""" - result = await db.execute( - select(BackupMetadataModel) - .where(BackupMetadataModel.user_id == current_user.id) - .order_by(BackupMetadataModel.timestamp.desc()) - ) - rows = result.scalars().all() - return [ - BackupMetadata( - version=r.version, - timestamp=r.timestamp, - checksum=r.checksum, - chunk_count=1, - ) - for r in rows - ] - - -@router.get("") -async def download_backup( - request: Request, - current_user: UserProfile = Depends(get_current_user), - db: AsyncSession = Depends(get_session), -) -> Response: - """Download the latest backup blob. 
Supports ``If-Modified-Since``.""" - result = await db.execute( - select(BackupMetadataModel) - .where(BackupMetadataModel.user_id == current_user.id) - .order_by(BackupMetadataModel.timestamp.desc()) - .limit(1) - ) - latest = result.scalar_one_or_none() - if latest is None: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="No backup found") - - ims_header = request.headers.get("If-Modified-Since") - if ims_header: - try: - ims_dt = parsedate_to_datetime(ims_header) - ims_ms = int(ims_dt.timestamp() * 1000) - if latest.timestamp <= ims_ms: - return Response(status_code=status.HTTP_304_NOT_MODIFIED) - except Exception: - pass # malformed header — ignore and serve the blob - - blob = await _blob_store.download(current_user.id, latest.s3_key) - return Response( - content=blob, - media_type="application/octet-stream", - headers={ - "X-Backup-Version": str(latest.version), - "X-Backup-Timestamp": str(latest.timestamp), - "X-Checksum": latest.checksum, - }, - ) - - -@router.delete("/{backup_id}", response_model=dict) -async def delete_backup( - backup_id: str, - current_user: UserProfile = Depends(get_current_user), - db: AsyncSession = Depends(get_session), -) -> dict[str, bool]: - """Delete a specific backup by ID.""" - result = await db.execute( - select(BackupMetadataModel).where( - BackupMetadataModel.id == backup_id, - BackupMetadataModel.user_id == current_user.id, - ) - ) - target = result.scalar_one_or_none() - if target is None: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Backup not found") - - await _blob_store.delete(current_user.id, target.s3_key) - await db.delete(target) - await db.commit() - - return {"ok": True} diff --git a/app/api/routes/chat.py b/app/api/routes/chat.py index 6270d0e..00c01ec 100644 --- a/app/api/routes/chat.py +++ b/app/api/routes/chat.py @@ -1,4 +1,4 @@ -"""Chat routes: POST /chat (REST fallback). +"""Chat routes: POST /chat (REST fallback) and POST /chat/embed (text → vector). 
WebSocket chat is handled by the unified device WS endpoint (/api/v1/ws/device). """ @@ -7,14 +7,30 @@ from __future__ import annotations from fastapi import APIRouter, Depends from fastapi.responses import JSONResponse +from pydantic import BaseModel from app.api.deps import get_current_user from app.core.deep_agent import run_home +from app.core.llm import embed from app.schemas import ChatRequest, UserProfile router = APIRouter(prefix="/chat", tags=["chat"]) +# ── Embed helpers ───────────────────────────────────────────────────────── + + +class _EmbedRequest(BaseModel): + text: str + + +class _EmbedResponse(BaseModel): + vector: list[float] + + +# ── Endpoints ───────────────────────────────────────────────────────────── + + @router.post("") async def chat( body: ChatRequest, @@ -27,3 +43,17 @@ async def chat( context=body.context.model_dump(), ) return JSONResponse(content={"response": response}) + + +@router.post("/embed", response_model=_EmbedResponse) +async def embed_text( + body: _EmbedRequest, + current_user: UserProfile = Depends(get_current_user), +) -> _EmbedResponse: + """Generate a 1536-dim embedding vector for the given text. + + Uses ``text-embedding-3-small`` via OpenAI. Auth required (JWT). + Used by Electron (vectordb.ts) for local note search. + """ + vector = await embed(body.text) + return _EmbedResponse(vector=vector) diff --git a/app/api/routes/plugins.py b/app/api/routes/plugins.py deleted file mode 100644 index f3a2e6e..0000000 --- a/app/api/routes/plugins.py +++ /dev/null @@ -1,148 +0,0 @@ -"""Plugins routes: browse and install plugins from the marketplace. - -Backed by ``PluginRegistry`` and ``RevenueShare`` service classes that -persist data in the PostgreSQL ``plugins`` and ``revenue_events`` tables. 
-""" - -from __future__ import annotations - -from typing import Any, Literal - -from fastapi import APIRouter, Depends, HTTPException, Query, status -from pydantic import BaseModel -from sqlalchemy import select -from sqlalchemy.ext.asyncio import AsyncSession - -from app.api.deps import get_current_user -from app.db import get_session -from app.marketplace.plugin_registry import registry -from app.marketplace.revenue_share import revenue_share -from app.models import PluginInstallation, PluginReview as PluginReviewModel -from app.schemas import PluginInstallRequest, PluginListResponse, PluginManifest, UserProfile - -router = APIRouter(prefix="/plugins", tags=["plugins"]) - - -# ── Tier gate ───────────────────────────────────────────────────────── - -def _require_plugin_tier(user: UserProfile) -> None: - """Raise HTTP 403 for users below Power tier.""" - if user.tier not in ("power", "team"): - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="Plugin marketplace requires Power tier or above", - ) - - -# ── Local detail schema ──────────────────────────────────────────────── - -class _PluginDetail(BaseModel): - plugin: PluginManifest - install_count: int - ratings: list[Any] - - -# ── Routes ──────────────────────────────────────────────────────────── - -@router.get("", response_model=PluginListResponse) -async def list_plugins( - category: str | None = Query(default=None), - q: str | None = Query(default=None), - page: int = Query(default=1, ge=1), - sort: Literal["rating", "installs", "newest"] = Query(default="newest"), - current_user: UserProfile = Depends(get_current_user), - db: AsyncSession = Depends(get_session), -) -> PluginListResponse: - """Browse the plugin marketplace. 
Requires Power tier or above.""" - _require_plugin_tier(current_user) - return await registry.list_plugins(db, category=category, query=q, page=page, sort=sort) - - -@router.get("/{plugin_id}", response_model=_PluginDetail) -async def get_plugin( - plugin_id: str, - current_user: UserProfile = Depends(get_current_user), - db: AsyncSession = Depends(get_session), -) -> _PluginDetail: - """Get full plugin details including install count. Requires Power tier or above.""" - _require_plugin_tier(current_user) - entry = await registry.get_plugin(db, plugin_id) - if entry is None: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Plugin not found") - - # Fetch review ratings for this plugin - review_result = await db.execute( - select(PluginReviewModel).where(PluginReviewModel.plugin_id == plugin_id) - ) - reviews = review_result.scalars().all() - ratings = [ - { - "reviewer_id": r.reviewer_id, - "decision": r.decision, - "notes": r.notes, - "reviewed_at": int(r.reviewed_at.timestamp() * 1000) if r.reviewed_at else None, - } - for r in reviews - ] - - return _PluginDetail( - plugin=entry["manifest"], - install_count=entry["install_count"], - ratings=ratings, - ) - - -@router.post("/{plugin_id}/install", response_model=dict) -async def install_plugin( - plugin_id: str, - body: PluginInstallRequest, # noqa: ARG001 — reserved for future fields - current_user: UserProfile = Depends(get_current_user), - db: AsyncSession = Depends(get_session), -) -> dict[str, Any]: - """Install a plugin. Triggers Stripe Connect revenue split for paid plugins. - - Requires Power tier or above. 
- """ - _require_plugin_tier(current_user) - entry = await registry.get_plugin(db, plugin_id) - if entry is None: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Plugin not found") - - # Record the installation in plugin_installations - installation = PluginInstallation( - plugin_id=plugin_id, - user_id=current_user.id, - ) - db.add(installation) - await db.flush() - - await revenue_share.record_install( - db, - plugin_id=plugin_id, - user_id=current_user.id, - amount_cents=entry["manifest"].price_cents, - ) - - download_url = f"https://cdn.adiuva.app/plugins/{plugin_id}/package.zip" - return {"ok": True, "download_url": download_url} - - -@router.delete("/{plugin_id}/install", response_model=dict) -async def uninstall_plugin( - plugin_id: str, - current_user: UserProfile = Depends(get_current_user), - db: AsyncSession = Depends(get_session), -) -> dict[str, bool]: - """Unregister a plugin installation.""" - result = await db.execute( - select(PluginInstallation).where( - PluginInstallation.plugin_id == plugin_id, - PluginInstallation.user_id == current_user.id, - ) - ) - installation = result.scalar_one_or_none() - if installation is not None: - await db.delete(installation) - await db.commit() - await registry.record_uninstall(db, plugin_id) - return {"ok": True} diff --git a/app/api/routes/storage.py b/app/api/routes/storage.py deleted file mode 100644 index ae71abd..0000000 --- a/app/api/routes/storage.py +++ /dev/null @@ -1,195 +0,0 @@ -"""Storage routes: CRUD for E2E-encrypted cloud records. - -Blobs are stored in S3 via BlobStore. Record metadata is persisted in the -PostgreSQL ``storage_records`` table. 
-""" - -from __future__ import annotations - -import uuid - -from fastapi import APIRouter, Depends, HTTPException, Query, Response, status -from pydantic import BaseModel -from sqlalchemy import func, select -from sqlalchemy.ext.asyncio import AsyncSession - -from app.api.deps import get_current_user -from app.billing.tier_manager import tier_manager -from app.db import get_session -from app.models import StorageRecord -from app.schemas import StorageRecordCreate, StorageRecordUpdate, UserProfile -from app.storage.blob_store import BlobStore -from app.storage.encryption import reject_if_tampered - -router = APIRouter(prefix="/storage", tags=["storage"]) - -_blob_store = BlobStore() - - -# ── Local response schemas ───────────────────────────────────────────── - -class _CreateResponse(BaseModel): - id: str - created_at: int - - -class _RecordMeta(BaseModel): - id: str - table: str - checksum: str - created_at: int - updated_at: int - - -# ── Helpers ──────────────────────────────────────────────────────────── - -async def _current_usage_bytes(user_id: str, db: AsyncSession) -> int: - """Return total bytes stored by *user_id*.""" - result = await db.execute( - select(func.coalesce(func.sum(StorageRecord.size_bytes), 0)).where( - StorageRecord.user_id == user_id - ) - ) - return int(result.scalar_one()) - - -async def _check_quota(user: UserProfile, additional_bytes: int, db: AsyncSession) -> None: - """Raise HTTP 402 if adding *additional_bytes* would exceed the tier limit.""" - current = await _current_usage_bytes(user.id, db) - tier_manager.enforce_quota(user.tier, current_bytes=current, additional_bytes=additional_bytes) - - -async def _get_record_for_user( - record_id: str, user_id: str, db: AsyncSession -) -> StorageRecord: - """Look up a record and verify ownership. 
Returns 404 on mismatch - to prevent user enumeration attacks.""" - result = await db.execute( - select(StorageRecord).where( - StorageRecord.id == record_id, StorageRecord.user_id == user_id - ) - ) - record = result.scalar_one_or_none() - if record is None: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Record not found") - return record - - -# ── Routes ───────────────────────────────────────────────────────────── - -@router.post("/records", response_model=_CreateResponse, status_code=status.HTTP_201_CREATED) -async def create_record( - body: StorageRecordCreate, - current_user: UserProfile = Depends(get_current_user), - db: AsyncSession = Depends(get_session), -) -> _CreateResponse: - """Upload a new E2E-encrypted blob. Verifies checksum before storing.""" - reject_if_tampered(body.blob, body.checksum) - await _check_quota(current_user, len(body.blob), db) - - record_id = str(uuid.uuid4()) - - s3_key = await _blob_store.upload( - current_user.id, body.table, record_id, body.blob, body.checksum - ) - - record = StorageRecord( - id=record_id, - user_id=current_user.id, - table_name=body.table, - s3_key=s3_key, - checksum=body.checksum, - size_bytes=len(body.blob), - ) - db.add(record) - await db.commit() - await db.refresh(record) - - created_at_ms = int(record.created_at.timestamp() * 1000) - return _CreateResponse(id=record_id, created_at=created_at_ms) - - -@router.get("/records", response_model=list[_RecordMeta]) -async def list_records( - table: str | None = Query(default=None), - page: int = Query(default=1, ge=1), - limit: int = Query(default=50, ge=1, le=200), - current_user: UserProfile = Depends(get_current_user), - db: AsyncSession = Depends(get_session), -) -> list[_RecordMeta]: - """List record metadata for the authenticated user. 
Blob bytes are never returned.""" - query = select(StorageRecord).where(StorageRecord.user_id == current_user.id) - if table is not None: - query = query.where(StorageRecord.table_name == table) - query = query.offset((page - 1) * limit).limit(limit) - - result = await db.execute(query) - rows = result.scalars().all() - - return [ - _RecordMeta( - id=r.id, - table=r.table_name, - checksum=r.checksum, - created_at=int(r.created_at.timestamp() * 1000), - updated_at=int(r.updated_at.timestamp() * 1000), - ) - for r in rows - ] - - -@router.get("/records/{record_id}") -async def download_record( - record_id: str, - current_user: UserProfile = Depends(get_current_user), - db: AsyncSession = Depends(get_session), -) -> Response: - """Download an E2E-encrypted blob. Returns raw bytes with ``X-Checksum`` header.""" - record = await _get_record_for_user(record_id, current_user.id, db) - blob = await _blob_store.download(current_user.id, record.s3_key) - return Response( - content=blob, - media_type="application/octet-stream", - headers={"X-Checksum": record.checksum}, - ) - - -@router.put("/records/{record_id}", response_model=dict) -async def update_record( - record_id: str, - body: StorageRecordUpdate, - current_user: UserProfile = Depends(get_current_user), - db: AsyncSession = Depends(get_session), -) -> dict[str, bool]: - """Replace the blob for an existing record. 
Verifies checksum before storing.""" - record = await _get_record_for_user(record_id, current_user.id, db) - reject_if_tampered(body.blob, body.checksum) - - delta = len(body.blob) - record.size_bytes - if delta > 0: - await _check_quota(current_user, delta, db) - - s3_key = await _blob_store.upload( - current_user.id, record.table_name, record_id, body.blob, body.checksum - ) - - record.s3_key = s3_key - record.checksum = body.checksum - record.size_bytes = len(body.blob) - await db.commit() - - return {"ok": True} - - -@router.delete("/records/{record_id}", response_model=dict) -async def delete_record( - record_id: str, - current_user: UserProfile = Depends(get_current_user), - db: AsyncSession = Depends(get_session), -) -> dict[str, bool]: - """Delete a record and its S3 blob.""" - record = await _get_record_for_user(record_id, current_user.id, db) - await _blob_store.delete(current_user.id, record.s3_key) - await db.delete(record) - await db.commit() - return {"ok": True} diff --git a/app/api/routes/vectors.py b/app/api/routes/vectors.py deleted file mode 100644 index a03e602..0000000 --- a/app/api/routes/vectors.py +++ /dev/null @@ -1,79 +0,0 @@ -"""Vectors routes: upsert, search, delete cloud vector store entries, and embed text.""" - -from __future__ import annotations - -from fastapi import APIRouter, Depends -from pydantic import BaseModel - -from app.api.deps import get_current_user -from app.core.llm import embed -from app.schemas import ( - UserProfile, - VectorSearchRequest, - VectorSearchResponse, - VectorUpsertRequest, -) -from app.storage.encryption import reject_if_tampered -from app.storage.vector_store import VectorStore - -router = APIRouter(prefix="/storage", tags=["vectors"]) - -_vector_store = VectorStore() - - -class _VectorDeleteRequest(BaseModel): - ids: list[str] - - -class _EmbedRequest(BaseModel): - text: str - - -class _EmbedResponse(BaseModel): - vector: list[float] - - -@router.post("/vectors/upsert", response_model=dict) -async def 
upsert_vectors( - body: VectorUpsertRequest, - current_user: UserProfile = Depends(get_current_user), -) -> dict[str, int]: - """Verify checksums and store encrypted vectors in the user-scoped namespace.""" - for item in body.vectors: - reject_if_tampered(item.blob, item.checksum) - await _vector_store.upsert(current_user.id, body.vectors) - return {"upserted": len(body.vectors)} - - -@router.post("/vectors/search", response_model=VectorSearchResponse) -async def search_vectors( - body: VectorSearchRequest, - current_user: UserProfile = Depends(get_current_user), -) -> VectorSearchResponse: - """Search the user-scoped vector namespace with an encrypted query blob.""" - results = await _vector_store.search(current_user.id, body.query_blob, body.top_k) - return VectorSearchResponse(results=results) - - -@router.delete("/vectors", response_model=dict) -async def delete_vectors( - body: _VectorDeleteRequest, - current_user: UserProfile = Depends(get_current_user), -) -> dict[str, bool]: - """Delete vectors by ID, scoped to the authenticated user.""" - await _vector_store.delete(current_user.id, body.ids) - return {"ok": True} - - -@router.post("/vectors/embed", response_model=_EmbedResponse) -async def embed_text( - body: _EmbedRequest, - current_user: UserProfile = Depends(get_current_user), -) -> _EmbedResponse: - """Generate a 1536-dim embedding vector for the given text. - - Uses ``text-embedding-3-small`` via OpenAI. Auth required (JWT). - Used by backend tools (note_agent) and Electron (vectordb.ts) alike. 
- """ - vector = await embed(body.text) - return _EmbedResponse(vector=vector) diff --git a/app/billing/tier_manager.py b/app/billing/tier_manager.py index ed5f3de..06dd050 100644 --- a/app/billing/tier_manager.py +++ b/app/billing/tier_manager.py @@ -22,44 +22,32 @@ FEATURES: dict[str, dict[str, Any]] = { "agents": 3, "batch_active": 2, "batch_runs_per_day": 5, - "cloud_storage_gb": 0, - "backup_gb": 0, "providers": 1, "batch_builder": False, - "plugin_marketplace": False, "sso": False, }, "pro": { "agents": -1, # unlimited "batch_active": 10, "batch_runs_per_day": 50, - "cloud_storage_gb": 5, - "backup_gb": 5, "providers": -1, "batch_builder": False, - "plugin_marketplace": False, "sso": False, }, "power": { "agents": -1, "batch_active": -1, # unlimited "batch_runs_per_day": -1, # unlimited - "cloud_storage_gb": 25, - "backup_gb": 25, "providers": -1, "batch_builder": True, - "plugin_marketplace": True, "sso": False, }, "team": { "agents": -1, "batch_active": -1, "batch_runs_per_day": -1, # unlimited - "cloud_storage_gb": -1, # unlimited - "backup_gb": -1, # unlimited "providers": -1, "batch_builder": True, - "plugin_marketplace": True, "sso": True, }, } @@ -125,71 +113,6 @@ class TierManager: """Return the requests-per-minute limit for ``tier``.""" return RATE_LIMITS.get(tier, RATE_LIMITS["free"]) - # ── Storage quota ──────────────────────────────────────────────────── - - def enforce_quota( - self, - tier: BillingTier, - current_bytes: int = 0, - additional_bytes: int = 0, - ) -> None: - """Raise ``HTTP 402`` if the user would exceed their cloud storage quota. - - ``tier`` is the caller's current tier (from ``current_user.tier``). - ``current_bytes`` is the total bytes already stored (queried by caller). 
- """ - limit_gb: int = FEATURES[tier]["cloud_storage_gb"] - if limit_gb == 0: - raise HTTPException( - status_code=status.HTTP_402_PAYMENT_REQUIRED, - detail=f"Cloud storage is not available on the '{tier}' tier", - ) - if limit_gb == -1: - return # unlimited - limit_bytes = limit_gb * 1024 ** 3 - if current_bytes + additional_bytes > limit_bytes: - raise HTTPException( - status_code=status.HTTP_402_PAYMENT_REQUIRED, - detail=f"Storage quota exceeded for tier '{tier}'", - ) - - def enforce_backup_quota( - self, - tier: BillingTier, - current_bytes: int = 0, - additional_bytes: int = 0, - ) -> None: - """Raise ``HTTP 402`` if the user would exceed their backup quota.""" - limit_gb: int = FEATURES[tier]["backup_gb"] - if limit_gb == 0: - raise HTTPException( - status_code=status.HTTP_402_PAYMENT_REQUIRED, - detail=f"Backup is not available on the '{tier}' tier", - ) - if limit_gb == -1: - return # unlimited - limit_bytes = limit_gb * 1024 ** 3 - if current_bytes + additional_bytes > limit_bytes: - raise HTTPException( - status_code=status.HTTP_402_PAYMENT_REQUIRED, - detail=f"Backup quota exceeded for tier '{tier}'", - ) - - def check_quota( - self, - tier: BillingTier, - current_bytes: int = 0, - additional_bytes: int = 0, - ) -> bool: - """Return ``True`` if the user can store ``additional_bytes`` more data.""" - limit_gb: int = FEATURES[tier]["cloud_storage_gb"] - if limit_gb == 0: - return False - if limit_gb == -1: - return True - limit_bytes = limit_gb * 1024 ** 3 - return current_bytes + additional_bytes <= limit_bytes - # Module-level singleton shared across the app. 
tier_manager = TierManager() diff --git a/app/config/settings.py b/app/config/settings.py index 88b4de8..c461126 100644 --- a/app/config/settings.py +++ b/app/config/settings.py @@ -12,17 +12,6 @@ class Settings(BaseSettings): STRIPE_SECRET_KEY: str = "" STRIPE_WEBHOOK_SECRET: str = "" - S3_BUCKET: str = "" - S3_REGION: str = "us-east-1" - S3_ENDPOINT_URL: str = "" - AWS_ACCESS_KEY_ID: str = "" - AWS_SECRET_ACCESS_KEY: str = "" - - PINECONE_API_KEY: str = "" - PINECONE_INDEX: str = "adiuva" - QDRANT_URL: str = "" - QDRANT_API_KEY: str = "" - OPENAI_API_KEY: str = "" ANTHROPIC_API_KEY: str = "" GOOGLE_API_KEY: str = "" diff --git a/app/main.py b/app/main.py index ff5f5b2..c1859d6 100644 --- a/app/main.py +++ b/app/main.py @@ -50,14 +50,10 @@ def create_app() -> FastAPI: app.add_middleware(SanitizerMiddleware) app.add_middleware(TierRateLimitMiddleware) - from app.api.routes import agents, auth, backup, billing, chat, device_ws, plugins, storage, vectors + from app.api.routes import agents, auth, billing, chat, device_ws app.include_router(auth.router, prefix="/api/v1") app.include_router(chat.router, prefix="/api/v1") - app.include_router(storage.router, prefix="/api/v1") - app.include_router(vectors.router, prefix="/api/v1") - app.include_router(backup.router, prefix="/api/v1") - app.include_router(plugins.router, prefix="/api/v1") app.include_router(billing.router, prefix="/api/v1") app.include_router(agents.router, prefix="/api/v1") app.include_router(device_ws.router, prefix="/api/v1") diff --git a/app/marketplace/__init__.py b/app/marketplace/__init__.py deleted file mode 100644 index 99c27bc..0000000 --- a/app/marketplace/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -"""Plugin marketplace package. 
- -Three service classes introduced in Step 10: - - ``PluginRegistry`` — catalog, submit/approve/reject, install counts - - ``ReviewQueue`` — approval workflow + security checklist - - ``RevenueShare`` — 70/30 split tracking and Stripe Connect payouts -""" diff --git a/app/marketplace/plugin_registry.py b/app/marketplace/plugin_registry.py deleted file mode 100644 index 0bc7fbe..0000000 --- a/app/marketplace/plugin_registry.py +++ /dev/null @@ -1,212 +0,0 @@ -"""Plugin catalog registry backed by PostgreSQL. - -Maintains the authoritative list of plugins, their review status, and -aggregate install counts. All data is persisted in the ``plugins`` table. - -Module-level singleton:: - - from app.marketplace.plugin_registry import registry -""" - -from __future__ import annotations - -import json -from typing import Any, Literal - -from sqlalchemy import select, func -from sqlalchemy.ext.asyncio import AsyncSession - -from app.models import Plugin -from app.schemas import PluginListResponse, PluginManifest - -_PAGE_SIZE = 20 - - -def _plugin_to_manifest(p: Plugin) -> PluginManifest: - """Convert an ORM ``Plugin`` row to a Pydantic ``PluginManifest``.""" - try: - permissions = json.loads(p.permissions) if p.permissions else [] - except (json.JSONDecodeError, TypeError): - permissions = [] - return PluginManifest( - id=p.id, - name=p.name, - description=p.description, - version=p.version, - author=p.author_name, - permissions=permissions, - category=p.category, - price_cents=p.price_cents, - ) - - -class PluginRegistry: - """PostgreSQL-backed plugin catalog. - - All methods accept an ``AsyncSession`` parameter so the calling route - controls the session lifecycle. 
- """ - - # ── Queries ────────────────────────────────────────────────────── - - async def list_plugins( - self, - db: AsyncSession, - category: str | None = None, - query: str | None = None, - page: int = 1, - sort: Literal["rating", "installs", "newest"] = "newest", - ) -> PluginListResponse: - """Return a page of approved plugins, optionally filtered and sorted.""" - base = select(Plugin).where(Plugin.status == "approved") - - if category: - base = base.where(Plugin.category == category) - if query: - pattern = f"%{query}%" - base = base.where( - Plugin.name.ilike(pattern) | Plugin.description.ilike(pattern) - ) - - # Count - count_q = select(func.count()).select_from(base.subquery()) - total = (await db.execute(count_q)).scalar_one() - - # Sort - if sort == "installs": - base = base.order_by(Plugin.install_count.desc()) - elif sort == "rating": - base = base.order_by(Plugin.avg_rating.desc()) - else: # newest - base = base.order_by(Plugin.created_at.desc()) - - base = base.offset((page - 1) * _PAGE_SIZE).limit(_PAGE_SIZE) - rows = (await db.execute(base)).scalars().all() - - return PluginListResponse( - plugins=[_plugin_to_manifest(r) for r in rows], - total=total, - page=page, - ) - - async def get_plugin(self, db: AsyncSession, plugin_id: str) -> dict[str, Any] | None: - """Return ``{manifest, status, install_count, avg_rating}`` or ``None``.""" - result = await db.execute(select(Plugin).where(Plugin.id == plugin_id)) - p = result.scalar_one_or_none() - if p is None: - return None - return { - "manifest": _plugin_to_manifest(p), - "status": p.status, - "install_count": p.install_count, - "avg_rating": p.avg_rating, - } - - # ── Mutations ──────────────────────────────────────────────────── - - async def submit_plugin( - self, - db: AsyncSession, - manifest: PluginManifest, - package_s3_key: str, - ) -> str: - """Add *manifest* to the catalog with ``status='pending_review'``. - - Returns the plugin_id. 
If a plugin with the same id already exists - it is overwritten (re-submission after rejection). - """ - plugin_id = manifest.id - existing = await db.execute(select(Plugin).where(Plugin.id == plugin_id)) - row = existing.scalar_one_or_none() - - if row is not None: - row.name = manifest.name - row.description = manifest.description - row.version = manifest.version - row.author_name = manifest.author - row.category = manifest.category - row.price_cents = manifest.price_cents - row.permissions = json.dumps(manifest.permissions) - row.status = "pending_review" - row.s3_package_key = package_s3_key - row.rejection_reason = None - else: - row = Plugin( - id=plugin_id, - name=manifest.name, - description=manifest.description, - version=manifest.version, - author_name=manifest.author, - category=manifest.category, - price_cents=manifest.price_cents, - permissions=json.dumps(manifest.permissions), - status="pending_review", - s3_package_key=package_s3_key, - install_count=0, - avg_rating=0.0, - ) - db.add(row) - await db.commit() - return plugin_id - - async def approve_plugin(self, db: AsyncSession, plugin_id: str) -> None: - """Set *plugin_id* status to ``'approved'``. - - Raises ``KeyError`` if the plugin is not found. - """ - result = await db.execute(select(Plugin).where(Plugin.id == plugin_id)) - row = result.scalar_one_or_none() - if row is None: - raise KeyError(f"Plugin not found: {plugin_id}") - row.status = "approved" - row.rejection_reason = None - await db.commit() - - async def reject_plugin(self, db: AsyncSession, plugin_id: str, reason: str) -> None: - """Set *plugin_id* status to ``'rejected'`` and record the reason. - - Raises ``KeyError`` if the plugin is not found. 
- """ - result = await db.execute(select(Plugin).where(Plugin.id == plugin_id)) - row = result.scalar_one_or_none() - if row is None: - raise KeyError(f"Plugin not found: {plugin_id}") - row.status = "rejected" - row.rejection_reason = reason - await db.commit() - - async def record_install(self, db: AsyncSession, plugin_id: str) -> None: - """Increment the install count for *plugin_id* (no-op if not found).""" - result = await db.execute(select(Plugin).where(Plugin.id == plugin_id)) - row = result.scalar_one_or_none() - if row is not None: - row.install_count = row.install_count + 1 - await db.commit() - - async def record_uninstall(self, db: AsyncSession, plugin_id: str) -> None: - """Decrement the install count for *plugin_id*, floored at 0.""" - result = await db.execute(select(Plugin).where(Plugin.id == plugin_id)) - row = result.scalar_one_or_none() - if row is not None: - row.install_count = max(0, row.install_count - 1) - await db.commit() - - # ── Internal helpers used by ReviewQueue ───────────────────────── - - async def get_pending_entries(self, db: AsyncSession) -> list[dict[str, Any]]: - """Return all entries with status='pending_review'.""" - result = await db.execute( - select(Plugin).where(Plugin.status == "pending_review") - ) - rows = result.scalars().all() - return [ - { - "manifest": _plugin_to_manifest(r), - "submitted_at": int(r.submitted_at.timestamp()) if r.submitted_at else 0, - } - for r in rows - ] - - -# Module-level singleton -registry = PluginRegistry() diff --git a/app/marketplace/plugin_review.py b/app/marketplace/plugin_review.py deleted file mode 100644 index 28a5764..0000000 --- a/app/marketplace/plugin_review.py +++ /dev/null @@ -1,125 +0,0 @@ -"""Plugin review workflow backed by PostgreSQL. - -Manages the approval queue for newly submitted plugins and enforces a -security checklist before any plugin is made visible in the marketplace. 
- -Module-level singleton:: - - from app.marketplace.plugin_review import review_queue -""" - -from __future__ import annotations - -import re -from typing import Any, Literal - -from sqlalchemy.ext.asyncio import AsyncSession - -from app.marketplace.plugin_registry import registry -from app.models import PluginReview as PluginReviewModel -from app.schemas import PluginManifest - -# ── Security policy ─────────────────────────────────────────────────── - -ALLOWED_PERMISSIONS: frozenset[str] = frozenset( - { - "read:tasks", - "write:tasks", - "read:projects", - "write:projects", - "read:notes", - "write:notes", - "read:timelines", - "write:timelines", - "read:calendar", - "write:calendar", - } -) - -_PLUGIN_ID_RE = re.compile(r"^[a-z0-9-]+$") - - -def validate_manifest(manifest: PluginManifest) -> None: - """Enforce the plugin security checklist. - - Raises: - ``ValueError`` on the first violation found. Callers should catch - this and return HTTP 422 / reject the submission. - - Checks: - 1. Plugin id matches ``^[a-z0-9-]+$`` - 2. All declared permissions are in ``ALLOWED_PERMISSIONS`` - 3. No manifest field contains raw binary data - """ - if not _PLUGIN_ID_RE.match(manifest.id): - raise ValueError( - f"Invalid plugin id format: '{manifest.id}'. " - "Only lowercase letters, digits, and hyphens are allowed." - ) - - for perm in manifest.permissions: - if perm not in ALLOWED_PERMISSIONS: - raise ValueError( - f"Unknown permission: '{perm}'. " - f"Allowed permissions: {sorted(ALLOWED_PERMISSIONS)}" - ) - - for field_name, value in manifest.model_dump().items(): - if isinstance(value, (bytes, bytearray)): - raise ValueError( - f"Binary content is not allowed in manifest field '{field_name}'." - ) - - -class ReviewQueue: - """Approval queue for pending plugin submissions. - - Delegates status changes to the shared ``PluginRegistry`` singleton. - Review records are persisted in the ``plugin_reviews`` table. 
- """ - - async def get_pending(self, db: AsyncSession) -> list[dict[str, Any]]: - """Return all plugins currently awaiting review. - - Each item is ``{plugin_id, manifest, submitted_at}``. - """ - entries = await registry.get_pending_entries(db) - return [ - { - "plugin_id": e["manifest"].id, - "manifest": e["manifest"], - "submitted_at": e["submitted_at"], - } - for e in entries - ] - - async def submit_review( - self, - db: AsyncSession, - plugin_id: str, - reviewer_id: str, - decision: Literal["approved", "rejected"], - notes: str = "", - ) -> None: - """Record a review decision and update the plugin's status. - - Raises: - ``KeyError`` if *plugin_id* is not found in the registry. - """ - if decision == "approved": - await registry.approve_plugin(db, plugin_id) - else: - await registry.reject_plugin(db, plugin_id, reason=notes) - - review = PluginReviewModel( - plugin_id=plugin_id, - reviewer_id=reviewer_id, - decision=decision, - notes=notes, - ) - db.add(review) - await db.commit() - - -# Module-level singleton -review_queue = ReviewQueue() diff --git a/app/marketplace/revenue_share.py b/app/marketplace/revenue_share.py deleted file mode 100644 index 05f1d9f..0000000 --- a/app/marketplace/revenue_share.py +++ /dev/null @@ -1,233 +0,0 @@ -"""Revenue share tracking and Stripe Connect payouts backed by PostgreSQL. - -Records every plugin installation as a revenue event and facilitates -70 % / 30 % payouts to developers via Stripe Connect. Data is persisted -in the ``revenue_events`` table. 
- -Module-level singleton:: - - from app.marketplace.revenue_share import revenue_share -""" - -from __future__ import annotations - -import logging -from datetime import datetime, timezone -from typing import Any - -import stripe as stripe_lib -from sqlalchemy import extract, func, select -from sqlalchemy.ext.asyncio import AsyncSession - -from app.config.settings import settings -from app.marketplace.plugin_registry import registry -from app.models import Plugin, RevenueEvent - -logger = logging.getLogger(__name__) - -# ── Revenue split constants ─────────────────────────────────────────── - -DEVELOPER_SHARE: float = 0.70 -PLATFORM_SHARE: float = 0.30 - - -class RevenueShare: - """Records installation revenue events and coordinates developer payouts. - - Stripe Connect calls are gracefully stubbed when ``STRIPE_SECRET_KEY`` - is not configured, consistent with the rest of the billing layer. - """ - - # ── Helpers ────────────────────────────────────────────────────── - - @staticmethod - def _stripe_configured() -> bool: - return bool(settings.STRIPE_SECRET_KEY) - - @staticmethod - def _stripe() -> Any: - stripe_lib.api_key = settings.STRIPE_SECRET_KEY - return stripe_lib - - # ── Core operations ────────────────────────────────────────────── - - async def record_install( - self, - db: AsyncSession, - plugin_id: str, - user_id: str, - amount_cents: int, - ) -> None: - """Record a plugin installation and trigger a Stripe Connect charge if paid. - - For free plugins (``amount_cents == 0``) no payment is initiated but - the event is still recorded for analytics. - - For paid plugins the developer receives 70 % via a Stripe Connect - destination charge. If Stripe is not configured or the charge fails - the installation still succeeds (the event is recorded and the install - count is incremented) — a warning is logged for monitoring. 
- """ - developer_share_cents = int(amount_cents * DEVELOPER_SHARE) - stripe_transfer_id: str | None = None - - if amount_cents > 0 and self._stripe_configured(): - # Look up the plugin's author Stripe account from the DB - result = await db.execute(select(Plugin).where(Plugin.id == plugin_id)) - plugin_row = result.scalar_one_or_none() - developer_stripe_account: str | None = None - if plugin_row and plugin_row.author_id: - # Future: look up user.stripe_connect_account_id - developer_stripe_account = None # no real account yet - - if developer_stripe_account: - try: - s = self._stripe() - transfer = s.Transfer.create( - amount=developer_share_cents, - currency="eur", - destination=developer_stripe_account, - description=f"Revenue share for plugin {plugin_id}", - metadata={"plugin_id": plugin_id, "user_id": user_id}, - ) - stripe_transfer_id = transfer["id"] - except Exception as exc: - logger.warning( - "Stripe Connect transfer failed for plugin %s: %s", - plugin_id, - exc, - ) - else: - logger.debug( - "No Stripe account on file for plugin %s developer; " - "skipping transfer.", - plugin_id, - ) - - event = RevenueEvent( - plugin_id=plugin_id, - user_id=user_id, - amount_cents=amount_cents, - developer_share_cents=developer_share_cents, - stripe_transfer_id=stripe_transfer_id, - ) - db.add(event) - await db.commit() - - await registry.record_install(db, plugin_id) - - async def get_earnings( - self, - db: AsyncSession, - developer_id: str, - period: str | None = None, - ) -> dict[str, Any]: - """Return aggregated earnings for *developer_id*. - - ``period`` is an optional ``YYYY-MM`` string to restrict the window. 
- - Returns:: - - { - "developer_id": str, - "period": str | None, - "total_installs": int, - "total_revenue_cents": int, - "developer_share_cents": int, - } - """ - # Find plugin ids belonging to this developer (by author_name match) - plugin_q = select(Plugin.id).where(Plugin.author_name == developer_id) - plugin_result = await db.execute(plugin_q) - developer_plugin_ids = [row[0] for row in plugin_result.all()] - - if not developer_plugin_ids: - return { - "developer_id": developer_id, - "period": period, - "total_installs": 0, - "total_revenue_cents": 0, - "developer_share_cents": 0, - } - - query = select( - func.count().label("total_installs"), - func.coalesce(func.sum(RevenueEvent.amount_cents), 0).label("total_revenue"), - func.coalesce(func.sum(RevenueEvent.developer_share_cents), 0).label("dev_share"), - ).where(RevenueEvent.plugin_id.in_(developer_plugin_ids)) - - if period: - # Filter by YYYY-MM: extract year and month from created_at - try: - year, month = period.split("-") - query = query.where( - extract("year", RevenueEvent.created_at) == int(year), - extract("month", RevenueEvent.created_at) == int(month), - ) - except ValueError: - pass # invalid period format — return all - - result = await db.execute(query) - row = result.one() - - return { - "developer_id": developer_id, - "period": period, - "total_installs": row.total_installs, - "total_revenue_cents": row.total_revenue, - "developer_share_cents": row.dev_share, - } - - async def payout_developer(self, db: AsyncSession, plugin_id: str, period: str) -> None: - """Aggregate unpaid revenue for *period* and issue a Stripe Transfer. - - Marks processed events with ``paid_at`` timestamp. - Stubs gracefully when Stripe is not configured. 
- """ - try: - year, month = period.split("-") - year_int, month_int = int(year), int(month) - except ValueError: - logger.warning("Invalid period format: %s", period) - return - - result = await db.execute( - select(RevenueEvent).where( - RevenueEvent.plugin_id == plugin_id, - RevenueEvent.paid_at.is_(None), - extract("year", RevenueEvent.created_at) == year_int, - extract("month", RevenueEvent.created_at) == month_int, - ) - ) - unpaid = list(result.scalars().all()) - - total_dev_share = sum(e.developer_share_cents for e in unpaid) - if total_dev_share <= 0 or not unpaid: - logger.debug("Nothing to pay out for plugin %s in period %s", plugin_id, period) - return - - if self._stripe_configured(): - plugin_result = await db.execute(select(Plugin).where(Plugin.id == plugin_id)) - plugin_row = plugin_result.scalar_one_or_none() - developer_stripe_account: str | None = None # Future: fetch from DB - if plugin_row and developer_stripe_account: - try: - s = self._stripe() - s.Transfer.create( - amount=total_dev_share, - currency="eur", - destination=developer_stripe_account, - description=f"Payout for plugin {plugin_id} period {period}", - ) - except Exception as exc: - logger.warning("Payout transfer failed for plugin %s: %s", plugin_id, exc) - return - - paid_ts = datetime.now(timezone.utc) - for event in unpaid: - event.paid_at = paid_ts - await db.commit() - - -# Module-level singleton -revenue_share = RevenueShare() diff --git a/app/models.py b/app/models.py index 7a9b732..fea6054 100644 --- a/app/models.py +++ b/app/models.py @@ -1,19 +1,15 @@ """SQLAlchemy ORM models for all persistent tables. -Only auth, billing, storage metadata, and marketplace data live here. -User content (notes, tasks, etc.) is NEVER persisted server-side — -it lives in E2E-encrypted blobs in S3, referenced by storage_records. +Only auth, billing, agent config, and memory data live here. +User content (notes, tasks, etc.) lives exclusively on the client. 
Table inventory: users — account credentials + tier refresh_tokens — hashed refresh token store subscriptions — Stripe subscription records - storage_records — S3 blob metadata (no plaintext) - backup_metadata — encrypted backup manifests - plugins — marketplace plugin catalog - plugin_installations — per-user install records - plugin_reviews — admin review decisions - revenue_events — Stripe Connect 70/30 split ledger + local_agent_configs — per-device batch agent configs + cloud_agent_configs — OAuth-backed cloud agent configs + agent_run_logs — execution history for all agents memory_core — per-user persistent key/value preferences (encrypted) memory_associative — per-user semantic memory with embeddings (encrypted) memory_episodic — per-user session summaries (encrypted) @@ -26,7 +22,6 @@ import uuid from datetime import datetime, timezone from sqlalchemy import ( - BigInteger, Boolean, DateTime, Enum, @@ -36,7 +31,6 @@ from sqlalchemy import ( JSON, String, Text, - UniqueConstraint, Uuid, func, ) @@ -58,8 +52,6 @@ def _now() -> datetime: # ── Enum types ──────────────────────────────────────────────────────────── TierEnum = Enum("free", "pro", "power", "team", name="billing_tier") -PluginStatusEnum = Enum("pending_review", "approved", "rejected", name="plugin_status") -ReviewDecisionEnum = Enum("approved", "rejected", name="review_decision") AgentTypeEnum = Enum("local", "cloud", name="agent_type") AgentStatusEnum = Enum("running", "success", "error", "partial", name="agent_run_status") CloudProviderEnum = Enum("gmail", "teams", "outlook", name="cloud_provider") @@ -137,151 +129,6 @@ class Subscription(Base): user: Mapped[User] = relationship(back_populates="subscription") -class StorageRecord(Base): - __tablename__ = "storage_records" - - id: Mapped[str] = mapped_column( - Uuid(as_uuid=False), primary_key=True, default=_uuid - ) - user_id: Mapped[str] = mapped_column( - Uuid(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, 
index=True - ) - table_name: Mapped[str] = mapped_column(String(100), nullable=False) - s3_key: Mapped[str] = mapped_column(String(500), nullable=False) - checksum: Mapped[str] = mapped_column(String(64), nullable=False) - size_bytes: Mapped[int] = mapped_column(Integer, nullable=False) - created_at: Mapped[datetime] = mapped_column( - DateTime(timezone=True), nullable=False, server_default=func.now() - ) - updated_at: Mapped[datetime] = mapped_column( - DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now() - ) - - -class BackupMetadata(Base): - __tablename__ = "backup_metadata" - - id: Mapped[str] = mapped_column( - Uuid(as_uuid=False), primary_key=True, default=_uuid - ) - user_id: Mapped[str] = mapped_column( - Uuid(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True - ) - s3_key: Mapped[str] = mapped_column(String(500), nullable=False) - version: Mapped[int] = mapped_column(Integer, nullable=False) - timestamp: Mapped[int] = mapped_column(BigInteger, nullable=False) - checksum: Mapped[str] = mapped_column(String(64), nullable=False) - size_bytes: Mapped[int] = mapped_column(Integer, nullable=False) - created_at: Mapped[datetime] = mapped_column( - DateTime(timezone=True), nullable=False, server_default=func.now() - ) - - -class Plugin(Base): - __tablename__ = "plugins" - - id: Mapped[str] = mapped_column(String(255), primary_key=True) - name: Mapped[str] = mapped_column(String(255), nullable=False) - description: Mapped[str] = mapped_column(Text, nullable=False, default="") - version: Mapped[str] = mapped_column(String(50), nullable=False, default="1.0.0") - # nullable until developer account system is built - author_id: Mapped[str | None] = mapped_column( - Uuid(as_uuid=False), ForeignKey("users.id", ondelete="SET NULL"), nullable=True - ) - author_name: Mapped[str] = mapped_column(String(255), nullable=False, default="") - category: Mapped[str] = mapped_column(String(100), nullable=False, 
default="") - price_cents: Mapped[int] = mapped_column(Integer, nullable=False, default=0) - permissions: Mapped[str] = mapped_column(Text, nullable=False, default="[]") # JSON list - status: Mapped[str] = mapped_column(PluginStatusEnum, nullable=False, default="pending_review") - s3_package_key: Mapped[str | None] = mapped_column(String(500), nullable=True) - install_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0) - avg_rating: Mapped[float] = mapped_column(Float, nullable=False, default=0.0) - rejection_reason: Mapped[str | None] = mapped_column(Text, nullable=True) - submitted_at: Mapped[datetime] = mapped_column( - DateTime(timezone=True), nullable=False, server_default=func.now() - ) - created_at: Mapped[datetime] = mapped_column( - DateTime(timezone=True), nullable=False, server_default=func.now() - ) - - installations: Mapped[list[PluginInstallation]] = relationship( - back_populates="plugin", cascade="all, delete-orphan" - ) - reviews: Mapped[list[PluginReview]] = relationship( - back_populates="plugin", cascade="all, delete-orphan" - ) - revenue_events: Mapped[list[RevenueEvent]] = relationship( - back_populates="plugin", cascade="all, delete-orphan" - ) - - -class PluginInstallation(Base): - __tablename__ = "plugin_installations" - __table_args__ = (UniqueConstraint("plugin_id", "user_id", name="uq_plugin_user"),) - - id: Mapped[str] = mapped_column( - Uuid(as_uuid=False), primary_key=True, default=_uuid - ) - plugin_id: Mapped[str] = mapped_column( - String(255), ForeignKey("plugins.id", ondelete="CASCADE"), nullable=False, index=True - ) - user_id: Mapped[str] = mapped_column( - Uuid(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True - ) - installed_at: Mapped[datetime] = mapped_column( - DateTime(timezone=True), nullable=False, server_default=func.now() - ) - - plugin: Mapped[Plugin] = relationship(back_populates="installations") - - -class PluginReview(Base): - __tablename__ = "plugin_reviews" 
- - id: Mapped[str] = mapped_column( - Uuid(as_uuid=False), primary_key=True, default=_uuid - ) - plugin_id: Mapped[str] = mapped_column( - String(255), ForeignKey("plugins.id", ondelete="CASCADE"), nullable=False, index=True - ) - reviewer_id: Mapped[str | None] = mapped_column( - Uuid(as_uuid=False), ForeignKey("users.id", ondelete="SET NULL"), nullable=True - ) - decision: Mapped[str] = mapped_column(ReviewDecisionEnum, nullable=False) - notes: Mapped[str | None] = mapped_column(Text, nullable=True) - reviewed_at: Mapped[datetime] = mapped_column( - DateTime(timezone=True), nullable=False, server_default=func.now() - ) - created_at: Mapped[datetime] = mapped_column( - DateTime(timezone=True), nullable=False, server_default=func.now() - ) - - plugin: Mapped[Plugin] = relationship(back_populates="reviews") - - -class RevenueEvent(Base): - __tablename__ = "revenue_events" - - id: Mapped[str] = mapped_column( - Uuid(as_uuid=False), primary_key=True, default=_uuid - ) - plugin_id: Mapped[str] = mapped_column( - String(255), ForeignKey("plugins.id", ondelete="CASCADE"), nullable=False, index=True - ) - user_id: Mapped[str] = mapped_column( - Uuid(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True - ) - amount_cents: Mapped[int] = mapped_column(Integer, nullable=False, default=0) - developer_share_cents: Mapped[int] = mapped_column(Integer, nullable=False, default=0) - stripe_transfer_id: Mapped[str | None] = mapped_column(String(255), nullable=True) - paid_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) - created_at: Mapped[datetime] = mapped_column( - DateTime(timezone=True), nullable=False, server_default=func.now() - ) - - plugin: Mapped[Plugin] = relationship(back_populates="revenue_events") - - class LocalAgentConfig(Base): __tablename__ = "local_agent_configs" diff --git a/app/schemas.py b/app/schemas.py index 77568dd..d0301fd 100644 --- a/app/schemas.py +++ b/app/schemas.py @@ -50,88 +50,6 
@@ class ChatResponse(BaseModel): response: str -# ── Backup ─────────────────────────────────────────────────────────── - -class BackupMetadata(BaseModel): - version: int - timestamp: int - checksum: str - chunk_count: int - - -# ── Cloud Storage (E2E encrypted blobs) ────────────────────────────── - -class StorageRecord(BaseModel): - id: str - user_id: str - table: str - blob: bytes - checksum: str - created_at: int - updated_at: int - - -class StorageRecordCreate(BaseModel): - table: str - blob: bytes - checksum: str - - -class StorageRecordUpdate(BaseModel): - blob: bytes - checksum: str - - -# ── Cloud Vector Store (E2E encrypted vectors) ──────────────────────── - -class VectorItem(BaseModel): - id: str - blob: bytes # encrypted vector + metadata — backend never decrypts - checksum: str - - -class VectorUpsertRequest(BaseModel): - vectors: list[VectorItem] - - -class VectorSearchRequest(BaseModel): - query_blob: bytes # encrypted query — backend never decrypts - top_k: int = 10 - - -class VectorSearchResult(BaseModel): - id: str - score: float - blob: bytes - - -class VectorSearchResponse(BaseModel): - results: list[VectorSearchResult] - - -# ── Plugin Marketplace ──────────────────────────────────────────────── - -class PluginManifest(BaseModel): - id: str - name: str - description: str - version: str - author: str - permissions: list[str] - category: str - price_cents: int = 0 - - -class PluginListResponse(BaseModel): - plugins: list[PluginManifest] - total: int - page: int - - -class PluginInstallRequest(BaseModel): - plugin_id: str - - # ── WebSocket Frame Protocol ────────────────────────────────────────── class WsFrameType(str, Enum): diff --git a/app/storage/__init__.py b/app/storage/__init__.py deleted file mode 100644 index 9223ba7..0000000 --- a/app/storage/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Cloud storage layer — E2E encrypted blobs and vectors.""" diff --git a/app/storage/blob_store.py b/app/storage/blob_store.py deleted file mode 100644 
index 3aedfa6..0000000 --- a/app/storage/blob_store.py +++ /dev/null @@ -1,106 +0,0 @@ -"""S3-backed store for E2E-encrypted blobs. - -Keys are structured as ``{user_id}/{table}/{record_id}``. -The backend never inspects blob content — it stores and retrieves opaque bytes. -""" - -from __future__ import annotations - -from typing import Any - -import boto3 - -from app.config.settings import settings - - -class BlobStore: - """Thin wrapper around boto3 S3. - - All blobs must be E2E encrypted by the client before upload. - The backend adds SSE-S3 as an extra layer of at-rest encryption - but cannot decrypt the inner client-side payload. - """ - - def _client(self) -> Any: - kwargs: dict[str, Any] = { - "region_name": settings.S3_REGION, - "aws_access_key_id": settings.AWS_ACCESS_KEY_ID, - "aws_secret_access_key": settings.AWS_SECRET_ACCESS_KEY, - } - if settings.S3_ENDPOINT_URL and isinstance(settings.S3_ENDPOINT_URL, str): - kwargs["endpoint_url"] = settings.S3_ENDPOINT_URL - return boto3.client("s3", **kwargs) - - @staticmethod - def _key(user_id: str, table: str, record_id: str) -> str: - return f"{user_id}/{table}/{record_id}" - - async def upload( - self, - user_id: str, - table: str, - record_id: str, - blob: bytes, - checksum: str, - ) -> str: - """Store *blob* in S3 and return the S3 key. - - Args: - user_id: Owner of the blob (used as key prefix). - table: Logical table name (e.g. ``"tasks"``). - record_id: Record UUID. - blob: Raw bytes (pre-encrypted by client). - checksum: SHA-256 hex digest supplied by the client; stored as - object metadata for download-time verification. - - Returns: - The S3 key under which the blob was stored. 
- """ - key = self._key(user_id, table, record_id) - self._client().put_object( - Bucket=settings.S3_BUCKET, - Key=key, - Body=blob, - ServerSideEncryption="AES256", # SSE-S3 at rest - Metadata={"checksum": checksum}, - ) - return key - - async def download(self, user_id: str, s3_key: str) -> bytes: - """Retrieve the blob stored at *s3_key*. - - *user_id* is retained in the signature so higher-level code can - enforce ownership without re-parsing the key. - - Raises: - ``botocore.exceptions.ClientError`` with code ``NoSuchKey`` if the - object does not exist. - """ - response = self._client().get_object( - Bucket=settings.S3_BUCKET, - Key=s3_key, - ) - return response["Body"].read() - - async def delete(self, user_id: str, s3_key: str) -> None: - """Delete the object at *s3_key*. - - S3 ``delete_object`` is idempotent — it succeeds even if the key does - not exist. - """ - self._client().delete_object( - Bucket=settings.S3_BUCKET, - Key=s3_key, - ) - - async def list_keys(self, user_id: str, table: str) -> list[str]: - """Return all S3 keys for a given user + table combination. - - Uses the prefix ``{user_id}/{table}/`` to scope the listing. - """ - prefix = f"{user_id}/{table}/" - response = self._client().list_objects_v2( - Bucket=settings.S3_BUCKET, - Prefix=prefix, - ) - return [obj["Key"] for obj in response.get("Contents", [])] diff --git a/app/storage/encryption.py b/app/storage/encryption.py deleted file mode 100644 index 2dfefa2..0000000 --- a/app/storage/encryption.py +++ /dev/null @@ -1,32 +0,0 @@ -"""Integrity verification only — the backend NEVER decrypts user data.""" - -from __future__ import annotations - -import hashlib -import hmac - -from fastapi import HTTPException - - -def verify_checksum(blob: bytes, checksum: str) -> bool: - """Return ``True`` if SHA-256(blob) matches *checksum*. - - Uses ``hmac.compare_digest`` for constant-time comparison to prevent - timing-based side-channel attacks. 
- """ - computed = hashlib.sha256(blob).hexdigest() - return hmac.compare_digest(computed, checksum) - - -def reject_if_tampered(blob: bytes, checksum: str) -> None: - """Raise ``HTTP 400`` if the blob does not match its checksum. - - Call this before storing or forwarding any client-provided blob. - The backend never holds decryption keys — this check only verifies - that the opaque bytes arrived intact. - """ - if not verify_checksum(blob, checksum): - raise HTTPException( - status_code=400, - detail="Checksum mismatch: blob integrity check failed", - ) diff --git a/app/storage/vector_store.py b/app/storage/vector_store.py deleted file mode 100644 index a2d5c32..0000000 --- a/app/storage/vector_store.py +++ /dev/null @@ -1,205 +0,0 @@ -"""Cloud vector store — wraps Pinecone (default) or Qdrant. - -Vectors are pre-encrypted blobs from the client. The backend stores them -alongside a deterministic 32-dim float representation derived from the blob's -SHA-256 hash. Semantic ANN search is not meaningful on encrypted data — this -is a known trade-off documented in the backend plan. - -Isolation: Pinecone uses ``namespace=user_id``; Qdrant filters by -``user_id`` payload field on a shared collection. -""" - -from __future__ import annotations - -import base64 -import hashlib -from typing import Any - -from pinecone import Pinecone -from qdrant_client import QdrantClient -from qdrant_client.models import FieldCondition, Filter, MatchValue, PointIdsList, PointStruct - -from app.config.settings import settings -from app.schemas import VectorItem, VectorSearchResult - -_QDRANT_COLLECTION = "adiuva_vectors" - - -def _blob_to_vector(blob: bytes) -> list[float]: - """Derive a 32-dim float vector from *blob* for storage purposes only. - - Uses SHA-256 to produce a deterministic 32-byte fingerprint, then - normalises each byte to the range [-1.0, 1.0]. This vector carries no - semantic meaning on encrypted data. 
- """ - return [(b - 128) / 128.0 for b in hashlib.sha256(blob).digest()] - - -class VectorStore: - """Thin wrapper around Pinecone or Qdrant. - - The backend to use is selected at runtime: - - Pinecone: when ``settings.PINECONE_API_KEY`` is non-empty. - - Qdrant: otherwise (requires ``settings.QDRANT_URL``). - """ - - def _use_pinecone(self) -> bool: - return bool(settings.PINECONE_API_KEY) - - # ── Pinecone helpers ────────────────────────────────────────────── - - def _pinecone_index(self) -> Any: - pc = Pinecone(api_key=settings.PINECONE_API_KEY) - return pc.Index(settings.PINECONE_INDEX) - - # ── Qdrant helpers ──────────────────────────────────────────────── - - def _qdrant_client(self) -> Any: - return QdrantClient( - url=settings.QDRANT_URL, - api_key=settings.QDRANT_API_KEY or None, - ) - - # ── Public API ──────────────────────────────────────────────────── - - async def upsert(self, user_id: str, vectors: list[VectorItem]) -> None: - """Store encrypted vectors in the backend. - - Each ``VectorItem.blob`` is base64-encoded and kept in metadata/payload - so it can be returned verbatim during search. - - Args: - user_id: Used as Pinecone namespace or Qdrant payload field. - vectors: List of encrypted vector items from the client. - """ - if self._use_pinecone(): - await self._pinecone_upsert(user_id, vectors) - else: - await self._qdrant_upsert(user_id, vectors) - - async def search( - self, - user_id: str, - query_blob: bytes, - top_k: int, - ) -> list[VectorSearchResult]: - """Query the vector store and return encrypted result blobs. - - The query vector is derived from *query_blob* using the same - deterministic mapping as upsert. - - Args: - user_id: Scopes the search to this user's namespace. - query_blob: Encrypted query from the client. - top_k: Maximum number of results to return. - - Returns: - List of ``VectorSearchResult`` with ``id``, ``score``, and ``blob``. 
- """ - if self._use_pinecone(): - return await self._pinecone_search(user_id, query_blob, top_k) - return await self._qdrant_search(user_id, query_blob, top_k) - - async def delete(self, user_id: str, vector_ids: list[str]) -> None: - """Remove vectors by ID, scoped to *user_id*. - - Args: - user_id: Namespace / payload filter to prevent cross-user deletion. - vector_ids: List of vector IDs to remove. - """ - if self._use_pinecone(): - await self._pinecone_delete(user_id, vector_ids) - else: - await self._qdrant_delete(user_id, vector_ids) - - # ── Pinecone implementation ─────────────────────────────────────── - - async def _pinecone_upsert(self, user_id: str, vectors: list[VectorItem]) -> None: - index = self._pinecone_index() - records = [ - { - "id": v.id, - "values": _blob_to_vector(v.blob), - "metadata": { - "blob": base64.b64encode(v.blob).decode(), - "checksum": v.checksum, - "user_id": user_id, - }, - } - for v in vectors - ] - index.upsert(vectors=records, namespace=user_id) - - async def _pinecone_search( - self, user_id: str, query_blob: bytes, top_k: int - ) -> list[VectorSearchResult]: - index = self._pinecone_index() - query_vector = _blob_to_vector(query_blob) - response = index.query( - vector=query_vector, - top_k=top_k, - namespace=user_id, - include_metadata=True, - ) - results: list[VectorSearchResult] = [] - for match in response.get("matches", []): - blob_bytes = base64.b64decode(match["metadata"]["blob"]) - results.append( - VectorSearchResult( - id=match["id"], - score=match["score"], - blob=blob_bytes, - ) - ) - return results - - async def _pinecone_delete(self, user_id: str, vector_ids: list[str]) -> None: - index = self._pinecone_index() - index.delete(ids=vector_ids, namespace=user_id) - - # ── Qdrant implementation ───────────────────────────────────────── - - async def _qdrant_upsert(self, user_id: str, vectors: list[VectorItem]) -> None: - client = self._qdrant_client() - points = [ - PointStruct( - id=v.id, - 
vector=_blob_to_vector(v.blob), - payload={ - "blob": base64.b64encode(v.blob).decode(), - "checksum": v.checksum, - "user_id": user_id, - }, - ) - for v in vectors - ] - client.upsert(collection_name=_QDRANT_COLLECTION, points=points) - - async def _qdrant_search( - self, user_id: str, query_blob: bytes, top_k: int - ) -> list[VectorSearchResult]: - client = self._qdrant_client() - query_vector = _blob_to_vector(query_blob) - hits = client.search( - collection_name=_QDRANT_COLLECTION, - query_vector=query_vector, - query_filter=Filter( - must=[FieldCondition(key="user_id", match=MatchValue(value=user_id))] - ), - limit=top_k, - ) - return [ - VectorSearchResult( - id=str(hit.id), - score=hit.score, - blob=base64.b64decode(hit.payload["blob"]), - ) - for hit in hits - ] - - async def _qdrant_delete(self, user_id: str, vector_ids: list[str]) -> None: - client = self._qdrant_client() - client.delete( - collection_name=_QDRANT_COLLECTION, - points_selector=PointIdsList(points=vector_ids), - ) diff --git a/docker-compose.yml b/docker-compose.yml index c54bd25..21197ef 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -36,37 +36,6 @@ services: # image: redis:7-alpine # restart: unless-stopped - # ── Local S3-compatible storage (MinIO) ── - minio: - image: minio/minio:latest - command: server /data --console-address ":9001" - ports: - - "9000:9000" - - "9001:9001" - environment: - MINIO_ROOT_USER: minioadmin - MINIO_ROOT_PASSWORD: minioadmin - volumes: - - minio_data:/data - healthcheck: - test: ["CMD", "mc", "ready", "local"] - interval: 5s - timeout: 5s - retries: 5 - restart: unless-stopped - - # ── Local vector store (Qdrant) ── - qdrant: - image: qdrant/qdrant:latest - ports: - - "6333:6333" - - "6334:6334" - volumes: - - qdrant_data:/qdrant/storage - restart: unless-stopped - volumes: postgres_data: - minio_data: - qdrant_data: copilot_tokens: diff --git a/tests/test_backup.py b/tests/test_backup.py deleted file mode 100644 index d2926be..0000000 --- 
a/tests/test_backup.py +++ /dev/null @@ -1,243 +0,0 @@ -"""Tests for backup routes: upload, download, history, delete. - -Exercises the backup lifecycle through the FastAPI TestClient against the -in-memory SQLite test database and moto-mocked S3 bucket. -""" - -from __future__ import annotations - -import hashlib - - -from tests.conftest import auth_header, TEST_USER_IDS - - -# ── Helpers ─────────────────────────────────────────────────────────── - -_BLOB = b"encrypted-backup-blob-opaque-bytes" -_CHECKSUM = hashlib.sha256(_BLOB).hexdigest() -_VERSION = 1 -_TIMESTAMP = 1700000000000 # arbitrary ms timestamp - - -def _backup_headers(tier: str = "power", **overrides) -> dict[str, str]: - """Return auth + backup metadata headers.""" - headers = auth_header(tier) - headers["X-Backup-Version"] = str(overrides.get("version", _VERSION)) - headers["X-Backup-Timestamp"] = str(overrides.get("timestamp", _TIMESTAMP)) - headers["X-Backup-Checksum"] = overrides.get("checksum", _CHECKSUM) - headers["Content-Type"] = "application/octet-stream" - return headers - - -def _upload(client, tier="power", **overrides) -> "Response": # noqa: F821 - """Upload a backup blob and return the response.""" - return client.put( - "/api/v1/backup", - content=overrides.pop("blob", _BLOB), - headers=_backup_headers(tier, **overrides), - ) - - -# ── TestUploadBackup ────────────────────────────────────────────────── - - -class TestUploadBackup: - """PUT /api/v1/backup""" - - def test_upload_success(self, client, s3_bucket) -> None: - resp = _upload(client, tier="power") - assert resp.status_code == 200 - assert resp.json() == {"ok": True} - - def test_upload_creates_history_entry(self, client, s3_bucket) -> None: - _upload(client, tier="power") - history = client.get( - "/api/v1/backup/history", headers=auth_header("power") - ).json() - assert len(history) == 1 - assert history[0]["version"] == _VERSION - assert history[0]["timestamp"] == _TIMESTAMP - assert history[0]["checksum"] == _CHECKSUM - - 
def test_upload_bad_checksum(self, client, s3_bucket) -> None: - resp = _upload(client, tier="power", checksum="0" * 64) - assert resp.status_code == 400 - - def test_upload_free_tier_blocked(self, client, s3_bucket) -> None: - """Free tier has backup_gb=0 → should return 402.""" - resp = _upload(client, tier="free") - assert resp.status_code == 402 - - def test_upload_pro_tier_allowed(self, client, s3_bucket) -> None: - """Pro tier has backup_gb=5 → small blob succeeds.""" - resp = _upload(client, tier="pro") - assert resp.status_code == 200 - - -# ── TestDownloadBackup ──────────────────────────────────────────────── - - -class TestDownloadBackup: - """GET /api/v1/backup""" - - def test_download_latest(self, client, s3_bucket) -> None: - _upload(client, tier="power") - resp = client.get("/api/v1/backup", headers=auth_header("power")) - assert resp.status_code == 200 - assert resp.content == _BLOB - assert resp.headers["X-Checksum"] == _CHECKSUM - assert resp.headers["X-Backup-Version"] == str(_VERSION) - - def test_download_no_backup_returns_404(self, client, s3_bucket) -> None: - resp = client.get("/api/v1/backup", headers=auth_header("power")) - assert resp.status_code == 404 - - def test_download_if_modified_since_returns_304(self, client, s3_bucket) -> None: - """When If-Modified-Since is after the backup timestamp → 304.""" - _upload(client, tier="power", timestamp=1700000000000) - resp = client.get( - "/api/v1/backup", - headers={ - **auth_header("power"), - "If-Modified-Since": "Thu, 01 Jan 2099 00:00:00 GMT", - }, - ) - assert resp.status_code == 304 - - def test_download_if_modified_since_returns_200(self, client, s3_bucket) -> None: - """When If-Modified-Since is before the backup timestamp → serve blob.""" - _upload(client, tier="power", timestamp=1700000000000) - resp = client.get( - "/api/v1/backup", - headers={ - **auth_header("power"), - "If-Modified-Since": "Thu, 01 Jan 2000 00:00:00 GMT", - }, - ) - assert resp.status_code == 200 - assert 
resp.content == _BLOB - - def test_download_multiple_returns_latest(self, client, s3_bucket) -> None: - """When multiple backups exist, GET returns the one with the highest timestamp.""" - _upload(client, tier="power", timestamp=1000) - blob2 = b"second-encrypted-backup" - checksum2 = hashlib.sha256(blob2).hexdigest() - _upload(client, tier="power", timestamp=2000, blob=blob2, checksum=checksum2) - resp = client.get("/api/v1/backup", headers=auth_header("power")) - assert resp.status_code == 200 - assert resp.content == blob2 - - -# ── TestBackupHistory ───────────────────────────────────────────────── - - -class TestBackupHistory: - """GET /api/v1/backup/history""" - - def test_history_empty(self, client, s3_bucket) -> None: - resp = client.get("/api/v1/backup/history", headers=auth_header("power")) - assert resp.status_code == 200 - assert resp.json() == [] - - def test_history_returns_entries(self, client, s3_bucket) -> None: - _upload(client, tier="power", timestamp=1000) - _upload(client, tier="power", timestamp=2000) - history = client.get( - "/api/v1/backup/history", headers=auth_header("power") - ).json() - assert len(history) == 2 - # Ordered by timestamp descending - assert history[0]["timestamp"] == 2000 - assert history[1]["timestamp"] == 1000 - - def test_history_isolated_per_user(self, client, s3_bucket) -> None: - """One user's backups should not appear in another user's history.""" - _upload(client, tier="power") - resp = client.get("/api/v1/backup/history", headers=auth_header("team")) - assert resp.json() == [] - - -# ── TestDeleteBackup ────────────────────────────────────────────────── - - -class TestDeleteBackup: - """DELETE /api/v1/backup/{backup_id}""" - - def _get_backup_id(self, client, tier="power") -> str: - """Upload a backup and return its DB id from history.""" - _upload(client, tier=tier) - client.get( - "/api/v1/backup/history", headers=auth_header(tier) - ).json() - # History returns BackupMetadata schema which doesn't have `id`. 
- # We need to look it up via a different means. - # Since there's only 1 backup, find via history length. - # Actually the schema doesn't return id — let's verify via re-download. - # We'll use a workaround: upload, then list history to confirm it exists, - # then try to delete — but we need the id... - # Let's check if history includes an id field. - # The schema is: version, timestamp, checksum, chunk_count — no id. - # We'll need to query the DB directly or use a known ID. - # For testing, we'll search history then use the DB. - return None # pragma: no cover — overridden below - - def test_delete_success(self, client, s3_bucket, db_session) -> None: - _upload(client, tier="power") - - # Discover the backup_id via direct DB query - import asyncio - from sqlalchemy import select - from app.models import BackupMetadata - - async def _get_id(): - result = await db_session.execute( - select(BackupMetadata.id).where( - BackupMetadata.user_id == TEST_USER_IDS["power"] - ) - ) - return result.scalar_one() - - backup_id = asyncio.get_event_loop().run_until_complete(_get_id()) - - resp = client.delete( - f"/api/v1/backup/{backup_id}", headers=auth_header("power") - ) - assert resp.status_code == 200 - assert resp.json() == {"ok": True} - - # History should now be empty - history = client.get( - "/api/v1/backup/history", headers=auth_header("power") - ).json() - assert history == [] - - def test_delete_nonexistent(self, client, s3_bucket) -> None: - resp = client.delete( - "/api/v1/backup/no-such-id", headers=auth_header("power") - ) - assert resp.status_code == 404 - - def test_delete_other_users_backup(self, client, s3_bucket, db_session) -> None: - """Cannot delete another user's backup (ownership check returns 404).""" - _upload(client, tier="power") - - import asyncio - from sqlalchemy import select - from app.models import BackupMetadata - - async def _get_id(): - result = await db_session.execute( - select(BackupMetadata.id).where( - BackupMetadata.user_id == 
TEST_USER_IDS["power"] - ) - ) - return result.scalar_one() - - backup_id = asyncio.get_event_loop().run_until_complete(_get_id()) - - # team user tries to delete power user's backup → 404 - resp = client.delete( - f"/api/v1/backup/{backup_id}", headers=auth_header("team") - ) - assert resp.status_code == 404 diff --git a/tests/test_plugins.py b/tests/test_plugins.py deleted file mode 100644 index 9c25d85..0000000 --- a/tests/test_plugins.py +++ /dev/null @@ -1,400 +0,0 @@ -"""Tests for Step 10+12: Plugin Marketplace (DB-backed). - -Covers: - - PluginRegistry: catalog management, filtering, sorting, install counts (PostgreSQL) - - ReviewQueue: pending queue, review decisions, manifest security checklist - - RevenueShare: install event recording, earnings aggregation (PostgreSQL) - - Route integration: tier gate, list/get/install/uninstall via TestClient -""" - -from __future__ import annotations - -import uuid - -import pytest -from sqlalchemy import select -from sqlalchemy.ext.asyncio import AsyncSession - -from app.marketplace.plugin_registry import PluginRegistry -from app.marketplace.plugin_review import ReviewQueue, validate_manifest -from app.marketplace.revenue_share import RevenueShare -from app.models import Plugin, PluginReview as PluginReviewModel, RevenueEvent -from app.schemas import PluginManifest -from tests.conftest import TEST_USER_IDS, auth_header - -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - - -def _fresh_manifest( - plugin_id: str | None = None, - category: str = "productivity", - price_cents: int = 0, - permissions: list[str] | None = None, -) -> PluginManifest: - pid = plugin_id or f"plugin-{uuid.uuid4().hex[:8]}" - return PluginManifest( - id=pid, - name=f"Plugin {pid}", - description=f"Description for {pid}", - version="1.0.0", - author="test-author", - permissions=permissions or ["read:tasks"], - category=category, 
- price_cents=price_cents, - ) - - -# --------------------------------------------------------------------------- -# PluginRegistry (DB-backed) -# --------------------------------------------------------------------------- - - -class TestPluginRegistry: - """Each test uses the conftest db_session fixture with a fresh in-memory DB.""" - - @pytest.fixture - def reg(self) -> PluginRegistry: - return PluginRegistry() - - @pytest.mark.asyncio - async def test_seed_plugins_are_listed( - self, reg: PluginRegistry, db_session: AsyncSession, seed_plugins: list[Plugin] - ) -> None: - result = await reg.list_plugins(db_session) - assert result.total == 3 - assert all(p.id.startswith("plugin-") for p in result.plugins) - - @pytest.mark.asyncio - async def test_list_approved_only( - self, reg: PluginRegistry, db_session: AsyncSession, seed_plugins: list[Plugin] - ) -> None: - manifest = _fresh_manifest() - await reg.submit_plugin(db_session, manifest, "plugins/key.zip") - result = await reg.list_plugins(db_session) - ids = [p.id for p in result.plugins] - assert manifest.id not in ids # still pending - - @pytest.mark.asyncio - async def test_list_filter_by_category( - self, reg: PluginRegistry, db_session: AsyncSession, seed_plugins: list[Plugin] - ) -> None: - result = await reg.list_plugins(db_session, category="communication") - assert result.total == 1 - assert result.plugins[0].id == "plugin-slack-notify" - - @pytest.mark.asyncio - async def test_list_filter_by_query( - self, reg: PluginRegistry, db_session: AsyncSession, seed_plugins: list[Plugin] - ) -> None: - result = await reg.list_plugins(db_session, query="time") - assert result.total == 1 - assert result.plugins[0].id == "plugin-time-tracker" - - @pytest.mark.asyncio - async def test_list_sort_by_installs( - self, reg: PluginRegistry, db_session: AsyncSession, seed_plugins: list[Plugin] - ) -> None: - await reg.record_install(db_session, "plugin-slack-notify") - await reg.record_install(db_session, 
"plugin-slack-notify") - result = await reg.list_plugins(db_session, sort="installs") - assert result.plugins[0].id == "plugin-slack-notify" - - @pytest.mark.asyncio - async def test_get_plugin_found( - self, reg: PluginRegistry, db_session: AsyncSession, seed_plugins: list[Plugin] - ) -> None: - entry = await reg.get_plugin(db_session, "plugin-github-sync") - assert entry is not None - assert entry["manifest"].id == "plugin-github-sync" - assert "install_count" in entry - - @pytest.mark.asyncio - async def test_get_plugin_not_found( - self, reg: PluginRegistry, db_session: AsyncSession - ) -> None: - entry = await reg.get_plugin(db_session, "no-such-plugin") - assert entry is None - - @pytest.mark.asyncio - async def test_submit_sets_pending( - self, reg: PluginRegistry, db_session: AsyncSession - ) -> None: - manifest = _fresh_manifest() - plugin_id = await reg.submit_plugin(db_session, manifest, "key.zip") - assert plugin_id == manifest.id - result = await db_session.execute(select(Plugin).where(Plugin.id == plugin_id)) - row = result.scalar_one() - assert row.status == "pending_review" - - @pytest.mark.asyncio - async def test_approve_makes_visible( - self, reg: PluginRegistry, db_session: AsyncSession - ) -> None: - manifest = _fresh_manifest() - await reg.submit_plugin(db_session, manifest, "key.zip") - await reg.approve_plugin(db_session, manifest.id) - result = await reg.list_plugins(db_session) - assert manifest.id in [p.id for p in result.plugins] - - @pytest.mark.asyncio - async def test_reject_stores_reason( - self, reg: PluginRegistry, db_session: AsyncSession - ) -> None: - manifest = _fresh_manifest() - await reg.submit_plugin(db_session, manifest, "key.zip") - await reg.reject_plugin(db_session, manifest.id, reason="Unsafe permissions") - result = await db_session.execute(select(Plugin).where(Plugin.id == manifest.id)) - row = result.scalar_one() - assert row.status == "rejected" - assert row.rejection_reason == "Unsafe permissions" - listed = await 
reg.list_plugins(db_session) - assert manifest.id not in [p.id for p in listed.plugins] - - @pytest.mark.asyncio - async def test_approve_unknown_raises_key_error( - self, reg: PluginRegistry, db_session: AsyncSession - ) -> None: - with pytest.raises(KeyError): - await reg.approve_plugin(db_session, "ghost-plugin") - - @pytest.mark.asyncio - async def test_record_install_increments_count( - self, reg: PluginRegistry, db_session: AsyncSession, seed_plugins: list[Plugin] - ) -> None: - await reg.record_install(db_session, "plugin-github-sync") - entry = await reg.get_plugin(db_session, "plugin-github-sync") - assert entry is not None - assert entry["install_count"] == 1 - - @pytest.mark.asyncio - async def test_record_uninstall_decrements_count( - self, reg: PluginRegistry, db_session: AsyncSession, seed_plugins: list[Plugin] - ) -> None: - await reg.record_install(db_session, "plugin-github-sync") - await reg.record_install(db_session, "plugin-github-sync") - await reg.record_uninstall(db_session, "plugin-github-sync") - entry = await reg.get_plugin(db_session, "plugin-github-sync") - assert entry is not None - assert entry["install_count"] == 1 - - @pytest.mark.asyncio - async def test_record_uninstall_floors_at_zero( - self, reg: PluginRegistry, db_session: AsyncSession, seed_plugins: list[Plugin] - ) -> None: - await reg.record_uninstall(db_session, "plugin-github-sync") - entry = await reg.get_plugin(db_session, "plugin-github-sync") - assert entry is not None - assert entry["install_count"] == 0 - - -# --------------------------------------------------------------------------- -# ReviewQueue (DB-backed) -# --------------------------------------------------------------------------- - - -class TestReviewQueue: - @pytest.fixture - def reg(self) -> PluginRegistry: - return PluginRegistry() - - @pytest.fixture - def queue(self) -> ReviewQueue: - return ReviewQueue() - - @pytest.mark.asyncio - async def test_get_pending_returns_submitted_plugins( - self, reg: 
PluginRegistry, queue: ReviewQueue, db_session: AsyncSession - ) -> None: - manifest = _fresh_manifest() - await reg.submit_plugin(db_session, manifest, "key.zip") - pending = await queue.get_pending(db_session) - assert any(p["plugin_id"] == manifest.id for p in pending) - - @pytest.mark.asyncio - async def test_submit_review_approved( - self, reg: PluginRegistry, queue: ReviewQueue, db_session: AsyncSession - ) -> None: - manifest = _fresh_manifest() - await reg.submit_plugin(db_session, manifest, "key.zip") - await queue.submit_review(db_session, manifest.id, TEST_USER_IDS["power"], "approved", "Looks good") - result = await db_session.execute(select(Plugin).where(Plugin.id == manifest.id)) - row = result.scalar_one() - assert row.status == "approved" - # Check review row was persisted - review_result = await db_session.execute( - select(PluginReviewModel).where(PluginReviewModel.plugin_id == manifest.id) - ) - review = review_result.scalar_one() - assert review.decision == "approved" - - @pytest.mark.asyncio - async def test_submit_review_rejected( - self, reg: PluginRegistry, queue: ReviewQueue, db_session: AsyncSession - ) -> None: - manifest = _fresh_manifest() - await reg.submit_plugin(db_session, manifest, "key.zip") - await queue.submit_review( - db_session, manifest.id, TEST_USER_IDS["power"], "rejected", "Bad permissions" - ) - result = await db_session.execute(select(Plugin).where(Plugin.id == manifest.id)) - row = result.scalar_one() - assert row.status == "rejected" - - def test_validate_manifest_ok(self) -> None: - manifest = _fresh_manifest(permissions=["read:tasks", "write:notes"]) - validate_manifest(manifest) # should not raise - - def test_validate_manifest_unknown_permission(self) -> None: - manifest = _fresh_manifest(permissions=["read:tasks", "read:secrets"]) - with pytest.raises(ValueError, match="Unknown permission"): - validate_manifest(manifest) - - def test_validate_manifest_invalid_id_format(self) -> None: - manifest = 
_fresh_manifest(plugin_id="Plugin_ID_Invalid") - with pytest.raises(ValueError, match="Invalid plugin id format"): - validate_manifest(manifest) - - def test_validate_manifest_id_with_uppercase(self) -> None: - manifest = _fresh_manifest(plugin_id="UpperCase") - with pytest.raises(ValueError, match="Invalid plugin id format"): - validate_manifest(manifest) - - -# --------------------------------------------------------------------------- -# RevenueShare (DB-backed) -# --------------------------------------------------------------------------- - - -class TestRevenueShare: - @pytest.fixture - def rs(self) -> RevenueShare: - return RevenueShare() - - @pytest.mark.asyncio - async def test_record_install_free_plugin( - self, rs: RevenueShare, db_session: AsyncSession, seed_plugins: list[Plugin] - ) -> None: - await rs.record_install(db_session, "plugin-github-sync", TEST_USER_IDS["power"], amount_cents=0) - result = await db_session.execute( - select(RevenueEvent).where(RevenueEvent.plugin_id == "plugin-github-sync") - ) - event = result.scalar_one() - assert event.developer_share_cents == 0 - - @pytest.mark.asyncio - async def test_record_install_paid_plugin_no_stripe( - self, rs: RevenueShare, db_session: AsyncSession, seed_plugins: list[Plugin] - ) -> None: - await rs.record_install( - db_session, "plugin-slack-notify", TEST_USER_IDS["pro"], amount_cents=499 - ) - result = await db_session.execute( - select(RevenueEvent).where(RevenueEvent.plugin_id == "plugin-slack-notify") - ) - event = result.scalar_one() - assert event.amount_cents == 499 - assert event.developer_share_cents == int(499 * 0.70) - - @pytest.mark.asyncio - async def test_record_install_increments_registry_count( - self, rs: RevenueShare, db_session: AsyncSession, seed_plugins: list[Plugin] - ) -> None: - reg = PluginRegistry() - await rs.record_install(db_session, "plugin-github-sync", TEST_USER_IDS["power"], amount_cents=0) - entry = await reg.get_plugin(db_session, "plugin-github-sync") - assert 
entry is not None - assert entry["install_count"] == 1 - - @pytest.mark.asyncio - async def test_get_earnings_empty( - self, rs: RevenueShare, db_session: AsyncSession - ) -> None: - result = await rs.get_earnings(db_session, "unknown-dev") - assert result["total_installs"] == 0 - assert result["total_revenue_cents"] == 0 - assert result["developer_share_cents"] == 0 - - @pytest.mark.asyncio - async def test_get_earnings_aggregates( - self, rs: RevenueShare, db_session: AsyncSession, seed_plugins: list[Plugin] - ) -> None: - await rs.record_install(db_session, "plugin-slack-notify", TEST_USER_IDS["power"], amount_cents=499) - await rs.record_install(db_session, "plugin-slack-notify", TEST_USER_IDS["pro"], amount_cents=499) - result = await rs.get_earnings(db_session, "Adiuva") - assert result["total_installs"] == 2 - assert result["total_revenue_cents"] == 998 - assert result["developer_share_cents"] == int(499 * 0.70) * 2 - - -# --------------------------------------------------------------------------- -# Route integration tests -# --------------------------------------------------------------------------- - - -class TestPluginRoutes: - def test_list_plugins_requires_power_tier(self, client, seed_plugins) -> None: - resp = client.get("/api/v1/plugins", headers=auth_header("free")) - assert resp.status_code == 403 - - def test_list_plugins_pro_tier_blocked(self, client, seed_plugins) -> None: - resp = client.get("/api/v1/plugins", headers=auth_header("pro")) - assert resp.status_code == 403 - - def test_list_plugins_power_tier_ok(self, client, seed_plugins) -> None: - resp = client.get("/api/v1/plugins", headers=auth_header("power")) - assert resp.status_code == 200 - data = resp.json() - assert "plugins" in data - assert data["total"] == 3 - - def test_list_plugins_team_tier_ok(self, client, seed_plugins) -> None: - resp = client.get("/api/v1/plugins", headers=auth_header("team")) - assert resp.status_code == 200 - - def test_get_plugin_found(self, client, 
seed_plugins) -> None: - resp = client.get("/api/v1/plugins/plugin-github-sync", headers=auth_header()) - assert resp.status_code == 200 - data = resp.json() - assert data["plugin"]["id"] == "plugin-github-sync" - assert "install_count" in data - - def test_get_plugin_not_found(self, client, seed_plugins) -> None: - resp = client.get("/api/v1/plugins/no-such-plugin", headers=auth_header()) - assert resp.status_code == 404 - - def test_install_plugin_free(self, client, seed_plugins) -> None: - resp = client.post( - "/api/v1/plugins/plugin-github-sync/install", - json={"plugin_id": "plugin-github-sync"}, - headers=auth_header(), - ) - assert resp.status_code == 200 - data = resp.json() - assert data["ok"] is True - assert "download_url" in data - - def test_install_plugin_not_found(self, client, seed_plugins) -> None: - resp = client.post( - "/api/v1/plugins/ghost/install", - json={"plugin_id": "ghost"}, - headers=auth_header(), - ) - assert resp.status_code == 404 - - def test_uninstall_plugin_ok(self, client, seed_plugins) -> None: - resp = client.delete( - "/api/v1/plugins/plugin-github-sync/install", - headers=auth_header(), - ) - assert resp.status_code == 200 - assert resp.json()["ok"] is True - - def test_install_requires_power_tier(self, client, seed_plugins) -> None: - resp = client.post( - "/api/v1/plugins/plugin-github-sync/install", - json={"plugin_id": "plugin-github-sync"}, - headers=auth_header("free"), - ) - assert resp.status_code == 403 diff --git a/tests/test_storage.py b/tests/test_storage.py deleted file mode 100644 index 881854d..0000000 --- a/tests/test_storage.py +++ /dev/null @@ -1,562 +0,0 @@ -"""Tests for the storage layer: encryption, BlobStore, VectorStore, and storage routes.""" - -from __future__ import annotations - -import base64 -import hashlib -from unittest.mock import MagicMock, patch - -import boto3 -import pytest -from botocore.exceptions import ClientError - -from app.storage.encryption import reject_if_tampered, 
verify_checksum -from app.storage.blob_store import BlobStore -from app.storage.vector_store import VectorStore, _blob_to_vector -from app.schemas import VectorItem, VectorSearchResult -from tests.conftest import auth_header, S3_TEST_BUCKET - - -# ── Helpers ─────────────────────────────────────────────────────────── - -_BLOB = b"encrypted-payload-opaque-to-server" -_CHECKSUM = hashlib.sha256(_BLOB).hexdigest() -_BUCKET = S3_TEST_BUCKET -_REGION = "us-east-1" - - -def _pinecone_mock(): - """Return a mock Pinecone index with realistic return shapes.""" - mock_index = MagicMock() - mock_index.query.return_value = { - "matches": [ - { - "id": "v1", - "score": 0.95, - "metadata": { - "blob": base64.b64encode(b"result-blob").decode(), - "checksum": hashlib.sha256(b"result-blob").hexdigest(), - "user_id": "u1", - }, - } - ] - } - mock_pc = MagicMock() - mock_pc.return_value.Index.return_value = mock_index - return mock_pc, mock_index - - -# ── TestEncryption ──────────────────────────────────────────────────── - - -class TestEncryption: - def test_verify_checksum_correct(self) -> None: - assert verify_checksum(_BLOB, _CHECKSUM) is True - - def test_verify_checksum_wrong(self) -> None: - assert verify_checksum(_BLOB, "0" * 64) is False - - def test_verify_checksum_empty_checksum(self) -> None: - assert verify_checksum(_BLOB, "") is False - - def test_verify_checksum_empty_blob(self) -> None: - expected = hashlib.sha256(b"").hexdigest() - assert verify_checksum(b"", expected) is True - - def test_verify_checksum_tampered_blob(self) -> None: - tampered = _BLOB + b"\x00" - assert verify_checksum(tampered, _CHECKSUM) is False - - def test_reject_if_tampered_passes_when_valid(self) -> None: - # Should not raise - reject_if_tampered(_BLOB, _CHECKSUM) - - def test_reject_if_tampered_raises_400_on_mismatch(self) -> None: - from fastapi import HTTPException - - with pytest.raises(HTTPException) as exc_info: - reject_if_tampered(_BLOB, "bad" * 20) - assert 
exc_info.value.status_code == 400 - - def test_reject_if_tampered_detail_mentions_checksum(self) -> None: - from fastapi import HTTPException - - with pytest.raises(HTTPException) as exc_info: - reject_if_tampered(_BLOB, "bad" * 20) - assert "checksum" in exc_info.value.detail.lower() - - def test_checksum_is_sha256_hex(self) -> None: - cs = hashlib.sha256(_BLOB).hexdigest() - assert len(cs) == 64 - assert all(c in "0123456789abcdef" for c in cs) - - -# ── TestBlobStore ───────────────────────────────────────────────────── - - -class TestBlobStore: - @pytest.mark.asyncio - async def test_upload_returns_correct_key(self, s3_bucket: str) -> None: - store = BlobStore() - key = await store.upload("u1", "tasks", "r1", _BLOB, _CHECKSUM) - assert key == "u1/tasks/r1" - - @pytest.mark.asyncio - async def test_upload_object_exists_in_s3(self, s3_bucket: str) -> None: - store = BlobStore() - await store.upload("u1", "tasks", "r1", _BLOB, _CHECKSUM) - # Verify by downloading — no exception means object exists - retrieved = await store.download("u1", "u1/tasks/r1") - assert retrieved == _BLOB - - @pytest.mark.asyncio - async def test_download_retrieves_same_bytes(self, s3_bucket: str) -> None: - store = BlobStore() - await store.upload("u1", "notes", "n1", b"note-data", hashlib.sha256(b"note-data").hexdigest()) - result = await store.download("u1", "u1/notes/n1") - assert result == b"note-data" - - @pytest.mark.asyncio - async def test_delete_removes_object(self, s3_bucket: str) -> None: - store = BlobStore() - await store.upload("u1", "tasks", "r1", _BLOB, _CHECKSUM) - await store.delete("u1", "u1/tasks/r1") - with pytest.raises(ClientError) as exc_info: - await store.download("u1", "u1/tasks/r1") - assert exc_info.value.response["Error"]["Code"] == "NoSuchKey" - - @pytest.mark.asyncio - async def test_delete_is_idempotent(self, s3_bucket: str) -> None: - store = BlobStore() - # Delete a key that never existed — should not raise - await store.delete("u1", 
"u1/tasks/nonexistent") - - @pytest.mark.asyncio - async def test_list_keys_returns_correct_keys(self, s3_bucket: str) -> None: - store = BlobStore() - await store.upload("u1", "tasks", "r1", _BLOB, _CHECKSUM) - await store.upload("u1", "tasks", "r2", _BLOB, _CHECKSUM) - keys = await store.list_keys("u1", "tasks") - assert set(keys) == {"u1/tasks/r1", "u1/tasks/r2"} - - @pytest.mark.asyncio - async def test_list_keys_scoped_to_table(self, s3_bucket: str) -> None: - store = BlobStore() - await store.upload("u1", "tasks", "r1", _BLOB, _CHECKSUM) - await store.upload("u1", "notes", "n1", _BLOB, _CHECKSUM) - keys = await store.list_keys("u1", "tasks") - assert "u1/notes/n1" not in keys - assert "u1/tasks/r1" in keys - - @pytest.mark.asyncio - async def test_list_keys_no_cross_user_leakage(self, s3_bucket: str) -> None: - store = BlobStore() - await store.upload("u1", "tasks", "r1", _BLOB, _CHECKSUM) - await store.upload("u2", "tasks", "r1", _BLOB, _CHECKSUM) - keys_u1 = await store.list_keys("u1", "tasks") - assert "u2/tasks/r1" not in keys_u1 - - @pytest.mark.asyncio - async def test_list_keys_empty_table(self, s3_bucket: str) -> None: - store = BlobStore() - keys = await store.list_keys("u1", "tasks") - assert keys == [] - - @pytest.mark.asyncio - async def test_upload_uses_sse_s3_encryption(self, s3_bucket: str) -> None: - store = BlobStore() - await store.upload("u1", "tasks", "r1", _BLOB, _CHECKSUM) - # Verify S3 metadata was set — check via head_object - with patch("app.storage.blob_store.settings") as mock_settings: - mock_settings.S3_BUCKET = _BUCKET - mock_settings.S3_REGION = _REGION - mock_settings.AWS_ACCESS_KEY_ID = "testing" - mock_settings.AWS_SECRET_ACCESS_KEY = "testing" - client = boto3.client("s3", region_name=_REGION) - response = client.head_object(Bucket=_BUCKET, Key="u1/tasks/r1") - assert response.get("ServerSideEncryption") == "AES256" - - @pytest.mark.asyncio - async def test_upload_stores_checksum_in_metadata(self, s3_bucket: str) -> None: - 
store = BlobStore() - await store.upload("u1", "tasks", "r1", _BLOB, _CHECKSUM) - client = boto3.client("s3", region_name=_REGION) - response = client.head_object(Bucket=_BUCKET, Key="u1/tasks/r1") - assert response["Metadata"]["checksum"] == _CHECKSUM - - -# ── _blob_to_vector helper ──────────────────────────────────────────── - - -class TestBlobToVector: - def test_returns_32_floats(self) -> None: - v = _blob_to_vector(b"test") - assert len(v) == 32 - - def test_all_values_in_range(self) -> None: - v = _blob_to_vector(b"test") - assert all(-1.0 <= x <= 1.0 for x in v) - - def test_deterministic(self) -> None: - assert _blob_to_vector(b"same") == _blob_to_vector(b"same") - - def test_different_blobs_different_vectors(self) -> None: - assert _blob_to_vector(b"aaa") != _blob_to_vector(b"bbb") - - -# ── TestVectorStorePinecone ─────────────────────────────────────────── - - -class TestVectorStorePinecone: - def _store(self) -> VectorStore: - store = VectorStore() - store._use_pinecone = lambda: True # type: ignore[method-assign] - return store - - @pytest.mark.asyncio - async def test_upsert_calls_index_upsert(self) -> None: - mock_pc, mock_index = _pinecone_mock() - with patch("app.storage.vector_store.Pinecone", mock_pc): - store = self._store() - items = [VectorItem(id="v1", blob=b"enc-blob", checksum=hashlib.sha256(b"enc-blob").hexdigest())] - await store.upsert("u1", items) - mock_index.upsert.assert_called_once() - call_kwargs = mock_index.upsert.call_args[1] - assert call_kwargs.get("namespace") == "u1" - - @pytest.mark.asyncio - async def test_upsert_encodes_blob_as_base64_in_metadata(self) -> None: - mock_pc, mock_index = _pinecone_mock() - with patch("app.storage.vector_store.Pinecone", mock_pc): - store = self._store() - items = [VectorItem(id="v1", blob=b"secret", checksum=hashlib.sha256(b"secret").hexdigest())] - await store.upsert("u1", items) - vectors_arg = mock_index.upsert.call_args[1]["vectors"] - assert vectors_arg[0]["metadata"]["blob"] == 
base64.b64encode(b"secret").decode() - - @pytest.mark.asyncio - async def test_search_calls_index_query(self) -> None: - mock_pc, mock_index = _pinecone_mock() - with patch("app.storage.vector_store.Pinecone", mock_pc): - store = self._store() - await store.search("u1", b"query-blob", top_k=5) - mock_index.query.assert_called_once() - query_kwargs = mock_index.query.call_args[1] - assert query_kwargs.get("namespace") == "u1" - assert query_kwargs.get("top_k") == 5 - assert query_kwargs.get("include_metadata") is True - - @pytest.mark.asyncio - async def test_search_returns_vector_search_results(self) -> None: - mock_pc, mock_index = _pinecone_mock() - with patch("app.storage.vector_store.Pinecone", mock_pc): - store = self._store() - results = await store.search("u1", b"query", top_k=10) - assert len(results) == 1 - assert isinstance(results[0], VectorSearchResult) - assert results[0].id == "v1" - assert results[0].score == 0.95 - assert results[0].blob == b"result-blob" - - @pytest.mark.asyncio - async def test_search_uses_derived_query_vector(self) -> None: - mock_pc, mock_index = _pinecone_mock() - with patch("app.storage.vector_store.Pinecone", mock_pc): - store = self._store() - await store.search("u1", b"query-blob", top_k=3) - expected_vector = _blob_to_vector(b"query-blob") - actual_vector = mock_index.query.call_args[1].get("vector") - assert actual_vector == expected_vector - - @pytest.mark.asyncio - async def test_delete_calls_index_delete(self) -> None: - mock_pc, mock_index = _pinecone_mock() - with patch("app.storage.vector_store.Pinecone", mock_pc): - store = self._store() - await store.delete("u1", ["v1", "v2"]) - mock_index.delete.assert_called_once() - delete_kwargs = mock_index.delete.call_args[1] - assert delete_kwargs.get("namespace") == "u1" - assert set(delete_kwargs.get("ids", [])) == {"v1", "v2"} - - -# ── TestVectorStoreQdrant ───────────────────────────────────────────── - - -class TestVectorStoreQdrant: - def _store(self) -> VectorStore: 
- store = VectorStore() - store._use_pinecone = lambda: False # type: ignore[method-assign] - return store - - def _qdrant_mock(self) -> MagicMock: - mock_hit = MagicMock() - mock_hit.id = "v1" - mock_hit.score = 0.88 - mock_hit.payload = { - "blob": base64.b64encode(b"qdrant-result").decode(), - "user_id": "u1", - } - mock_client = MagicMock() - mock_client.search.return_value = [mock_hit] - return mock_client - - @pytest.mark.asyncio - async def test_upsert_calls_client_upsert(self) -> None: - mock_client = MagicMock() - with patch("app.storage.vector_store.QdrantClient", return_value=mock_client): - store = self._store() - items = [VectorItem(id="v1", blob=b"enc", checksum=hashlib.sha256(b"enc").hexdigest())] - await store.upsert("u1", items) - mock_client.upsert.assert_called_once() - - @pytest.mark.asyncio - async def test_upsert_uses_correct_collection(self) -> None: - mock_client = MagicMock() - with patch("app.storage.vector_store.QdrantClient", return_value=mock_client): - store = self._store() - items = [VectorItem(id="v1", blob=b"enc", checksum=hashlib.sha256(b"enc").hexdigest())] - await store.upsert("u1", items) - call_kwargs = mock_client.upsert.call_args[1] - assert call_kwargs["collection_name"] == "adiuva_vectors" - - @pytest.mark.asyncio - async def test_search_calls_client_search(self) -> None: - mock_client = self._qdrant_mock() - with patch("app.storage.vector_store.QdrantClient", return_value=mock_client): - store = self._store() - await store.search("u1", b"query", top_k=5) - mock_client.search.assert_called_once() - - @pytest.mark.asyncio - async def test_search_passes_limit(self) -> None: - mock_client = self._qdrant_mock() - with patch("app.storage.vector_store.QdrantClient", return_value=mock_client): - store = self._store() - await store.search("u1", b"query", top_k=7) - call_kwargs = mock_client.search.call_args[1] - assert call_kwargs.get("limit") == 7 - - @pytest.mark.asyncio - async def test_search_returns_vector_search_results(self) 
-> None: - mock_client = self._qdrant_mock() - with patch("app.storage.vector_store.QdrantClient", return_value=mock_client): - store = self._store() - results = await store.search("u1", b"query", top_k=5) - assert len(results) == 1 - assert isinstance(results[0], VectorSearchResult) - assert results[0].id == "v1" - assert results[0].score == 0.88 - assert results[0].blob == b"qdrant-result" - - @pytest.mark.asyncio - async def test_delete_calls_client_delete(self) -> None: - mock_client = MagicMock() - with patch("app.storage.vector_store.QdrantClient", return_value=mock_client): - store = self._store() - await store.delete("u1", ["v1", "v2"]) - mock_client.delete.assert_called_once() - - @pytest.mark.asyncio - async def test_delete_uses_correct_collection(self) -> None: - mock_client = MagicMock() - with patch("app.storage.vector_store.QdrantClient", return_value=mock_client): - store = self._store() - await store.delete("u1", ["v1"]) - call_kwargs = mock_client.delete.call_args[1] - assert call_kwargs["collection_name"] == "adiuva_vectors" - - -# ── TestStorageRoutes (integration) ─────────────────────────────────── - - -class TestStorageRoutes: - """Integration tests for POST/GET/PUT/DELETE /api/v1/storage/records. - - Pydantic v2 converts JSON string → bytes via ``str.encode('utf-8')``. - So "hello" in JSON becomes ``b"hello"`` on the server. We use plain - ASCII strings as blob values and compute checksums accordingly. 
- """ - - _BLOB_STR = "encrypted-payload-opaque-to-server" - _BLOB_BYTES = _BLOB_STR.encode() - _BLOB_CHECKSUM = hashlib.sha256(_BLOB_BYTES).hexdigest() - - @classmethod - def _create_payload(cls, blob_str: str | None = None) -> dict: - blob_str = blob_str or cls._BLOB_STR - checksum = hashlib.sha256(blob_str.encode()).hexdigest() - return { - "table": "tasks", - "blob": blob_str, - "checksum": checksum, - } - - def _create_record(self, client, tier="power", blob_str=None): - payload = self._create_payload(blob_str) - return client.post( - "/api/v1/storage/records", - json=payload, - headers=auth_header(tier), - ) - - # ── Create ──────────────────────────────────────────────────────── - - def test_create_record(self, client, s3_bucket) -> None: - resp = self._create_record(client) - assert resp.status_code == 201 - data = resp.json() - assert "id" in data - assert "created_at" in data - - def test_create_record_bad_checksum(self, client, s3_bucket) -> None: - payload = { - "table": "tasks", - "blob": self._BLOB_STR, - "checksum": "0" * 64, - } - resp = client.post( - "/api/v1/storage/records", - json=payload, - headers=auth_header("power"), - ) - assert resp.status_code == 400 - - def test_create_record_free_tier_blocked(self, client, s3_bucket) -> None: - """Free tier has cloud_storage_gb=0 → 402.""" - resp = self._create_record(client, tier="free") - assert resp.status_code == 402 - - def test_create_record_pro_tier_allowed(self, client, s3_bucket) -> None: - """Pro tier has cloud_storage_gb=5 → succeeds for small blob.""" - resp = self._create_record(client, tier="pro") - assert resp.status_code == 201 - - # ── List ────────────────────────────────────────────────────────── - - def test_list_records(self, client, s3_bucket) -> None: - self._create_record(client) - self._create_record(client, blob_str="second-blob") - resp = client.get( - "/api/v1/storage/records", - headers=auth_header("power"), - ) - assert resp.status_code == 200 - data = resp.json() - assert 
len(data) == 2 - # Each entry has metadata, no blob bytes - for item in data: - assert "id" in item - assert "table" in item - assert "checksum" in item - assert "blob" not in item - - def test_list_records_filter_by_table(self, client, s3_bucket) -> None: - self._create_record(client) - # Create in a different table - note_blob = "note-blob" - payload = { - "table": "notes", - "blob": note_blob, - "checksum": hashlib.sha256(note_blob.encode()).hexdigest(), - } - client.post( - "/api/v1/storage/records", - json=payload, - headers=auth_header("power"), - ) - resp = client.get( - "/api/v1/storage/records?table=notes", - headers=auth_header("power"), - ) - assert resp.status_code == 200 - data = resp.json() - assert len(data) == 1 - assert data[0]["table"] == "notes" - - def test_list_records_isolated_per_user(self, client, s3_bucket) -> None: - """One user's records should not appear in another user's list.""" - self._create_record(client, tier="power") - resp = client.get( - "/api/v1/storage/records", - headers=auth_header("team"), - ) - assert resp.json() == [] - - # ── Download ────────────────────────────────────────────────────── - - def test_download_record(self, client, s3_bucket) -> None: - create_resp = self._create_record(client) - record_id = create_resp.json()["id"] - resp = client.get( - f"/api/v1/storage/records/{record_id}", - headers=auth_header("power"), - ) - assert resp.status_code == 200 - assert resp.content == self._BLOB_BYTES - assert resp.headers["X-Checksum"] == self._BLOB_CHECKSUM - - def test_download_record_not_found(self, client, s3_bucket) -> None: - resp = client.get( - "/api/v1/storage/records/nonexistent-id", - headers=auth_header("power"), - ) - assert resp.status_code == 404 - - # ── Update ──────────────────────────────────────────────────────── - - def test_update_record(self, client, s3_bucket) -> None: - create_resp = self._create_record(client) - record_id = create_resp.json()["id"] - new_blob_str = "updated-encrypted-payload" 
- new_checksum = hashlib.sha256(new_blob_str.encode()).hexdigest() - resp = client.put( - f"/api/v1/storage/records/{record_id}", - json={"blob": new_blob_str, "checksum": new_checksum}, - headers=auth_header("power"), - ) - assert resp.status_code == 200 - assert resp.json() == {"ok": True} - - # Verify download returns the updated blob - dl = client.get( - f"/api/v1/storage/records/{record_id}", - headers=auth_header("power"), - ) - assert dl.content == new_blob_str.encode() - - def test_update_record_bad_checksum(self, client, s3_bucket) -> None: - create_resp = self._create_record(client) - record_id = create_resp.json()["id"] - resp = client.put( - f"/api/v1/storage/records/{record_id}", - json={"blob": "some-data", "checksum": "0" * 64}, - headers=auth_header("power"), - ) - assert resp.status_code == 400 - - # ── Delete ──────────────────────────────────────────────────────── - - def test_delete_record(self, client, s3_bucket) -> None: - create_resp = self._create_record(client) - record_id = create_resp.json()["id"] - resp = client.delete( - f"/api/v1/storage/records/{record_id}", - headers=auth_header("power"), - ) - assert resp.status_code == 200 - assert resp.json() == {"ok": True} - - # Subsequent GET should return 404 - dl = client.get( - f"/api/v1/storage/records/{record_id}", - headers=auth_header("power"), - ) - assert dl.status_code == 404 - - def test_delete_record_not_found(self, client, s3_bucket) -> None: - resp = client.delete( - "/api/v1/storage/records/nonexistent", - headers=auth_header("power"), - ) - assert resp.status_code == 404