2 Commits

Author SHA1 Message Date
Roberto
f64ca11888 feat(scouts): pending-session Gmail OAuth — create cloud scout at finalize
Refactor _pending_scout_oauth_states from a tuple to a dict carrying
mode (reconnect|create), draft fields, and a transient encrypted token.
Add authorize-draft, session-labels, and cloud/finalize endpoints so the
scout row is created only when the flow completes — abandoned flows leave
no orphan rows. Zero-trust: the encrypted token lives only in the in-memory
session (<=15 min) until finalize persists it.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-10 18:23:52 +02:00
Roberto
95d4e4be75 fix(scouts): delete cloud scout via Core delete to avoid varchar=uuid cascade error
The run_logs relationship joins scout_run_logs.scout_id (varchar) to
cloud_scout_configs.id (uuid); Postgres has no varchar=uuid operator so the
ORM cascade on db.delete(scout) 500'd. Core deletes bypass it; triage queue
rows cascade via FK ondelete.
2026-06-10 18:16:59 +02:00
3 changed files with 329 additions and 42 deletions

View File

@@ -26,7 +26,7 @@ from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, status from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import RedirectResponse from fastapi.responses import RedirectResponse
from sqlalchemy import func, select from sqlalchemy import delete as sa_delete, func, select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel from pydantic import BaseModel
@@ -39,7 +39,7 @@ from app.core.scout_runner import is_agent_running, run_local_agent
from app.core.device_manager import device_manager from app.core.device_manager import device_manager
from app.core.note_summarizer import generate_note_summary from app.core.note_summarizer import generate_note_summary
from app.db import get_session from app.db import get_session
from app.integrations import encrypt_token from app.integrations import decrypt_token, encrypt_token
from app.models import CloudScoutConfig, ScoutRunLog, LocalScoutConfig from app.models import CloudScoutConfig, ScoutRunLog, LocalScoutConfig
from app.scouts.connectors.registry import get_connector from app.scouts.connectors.registry import get_connector
from app.schemas import ( from app.schemas import (
@@ -371,7 +371,12 @@ async def delete_cloud_scout(
scout = await db.get(CloudScoutConfig, scout_id) scout = await db.get(CloudScoutConfig, scout_id)
if scout is None or scout.user_id != current_user.id: if scout is None or scout.user_id != current_user.id:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Scout not found") raise HTTPException(status.HTTP_404_NOT_FOUND, "Scout not found")
await db.delete(scout) # Core deletes bypass the polymorphic ScoutRunLog relationship whose
# varchar scout_id vs uuid id join is not directly comparable in Postgres.
# scout_run_logs.scout_id is a plain string (matches the str scout_id);
# scout_triage_queue rows cascade automatically via their FK ondelete.
await db.execute(sa_delete(ScoutRunLog).where(ScoutRunLog.scout_id == scout_id))
await db.execute(sa_delete(CloudScoutConfig).where(CloudScoutConfig.id == scout_id))
await db.commit() await db.commit()
return {"ok": True} return {"ok": True}
@@ -430,11 +435,35 @@ _GMAIL_SCOUT_SCOPES = [
_GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth" _GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
_GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token" _GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token"
# In-memory pending OAuth states for scout Gmail consent: # In-memory pending OAuth states for scout Gmail consent.
# state → (code_verifier, scout_id, user_id, expires_at_epoch_s) #
# Production note: replace with Redis for multi-process deployments. # state → {
_pending_scout_oauth_states: dict[str, tuple[str, str, str, float]] = {} # "code_verifier": str,
_SCOUT_OAUTH_TTL_SECONDS = 600 # 10 minutes # "user_id": str,
# "expires_at": float (epoch seconds),
# "mode": "reconnect" | "create",
# "scout_id": str | None, # set for reconnect mode
# "draft": {name, prompt_template, auto_trash_spam} | None, # set for create mode
# "token_encrypted": str | None, # populated after a successful create-mode callback
# "gmail_address": str | None,
# }
#
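# Zero-trust: in create mode the encrypted Gmail token lives ONLY here, in
# process memory. The entry's expiry is reset to _SCOUT_OAUTH_TTL_SECONDS from
# the moment the callback stores the token, and expired entries are evicted
# lazily (on the next purge or finalize attempt), not by a timer. The token is
# persisted to the DB only when the user finalizes the scout
# (POST /scouts/cloud/finalize). An abandoned/errored flow leaves no scout row
# and no persisted token.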
#
# Production note: this in-memory store is single-process only — replace with
# Redis (keyed by state, TTL'd) for multi-worker deployments.
_pending_scout_oauth_states: dict[str, dict] = {}
_SCOUT_OAUTH_TTL_SECONDS = 900 # 15 minutes
def _purge_expired_oauth_states() -> None:
    """Drop every pending OAuth state whose ``expires_at`` lies in the past.

    An entry that has no ``expires_at`` key is treated as expiring at epoch 0
    and is therefore removed as well.
    """
    cutoff = time.time()
    # Snapshot the items first: deleting from the dict while iterating its
    # live view would raise RuntimeError.
    for state_key, pending in list(_pending_scout_oauth_states.items()):
        if pending.get("expires_at", 0) < cutoff:
            del _pending_scout_oauth_states[state_key]
def _scout_gmail_redirect_uri() -> str: def _scout_gmail_redirect_uri() -> str:
@@ -458,6 +487,34 @@ class _ScoutGmailCallbackBody(BaseModel):
state: str state: str
class _ScoutGmailAuthorizeDraftBody(BaseModel):
    # Request body for POST /oauth/gmail/authorize-draft (create mode).
    # These fields are copied into the pending OAuth session's "draft" dict and
    # are only turned into a scout row when the session is finalized.
    name: str
    prompt_template: str = ""
    auto_trash_spam: bool = False
class _ScoutGmailFinalizeBody(BaseModel):
    # Request body for POST /cloud/finalize.
    # session: key of the pending create-mode OAuth session (the value the
    #   OAuth callback returns as "session_id").
    # filter_config: stored as-is on the newly created scout; may be omitted.
    session: str
    filter_config: dict | None = None
def _build_gmail_authorize_url(state: str, code_challenge: str) -> str:
    """Return the Google consent URL for the scout Gmail OAuth flow.

    Shared by the reconnect and the create (draft) authorize endpoints.

    Args:
        state: Opaque OAuth ``state`` value identifying the pending session.
        code_challenge: PKCE challenge, sent with the ``S256`` method.

    Returns:
        ``_GOOGLE_AUTH_URL`` followed by the URL-encoded query string.
    """
    query = urllib.parse.urlencode(
        {
            "client_id": settings.GOOGLE_AUTH_CLIENT_ID,
            "redirect_uri": _scout_gmail_redirect_uri(),
            "response_type": "code",
            "scope": " ".join(_GMAIL_SCOUT_SCOPES),
            "state": state,
            "code_challenge": code_challenge,
            "code_challenge_method": "S256",
            "access_type": "offline",
            "prompt": "consent",
        }
    )
    return _GOOGLE_AUTH_URL + "?" + query
@router.get("/oauth/gmail/authorize", response_model=_ScoutGmailAuthorizeResponse) @router.get("/oauth/gmail/authorize", response_model=_ScoutGmailAuthorizeResponse)
async def scout_gmail_oauth_authorize( async def scout_gmail_oauth_authorize(
scout_id: str, scout_id: str,
@@ -478,28 +535,63 @@ async def scout_gmail_oauth_authorize(
code_verifier, code_challenge = generate_pkce_pair() code_verifier, code_challenge = generate_pkce_pair()
state = secrets.token_urlsafe(32) state = secrets.token_urlsafe(32)
# Purge expired states to prevent unbounded growth. _purge_expired_oauth_states()
now = time.time()
expired = [s for s, (_, _, _, exp) in _pending_scout_oauth_states.items() if exp < now]
for s in expired:
del _pending_scout_oauth_states[s]
_pending_scout_oauth_states[state] = (code_verifier, scout_id, current_user.id, now + _SCOUT_OAUTH_TTL_SECONDS) _pending_scout_oauth_states[state] = {
"code_verifier": code_verifier,
redirect_uri = _scout_gmail_redirect_uri() "user_id": current_user.id,
params = { "expires_at": time.time() + _SCOUT_OAUTH_TTL_SECONDS,
"client_id": settings.GOOGLE_AUTH_CLIENT_ID, "mode": "reconnect",
"redirect_uri": redirect_uri, "scout_id": scout_id,
"response_type": "code", "draft": None,
"scope": " ".join(_GMAIL_SCOUT_SCOPES), "token_encrypted": None,
"state": state, "gmail_address": None,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"access_type": "offline",
"prompt": "consent",
} }
authorize_url = f"{_GOOGLE_AUTH_URL}?{urllib.parse.urlencode(params)}"
return _ScoutGmailAuthorizeResponse(authorize_url=authorize_url) return _ScoutGmailAuthorizeResponse(
authorize_url=_build_gmail_authorize_url(state, code_challenge)
)
@router.post("/oauth/gmail/authorize-draft", response_model=_ScoutGmailAuthorizeResponse)
async def scout_gmail_oauth_authorize_draft(
    body: _ScoutGmailAuthorizeDraftBody,
    current_user: UserProfile = Depends(get_current_user),
) -> _ScoutGmailAuthorizeResponse:
    """Begin the Gmail OAuth flow in *create* mode, before any scout row exists.

    The draft fields from ``body`` are parked in the in-memory pending OAuth
    session keyed by the generated ``state``; the scout itself is created later
    by POST /scouts/cloud/finalize.

    Raises:
        HTTPException: 503 when the Google OAuth client id or secret is unset.
    """
    oauth_configured = settings.GOOGLE_AUTH_CLIENT_ID and settings.GOOGLE_AUTH_CLIENT_SECRET
    if not oauth_configured:
        raise HTTPException(
            status.HTTP_503_SERVICE_UNAVAILABLE,
            "Google OAuth is not configured on this server",
        )

    verifier, challenge = generate_pkce_pair()
    session_key = secrets.token_urlsafe(32)

    # Housekeeping before inserting, so the store does not grow without bound.
    _purge_expired_oauth_states()

    draft_fields = {
        "name": body.name,
        "prompt_template": body.prompt_template,
        "auto_trash_spam": body.auto_trash_spam,
    }
    _pending_scout_oauth_states[session_key] = {
        "code_verifier": verifier,
        "user_id": current_user.id,
        "expires_at": time.time() + _SCOUT_OAUTH_TTL_SECONDS,
        "mode": "create",
        "scout_id": None,
        "draft": draft_fields,
        # Filled in by the OAuth callback once Google returns a token.
        "token_encrypted": None,
        "gmail_address": None,
    }

    authorize_url = _build_gmail_authorize_url(session_key, challenge)
    return _ScoutGmailAuthorizeResponse(authorize_url=authorize_url)
@router.get("/oauth/gmail/web-callback", include_in_schema=False) @router.get("/oauth/gmail/web-callback", include_in_schema=False)
@@ -526,10 +618,16 @@ async def scout_gmail_oauth_callback(
the ``code`` and ``state`` params. the ``code`` and ``state`` params.
""" """
entry = _pending_scout_oauth_states.pop(body.state, None) entry = _pending_scout_oauth_states.pop(body.state, None)
if entry is None or entry[3] < time.time() or entry[2] != current_user.id: if (
entry is None
or entry["expires_at"] < time.time()
or entry["user_id"] != current_user.id
):
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or expired OAuth state") raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or expired OAuth state")
code_verifier, scout_id, _, _ = entry code_verifier = entry["code_verifier"]
mode = entry["mode"]
scout_id = entry.get("scout_id")
redirect_uri = _scout_gmail_redirect_uri() redirect_uri = _scout_gmail_redirect_uri()
@@ -567,12 +665,8 @@ async def scout_gmail_oauth_callback(
} }
encrypted = encrypt_token(creds_dict) encrypted = encrypt_token(creds_dict)
scout = await db.get(CloudScoutConfig, scout_id)
if scout is None or scout.user_id != current_user.id:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Scout not found")
scout.oauth_token_encrypted = encrypted
# Fetch the connected Gmail address for display. # Fetch the connected Gmail address for display.
gmail_address: str | None = None
try: try:
from googleapiclient.discovery import build from googleapiclient.discovery import build
from google.oauth2.credentials import Credentials from google.oauth2.credentials import Credentials
@@ -590,9 +684,25 @@ async def scout_gmail_oauth_callback(
profile = service.users().getProfile(userId="me").execute() profile = service.users().getProfile(userId="me").execute()
return profile.get("emailAddress") return profile.get("emailAddress")
scout.gmail_address = await asyncio.to_thread(_fetch_email) gmail_address = await asyncio.to_thread(_fetch_email)
except Exception: except Exception:
logger.exception("failed to fetch gmail address for scout %s", scout_id) logger.exception("failed to fetch gmail address (mode=%s)", mode)
if mode == "create":
# Do NOT create a scout yet. Hold the encrypted token + address in the
# transient in-memory session; the scout is created at finalize.
entry["token_encrypted"] = encrypted
entry["gmail_address"] = gmail_address
entry["expires_at"] = time.time() + _SCOUT_OAUTH_TTL_SECONDS
_pending_scout_oauth_states[body.state] = entry
return {"ok": True, "session_id": body.state, "gmail_address": gmail_address}
# mode == "reconnect": update the existing scout in place.
scout = await db.get(CloudScoutConfig, scout_id)
if scout is None or scout.user_id != current_user.id:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Scout not found")
scout.oauth_token_encrypted = encrypted
scout.gmail_address = gmail_address
await db.commit() await db.commit()
@@ -606,4 +716,92 @@ async def scout_gmail_oauth_callback(
except Exception: except Exception:
logger.exception("setup_watch failed for scout %s", scout_id) logger.exception("setup_watch failed for scout %s", scout_id)
return {"ok": True} return {"ok": True, "session_id": None, "gmail_address": gmail_address}
@router.get("/oauth/gmail/session-labels")
async def scout_gmail_session_labels(
    session: str,
    current_user: UserProfile = Depends(get_current_user),
) -> list[dict]:
    """List Gmail labels for a pending create-mode OAuth session (no scout row yet).

    Builds a Gmail service from the session's transient decrypted token.

    Args:
        session: Key of the pending OAuth session (the OAuth ``state`` value).

    Returns:
        ``[{"id": ..., "name": ...}, ...]`` for each label, or ``[]`` when
        decrypting the token or calling Gmail fails (the failure is logged).

    Raises:
        HTTPException: 404 when the session is unknown, expired, owned by a
            different user, or has no token stored yet.
    """
    # Evict expired sessions first so a lapsed session and the encrypted token
    # it carries do not linger in memory just because it was only ever read.
    _purge_expired_oauth_states()

    # Read-only lookup: the session must survive for the later finalize call.
    entry = _pending_scout_oauth_states.get(session)
    if (
        entry is None
        or entry["expires_at"] < time.time()
        or entry["user_id"] != current_user.id
        or entry.get("token_encrypted") is None
    ):
        raise HTTPException(status.HTTP_404_NOT_FOUND, "Session not found or expired")

    try:
        from app.scouts.connectors.gmail import _gmail_service_from_token

        creds = decrypt_token(entry["token_encrypted"])

        def _sync() -> list[dict]:
            service = _gmail_service_from_token(creds)
            resp = service.users().labels().list(userId="me").execute()
            return [{"id": lbl["id"], "name": lbl["name"]} for lbl in resp.get("labels", [])]

        # The Google client is synchronous; keep it off the event loop.
        return await asyncio.to_thread(_sync)
    except Exception:
        # Best-effort endpoint: label listing must not break the create flow.
        # NOTE(review): the session value is the OAuth state; consider not
        # logging it verbatim.
        logger.exception("session-labels failed for session %s", session)
        return []
@router.post("/cloud/finalize", response_model=CloudScoutResponse, status_code=status.HTTP_201_CREATED)
async def finalize_cloud_scout(
    body: _ScoutGmailFinalizeBody,
    db: AsyncSession = Depends(get_session),
    current_user: UserProfile = Depends(get_current_user),
):
    """Turn a completed create-mode OAuth session into a persisted cloud scout.

    The pending session is consumed (popped) whether or not it validates. On
    success the draft fields and the encrypted Gmail token held in the session
    are written to a new ``CloudScoutConfig`` row.

    Raises:
        HTTPException: 401 when the session is unknown, expired, owned by a
            different user, not in create mode, or has no token stored yet.
    """
    pending = _pending_scout_oauth_states.pop(body.session, None)
    session_ok = (
        pending is not None
        and not (pending["expires_at"] < time.time())
        and pending["user_id"] == current_user.id
        and pending.get("mode") == "create"
        and pending.get("token_encrypted") is not None
    )
    if not session_ok:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or expired OAuth session")

    draft_fields = pending["draft"] or {}
    new_scout = CloudScoutConfig(
        id=str(uuid.uuid4()),
        user_id=current_user.id,
        provider="gmail",
        name=draft_fields.get("name", ""),
        data_types=[],
        prompt_template=draft_fields.get("prompt_template", ""),
        filter_config=body.filter_config,
        schedule_cron=_DEFAULT_CLOUD_SCHEDULE,
        auto_trash_spam=draft_fields.get("auto_trash_spam", False),
        enabled=True,
        oauth_token_encrypted=pending["token_encrypted"],
        gmail_address=pending.get("gmail_address"),
    )
    db.add(new_scout)
    await db.commit()
    await db.refresh(new_scout)

    # Registering the Gmail push watch is best-effort: the scout is already
    # committed, so a failure here is logged and the scout is still returned.
    try:
        await get_connector("gmail").setup_watch(new_scout)
        await db.commit()
    except KeyError:
        logger.warning("gmail connector not registered — skipping setup_watch for scout %s", new_scout.id)
    except Exception:
        logger.exception("setup_watch failed for scout %s", new_scout.id)

    return _to_cloud_response(new_scout)

View File

@@ -45,12 +45,15 @@ def _extract_plain_text_body(payload: dict) -> str:
return "" return ""
def _get_gmail_service(scout): def _gmail_service_from_token(creds_info: dict):
"""Return a synchronous Google API client for low-level metadata/history calls.""" """Build a synchronous Gmail API client from a decrypted credentials dict.
Shared by ``_get_gmail_service`` (scout-backed) and the pending-session
OAuth flow which has a raw token but no scout row yet.
"""
from googleapiclient.discovery import build from googleapiclient.discovery import build
from google.oauth2.credentials import Credentials from google.oauth2.credentials import Credentials
creds_info = decrypt_token(scout.oauth_token_encrypted)
credentials = Credentials( credentials = Credentials(
token=creds_info.get("token"), token=creds_info.get("token"),
refresh_token=creds_info.get("refresh_token"), refresh_token=creds_info.get("refresh_token"),
@@ -62,6 +65,12 @@ def _get_gmail_service(scout):
return build("gmail", "v1", credentials=credentials, cache_discovery=False) return build("gmail", "v1", credentials=credentials, cache_discovery=False)
def _get_gmail_service(scout):
    """Build a synchronous Gmail API client from a scout's stored OAuth token.

    Decrypts ``scout.oauth_token_encrypted`` and delegates client construction
    to ``_gmail_service_from_token``.
    """
    return _gmail_service_from_token(decrypt_token(scout.oauth_token_encrypted))
class GmailConnector: class GmailConnector:
source_type = "gmail" source_type = "gmail"

View File

@@ -2,16 +2,19 @@
from __future__ import annotations from __future__ import annotations
import time
import uuid import uuid
from unittest.mock import AsyncMock, patch from unittest.mock import AsyncMock, patch
import pytest import pytest
from httpx import ASGITransport, AsyncClient from httpx import ASGITransport, AsyncClient
from sqlalchemy import select
from app.db import get_session from app.db import get_session
from app.integrations import encrypt_token
from app.main import app from app.main import app
from app.models import CloudScoutConfig from app.models import CloudScoutConfig
from tests.conftest import _TestSessionLocal, make_jwt from tests.conftest import _TestSessionLocal, make_jwt, TEST_USER_IDS
def _auth_headers(tier: str = "power") -> dict: def _auth_headers(tier: str = "power") -> dict:
@@ -147,3 +150,80 @@ async def test_gmail_disconnect_clears_token():
assert body["oauth_connected"] is False assert body["oauth_connected"] is False
assert body["gmail_address"] is None assert body["gmail_address"] is None
assert body["enabled"] is False assert body["enabled"] is False
# ── Pending-session create-at-finalize flow ───────────────────────────────────
@pytest.mark.asyncio
async def test_authorize_draft_returns_url_and_no_scout_created():
    """authorize-draft returns a Google consent URL and writes no scout row."""
    from app.config.settings import settings as app_settings

    client_id_patch = patch.object(app_settings, "GOOGLE_AUTH_CLIENT_ID", "cid")
    client_secret_patch = patch.object(app_settings, "GOOGLE_AUTH_CLIENT_SECRET", "secret")
    draft_payload = {"name": "Draft Inbox", "prompt_template": "invoices", "auto_trash_spam": True}

    with client_id_patch, client_secret_patch:
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as http:
            response = await http.post(
                "/api/v1/scouts/oauth/gmail/authorize-draft",
                json=draft_payload,
                headers=_auth_headers(),
            )

    assert response.status_code == 200, response.text
    assert response.json()["authorize_url"].startswith("https://accounts.google.com/")

    # The draft lives only in the pending OAuth session, so the DB must hold
    # no scout with the draft's name for this user.
    lookup = select(CloudScoutConfig).where(
        CloudScoutConfig.user_id == TEST_USER_IDS["power"],
        CloudScoutConfig.name == "Draft Inbox",
    )
    async with _TestSessionLocal() as session:
        result = await session.execute(lookup)
        rows = result.scalars().all()
    assert rows == []
@pytest.mark.asyncio
async def test_finalize_creates_scout_from_session():
    """finalize builds the scout from a seeded create-mode session and consumes it."""
    from app.api.routes import scouts as scouts_mod

    session_key = "test-session-" + uuid.uuid4().hex
    encrypted_token = encrypt_token(
        {"token": "x", "refresh_token": "y", "client_id": "c", "client_secret": "s"}
    )
    # Seed the store as if the OAuth callback had already completed.
    scouts_mod._pending_scout_oauth_states[session_key] = {
        "code_verifier": "v",
        "user_id": TEST_USER_IDS["power"],
        "expires_at": time.time() + 600,
        "mode": "create",
        "scout_id": None,
        "draft": {"name": "Finalized", "prompt_template": "tasks", "auto_trash_spam": True},
        "token_encrypted": encrypted_token,
        "gmail_address": "me@gmail.com",
    }
    finalize_payload = {"session": session_key, "filter_config": {"labels": ["INBOX"]}}

    # Make get_connector raise KeyError so the best-effort setup_watch step is skipped.
    with patch("app.api.routes.scouts.get_connector", side_effect=KeyError("gmail")):
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as http:
            response = await http.post(
                "/api/v1/scouts/cloud/finalize",
                json=finalize_payload,
                headers=_auth_headers(),
            )

    assert response.status_code == 201, response.text
    created = response.json()
    assert created["name"] == "Finalized"
    assert created["auto_trash_spam"] is True
    assert created["filter_config"] == {"labels": ["INBOX"]}
    assert created["gmail_address"] == "me@gmail.com"
    assert created["oauth_connected"] is True

    # finalize pops the session, so it must no longer be in the store.
    assert session_key not in scouts_mod._pending_scout_oauth_states
@pytest.mark.asyncio
async def test_finalize_rejects_unknown_session():
    """finalize answers 401 when the session key is not in the pending store."""
    payload = {"session": "does-not-exist", "filter_config": None}
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        response = await http.post(
            "/api/v1/scouts/cloud/finalize",
            json=payload,
            headers=_auth_headers(),
        )
    assert response.status_code == 401, response.text