diff --git a/app/config/settings.py b/app/config/settings.py index 0afa351..2c3ef41 100644 --- a/app/config/settings.py +++ b/app/config/settings.py @@ -58,6 +58,11 @@ class Settings(BaseSettings): # Prod: https://api.adiuvai.com/api/v1/auth/oauth/google/web-callback OAUTH_REDIRECT_URI: str = "http://localhost:8000/api/v1/auth/oauth/google/web-callback" + # Gmail Pub/Sub topic for push notifications. + # Full resource name, e.g. "projects/my-project/topics/gmail-push". + # Leave empty in dev — setup_watch will skip registration gracefully. + GMAIL_PUBSUB_TOPIC: str = "" + # Fernet key (URL-safe base64, 32-byte key) for at-rest encryption of OAuth # tokens stored in cloud_agent_configs.oauth_token_encrypted. # Generate with: from cryptography.fernet import Fernet; Fernet.generate_key() diff --git a/app/scouts/connectors/gmail.py b/app/scouts/connectors/gmail.py new file mode 100644 index 0000000..d3abb3c --- /dev/null +++ b/app/scouts/connectors/gmail.py @@ -0,0 +1,195 @@ +"""Gmail SourceConnector — wraps the existing GmailClient. + +Responsibilities: + * list_new: incremental fetch since the scout's stored gmail_history_id + * fetch_metadata: subject + sender + snippet only (Gmail metadata format) + * fetch_content: full body text — transient, never persisted by engine + * archive: move a message to Gmail Trash (recoverable for 30 days) + * setup_watch / renew_watch: Gmail push notifications via Pub/Sub +""" + +from __future__ import annotations + +import asyncio +import logging +from datetime import datetime, timezone + +from app.config.settings import settings +from app.integrations import decrypt_token +from app.integrations.gmail import GmailClient +from app.scouts.connectors.base import ItemContent, ItemMetadata, ItemRef + +logger = logging.getLogger(__name__) + + +def _get_gmail_service(scout): + """Return a synchronous Google API client for low-level metadata/history calls.""" + from googleapiclient.discovery import build + from google.oauth2.credentials import Credentials + + creds_info = decrypt_token(scout.oauth_token_encrypted) + credentials = Credentials( + token=creds_info.get("token"), + refresh_token=creds_info.get("refresh_token"), + token_uri=creds_info.get("token_uri", "https://oauth2.googleapis.com/token"), + client_id=creds_info.get("client_id"), + client_secret=creds_info.get("client_secret"), + scopes=creds_info.get("scopes"), + ) + return build("gmail", "v1", credentials=credentials, cache_discovery=False) + + +class GmailConnector: + source_type = "gmail" + + # ── list_new ────────────────────────────────────────────────────────── + + async def list_new(self, scout) -> list[ItemRef]: + """Return new message refs since scout.gmail_history_id. + + On first run (gmail_history_id is None/empty), records the current + historyId without backfilling — avoids flooding the user with old mail. + Updates scout.gmail_history_id in-place (caller must persist to DB). + """ + def _sync() -> tuple[list[ItemRef], str | None]: + service = _get_gmail_service(scout) + history_id = scout.gmail_history_id + refs: list[ItemRef] = [] + new_history_id = history_id + + if history_id: + resp = ( + service.users() + .history() + .list( + userId="me", + startHistoryId=history_id, + historyTypes=["messageAdded"], + ) + .execute() + ) + for entry in resp.get("history", []): + for added in entry.get("messagesAdded", []): + refs.append(ItemRef(source_msg_ref=added["message"]["id"])) + new_history_id = resp.get("historyId", history_id) + else: + # First run: capture baseline history id without backfilling. + profile = service.users().getProfile(userId="me").execute() + new_history_id = profile["historyId"] + + return refs, new_history_id + + refs, new_history_id = await asyncio.to_thread(_sync) + if new_history_id and new_history_id != scout.gmail_history_id: + scout.gmail_history_id = new_history_id + return refs + + # ── fetch_metadata ──────────────────────────────────────────────────── + + async def fetch_metadata(self, scout, ref: ItemRef) -> ItemMetadata: + """Fetch subject, sender, snippet only — uses Gmail metadata format (no body).""" + + def _sync() -> ItemMetadata: + service = _get_gmail_service(scout) + msg = ( + service.users() + .messages() + .get( + userId="me", + id=ref.source_msg_ref, + format="metadata", + metadataHeaders=["Subject", "From", "Date"], + ) + .execute() + ) + headers = { + h["name"]: h["value"] + for h in msg.get("payload", {}).get("headers", []) + } + return ItemMetadata( + subject=headers.get("Subject"), + sender=headers.get("From"), + snippet=msg.get("snippet"), + received_at=None, + ) + + return await asyncio.to_thread(_sync) + + # ── fetch_content ───────────────────────────────────────────────────── + + async def fetch_content(self, scout, ref: ItemRef) -> ItemContent: + """Fetch full body text via GmailClient — transient, must not be persisted.""" + creds_info = decrypt_token(scout.oauth_token_encrypted) + client = GmailClient(creds_info) + # fetch_messages returns EmailMessage dataclasses with body_text already + # extracted and decoded. We pass an empty filter to avoid narrowing by + # date — callers should only invoke fetch_content for known-new messages. + messages = await client.fetch_messages(filter_config=None, since=None) + + # Pick the message matching our ref (or fall back to first if only one returned). + email_msg = next( + (m for m in messages if m.id == ref.source_msg_ref), + messages[0] if messages else None, + ) + if email_msg is None: + raise ValueError(f"Message {ref.source_msg_ref!r} not found via GmailClient") + + return ItemContent( + metadata=ItemMetadata( + subject=email_msg.subject, + sender=email_msg.sender, + snippet=None, + received_at=email_msg.date, + ), + body_text=email_msg.body_text, + raw_headers={}, + ) + + # ── archive ─────────────────────────────────────────────────────────── + + async def archive(self, scout, ref: ItemRef) -> None: + """Move the message to Gmail Trash (recoverable for 30 days).""" + + def _sync() -> None: + service = _get_gmail_service(scout) + service.users().messages().trash( + userId="me", id=ref.source_msg_ref + ).execute() + + await asyncio.to_thread(_sync) + + # ── watch management ────────────────────────────────────────────────── + + async def setup_watch(self, scout) -> None: + """Register a Gmail Pub/Sub push watch for the INBOX label. + + Requires ``settings.GMAIL_PUBSUB_TOPIC`` to be set to the full topic + resource name (e.g. ``projects/my-project/topics/gmail-push``). + Logs a warning and returns without error if the topic is not configured. + """ + topic = settings.GMAIL_PUBSUB_TOPIC + if not topic: + logger.warning( + "setup_watch: GMAIL_PUBSUB_TOPIC is not configured — skipping watch setup" + ) + return + + def _sync() -> None: + service = _get_gmail_service(scout) + request_body = { + "labelIds": ["INBOX"], + "topicName": topic, + } + resp = service.users().watch(userId="me", body=request_body).execute() + scout.gmail_history_id = resp.get("historyId") + expiration_ms = resp.get("expiration") + if expiration_ms: + scout.gmail_watch_expires_at = datetime.fromtimestamp( + int(expiration_ms) / 1000, tz=timezone.utc + ) + + await asyncio.to_thread(_sync) + + async def renew_watch(self, scout) -> None: + """Renew an existing Gmail Pub/Sub watch (same as setup_watch).""" + await self.setup_watch(scout) diff --git a/tests/test_scout_connectors_gmail.py b/tests/test_scout_connectors_gmail.py new file mode 100644 index 0000000..16c35aa --- /dev/null +++ b/tests/test_scout_connectors_gmail.py @@ -0,0 +1,77 @@ +"""Tests for GmailConnector.""" + +from __future__ import annotations + +import uuid +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from app.models import CloudScoutConfig +from app.scouts.connectors.base import ItemRef +from app.scouts.connectors.gmail import GmailConnector + + +def _make_scout(): + return CloudScoutConfig( + id=str(uuid.uuid4()), + user_id="00000000-0000-0000-0000-000000000003", + provider="gmail", + name="Inbox", + data_types=[], + prompt_template="", + oauth_token_encrypted="encrypted-blob", + schedule_cron="0 * * * *", + enabled=True, + auto_trash_spam=False, + device_inactivity_pause_days=14, + gmail_history_id="100", + ) + + +@pytest.mark.asyncio +async def test_fetch_metadata_returns_subject_and_snippet(): + scout = _make_scout() + conn = GmailConnector() + fake_message = { + "id": "msg-1", + "snippet": "preview text", + "payload": {"headers": [ + {"name": "Subject", "value": "Hello"}, + {"name": "From", "value": "alice@example.com"}, + {"name": "Date", "value": "Wed, 14 May 2026 10:00:00 +0000"}, + ]}, + } + with patch("app.scouts.connectors.gmail._get_gmail_service") as mock_svc: + mock_svc.return_value.users().messages().get().execute.return_value = fake_message + meta = await conn.fetch_metadata(scout, ItemRef(source_msg_ref="msg-1")) + assert meta.subject == "Hello" + assert meta.sender == "alice@example.com" + assert meta.snippet == "preview text" + + +@pytest.mark.asyncio +async def test_fetch_content_returns_body_text(): + scout = _make_scout() + conn = GmailConnector() + # decrypt_token is patched because the test doesn't set OAUTH_ENCRYPTION_KEY. + with patch("app.scouts.connectors.gmail.decrypt_token", return_value={}), \ + patch("app.scouts.connectors.gmail.GmailClient") as MockClient: + instance = MockClient.return_value + instance.fetch_messages = AsyncMock(return_value=[ + MagicMock(id="msg-1", subject="S", sender="a@b", body_text="hello world", + date=datetime.now(tz=timezone.utc), labels=[]), + ]) + content = await conn.fetch_content(scout, ItemRef(source_msg_ref="msg-1")) + assert content.body_text == "hello world" + assert content.metadata.subject == "S" + + +@pytest.mark.asyncio +async def test_archive_calls_trash(): + scout = _make_scout() + conn = GmailConnector() + with patch("app.scouts.connectors.gmail._get_gmail_service") as mock_svc: + await conn.archive(scout, ItemRef(source_msg_ref="msg-1")) + mock_svc.return_value.users().messages().trash.assert_called()