From f64ca1188898748431d582e57bcdfa6be91a950c Mon Sep 17 00:00:00 2001 From: Roberto Date: Wed, 10 Jun 2026 18:23:52 +0200 Subject: [PATCH] =?UTF-8?q?feat(scouts):=20pending-session=20Gmail=20OAuth?= =?UTF-8?q?=20=E2=80=94=20create=20cloud=20scout=20at=20finalize?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refactor _pending_scout_oauth_states from a tuple to a dict carrying mode (reconnect|create), draft fields, and a transient encrypted token. Add authorize-draft, session-labels, and cloud/finalize endpoints so the scout row is created only when the flow completes — abandoned flows leave no orphan rows. Zero-trust: the encrypted token lives only in the in-memory session (<=15 min) until finalize persists it. Co-Authored-By: Claude Opus 4.8 --- app/api/routes/scouts.py | 265 ++++++++++++++++++++++++++++----- app/scouts/connectors/gmail.py | 15 +- tests/test_scout_cloud_crud.py | 82 +++++++++- 3 files changed, 322 insertions(+), 40 deletions(-) diff --git a/app/api/routes/scouts.py b/app/api/routes/scouts.py index 68a64b4..9c07932 100644 --- a/app/api/routes/scouts.py +++ b/app/api/routes/scouts.py @@ -39,7 +39,7 @@ from app.core.scout_runner import is_agent_running, run_local_agent from app.core.device_manager import device_manager from app.core.note_summarizer import generate_note_summary from app.db import get_session -from app.integrations import encrypt_token +from app.integrations import decrypt_token, encrypt_token from app.models import CloudScoutConfig, ScoutRunLog, LocalScoutConfig from app.scouts.connectors.registry import get_connector from app.schemas import ( @@ -435,11 +435,35 @@ _GMAIL_SCOUT_SCOPES = [ _GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth" _GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token" -# In-memory pending OAuth states for scout Gmail consent: -# state → (code_verifier, scout_id, user_id, expires_at_epoch_s) -# Production note: replace with Redis for multi-process deployments. -_pending_scout_oauth_states: dict[str, tuple[str, str, str, float]] = {} -_SCOUT_OAUTH_TTL_SECONDS = 600 # 10 minutes +# In-memory pending OAuth states for scout Gmail consent. +# +# state → { +# "code_verifier": str, +# "user_id": str, +# "expires_at": float (epoch seconds), +# "mode": "reconnect" | "create", +# "scout_id": str | None, # set for reconnect mode +# "draft": {name, prompt_template, auto_trash_spam} | None, # set for create mode +# "token_encrypted": str | None, # populated after a successful create-mode callback +# "gmail_address": str | None, +# } +# +# Zero-trust: in create mode the encrypted Gmail token lives ONLY here, in +# process memory, for at most _SCOUT_OAUTH_TTL_SECONDS. It is persisted to the +# DB only when the user finalizes the scout (POST /scouts/cloud/finalize). +# An abandoned/errored flow leaves no scout row and no stored token. +# +# Production note: this in-memory store is single-process only — replace with +# Redis (keyed by state, TTL'd) for multi-worker deployments. +_pending_scout_oauth_states: dict[str, dict] = {} +_SCOUT_OAUTH_TTL_SECONDS = 900 # 15 minutes + + +def _purge_expired_oauth_states() -> None: + now = time.time() + expired = [s for s, e in _pending_scout_oauth_states.items() if e.get("expires_at", 0) < now] + for s in expired: + del _pending_scout_oauth_states[s] def _scout_gmail_redirect_uri() -> str: @@ -463,6 +487,34 @@ class _ScoutGmailCallbackBody(BaseModel): state: str +class _ScoutGmailAuthorizeDraftBody(BaseModel): + name: str + prompt_template: str = "" + auto_trash_spam: bool = False + + +class _ScoutGmailFinalizeBody(BaseModel): + session: str + filter_config: dict | None = None + + +def _build_gmail_authorize_url(state: str, code_challenge: str) -> str: + """Build the Google consent URL for the scout Gmail flow (shared by both modes).""" + redirect_uri = _scout_gmail_redirect_uri() + params = { + "client_id": settings.GOOGLE_AUTH_CLIENT_ID, + "redirect_uri": redirect_uri, + "response_type": "code", + "scope": " ".join(_GMAIL_SCOUT_SCOPES), + "state": state, + "code_challenge": code_challenge, + "code_challenge_method": "S256", + "access_type": "offline", + "prompt": "consent", + } + return f"{_GOOGLE_AUTH_URL}?{urllib.parse.urlencode(params)}" + + @router.get("/oauth/gmail/authorize", response_model=_ScoutGmailAuthorizeResponse) async def scout_gmail_oauth_authorize( scout_id: str, @@ -483,28 +535,63 @@ async def scout_gmail_oauth_authorize( code_verifier, code_challenge = generate_pkce_pair() state = secrets.token_urlsafe(32) - # Purge expired states to prevent unbounded growth. - now = time.time() - expired = [s for s, (_, _, _, exp) in _pending_scout_oauth_states.items() if exp < now] - for s in expired: - del _pending_scout_oauth_states[s] + _purge_expired_oauth_states() - _pending_scout_oauth_states[state] = (code_verifier, scout_id, current_user.id, now + _SCOUT_OAUTH_TTL_SECONDS) - - redirect_uri = _scout_gmail_redirect_uri() - params = { - "client_id": settings.GOOGLE_AUTH_CLIENT_ID, - "redirect_uri": redirect_uri, - "response_type": "code", - "scope": " ".join(_GMAIL_SCOUT_SCOPES), - "state": state, - "code_challenge": code_challenge, - "code_challenge_method": "S256", - "access_type": "offline", - "prompt": "consent", + _pending_scout_oauth_states[state] = { + "code_verifier": code_verifier, + "user_id": current_user.id, + "expires_at": time.time() + _SCOUT_OAUTH_TTL_SECONDS, + "mode": "reconnect", + "scout_id": scout_id, + "draft": None, + "token_encrypted": None, + "gmail_address": None, } - authorize_url = f"{_GOOGLE_AUTH_URL}?{urllib.parse.urlencode(params)}" - return _ScoutGmailAuthorizeResponse(authorize_url=authorize_url) + + return _ScoutGmailAuthorizeResponse( + authorize_url=_build_gmail_authorize_url(state, code_challenge) + ) + + +@router.post("/oauth/gmail/authorize-draft", response_model=_ScoutGmailAuthorizeResponse) +async def scout_gmail_oauth_authorize_draft( + body: _ScoutGmailAuthorizeDraftBody, + current_user: UserProfile = Depends(get_current_user), +) -> _ScoutGmailAuthorizeResponse: + """Start the Gmail OAuth flow in *creation* mode — no scout row exists yet. + + The draft scout fields are held in the pending OAuth session; the scout is + only created once the user finalizes (POST /scouts/cloud/finalize). + """ + if not settings.GOOGLE_AUTH_CLIENT_ID or not settings.GOOGLE_AUTH_CLIENT_SECRET: + raise HTTPException( + status.HTTP_503_SERVICE_UNAVAILABLE, + "Google OAuth is not configured on this server", + ) + + code_verifier, code_challenge = generate_pkce_pair() + state = secrets.token_urlsafe(32) + + _purge_expired_oauth_states() + + _pending_scout_oauth_states[state] = { + "code_verifier": code_verifier, + "user_id": current_user.id, + "expires_at": time.time() + _SCOUT_OAUTH_TTL_SECONDS, + "mode": "create", + "scout_id": None, + "draft": { + "name": body.name, + "prompt_template": body.prompt_template, + "auto_trash_spam": body.auto_trash_spam, + }, + "token_encrypted": None, + "gmail_address": None, + } + + return _ScoutGmailAuthorizeResponse( + authorize_url=_build_gmail_authorize_url(state, code_challenge) + ) @router.get("/oauth/gmail/web-callback", include_in_schema=False) @@ -531,10 +618,16 @@ async def scout_gmail_oauth_callback( the ``code`` and ``state`` params. """ entry = _pending_scout_oauth_states.pop(body.state, None) - if entry is None or entry[3] < time.time() or entry[2] != current_user.id: + if ( + entry is None + or entry["expires_at"] < time.time() + or entry["user_id"] != current_user.id + ): raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or expired OAuth state") - code_verifier, scout_id, _, _ = entry + code_verifier = entry["code_verifier"] + mode = entry["mode"] + scout_id = entry.get("scout_id") redirect_uri = _scout_gmail_redirect_uri() @@ -572,12 +665,8 @@ async def scout_gmail_oauth_callback( } encrypted = encrypt_token(creds_dict) - scout = await db.get(CloudScoutConfig, scout_id) - if scout is None or scout.user_id != current_user.id: - raise HTTPException(status.HTTP_404_NOT_FOUND, "Scout not found") - scout.oauth_token_encrypted = encrypted - # Fetch the connected Gmail address for display. + gmail_address: str | None = None try: from googleapiclient.discovery import build from google.oauth2.credentials import Credentials @@ -595,9 +684,25 @@ async def scout_gmail_oauth_callback( profile = service.users().getProfile(userId="me").execute() return profile.get("emailAddress") - scout.gmail_address = await asyncio.to_thread(_fetch_email) + gmail_address = await asyncio.to_thread(_fetch_email) except Exception: - logger.exception("failed to fetch gmail address for scout %s", scout_id) + logger.exception("failed to fetch gmail address (mode=%s)", mode) + + if mode == "create": + # Do NOT create a scout yet. Hold the encrypted token + address in the + # transient in-memory session; the scout is created at finalize. + entry["token_encrypted"] = encrypted + entry["gmail_address"] = gmail_address + entry["expires_at"] = time.time() + _SCOUT_OAUTH_TTL_SECONDS + _pending_scout_oauth_states[body.state] = entry + return {"ok": True, "session_id": body.state, "gmail_address": gmail_address} + + # mode == "reconnect": update the existing scout in place. + scout = await db.get(CloudScoutConfig, scout_id) + if scout is None or scout.user_id != current_user.id: + raise HTTPException(status.HTTP_404_NOT_FOUND, "Scout not found") + scout.oauth_token_encrypted = encrypted + scout.gmail_address = gmail_address await db.commit() @@ -611,4 +716,92 @@ async def scout_gmail_oauth_callback( except Exception: logger.exception("setup_watch failed for scout %s", scout_id) - return {"ok": True} + return {"ok": True, "session_id": None, "gmail_address": gmail_address} + + +@router.get("/oauth/gmail/session-labels") +async def scout_gmail_session_labels( + session: str, + current_user: UserProfile = Depends(get_current_user), +) -> list[dict]: + """List Gmail labels for a pending create-mode OAuth session (no scout row yet). + + Builds a Gmail service from the session's transient decrypted token. + Returns [] on any error. + """ + entry = _pending_scout_oauth_states.get(session) + if ( + entry is None + or entry["expires_at"] < time.time() + or entry["user_id"] != current_user.id + or entry.get("token_encrypted") is None + ): + raise HTTPException(status.HTTP_404_NOT_FOUND, "Session not found or expired") + + try: + from app.scouts.connectors.gmail import _gmail_service_from_token + + creds = decrypt_token(entry["token_encrypted"]) + + def _sync() -> list[dict]: + service = _gmail_service_from_token(creds) + resp = service.users().labels().list(userId="me").execute() + return [{"id": lbl["id"], "name": lbl["name"]} for lbl in resp.get("labels", [])] + + return await asyncio.to_thread(_sync) + except Exception: + logger.exception("session-labels failed for session %s", session) + return [] + + +@router.post("/cloud/finalize", response_model=CloudScoutResponse, status_code=status.HTTP_201_CREATED) +async def finalize_cloud_scout( + body: _ScoutGmailFinalizeBody, + db: AsyncSession = Depends(get_session), + current_user: UserProfile = Depends(get_current_user), +): + """Create the cloud scout from a completed create-mode OAuth session. + + This is the only path that persists the Gmail token for a newly-created + scout. Abandoned flows never reach here, so they leave no orphan rows. + """ + entry = _pending_scout_oauth_states.pop(body.session, None) + if ( + entry is None + or entry["expires_at"] < time.time() + or entry["user_id"] != current_user.id + or entry.get("mode") != "create" + or entry.get("token_encrypted") is None + ): + raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or expired OAuth session") + + draft = entry["draft"] or {} + scout = CloudScoutConfig( + id=str(uuid.uuid4()), + user_id=current_user.id, + provider="gmail", + name=draft.get("name", ""), + data_types=[], + prompt_template=draft.get("prompt_template", ""), + filter_config=body.filter_config, + schedule_cron=_DEFAULT_CLOUD_SCHEDULE, + auto_trash_spam=draft.get("auto_trash_spam", False), + enabled=True, + oauth_token_encrypted=entry["token_encrypted"], + gmail_address=entry.get("gmail_address"), + ) + db.add(scout) + await db.commit() + await db.refresh(scout) + + # Best-effort Gmail push watch — failure must not block scout creation. + try: + connector = get_connector("gmail") + await connector.setup_watch(scout) + await db.commit() + except KeyError: + logger.warning("gmail connector not registered — skipping setup_watch for scout %s", scout.id) + except Exception: + logger.exception("setup_watch failed for scout %s", scout.id) + + return _to_cloud_response(scout) diff --git a/app/scouts/connectors/gmail.py b/app/scouts/connectors/gmail.py index 8dd7c65..7b5d7cc 100644 --- a/app/scouts/connectors/gmail.py +++ b/app/scouts/connectors/gmail.py @@ -45,12 +45,15 @@ def _extract_plain_text_body(payload: dict) -> str: return "" -def _get_gmail_service(scout): - """Return a synchronous Google API client for low-level metadata/history calls.""" +def _gmail_service_from_token(creds_info: dict): + """Build a synchronous Gmail API client from a decrypted credentials dict. + + Shared by ``_get_gmail_service`` (scout-backed) and the pending-session + OAuth flow which has a raw token but no scout row yet. + """ from googleapiclient.discovery import build from google.oauth2.credentials import Credentials - creds_info = decrypt_token(scout.oauth_token_encrypted) credentials = Credentials( token=creds_info.get("token"), refresh_token=creds_info.get("refresh_token"), @@ -62,6 +65,12 @@ def _get_gmail_service(scout): return build("gmail", "v1", credentials=credentials, cache_discovery=False) +def _get_gmail_service(scout): + """Return a synchronous Google API client for low-level metadata/history calls.""" + creds_info = decrypt_token(scout.oauth_token_encrypted) + return _gmail_service_from_token(creds_info) + + class GmailConnector: source_type = "gmail" diff --git a/tests/test_scout_cloud_crud.py b/tests/test_scout_cloud_crud.py index e8ea12b..6310850 100644 --- a/tests/test_scout_cloud_crud.py +++ b/tests/test_scout_cloud_crud.py @@ -2,16 +2,19 @@ from __future__ import annotations +import time import uuid from unittest.mock import AsyncMock, patch import pytest from httpx import ASGITransport, AsyncClient +from sqlalchemy import select from app.db import get_session +from app.integrations import encrypt_token from app.main import app from app.models import CloudScoutConfig -from tests.conftest import _TestSessionLocal, make_jwt +from tests.conftest import _TestSessionLocal, make_jwt, TEST_USER_IDS def _auth_headers(tier: str = "power") -> dict: @@ -147,3 +150,80 @@ async def test_gmail_disconnect_clears_token(): assert body["oauth_connected"] is False assert body["gmail_address"] is None assert body["enabled"] is False + + +# ── Pending-session create-at-finalize flow ─────────────────────────────────── + + +@pytest.mark.asyncio +async def test_authorize_draft_returns_url_and_no_scout_created(): + from app.config.settings import settings as app_settings + + with patch.object(app_settings, "GOOGLE_AUTH_CLIENT_ID", "cid"), \ + patch.object(app_settings, "GOOGLE_AUTH_CLIENT_SECRET", "secret"): + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: + resp = await client.post( + "/api/v1/scouts/oauth/gmail/authorize-draft", + json={"name": "Draft Inbox", "prompt_template": "invoices", "auto_trash_spam": True}, + headers=_auth_headers(), + ) + assert resp.status_code == 200, resp.text + assert resp.json()["authorize_url"].startswith("https://accounts.google.com/") + + # No scout row should have been created by authorize-draft. + async with _TestSessionLocal() as session: + rows = (await session.execute( + select(CloudScoutConfig).where( + CloudScoutConfig.user_id == TEST_USER_IDS["power"], + CloudScoutConfig.name == "Draft Inbox", + ) + )).scalars().all() + assert rows == [] + + +@pytest.mark.asyncio +async def test_finalize_creates_scout_from_session(): + from app.api.routes import scouts as scouts_mod + + state = "test-session-" + uuid.uuid4().hex + token = encrypt_token({"token": "x", "refresh_token": "y", "client_id": "c", "client_secret": "s"}) + scouts_mod._pending_scout_oauth_states[state] = { + "code_verifier": "v", + "user_id": TEST_USER_IDS["power"], + "expires_at": time.time() + 600, + "mode": "create", + "scout_id": None, + "draft": {"name": "Finalized", "prompt_template": "tasks", "auto_trash_spam": True}, + "token_encrypted": token, + "gmail_address": "me@gmail.com", + } + + # Patch get_connector to raise KeyError so setup_watch is skipped (best-effort). + with patch("app.api.routes.scouts.get_connector", side_effect=KeyError("gmail")): + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: + resp = await client.post( + "/api/v1/scouts/cloud/finalize", + json={"session": state, "filter_config": {"labels": ["INBOX"]}}, + headers=_auth_headers(), + ) + assert resp.status_code == 201, resp.text + body = resp.json() + assert body["name"] == "Finalized" + assert body["auto_trash_spam"] is True + assert body["filter_config"] == {"labels": ["INBOX"]} + assert body["gmail_address"] == "me@gmail.com" + assert body["oauth_connected"] is True + + # Session must have been popped. + assert state not in scouts_mod._pending_scout_oauth_states + + +@pytest.mark.asyncio +async def test_finalize_rejects_unknown_session(): + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: + resp = await client.post( + "/api/v1/scouts/cloud/finalize", + json={"session": "does-not-exist", "filter_config": None}, + headers=_auth_headers(), + ) + assert resp.status_code == 401, resp.text