Step 1 of Google login integration: Alembic migration for oauth_accounts + avatar_url on users, OAuthAccount model with User relationship, UserProfile schema extended with avatar_url, get_current_user updated to include avatar_url. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
136 lines
4.8 KiB
Python
136 lines
4.8 KiB
Python
"""OAuth 2.0 + PKCE provider abstractions.
|
|
|
|
Each provider implements a three-step flow designed for a desktop (public) client:
|
|
|
|
1. get_authorization_url(state, code_challenge) → str
|
|
Build the provider's consent-screen URL. State and code_challenge are
|
|
generated server-side; the client opens this URL in the system browser.
|
|
|
|
2. exchange_code(code, code_verifier, redirect_uri) → dict
|
|
Exchange the short-lived authorization code for an access token.
|
|
The code_verifier proves ownership of the PKCE challenge.
|
|
|
|
3. get_userinfo(access_token) → OAuthUserInfo
|
|
Fetch the canonical user identity from the provider.
|
|
|
|
Currently supported providers:
|
|
- GoogleOAuthProvider (scope: openid email profile)
|
|
|
|
Adding a new provider:
|
|
- Implement the three methods above.
|
|
- Register in _PROVIDERS inside routes/auth.py.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import hashlib
|
|
import os
|
|
import urllib.parse
|
|
from dataclasses import dataclass
|
|
|
|
import httpx
|
|
|
|
|
|
# ── Data transfer objects ─────────────────────────────────────────────
|
|
|
|
|
|
@dataclass
|
|
class OAuthUserInfo:
|
|
"""Normalized user identity returned by any provider."""
|
|
|
|
provider_user_id: str
|
|
email: str
|
|
email_verified: bool
|
|
avatar_url: str | None
|
|
name: str | None
|
|
|
|
|
|
# ── PKCE helpers ──────────────────────────────────────────────────────
|
|
|
|
|
|
def generate_pkce_pair() -> tuple[str, str]:
    """Create a fresh PKCE ``(code_verifier, code_challenge)`` pair for S256.

    The verifier is 32 bytes from ``os.urandom`` rendered as unpadded
    base64url text (43 characters). The challenge is the unpadded base64url
    encoding of the SHA-256 digest of that verifier text.

    Returns:
        A ``(code_verifier, code_challenge)`` tuple of strings.
    """

    def _b64url_nopad(raw: bytes) -> str:
        # Trailing "=" padding is removed from the encoded text.
        return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()

    verifier = _b64url_nopad(os.urandom(32))
    challenge = _b64url_nopad(hashlib.sha256(verifier.encode()).digest())
    return verifier, challenge
|
|
|
|
|
|
# ── Google provider ───────────────────────────────────────────────────
|
|
|
|
|
|
class GoogleOAuthProvider:
    """Authorization-code + PKCE (S256) client for Google sign-in.

    Requests the ``openid email profile`` scope and talks to Google's
    endpoints directly through ``httpx`` rather than through
    google-auth-oauthlib, so the flow stays generic and async.
    """

    name = "google"

    _AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
    _TOKEN_URL = "https://oauth2.googleapis.com/token"
    _USERINFO_URL = "https://www.googleapis.com/oauth2/v3/userinfo"

    def __init__(self, client_id: str, client_secret: str, redirect_uri: str) -> None:
        """Remember the OAuth client credentials and the redirect URI."""
        self.client_id, self.client_secret, self.redirect_uri = (
            client_id,
            client_secret,
            redirect_uri,
        )

    def get_authorization_url(self, state: str, code_challenge: str) -> str:
        """Return the consent-screen URL for the given state and PKCE challenge.

        Args:
            state: Opaque value placed in the ``state`` query parameter.
            code_challenge: PKCE challenge; sent with method ``S256``.

        Returns:
            The authorization endpoint URL with a URL-encoded query string.
        """
        query = urllib.parse.urlencode(
            {
                "client_id": self.client_id,
                "redirect_uri": self.redirect_uri,
                "response_type": "code",
                "scope": "openid email profile",
                "state": state,
                "code_challenge": code_challenge,
                "code_challenge_method": "S256",
                "access_type": "offline",
                "prompt": "select_account",
            }
        )
        return f"{self._AUTH_URL}?{query}"

    async def exchange_code(
        self, code: str, code_verifier: str, redirect_uri: str
    ) -> dict:
        """Trade an authorization code for the token endpoint's JSON response.

        Args:
            code: Authorization code received on the redirect.
            code_verifier: PKCE verifier matching the earlier challenge.
            redirect_uri: Redirect URI sent with the token request; the
                caller's value is used, not ``self.redirect_uri``.

        Returns:
            The decoded JSON body of the token response.

        Raises:
            Whatever ``response.raise_for_status()`` raises for an error
            response.
        """
        form = {
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "code": code,
            "code_verifier": code_verifier,
            "grant_type": "authorization_code",
            "redirect_uri": redirect_uri,
        }
        async with httpx.AsyncClient() as http:
            token_response = await http.post(self._TOKEN_URL, data=form)
            token_response.raise_for_status()
            return token_response.json()

    async def get_userinfo(self, access_token: str) -> OAuthUserInfo:
        """Look up the user's identity with a bearer access token.

        Args:
            access_token: Token sent in the ``Authorization: Bearer`` header.

        Returns:
            An ``OAuthUserInfo`` built from the ``sub``, ``email``,
            ``email_verified``, ``picture`` and ``name`` keys of the response.

        Raises:
            KeyError: If the response lacks ``sub`` or ``email``.
            Whatever ``response.raise_for_status()`` raises for an error
            response.
        """
        bearer = {"Authorization": f"Bearer {access_token}"}
        async with httpx.AsyncClient() as http:
            userinfo_response = await http.get(self._USERINFO_URL, headers=bearer)
            userinfo_response.raise_for_status()
            claims = userinfo_response.json()

        # "sub" and "email" are required; the remaining keys are optional.
        return OAuthUserInfo(
            provider_user_id=claims["sub"],
            email=claims["email"],
            email_verified=claims.get("email_verified", False),
            avatar_url=claims.get("picture"),
            name=claims.get("name"),
        )
|