186 lines
6.2 KiB
Python
186 lines
6.2 KiB
Python
"""Storage routes: CRUD for E2E-encrypted cloud records.
|
|
|
|
Blobs are stored in S3 via BlobStore. Record metadata is kept in an
in-memory dict until Step 12 migrates it to PostgreSQL (storage_records table).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import time
|
|
import uuid
|
|
from typing import Any
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, Response, status
|
|
from pydantic import BaseModel
|
|
|
|
from app.api.deps import get_current_user
|
|
from app.schemas import StorageRecordCreate, StorageRecordUpdate, UserProfile
|
|
from app.storage.blob_store import BlobStore
|
|
from app.storage.encryption import reject_if_tampered
|
|
|
|
# Every endpoint declared in this module is mounted under the /storage prefix.
router = APIRouter(prefix="/storage", tags=["storage"])

# One module-level blob store instance, shared by all route handlers below.
_blob_store = BlobStore()

# Record metadata keyed by record id, held in process memory only.
# Step 12 replaces this with the PostgreSQL storage_records table.
_records: dict[str, dict[str, Any]] = {}
|
|
|
|
# Per-tier storage caps, expressed in units of 1024**3 bytes (see _check_quota).
# A value of -1 marks a tier with no cap.
# TODO(Step11/12): replace with TierManager.check_quota(user_id)
_TIER_STORAGE_LIMITS_GB: dict[str, int] = {
    "free": 0,
    "pro": 5,
    "power": 25,
    "team": -1,  # unlimited
}
|
|
|
|
|
|
# ── Local response schemas ─────────────────────────────────────────────
|
|
|
|
class _CreateResponse(BaseModel):
    """Response body returned by ``create_record`` after a successful upload."""

    # Identifier assigned to the newly created record.
    id: str
    # Creation timestamp in milliseconds since the Unix epoch.
    created_at: int
|
|
|
|
|
|
class _RecordMeta(BaseModel):
    """Metadata for one stored record, as returned by ``list_records``.

    Carries no blob bytes, only the identifying and bookkeeping fields.
    """

    # Record identifier.
    id: str
    # Name of the client-side table the record belongs to.
    table: str
    # Checksum supplied by the client alongside the blob.
    checksum: str
    # Creation timestamp in milliseconds since the Unix epoch.
    created_at: int
    # Last-modification timestamp in milliseconds since the Unix epoch.
    updated_at: int
|
|
|
|
|
|
# ── Helpers ────────────────────────────────────────────────────────────
|
|
|
|
def _check_quota(user_id: str, tier: str, additional_bytes: int) -> None:
    """Enforce the per-tier storage cap before new bytes are accepted.

    The cap for ``tier`` is read from ``_TIER_STORAGE_LIMITS_GB``; a tier that
    is missing from that table is treated as having a cap of 0, and a cap of
    -1 disables the check. Current usage is the sum of ``size_bytes`` over
    all in-memory records owned by ``user_id``.

    Args:
        user_id: Owner whose existing records count against the cap.
        tier: Subscription tier name used to look up the cap.
        additional_bytes: Number of bytes the caller is about to add.

    Raises:
        HTTPException: 402 when current usage plus ``additional_bytes``
            would exceed the cap.
    """
    cap_gb = _TIER_STORAGE_LIMITS_GB.get(tier, 0)
    if cap_gb == -1:
        # Unlimited tier: nothing to enforce.
        return

    cap_bytes = cap_gb * 1024**3

    current_usage = 0
    for entry in _records.values():
        if entry["user_id"] == user_id:
            current_usage += entry["size_bytes"]

    if current_usage + additional_bytes > cap_bytes:
        raise HTTPException(
            status_code=status.HTTP_402_PAYMENT_REQUIRED,
            detail=f"Storage quota exceeded for tier '{tier}'",
        )
|
|
|
|
|
|
def _get_record_for_user(record_id: str, user_id: str) -> dict[str, Any]:
    """Return the metadata dict for ``record_id`` if ``user_id`` owns it.

    A missing record and a record owned by someone else produce the same
    404 response, so a caller cannot tell the two cases apart.

    Raises:
        HTTPException: 404 when the record is absent or not owned by
            ``user_id``.
    """
    found = _records.get(record_id)
    if found is not None and found["user_id"] == user_id:
        return found
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Record not found")
|
|
|
|
|
|
# ── Routes ─────────────────────────────────────────────────────────────
|
|
|
|
@router.post("/records", response_model=_CreateResponse, status_code=status.HTTP_201_CREATED)
async def create_record(
    body: StorageRecordCreate,
    current_user: UserProfile = Depends(get_current_user),
) -> _CreateResponse:
    """Store a new E2E-encrypted blob and register its metadata.

    The blob/checksum pair is passed to ``reject_if_tampered`` and the user's
    quota is checked before anything is uploaded. On success the blob is
    handed to the blob store and a metadata entry is added to the in-memory
    record table.

    Returns:
        The new record's id and its creation time in epoch milliseconds.
    """
    owner_id = current_user.id
    blob_size = len(body.blob)

    # Validation happens before any storage side effect.
    reject_if_tampered(body.blob, body.checksum)
    _check_quota(owner_id, current_user.tier, blob_size)

    new_id = str(uuid.uuid4())
    created_ms = int(time.time() * 1000)

    stored_key = await _blob_store.upload(
        owner_id, body.table, new_id, body.blob, body.checksum
    )

    metadata: dict[str, Any] = {
        "id": new_id,
        "user_id": owner_id,
        "table": body.table,
        "s3_key": stored_key,
        "checksum": body.checksum,
        "size_bytes": blob_size,
        "created_at": created_ms,
        "updated_at": created_ms,
    }
    _records[new_id] = metadata

    return _CreateResponse(id=new_id, created_at=created_ms)
|
|
|
|
|
|
@router.get("/records", response_model=list[_RecordMeta])
async def list_records(
    table: str | None = Query(default=None),
    page: int = Query(default=1, ge=1),
    limit: int = Query(default=50, ge=1, le=200),
    current_user: UserProfile = Depends(get_current_user),
) -> list[_RecordMeta]:
    """Return one page of record metadata owned by the authenticated user.

    Only metadata is returned; blob bytes are never included. When ``table``
    is given, only records in that table are listed. Pages are 1-based and
    hold at most ``limit`` entries.
    """
    owner_id = current_user.id

    # Collect the caller's records, optionally narrowed to a single table.
    matching: list[dict[str, Any]] = []
    for rec in _records.values():
        if rec["user_id"] != owner_id:
            continue
        if table is not None and rec["table"] != table:
            continue
        matching.append(rec)

    offset = (page - 1) * limit

    result: list[_RecordMeta] = []
    for rec in matching[offset : offset + limit]:
        result.append(
            _RecordMeta(
                id=rec["id"],
                table=rec["table"],
                checksum=rec["checksum"],
                created_at=rec["created_at"],
                updated_at=rec["updated_at"],
            )
        )
    return result
|
|
|
|
|
|
@router.get("/records/{record_id}")
async def download_record(
    record_id: str,
    current_user: UserProfile = Depends(get_current_user),
) -> Response:
    """Return the stored E2E-encrypted blob for one of the caller's records.

    The body is the raw blob served as ``application/octet-stream``; the
    record's stored checksum is sent in the ``X-Checksum`` header. Responds
    404 when the record does not exist or belongs to another user.
    """
    owner_id = current_user.id
    meta = _get_record_for_user(record_id, owner_id)
    payload = await _blob_store.download(owner_id, meta["s3_key"])
    checksum_header = {"X-Checksum": meta["checksum"]}
    return Response(
        content=payload,
        media_type="application/octet-stream",
        headers=checksum_header,
    )
|
|
|
|
|
|
@router.put("/records/{record_id}", response_model=dict)
async def update_record(
    record_id: str,
    body: StorageRecordUpdate,
    current_user: UserProfile = Depends(get_current_user),
) -> dict[str, bool]:
    """Replace the blob of an existing record owned by the caller.

    Ownership is resolved first (404 otherwise), then the new blob/checksum
    pair is passed to ``reject_if_tampered``. The quota is only re-checked
    when the replacement is larger than the blob it replaces, and only for
    the size difference.

    Returns:
        ``{"ok": True}`` once the blob is uploaded and metadata is updated.
    """
    owner_id = current_user.id
    record = _get_record_for_user(record_id, owner_id)
    reject_if_tampered(body.blob, body.checksum)

    new_size = len(body.blob)
    growth = new_size - record["size_bytes"]
    if growth > 0:
        # Shrinking or same-size replacements never consume extra quota.
        _check_quota(owner_id, current_user.tier, growth)

    new_key = await _blob_store.upload(
        owner_id, record["table"], record_id, body.blob, body.checksum
    )

    record.update(
        {
            "s3_key": new_key,
            "checksum": body.checksum,
            "size_bytes": new_size,
            "updated_at": int(time.time() * 1000),
        }
    )

    return {"ok": True}
|
|
|
|
|
|
@router.delete("/records/{record_id}", response_model=dict)
async def delete_record(
    record_id: str,
    current_user: UserProfile = Depends(get_current_user),
) -> dict[str, bool]:
    """Delete a record and its S3 blob.

    Ownership is resolved first; a missing record or one owned by another
    user yields 404. The blob is removed from the blob store before the
    metadata entry is dropped.

    Returns:
        ``{"ok": True}`` once the blob and its metadata are gone.

    Raises:
        HTTPException: 404 when the record is absent or not owned by the
            caller.
    """
    record = _get_record_for_user(record_id, current_user.id)
    await _blob_store.delete(current_user.id, record["s3_key"])
    # The await above yields to the event loop, so an overlapping DELETE for
    # the same record may already have removed the entry. pop() tolerates
    # that, where ``del`` would raise KeyError and surface as a 500.
    _records.pop(record_id, None)
    return {"ok": True}
|