feat(scouts): add GmailConnector

Implements GmailConnector — the first concrete SourceConnector.
Wraps existing GmailClient + low-level Gmail API service for metadata-only
fetch, trash archive, incremental history polling, and Pub/Sub watch setup.
Adds GMAIL_PUBSUB_TOPIC setting (empty string default for dev).
Adds 3 passing unit tests (mocked API, no real credentials required).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Roberto
2026-05-16 04:18:07 +02:00
parent 9f21d5ae8f
commit c559754532
3 changed files with 277 additions and 0 deletions

View File

@@ -58,6 +58,11 @@ class Settings(BaseSettings):
# Prod: https://api.adiuvai.com/api/v1/auth/oauth/google/web-callback
OAUTH_REDIRECT_URI: str = "http://localhost:8000/api/v1/auth/oauth/google/web-callback"
# Gmail Pub/Sub topic for push notifications.
# Full resource name, e.g. "projects/my-project/topics/gmail-push".
# Leave empty in dev — setup_watch will skip registration gracefully.
GMAIL_PUBSUB_TOPIC: str = ""
# Fernet key (URL-safe base64, 32-byte key) for at-rest encryption of OAuth
# tokens stored in cloud_agent_configs.oauth_token_encrypted.
# Generate with: from cryptography.fernet import Fernet; Fernet.generate_key()

View File

@@ -0,0 +1,195 @@
"""Gmail SourceConnector — wraps the existing GmailClient.
Responsibilities:
* list_new: incremental fetch since the scout's stored gmail_history_id
* fetch_metadata: subject + sender + snippet only (Gmail metadata format)
* fetch_content: full body text — transient, never persisted by engine
* archive: move a message to Gmail Trash (recoverable for 30 days)
* setup_watch / renew_watch: Gmail push notifications via Pub/Sub
"""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime, timezone
from app.config.settings import settings
from app.integrations import decrypt_token
from app.integrations.gmail import GmailClient
from app.scouts.connectors.base import ItemContent, ItemMetadata, ItemRef
logger = logging.getLogger(__name__)
def _get_gmail_service(scout):
"""Return a synchronous Google API client for low-level metadata/history calls."""
from googleapiclient.discovery import build
from google.oauth2.credentials import Credentials
creds_info = decrypt_token(scout.oauth_token_encrypted)
credentials = Credentials(
token=creds_info.get("token"),
refresh_token=creds_info.get("refresh_token"),
token_uri=creds_info.get("token_uri", "https://oauth2.googleapis.com/token"),
client_id=creds_info.get("client_id"),
client_secret=creds_info.get("client_secret"),
scopes=creds_info.get("scopes"),
)
return build("gmail", "v1", credentials=credentials, cache_discovery=False)
class GmailConnector:
source_type = "gmail"
# ── list_new ──────────────────────────────────────────────────────────
async def list_new(self, scout) -> list[ItemRef]:
"""Return new message refs since scout.gmail_history_id.
On first run (gmail_history_id is None/empty), records the current
historyId without backfilling — avoids flooding the user with old mail.
Updates scout.gmail_history_id in-place (caller must persist to DB).
"""
def _sync() -> tuple[list[ItemRef], str | None]:
service = _get_gmail_service(scout)
history_id = scout.gmail_history_id
refs: list[ItemRef] = []
new_history_id = history_id
if history_id:
resp = (
service.users()
.history()
.list(
userId="me",
startHistoryId=history_id,
historyTypes=["messageAdded"],
)
.execute()
)
for entry in resp.get("history", []):
for added in entry.get("messagesAdded", []):
refs.append(ItemRef(source_msg_ref=added["message"]["id"]))
new_history_id = resp.get("historyId", history_id)
else:
# First run: capture baseline history id without backfilling.
profile = service.users().getProfile(userId="me").execute()
new_history_id = profile["historyId"]
return refs, new_history_id
refs, new_history_id = await asyncio.to_thread(_sync)
if new_history_id and new_history_id != scout.gmail_history_id:
scout.gmail_history_id = new_history_id
return refs
# ── fetch_metadata ────────────────────────────────────────────────────
async def fetch_metadata(self, scout, ref: ItemRef) -> ItemMetadata:
"""Fetch subject, sender, snippet only — uses Gmail metadata format (no body)."""
def _sync() -> ItemMetadata:
service = _get_gmail_service(scout)
msg = (
service.users()
.messages()
.get(
userId="me",
id=ref.source_msg_ref,
format="metadata",
metadataHeaders=["Subject", "From", "Date"],
)
.execute()
)
headers = {
h["name"]: h["value"]
for h in msg.get("payload", {}).get("headers", [])
}
return ItemMetadata(
subject=headers.get("Subject"),
sender=headers.get("From"),
snippet=msg.get("snippet"),
received_at=None,
)
return await asyncio.to_thread(_sync)
# ── fetch_content ─────────────────────────────────────────────────────
async def fetch_content(self, scout, ref: ItemRef) -> ItemContent:
"""Fetch full body text via GmailClient — transient, must not be persisted."""
creds_info = decrypt_token(scout.oauth_token_encrypted)
client = GmailClient(creds_info)
# fetch_messages returns EmailMessage dataclasses with body_text already
# extracted and decoded. We pass an empty filter to avoid narrowing by
# date — callers should only invoke fetch_content for known-new messages.
messages = await client.fetch_messages(filter_config=None, since=None)
# Pick the message matching our ref (or fall back to first if only one returned).
email_msg = next(
(m for m in messages if m.id == ref.source_msg_ref),
messages[0] if messages else None,
)
if email_msg is None:
raise ValueError(f"Message {ref.source_msg_ref!r} not found via GmailClient")
return ItemContent(
metadata=ItemMetadata(
subject=email_msg.subject,
sender=email_msg.sender,
snippet=None,
received_at=email_msg.date,
),
body_text=email_msg.body_text,
raw_headers={},
)
# ── archive ───────────────────────────────────────────────────────────
async def archive(self, scout, ref: ItemRef) -> None:
"""Move the message to Gmail Trash (recoverable for 30 days)."""
def _sync() -> None:
service = _get_gmail_service(scout)
service.users().messages().trash(
userId="me", id=ref.source_msg_ref
).execute()
await asyncio.to_thread(_sync)
# ── watch management ──────────────────────────────────────────────────
async def setup_watch(self, scout) -> None:
"""Register a Gmail Pub/Sub push watch for the INBOX label.
Requires ``settings.GMAIL_PUBSUB_TOPIC`` to be set to the full topic
resource name (e.g. ``projects/my-project/topics/gmail-push``).
Logs a warning and returns without error if the topic is not configured.
"""
topic = settings.GMAIL_PUBSUB_TOPIC
if not topic:
logger.warning(
"setup_watch: GMAIL_PUBSUB_TOPIC is not configured — skipping watch setup"
)
return
def _sync() -> None:
service = _get_gmail_service(scout)
request_body = {
"labelIds": ["INBOX"],
"topicName": topic,
}
resp = service.users().watch(userId="me", body=request_body).execute()
scout.gmail_history_id = resp.get("historyId")
expiration_ms = resp.get("expiration")
if expiration_ms:
scout.gmail_watch_expires_at = datetime.fromtimestamp(
int(expiration_ms) / 1000, tz=timezone.utc
)
await asyncio.to_thread(_sync)
async def renew_watch(self, scout) -> None:
"""Renew an existing Gmail Pub/Sub watch (same as setup_watch)."""
await self.setup_watch(scout)

View File

@@ -0,0 +1,77 @@
"""Tests for GmailConnector."""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from app.models import CloudScoutConfig
from app.scouts.connectors.base import ItemRef
from app.scouts.connectors.gmail import GmailConnector
def _make_scout():
return CloudScoutConfig(
id=str(uuid.uuid4()),
user_id="00000000-0000-0000-0000-000000000003",
provider="gmail",
name="Inbox",
data_types=[],
prompt_template="",
oauth_token_encrypted="encrypted-blob",
schedule_cron="0 * * * *",
enabled=True,
auto_trash_spam=False,
device_inactivity_pause_days=14,
gmail_history_id="100",
)
@pytest.mark.asyncio
async def test_fetch_metadata_returns_subject_and_snippet():
scout = _make_scout()
conn = GmailConnector()
fake_message = {
"id": "msg-1",
"snippet": "preview text",
"payload": {"headers": [
{"name": "Subject", "value": "Hello"},
{"name": "From", "value": "alice@example.com"},
{"name": "Date", "value": "Wed, 14 May 2026 10:00:00 +0000"},
]},
}
with patch("app.scouts.connectors.gmail._get_gmail_service") as mock_svc:
mock_svc.return_value.users().messages().get().execute.return_value = fake_message
meta = await conn.fetch_metadata(scout, ItemRef(source_msg_ref="msg-1"))
assert meta.subject == "Hello"
assert meta.sender == "alice@example.com"
assert meta.snippet == "preview text"
@pytest.mark.asyncio
async def test_fetch_content_returns_body_text():
scout = _make_scout()
conn = GmailConnector()
# decrypt_token is patched because the test doesn't set OAUTH_ENCRYPTION_KEY.
with patch("app.scouts.connectors.gmail.decrypt_token", return_value={}), \
patch("app.scouts.connectors.gmail.GmailClient") as MockClient:
instance = MockClient.return_value
instance.fetch_messages = AsyncMock(return_value=[
MagicMock(id="msg-1", subject="S", sender="a@b", body_text="hello world",
date=datetime.now(tz=timezone.utc), labels=[]),
])
content = await conn.fetch_content(scout, ItemRef(source_msg_ref="msg-1"))
assert content.body_text == "hello world"
assert content.metadata.subject == "S"
@pytest.mark.asyncio
async def test_archive_calls_trash():
scout = _make_scout()
conn = GmailConnector()
with patch("app.scouts.connectors.gmail._get_gmail_service") as mock_svc:
await conn.archive(scout, ItemRef(source_msg_ref="msg-1"))
mock_svc.return_value.users().messages().trash.assert_called()