From d3497a1908ea24f4133e2ab4ec1c72d420e61e5b Mon Sep 17 00:00:00 2001 From: Roberto Date: Sat, 16 May 2026 04:31:57 +0200 Subject: [PATCH] feat(scouts): gmail pub/sub webhook with JWT verification --- app/api/routes/scout_webhooks.py | 120 +++++++++++++++++++++++++++++++ app/config/settings.py | 5 ++ app/main.py | 15 ++-- tests/test_scout_webhook.py | 106 +++++++++++++++++++++++++++ 4 files changed, 239 insertions(+), 7 deletions(-) create mode 100644 app/api/routes/scout_webhooks.py create mode 100644 tests/test_scout_webhook.py diff --git a/app/api/routes/scout_webhooks.py b/app/api/routes/scout_webhooks.py new file mode 100644 index 0000000..cf89020 --- /dev/null +++ b/app/api/routes/scout_webhooks.py @@ -0,0 +1,120 @@ +"""Gmail Pub/Sub push receiver. + +Google Pub/Sub push subscriptions deliver Gmail watch notifications as POST +requests with a JSON envelope. The body payload contains a base64-encoded +JSON blob with ``emailAddress`` + ``historyId``. We resolve the user by +email, look up their cloud_scout_configs row for provider='gmail', and +hand off to ScoutEngine.trigger_scout. + +Authentication: Pub/Sub push includes an OIDC JWT in the Authorization +header. We verify it against Google's public keys with the audience +configured in our Pub/Sub subscription. + +Dev mode: when ``GMAIL_PUBSUB_AUDIENCE`` is empty, JWT verification is +skipped and a warning is logged. Production must set this env var. +""" + +from __future__ import annotations + +import base64 +import json +import logging +import uuid + +from fastapi import APIRouter, Header, HTTPException, Request, status +from sqlalchemy import select + +from app.config.settings import settings +from app.db import async_session +from app.models import CloudScoutConfig, User +from app.scouts.engine import ScoutEngine + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/scouts/webhooks", tags=["scout-webhooks"]) + + +def _verify_pubsub_jwt(token: str) -> bool: + """Verify the Google Pub/Sub OIDC JWT. + + Returns True when valid, False on any verification failure. + + Dev skip: if ``settings.GMAIL_PUBSUB_AUDIENCE`` is empty, logs a + warning and returns True so local development works without a real + Pub/Sub subscription. Production must configure the audience. + """ + if not token: + return False + + if not settings.GMAIL_PUBSUB_AUDIENCE: + logger.warning( + "GMAIL_PUBSUB_AUDIENCE not set — skipping Pub/Sub JWT verification (dev mode only)" + ) + return True + + try: + from google.auth.transport import requests as g_requests # noqa: PLC0415 + from google.oauth2 import id_token # noqa: PLC0415 + + id_token.verify_oauth2_token( + token, + g_requests.Request(), + audience=settings.GMAIL_PUBSUB_AUDIENCE, + ) + return True + except Exception: + logger.warning("pubsub jwt verification failed", exc_info=True) + return False + + +@router.post("/gmail", status_code=status.HTTP_204_NO_CONTENT) +async def gmail_pubsub( + request: Request, + authorization: str = Header(default=""), +) -> None: + """Receive a Gmail Pub/Sub push notification. + + Verifies the OIDC JWT, decodes the Pub/Sub envelope, resolves the user + by email, and triggers ScoutEngine.trigger_scout for each enabled Gmail + scout belonging to that user. + + Returns 204 No Content on success (including benign no-ops like unknown + email or empty message data). Returns 401 on JWT verification failure. + """ + token = authorization.removeprefix("Bearer ").strip() + if not _verify_pubsub_jwt(token): + raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid Pub/Sub JWT") + + body = await request.json() + msg = body.get("message") or {} + raw = msg.get("data") + if not raw: + return # ack without action — empty message data + + try: + decoded = json.loads(base64.b64decode(raw).decode()) + except Exception: + logger.warning("pubsub payload decode failed") + return + + email = decoded.get("emailAddress") + if not email: + return + + async with async_session() as session: + user_q = await session.execute(select(User).where(User.email == email)) + user = user_q.scalar_one_or_none() + if user is None: + logger.info("pubsub: no user for %s — ignoring", email) + return + scouts_q = await session.execute( + select(CloudScoutConfig).where( + CloudScoutConfig.user_id == user.id, + CloudScoutConfig.provider == "gmail", + CloudScoutConfig.enabled == True, # noqa: E712 + ) + ) + scouts = scouts_q.scalars().all() + + engine = ScoutEngine() + for scout in scouts: + await engine.trigger_scout(uuid.UUID(str(scout.id))) diff --git a/app/config/settings.py b/app/config/settings.py index 2c3ef41..f3ede2c 100644 --- a/app/config/settings.py +++ b/app/config/settings.py @@ -62,6 +62,11 @@ class Settings(BaseSettings): # Full resource name, e.g. "projects/my-project/topics/gmail-push". # Leave empty in dev — setup_watch will skip registration gracefully. GMAIL_PUBSUB_TOPIC: str = "" + # OIDC token audience for Pub/Sub push subscription JWT verification. + # Set to the service account email or audience string configured in the + # Pub/Sub push subscription. Leave empty in dev to skip verification + # (a warning is logged — never silent in production). + GMAIL_PUBSUB_AUDIENCE: str = "" # Fernet key (URL-safe base64, 32-byte key) for at-rest encryption of OAuth # tokens stored in cloud_agent_configs.oauth_token_encrypted. diff --git a/app/main.py b/app/main.py index 1e1be3d..62b6954 100644 --- a/app/main.py +++ b/app/main.py @@ -129,14 +129,15 @@ def create_app() -> FastAPI: app.add_middleware(SanitizerMiddleware) app.add_middleware(TierRateLimitMiddleware) - from app.api.routes import scouts, auth, billing, chat, device_ws, memory + from app.api.routes import scouts, auth, billing, chat, device_ws, memory, scout_webhooks - app.include_router(auth.router, prefix="/api/v1") - app.include_router(chat.router, prefix="/api/v1") - app.include_router(billing.router, prefix="/api/v1") - app.include_router(scouts.router, prefix="/api/v1") - app.include_router(device_ws.router, prefix="/api/v1") - app.include_router(memory.router, prefix="/api/v1") + app.include_router(auth.router, prefix="/api/v1") + app.include_router(chat.router, prefix="/api/v1") + app.include_router(billing.router, prefix="/api/v1") + app.include_router(scouts.router, prefix="/api/v1") + app.include_router(scout_webhooks.router, prefix="/api/v1") + app.include_router(device_ws.router, prefix="/api/v1") + app.include_router(memory.router, prefix="/api/v1") @app.get("/api/v1/health", tags=["health"]) async def health() -> dict: diff --git a/tests/test_scout_webhook.py b/tests/test_scout_webhook.py new file mode 100644 index 0000000..bec5b1d --- /dev/null +++ b/tests/test_scout_webhook.py @@ -0,0 +1,106 @@ +"""Tests for the Gmail Pub/Sub webhook route. + +Covers: + - Happy path: valid JWT + known user + enabled scout → 204, engine triggered. + - Rejection: invalid JWT → 401. +""" + +from __future__ import annotations + +import base64 +import json +import uuid +from unittest.mock import AsyncMock, patch + +import pytest +from httpx import ASGITransport, AsyncClient + +from app.main import app +from app.models import CloudScoutConfig, User +from tests.conftest import _TestSessionLocal + + +def _pubsub_payload(email: str, history_id: str) -> dict: + """Build a minimal Pub/Sub push envelope.""" + inner = json.dumps({"emailAddress": email, "historyId": history_id}).encode() + return { + "message": {"data": base64.b64encode(inner).decode(), "messageId": "m1"}, + "subscription": "projects/x/subscriptions/gmail-watch-sub", + } + + +@pytest.mark.asyncio +async def test_webhook_triggers_scout_for_matching_user(): + """204 returned and ScoutEngine.trigger_scout awaited for the matching scout.""" + user_id = "00000000-0000-0000-0000-000000000003" # seeded 'power' user + scout_id = str(uuid.uuid4()) + + # Mutate the seeded user email so the webhook can resolve it, + # and add a cloud scout config for gmail. + async with _TestSessionLocal() as session: + user = await session.get(User, user_id) + user.email = "alice@example.com" + session.add( + CloudScoutConfig( + id=scout_id, + user_id=user_id, + provider="gmail", + name="Inbox", + data_types=[], + prompt_template="", + schedule_cron="0 * * * *", + enabled=True, + auto_trash_spam=False, + device_inactivity_pause_days=14, + ) + ) + await session.commit() + + payload = _pubsub_payload("alice@example.com", "200") + + with ( + patch( + "app.api.routes.scout_webhooks._verify_pubsub_jwt", + return_value=True, + ), + patch( + "app.api.routes.scout_webhooks.async_session", + _TestSessionLocal, + ), + patch( + "app.scouts.engine.ScoutEngine.trigger_scout", + new=AsyncMock(), + ) as mock_trigger, + ): + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + resp = await client.post( + "/api/v1/scouts/webhooks/gmail", + json=payload, + headers={"Authorization": "Bearer fake-google-jwt"}, + ) + + assert resp.status_code == 204 + mock_trigger.assert_awaited_once_with(uuid.UUID(scout_id)) + + +@pytest.mark.asyncio +async def test_webhook_rejects_unverified_jwt(): + """401 returned when JWT verification fails.""" + payload = _pubsub_payload("alice@example.com", "200") + + with patch( + "app.api.routes.scout_webhooks._verify_pubsub_jwt", + return_value=False, + ): + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + resp = await client.post( + "/api/v1/scouts/webhooks/gmail", + json=payload, + headers={"Authorization": "Bearer bogus"}, + ) + + assert resp.status_code == 401