"""Gmail SourceConnector — wraps the existing GmailClient.

Responsibilities:

* list_new: incremental fetch since the scout's stored gmail_history_id
* fetch_metadata: subject + sender + snippet only (Gmail metadata format)
* fetch_content: full body text — transient, never persisted by engine
* archive: move a message to Gmail Trash (recoverable for 30 days)
* setup_watch / renew_watch: Gmail push notifications via Pub/Sub
"""

from __future__ import annotations

import asyncio
import logging
from datetime import datetime, timezone

from app.config.settings import settings
from app.integrations import decrypt_token
from app.integrations.gmail import GmailClient
from app.scouts.connectors.base import ItemContent, ItemMetadata, ItemRef

logger = logging.getLogger(__name__)


def _get_gmail_service(scout):
    """Return a synchronous Google API client for low-level metadata/history calls.

    The OAuth credentials are rebuilt on every call from the decrypted
    ``scout.oauth_token_encrypted`` payload. The returned client is blocking,
    so callers in this module invoke it through ``asyncio.to_thread``.
    """
    # Imported lazily so the Google client libraries are only loaded when a
    # Gmail call is actually made.
    from googleapiclient.discovery import build
    from google.oauth2.credentials import Credentials

    creds_info = decrypt_token(scout.oauth_token_encrypted)
    credentials = Credentials(
        token=creds_info.get("token"),
        refresh_token=creds_info.get("refresh_token"),
        token_uri=creds_info.get("token_uri", "https://oauth2.googleapis.com/token"),
        client_id=creds_info.get("client_id"),
        client_secret=creds_info.get("client_secret"),
        scopes=creds_info.get("scopes"),
    )
    return build("gmail", "v1", credentials=credentials, cache_discovery=False)


class GmailConnector:
    source_type = "gmail"

    # ── list_new ──────────────────────────────────────────────────────────

    async def list_new(self, scout) -> list[ItemRef]:
        """Return new message refs since scout.gmail_history_id.

        On first run (gmail_history_id is None/empty), records the current
        historyId without backfilling — avoids flooding the user with old mail.

        All result pages of the history listing are consumed, and each message
        id is returned at most once even if it shows up in several history
        records.

        Updates scout.gmail_history_id in-place (caller must persist to DB).
        """

        def _sync() -> tuple[list[ItemRef], str | None]:
            service = _get_gmail_service(scout)
            history_id = scout.gmail_history_id

            if not history_id:
                # First run: capture baseline history id without backfilling.
                profile = service.users().getProfile(userId="me").execute()
                return [], profile["historyId"]

            refs: list[ItemRef] = []
            seen_ids: set[str] = set()
            new_history_id = history_id
            page_token = None

            while True:
                list_kwargs = {
                    "userId": "me",
                    "startHistoryId": history_id,
                    "historyTypes": ["messageAdded"],
                }
                if page_token:
                    list_kwargs["pageToken"] = page_token
                resp = service.users().history().list(**list_kwargs).execute()

                for entry in resp.get("history", []):
                    for added in entry.get("messagesAdded", []):
                        msg_id = added["message"]["id"]
                        # Skip ids already collected so a message is never
                        # handed to the engine twice in one batch.
                        if msg_id in seen_ids:
                            continue
                        seen_ids.add(msg_id)
                        refs.append(ItemRef(source_msg_ref=msg_id))

                new_history_id = resp.get("historyId", new_history_id)

                # Keep going until the API stops handing out a continuation
                # token; stopping after page one would drop messages while
                # still advancing the stored history id.
                page_token = resp.get("nextPageToken")
                if not page_token:
                    break

            return refs, new_history_id

        refs, new_history_id = await asyncio.to_thread(_sync)
        if new_history_id and new_history_id != scout.gmail_history_id:
            scout.gmail_history_id = new_history_id
        return refs

    # ── fetch_metadata ────────────────────────────────────────────────────

    async def fetch_metadata(self, scout, ref: ItemRef) -> ItemMetadata:
        """Fetch subject, sender, snippet only — uses Gmail metadata format (no body).

        ``received_at`` is always ``None`` in the returned metadata.
        """

        def _sync() -> ItemMetadata:
            service = _get_gmail_service(scout)
            msg = (
                service.users()
                .messages()
                .get(
                    userId="me",
                    id=ref.source_msg_ref,
                    format="metadata",
                    metadataHeaders=["Subject", "From", "Date"],
                )
                .execute()
            )
            # Header field names are case-insensitive (RFC 5322), so normalise
            # them before lookup rather than relying on exact "Subject"/"From"
            # capitalisation.
            headers = {
                h["name"].lower(): h["value"]
                for h in msg.get("payload", {}).get("headers", [])
            }
            # NOTE(review): the Date header is requested but not surfaced;
            # received_at stays None because the expected type of
            # ItemMetadata.received_at is not visible here — confirm before
            # populating it.
            return ItemMetadata(
                subject=headers.get("subject"),
                sender=headers.get("from"),
                snippet=msg.get("snippet"),
                received_at=None,
            )

        return await asyncio.to_thread(_sync)

    # ── fetch_content ─────────────────────────────────────────────────────

    async def fetch_content(self, scout, ref: ItemRef) -> ItemContent:
        """Fetch full body text via GmailClient — transient, must not be persisted.

        Raises:
            ValueError: if no message matching ``ref`` can be identified among
                the messages returned by GmailClient.
        """
        creds_info = decrypt_token(scout.oauth_token_encrypted)
        client = GmailClient(creds_info)

        # fetch_messages returns EmailMessage dataclasses with body_text already
        # extracted and decoded. We pass an empty filter to avoid narrowing by
        # date — callers should only invoke fetch_content for known-new messages.
        messages = await client.fetch_messages(filter_config=None, since=None)

        # Pick the message matching our ref.
        email_msg = next(
            (m for m in messages if m.id == ref.source_msg_ref),
            None,
        )
        # Fall back to the first message only when exactly one was returned.
        # With several candidates and no id match, picking messages[0] would
        # hand back an unrelated message's body under this ref.
        if email_msg is None and len(messages) == 1:
            email_msg = messages[0]

        if email_msg is None:
            raise ValueError(f"Message {ref.source_msg_ref!r} not found via GmailClient")

        return ItemContent(
            metadata=ItemMetadata(
                subject=email_msg.subject,
                sender=email_msg.sender,
                snippet=None,
                received_at=email_msg.date,
            ),
            body_text=email_msg.body_text,
            raw_headers={},
        )

    # ── archive ───────────────────────────────────────────────────────────

    async def archive(self, scout, ref: ItemRef) -> None:
        """Move the message to Gmail Trash (recoverable for 30 days)."""

        def _sync() -> None:
            service = _get_gmail_service(scout)
            service.users().messages().trash(
                userId="me", id=ref.source_msg_ref
            ).execute()

        await asyncio.to_thread(_sync)

    # ── watch management ──────────────────────────────────────────────────

    async def setup_watch(self, scout) -> None:
        """Register a Gmail Pub/Sub push watch for the INBOX label.

        Requires ``settings.GMAIL_PUBSUB_TOPIC`` to be set to the full topic
        resource name (e.g. ``projects/my-project/topics/gmail-push``).
        Logs a warning and returns without error if the topic is not configured.

        Side effects on ``scout`` (caller must persist to DB):

        * ``gmail_watch_expires_at`` is set from the response's ``expiration``
          (epoch milliseconds) when present.
        * ``gmail_history_id`` is seeded from the response's ``historyId`` only
          when the scout has no history id yet. An existing id is kept, so
          re-registering a watch never moves the sync cursor past messages
          that ``list_new`` has not returned yet.
        """
        topic = settings.GMAIL_PUBSUB_TOPIC
        if not topic:
            logger.warning(
                "setup_watch: GMAIL_PUBSUB_TOPIC is not configured — skipping watch setup"
            )
            return

        def _sync() -> dict:
            service = _get_gmail_service(scout)
            request_body = {
                "labelIds": ["INBOX"],
                "topicName": topic,
            }
            return service.users().watch(userId="me", body=request_body).execute()

        # Only the blocking HTTP call runs in the worker thread; scout is
        # mutated back here in the coroutine, mirroring list_new.
        resp = await asyncio.to_thread(_sync)

        watch_history_id = resp.get("historyId")
        if watch_history_id and not scout.gmail_history_id:
            scout.gmail_history_id = watch_history_id

        expiration_ms = resp.get("expiration")
        if expiration_ms:
            scout.gmail_watch_expires_at = datetime.fromtimestamp(
                int(expiration_ms) / 1000, tz=timezone.utc
            )

    async def renew_watch(self, scout) -> None:
        """Renew an existing Gmail Pub/Sub watch by delegating to setup_watch."""
        await self.setup_watch(scout)