"""Tests for auth routes: register, login, refresh, me. Exercises the full auth lifecycle through the FastAPI TestClient against the in-memory SQLite test database seeded by ``conftest.py``. """ from __future__ import annotations import time import pytest from jose import jwt from app.config.settings import settings from tests.conftest import auth_header, make_jwt, TEST_USER_IDS # ── TestRegister ────────────────────────────────────────────────────── class TestRegister: """POST /api/v1/auth/register""" def test_register_success(self, client) -> None: resp = client.post( "/api/v1/auth/register", json={"email": "new@example.com", "password": "Str0ngP@ss!"}, ) assert resp.status_code == 201 data = resp.json() assert "access_token" in data assert "refresh_token" in data assert "expires_at" in data # expires_at should be a future millisecond timestamp assert data["expires_at"] > int(time.time() * 1000) def test_register_returns_valid_jwt(self, client) -> None: resp = client.post( "/api/v1/auth/register", json={"email": "jwt-check@example.com", "password": "P@ss1234"}, ) assert resp.status_code == 201 token = resp.json()["access_token"] payload = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM]) assert payload["email"] == "jwt-check@example.com" assert payload["tier"] == "free" assert "sub" in payload def test_register_duplicate_email(self, client) -> None: client.post( "/api/v1/auth/register", json={"email": "dupe@example.com", "password": "Pass1234"}, ) resp = client.post( "/api/v1/auth/register", json={"email": "dupe@example.com", "password": "Pass5678"}, ) assert resp.status_code == 409 def test_register_missing_password(self, client) -> None: resp = client.post( "/api/v1/auth/register", json={"email": "no-pass@example.com"}, ) assert resp.status_code == 422 def test_register_missing_email(self, client) -> None: resp = client.post( "/api/v1/auth/register", json={"password": "OnlyPass"}, ) assert resp.status_code == 422 # ── TestLogin ───────────────────────────────────────────────────────── class TestLogin: """POST /api/v1/auth/login""" def _register(self, client, email="login@example.com", password="MyP@ss123"): client.post( "/api/v1/auth/register", json={"email": email, "password": password}, ) def test_login_success(self, client) -> None: self._register(client) resp = client.post( "/api/v1/auth/login", json={"email": "login@example.com", "password": "MyP@ss123"}, ) assert resp.status_code == 200 data = resp.json() assert "access_token" in data assert "refresh_token" in data assert "expires_at" in data def test_login_wrong_password(self, client) -> None: self._register(client) resp = client.post( "/api/v1/auth/login", json={"email": "login@example.com", "password": "WrongPass!"}, ) assert resp.status_code == 401 def test_login_unknown_email(self, client) -> None: resp = client.post( "/api/v1/auth/login", json={"email": "ghost@example.com", "password": "Whatever"}, ) assert resp.status_code == 401 # ── TestRefresh ─────────────────────────────────────────────────────── class TestRefresh: """POST /api/v1/auth/refresh""" def _register_and_get_tokens(self, client, email="refresh@example.com"): resp = client.post( "/api/v1/auth/register", json={"email": email, "password": "RefPass123!"}, ) return resp.json() def test_refresh_returns_new_tokens(self, client) -> None: tokens = self._register_and_get_tokens(client) resp = client.post( "/api/v1/auth/refresh", json={"refresh_token": tokens["refresh_token"]}, ) assert resp.status_code == 200 data = resp.json() assert "access_token" in data assert "refresh_token" in data # New refresh token should differ from old one (rotation) assert data["refresh_token"] != tokens["refresh_token"] def test_refresh_old_token_rejected(self, client) -> None: """After rotation, the original refresh token must be rejected.""" tokens = self._register_and_get_tokens(client, email="rotate@example.com") old_rt = tokens["refresh_token"] # First refresh succeeds and rotates the token client.post("/api/v1/auth/refresh", json={"refresh_token": old_rt}) # Second attempt with the old token must fail resp = client.post("/api/v1/auth/refresh", json={"refresh_token": old_rt}) assert resp.status_code == 401 def test_refresh_bogus_token(self, client) -> None: resp = client.post( "/api/v1/auth/refresh", json={"refresh_token": "not-a-real-token"}, ) assert resp.status_code == 401 # ── TestMe ──────────────────────────────────────────────────────────── class TestMe: """GET /api/v1/auth/me""" def test_me_with_valid_jwt(self, client) -> None: resp = client.get("/api/v1/auth/me", headers=auth_header("power")) assert resp.status_code == 200 data = resp.json() assert data["id"] == TEST_USER_IDS["power"] assert data["email"] == "power@test.com" assert data["tier"] == "power" def test_me_returns_correct_tier(self, client) -> None: """Tier comes from the live subscription row, not the JWT claim.""" resp = client.get("/api/v1/auth/me", headers=auth_header("free")) assert resp.json()["tier"] == "free" def test_me_missing_token(self, client) -> None: resp = client.get("/api/v1/auth/me") assert resp.status_code == 401 def test_me_expired_token(self, client) -> None: """A JWT with ``exp`` in the past must be rejected.""" payload = { "sub": TEST_USER_IDS["power"], "email": "power@test.com", "tier": "power", "exp": int(time.time()) - 3600, # 1 hour ago "iat": int(time.time()) - 7200, } token = jwt.encode(payload, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM) resp = client.get("/api/v1/auth/me", headers={"Authorization": f"Bearer {token}"}) assert resp.status_code == 401 def test_me_invalid_signature(self, client) -> None: payload = { "sub": TEST_USER_IDS["power"], "email": "power@test.com", "tier": "power", "exp": int(time.time()) + 3600, "iat": int(time.time()), } token = jwt.encode(payload, "wrong-secret", algorithm="HS256") resp = client.get("/api/v1/auth/me", headers={"Authorization": f"Bearer {token}"}) assert resp.status_code == 401