Files
adiuva-api/app/api/routes/auth.py
roberto c8ef7b119b Refactor tests for execution plan and add comprehensive storage tests
- Updated `TestModuleSingletons` in `test_execution_plan.py` to reflect new agent templates and playbook names.
- Changed assertions in playbook tests to match updated templates and agents.
- Introduced `test_storage.py` to cover the storage layer, including encryption, BlobStore, and VectorStore functionalities.
- Added tests for S3 interactions, ensuring upload, download, delete, and list operations work as expected.
- Implemented mock tests for Pinecone and Qdrant vector stores to validate upsert, search, and delete operations.
2026-03-02 15:36:09 +01:00

119 lines
4.1 KiB
Python

"""Auth routes: register, login, refresh, me.
Users and refresh tokens are kept in an in-memory dict until Step 12
migrates them to PostgreSQL.
"""
from __future__ import annotations
import time
import uuid
from typing import Any
import bcrypt
from fastapi import APIRouter, Depends, HTTPException, status
from jose import jwt
from pydantic import BaseModel
from app.api.deps import get_current_user
from app.config.settings import settings
from app.schemas import AuthTokens, UserProfile
router = APIRouter(
    prefix="/auth",
    tags=["auth"],
)

# ── In-memory stores (replaced by PostgreSQL in Step 12) ─────────────
# Registered accounts, keyed by e-mail address. Each value is the record
# written by `register` (keys: "id", "email", "password_hash", "tier").
_users: dict[str, dict[str, Any]] = {}

# Outstanding refresh tokens: plain token string → id of the owning user.
_refresh_tokens: dict[str, str] = {}
# ── Internal helpers ─────────────────────────────────────────────────
def _hash_password(password: str) -> str:
    """Return a salted bcrypt hash of *password*, decoded to text.

    bcrypt only consumes the first 72 bytes of its input. Older releases of
    the ``bcrypt`` package silently discarded the remainder, whereas recent
    releases raise ``ValueError`` for longer inputs. The encoded password is
    therefore truncated explicitly so that long passwords keep working and
    produce the same hash on every release.

    Args:
        password: The plain-text password supplied by the user.

    Returns:
        The bcrypt hash (salt included) as a string.
    """
    max_bcrypt_bytes = 72
    # Truncate the *bytes*, not the characters: the limit applies after encoding.
    secret = password.encode()[:max_bcrypt_bytes]
    return bcrypt.hashpw(secret, bcrypt.gensalt()).decode()
def _verify_password(password: str, hashed: str) -> bool:
    """Check a plain-text password against a stored bcrypt hash.

    bcrypt only consumes the first 72 bytes of its input. Older releases of
    the ``bcrypt`` package silently discarded the remainder, whereas recent
    releases raise ``ValueError`` for longer inputs. The candidate is
    truncated explicitly so that an over-long password is compared on the
    same 72 bytes bcrypt has always used instead of raising.

    Args:
        password: The plain-text password to test.
        hashed: The bcrypt hash to compare against, as text.

    Returns:
        ``True`` if the password matches the hash, ``False`` otherwise.
    """
    max_bcrypt_bytes = 72
    # Truncate the *bytes*, not the characters: the limit applies after encoding.
    candidate = password.encode()[:max_bcrypt_bytes]
    return bcrypt.checkpw(candidate, hashed.encode())
def _make_tokens(user_id: str, email: str, tier: str) -> AuthTokens:
    """Issue an access/refresh token pair for one user.

    The access token is a JWT signed with ``settings.JWT_SECRET`` that carries
    the user's id, e-mail and tier together with its issue and expiry times.
    The refresh token is a random UUID string, recorded in ``_refresh_tokens``
    against the user's id.

    Args:
        user_id: Stored as the JWT ``sub`` claim and as the refresh-token owner.
        email: Stored as the JWT ``email`` claim.
        tier: Stored as the JWT ``tier`` claim.

    Returns:
        An ``AuthTokens`` whose ``expires_at`` is the access-token expiry in
        milliseconds since the epoch.
    """
    issued_at = int(time.time())
    lifetime_seconds = settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES * 60
    expires_at = issued_at + lifetime_seconds

    claims = dict(
        sub=user_id,
        email=email,
        tier=tier,
        exp=expires_at,
        iat=issued_at,
    )
    signed_jwt = jwt.encode(
        claims,
        settings.JWT_SECRET,
        algorithm=settings.JWT_ALGORITHM,
    )

    # Record the owner of the new refresh token in the in-memory store.
    new_refresh = str(uuid.uuid4())
    _refresh_tokens[new_refresh] = user_id

    # The JWT carries seconds; the response field is expressed in milliseconds.
    expires_at_ms = expires_at * 1000
    return AuthTokens(
        access_token=signed_jwt,
        refresh_token=new_refresh,
        expires_at=expires_at_ms,
    )
# ── Request bodies ────────────────────────────────────────────────────
class _RegisterRequest(BaseModel):
    # Request body accepted by the `register` route.
    email: str  # becomes the key of the new entry in `_users`
    password: str  # plain text; only its hash is stored
class _LoginRequest(BaseModel):
    # Request body accepted by the `login` route.
    email: str  # looked up in `_users`
    password: str  # plain text; checked against the stored hash
class _RefreshRequest(BaseModel):
    # Request body accepted by the `refresh` route.
    refresh_token: str  # the plain token previously returned to the client
# ── Routes ────────────────────────────────────────────────────────────
@router.post(
    "/register",
    response_model=AuthTokens,
    status_code=status.HTTP_201_CREATED,
)
async def register(body: _RegisterRequest) -> AuthTokens:
    """Register a new user and sign them in straight away.

    The account is stored in ``_users`` under its e-mail address with a fresh
    UUID, the hashed password and the ``"free"`` tier.

    Raises:
        HTTPException: 409 if the e-mail address is already a key of ``_users``.
    """
    email = body.email
    if email in _users:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Email already registered",
        )

    initial_tier = "free"
    new_id = str(uuid.uuid4())
    record = {
        "id": new_id,
        "email": email,
        "password_hash": _hash_password(body.password),
        "tier": initial_tier,
    }
    _users[email] = record

    return _make_tokens(new_id, email, initial_tier)
@router.post("/login", response_model=AuthTokens)
async def login(body: _LoginRequest) -> AuthTokens:
    """Authenticate by e-mail and password and issue a new token pair.

    Raises:
        HTTPException: 401 if the e-mail is unknown or the password does not
            match; both cases share the same message.
    """
    account = _users.get(body.email)

    # The password is only checked when an account was actually found.
    credentials_ok = bool(account) and _verify_password(
        body.password, account["password_hash"]
    )
    if not credentials_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
        )

    return _make_tokens(account["id"], account["email"], account["tier"])
@router.post("/refresh", response_model=AuthTokens)
async def refresh(body: _RefreshRequest) -> AuthTokens:
    """Exchange a refresh token for a brand-new token pair.

    The presented token is removed from ``_refresh_tokens`` as soon as it is
    looked up, so it cannot be redeemed a second time.

    Raises:
        HTTPException: 401 if the token is not in ``_refresh_tokens``, or if
            no record in ``_users`` has the id the token was stored against.
    """
    # pop() both validates and consumes the token in one step.
    owner_id = _refresh_tokens.pop(body.refresh_token, None)
    if owner_id is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired refresh token",
        )

    # `_users` is keyed by e-mail, so the owner has to be found by scanning ids.
    for account in _users.values():
        if account["id"] == owner_id:
            return _make_tokens(account["id"], account["email"], account["tier"])

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="User not found",
    )
@router.get("/me", response_model=UserProfile)
async def me(
    current_user: UserProfile = Depends(get_current_user),
) -> UserProfile:
    """Return the caller's own profile.

    The profile is whatever the ``get_current_user`` dependency supplies;
    this handler passes it through unchanged.
    """
    return current_user