Replace bulk GmailClient.fetch_messages() + linear search with a direct service.users().messages().get(format="full") call. Adds _extract_plain_text_body helper for recursive MIME part walking. Update test to patch _get_gmail_service. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
214 lines
8.6 KiB
Python
214 lines
8.6 KiB
Python
"""Gmail SourceConnector — wraps the existing GmailClient.
|
|
|
|
Responsibilities:
|
|
* list_new: incremental fetch since the scout's stored gmail_history_id
|
|
* fetch_metadata: subject + sender + snippet only (Gmail metadata format)
|
|
* fetch_content: full body text — transient, never persisted by engine
|
|
* archive: move a message to Gmail Trash (recoverable for 30 days)
|
|
* setup_watch / renew_watch: Gmail push notifications via Pub/Sub
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
|
|
from app.config.settings import settings
|
|
from app.integrations import decrypt_token
|
|
from app.scouts.connectors.base import ItemContent, ItemMetadata, ItemRef
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _extract_plain_text_body(payload: dict) -> str:
|
|
"""Recursively walk a Gmail message payload to find text/plain content."""
|
|
import base64
|
|
mime_type = payload.get("mimeType", "")
|
|
if mime_type == "text/plain":
|
|
data = payload.get("body", {}).get("data", "")
|
|
if data:
|
|
return base64.urlsafe_b64decode(data + "==").decode("utf-8", errors="replace")
|
|
return ""
|
|
if mime_type.startswith("multipart/"):
|
|
for part in payload.get("parts", []):
|
|
text = _extract_plain_text_body(part)
|
|
if text:
|
|
return text
|
|
# text/html fallback: strip tags rudimentarily if no text/plain part
|
|
if mime_type == "text/html":
|
|
data = payload.get("body", {}).get("data", "")
|
|
if data:
|
|
import re
|
|
html = base64.urlsafe_b64decode(data + "==").decode("utf-8", errors="replace")
|
|
return re.sub(r"<[^>]+>", " ", html)
|
|
return ""
|
|
|
|
|
|
def _get_gmail_service(scout):
    """Build a Gmail v1 API client from the scout's encrypted OAuth token.

    The token stored on ``scout.oauth_token_encrypted`` is decrypted with
    ``decrypt_token`` and its fields are passed to
    ``google.oauth2.credentials.Credentials``. ``token_uri`` defaults to
    Google's OAuth token endpoint when the stored value has none.

    Returns:
        The object returned by ``googleapiclient.discovery.build`` for
        ``("gmail", "v1")`` with discovery caching disabled.
    """
    # Imported lazily so the Google client libraries are only loaded on use.
    from googleapiclient.discovery import build
    from google.oauth2.credentials import Credentials

    token_data = decrypt_token(scout.oauth_token_encrypted)

    # Fields copied verbatim from the decrypted token (None when absent).
    oauth_kwargs = {
        field: token_data.get(field)
        for field in ("token", "refresh_token", "client_id", "client_secret", "scopes")
    }
    oauth_kwargs["token_uri"] = token_data.get(
        "token_uri", "https://oauth2.googleapis.com/token"
    )

    return build(
        "gmail",
        "v1",
        credentials=Credentials(**oauth_kwargs),
        cache_discovery=False,
    )
|
|
|
|
|
|
class GmailConnector:
    """Source connector for a scout's Gmail mailbox.

    Every public method runs the blocking Google API client inside
    ``asyncio.to_thread`` and obtains its client from
    ``_get_gmail_service(scout)`` on each call. Attributes on ``scout`` are
    only assigned after the worker thread has returned.
    """

    source_type = "gmail"

    # ── helpers ───────────────────────────────────────────────────────────

    @staticmethod
    def _headers_of(msg: dict) -> dict[str, str]:
        """Return the message's payload headers as ``name -> value`` (original casing)."""
        return {
            h["name"]: h["value"]
            for h in msg.get("payload", {}).get("headers", [])
        }

    @staticmethod
    def _header(headers: dict[str, str], name: str) -> str | None:
        """Look up *name* in *headers*, ignoring case; an exact-case key wins.

        Mail header names are case-insensitive, so ``subject`` and ``Subject``
        must resolve to the same value.
        """
        if name in headers:
            return headers[name]
        wanted = name.lower()
        for key, value in headers.items():
            if key.lower() == wanted:
                return value
        return None

    # ── list_new ──────────────────────────────────────────────────────────

    async def list_new(self, scout) -> list[ItemRef]:
        """Return new message refs since scout.gmail_history_id.

        On first run (gmail_history_id is None/empty), records the current
        historyId without backfilling — avoids flooding the user with old mail.
        Otherwise every page of the history listing is read, and each added
        message id is returned once, in the order first seen.

        Updates scout.gmail_history_id in-place (caller must persist to DB).
        Errors raised by the API client (for example for a start history id
        the server rejects) are not caught here and propagate to the caller.
        """

        def _sync() -> tuple[list[ItemRef], str | None]:
            service = _get_gmail_service(scout)
            history_id = scout.gmail_history_id

            if not history_id:
                # First run: capture baseline history id without backfilling.
                profile = service.users().getProfile(userId="me").execute()
                return [], profile["historyId"]

            refs: list[ItemRef] = []
            seen_ids: set[str] = set()
            new_history_id = history_id
            page_token = None

            while True:
                params = {
                    "userId": "me",
                    "startHistoryId": history_id,
                    "historyTypes": ["messageAdded"],
                }
                if page_token:
                    params["pageToken"] = page_token
                resp = service.users().history().list(**params).execute()

                for entry in resp.get("history", []):
                    for added in entry.get("messagesAdded", []):
                        msg_id = added["message"]["id"]
                        if msg_id not in seen_ids:
                            seen_ids.add(msg_id)
                            refs.append(ItemRef(source_msg_ref=msg_id))

                new_history_id = resp.get("historyId", new_history_id)

                next_token = resp.get("nextPageToken")
                # Stop on the last page; also stop if the same token comes
                # back twice so a misbehaving response cannot loop forever.
                if not next_token or next_token == page_token:
                    break
                page_token = next_token

            return refs, new_history_id

        refs, new_history_id = await asyncio.to_thread(_sync)
        if new_history_id and new_history_id != scout.gmail_history_id:
            scout.gmail_history_id = new_history_id
        return refs

    # ── fetch_metadata ────────────────────────────────────────────────────

    async def fetch_metadata(self, scout, ref: ItemRef) -> ItemMetadata:
        """Fetch subject, sender, snippet only — uses Gmail metadata format (no body).

        ``received_at`` is always ``None``.
        NOTE(review): the ``Date`` header is requested but never used —
        confirm whether ``received_at`` should be populated from it.
        """

        def _sync() -> ItemMetadata:
            service = _get_gmail_service(scout)
            msg = (
                service.users()
                .messages()
                .get(
                    userId="me",
                    id=ref.source_msg_ref,
                    format="metadata",
                    metadataHeaders=["Subject", "From", "Date"],
                )
                .execute()
            )
            headers = self._headers_of(msg)
            return ItemMetadata(
                subject=self._header(headers, "Subject"),
                sender=self._header(headers, "From"),
                snippet=msg.get("snippet"),
                received_at=None,
            )

        return await asyncio.to_thread(_sync)

    # ── fetch_content ─────────────────────────────────────────────────────

    async def fetch_content(self, scout, ref: ItemRef) -> ItemContent:
        """Fetch full body text for a single message — transient, must not be persisted.

        The body comes from ``_extract_plain_text_body`` applied to the
        message payload; ``raw_headers`` carries the headers with the casing
        returned by the API.
        """

        def _sync() -> ItemContent:
            service = _get_gmail_service(scout)
            msg = (
                service.users()
                .messages()
                .get(userId="me", id=ref.source_msg_ref, format="full")
                .execute()
            )
            headers = self._headers_of(msg)
            return ItemContent(
                metadata=ItemMetadata(
                    subject=self._header(headers, "Subject"),
                    sender=self._header(headers, "From"),
                    snippet=msg.get("snippet"),
                    received_at=None,
                ),
                body_text=_extract_plain_text_body(msg.get("payload", {})),
                raw_headers=headers,
            )

        return await asyncio.to_thread(_sync)

    # ── archive ───────────────────────────────────────────────────────────

    async def archive(self, scout, ref: ItemRef) -> None:
        """Move the message to Gmail Trash (recoverable for 30 days)."""

        def _sync() -> None:
            service = _get_gmail_service(scout)
            service.users().messages().trash(
                userId="me", id=ref.source_msg_ref
            ).execute()

        await asyncio.to_thread(_sync)

    # ── watch management ──────────────────────────────────────────────────

    async def setup_watch(self, scout) -> None:
        """Register a Gmail Pub/Sub push watch for the INBOX label.

        Requires ``settings.GMAIL_PUBSUB_TOPIC`` to be set to the full topic
        resource name (e.g. ``projects/my-project/topics/gmail-push``).
        Logs a warning and returns without error if the topic is not configured.

        On success, stores the response's ``historyId`` on
        ``scout.gmail_history_id`` (when present) and converts the
        millisecond ``expiration`` into an aware UTC datetime on
        ``scout.gmail_watch_expires_at`` (when present). The caller is
        responsible for persisting the scout.

        NOTE(review): because the stored history id is overwritten, calling
        this on a scout that already has a cursor moves the cursor to the
        value in the watch response; changes not yet returned by ``list_new``
        would then be skipped — confirm this is intended for renewals.
        """
        topic = settings.GMAIL_PUBSUB_TOPIC
        if not topic:
            logger.warning(
                "setup_watch: GMAIL_PUBSUB_TOPIC is not configured — skipping watch setup"
            )
            return

        def _sync() -> dict:
            service = _get_gmail_service(scout)
            request_body = {
                "labelIds": ["INBOX"],
                "topicName": topic,
            }
            return service.users().watch(userId="me", body=request_body).execute()

        resp = await asyncio.to_thread(_sync)

        # Apply the result on the calling side, mirroring list_new, instead
        # of mutating the scout from inside the worker thread.
        history_id = resp.get("historyId")
        if history_id:
            # Never replace a stored cursor with None.
            scout.gmail_history_id = history_id

        expiration_ms = resp.get("expiration")
        if expiration_ms:
            scout.gmail_watch_expires_at = datetime.fromtimestamp(
                int(expiration_ms) / 1000, tz=timezone.utc
            )

    async def renew_watch(self, scout) -> None:
        """Renew an existing Gmail Pub/Sub watch (same as setup_watch)."""
        await self.setup_watch(scout)
|