"""Auth routes: register, login, refresh, me. Users and refresh tokens are kept in an in-memory dict until Step 12 migrates them to PostgreSQL. """ from __future__ import annotations import time import uuid from typing import Any import bcrypt from fastapi import APIRouter, Depends, HTTPException, status from jose import jwt from pydantic import BaseModel from app.api.deps import get_current_user from app.config.settings import settings from app.schemas import AuthTokens, UserProfile router = APIRouter(prefix="/auth", tags=["auth"]) # ── In-memory stores (replaced by PostgreSQL in Step 12) ───────────── _users: dict[str, dict[str, Any]] = {} # email → user record _refresh_tokens: dict[str, str] = {} # plain token → user_id # ── Internal helpers ───────────────────────────────────────────────── def _hash_password(password: str) -> str: return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() def _verify_password(password: str, hashed: str) -> bool: return bcrypt.checkpw(password.encode(), hashed.encode()) def _make_tokens(user_id: str, email: str, tier: str) -> AuthTokens: now = int(time.time()) access_exp = now + settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES * 60 access_payload = { "sub": user_id, "email": email, "tier": tier, "exp": access_exp, "iat": now, } access_token = jwt.encode( access_payload, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM ) refresh_token = str(uuid.uuid4()) _refresh_tokens[refresh_token] = user_id return AuthTokens( access_token=access_token, refresh_token=refresh_token, expires_at=access_exp * 1000, # milliseconds for client ) # ── Request bodies ──────────────────────────────────────────────────── class _RegisterRequest(BaseModel): email: str password: str class _LoginRequest(BaseModel): email: str password: str class _RefreshRequest(BaseModel): refresh_token: str # ── Routes ──────────────────────────────────────────────────────────── @router.post("/register", response_model=AuthTokens, status_code=status.HTTP_201_CREATED) async def register(body: _RegisterRequest) -> AuthTokens: """Create a new account and return JWT tokens.""" if body.email in _users: raise HTTPException(status.HTTP_409_CONFLICT, "Email already registered") user_id = str(uuid.uuid4()) _users[body.email] = { "id": user_id, "email": body.email, "password_hash": _hash_password(body.password), "tier": "free", } return _make_tokens(user_id, body.email, "free") @router.post("/login", response_model=AuthTokens) async def login(body: _LoginRequest) -> AuthTokens: """Validate credentials and return JWT tokens.""" user = _users.get(body.email) if not user or not _verify_password(body.password, user["password_hash"]): raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid credentials") return _make_tokens(user["id"], user["email"], user["tier"]) @router.post("/refresh", response_model=AuthTokens) async def refresh(body: _RefreshRequest) -> AuthTokens: """Rotate a refresh token and return a new token pair.""" user_id = _refresh_tokens.pop(body.refresh_token, None) if user_id is None: raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or expired refresh token") user = next((u for u in _users.values() if u["id"] == user_id), None) if user is None: raise HTTPException(status.HTTP_401_UNAUTHORIZED, "User not found") return _make_tokens(user["id"], user["email"], user["tier"]) @router.get("/me", response_model=UserProfile) async def me(current_user: UserProfile = Depends(get_current_user)) -> UserProfile: """Return the profile for the authenticated user.""" return current_user