Files
api/app/scouts/connectors/gmail.py
Roberto 0833db239c fix(scouts): fetch single Gmail message instead of bulk in fetch_content
Replace bulk GmailClient.fetch_messages() + linear search with a direct
service.users().messages().get(format="full") call. Adds _extract_plain_text_body
helper for recursive MIME part walking. Update test to patch _get_gmail_service.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-16 05:39:39 +02:00

214 lines
8.6 KiB
Python

"""Gmail SourceConnector — wraps the existing GmailClient.
Responsibilities:
* list_new: incremental fetch since the scout's stored gmail_history_id
* fetch_metadata: subject + sender + snippet only (Gmail metadata format)
* fetch_content: full body text — transient, never persisted by engine
* archive: move a message to Gmail Trash (recoverable for 30 days)
* setup_watch / renew_watch: Gmail push notifications via Pub/Sub
"""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime, timezone
from app.config.settings import settings
from app.integrations import decrypt_token
from app.scouts.connectors.base import ItemContent, ItemMetadata, ItemRef
logger = logging.getLogger(__name__)
def _extract_plain_text_body(payload: dict) -> str:
    """Return the readable body text of a Gmail message payload.

    The payload tree is searched depth-first for the first ``text/plain``
    part that carries inline data. Only when the whole tree contains no such
    part is the first ``text/html`` part used instead, with its tags replaced
    by spaces (a rudimentary strip: entities and script/style content are
    left as-is).

    Args:
        payload: The ``payload`` dict of a Gmail message (or one of its
            nested ``parts``), with ``mimeType``, ``body`` and ``parts`` keys.

    Returns:
        The decoded text, or ``""`` when no text part with inline data exists.
    """
    import base64
    import re

    def _decode(part: dict) -> str:
        # Gmail delivers body data as URL-safe base64, possibly without
        # padding; surplus "=" characters are ignored by the decoder.
        data = (part.get("body") or {}).get("data", "")
        if not data:
            return ""
        return base64.urlsafe_b64decode(data + "==").decode("utf-8", errors="replace")

    def _first_of(part: dict, wanted: str) -> str:
        # Depth-first search for the first non-empty part of type `wanted`.
        mime_type = part.get("mimeType", "")
        if mime_type == wanted:
            return _decode(part)
        if mime_type.startswith("multipart/"):
            for child in part.get("parts", []):
                found = _first_of(child, wanted)
                if found:
                    return found
        return ""

    # Pass 1: plain text anywhere in the tree always wins, even when an HTML
    # alternative appears earlier in the part order.
    plain = _first_of(payload, "text/plain")
    if plain:
        return plain

    # Pass 2: HTML fallback, only reached when no text/plain part had content.
    html = _first_of(payload, "text/html")
    if html:
        return re.sub(r"<[^>]+>", " ", html)
    return ""
def _get_gmail_service(scout):
    """Build a synchronous Gmail v1 API client from the scout's stored OAuth token.

    The encrypted token on ``scout.oauth_token_encrypted`` is decrypted with
    ``decrypt_token`` and its fields are passed to ``Credentials``. A missing
    ``token_uri`` falls back to Google's standard token endpoint. The Google
    client libraries are imported lazily, inside the function.
    """
    from googleapiclient.discovery import build
    from google.oauth2.credentials import Credentials

    token_data = decrypt_token(scout.oauth_token_encrypted)

    # Fields copied straight from the decrypted token; absent ones become None.
    credential_kwargs = {
        field: token_data.get(field)
        for field in ("token", "refresh_token", "client_id", "client_secret", "scopes")
    }
    credential_kwargs["token_uri"] = token_data.get(
        "token_uri", "https://oauth2.googleapis.com/token"
    )

    oauth_credentials = Credentials(**credential_kwargs)
    return build("gmail", "v1", credentials=oauth_credentials, cache_discovery=False)
class GmailConnector:
    """Source connector that talks to the Gmail API on behalf of a scout.

    Each method builds a client with ``_get_gmail_service(scout)`` and runs
    the blocking Google API call in a worker thread via ``asyncio.to_thread``.
    Attributes on ``scout`` are only assigned after the thread has returned.
    """

    source_type = "gmail"

    @staticmethod
    def _header_value(headers: dict, name: str) -> str | None:
        """Return the value of header *name*, matching case-insensitively.

        Header names are stored exactly as the sending system spelled them
        (``Subject``, ``subject``, ``SUBJECT`` ...), so an exact-case lookup
        alone can miss them. An exact match is preferred when present.
        """
        if name in headers:
            return headers[name]
        wanted = name.lower()
        for key, value in headers.items():
            if key.lower() == wanted:
                return value
        return None

    # ── list_new ──────────────────────────────────────────────────────────
    async def list_new(self, scout) -> list[ItemRef]:
        """Return new message refs since scout.gmail_history_id.

        On first run (gmail_history_id is None/empty), records the current
        historyId without backfilling — avoids flooding the user with old mail.
        Every page of the history listing is read before the cursor advances,
        so messages beyond the first page are not lost.

        Updates scout.gmail_history_id in-place (caller must persist to DB).
        """

        def _sync() -> tuple[list[ItemRef], str | None]:
            service = _get_gmail_service(scout)
            history_id = scout.gmail_history_id

            if not history_id:
                # First run: capture baseline history id without backfilling.
                profile = service.users().getProfile(userId="me").execute()
                return [], profile["historyId"]

            refs: list[ItemRef] = []
            new_history_id = history_id
            page_token = None
            while True:
                params = {
                    "userId": "me",
                    "startHistoryId": history_id,
                    "historyTypes": ["messageAdded"],
                }
                if page_token:
                    params["pageToken"] = page_token
                resp = service.users().history().list(**params).execute()

                for entry in resp.get("history", []):
                    for added in entry.get("messagesAdded", []):
                        refs.append(ItemRef(source_msg_ref=added["message"]["id"]))

                new_history_id = resp.get("historyId", new_history_id)
                # The response carries nextPageToken while more history
                # records remain; stopping early would skip those messages.
                page_token = resp.get("nextPageToken")
                if not page_token:
                    break
            return refs, new_history_id

        refs, new_history_id = await asyncio.to_thread(_sync)
        if new_history_id and new_history_id != scout.gmail_history_id:
            scout.gmail_history_id = new_history_id
        return refs

    # ── fetch_metadata ────────────────────────────────────────────────────
    async def fetch_metadata(self, scout, ref: ItemRef) -> ItemMetadata:
        """Fetch subject, sender, snippet only — uses Gmail metadata format (no body)."""

        def _sync() -> ItemMetadata:
            service = _get_gmail_service(scout)
            msg = (
                service.users()
                .messages()
                .get(
                    userId="me",
                    id=ref.source_msg_ref,
                    format="metadata",
                    metadataHeaders=["Subject", "From", "Date"],
                )
                .execute()
            )
            headers = {
                h["name"]: h["value"]
                for h in msg.get("payload", {}).get("headers", [])
            }
            # NOTE(review): the Date header is requested but received_at is
            # always None — confirm whether it should be populated.
            return ItemMetadata(
                subject=self._header_value(headers, "Subject"),
                sender=self._header_value(headers, "From"),
                snippet=msg.get("snippet"),
                received_at=None,
            )

        return await asyncio.to_thread(_sync)

    # ── fetch_content ─────────────────────────────────────────────────────
    async def fetch_content(self, scout, ref: ItemRef) -> ItemContent:
        """Fetch full body text for a single message — transient, must not be persisted."""

        def _sync() -> ItemContent:
            service = _get_gmail_service(scout)
            msg = (
                service.users()
                .messages()
                .get(userId="me", id=ref.source_msg_ref, format="full")
                .execute()
            )
            payload = msg.get("payload", {})
            headers = {h["name"]: h["value"] for h in payload.get("headers", [])}
            body_text = _extract_plain_text_body(payload)
            return ItemContent(
                metadata=ItemMetadata(
                    subject=self._header_value(headers, "Subject"),
                    sender=self._header_value(headers, "From"),
                    snippet=msg.get("snippet"),
                    received_at=None,
                ),
                body_text=body_text,
                # Raw headers keep the names exactly as Gmail returned them.
                raw_headers=headers,
            )

        return await asyncio.to_thread(_sync)

    # ── archive ───────────────────────────────────────────────────────────
    async def archive(self, scout, ref: ItemRef) -> None:
        """Move the message to Gmail Trash (recoverable for 30 days)."""

        def _sync() -> None:
            service = _get_gmail_service(scout)
            service.users().messages().trash(
                userId="me", id=ref.source_msg_ref
            ).execute()

        await asyncio.to_thread(_sync)

    # ── watch management ──────────────────────────────────────────────────
    async def setup_watch(self, scout) -> None:
        """Register a Gmail Pub/Sub push watch for the INBOX label.

        Requires ``settings.GMAIL_PUBSUB_TOPIC`` to be set to the full topic
        resource name (e.g. ``projects/my-project/topics/gmail-push``).
        Logs a warning and returns without error if the topic is not configured.

        On success, stores the returned ``historyId`` in
        ``scout.gmail_history_id`` and the watch expiry in
        ``scout.gmail_watch_expires_at`` (UTC). Either attribute is left
        untouched when the response omits the corresponding field.
        """
        topic = settings.GMAIL_PUBSUB_TOPIC
        if not topic:
            logger.warning(
                "setup_watch: GMAIL_PUBSUB_TOPIC is not configured — skipping watch setup"
            )
            return

        def _sync() -> dict:
            service = _get_gmail_service(scout)
            request_body = {
                "labelIds": ["INBOX"],
                "topicName": topic,
            }
            return service.users().watch(userId="me", body=request_body).execute()

        resp = await asyncio.to_thread(_sync)

        # Assign scout attributes here rather than in the worker thread,
        # matching how list_new updates the scout.
        history_id = resp.get("historyId")
        if history_id:
            scout.gmail_history_id = history_id
        expiration_ms = resp.get("expiration")
        if expiration_ms:
            # Gmail reports the expiry as epoch milliseconds.
            scout.gmail_watch_expires_at = datetime.fromtimestamp(
                int(expiration_ms) / 1000, tz=timezone.utc
            )

    async def renew_watch(self, scout) -> None:
        """Renew an existing Gmail Pub/Sub watch without moving the sync cursor.

        Delegates to ``setup_watch`` and then restores the previously stored
        ``gmail_history_id``. The watch response reports the mailbox's current
        history id; adopting it on renewal would make ``list_new`` skip any
        mail that arrived since the last sync. A scout that had no cursor yet
        keeps the one ``setup_watch`` just recorded.
        """
        previous_history_id = scout.gmail_history_id
        await self.setup_watch(scout)
        if previous_history_id:
            scout.gmail_history_id = previous_history_id