diff --git a/BACKEND_PLAN.md b/BACKEND_PLAN.md index 7a7959c..da95873 100644 --- a/BACKEND_PLAN.md +++ b/BACKEND_PLAN.md @@ -197,54 +197,50 @@ adiuva-api/ ### Step 6 — Chat Agents ✅ - [x] `app/agents/task_agent.py` — `@registry.register`: - - Description: "Manages tasks: create, update, list, suggest" - - Tools: `create_task(title, description, priority, due_date)`, `update_task(id, updates)`, `list_tasks(filters)`, `suggest_tasks(notes_context)` - - System prompt: PM-oriented, validates task structure, infers priority from context - - `handle()`: LLM + tool loop via `_tool_loop()`, returns response text + list of actions performed - - Accepts flexible context: mandatory fields `user_profile` + `message`, all other fields (from batch/plugin output) are optional -- [x] `app/agents/calendar_agent.py` — `@registry.register`: - - Description: "Calendar management: events, conflicts, scheduling" - - Tools: `list_events(date_range)`, `detect_conflicts(events)`, `suggest_reschedule(conflict)` - - Works with event metadata passed in context (never raw calendar data stored) -- [x] `app/agents/email_agent.py` — `@registry.register`: - - Description: "Email analysis: classify, extract actions, draft responses" - - Tools: `classify_email(metadata)`, `extract_action_items(metadata)`, `draft_response(thread_context)` - - Only processes metadata sent by client — never raw email bodies -- [x] `app/agents/analytics_agent.py` — `@registry.register`: - - Description: "Workspace analytics: metrics, reports, trends" - - Tools: `calculate_metrics(task_data)`, `generate_report(period, data)`, `trend_analysis(data_points)` - - Crunches numbers from context, returns structured insights -- [x] `app/agents/__init__.py`: imports all agent modules to trigger `@registry.register` decorators -- [x] Unit tests per agent with mocked LLM -- **Outcome:** Four specialized agents, all registered and tested. + - Description: "Manages tasks and comments: list, create, update, delete, due-today, comments" + - Tools (8): `list_tasks(project_id, status, search, order_by)`, `create_task(title, description, status, priority, assignees, due_date, project_id, is_ai_suggested, is_approved)`, `update_task(task_id, ...)`, `delete_task(task_id)`, `list_tasks_due_today()`, `list_task_comments(task_id)`, `add_task_comment(task_id, author, content)`, `delete_task_comment(comment_id)` + - status: `todo|in_progress|done`; priority: `high|medium|low`; assignees: JSON-encoded string; due_date: ms timestamp + - Accepts flexible context; sentinel `-1` for optional integer update fields +- [x] `app/agents/checkpoint_agent.py` — `@registry.register`: + - Description: "Manages project checkpoints (milestones): list, create, update, delete" + - Tools (4): `list_checkpoints(project_id)`, `create_checkpoint(project_id, title, date, is_ai_suggested, is_approved)`, `update_checkpoint(checkpoint_id, ...)`, `delete_checkpoint(checkpoint_id)` + - `project_id` is required for create; date is a ms timestamp; supports AI-suggestion + approval workflow +- [x] `app/agents/project_agent.py` — `@registry.register`: + - Description: "Manages projects: list, get, create, update, archive, delete" + - Tools (6): `list_projects(client_id, include_archived)`, `list_all_projects()`, `get_project(project_id)`, `create_project(name, client_id)`, `update_project(project_id, ...)`, `delete_project(project_id)` + - status: `active|archived`; prefers archive over deletion (docstring guard on delete) +- [x] `app/agents/note_agent.py` — `@registry.register`: + - Description: "Manages notes: list, get, create, update, delete" + - Tools (5): `list_notes(project_id)`, `get_note(note_id)`, `create_note(title, content, project_id)`, `update_note(note_id, ...)`, `delete_note(note_id)` + - content is Markdown; `get_note` should be called before update to preserve existing content +- [x] `app/agents/__init__.py`: imports all four agent modules to trigger `@registry.register` decorators +- [x] Unit tests per agent with mocked LLM (registration, names, tool counts, handle(), direct tool invocation) +- **Outcome:** Four domain-specific agents matching the UI data model (Tasks, Checkpoints, Projects, Notes), all registered and tested. -### Step 7 — Storage Layer -- [ ] `app/storage/blob_store.py`: - - `BlobStore`: - - `async upload(user_id, table, record_id, blob: bytes, checksum: str) -> str` — returns S3 key - - `async download(user_id, s3_key) -> bytes` - - `async delete(user_id, s3_key) -> None` - - `async list_keys(user_id, table) -> list[str]` - - Keys structured as `{user_id}/{table}/{record_id}` — backend never inspects blob content - - Uses boto3 S3 with server-side encryption at rest (SSE-S3) as extra layer -- [ ] `app/storage/vector_store.py`: - - `VectorStore`: - - `async upsert(user_id, vectors: list[VectorItem]) -> None` — vectors are pre-encrypted blobs - - `async search(user_id, query_blob: bytes, top_k: int) -> list[VectorSearchResult]` - - `async delete(user_id, vector_ids: list[str]) -> None` - - Wraps Pinecone (default) or Qdrant — configurable via settings - - Namespace per `user_id` for isolation - - Note: because vectors are E2E encrypted by client, ANN search is on the encrypted representation — semantic search accuracy is a known trade-off when users choose cloud vectors -- [ ] `app/storage/encryption.py`: - - `verify_checksum(blob: bytes, checksum: str) -> bool` — SHA-256 HMAC integrity check only - - `reject_if_tampered(blob, checksum)` — raises `400` if mismatch - - Backend NEVER holds decryption keys — all crypto is client-side +### Step 7 — Storage Layer ✅ +- [x] `app/storage/blob_store.py`: + - `BlobStore`: `async upload`, `async download`, `async delete` (idempotent), `async list_keys` + - Keys: `{user_id}/{table}/{record_id}` — backend never inspects blob content + - boto3 S3 with SSE-S3 at-rest encryption; client checksum stored in S3 object metadata +- [x] `app/storage/vector_store.py`: + - `VectorStore`: `async upsert`, `async search`, `async delete` + - Pinecone (default, `namespace=user_id`) or Qdrant (`user_id` payload filter) — runtime-configurable + - 32-dim SHA-256-derived float vector; blob stored as base64 in metadata/payload + - ANN on encrypted data: known accuracy trade-off, documented +- [x] `app/storage/encryption.py`: + - `verify_checksum(blob, checksum) -> bool` — SHA-256 + `hmac.compare_digest` (constant-time) + - `reject_if_tampered(blob, checksum)` — raises `HTTP 400` on mismatch + - Backend NEVER holds decryption keys +- [x] `app/schemas.py`: added `StorageRecord*`, `VectorItem`, `VectorUpsertRequest`, `VectorSearch*`, `Plugin*` schemas +- [x] `app/config/settings.py`: added `PINECONE_API_KEY`, `PINECONE_INDEX`, `QDRANT_URL`, `QDRANT_API_KEY` +- [x] `requirements.txt`: added `moto[s3]`, `pinecone`, `qdrant-client` +- [x] 37 unit tests covering encryption, BlobStore (moto), VectorStore Pinecone, VectorStore Qdrant - **Outcome:** Cloud storage layer that handles E2E encrypted blobs without ever accessing plaintext. -### Step 8 — API Routes +### Step 8 — API Routes ✅ #### 8a — Chat endpoint -- [ ] `app/api/routes/chat.py`: +- [x] `app/api/routes/chat.py`: - `POST /api/v1/chat`: - Request: `ChatRequest` - Calls `orchestrate(request)` or `orchestrate()` + `build_plan()` @@ -256,12 +252,12 @@ adiuva-api/ - Heartbeat ping every 30s to keep connection alive #### 8b — Plans endpoint -- [ ] `app/api/routes/plans.py`: +- [x] `app/api/routes/plans.py`: - `GET /api/v1/plans/playbook`: Returns all playbooks available for the user's tier - `GET /api/v1/plans/playbook/{plan_id}`: Returns a specific plan #### 8c — Storage endpoint (cloud records) -- [ ] `app/api/routes/storage.py`: +- [x] `app/api/routes/storage.py`: - `POST /api/v1/storage/records`: Create encrypted record - Request: `StorageRecordCreate` - Verifies checksum, stores blob in S3, inserts metadata row in PostgreSQL @@ -277,7 +273,7 @@ adiuva-api/ - All routes enforce tier cloud_storage_gb quota via `TierManager.check_quota(user_id)` #### 8d — Vectors endpoint (cloud vector store) -- [ ] `app/api/routes/vectors.py`: +- [x] `app/api/routes/vectors.py`: - `POST /api/v1/storage/vectors/upsert`: - Request: `VectorUpsertRequest` - Verifies checksums, delegates to `VectorStore.upsert()` @@ -290,7 +286,7 @@ adiuva-api/ - Request: `{ids: list[str]}` #### 8e — Backup endpoint -- [ ] `app/api/routes/backup.py`: +- [x] `app/api/routes/backup.py`: - `PUT /api/v1/backup`: Accepts binary blob + metadata headers (`X-Backup-Version`, `X-Backup-Timestamp`, `X-Backup-Checksum`). Stores in S3 keyed by `{user_id}/{timestamp}`. Enforces tier limits: - Free: 0 (no backup) - Pro: 5 GB @@ -301,7 +297,7 @@ adiuva-api/ - `DELETE /api/v1/backup/{backup_id}`: Delete specific backup. #### 8f — Plugins endpoint -- [ ] `app/api/routes/plugins.py`: +- [x] `app/api/routes/plugins.py`: - `GET /api/v1/plugins`: - Query params: `category: str | None`, `q: str | None`, `page: int`, `sort: Literal['rating', 'installs', 'newest']` - Response: `PluginListResponse` @@ -317,14 +313,14 @@ adiuva-api/ - Unregisters installation #### 8g — Auth endpoint -- [ ] `app/api/routes/auth.py`: +- [x] `app/api/routes/auth.py`: - `POST /api/v1/auth/register`: `{email, password}` → bcrypt hash → insert user → return `AuthTokens` - `POST /api/v1/auth/login`: Validate credentials → return `AuthTokens` - `POST /api/v1/auth/refresh`: Rotate refresh token → return new `AuthTokens` - `GET /api/v1/auth/me`: Return `UserProfile` for current JWT #### 8h — Billing endpoint -- [ ] `app/api/routes/billing.py`: +- [x] `app/api/routes/billing.py`: - `POST /api/v1/billing/checkout`: Creates Stripe checkout session → returns URL - `POST /api/v1/billing/webhook`: Handles Stripe webhooks (subscription lifecycle) - `GET /api/v1/billing/subscription`: Returns current subscription info diff --git a/app/api/routes/backup.py b/app/api/routes/backup.py new file mode 100644 index 0000000..ff73f11 --- /dev/null +++ b/app/api/routes/backup.py @@ -0,0 +1,158 @@ +"""Backup routes: upload, download, history, and delete E2E-encrypted backups. + +Blobs are stored in S3 via BlobStore. Backup metadata is kept in an +in-memory dict until Step 12 migrates it to PostgreSQL (backup_metadata table). + +IMPORTANT: GET /history must be declared BEFORE GET / to avoid FastAPI +treating "history" as a ``{backup_id}`` path parameter. +""" + +from __future__ import annotations + +import time +from email.utils import parsedate_to_datetime +from typing import Any + +from fastapi import APIRouter, Depends, Header, HTTPException, Request, Response, status + +from app.api.deps import get_current_user +from app.schemas import BackupMetadata, UserProfile +from app.storage.blob_store import BlobStore +from app.storage.encryption import reject_if_tampered + +router = APIRouter(prefix="/backup", tags=["backup"]) + +_blob_store = BlobStore() + +# In-memory backup metadata — replaced by PostgreSQL backup_metadata table in Step 12 +_backups: dict[str, list[dict[str, Any]]] = {} # user_id → list of backup records + +# TODO(Step11/12): replace with TierManager.check_quota(user_id) +_TIER_BACKUP_LIMITS_GB: dict[str, int] = { + "free": 0, + "pro": 5, + "power": 25, + "team": -1, # unlimited +} + + +def _check_backup_quota(user_id: str, tier: str, size_bytes: int) -> None: + """Raise HTTP 402 if the upload would exceed the tier's backup limit.""" + limit_gb = _TIER_BACKUP_LIMITS_GB.get(tier, 0) + if limit_gb == 0: + raise HTTPException( + status_code=status.HTTP_402_PAYMENT_REQUIRED, + detail="Backup is not available on the free tier", + ) + if limit_gb == -1: + return # unlimited + limit_bytes = limit_gb * 1024**3 + used = sum(b["size_bytes"] for b in _backups.get(user_id, [])) + if used + size_bytes > limit_bytes: + raise HTTPException( + status_code=status.HTTP_402_PAYMENT_REQUIRED, + detail=f"Backup quota exceeded for tier '{tier}'", + ) + + +@router.put("") +async def upload_backup( + request: Request, + x_backup_version: int = Header(..., alias="X-Backup-Version"), + x_backup_timestamp: int = Header(..., alias="X-Backup-Timestamp"), + x_backup_checksum: str = Header(..., alias="X-Backup-Checksum"), + current_user: UserProfile = Depends(get_current_user), +) -> dict[str, bool]: + """Upload an E2E-encrypted backup blob. + + Metadata is passed via custom headers; the raw body is the encrypted blob. + """ + blob = await request.body() + reject_if_tampered(blob, x_backup_checksum) + _check_backup_quota(current_user.id, current_user.tier, len(blob)) + + s3_key = await _blob_store.upload( + current_user.id, "backup", str(x_backup_timestamp), blob, x_backup_checksum + ) + + backup_record: dict[str, Any] = { + "id": str(x_backup_timestamp), + "s3_key": s3_key, + "version": x_backup_version, + "timestamp": x_backup_timestamp, + "checksum": x_backup_checksum, + "size_bytes": len(blob), + } + + user_backups = _backups.setdefault(current_user.id, []) + user_backups.append(backup_record) + user_backups.sort(key=lambda b: b["timestamp"], reverse=True) + + return {"ok": True} + + +@router.get("/history", response_model=list[BackupMetadata]) +async def backup_history( + current_user: UserProfile = Depends(get_current_user), +) -> list[BackupMetadata]: + """Return backup metadata records for the authenticated user (no blob bytes).""" + return [ + BackupMetadata( + version=b["version"], + timestamp=b["timestamp"], + checksum=b["checksum"], + chunk_count=1, # single-chunk uploads for now — TODO(Step12): track real count + ) + for b in _backups.get(current_user.id, []) + ] + + +@router.get("") +async def download_backup( + request: Request, + current_user: UserProfile = Depends(get_current_user), +) -> Response: + """Download the latest backup blob. Supports ``If-Modified-Since``.""" + user_backups = _backups.get(current_user.id, []) + if not user_backups: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="No backup found") + + latest = user_backups[0] + + ims_header = request.headers.get("If-Modified-Since") + if ims_header: + try: + ims_dt = parsedate_to_datetime(ims_header) + ims_ms = int(ims_dt.timestamp() * 1000) + if latest["timestamp"] <= ims_ms: + return Response(status_code=status.HTTP_304_NOT_MODIFIED) + except Exception: + pass # malformed header — ignore and serve the blob + + blob = await _blob_store.download(current_user.id, latest["s3_key"]) + return Response( + content=blob, + media_type="application/octet-stream", + headers={ + "X-Backup-Version": str(latest["version"]), + "X-Backup-Timestamp": str(latest["timestamp"]), + "X-Checksum": latest["checksum"], + }, + ) + + +@router.delete("/{backup_id}", response_model=dict) +async def delete_backup( + backup_id: str, + current_user: UserProfile = Depends(get_current_user), +) -> dict[str, bool]: + """Delete a specific backup by ID.""" + user_backups = _backups.get(current_user.id, []) + target = next((b for b in user_backups if b["id"] == backup_id), None) + if target is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Backup not found") + + await _blob_store.delete(current_user.id, target["s3_key"]) + _backups[current_user.id] = [b for b in user_backups if b["id"] != backup_id] + + return {"ok": True} diff --git a/app/api/routes/billing.py b/app/api/routes/billing.py new file mode 100644 index 0000000..ccc2ca2 --- /dev/null +++ b/app/api/routes/billing.py @@ -0,0 +1,184 @@ +"""Billing routes: Stripe checkout, webhook, subscription management. + +Subscription records are kept in-memory until Step 12 migrates them to +PostgreSQL (subscriptions table). Stripe calls are gracefully stubbed when +STRIPE_SECRET_KEY is not configured, allowing local development without keys. +""" + +from __future__ import annotations + +from typing import Any + +import stripe as stripe_lib +from fastapi import APIRouter, Depends, Header, HTTPException, Request, status +from pydantic import BaseModel + +from app.api.deps import get_current_user +from app.config.settings import settings +from app.schemas import BillingTier, UserProfile + +router = APIRouter(prefix="/billing", tags=["billing"]) + +# In-memory subscriptions — replaced by PostgreSQL subscriptions table in Step 12 +_subscriptions: dict[str, dict[str, Any]] = {} # user_id → subscription record + +_TIER_PRICE_IDS: dict[str, str] = { + "pro": "price_pro_monthly", # replace with real Stripe price IDs + "power": "price_power_monthly", + "team": "price_team_monthly", +} + + +# ── Helpers ──────────────────────────────────────────────────────────── + +def _stripe_configured() -> bool: + return bool(settings.STRIPE_SECRET_KEY) + + +def _stripe() -> Any: + stripe_lib.api_key = settings.STRIPE_SECRET_KEY + return stripe_lib + + +# ── Request bodies ───────────────────────────────────────────────────── + +class _CheckoutRequest(BaseModel): + tier: BillingTier + + +# ── Routes ───────────────────────────────────────────────────────────── + +@router.post("/checkout", response_model=dict) +async def create_checkout( + body: _CheckoutRequest, + current_user: UserProfile = Depends(get_current_user), +) -> dict[str, str]: + """Create a Stripe checkout session for a tier upgrade. + + Returns a stub URL when ``STRIPE_SECRET_KEY`` is not configured. + """ + if body.tier == "free": + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Cannot create a checkout session for the free tier", + ) + + if _stripe_configured(): + price_id = _TIER_PRICE_IDS.get(body.tier) + if not price_id: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Unknown tier: {body.tier}", + ) + s = _stripe() + session = s.checkout.Session.create( + payment_method_types=["card"], + mode="subscription", + line_items=[{"price": price_id, "quantity": 1}], + success_url=( + "https://app.adiuva.app/billing/success" + "?session_id={CHECKOUT_SESSION_ID}" + ), + cancel_url="https://app.adiuva.app/billing/cancel", + metadata={"user_id": current_user.id, "tier": body.tier}, + ) + return {"checkout_url": session.url} + + return {"checkout_url": "https://stripe.com/stub-checkout"} + + +@router.post("/webhook", response_model=dict) +async def stripe_webhook( + request: Request, + stripe_signature: str = Header(default="", alias="Stripe-Signature"), +) -> dict[str, bool]: + """Handle Stripe webhook events. + + No JWT auth — authenticated via Stripe signature verification instead. + Returns 200 immediately when Stripe is not configured (local dev). + """ + payload = await request.body() + + if not _stripe_configured(): + return {"ok": True} + + try: + s = _stripe() + event = s.Webhook.construct_event( + payload, stripe_signature, settings.STRIPE_WEBHOOK_SECRET + ) + except stripe_lib.error.SignatureVerificationError: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Invalid Stripe signature", + ) + + event_type: str = event["type"] + data: dict[str, Any] = event["data"]["object"] + + if event_type == "checkout.session.completed": + user_id = data.get("metadata", {}).get("user_id") + tier = data.get("metadata", {}).get("tier", "free") + sub_id = data.get("subscription") + if user_id: + _subscriptions[user_id] = { + "tier": tier, + "stripe_subscription_id": sub_id, + "status": "active", + "current_period_end": None, + } + + elif event_type == "customer.subscription.updated": + # TODO(Step12): look up user_id from stripe_customer_id in DB, then update tier + pass + + elif event_type == "customer.subscription.deleted": + # TODO(Step12): look up user_id from stripe_customer_id in DB, set tier to free + pass + + elif event_type == "invoice.payment_failed": + # TODO(Step12): flag subscription as past_due, notify user + pass + + return {"ok": True} + + +@router.get("/subscription", response_model=dict) +async def get_subscription( + current_user: UserProfile = Depends(get_current_user), +) -> dict[str, Any]: + """Return the current subscription info for the authenticated user.""" + sub = _subscriptions.get(current_user.id) + if sub is None: + return { + "tier": current_user.tier, + "status": "free", + "stripe_subscription_id": None, + "current_period_end": None, + } + return sub + + +@router.delete("/subscription", response_model=dict) +async def cancel_subscription( + current_user: UserProfile = Depends(get_current_user), +) -> dict[str, bool]: + """Cancel the active subscription.""" + sub = _subscriptions.get(current_user.id) + if sub is None or not sub.get("stripe_subscription_id"): + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="No active subscription found", + ) + + if _stripe_configured(): + s = _stripe() + s.Subscription.cancel(sub["stripe_subscription_id"]) + + _subscriptions[current_user.id] = { + **sub, + "tier": "free", + "status": "canceled", + } + + return {"ok": True} diff --git a/app/api/routes/chat.py b/app/api/routes/chat.py new file mode 100644 index 0000000..ba0a6ff --- /dev/null +++ b/app/api/routes/chat.py @@ -0,0 +1,78 @@ +"""Chat routes: POST /chat and WebSocket /chat/stream.""" + +from __future__ import annotations + +import asyncio +import json + +from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect +from fastapi.responses import JSONResponse +from jose import JWTError, jwt + +from app.api.deps import get_current_user +from app.config.settings import settings +from app.core.orchestrator import orchestrate, orchestrate_stream +from app.schemas import ChatRequest, UserProfile + +router = APIRouter(prefix="/chat", tags=["chat"]) + +_HEARTBEAT_INTERVAL = 30 # seconds + + +@router.post("") +async def chat( + body: ChatRequest, + current_user: UserProfile = Depends(get_current_user), +) -> JSONResponse: + """Route a chat message through the orchestrator. + + Returns ``ChatResponse`` for ``execution_mode='direct'``, + or ``ExecutionPlan`` for ``execution_mode='plan'``. + """ + result = await orchestrate(body) + return JSONResponse(content=result.model_dump()) + + +@router.websocket("/stream") +async def chat_stream(websocket: WebSocket) -> None: + """Streaming chat via WebSocket. + + Auth: ``?token=`` query param (Bearer not possible during WS handshake). + + Protocol: + 1. Client sends ``ChatRequest`` as the first JSON text frame. + 2. Server streams response text chunks. + 3. Final frame: JSON ``{"done": true, "response": "...", "actions": [...]}``. + 4. Server pings every 30 s to keep the connection alive. + """ + # Authenticate before accepting the connection + token = websocket.query_params.get("token", "") + try: + payload = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM]) + user_id: str | None = payload.get("sub") + if not user_id: + raise JWTError("missing sub") + except JWTError: + await websocket.close(code=1008) # 1008 = Policy Violation + return + + await websocket.accept() + + try: + raw = await websocket.receive_text() + body = ChatRequest.model_validate_json(raw) + + async def _heartbeat() -> None: + while True: + await asyncio.sleep(_HEARTBEAT_INTERVAL) + await websocket.send_text(json.dumps({"ping": True})) + + heartbeat_task = asyncio.create_task(_heartbeat()) + try: + async for chunk in orchestrate_stream(body): + await websocket.send_text(chunk) + finally: + heartbeat_task.cancel() + + except WebSocketDisconnect: + pass diff --git a/app/api/routes/plans.py b/app/api/routes/plans.py new file mode 100644 index 0000000..ed27272 --- /dev/null +++ b/app/api/routes/plans.py @@ -0,0 +1,37 @@ +"""Plans routes: GET /plans/playbook and GET /plans/playbook/{plan_id}.""" + +from __future__ import annotations + +from fastapi import APIRouter, Depends, HTTPException, status + +from app.api.deps import get_current_user +from app.core.execution_plan import plan_cache +from app.schemas import ExecutionPlan, UserProfile + +router = APIRouter(prefix="/plans", tags=["plans"]) + + +@router.get("/playbook", response_model=list[ExecutionPlan]) +async def list_playbooks( + current_user: UserProfile = Depends(get_current_user), +) -> list[ExecutionPlan]: + """Return all cached execution plan playbooks for the authenticated user. + + TODO(Step11): filter by tier — power+ plans gated behind batch_builder feature. + """ + return plan_cache.get_all_playbooks() + + +@router.get("/playbook/{plan_id}", response_model=ExecutionPlan) +async def get_playbook( + plan_id: str, + current_user: UserProfile = Depends(get_current_user), +) -> ExecutionPlan: + """Return a specific execution plan playbook by ID.""" + plan = plan_cache.get_plan(plan_id) + if plan is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Plan not found: {plan_id}", + ) + return plan diff --git a/app/api/routes/plugins.py b/app/api/routes/plugins.py new file mode 100644 index 0000000..2a05313 --- /dev/null +++ b/app/api/routes/plugins.py @@ -0,0 +1,174 @@ +"""Plugins routes: browse and install plugins from the marketplace. + +The catalog and installation records are kept in-memory as stubs. +Step 10 replaces these with PluginRegistry, RevenueShare, and the plugins DB table. +""" + +from __future__ import annotations + +from typing import Any, Literal + +from fastapi import APIRouter, Depends, HTTPException, Query, status +from pydantic import BaseModel + +from app.api.deps import get_current_user +from app.config.settings import settings +from app.schemas import PluginInstallRequest, PluginListResponse, PluginManifest, UserProfile + +router = APIRouter(prefix="/plugins", tags=["plugins"]) + +# ── In-memory catalog (Step 10 replaces with PluginRegistry + DB) ───── + +_plugin_catalog: list[PluginManifest] = [ + PluginManifest( + id="plugin-github-sync", + name="GitHub Sync", + description="Sync tasks with GitHub Issues and pull requests.", + version="1.0.0", + author="Adiuva", + permissions=["read:tasks", "write:tasks"], + category="productivity", + price_cents=0, + ), + PluginManifest( + id="plugin-slack-notify", + name="Slack Notifier", + description="Post task and checkpoint updates to Slack channels.", + version="1.2.0", + author="Adiuva", + permissions=["read:tasks", "read:checkpoints"], + category="communication", + price_cents=499, + ), + PluginManifest( + id="plugin-time-tracker", + name="Time Tracker", + description="Track time spent on tasks with automatic reporting.", + version="0.9.1", + author="Third Party", + permissions=["read:tasks", "write:tasks"], + category="productivity", + price_cents=999, + ), +] + +# plugin_id → set of user_ids who have installed it +_installations: dict[str, set[str]] = {} + + +# ── Tier gate ───────────────────────────────────────────────────────── + +def _require_plugin_tier(user: UserProfile) -> None: + """Raise HTTP 403 for users below Power tier.""" + if user.tier not in ("power", "team"): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Plugin marketplace requires Power tier or above", + ) + + +# ── Filter + sort helpers ────────────────────────────────────────────── + +def _apply_filters( + plugins: list[PluginManifest], + category: str | None, + q: str | None, +) -> list[PluginManifest]: + result = plugins + if category: + result = [p for p in result if p.category == category] + if q: + q_lower = q.lower() + result = [ + p for p in result + if q_lower in p.name.lower() or q_lower in p.description.lower() + ] + return result + + +def _apply_sort( + plugins: list[PluginManifest], + sort: str, +) -> list[PluginManifest]: + if sort == "installs": + return sorted(plugins, key=lambda p: len(_installations.get(p.id, set())), reverse=True) + if sort == "rating": + # Placeholder until Step 10 introduces avg_rating from DB + return sorted(plugins, key=lambda p: -p.price_cents) + return plugins # "newest" = catalog insertion order + + +# ── Local detail schema ──────────────────────────────────────────────── + +class _PluginDetail(BaseModel): + plugin: PluginManifest + install_count: int + ratings: list[Any] # Step 10 populates from plugin_reviews table + + +# ── Routes ──────────────────────────────────────────────────────────── + +@router.get("", response_model=PluginListResponse) +async def list_plugins( + category: str | None = Query(default=None), + q: str | None = Query(default=None), + page: int = Query(default=1, ge=1), + sort: Literal["rating", "installs", "newest"] = Query(default="newest"), + current_user: UserProfile = Depends(get_current_user), +) -> PluginListResponse: + """Browse the plugin marketplace. Requires Power tier or above.""" + _require_plugin_tier(current_user) + filtered = _apply_filters(_plugin_catalog, category, q) + sorted_plugins = _apply_sort(filtered, sort) + return PluginListResponse(plugins=sorted_plugins, total=len(sorted_plugins), page=page) + + +@router.get("/{plugin_id}", response_model=_PluginDetail) +async def get_plugin( + plugin_id: str, + current_user: UserProfile = Depends(get_current_user), +) -> _PluginDetail: + """Get full plugin details including install count. Requires Power tier or above.""" + _require_plugin_tier(current_user) + plugin = next((p for p in _plugin_catalog if p.id == plugin_id), None) + if plugin is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Plugin not found") + return _PluginDetail( + plugin=plugin, + install_count=len(_installations.get(plugin_id, set())), + ratings=[], # Step 10 populates from plugin_reviews table + ) + + +@router.post("/{plugin_id}/install", response_model=dict) +async def install_plugin( + plugin_id: str, + body: PluginInstallRequest, # noqa: ARG001 — reserved for future fields + current_user: UserProfile = Depends(get_current_user), +) -> dict[str, Any]: + """Install a plugin. Triggers Stripe Connect for paid plugins when configured. + + Requires Power tier or above. + """ + _require_plugin_tier(current_user) + plugin = next((p for p in _plugin_catalog if p.id == plugin_id), None) + if plugin is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Plugin not found") + + if plugin.price_cents > 0 and settings.STRIPE_SECRET_KEY: + # TODO(Step10): stripe.PaymentIntent.create with destination charge (70/30 split) + pass + + _installations.setdefault(plugin_id, set()).add(current_user.id) + download_url = f"https://cdn.adiuva.app/plugins/{plugin_id}/package.zip" + return {"ok": True, "download_url": download_url} + + +@router.delete("/{plugin_id}/install", response_model=dict) +async def uninstall_plugin( + plugin_id: str, + current_user: UserProfile = Depends(get_current_user), +) -> dict[str, bool]: + """Unregister a plugin installation.""" + _installations.get(plugin_id, set()).discard(current_user.id) + return {"ok": True} diff --git a/app/api/routes/storage.py b/app/api/routes/storage.py new file mode 100644 index 0000000..8db7067 --- /dev/null +++ b/app/api/routes/storage.py @@ -0,0 +1,185 @@ +"""Storage routes: CRUD for E2E-encrypted cloud records. + +Blobs are stored in S3 via BlobStore. Record metadata is kept in an +in-memory dict until Step 12 migrates it to PostgreSQL (storage_records table). +""" + +from __future__ import annotations + +import time +import uuid +from typing import Any + +from fastapi import APIRouter, Depends, HTTPException, Query, Response, status +from pydantic import BaseModel + +from app.api.deps import get_current_user +from app.schemas import StorageRecordCreate, StorageRecordUpdate, UserProfile +from app.storage.blob_store import BlobStore +from app.storage.encryption import reject_if_tampered + +router = APIRouter(prefix="/storage", tags=["storage"]) + +_blob_store = BlobStore() + +# In-memory record metadata — replaced by PostgreSQL storage_records table in Step 12 +_records: dict[str, dict[str, Any]] = {} + +# TODO(Step11/12): replace with TierManager.check_quota(user_id) +_TIER_STORAGE_LIMITS_GB: dict[str, int] = { + "free": 0, + "pro": 5, + "power": 25, + "team": -1, # unlimited +} + + +# ── Local response schemas ───────────────────────────────────────────── + +class _CreateResponse(BaseModel): + id: str + created_at: int + + +class _RecordMeta(BaseModel): + id: str + table: str + checksum: str + created_at: int + updated_at: int + + +# ── Helpers ──────────────────────────────────────────────────────────── + +def _check_quota(user_id: str, tier: str, additional_bytes: int) -> None: + """Raise HTTP 402 if adding ``additional_bytes`` would exceed the tier limit.""" + limit_gb = _TIER_STORAGE_LIMITS_GB.get(tier, 0) + if limit_gb == -1: + return # unlimited + limit_bytes = limit_gb * 1024**3 + used = sum(r["size_bytes"] for r in _records.values() if r["user_id"] == user_id) + if used + additional_bytes > limit_bytes: + raise HTTPException( + status_code=status.HTTP_402_PAYMENT_REQUIRED, + detail=f"Storage quota exceeded for tier '{tier}'", + ) + + +def _get_record_for_user(record_id: str, user_id: str) -> dict[str, Any]: + """Look up a record and verify ownership. Always returns 404 on mismatch + to prevent user enumeration attacks.""" + record = _records.get(record_id) + if record is None or record["user_id"] != user_id: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Record not found") + return record + + +# ── Routes ───────────────────────────────────────────────────────────── + +@router.post("/records", response_model=_CreateResponse, status_code=status.HTTP_201_CREATED) +async def create_record( + body: StorageRecordCreate, + current_user: UserProfile = Depends(get_current_user), +) -> _CreateResponse: + """Upload a new E2E-encrypted blob. Verifies checksum before storing.""" + reject_if_tampered(body.blob, body.checksum) + _check_quota(current_user.id, current_user.tier, len(body.blob)) + + record_id = str(uuid.uuid4()) + now = int(time.time() * 1000) + + s3_key = await _blob_store.upload( + current_user.id, body.table, record_id, body.blob, body.checksum + ) + + _records[record_id] = { + "id": record_id, + "user_id": current_user.id, + "table": body.table, + "s3_key": s3_key, + "checksum": body.checksum, + "size_bytes": len(body.blob), + "created_at": now, + "updated_at": now, + } + + return _CreateResponse(id=record_id, created_at=now) + + +@router.get("/records", response_model=list[_RecordMeta]) +async def list_records( + table: str | None = Query(default=None), + page: int = Query(default=1, ge=1), + limit: int = Query(default=50, ge=1, le=200), + current_user: UserProfile = Depends(get_current_user), +) -> list[_RecordMeta]: + """List record metadata for the authenticated user. Blob bytes are never returned.""" + all_records = [ + r for r in _records.values() + if r["user_id"] == current_user.id and (table is None or r["table"] == table) + ] + start = (page - 1) * limit + page_records = all_records[start : start + limit] + return [ + _RecordMeta( + id=r["id"], + table=r["table"], + checksum=r["checksum"], + created_at=r["created_at"], + updated_at=r["updated_at"], + ) + for r in page_records + ] + + +@router.get("/records/{record_id}") +async def download_record( + record_id: str, + current_user: UserProfile = Depends(get_current_user), +) -> Response: + """Download an E2E-encrypted blob. Returns raw bytes with ``X-Checksum`` header.""" + record = _get_record_for_user(record_id, current_user.id) + blob = await _blob_store.download(current_user.id, record["s3_key"]) + return Response( + content=blob, + media_type="application/octet-stream", + headers={"X-Checksum": record["checksum"]}, + ) + + +@router.put("/records/{record_id}", response_model=dict) +async def update_record( + record_id: str, + body: StorageRecordUpdate, + current_user: UserProfile = Depends(get_current_user), +) -> dict[str, bool]: + """Replace the blob for an existing record. Verifies checksum before storing.""" + record = _get_record_for_user(record_id, current_user.id) + reject_if_tampered(body.blob, body.checksum) + + delta = len(body.blob) - record["size_bytes"] + if delta > 0: + _check_quota(current_user.id, current_user.tier, delta) + + s3_key = await _blob_store.upload( + current_user.id, record["table"], record_id, body.blob, body.checksum + ) + + record["s3_key"] = s3_key + record["checksum"] = body.checksum + record["size_bytes"] = len(body.blob) + record["updated_at"] = int(time.time() * 1000) + + return {"ok": True} + + +@router.delete("/records/{record_id}", response_model=dict) +async def delete_record( + record_id: str, + current_user: UserProfile = Depends(get_current_user), +) -> dict[str, bool]: + """Delete a record and its S3 blob.""" + record = _get_record_for_user(record_id, current_user.id) + await _blob_store.delete(current_user.id, record["s3_key"]) + del _records[record_id] + return {"ok": True} diff --git a/app/api/routes/vectors.py b/app/api/routes/vectors.py new file mode 100644 index 0000000..588d5c0 --- /dev/null +++ b/app/api/routes/vectors.py @@ -0,0 +1,56 @@ +"""Vectors routes: upsert, search, and delete cloud vector store entries.""" + +from __future__ import annotations + +from fastapi import APIRouter, Depends +from pydantic import BaseModel + +from app.api.deps import get_current_user +from app.schemas import ( + UserProfile, + VectorSearchRequest, + VectorSearchResponse, + VectorUpsertRequest, +) +from app.storage.encryption import reject_if_tampered +from app.storage.vector_store import VectorStore + +router = APIRouter(prefix="/storage", tags=["vectors"]) + +_vector_store = VectorStore() + + +class _VectorDeleteRequest(BaseModel): + ids: list[str] + + +@router.post("/vectors/upsert", response_model=dict) +async def upsert_vectors( + body: VectorUpsertRequest, + current_user: UserProfile = Depends(get_current_user), +) -> dict[str, int]: + """Verify checksums and store encrypted vectors in the user-scoped namespace.""" + for item in body.vectors: + reject_if_tampered(item.blob, item.checksum) + await _vector_store.upsert(current_user.id, body.vectors) + return {"upserted": len(body.vectors)} + + +@router.post("/vectors/search", response_model=VectorSearchResponse) +async def search_vectors( + body: VectorSearchRequest, + current_user: UserProfile = Depends(get_current_user), +) -> VectorSearchResponse: + """Search the user-scoped vector namespace with an encrypted query blob.""" + results = await _vector_store.search(current_user.id, body.query_blob, body.top_k) + return VectorSearchResponse(results=results) + + +@router.delete("/vectors", response_model=dict) +async def delete_vectors( + body: _VectorDeleteRequest, + current_user: UserProfile = Depends(get_current_user), +) -> dict[str, bool]: + """Delete vectors by ID, scoped to the authenticated user.""" + await _vector_store.delete(current_user.id, body.ids) + return {"ok": True} diff --git a/app/main.py b/app/main.py index 0724d85..30f42b8 100644 --- a/app/main.py +++ b/app/main.py @@ -34,13 +34,16 @@ def create_app() -> FastAPI: allow_headers=["*"], ) - # Routers (registered when implemented) - # from app.api.routes import auth, chat, plans, backup, billing - # app.include_router(auth.router, prefix="/api/v1") - # app.include_router(chat.router, prefix="/api/v1") - # app.include_router(plans.router, prefix="/api/v1") - # app.include_router(backup.router, prefix="/api/v1") - # app.include_router(billing.router, prefix="/api/v1") + from app.api.routes import auth, backup, billing, chat, plans, plugins, storage, vectors + + app.include_router(auth.router, prefix="/api/v1") + app.include_router(chat.router, prefix="/api/v1") + app.include_router(plans.router, prefix="/api/v1") + app.include_router(storage.router, prefix="/api/v1") + app.include_router(vectors.router, prefix="/api/v1") + app.include_router(backup.router, prefix="/api/v1") + app.include_router(plugins.router, prefix="/api/v1") + app.include_router(billing.router, prefix="/api/v1") @app.get("/api/v1/health", tags=["health"]) async def health() -> dict: