"""OAuth 2.0 + PKCE provider abstractions. Each provider implements a three-step flow designed for a desktop (public) client: 1. get_authorization_url(state, code_challenge) → str Build the provider's consent-screen URL. State and code_challenge are generated server-side; the client opens this URL in the system browser. 2. exchange_code(code, code_verifier, redirect_uri) → dict Exchange the short-lived authorization code for an access token. The code_verifier proves ownership of the PKCE challenge. 3. get_userinfo(access_token) → OAuthUserInfo Fetch the canonical user identity from the provider. Currently supported providers: - GoogleOAuthProvider (scope: openid email profile) Adding a new provider: - Implement the three methods above. - Register in _PROVIDERS inside routes/auth.py. """ from __future__ import annotations import base64 import hashlib import os import urllib.parse from dataclasses import dataclass import httpx # ── Data transfer objects ───────────────────────────────────────────── @dataclass class OAuthUserInfo: """Normalized user identity returned by any provider.""" provider_user_id: str email: str email_verified: bool avatar_url: str | None name: str | None # ── PKCE helpers ────────────────────────────────────────────────────── def generate_pkce_pair() -> tuple[str, str]: """Generate a (code_verifier, code_challenge) pair for PKCE S256. The code_verifier is a random 32-byte URL-safe base64 string. The code_challenge is SHA-256(code_verifier) base64url-encoded (no padding). """ code_verifier = base64.urlsafe_b64encode(os.urandom(32)).rstrip(b"=").decode() digest = hashlib.sha256(code_verifier.encode()).digest() code_challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode() return code_verifier, code_challenge # ── Google provider ─────────────────────────────────────────────────── class GoogleOAuthProvider: """Google OAuth 2.0 provider (openid email profile scope). Uses Google's standard authorization endpoint with PKCE S256. Does NOT use google-auth-oauthlib to keep the flow generic and async. """ name = "google" _AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth" _TOKEN_URL = "https://oauth2.googleapis.com/token" _USERINFO_URL = "https://www.googleapis.com/oauth2/v3/userinfo" def __init__(self, client_id: str, client_secret: str, redirect_uri: str) -> None: self.client_id = client_id self.client_secret = client_secret self.redirect_uri = redirect_uri def get_authorization_url(self, state: str, code_challenge: str) -> str: """Build the Google consent-screen URL.""" params = { "client_id": self.client_id, "redirect_uri": self.redirect_uri, "response_type": "code", "scope": "openid email profile", "state": state, "code_challenge": code_challenge, "code_challenge_method": "S256", "access_type": "offline", "prompt": "select_account", } return f"{self._AUTH_URL}?{urllib.parse.urlencode(params)}" async def exchange_code( self, code: str, code_verifier: str, redirect_uri: str ) -> dict: """Exchange authorization code for an access token.""" async with httpx.AsyncClient() as client: response = await client.post( self._TOKEN_URL, data={ "client_id": self.client_id, "client_secret": self.client_secret, "code": code, "code_verifier": code_verifier, "grant_type": "authorization_code", "redirect_uri": redirect_uri, }, ) response.raise_for_status() return response.json() async def get_userinfo(self, access_token: str) -> OAuthUserInfo: """Fetch the authenticated user's identity from Google.""" async with httpx.AsyncClient() as client: response = await client.get( self._USERINFO_URL, headers={"Authorization": f"Bearer {access_token}"}, ) response.raise_for_status() data = response.json() return OAuthUserInfo( provider_user_id=data["sub"], email=data["email"], email_verified=data.get("email_verified", False), avatar_url=data.get("picture"), name=data.get("name"), )