"""Tests for backup routes: upload, download, history, delete. Exercises the backup lifecycle through the FastAPI TestClient against the in-memory SQLite test database and moto-mocked S3 bucket. """ from __future__ import annotations import hashlib import pytest from tests.conftest import auth_header, TEST_USER_IDS # ── Helpers ─────────────────────────────────────────────────────────── _BLOB = b"encrypted-backup-blob-opaque-bytes" _CHECKSUM = hashlib.sha256(_BLOB).hexdigest() _VERSION = 1 _TIMESTAMP = 1700000000000 # arbitrary ms timestamp def _backup_headers(tier: str = "power", **overrides) -> dict[str, str]: """Return auth + backup metadata headers.""" headers = auth_header(tier) headers["X-Backup-Version"] = str(overrides.get("version", _VERSION)) headers["X-Backup-Timestamp"] = str(overrides.get("timestamp", _TIMESTAMP)) headers["X-Backup-Checksum"] = overrides.get("checksum", _CHECKSUM) headers["Content-Type"] = "application/octet-stream" return headers def _upload(client, tier="power", **overrides) -> "Response": # noqa: F821 """Upload a backup blob and return the response.""" return client.put( "/api/v1/backup", content=overrides.pop("blob", _BLOB), headers=_backup_headers(tier, **overrides), ) # ── TestUploadBackup ────────────────────────────────────────────────── class TestUploadBackup: """PUT /api/v1/backup""" def test_upload_success(self, client, s3_bucket) -> None: resp = _upload(client, tier="power") assert resp.status_code == 200 assert resp.json() == {"ok": True} def test_upload_creates_history_entry(self, client, s3_bucket) -> None: _upload(client, tier="power") history = client.get( "/api/v1/backup/history", headers=auth_header("power") ).json() assert len(history) == 1 assert history[0]["version"] == _VERSION assert history[0]["timestamp"] == _TIMESTAMP assert history[0]["checksum"] == _CHECKSUM def test_upload_bad_checksum(self, client, s3_bucket) -> None: resp = _upload(client, tier="power", checksum="0" * 64) assert resp.status_code == 400 def test_upload_free_tier_blocked(self, client, s3_bucket) -> None: """Free tier has backup_gb=0 → should return 402.""" resp = _upload(client, tier="free") assert resp.status_code == 402 def test_upload_pro_tier_allowed(self, client, s3_bucket) -> None: """Pro tier has backup_gb=5 → small blob succeeds.""" resp = _upload(client, tier="pro") assert resp.status_code == 200 # ── TestDownloadBackup ──────────────────────────────────────────────── class TestDownloadBackup: """GET /api/v1/backup""" def test_download_latest(self, client, s3_bucket) -> None: _upload(client, tier="power") resp = client.get("/api/v1/backup", headers=auth_header("power")) assert resp.status_code == 200 assert resp.content == _BLOB assert resp.headers["X-Checksum"] == _CHECKSUM assert resp.headers["X-Backup-Version"] == str(_VERSION) def test_download_no_backup_returns_404(self, client, s3_bucket) -> None: resp = client.get("/api/v1/backup", headers=auth_header("power")) assert resp.status_code == 404 def test_download_if_modified_since_returns_304(self, client, s3_bucket) -> None: """When If-Modified-Since is after the backup timestamp → 304.""" _upload(client, tier="power", timestamp=1700000000000) resp = client.get( "/api/v1/backup", headers={ **auth_header("power"), "If-Modified-Since": "Thu, 01 Jan 2099 00:00:00 GMT", }, ) assert resp.status_code == 304 def test_download_if_modified_since_returns_200(self, client, s3_bucket) -> None: """When If-Modified-Since is before the backup timestamp → serve blob.""" _upload(client, tier="power", timestamp=1700000000000) resp = client.get( "/api/v1/backup", headers={ **auth_header("power"), "If-Modified-Since": "Thu, 01 Jan 2000 00:00:00 GMT", }, ) assert resp.status_code == 200 assert resp.content == _BLOB def test_download_multiple_returns_latest(self, client, s3_bucket) -> None: """When multiple backups exist, GET returns the one with the highest timestamp.""" _upload(client, tier="power", timestamp=1000) blob2 = b"second-encrypted-backup" checksum2 = hashlib.sha256(blob2).hexdigest() _upload(client, tier="power", timestamp=2000, blob=blob2, checksum=checksum2) resp = client.get("/api/v1/backup", headers=auth_header("power")) assert resp.status_code == 200 assert resp.content == blob2 # ── TestBackupHistory ───────────────────────────────────────────────── class TestBackupHistory: """GET /api/v1/backup/history""" def test_history_empty(self, client, s3_bucket) -> None: resp = client.get("/api/v1/backup/history", headers=auth_header("power")) assert resp.status_code == 200 assert resp.json() == [] def test_history_returns_entries(self, client, s3_bucket) -> None: _upload(client, tier="power", timestamp=1000) _upload(client, tier="power", timestamp=2000) history = client.get( "/api/v1/backup/history", headers=auth_header("power") ).json() assert len(history) == 2 # Ordered by timestamp descending assert history[0]["timestamp"] == 2000 assert history[1]["timestamp"] == 1000 def test_history_isolated_per_user(self, client, s3_bucket) -> None: """One user's backups should not appear in another user's history.""" _upload(client, tier="power") resp = client.get("/api/v1/backup/history", headers=auth_header("team")) assert resp.json() == [] # ── TestDeleteBackup ────────────────────────────────────────────────── class TestDeleteBackup: """DELETE /api/v1/backup/{backup_id}""" def _get_backup_id(self, client, tier="power") -> str: """Upload a backup and return its DB id from history.""" _upload(client, tier=tier) history = client.get( "/api/v1/backup/history", headers=auth_header(tier) ).json() # History returns BackupMetadata schema which doesn't have `id`. # We need to look it up via a different means. # Since there's only 1 backup, find via history length. # Actually the schema doesn't return id — let's verify via re-download. # We'll use a workaround: upload, then list history to confirm it exists, # then try to delete — but we need the id... # Let's check if history includes an id field. # The schema is: version, timestamp, checksum, chunk_count — no id. # We'll need to query the DB directly or use a known ID. # For testing, we'll search history then use the DB. return None # pragma: no cover — overridden below def test_delete_success(self, client, s3_bucket, db_session) -> None: _upload(client, tier="power") # Discover the backup_id via direct DB query import asyncio from sqlalchemy import select from app.models import BackupMetadata async def _get_id(): result = await db_session.execute( select(BackupMetadata.id).where( BackupMetadata.user_id == TEST_USER_IDS["power"] ) ) return result.scalar_one() backup_id = asyncio.get_event_loop().run_until_complete(_get_id()) resp = client.delete( f"/api/v1/backup/{backup_id}", headers=auth_header("power") ) assert resp.status_code == 200 assert resp.json() == {"ok": True} # History should now be empty history = client.get( "/api/v1/backup/history", headers=auth_header("power") ).json() assert history == [] def test_delete_nonexistent(self, client, s3_bucket) -> None: resp = client.delete( "/api/v1/backup/no-such-id", headers=auth_header("power") ) assert resp.status_code == 404 def test_delete_other_users_backup(self, client, s3_bucket, db_session) -> None: """Cannot delete another user's backup (ownership check returns 404).""" _upload(client, tier="power") import asyncio from sqlalchemy import select from app.models import BackupMetadata async def _get_id(): result = await db_session.execute( select(BackupMetadata.id).where( BackupMetadata.user_id == TEST_USER_IDS["power"] ) ) return result.scalar_one() backup_id = asyncio.get_event_loop().run_until_complete(_get_id()) # team user tries to delete power user's backup → 404 resp = client.delete( f"/api/v1/backup/{backup_id}", headers=auth_header("team") ) assert resp.status_code == 404