feat(scouts): pending-session Gmail OAuth — create cloud scout at finalize

Refactor _pending_scout_oauth_states from a tuple to a dict carrying
mode (reconnect|create), draft fields, and a transient encrypted token.
Add authorize-draft, session-labels, and cloud/finalize endpoints so the
scout row is created only when the flow completes — abandoned flows leave
no orphan rows. Zero-trust: the encrypted token lives only in the in-memory
session (<=15 min) until finalize persists it.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Roberto
2026-06-10 18:23:52 +02:00
parent 95d4e4be75
commit f64ca11888
3 changed files with 322 additions and 40 deletions

View File

@@ -39,7 +39,7 @@ from app.core.scout_runner import is_agent_running, run_local_agent
from app.core.device_manager import device_manager from app.core.device_manager import device_manager
from app.core.note_summarizer import generate_note_summary from app.core.note_summarizer import generate_note_summary
from app.db import get_session from app.db import get_session
from app.integrations import encrypt_token from app.integrations import decrypt_token, encrypt_token
from app.models import CloudScoutConfig, ScoutRunLog, LocalScoutConfig from app.models import CloudScoutConfig, ScoutRunLog, LocalScoutConfig
from app.scouts.connectors.registry import get_connector from app.scouts.connectors.registry import get_connector
from app.schemas import ( from app.schemas import (
@@ -435,11 +435,35 @@ _GMAIL_SCOUT_SCOPES = [
_GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth" _GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
_GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token" _GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token"
# In-memory pending OAuth states for scout Gmail consent: # In-memory pending OAuth states for scout Gmail consent.
# state → (code_verifier, scout_id, user_id, expires_at_epoch_s) #
# Production note: replace with Redis for multi-process deployments. # state → {
_pending_scout_oauth_states: dict[str, tuple[str, str, str, float]] = {} # "code_verifier": str,
_SCOUT_OAUTH_TTL_SECONDS = 600 # 10 minutes # "user_id": str,
# "expires_at": float (epoch seconds),
# "mode": "reconnect" | "create",
# "scout_id": str | None, # set for reconnect mode
# "draft": {name, prompt_template, auto_trash_spam} | None, # set for create mode
# "token_encrypted": str | None, # populated after a successful create-mode callback
# "gmail_address": str | None,
# }
#
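# Zero-trust: in create mode the encrypted Gmail token lives ONLY here, in
# process memory. An entry expires _SCOUT_OAUTH_TTL_SECONDS after it was last
# written (the create-mode callback resets the deadline when it stores the
# token); expired entries are rejected on read and evicted lazily whenever
# _purge_expired_oauth_states() runs. The token is persisted to the DB only
# when the user finalizes the scout (POST /scouts/cloud/finalize).
# An abandoned/errored flow leaves no scout row and no stored token.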
#
# Production note: this in-memory store is single-process only — replace with
# Redis (keyed by state, TTL'd) for multi-worker deployments.
_pending_scout_oauth_states: dict[str, dict] = {}
_SCOUT_OAUTH_TTL_SECONDS = 900 # 15 minutes
def _purge_expired_oauth_states() -> None:
now = time.time()
expired = [s for s, e in _pending_scout_oauth_states.items() if e.get("expires_at", 0) < now]
for s in expired:
del _pending_scout_oauth_states[s]
def _scout_gmail_redirect_uri() -> str: def _scout_gmail_redirect_uri() -> str:
@@ -463,6 +487,34 @@ class _ScoutGmailCallbackBody(BaseModel):
state: str state: str
class _ScoutGmailAuthorizeDraftBody(BaseModel):
    """Request body for starting a create-mode Gmail OAuth flow.

    Carries the draft scout fields that are parked in the pending OAuth
    session until the scout is finalized.
    """

    # Name given to the scout when it is eventually created (required).
    name: str
    # Prompt template for the new scout; empty string when omitted.
    prompt_template: str = ""
    # Copied onto the new scout's ``auto_trash_spam`` flag at finalize.
    auto_trash_spam: bool = False
class _ScoutGmailFinalizeBody(BaseModel):
    """Request body for POST /scouts/cloud/finalize."""

    # Key of the pending create-mode OAuth session to consume.
    session: str
    # Optional filter configuration stored on the new scout unchanged.
    filter_config: dict | None = None
def _build_gmail_authorize_url(state: str, code_challenge: str) -> str:
    """Return the Google consent URL for the scout Gmail flow.

    Used by both the reconnect and the create entry points, so the query
    parameters stay identical between the two modes.

    Args:
        state: Opaque OAuth ``state`` value keying the pending session.
        code_challenge: PKCE challenge, sent with method ``S256``.
    """
    query = urllib.parse.urlencode(
        {
            "client_id": settings.GOOGLE_AUTH_CLIENT_ID,
            "redirect_uri": _scout_gmail_redirect_uri(),
            "response_type": "code",
            "scope": " ".join(_GMAIL_SCOUT_SCOPES),
            "state": state,
            "code_challenge": code_challenge,
            "code_challenge_method": "S256",
            "access_type": "offline",
            "prompt": "consent",
        }
    )
    return _GOOGLE_AUTH_URL + "?" + query
@router.get("/oauth/gmail/authorize", response_model=_ScoutGmailAuthorizeResponse) @router.get("/oauth/gmail/authorize", response_model=_ScoutGmailAuthorizeResponse)
async def scout_gmail_oauth_authorize( async def scout_gmail_oauth_authorize(
scout_id: str, scout_id: str,
@@ -483,28 +535,63 @@ async def scout_gmail_oauth_authorize(
code_verifier, code_challenge = generate_pkce_pair() code_verifier, code_challenge = generate_pkce_pair()
state = secrets.token_urlsafe(32) state = secrets.token_urlsafe(32)
# Purge expired states to prevent unbounded growth. _purge_expired_oauth_states()
now = time.time()
expired = [s for s, (_, _, _, exp) in _pending_scout_oauth_states.items() if exp < now]
for s in expired:
del _pending_scout_oauth_states[s]
_pending_scout_oauth_states[state] = (code_verifier, scout_id, current_user.id, now + _SCOUT_OAUTH_TTL_SECONDS) _pending_scout_oauth_states[state] = {
"code_verifier": code_verifier,
redirect_uri = _scout_gmail_redirect_uri() "user_id": current_user.id,
params = { "expires_at": time.time() + _SCOUT_OAUTH_TTL_SECONDS,
"client_id": settings.GOOGLE_AUTH_CLIENT_ID, "mode": "reconnect",
"redirect_uri": redirect_uri, "scout_id": scout_id,
"response_type": "code", "draft": None,
"scope": " ".join(_GMAIL_SCOUT_SCOPES), "token_encrypted": None,
"state": state, "gmail_address": None,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"access_type": "offline",
"prompt": "consent",
} }
authorize_url = f"{_GOOGLE_AUTH_URL}?{urllib.parse.urlencode(params)}"
return _ScoutGmailAuthorizeResponse(authorize_url=authorize_url) return _ScoutGmailAuthorizeResponse(
authorize_url=_build_gmail_authorize_url(state, code_challenge)
)
@router.post("/oauth/gmail/authorize-draft", response_model=_ScoutGmailAuthorizeResponse)
async def scout_gmail_oauth_authorize_draft(
    body: _ScoutGmailAuthorizeDraftBody,
    current_user: UserProfile = Depends(get_current_user),
) -> _ScoutGmailAuthorizeResponse:
    """Begin the Gmail OAuth flow in *creation* mode; no scout row is written.

    The draft fields from ``body`` are stored in the pending OAuth session
    under a fresh ``state``. The scout itself is created only when the user
    finalizes (POST /scouts/cloud/finalize).

    Raises:
        HTTPException: 503 when the Google OAuth client id or secret is unset.
    """
    oauth_configured = (
        settings.GOOGLE_AUTH_CLIENT_ID and settings.GOOGLE_AUTH_CLIENT_SECRET
    )
    if not oauth_configured:
        raise HTTPException(
            status.HTTP_503_SERVICE_UNAVAILABLE,
            "Google OAuth is not configured on this server",
        )

    verifier, challenge = generate_pkce_pair()
    state = secrets.token_urlsafe(32)

    # Evict stale sessions before adding a new one.
    _purge_expired_oauth_states()

    draft_fields = {
        "name": body.name,
        "prompt_template": body.prompt_template,
        "auto_trash_spam": body.auto_trash_spam,
    }
    _pending_scout_oauth_states[state] = {
        "code_verifier": verifier,
        "user_id": current_user.id,
        "expires_at": time.time() + _SCOUT_OAUTH_TTL_SECONDS,
        "mode": "create",
        "scout_id": None,
        "draft": draft_fields,
        # Both are filled in by the callback once Google returns a token.
        "token_encrypted": None,
        "gmail_address": None,
    }

    consent_url = _build_gmail_authorize_url(state, challenge)
    return _ScoutGmailAuthorizeResponse(authorize_url=consent_url)
@router.get("/oauth/gmail/web-callback", include_in_schema=False) @router.get("/oauth/gmail/web-callback", include_in_schema=False)
@@ -531,10 +618,16 @@ async def scout_gmail_oauth_callback(
the ``code`` and ``state`` params. the ``code`` and ``state`` params.
""" """
entry = _pending_scout_oauth_states.pop(body.state, None) entry = _pending_scout_oauth_states.pop(body.state, None)
if entry is None or entry[3] < time.time() or entry[2] != current_user.id: if (
entry is None
or entry["expires_at"] < time.time()
or entry["user_id"] != current_user.id
):
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or expired OAuth state") raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or expired OAuth state")
code_verifier, scout_id, _, _ = entry code_verifier = entry["code_verifier"]
mode = entry["mode"]
scout_id = entry.get("scout_id")
redirect_uri = _scout_gmail_redirect_uri() redirect_uri = _scout_gmail_redirect_uri()
@@ -572,12 +665,8 @@ async def scout_gmail_oauth_callback(
} }
encrypted = encrypt_token(creds_dict) encrypted = encrypt_token(creds_dict)
scout = await db.get(CloudScoutConfig, scout_id)
if scout is None or scout.user_id != current_user.id:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Scout not found")
scout.oauth_token_encrypted = encrypted
# Fetch the connected Gmail address for display. # Fetch the connected Gmail address for display.
gmail_address: str | None = None
try: try:
from googleapiclient.discovery import build from googleapiclient.discovery import build
from google.oauth2.credentials import Credentials from google.oauth2.credentials import Credentials
@@ -595,9 +684,25 @@ async def scout_gmail_oauth_callback(
profile = service.users().getProfile(userId="me").execute() profile = service.users().getProfile(userId="me").execute()
return profile.get("emailAddress") return profile.get("emailAddress")
scout.gmail_address = await asyncio.to_thread(_fetch_email) gmail_address = await asyncio.to_thread(_fetch_email)
except Exception: except Exception:
logger.exception("failed to fetch gmail address for scout %s", scout_id) logger.exception("failed to fetch gmail address (mode=%s)", mode)
if mode == "create":
# Do NOT create a scout yet. Hold the encrypted token + address in the
# transient in-memory session; the scout is created at finalize.
entry["token_encrypted"] = encrypted
entry["gmail_address"] = gmail_address
entry["expires_at"] = time.time() + _SCOUT_OAUTH_TTL_SECONDS
_pending_scout_oauth_states[body.state] = entry
return {"ok": True, "session_id": body.state, "gmail_address": gmail_address}
# mode == "reconnect": update the existing scout in place.
scout = await db.get(CloudScoutConfig, scout_id)
if scout is None or scout.user_id != current_user.id:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Scout not found")
scout.oauth_token_encrypted = encrypted
scout.gmail_address = gmail_address
await db.commit() await db.commit()
@@ -611,4 +716,92 @@ async def scout_gmail_oauth_callback(
except Exception: except Exception:
logger.exception("setup_watch failed for scout %s", scout_id) logger.exception("setup_watch failed for scout %s", scout_id)
return {"ok": True} return {"ok": True, "session_id": None, "gmail_address": gmail_address}
@router.get("/oauth/gmail/session-labels")
async def scout_gmail_session_labels(
    session: str,
    current_user: UserProfile = Depends(get_current_user),
) -> list[dict]:
    """List Gmail labels for a pending create-mode OAuth session (no scout row yet).

    Builds a Gmail service from the session's decrypted token and returns one
    ``{"id", "name"}`` dict per label.

    Returns:
        The label list, or ``[]`` when decrypting the token or calling Gmail
        fails (the failure is logged).

    Raises:
        HTTPException: 404 when the session is unknown, expired, owned by a
            different user, or has no token stored yet.
    """
    entry = _pending_scout_oauth_states.get(session)
    if entry is not None and entry["expires_at"] < time.time():
        # An expired session may still hold an encrypted token; drop it now
        # rather than waiting for the next purge.
        _pending_scout_oauth_states.pop(session, None)
        entry = None
    if (
        entry is None
        or entry["user_id"] != current_user.id
        or entry.get("token_encrypted") is None
    ):
        raise HTTPException(status.HTTP_404_NOT_FOUND, "Session not found or expired")

    try:
        from app.scouts.connectors.gmail import _gmail_service_from_token

        creds = decrypt_token(entry["token_encrypted"])

        def _sync() -> list[dict]:
            service = _gmail_service_from_token(creds)
            resp = service.users().labels().list(userId="me").execute()
            return [{"id": lbl["id"], "name": lbl["name"]} for lbl in resp.get("labels", [])]

        # The Google client is synchronous; run it off the event loop.
        return await asyncio.to_thread(_sync)
    except Exception:
        # The session id is the OAuth state secret, so keep it out of the logs
        # and identify the failure by user instead.
        logger.exception("session-labels failed for user %s", current_user.id)
        return []
@router.post("/cloud/finalize", response_model=CloudScoutResponse, status_code=status.HTTP_201_CREATED)
async def finalize_cloud_scout(
    body: _ScoutGmailFinalizeBody,
    db: AsyncSession = Depends(get_session),
    current_user: UserProfile = Depends(get_current_user),
):
    """Create the cloud scout from a completed create-mode OAuth session.

    Pops the pending session named by ``body.session``, builds a
    ``CloudScoutConfig`` from its draft fields and encrypted token, commits
    it, then attempts a Gmail push watch on a best-effort basis.

    Returns:
        The created scout converted with ``_to_cloud_response``.

    Raises:
        HTTPException: 401 when the session is unknown, expired, owned by a
            different user, not in create mode, or has no token stored yet.
    """
    # Pop rather than get: a session is single-use, even when validation fails.
    entry = _pending_scout_oauth_states.pop(body.session, None)
    if (
        entry is None
        or entry["expires_at"] < time.time()
        or entry["user_id"] != current_user.id
        or entry.get("mode") != "create"
        or entry.get("token_encrypted") is None
    ):
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or expired OAuth session")

    draft = entry["draft"] or {}
    scout = CloudScoutConfig(
        id=str(uuid.uuid4()),
        user_id=current_user.id,
        provider="gmail",
        name=draft.get("name", ""),
        data_types=[],
        prompt_template=draft.get("prompt_template", ""),
        filter_config=body.filter_config,
        schedule_cron=_DEFAULT_CLOUD_SCHEDULE,
        auto_trash_spam=draft.get("auto_trash_spam", False),
        enabled=True,
        oauth_token_encrypted=entry["token_encrypted"],
        gmail_address=entry.get("gmail_address"),
    )
    db.add(scout)
    await db.commit()
    await db.refresh(scout)

    # Best-effort Gmail push watch — failure must not block scout creation.
    try:
        try:
            connector = get_connector("gmail")
        except KeyError:
            # Only the registry lookup is covered here, so a KeyError raised
            # inside setup_watch is not mistaken for a missing connector.
            logger.warning("gmail connector not registered — skipping setup_watch for scout %s", scout.id)
        else:
            await connector.setup_watch(scout)
            await db.commit()
    except Exception:
        logger.exception("setup_watch failed for scout %s", scout.id)

    return _to_cloud_response(scout)

View File

@@ -45,12 +45,15 @@ def _extract_plain_text_body(payload: dict) -> str:
return "" return ""
def _get_gmail_service(scout): def _gmail_service_from_token(creds_info: dict):
"""Return a synchronous Google API client for low-level metadata/history calls.""" """Build a synchronous Gmail API client from a decrypted credentials dict.
Shared by ``_get_gmail_service`` (scout-backed) and the pending-session
OAuth flow which has a raw token but no scout row yet.
"""
from googleapiclient.discovery import build from googleapiclient.discovery import build
from google.oauth2.credentials import Credentials from google.oauth2.credentials import Credentials
creds_info = decrypt_token(scout.oauth_token_encrypted)
credentials = Credentials( credentials = Credentials(
token=creds_info.get("token"), token=creds_info.get("token"),
refresh_token=creds_info.get("refresh_token"), refresh_token=creds_info.get("refresh_token"),
@@ -62,6 +65,12 @@ def _get_gmail_service(scout):
return build("gmail", "v1", credentials=credentials, cache_discovery=False) return build("gmail", "v1", credentials=credentials, cache_discovery=False)
def _get_gmail_service(scout):
    """Return a synchronous Gmail API client for the given scout.

    Decrypts ``scout.oauth_token_encrypted`` and hands the resulting
    credentials dict to ``_gmail_service_from_token``.
    """
    return _gmail_service_from_token(decrypt_token(scout.oauth_token_encrypted))
class GmailConnector: class GmailConnector:
source_type = "gmail" source_type = "gmail"

View File

@@ -2,16 +2,19 @@
from __future__ import annotations from __future__ import annotations
import time
import uuid import uuid
from unittest.mock import AsyncMock, patch from unittest.mock import AsyncMock, patch
import pytest import pytest
from httpx import ASGITransport, AsyncClient from httpx import ASGITransport, AsyncClient
from sqlalchemy import select
from app.db import get_session from app.db import get_session
from app.integrations import encrypt_token
from app.main import app from app.main import app
from app.models import CloudScoutConfig from app.models import CloudScoutConfig
from tests.conftest import _TestSessionLocal, make_jwt from tests.conftest import _TestSessionLocal, make_jwt, TEST_USER_IDS
def _auth_headers(tier: str = "power") -> dict: def _auth_headers(tier: str = "power") -> dict:
@@ -147,3 +150,80 @@ async def test_gmail_disconnect_clears_token():
assert body["oauth_connected"] is False assert body["oauth_connected"] is False
assert body["gmail_address"] is None assert body["gmail_address"] is None
assert body["enabled"] is False assert body["enabled"] is False
# ── Pending-session create-at-finalize flow ───────────────────────────────────
@pytest.mark.asyncio
async def test_authorize_draft_returns_url_and_no_scout_created():
    """authorize-draft returns a Google consent URL and writes no scout row."""
    from app.config.settings import settings as app_settings

    payload = {"name": "Draft Inbox", "prompt_template": "invoices", "auto_trash_spam": True}

    with patch.object(app_settings, "GOOGLE_AUTH_CLIENT_ID", "cid"):
        with patch.object(app_settings, "GOOGLE_AUTH_CLIENT_SECRET", "secret"):
            transport = ASGITransport(app=app)
            async with AsyncClient(transport=transport, base_url="http://test") as client:
                resp = await client.post(
                    "/api/v1/scouts/oauth/gmail/authorize-draft",
                    json=payload,
                    headers=_auth_headers(),
                )

    assert resp.status_code == 200, resp.text
    assert resp.json()["authorize_url"].startswith("https://accounts.google.com/")

    # No scout row should have been created by authorize-draft.
    query = select(CloudScoutConfig).where(
        CloudScoutConfig.user_id == TEST_USER_IDS["power"],
        CloudScoutConfig.name == "Draft Inbox",
    )
    async with _TestSessionLocal() as session:
        result = await session.execute(query)
        rows = result.scalars().all()
    assert rows == []
@pytest.mark.asyncio
async def test_finalize_creates_scout_from_session():
    """finalize turns a seeded create-mode session into a scout and consumes it."""
    from app.api.routes import scouts as scouts_mod

    state = "test-session-" + uuid.uuid4().hex
    token = encrypt_token({"token": "x", "refresh_token": "y", "client_id": "c", "client_secret": "s"})

    # Seed the session exactly as a completed create-mode callback would leave it.
    pending = scouts_mod._pending_scout_oauth_states
    pending[state] = {
        "code_verifier": "v",
        "user_id": TEST_USER_IDS["power"],
        "expires_at": time.time() + 600,
        "mode": "create",
        "scout_id": None,
        "draft": {"name": "Finalized", "prompt_template": "tasks", "auto_trash_spam": True},
        "token_encrypted": token,
        "gmail_address": "me@gmail.com",
    }

    request_json = {"session": state, "filter_config": {"labels": ["INBOX"]}}

    # Patch get_connector to raise KeyError so setup_watch is skipped (best-effort).
    with patch("app.api.routes.scouts.get_connector", side_effect=KeyError("gmail")):
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            resp = await client.post(
                "/api/v1/scouts/cloud/finalize",
                json=request_json,
                headers=_auth_headers(),
            )

    assert resp.status_code == 201, resp.text
    created = resp.json()
    assert created["name"] == "Finalized"
    assert created["auto_trash_spam"] is True
    assert created["filter_config"] == {"labels": ["INBOX"]}
    assert created["gmail_address"] == "me@gmail.com"
    assert created["oauth_connected"] is True

    # Session must have been popped.
    assert state not in scouts_mod._pending_scout_oauth_states
@pytest.mark.asyncio
async def test_finalize_rejects_unknown_session():
    """finalize answers 401 when the session id was never issued."""
    request_json = {"session": "does-not-exist", "filter_config": None}
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        resp = await client.post(
            "/api/v1/scouts/cloud/finalize",
            json=request_json,
            headers=_auth_headers(),
        )
    assert resp.status_code == 401, resp.text