11 Commits

Author SHA1 Message Date
Roberto
7c9e8296bf Spostati i file del Repo api nella sua sottocartella per l'unione 2026-06-12 17:31:58 +02:00
f36ca72396 Merge pull request 'develop' (#2) from develop into main
Reviewed-on: #2
2026-06-12 15:27:23 +00:00
Roberto
79a926e4d8 feat(scouts): debug scripts + deliver_pending diagnostic logs
- scripts/trigger_gmail_scout.py: manually fire ScoutEngine.trigger_scout
- scripts/inspect_gmail_scout_token.py: decrypt + show stored OAuth scopes
- scripts/show_gmail_scout_state.py: print scout config + queue/log counts
- scripts/reset_triage_queue_to_queued.py: revert delivered → queued for re-delivery
- engine.py: info logs around deliver_pending (rows found, send_json roundtrip)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-11 00:27:04 +02:00
Roberto
f64ca11888 feat(scouts): pending-session Gmail OAuth — create cloud scout at finalize
Refactor _pending_scout_oauth_states from a tuple to a dict carrying
mode (reconnect|create), draft fields, and a transient encrypted token.
Add authorize-draft, session-labels, and cloud/finalize endpoints so the
scout row is created only when the flow completes — abandoned flows leave
no orphan rows. Zero-trust: the encrypted token lives only in the in-memory
session (<=15 min) until finalize persists it.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-10 18:23:52 +02:00
Roberto
95d4e4be75 fix(scouts): delete cloud scout via Core delete to avoid varchar=uuid cascade error
The run_logs relationship joins scout_run_logs.scout_id (varchar) to
cloud_scout_configs.id (uuid); Postgres has no varchar=uuid operator so the
ORM cascade on db.delete(scout) 500'd. Core deletes bypass it; triage queue
rows cascade via FK ondelete.
2026-06-10 18:16:59 +02:00
Roberto
b9b0a10139 feat(scouts): add gmail label-list + disconnect routes 2026-06-10 16:09:10 +02:00
Roberto
78767512f9 feat(scouts): add GmailConnector list_labels + stop_watch 2026-06-10 15:36:29 +02:00
Roberto
6e12429f92 feat(scouts): persist connected gmail_address on oauth callback 2026-06-10 15:34:56 +02:00
Roberto
e87b64cd68 feat(scouts): add gmail_address column to cloud_scout_configs 2026-06-10 15:34:23 +02:00
Roberto
1c65bbfe75 feat(scouts): add cloud scout CRUD routes + serializer 2026-06-10 15:29:02 +02:00
Roberto
4cd1ac11cc feat(scouts): add cloud scout CRUD pydantic schemas 2026-06-10 15:15:05 +02:00
150 changed files with 982 additions and 34 deletions

View File

View File

@@ -0,0 +1,25 @@
"""Add gmail_address to cloud_scout_configs.
Revision ID: 009
Revises: 008
Create Date: 2026-05-16
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
revision: str = "009"
down_revision: Union[str, None] = "008"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.add_column("cloud_scout_configs", sa.Column("gmail_address", sa.String(320), nullable=True))
def downgrade() -> None:
op.drop_column("cloud_scout_configs", "gmail_address")

View File

@@ -26,7 +26,7 @@ from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, status from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import RedirectResponse from fastapi.responses import RedirectResponse
from sqlalchemy import func, select from sqlalchemy import delete as sa_delete, func, select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel from pydantic import BaseModel
@@ -39,9 +39,13 @@ from app.core.scout_runner import is_agent_running, run_local_agent
from app.core.device_manager import device_manager from app.core.device_manager import device_manager
from app.core.note_summarizer import generate_note_summary from app.core.note_summarizer import generate_note_summary
from app.db import get_session from app.db import get_session
from app.integrations import encrypt_token from app.integrations import decrypt_token, encrypt_token
from app.models import CloudScoutConfig, ScoutRunLog, LocalScoutConfig from app.models import CloudScoutConfig, ScoutRunLog, LocalScoutConfig
from app.scouts.connectors.registry import get_connector
from app.schemas import ( from app.schemas import (
CloudScoutCreateRequest,
CloudScoutResponse,
CloudScoutUpdateRequest,
ScoutCatalogItem, ScoutCatalogItem,
ScoutCreationCheckRequest, ScoutCreationCheckRequest,
ScoutCreationCheckResponse, ScoutCreationCheckResponse,
@@ -269,6 +273,154 @@ async def summarize_note(
return NoteSummarizeResponse(summary=summary) return NoteSummarizeResponse(summary=summary)
# ── Cloud scout CRUD ──────────────────────────────────────────────────────────
_DEFAULT_CLOUD_SCHEDULE = "0 */6 * * *"
def _to_cloud_response(scout: CloudScoutConfig) -> dict:
return {
"id": scout.id,
"user_id": scout.user_id,
"provider": scout.provider,
"name": scout.name,
"data_types": scout.data_types or [],
"prompt_template": scout.prompt_template or "",
"schedule_cron": scout.schedule_cron,
"filter_config": scout.filter_config,
"auto_trash_spam": scout.auto_trash_spam,
"enabled": scout.enabled,
"last_run_at": _dt_ms_opt(scout.last_run_at),
"gmail_address": scout.gmail_address,
"oauth_connected": scout.oauth_token_encrypted is not None,
"created_at": _dt_ms(scout.created_at),
"updated_at": _dt_ms(scout.updated_at),
}
@router.get("/cloud", response_model=list[CloudScoutResponse])
async def list_cloud_scouts(
db: AsyncSession = Depends(get_session),
current_user: UserProfile = Depends(get_current_user),
):
rows = (await db.execute(
select(CloudScoutConfig).where(CloudScoutConfig.user_id == current_user.id)
)).scalars().all()
return [_to_cloud_response(s) for s in rows]
@router.post("/cloud", response_model=CloudScoutResponse, status_code=status.HTTP_201_CREATED)
async def create_cloud_scout(
body: CloudScoutCreateRequest,
db: AsyncSession = Depends(get_session),
current_user: UserProfile = Depends(get_current_user),
):
scout = CloudScoutConfig(
id=str(uuid.uuid4()),
user_id=current_user.id,
provider=body.provider,
name=body.name,
data_types=body.data_types,
prompt_template=body.prompt_template,
filter_config=body.filter_config,
schedule_cron=body.schedule_cron or _DEFAULT_CLOUD_SCHEDULE,
auto_trash_spam=body.auto_trash_spam,
enabled=True,
)
db.add(scout)
await db.commit()
await db.refresh(scout)
return _to_cloud_response(scout)
@router.put("/cloud/{scout_id}", response_model=CloudScoutResponse)
async def update_cloud_scout(
scout_id: str,
body: CloudScoutUpdateRequest,
db: AsyncSession = Depends(get_session),
current_user: UserProfile = Depends(get_current_user),
):
scout = await db.get(CloudScoutConfig, scout_id)
if scout is None or scout.user_id != current_user.id:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Scout not found")
if body.name is not None:
scout.name = body.name
if body.data_types is not None:
scout.data_types = body.data_types
if body.prompt_template is not None:
scout.prompt_template = body.prompt_template
if body.schedule_cron is not None:
scout.schedule_cron = body.schedule_cron
if body.filter_config is not None:
scout.filter_config = body.filter_config
if body.auto_trash_spam is not None:
scout.auto_trash_spam = body.auto_trash_spam
if body.enabled is not None:
scout.enabled = body.enabled
await db.commit()
await db.refresh(scout)
return _to_cloud_response(scout)
@router.delete("/cloud/{scout_id}")
async def delete_cloud_scout(
scout_id: str,
db: AsyncSession = Depends(get_session),
current_user: UserProfile = Depends(get_current_user),
):
scout = await db.get(CloudScoutConfig, scout_id)
if scout is None or scout.user_id != current_user.id:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Scout not found")
# Core deletes bypass the polymorphic ScoutRunLog relationship whose
# varchar scout_id vs uuid id join is not directly comparable in Postgres.
# scout_run_logs.scout_id is a plain string (matches the str scout_id);
# scout_triage_queue rows cascade automatically via their FK ondelete.
await db.execute(sa_delete(ScoutRunLog).where(ScoutRunLog.scout_id == scout_id))
await db.execute(sa_delete(CloudScoutConfig).where(CloudScoutConfig.id == scout_id))
await db.commit()
return {"ok": True}
@router.get("/cloud/{scout_id}/gmail-labels")
async def list_gmail_labels(
scout_id: str,
db: AsyncSession = Depends(get_session),
current_user: UserProfile = Depends(get_current_user),
):
scout = await db.get(CloudScoutConfig, scout_id)
if scout is None or scout.user_id != current_user.id:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Scout not found")
try:
connector = get_connector("gmail")
except KeyError:
return []
return await connector.list_labels(scout)
@router.post("/cloud/{scout_id}/gmail-disconnect", response_model=CloudScoutResponse)
async def disconnect_gmail(
scout_id: str,
db: AsyncSession = Depends(get_session),
current_user: UserProfile = Depends(get_current_user),
):
scout = await db.get(CloudScoutConfig, scout_id)
if scout is None or scout.user_id != current_user.id:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Scout not found")
try:
connector = get_connector("gmail")
await connector.stop_watch(scout)
except KeyError:
pass
scout.oauth_token_encrypted = None
scout.gmail_history_id = None
scout.gmail_watch_expires_at = None
scout.gmail_address = None
scout.enabled = False
await db.commit()
await db.refresh(scout)
return _to_cloud_response(scout)
# ── Gmail OAuth setup (scout-specific) ─────────────────────────────────────── # ── Gmail OAuth setup (scout-specific) ───────────────────────────────────────
# Scopes required for Gmail scout connectivity. # Scopes required for Gmail scout connectivity.
@@ -283,11 +435,35 @@ _GMAIL_SCOUT_SCOPES = [
_GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth" _GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
_GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token" _GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token"
# In-memory pending OAuth states for scout Gmail consent: # In-memory pending OAuth states for scout Gmail consent.
# state → (code_verifier, scout_id, user_id, expires_at_epoch_s) #
# Production note: replace with Redis for multi-process deployments. # state → {
_pending_scout_oauth_states: dict[str, tuple[str, str, str, float]] = {} # "code_verifier": str,
_SCOUT_OAUTH_TTL_SECONDS = 600 # 10 minutes # "user_id": str,
# "expires_at": float (epoch seconds),
# "mode": "reconnect" | "create",
# "scout_id": str | None, # set for reconnect mode
# "draft": {name, prompt_template, auto_trash_spam} | None, # set for create mode
# "token_encrypted": str | None, # populated after a successful create-mode callback
# "gmail_address": str | None,
# }
#
# Zero-trust: in create mode the encrypted Gmail token lives ONLY here, in
# process memory, for at most _SCOUT_OAUTH_TTL_SECONDS. It is persisted to the
# DB only when the user finalizes the scout (POST /scouts/cloud/finalize).
# An abandoned/errored flow leaves no scout row and no stored token.
#
# Production note: this in-memory store is single-process only — replace with
# Redis (keyed by state, TTL'd) for multi-worker deployments.
_pending_scout_oauth_states: dict[str, dict] = {}
_SCOUT_OAUTH_TTL_SECONDS = 900 # 15 minutes
def _purge_expired_oauth_states() -> None:
now = time.time()
expired = [s for s, e in _pending_scout_oauth_states.items() if e.get("expires_at", 0) < now]
for s in expired:
del _pending_scout_oauth_states[s]
def _scout_gmail_redirect_uri() -> str: def _scout_gmail_redirect_uri() -> str:
@@ -311,6 +487,34 @@ class _ScoutGmailCallbackBody(BaseModel):
state: str state: str
class _ScoutGmailAuthorizeDraftBody(BaseModel):
name: str
prompt_template: str = ""
auto_trash_spam: bool = False
class _ScoutGmailFinalizeBody(BaseModel):
session: str
filter_config: dict | None = None
def _build_gmail_authorize_url(state: str, code_challenge: str) -> str:
"""Build the Google consent URL for the scout Gmail flow (shared by both modes)."""
redirect_uri = _scout_gmail_redirect_uri()
params = {
"client_id": settings.GOOGLE_AUTH_CLIENT_ID,
"redirect_uri": redirect_uri,
"response_type": "code",
"scope": " ".join(_GMAIL_SCOUT_SCOPES),
"state": state,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"access_type": "offline",
"prompt": "consent",
}
return f"{_GOOGLE_AUTH_URL}?{urllib.parse.urlencode(params)}"
@router.get("/oauth/gmail/authorize", response_model=_ScoutGmailAuthorizeResponse) @router.get("/oauth/gmail/authorize", response_model=_ScoutGmailAuthorizeResponse)
async def scout_gmail_oauth_authorize( async def scout_gmail_oauth_authorize(
scout_id: str, scout_id: str,
@@ -331,28 +535,63 @@ async def scout_gmail_oauth_authorize(
code_verifier, code_challenge = generate_pkce_pair() code_verifier, code_challenge = generate_pkce_pair()
state = secrets.token_urlsafe(32) state = secrets.token_urlsafe(32)
# Purge expired states to prevent unbounded growth. _purge_expired_oauth_states()
now = time.time()
expired = [s for s, (_, _, _, exp) in _pending_scout_oauth_states.items() if exp < now]
for s in expired:
del _pending_scout_oauth_states[s]
_pending_scout_oauth_states[state] = (code_verifier, scout_id, current_user.id, now + _SCOUT_OAUTH_TTL_SECONDS) _pending_scout_oauth_states[state] = {
"code_verifier": code_verifier,
redirect_uri = _scout_gmail_redirect_uri() "user_id": current_user.id,
params = { "expires_at": time.time() + _SCOUT_OAUTH_TTL_SECONDS,
"client_id": settings.GOOGLE_AUTH_CLIENT_ID, "mode": "reconnect",
"redirect_uri": redirect_uri, "scout_id": scout_id,
"response_type": "code", "draft": None,
"scope": " ".join(_GMAIL_SCOUT_SCOPES), "token_encrypted": None,
"state": state, "gmail_address": None,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"access_type": "offline",
"prompt": "consent",
} }
authorize_url = f"{_GOOGLE_AUTH_URL}?{urllib.parse.urlencode(params)}"
return _ScoutGmailAuthorizeResponse(authorize_url=authorize_url) return _ScoutGmailAuthorizeResponse(
authorize_url=_build_gmail_authorize_url(state, code_challenge)
)
@router.post("/oauth/gmail/authorize-draft", response_model=_ScoutGmailAuthorizeResponse)
async def scout_gmail_oauth_authorize_draft(
body: _ScoutGmailAuthorizeDraftBody,
current_user: UserProfile = Depends(get_current_user),
) -> _ScoutGmailAuthorizeResponse:
"""Start the Gmail OAuth flow in *creation* mode — no scout row exists yet.
The draft scout fields are held in the pending OAuth session; the scout is
only created once the user finalizes (POST /scouts/cloud/finalize).
"""
if not settings.GOOGLE_AUTH_CLIENT_ID or not settings.GOOGLE_AUTH_CLIENT_SECRET:
raise HTTPException(
status.HTTP_503_SERVICE_UNAVAILABLE,
"Google OAuth is not configured on this server",
)
code_verifier, code_challenge = generate_pkce_pair()
state = secrets.token_urlsafe(32)
_purge_expired_oauth_states()
_pending_scout_oauth_states[state] = {
"code_verifier": code_verifier,
"user_id": current_user.id,
"expires_at": time.time() + _SCOUT_OAUTH_TTL_SECONDS,
"mode": "create",
"scout_id": None,
"draft": {
"name": body.name,
"prompt_template": body.prompt_template,
"auto_trash_spam": body.auto_trash_spam,
},
"token_encrypted": None,
"gmail_address": None,
}
return _ScoutGmailAuthorizeResponse(
authorize_url=_build_gmail_authorize_url(state, code_challenge)
)
@router.get("/oauth/gmail/web-callback", include_in_schema=False) @router.get("/oauth/gmail/web-callback", include_in_schema=False)
@@ -379,10 +618,16 @@ async def scout_gmail_oauth_callback(
the ``code`` and ``state`` params. the ``code`` and ``state`` params.
""" """
entry = _pending_scout_oauth_states.pop(body.state, None) entry = _pending_scout_oauth_states.pop(body.state, None)
if entry is None or entry[3] < time.time() or entry[2] != current_user.id: if (
entry is None
or entry["expires_at"] < time.time()
or entry["user_id"] != current_user.id
):
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or expired OAuth state") raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or expired OAuth state")
code_verifier, scout_id, _, _ = entry code_verifier = entry["code_verifier"]
mode = entry["mode"]
scout_id = entry.get("scout_id")
redirect_uri = _scout_gmail_redirect_uri() redirect_uri = _scout_gmail_redirect_uri()
@@ -420,14 +665,48 @@ async def scout_gmail_oauth_callback(
} }
encrypted = encrypt_token(creds_dict) encrypted = encrypt_token(creds_dict)
# Fetch the connected Gmail address for display.
gmail_address: str | None = None
try:
from googleapiclient.discovery import build
from google.oauth2.credentials import Credentials
def _fetch_email() -> str | None:
creds = Credentials(
token=creds_dict["token"],
refresh_token=creds_dict.get("refresh_token"),
token_uri=creds_dict["token_uri"],
client_id=creds_dict["client_id"],
client_secret=creds_dict["client_secret"],
scopes=creds_dict["scopes"],
)
service = build("gmail", "v1", credentials=creds, cache_discovery=False)
profile = service.users().getProfile(userId="me").execute()
return profile.get("emailAddress")
gmail_address = await asyncio.to_thread(_fetch_email)
except Exception:
logger.exception("failed to fetch gmail address (mode=%s)", mode)
if mode == "create":
# Do NOT create a scout yet. Hold the encrypted token + address in the
# transient in-memory session; the scout is created at finalize.
entry["token_encrypted"] = encrypted
entry["gmail_address"] = gmail_address
entry["expires_at"] = time.time() + _SCOUT_OAUTH_TTL_SECONDS
_pending_scout_oauth_states[body.state] = entry
return {"ok": True, "session_id": body.state, "gmail_address": gmail_address}
# mode == "reconnect": update the existing scout in place.
scout = await db.get(CloudScoutConfig, scout_id) scout = await db.get(CloudScoutConfig, scout_id)
if scout is None or scout.user_id != current_user.id: if scout is None or scout.user_id != current_user.id:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Scout not found") raise HTTPException(status.HTTP_404_NOT_FOUND, "Scout not found")
scout.oauth_token_encrypted = encrypted scout.oauth_token_encrypted = encrypted
scout.gmail_address = gmail_address
await db.commit() await db.commit()
# Attempt to set up Gmail push watch so we start receiving Pub/Sub notifications. # Attempt to set up Gmail push watch so we start receiving Pub/Sub notifications.
from app.scouts.connectors.registry import get_connector
try: try:
connector = get_connector("gmail") connector = get_connector("gmail")
await connector.setup_watch(scout) await connector.setup_watch(scout)
@@ -437,4 +716,92 @@ async def scout_gmail_oauth_callback(
except Exception: except Exception:
logger.exception("setup_watch failed for scout %s", scout_id) logger.exception("setup_watch failed for scout %s", scout_id)
return {"ok": True} return {"ok": True, "session_id": None, "gmail_address": gmail_address}
@router.get("/oauth/gmail/session-labels")
async def scout_gmail_session_labels(
session: str,
current_user: UserProfile = Depends(get_current_user),
) -> list[dict]:
"""List Gmail labels for a pending create-mode OAuth session (no scout row yet).
Builds a Gmail service from the session's transient decrypted token.
Returns [] on any error.
"""
entry = _pending_scout_oauth_states.get(session)
if (
entry is None
or entry["expires_at"] < time.time()
or entry["user_id"] != current_user.id
or entry.get("token_encrypted") is None
):
raise HTTPException(status.HTTP_404_NOT_FOUND, "Session not found or expired")
try:
from app.scouts.connectors.gmail import _gmail_service_from_token
creds = decrypt_token(entry["token_encrypted"])
def _sync() -> list[dict]:
service = _gmail_service_from_token(creds)
resp = service.users().labels().list(userId="me").execute()
return [{"id": lbl["id"], "name": lbl["name"]} for lbl in resp.get("labels", [])]
return await asyncio.to_thread(_sync)
except Exception:
logger.exception("session-labels failed for session %s", session)
return []
@router.post("/cloud/finalize", response_model=CloudScoutResponse, status_code=status.HTTP_201_CREATED)
async def finalize_cloud_scout(
body: _ScoutGmailFinalizeBody,
db: AsyncSession = Depends(get_session),
current_user: UserProfile = Depends(get_current_user),
):
"""Create the cloud scout from a completed create-mode OAuth session.
This is the only path that persists the Gmail token for a newly-created
scout. Abandoned flows never reach here, so they leave no orphan rows.
"""
entry = _pending_scout_oauth_states.pop(body.session, None)
if (
entry is None
or entry["expires_at"] < time.time()
or entry["user_id"] != current_user.id
or entry.get("mode") != "create"
or entry.get("token_encrypted") is None
):
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or expired OAuth session")
draft = entry["draft"] or {}
scout = CloudScoutConfig(
id=str(uuid.uuid4()),
user_id=current_user.id,
provider="gmail",
name=draft.get("name", ""),
data_types=[],
prompt_template=draft.get("prompt_template", ""),
filter_config=body.filter_config,
schedule_cron=_DEFAULT_CLOUD_SCHEDULE,
auto_trash_spam=draft.get("auto_trash_spam", False),
enabled=True,
oauth_token_encrypted=entry["token_encrypted"],
gmail_address=entry.get("gmail_address"),
)
db.add(scout)
await db.commit()
await db.refresh(scout)
# Best-effort Gmail push watch — failure must not block scout creation.
try:
connector = get_connector("gmail")
await connector.setup_watch(scout)
await db.commit()
except KeyError:
logger.warning("gmail connector not registered — skipping setup_watch for scout %s", scout.id)
except Exception:
logger.exception("setup_watch failed for scout %s", scout.id)
return _to_cloud_response(scout)

View File

@@ -223,6 +223,7 @@ class CloudScoutConfig(Base):
gmail_history_id: Mapped[str | None] = mapped_column(String(64), nullable=True) gmail_history_id: Mapped[str | None] = mapped_column(String(64), nullable=True)
gmail_watch_expires_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) gmail_watch_expires_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
device_inactivity_pause_days: Mapped[int] = mapped_column(Integer, nullable=False, default=14, server_default="14") device_inactivity_pause_days: Mapped[int] = mapped_column(Integer, nullable=False, default=14, server_default="14")
gmail_address: Mapped[str | None] = mapped_column(String(320), nullable=True)
run_logs: Mapped[list["ScoutRunLog"]] = relationship( run_logs: Mapped[list["ScoutRunLog"]] = relationship(
back_populates="cloud_scout", back_populates="cloud_scout",

View File

@@ -276,6 +276,46 @@ class ScoutRunLogResponse(BaseModel):
completed_at: int | None completed_at: int | None
# ── Cloud Scout CRUD ──────────────────────────────────────────────────
class CloudScoutCreateRequest(BaseModel):
name: str
provider: Literal["gmail", "teams", "outlook"]
data_types: list[str] = Field(default_factory=list)
prompt_template: str = ""
schedule_cron: str | None = None # None → server default
filter_config: dict | None = None
auto_trash_spam: bool = False
class CloudScoutUpdateRequest(BaseModel):
name: str | None = None
data_types: list[str] | None = None
prompt_template: str | None = None
schedule_cron: str | None = None
filter_config: dict | None = None
auto_trash_spam: bool | None = None
enabled: bool | None = None
class CloudScoutResponse(BaseModel):
id: str
user_id: str
provider: str
name: str
data_types: list[str]
prompt_template: str
schedule_cron: str
filter_config: dict | None
auto_trash_spam: bool
enabled: bool
last_run_at: int | None
gmail_address: str | None
oauth_connected: bool
created_at: int
updated_at: int
# ── Chatbot Journey ─────────────────────────────────────────────────── # ── Chatbot Journey ───────────────────────────────────────────────────

View File

@@ -45,12 +45,15 @@ def _extract_plain_text_body(payload: dict) -> str:
return "" return ""
def _get_gmail_service(scout): def _gmail_service_from_token(creds_info: dict):
"""Return a synchronous Google API client for low-level metadata/history calls.""" """Build a synchronous Gmail API client from a decrypted credentials dict.
Shared by ``_get_gmail_service`` (scout-backed) and the pending-session
OAuth flow which has a raw token but no scout row yet.
"""
from googleapiclient.discovery import build from googleapiclient.discovery import build
from google.oauth2.credentials import Credentials from google.oauth2.credentials import Credentials
creds_info = decrypt_token(scout.oauth_token_encrypted)
credentials = Credentials( credentials = Credentials(
token=creds_info.get("token"), token=creds_info.get("token"),
refresh_token=creds_info.get("refresh_token"), refresh_token=creds_info.get("refresh_token"),
@@ -62,6 +65,12 @@ def _get_gmail_service(scout):
return build("gmail", "v1", credentials=credentials, cache_discovery=False) return build("gmail", "v1", credentials=credentials, cache_discovery=False)
def _get_gmail_service(scout):
"""Return a synchronous Google API client for low-level metadata/history calls."""
creds_info = decrypt_token(scout.oauth_token_encrypted)
return _gmail_service_from_token(creds_info)
class GmailConnector: class GmailConnector:
source_type = "gmail" source_type = "gmail"
@@ -211,3 +220,29 @@ class GmailConnector:
async def renew_watch(self, scout) -> None: async def renew_watch(self, scout) -> None:
"""Renew an existing Gmail Pub/Sub watch (same as setup_watch).""" """Renew an existing Gmail Pub/Sub watch (same as setup_watch)."""
await self.setup_watch(scout) await self.setup_watch(scout)
async def list_labels(self, scout) -> list[dict]:
"""Return the account's Gmail labels as [{id, name}]. Empty if no token."""
if not scout.oauth_token_encrypted:
return []
def _sync() -> list[dict]:
service = _get_gmail_service(scout)
resp = service.users().labels().list(userId="me").execute()
return [{"id": lbl["id"], "name": lbl["name"]} for lbl in resp.get("labels", [])]
return await asyncio.to_thread(_sync)
async def stop_watch(self, scout) -> None:
"""Stop Gmail push notifications. Swallows errors (watch may be gone)."""
if not scout.oauth_token_encrypted:
return
def _sync() -> None:
service = _get_gmail_service(scout)
service.users().stop(userId="me").execute()
try:
await asyncio.to_thread(_sync)
except Exception:
logger.exception("stop_watch failed for scout %s", scout.id)

View File

@@ -144,6 +144,7 @@ class ScoutEngine:
ScoutTriageQueue.status == "queued", ScoutTriageQueue.status == "queued",
) )
)).scalars().all() )).scalars().all()
logger.info("deliver_pending: user=%s found %d queued rows", user_id, len(rows))
for row in rows: for row in rows:
try: try:
@@ -173,7 +174,9 @@ class ScoutEngine:
"payload": None, "payload": None,
}, },
} }
logger.info("deliver_pending: sending proposal id=%s subject=%r", row.id, meta.subject)
await ws.send_json(payload) await ws.send_json(payload)
logger.info("deliver_pending: send_json returned for proposal id=%s", row.id)
row.status = "delivered" row.status = "delivered"
row.delivered_at = datetime.now(tz=timezone.utc) row.delivered_at = datetime.now(tz=timezone.utc)

View File

@@ -0,0 +1,56 @@
"""Decrypt and inspect the Gmail scout's stored OAuth token.
Shows what scopes were granted at consent time. If gmail.readonly / gmail.modify
are missing, the consent screen didn't actually grant them.
Usage:
python scripts/inspect_gmail_scout_token.py
"""
from __future__ import annotations
import asyncio
import sys
from pathlib import Path
_API_ROOT = Path(__file__).resolve().parent.parent
if str(_API_ROOT) not in sys.path:
sys.path.insert(0, str(_API_ROOT))
from sqlalchemy import select
from app.db import async_session
from app.integrations import decrypt_token
from app.models import CloudScoutConfig
async def main() -> None:
async with async_session() as session:
scouts = (
await session.execute(
select(CloudScoutConfig).where(CloudScoutConfig.provider == "gmail")
)
).scalars().all()
if not scouts:
print("No Gmail scouts found.")
return
for scout in scouts:
print(f"\nScout: {scout.name} (id={scout.id})")
if not scout.oauth_token_encrypted:
print(" (no token stored)")
continue
try:
creds = decrypt_token(scout.oauth_token_encrypted)
except Exception as exc:
print(f" decrypt failed: {exc}")
continue
print(f" has refresh_token: {bool(creds.get('refresh_token'))}")
print(f" stored scopes: {creds.get('scopes')}")
print(f" token_uri: {creds.get('token_uri')}")
print(f" client_id (last 8): ...{(creds.get('client_id') or '')[-8:]}")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,35 @@
"""Re-queue all delivered (but not acked) triage rows so deliver_pending sends them again.
Usage:
python scripts/reset_triage_queue_to_queued.py
"""
from __future__ import annotations
import asyncio
import sys
from pathlib import Path
_API_ROOT = Path(__file__).resolve().parent.parent
if str(_API_ROOT) not in sys.path:
sys.path.insert(0, str(_API_ROOT))
from sqlalchemy import update
from app.db import async_session
from app.models import ScoutTriageQueue
async def main() -> None:
async with async_session() as session:
result = await session.execute(
update(ScoutTriageQueue)
.where(ScoutTriageQueue.status == "delivered")
.values(status="queued", delivered_at=None)
)
await session.commit()
print(f"Reset {result.rowcount} rows from delivered → queued")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,59 @@
"""Print Gmail scout state for debugging.
Usage:
python scripts/show_gmail_scout_state.py
"""
from __future__ import annotations
import asyncio
import sys
from pathlib import Path
_API_ROOT = Path(__file__).resolve().parent.parent
if str(_API_ROOT) not in sys.path:
sys.path.insert(0, str(_API_ROOT))
from sqlalchemy import select, func
from app.db import async_session
from app.models import CloudScoutConfig, ScoutTriageQueue, ScoutRunLog
async def main() -> None:
async with async_session() as session:
scouts = (
await session.execute(
select(CloudScoutConfig).where(CloudScoutConfig.provider == "gmail")
)
).scalars().all()
for scout in scouts:
print(f"\nScout: {scout.name} (id={scout.id})")
print(f" enabled: {scout.enabled}")
print(f" gmail_history_id: {scout.gmail_history_id}")
print(f" gmail_watch_expires_at: {scout.gmail_watch_expires_at}")
print(f" auto_trash_spam: {scout.auto_trash_spam}")
print(f" last_run_at: {scout.last_run_at}")
queued_count = (
await session.execute(
select(func.count())
.select_from(ScoutTriageQueue)
.where(ScoutTriageQueue.scout_id == scout.id)
)
).scalar()
print(f" triage_queue rows: {queued_count}")
run_count = (
await session.execute(
select(func.count())
.select_from(ScoutRunLog)
.where(ScoutRunLog.scout_id == scout.id)
)
).scalar()
print(f" scout_run_logs: {run_count}")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,74 @@
"""Manually trigger the user's Gmail scout for testing.
Usage:
python scripts/trigger_gmail_scout.py [user_email]
If user_email omitted, picks the first user with a Gmail scout.
Runs ScoutEngine.trigger_scout — which calls Gmail history.list since last
gmail_history_id, fetches each new message, runs LLM triage, inserts queue rows
for relevant items.
After running, check the queue:
psql -d adiuvai -c "select source_msg_ref, triage_verdict, status from scout_triage_queue order by triaged_at desc limit 10"
Then restart the Electron app to trigger deliver_pending → frames → local
scout_suggestions rows.
"""
from __future__ import annotations
import asyncio
import sys
import uuid
from pathlib import Path
# Ensure api/ root is importable when running from scripts/ subdir
_API_ROOT = Path(__file__).resolve().parent.parent
if str(_API_ROOT) not in sys.path:
sys.path.insert(0, str(_API_ROOT))
from sqlalchemy import select
from app.db import async_session
from app.models import CloudScoutConfig, User
from app.scouts.connectors.gmail import GmailConnector
from app.scouts.connectors.registry import register_connector
from app.scouts.engine import ScoutEngine
async def main() -> None:
register_connector(GmailConnector())
target_email = sys.argv[1] if len(sys.argv) > 1 else None
async with async_session() as session:
q = select(CloudScoutConfig).where(
CloudScoutConfig.provider == "gmail",
CloudScoutConfig.enabled.is_(True),
)
if target_email:
user = (
await session.execute(select(User).where(User.email == target_email))
).scalar_one_or_none()
if user is None:
print(f"No user with email {target_email}")
return
q = q.where(CloudScoutConfig.user_id == user.id)
scouts = (await session.execute(q)).scalars().all()
if not scouts:
print("No enabled Gmail scouts found. Create one in Settings → Scouts first.")
return
for scout in scouts:
print(f"Triggering scout id={scout.id} name={scout.name!r} user={scout.user_id}")
try:
await ScoutEngine().trigger_scout(uuid.UUID(scout.id))
print(" → done")
except Exception as exc:
print(f" → failed: {exc}")
if __name__ == "__main__":
asyncio.run(main())

Some files were not shown because too many files have changed in this diff Show More