196 lines
6.6 KiB
Python
196 lines
6.6 KiB
Python
"""Storage routes: CRUD for E2E-encrypted cloud records.

Blobs are stored in S3 via BlobStore. Record metadata is persisted in the
PostgreSQL ``storage_records`` table.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, Response, status
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import func, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.api.deps import get_current_user
|
|
from app.billing.tier_manager import tier_manager
|
|
from app.db import get_session
|
|
from app.models import StorageRecord
|
|
from app.schemas import StorageRecordCreate, StorageRecordUpdate, UserProfile
|
|
from app.storage.blob_store import BlobStore
|
|
from app.storage.encryption import reject_if_tampered
|
|
|
|
# Every endpoint in this module is mounted under /storage and tagged "storage".
router = APIRouter(tags=["storage"], prefix="/storage")

# Single module-level BlobStore instance used by all routes below.
_blob_store = BlobStore()
|
|
|
|
|
|
# ── Local response schemas ─────────────────────────────────────────────
|
|
|
|
class _CreateResponse(BaseModel):
    # Response body for POST /storage/records.

    # Identifier assigned to the newly created record.
    id: str
    # Creation time as an integer; create_record fills it with epoch milliseconds.
    created_at: int
|
|
|
|
|
|
class _RecordMeta(BaseModel):
    # One entry in the GET /storage/records listing: metadata only, no blob bytes.

    # Record identifier.
    id: str
    # Logical table the record belongs to (StorageRecord.table_name).
    table: str
    # Checksum stored alongside the blob.
    checksum: str
    # Creation / last-update times; list_records fills these with epoch milliseconds.
    created_at: int
    updated_at: int
|
|
|
|
|
|
# ── Helpers ────────────────────────────────────────────────────────────
|
|
|
|
async def _current_usage_bytes(user_id: str, db: AsyncSession) -> int:
    """Sum ``size_bytes`` over every storage record owned by *user_id*.

    The aggregate is wrapped in COALESCE so a user with no rows yields 0
    instead of NULL.
    """
    total_size = func.coalesce(func.sum(StorageRecord.size_bytes), 0)
    stmt = select(total_size).where(StorageRecord.user_id == user_id)
    usage = (await db.execute(stmt)).scalar_one()
    return int(usage)
|
|
|
|
|
|
async def _check_quota(user: UserProfile, additional_bytes: int, db: AsyncSession) -> None:
    """Verify that *user* may store *additional_bytes* more under their tier.

    Looks up the user's current usage and hands the decision to
    ``tier_manager.enforce_quota``, which is expected to raise an HTTP 402
    error when the tier limit would be exceeded.
    """
    used_bytes = await _current_usage_bytes(user.id, db)
    tier_manager.enforce_quota(
        user.tier,
        current_bytes=used_bytes,
        additional_bytes=additional_bytes,
    )
|
|
|
|
|
|
async def _get_record_for_user(
    record_id: str, user_id: str, db: AsyncSession
) -> StorageRecord:
    """Fetch the record *record_id*, but only if it belongs to *user_id*.

    A record that does not exist and a record owned by another user both
    produce the same 404, so a caller cannot probe which IDs are in use.
    """
    stmt = select(StorageRecord).where(
        StorageRecord.id == record_id, StorageRecord.user_id == user_id
    )
    found = (await db.execute(stmt)).scalar_one_or_none()
    if found is not None:
        return found
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Record not found")
|
|
|
|
|
|
# ── Routes ─────────────────────────────────────────────────────────────
|
|
|
|
@router.post("/records", response_model=_CreateResponse, status_code=status.HTTP_201_CREATED)
async def create_record(
    body: StorageRecordCreate,
    current_user: UserProfile = Depends(get_current_user),
    db: AsyncSession = Depends(get_session),
) -> _CreateResponse:
    """Upload a new E2E-encrypted blob. Verifies checksum before storing."""
    # Function-scope import so this block does not depend on the module's
    # import section being edited.
    import logging

    # Validate the payload and the caller's quota before touching S3.
    reject_if_tampered(body.blob, body.checksum)
    blob_size = len(body.blob)
    await _check_quota(current_user, blob_size, db)

    record_id = str(uuid.uuid4())

    s3_key = await _blob_store.upload(
        current_user.id, body.table, record_id, body.blob, body.checksum
    )

    record = StorageRecord(
        id=record_id,
        user_id=current_user.id,
        table_name=body.table,
        s3_key=s3_key,
        checksum=body.checksum,
        size_bytes=blob_size,
    )
    db.add(record)
    try:
        await db.commit()
    except Exception:
        # The blob is already in S3 but its metadata row was not persisted.
        # Without cleanup the blob would be orphaned: unreachable through the
        # API and not counted against the user's quota.
        await db.rollback()
        try:
            await _blob_store.delete(current_user.id, s3_key)
        except Exception:
            # Cleanup is best-effort; the commit failure is the error the
            # caller needs to see, so log this one and carry on re-raising.
            logging.getLogger(__name__).exception(
                "Failed to remove orphaned blob %s after metadata commit failure", s3_key
            )
        raise
    # Reload so server-generated columns (created_at) are populated.
    await db.refresh(record)

    # Timestamps are exposed to clients as epoch milliseconds.
    created_at_ms = int(record.created_at.timestamp() * 1000)
    return _CreateResponse(id=record_id, created_at=created_at_ms)
|
|
|
|
|
|
@router.get("/records", response_model=list[_RecordMeta])
async def list_records(
    table: str | None = Query(default=None),
    page: int = Query(default=1, ge=1),
    limit: int = Query(default=50, ge=1, le=200),
    current_user: UserProfile = Depends(get_current_user),
    db: AsyncSession = Depends(get_session),
) -> list[_RecordMeta]:
    """List record metadata for the authenticated user. Blob bytes are never returned."""
    query = select(StorageRecord).where(StorageRecord.user_id == current_user.id)
    if table is not None:
        query = query.where(StorageRecord.table_name == table)

    # OFFSET/LIMIT without ORDER BY returns rows in an unspecified order, so
    # consecutive pages could overlap or skip records. Order by creation time
    # and break ties on the primary key so pagination is deterministic.
    query = (
        query.order_by(StorageRecord.created_at, StorageRecord.id)
        .offset((page - 1) * limit)
        .limit(limit)
    )

    result = await db.execute(query)
    rows = result.scalars().all()

    # Timestamps are exposed to clients as epoch milliseconds.
    return [
        _RecordMeta(
            id=r.id,
            table=r.table_name,
            checksum=r.checksum,
            created_at=int(r.created_at.timestamp() * 1000),
            updated_at=int(r.updated_at.timestamp() * 1000),
        )
        for r in rows
    ]
|
|
|
|
|
|
@router.get("/records/{record_id}")
async def download_record(
    record_id: str,
    current_user: UserProfile = Depends(get_current_user),
    db: AsyncSession = Depends(get_session),
) -> Response:
    """Download an E2E-encrypted blob. Returns raw bytes with ``X-Checksum`` header."""
    # Ownership is enforced by the lookup; a foreign or unknown ID yields 404.
    owned = await _get_record_for_user(record_id, current_user.id, db)
    payload = await _blob_store.download(current_user.id, owned.s3_key)

    # The stored checksum travels with the bytes so the client can verify them.
    checksum_header = {"X-Checksum": owned.checksum}
    return Response(
        content=payload,
        media_type="application/octet-stream",
        headers=checksum_header,
    )
|
|
|
|
|
|
@router.put("/records/{record_id}", response_model=dict)
async def update_record(
    record_id: str,
    body: StorageRecordUpdate,
    current_user: UserProfile = Depends(get_current_user),
    db: AsyncSession = Depends(get_session),
) -> dict[str, bool]:
    """Replace the blob for an existing record. Verifies checksum before storing."""
    # Ownership first (404), then payload integrity.
    existing = await _get_record_for_user(record_id, current_user.id, db)
    reject_if_tampered(body.blob, body.checksum)

    # Only growth counts against the quota; shrinking or same-size
    # replacements skip the check entirely.
    new_size = len(body.blob)
    growth = new_size - existing.size_bytes
    if growth > 0:
        await _check_quota(current_user, growth, db)

    new_key = await _blob_store.upload(
        current_user.id, existing.table_name, record_id, body.blob, body.checksum
    )

    # Point the metadata row at the freshly uploaded blob.
    existing.s3_key = new_key
    existing.checksum = body.checksum
    existing.size_bytes = new_size
    await db.commit()

    return {"ok": True}
|
|
|
|
|
|
@router.delete("/records/{record_id}", response_model=dict)
async def delete_record(
    record_id: str,
    current_user: UserProfile = Depends(get_current_user),
    db: AsyncSession = Depends(get_session),
) -> dict[str, bool]:
    """Delete a record and its S3 blob."""
    # Ownership is enforced by the lookup; a foreign or unknown ID yields 404.
    target = await _get_record_for_user(record_id, current_user.id, db)

    # The blob is removed first, then the metadata row.
    blob_key = target.s3_key
    await _blob_store.delete(current_user.id, blob_key)

    await db.delete(target)
    await db.commit()
    return {"ok": True}
|