Compare commits
18 Commits
9f21d5ae8f
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7c9e8296bf | ||
| f36ca72396 | |||
|
|
79a926e4d8 | ||
|
|
f64ca11888 | ||
|
|
95d4e4be75 | ||
|
|
b9b0a10139 | ||
|
|
78767512f9 | ||
|
|
6e12429f92 | ||
|
|
e87b64cd68 | ||
|
|
1c65bbfe75 | ||
|
|
4cd1ac11cc | ||
|
|
0833db239c | ||
|
|
11b31e5814 | ||
|
|
cb274c9728 | ||
|
|
d3497a1908 | ||
|
|
0c0299808c | ||
|
|
d1016fd65a | ||
|
|
c559754532 |
0
.gitignore → api/.gitignore
vendored
0
.gitignore → api/.gitignore
vendored
25
api/alembic/versions/009_cloud_scout_gmail_address.py
Normal file
25
api/alembic/versions/009_cloud_scout_gmail_address.py
Normal file
@@ -0,0 +1,25 @@
|
||||
"""Add gmail_address to cloud_scout_configs.
|
||||
|
||||
Revision ID: 009
|
||||
Revises: 008
|
||||
Create Date: 2026-05-16
|
||||
"""
|
||||
|
||||
from typing import Sequence, Union
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
|
||||
revision: str = "009"
|
||||
down_revision: Union[str, None] = "008"
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
op.add_column("cloud_scout_configs", sa.Column("gmail_address", sa.String(320), nullable=True))
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
op.drop_column("cloud_scout_configs", "gmail_address")
|
||||
120
api/app/api/routes/scout_webhooks.py
Normal file
120
api/app/api/routes/scout_webhooks.py
Normal file
@@ -0,0 +1,120 @@
|
||||
"""Gmail Pub/Sub push receiver.
|
||||
|
||||
Google Pub/Sub push subscriptions deliver Gmail watch notifications as POST
|
||||
requests with a JSON envelope. The body payload contains a base64-encoded
|
||||
JSON blob with ``emailAddress`` + ``historyId``. We resolve the user by
|
||||
email, look up their cloud_scout_configs row for provider='gmail', and
|
||||
hand off to ScoutEngine.trigger_scout.
|
||||
|
||||
Authentication: Pub/Sub push includes an OIDC JWT in the Authorization
|
||||
header. We verify it against Google's public keys with the audience
|
||||
configured in our Pub/Sub subscription.
|
||||
|
||||
Dev mode: when ``GMAIL_PUBSUB_AUDIENCE`` is empty, JWT verification is
|
||||
skipped and a warning is logged. Production must set this env var.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
from fastapi import APIRouter, Header, HTTPException, Request, status
|
||||
from sqlalchemy import select
|
||||
|
||||
from app.config.settings import settings
|
||||
from app.db import async_session
|
||||
from app.models import CloudScoutConfig, User
|
||||
from app.scouts.engine import ScoutEngine
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
router = APIRouter(prefix="/scouts/webhooks", tags=["scout-webhooks"])
|
||||
|
||||
|
||||
def _verify_pubsub_jwt(token: str) -> bool:
|
||||
"""Verify the Google Pub/Sub OIDC JWT.
|
||||
|
||||
Returns True when valid, False on any verification failure.
|
||||
|
||||
Dev skip: if ``settings.GMAIL_PUBSUB_AUDIENCE`` is empty, logs a
|
||||
warning and returns True so local development works without a real
|
||||
Pub/Sub subscription. Production must configure the audience.
|
||||
"""
|
||||
if not token:
|
||||
return False
|
||||
|
||||
if not settings.GMAIL_PUBSUB_AUDIENCE:
|
||||
logger.warning(
|
||||
"GMAIL_PUBSUB_AUDIENCE not set — skipping Pub/Sub JWT verification (dev mode only)"
|
||||
)
|
||||
return True
|
||||
|
||||
try:
|
||||
from google.auth.transport import requests as g_requests # noqa: PLC0415
|
||||
from google.oauth2 import id_token # noqa: PLC0415
|
||||
|
||||
id_token.verify_oauth2_token(
|
||||
token,
|
||||
g_requests.Request(),
|
||||
audience=settings.GMAIL_PUBSUB_AUDIENCE,
|
||||
)
|
||||
return True
|
||||
except Exception:
|
||||
logger.warning("pubsub jwt verification failed", exc_info=True)
|
||||
return False
|
||||
|
||||
|
||||
@router.post("/gmail", status_code=status.HTTP_204_NO_CONTENT)
|
||||
async def gmail_pubsub(
|
||||
request: Request,
|
||||
authorization: str = Header(default=""),
|
||||
) -> None:
|
||||
"""Receive a Gmail Pub/Sub push notification.
|
||||
|
||||
Verifies the OIDC JWT, decodes the Pub/Sub envelope, resolves the user
|
||||
by email, and triggers ScoutEngine.trigger_scout for each enabled Gmail
|
||||
scout belonging to that user.
|
||||
|
||||
Returns 204 No Content on success (including benign no-ops like unknown
|
||||
email or empty message data). Returns 401 on JWT verification failure.
|
||||
"""
|
||||
token = authorization.removeprefix("Bearer ").strip()
|
||||
if not _verify_pubsub_jwt(token):
|
||||
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid Pub/Sub JWT")
|
||||
|
||||
body = await request.json()
|
||||
msg = body.get("message") or {}
|
||||
raw = msg.get("data")
|
||||
if not raw:
|
||||
return # ack without action — empty message data
|
||||
|
||||
try:
|
||||
decoded = json.loads(base64.b64decode(raw).decode())
|
||||
except Exception:
|
||||
logger.warning("pubsub payload decode failed")
|
||||
return
|
||||
|
||||
email = decoded.get("emailAddress")
|
||||
if not email:
|
||||
return
|
||||
|
||||
async with async_session() as session:
|
||||
user_q = await session.execute(select(User).where(User.email == email))
|
||||
user = user_q.scalar_one_or_none()
|
||||
if user is None:
|
||||
logger.info("pubsub: no user for %s — ignoring", email)
|
||||
return
|
||||
scouts_q = await session.execute(
|
||||
select(CloudScoutConfig).where(
|
||||
CloudScoutConfig.user_id == user.id,
|
||||
CloudScoutConfig.provider == "gmail",
|
||||
CloudScoutConfig.enabled == True, # noqa: E712
|
||||
)
|
||||
)
|
||||
scouts = scouts_q.scalars().all()
|
||||
|
||||
engine = ScoutEngine()
|
||||
for scout in scouts:
|
||||
await engine.trigger_scout(uuid.UUID(str(scout.id)))
|
||||
807
api/app/api/routes/scouts.py
Normal file
807
api/app/api/routes/scouts.py
Normal file
@@ -0,0 +1,807 @@
|
||||
"""Scout routes.
|
||||
|
||||
Backend responsibilities are intentionally minimal:
|
||||
GET /scouts/catalog — static catalog for UI display
|
||||
POST /scouts/can-create — billing eligibility check
|
||||
POST /scouts/trigger — trigger a local scout run
|
||||
|
||||
Scout configuration is owned by the Electron app and is not persisted
|
||||
in backend scout-config tables.
|
||||
|
||||
Gmail OAuth setup (scout-specific consent):
|
||||
GET /scouts/oauth/gmail/authorize — returns consent-screen URL
|
||||
GET /scouts/oauth/gmail/web-callback — bounces to deep link (excluded from schema)
|
||||
POST /scouts/oauth/gmail/callback — exchanges code, stores encrypted token
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import secrets
|
||||
import time
|
||||
import urllib.parse
|
||||
import uuid
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from fastapi.responses import RedirectResponse
|
||||
from sqlalchemy import delete as sa_delete, func, select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from app.api.deps import get_current_user
|
||||
from app.auth.oauth_providers import generate_pkce_pair
|
||||
from app.billing.tier_manager import FEATURES
|
||||
from app.config.settings import settings
|
||||
from app.core.scout_runner import is_agent_running, run_local_agent
|
||||
from app.core.device_manager import device_manager
|
||||
from app.core.note_summarizer import generate_note_summary
|
||||
from app.db import get_session
|
||||
from app.integrations import decrypt_token, encrypt_token
|
||||
from app.models import CloudScoutConfig, ScoutRunLog, LocalScoutConfig
|
||||
from app.scouts.connectors.registry import get_connector
|
||||
from app.schemas import (
|
||||
CloudScoutCreateRequest,
|
||||
CloudScoutResponse,
|
||||
CloudScoutUpdateRequest,
|
||||
ScoutCatalogItem,
|
||||
ScoutCreationCheckRequest,
|
||||
ScoutCreationCheckResponse,
|
||||
ScoutRunLogResponse,
|
||||
ScoutTriggerRequest,
|
||||
UserProfile,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter(prefix="/scouts", tags=["scouts"])
|
||||
|
||||
|
||||
# ── Datetime helpers ──────────────────────────────────────────────────
|
||||
|
||||
def _dt_ms(dt: datetime) -> int:
|
||||
return int(dt.timestamp() * 1000)
|
||||
|
||||
|
||||
def _dt_ms_opt(dt: datetime | None) -> int | None:
|
||||
return int(dt.timestamp() * 1000) if dt else None
|
||||
|
||||
|
||||
def _to_data_types(values: list[str]) -> list[str]:
|
||||
normalize = {
|
||||
"task": "tasks", "tasks": "tasks",
|
||||
"note": "notes", "notes": "notes",
|
||||
"timeline": "timelines", "timelines": "timelines", "timelineEvents": "timelines",
|
||||
"project": "projects", "projects": "projects",
|
||||
}
|
||||
seen: set[str] = set()
|
||||
result: list[str] = []
|
||||
for v in values:
|
||||
mapped = normalize.get(v)
|
||||
if mapped and mapped not in seen:
|
||||
seen.add(mapped)
|
||||
result.append(mapped)
|
||||
return result
|
||||
|
||||
|
||||
def _to_run_log_response(log: ScoutRunLog) -> ScoutRunLogResponse:
|
||||
return ScoutRunLogResponse(
|
||||
id=log.id,
|
||||
agent_id=log.scout_id,
|
||||
agent_type=log.scout_type, # type: ignore[arg-type]
|
||||
status=log.status, # type: ignore[arg-type]
|
||||
items_processed=log.items_processed,
|
||||
items_created=log.items_created,
|
||||
errors=log.errors or [],
|
||||
started_at=_dt_ms(log.started_at),
|
||||
completed_at=_dt_ms_opt(log.completed_at),
|
||||
)
|
||||
|
||||
|
||||
def _enforce_agent_limit(tier: str, current_count: int) -> int:
|
||||
limit: int = FEATURES.get(tier, FEATURES["free"])["batch_active"]
|
||||
if limit != -1 and current_count >= limit:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail=f"Agent limit ({limit}) reached for your tier. Upgrade to create more.",
|
||||
)
|
||||
return limit
|
||||
|
||||
|
||||
async def _enforce_run_frequency(
|
||||
tier: str,
|
||||
user_id: str,
|
||||
db: AsyncSession,
|
||||
) -> None:
|
||||
"""Raise HTTP 402 if the user has exceeded their daily batch run limit."""
|
||||
limit: int = FEATURES.get(tier, FEATURES["free"])["batch_runs_per_day"]
|
||||
if limit == -1:
|
||||
return # unlimited
|
||||
|
||||
today_start = datetime.now(timezone.utc).replace(
|
||||
hour=0, minute=0, second=0, microsecond=0
|
||||
)
|
||||
result = await db.execute(
|
||||
select(func.count(ScoutRunLog.id)).where(
|
||||
ScoutRunLog.user_id == user_id,
|
||||
ScoutRunLog.started_at >= today_start,
|
||||
)
|
||||
)
|
||||
runs_today: int = result.scalar_one()
|
||||
|
||||
if runs_today >= limit:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_402_PAYMENT_REQUIRED,
|
||||
detail=f"Daily batch run limit ({limit}) reached for your tier. Upgrade for more runs.",
|
||||
)
|
||||
|
||||
|
||||
# ── Catalog ───────────────────────────────────────────────────────────
|
||||
|
||||
@router.get("/catalog", response_model=list[ScoutCatalogItem])
|
||||
async def get_agent_catalog(
|
||||
current_user: UserProfile = Depends(get_current_user),
|
||||
) -> list[ScoutCatalogItem]:
|
||||
"""Return the static list of available agent types and their descriptions."""
|
||||
return [
|
||||
ScoutCatalogItem(
|
||||
type="local_directory",
|
||||
name="Local Directory Monitor",
|
||||
description="Watches local directories, extracts data from files using AI",
|
||||
),
|
||||
ScoutCatalogItem(
|
||||
type="gmail",
|
||||
name="Gmail Connector",
|
||||
description="Scans Gmail inbox, extracts tasks/notes from emails",
|
||||
),
|
||||
ScoutCatalogItem(
|
||||
type="teams",
|
||||
name="Microsoft Teams Connector",
|
||||
description="Monitors Teams messages, extracts action items",
|
||||
),
|
||||
ScoutCatalogItem(
|
||||
type="outlook",
|
||||
name="Outlook Connector",
|
||||
description="Scans Outlook inbox, extracts tasks/notes",
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
@router.post("/can-create", response_model=ScoutCreationCheckResponse)
|
||||
async def can_create_agent(
|
||||
body: ScoutCreationCheckRequest,
|
||||
current_user: UserProfile = Depends(get_current_user),
|
||||
) -> ScoutCreationCheckResponse:
|
||||
"""Check if the user can create one more agent based on billing tier.
|
||||
|
||||
Since configuration is client-owned, the Electron app sends its current
|
||||
active agent count and the backend applies tier limits.
|
||||
"""
|
||||
limit: int = FEATURES.get(current_user.tier, FEATURES["free"])["batch_active"]
|
||||
allowed = limit == -1 or body.active_agents < limit
|
||||
return ScoutCreationCheckResponse(
|
||||
allowed=allowed,
|
||||
tier=current_user.tier,
|
||||
active_agents=body.active_agents,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
|
||||
@router.post("/trigger", response_model=ScoutRunLogResponse, status_code=status.HTTP_202_ACCEPTED)
|
||||
async def trigger_agent_run(
|
||||
body: ScoutTriggerRequest,
|
||||
current_user: UserProfile = Depends(get_current_user),
|
||||
db: AsyncSession = Depends(get_session),
|
||||
) -> ScoutRunLogResponse:
|
||||
"""Trigger a local agent run using client-provided configuration."""
|
||||
_enforce_agent_limit(current_user.tier, body.active_agents)
|
||||
await _enforce_run_frequency(current_user.tier, current_user.id, db)
|
||||
|
||||
last_run_dt = (
|
||||
datetime.fromtimestamp(body.last_run_at / 1000, tz=timezone.utc)
|
||||
if body.last_run_at
|
||||
else None
|
||||
)
|
||||
config = LocalScoutConfig(
|
||||
id=str(uuid.uuid4()),
|
||||
user_id=current_user.id,
|
||||
device_id=body.device_id,
|
||||
name="Local Directory Monitor",
|
||||
directory_paths=[body.directory],
|
||||
data_types=_to_data_types(body.what_to_extract),
|
||||
prompt_template=body.custom_agent_prompt or "",
|
||||
scout_config=body.agent_config,
|
||||
file_extensions=[],
|
||||
schedule_cron=body.batch_interval,
|
||||
enabled=True,
|
||||
last_run_at=last_run_dt,
|
||||
)
|
||||
|
||||
# Use the FE's stable agent_id if provided, fall back to the ephemeral config id.
|
||||
stable_agent_id = body.agent_id or config.id
|
||||
|
||||
if is_agent_running(stable_agent_id):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_409_CONFLICT,
|
||||
detail="Agent is already running. Only one run per agent is allowed at a time.",
|
||||
)
|
||||
|
||||
run_log = ScoutRunLog(
|
||||
scout_id=stable_agent_id,
|
||||
scout_type="local",
|
||||
user_id=current_user.id,
|
||||
status="running",
|
||||
)
|
||||
db.add(run_log)
|
||||
await db.commit()
|
||||
await db.refresh(run_log)
|
||||
|
||||
run_context = {
|
||||
"type": "agent_batch",
|
||||
"run_id": run_log.id,
|
||||
"agent_id": stable_agent_id,
|
||||
}
|
||||
|
||||
asyncio.create_task(
|
||||
run_local_agent(current_user.id, config, run_log, device_manager, run_context)
|
||||
)
|
||||
|
||||
return _to_run_log_response(run_log)
|
||||
|
||||
|
||||
# ── Note summary endpoint ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class NoteSummarizeRequest(BaseModel):
|
||||
title: str
|
||||
content: str
|
||||
|
||||
|
||||
class NoteSummarizeResponse(BaseModel):
|
||||
summary: str
|
||||
|
||||
|
||||
@router.post("/notes/summarize", response_model=NoteSummarizeResponse)
|
||||
async def summarize_note(
|
||||
body: NoteSummarizeRequest,
|
||||
current_user: UserProfile = Depends(get_current_user),
|
||||
) -> NoteSummarizeResponse:
|
||||
"""Generate an AI summary for a note. Used by the Electron backfill on startup."""
|
||||
summary = await generate_note_summary(body.title, body.content)
|
||||
return NoteSummarizeResponse(summary=summary)
|
||||
|
||||
|
||||
# ── Cloud scout CRUD ──────────────────────────────────────────────────────────
|
||||
|
||||
_DEFAULT_CLOUD_SCHEDULE = "0 */6 * * *"
|
||||
|
||||
|
||||
def _to_cloud_response(scout: CloudScoutConfig) -> dict:
|
||||
return {
|
||||
"id": scout.id,
|
||||
"user_id": scout.user_id,
|
||||
"provider": scout.provider,
|
||||
"name": scout.name,
|
||||
"data_types": scout.data_types or [],
|
||||
"prompt_template": scout.prompt_template or "",
|
||||
"schedule_cron": scout.schedule_cron,
|
||||
"filter_config": scout.filter_config,
|
||||
"auto_trash_spam": scout.auto_trash_spam,
|
||||
"enabled": scout.enabled,
|
||||
"last_run_at": _dt_ms_opt(scout.last_run_at),
|
||||
"gmail_address": scout.gmail_address,
|
||||
"oauth_connected": scout.oauth_token_encrypted is not None,
|
||||
"created_at": _dt_ms(scout.created_at),
|
||||
"updated_at": _dt_ms(scout.updated_at),
|
||||
}
|
||||
|
||||
|
||||
@router.get("/cloud", response_model=list[CloudScoutResponse])
|
||||
async def list_cloud_scouts(
|
||||
db: AsyncSession = Depends(get_session),
|
||||
current_user: UserProfile = Depends(get_current_user),
|
||||
):
|
||||
rows = (await db.execute(
|
||||
select(CloudScoutConfig).where(CloudScoutConfig.user_id == current_user.id)
|
||||
)).scalars().all()
|
||||
return [_to_cloud_response(s) for s in rows]
|
||||
|
||||
|
||||
@router.post("/cloud", response_model=CloudScoutResponse, status_code=status.HTTP_201_CREATED)
|
||||
async def create_cloud_scout(
|
||||
body: CloudScoutCreateRequest,
|
||||
db: AsyncSession = Depends(get_session),
|
||||
current_user: UserProfile = Depends(get_current_user),
|
||||
):
|
||||
scout = CloudScoutConfig(
|
||||
id=str(uuid.uuid4()),
|
||||
user_id=current_user.id,
|
||||
provider=body.provider,
|
||||
name=body.name,
|
||||
data_types=body.data_types,
|
||||
prompt_template=body.prompt_template,
|
||||
filter_config=body.filter_config,
|
||||
schedule_cron=body.schedule_cron or _DEFAULT_CLOUD_SCHEDULE,
|
||||
auto_trash_spam=body.auto_trash_spam,
|
||||
enabled=True,
|
||||
)
|
||||
db.add(scout)
|
||||
await db.commit()
|
||||
await db.refresh(scout)
|
||||
return _to_cloud_response(scout)
|
||||
|
||||
|
||||
@router.put("/cloud/{scout_id}", response_model=CloudScoutResponse)
|
||||
async def update_cloud_scout(
|
||||
scout_id: str,
|
||||
body: CloudScoutUpdateRequest,
|
||||
db: AsyncSession = Depends(get_session),
|
||||
current_user: UserProfile = Depends(get_current_user),
|
||||
):
|
||||
scout = await db.get(CloudScoutConfig, scout_id)
|
||||
if scout is None or scout.user_id != current_user.id:
|
||||
raise HTTPException(status.HTTP_404_NOT_FOUND, "Scout not found")
|
||||
if body.name is not None:
|
||||
scout.name = body.name
|
||||
if body.data_types is not None:
|
||||
scout.data_types = body.data_types
|
||||
if body.prompt_template is not None:
|
||||
scout.prompt_template = body.prompt_template
|
||||
if body.schedule_cron is not None:
|
||||
scout.schedule_cron = body.schedule_cron
|
||||
if body.filter_config is not None:
|
||||
scout.filter_config = body.filter_config
|
||||
if body.auto_trash_spam is not None:
|
||||
scout.auto_trash_spam = body.auto_trash_spam
|
||||
if body.enabled is not None:
|
||||
scout.enabled = body.enabled
|
||||
await db.commit()
|
||||
await db.refresh(scout)
|
||||
return _to_cloud_response(scout)
|
||||
|
||||
|
||||
@router.delete("/cloud/{scout_id}")
|
||||
async def delete_cloud_scout(
|
||||
scout_id: str,
|
||||
db: AsyncSession = Depends(get_session),
|
||||
current_user: UserProfile = Depends(get_current_user),
|
||||
):
|
||||
scout = await db.get(CloudScoutConfig, scout_id)
|
||||
if scout is None or scout.user_id != current_user.id:
|
||||
raise HTTPException(status.HTTP_404_NOT_FOUND, "Scout not found")
|
||||
# Core deletes bypass the polymorphic ScoutRunLog relationship whose
|
||||
# varchar scout_id vs uuid id join is not directly comparable in Postgres.
|
||||
# scout_run_logs.scout_id is a plain string (matches the str scout_id);
|
||||
# scout_triage_queue rows cascade automatically via their FK ondelete.
|
||||
await db.execute(sa_delete(ScoutRunLog).where(ScoutRunLog.scout_id == scout_id))
|
||||
await db.execute(sa_delete(CloudScoutConfig).where(CloudScoutConfig.id == scout_id))
|
||||
await db.commit()
|
||||
return {"ok": True}
|
||||
|
||||
|
||||
@router.get("/cloud/{scout_id}/gmail-labels")
|
||||
async def list_gmail_labels(
|
||||
scout_id: str,
|
||||
db: AsyncSession = Depends(get_session),
|
||||
current_user: UserProfile = Depends(get_current_user),
|
||||
):
|
||||
scout = await db.get(CloudScoutConfig, scout_id)
|
||||
if scout is None or scout.user_id != current_user.id:
|
||||
raise HTTPException(status.HTTP_404_NOT_FOUND, "Scout not found")
|
||||
try:
|
||||
connector = get_connector("gmail")
|
||||
except KeyError:
|
||||
return []
|
||||
return await connector.list_labels(scout)
|
||||
|
||||
|
||||
@router.post("/cloud/{scout_id}/gmail-disconnect", response_model=CloudScoutResponse)
|
||||
async def disconnect_gmail(
|
||||
scout_id: str,
|
||||
db: AsyncSession = Depends(get_session),
|
||||
current_user: UserProfile = Depends(get_current_user),
|
||||
):
|
||||
scout = await db.get(CloudScoutConfig, scout_id)
|
||||
if scout is None or scout.user_id != current_user.id:
|
||||
raise HTTPException(status.HTTP_404_NOT_FOUND, "Scout not found")
|
||||
try:
|
||||
connector = get_connector("gmail")
|
||||
await connector.stop_watch(scout)
|
||||
except KeyError:
|
||||
pass
|
||||
scout.oauth_token_encrypted = None
|
||||
scout.gmail_history_id = None
|
||||
scout.gmail_watch_expires_at = None
|
||||
scout.gmail_address = None
|
||||
scout.enabled = False
|
||||
await db.commit()
|
||||
await db.refresh(scout)
|
||||
return _to_cloud_response(scout)
|
||||
|
||||
|
||||
# ── Gmail OAuth setup (scout-specific) ───────────────────────────────────────
|
||||
|
||||
# Scopes required for Gmail scout connectivity.
|
||||
_GMAIL_SCOUT_SCOPES = [
|
||||
"openid",
|
||||
"email",
|
||||
"https://www.googleapis.com/auth/gmail.readonly",
|
||||
"https://www.googleapis.com/auth/gmail.modify",
|
||||
]
|
||||
|
||||
# Google OAuth endpoints.
|
||||
_GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
|
||||
_GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token"
|
||||
|
||||
# In-memory pending OAuth states for scout Gmail consent.
|
||||
#
|
||||
# state → {
|
||||
# "code_verifier": str,
|
||||
# "user_id": str,
|
||||
# "expires_at": float (epoch seconds),
|
||||
# "mode": "reconnect" | "create",
|
||||
# "scout_id": str | None, # set for reconnect mode
|
||||
# "draft": {name, prompt_template, auto_trash_spam} | None, # set for create mode
|
||||
# "token_encrypted": str | None, # populated after a successful create-mode callback
|
||||
# "gmail_address": str | None,
|
||||
# }
|
||||
#
|
||||
# Zero-trust: in create mode the encrypted Gmail token lives ONLY here, in
|
||||
# process memory, for at most _SCOUT_OAUTH_TTL_SECONDS. It is persisted to the
|
||||
# DB only when the user finalizes the scout (POST /scouts/cloud/finalize).
|
||||
# An abandoned/errored flow leaves no scout row and no stored token.
|
||||
#
|
||||
# Production note: this in-memory store is single-process only — replace with
|
||||
# Redis (keyed by state, TTL'd) for multi-worker deployments.
|
||||
_pending_scout_oauth_states: dict[str, dict] = {}
|
||||
_SCOUT_OAUTH_TTL_SECONDS = 900 # 15 minutes
|
||||
|
||||
|
||||
def _purge_expired_oauth_states() -> None:
|
||||
now = time.time()
|
||||
expired = [s for s, e in _pending_scout_oauth_states.items() if e.get("expires_at", 0) < now]
|
||||
for s in expired:
|
||||
del _pending_scout_oauth_states[s]
|
||||
|
||||
|
||||
def _scout_gmail_redirect_uri() -> str:
|
||||
"""Derive the scout Gmail web-callback URI from the configured base OAUTH_REDIRECT_URI.
|
||||
|
||||
``OAUTH_REDIRECT_URI`` is the full path used for login OAuth
|
||||
(e.g. http://localhost:8000/api/v1/auth/oauth/google/web-callback).
|
||||
We strip the path to get the scheme+host base, then append the scout path.
|
||||
"""
|
||||
parsed = urllib.parse.urlparse(settings.OAUTH_REDIRECT_URI)
|
||||
base = f"{parsed.scheme}://{parsed.netloc}"
|
||||
return f"{base}/api/v1/scouts/oauth/gmail/web-callback"
|
||||
|
||||
|
||||
class _ScoutGmailAuthorizeResponse(BaseModel):
|
||||
authorize_url: str
|
||||
|
||||
|
||||
class _ScoutGmailCallbackBody(BaseModel):
|
||||
code: str
|
||||
state: str
|
||||
|
||||
|
||||
class _ScoutGmailAuthorizeDraftBody(BaseModel):
|
||||
name: str
|
||||
prompt_template: str = ""
|
||||
auto_trash_spam: bool = False
|
||||
|
||||
|
||||
class _ScoutGmailFinalizeBody(BaseModel):
|
||||
session: str
|
||||
filter_config: dict | None = None
|
||||
|
||||
|
||||
def _build_gmail_authorize_url(state: str, code_challenge: str) -> str:
|
||||
"""Build the Google consent URL for the scout Gmail flow (shared by both modes)."""
|
||||
redirect_uri = _scout_gmail_redirect_uri()
|
||||
params = {
|
||||
"client_id": settings.GOOGLE_AUTH_CLIENT_ID,
|
||||
"redirect_uri": redirect_uri,
|
||||
"response_type": "code",
|
||||
"scope": " ".join(_GMAIL_SCOUT_SCOPES),
|
||||
"state": state,
|
||||
"code_challenge": code_challenge,
|
||||
"code_challenge_method": "S256",
|
||||
"access_type": "offline",
|
||||
"prompt": "consent",
|
||||
}
|
||||
return f"{_GOOGLE_AUTH_URL}?{urllib.parse.urlencode(params)}"
|
||||
|
||||
|
||||
@router.get("/oauth/gmail/authorize", response_model=_ScoutGmailAuthorizeResponse)
|
||||
async def scout_gmail_oauth_authorize(
|
||||
scout_id: str,
|
||||
current_user: UserProfile = Depends(get_current_user),
|
||||
) -> _ScoutGmailAuthorizeResponse:
|
||||
"""Start the Gmail OAuth flow for a specific cloud scout.
|
||||
|
||||
Returns the Google consent-screen URL. The client opens this URL in the
|
||||
system browser; after consent Google redirects to web-callback which bounces
|
||||
to the ``adiuvai://scout/oauth/gmail/callback`` deep link.
|
||||
"""
|
||||
if not settings.GOOGLE_AUTH_CLIENT_ID or not settings.GOOGLE_AUTH_CLIENT_SECRET:
|
||||
raise HTTPException(
|
||||
status.HTTP_503_SERVICE_UNAVAILABLE,
|
||||
"Google OAuth is not configured on this server",
|
||||
)
|
||||
|
||||
code_verifier, code_challenge = generate_pkce_pair()
|
||||
state = secrets.token_urlsafe(32)
|
||||
|
||||
_purge_expired_oauth_states()
|
||||
|
||||
_pending_scout_oauth_states[state] = {
|
||||
"code_verifier": code_verifier,
|
||||
"user_id": current_user.id,
|
||||
"expires_at": time.time() + _SCOUT_OAUTH_TTL_SECONDS,
|
||||
"mode": "reconnect",
|
||||
"scout_id": scout_id,
|
||||
"draft": None,
|
||||
"token_encrypted": None,
|
||||
"gmail_address": None,
|
||||
}
|
||||
|
||||
return _ScoutGmailAuthorizeResponse(
|
||||
authorize_url=_build_gmail_authorize_url(state, code_challenge)
|
||||
)
|
||||
|
||||
|
||||
@router.post("/oauth/gmail/authorize-draft", response_model=_ScoutGmailAuthorizeResponse)
|
||||
async def scout_gmail_oauth_authorize_draft(
|
||||
body: _ScoutGmailAuthorizeDraftBody,
|
||||
current_user: UserProfile = Depends(get_current_user),
|
||||
) -> _ScoutGmailAuthorizeResponse:
|
||||
"""Start the Gmail OAuth flow in *creation* mode — no scout row exists yet.
|
||||
|
||||
The draft scout fields are held in the pending OAuth session; the scout is
|
||||
only created once the user finalizes (POST /scouts/cloud/finalize).
|
||||
"""
|
||||
if not settings.GOOGLE_AUTH_CLIENT_ID or not settings.GOOGLE_AUTH_CLIENT_SECRET:
|
||||
raise HTTPException(
|
||||
status.HTTP_503_SERVICE_UNAVAILABLE,
|
||||
"Google OAuth is not configured on this server",
|
||||
)
|
||||
|
||||
code_verifier, code_challenge = generate_pkce_pair()
|
||||
state = secrets.token_urlsafe(32)
|
||||
|
||||
_purge_expired_oauth_states()
|
||||
|
||||
_pending_scout_oauth_states[state] = {
|
||||
"code_verifier": code_verifier,
|
||||
"user_id": current_user.id,
|
||||
"expires_at": time.time() + _SCOUT_OAUTH_TTL_SECONDS,
|
||||
"mode": "create",
|
||||
"scout_id": None,
|
||||
"draft": {
|
||||
"name": body.name,
|
||||
"prompt_template": body.prompt_template,
|
||||
"auto_trash_spam": body.auto_trash_spam,
|
||||
},
|
||||
"token_encrypted": None,
|
||||
"gmail_address": None,
|
||||
}
|
||||
|
||||
return _ScoutGmailAuthorizeResponse(
|
||||
authorize_url=_build_gmail_authorize_url(state, code_challenge)
|
||||
)
|
||||
|
||||
|
||||
@router.get("/oauth/gmail/web-callback", include_in_schema=False)
|
||||
async def scout_gmail_oauth_web_callback(code: str, state: str) -> RedirectResponse:
|
||||
"""Google redirects here after Gmail consent.
|
||||
|
||||
Immediately bounces to the Electron deep link so the desktop app
|
||||
receives the authorization code.
|
||||
"""
|
||||
params = urllib.parse.urlencode({"code": code, "state": state})
|
||||
deep_link = f"adiuvai://scout/oauth/gmail/callback?{params}"
|
||||
return RedirectResponse(url=deep_link, status_code=302)
|
||||
|
||||
|
||||
@router.post("/oauth/gmail/callback")
|
||||
async def scout_gmail_oauth_callback(
|
||||
body: _ScoutGmailCallbackBody,
|
||||
db: AsyncSession = Depends(get_session),
|
||||
current_user: UserProfile = Depends(get_current_user),
|
||||
) -> dict:
|
||||
"""Exchange the Gmail authorization code and store the encrypted token on the scout.
|
||||
|
||||
Called by the Electron app after it receives the deep-link callback with
|
||||
the ``code`` and ``state`` params.
|
||||
"""
|
||||
entry = _pending_scout_oauth_states.pop(body.state, None)
|
||||
if (
|
||||
entry is None
|
||||
or entry["expires_at"] < time.time()
|
||||
or entry["user_id"] != current_user.id
|
||||
):
|
||||
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or expired OAuth state")
|
||||
|
||||
code_verifier = entry["code_verifier"]
|
||||
mode = entry["mode"]
|
||||
scout_id = entry.get("scout_id")
|
||||
|
||||
redirect_uri = _scout_gmail_redirect_uri()
|
||||
|
||||
import httpx
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.post(
|
||||
_GOOGLE_TOKEN_URL,
|
||||
data={
|
||||
"client_id": settings.GOOGLE_AUTH_CLIENT_ID,
|
||||
"client_secret": settings.GOOGLE_AUTH_CLIENT_SECRET,
|
||||
"code": body.code,
|
||||
"code_verifier": code_verifier,
|
||||
"grant_type": "authorization_code",
|
||||
"redirect_uri": redirect_uri,
|
||||
},
|
||||
)
|
||||
try:
|
||||
response.raise_for_status()
|
||||
except httpx.HTTPStatusError as exc:
|
||||
logger.error("Gmail token exchange failed: %s", exc.response.text)
|
||||
raise HTTPException(status.HTTP_502_BAD_GATEWAY, "Failed to exchange Gmail authorization code")
|
||||
|
||||
token_data = response.json()
|
||||
|
||||
creds_dict: dict = {
|
||||
"token": token_data["access_token"],
|
||||
"refresh_token": token_data.get("refresh_token"),
|
||||
"token_uri": _GOOGLE_TOKEN_URL,
|
||||
"client_id": settings.GOOGLE_AUTH_CLIENT_ID,
|
||||
"client_secret": settings.GOOGLE_AUTH_CLIENT_SECRET,
|
||||
"scopes": [
|
||||
"https://www.googleapis.com/auth/gmail.readonly",
|
||||
"https://www.googleapis.com/auth/gmail.modify",
|
||||
],
|
||||
}
|
||||
encrypted = encrypt_token(creds_dict)
|
||||
|
||||
# Fetch the connected Gmail address for display.
|
||||
gmail_address: str | None = None
|
||||
try:
|
||||
from googleapiclient.discovery import build
|
||||
from google.oauth2.credentials import Credentials
|
||||
|
||||
def _fetch_email() -> str | None:
|
||||
creds = Credentials(
|
||||
token=creds_dict["token"],
|
||||
refresh_token=creds_dict.get("refresh_token"),
|
||||
token_uri=creds_dict["token_uri"],
|
||||
client_id=creds_dict["client_id"],
|
||||
client_secret=creds_dict["client_secret"],
|
||||
scopes=creds_dict["scopes"],
|
||||
)
|
||||
service = build("gmail", "v1", credentials=creds, cache_discovery=False)
|
||||
profile = service.users().getProfile(userId="me").execute()
|
||||
return profile.get("emailAddress")
|
||||
|
||||
gmail_address = await asyncio.to_thread(_fetch_email)
|
||||
except Exception:
|
||||
logger.exception("failed to fetch gmail address (mode=%s)", mode)
|
||||
|
||||
if mode == "create":
|
||||
# Do NOT create a scout yet. Hold the encrypted token + address in the
|
||||
# transient in-memory session; the scout is created at finalize.
|
||||
entry["token_encrypted"] = encrypted
|
||||
entry["gmail_address"] = gmail_address
|
||||
entry["expires_at"] = time.time() + _SCOUT_OAUTH_TTL_SECONDS
|
||||
_pending_scout_oauth_states[body.state] = entry
|
||||
return {"ok": True, "session_id": body.state, "gmail_address": gmail_address}
|
||||
|
||||
# mode == "reconnect": update the existing scout in place.
|
||||
scout = await db.get(CloudScoutConfig, scout_id)
|
||||
if scout is None or scout.user_id != current_user.id:
|
||||
raise HTTPException(status.HTTP_404_NOT_FOUND, "Scout not found")
|
||||
scout.oauth_token_encrypted = encrypted
|
||||
scout.gmail_address = gmail_address
|
||||
|
||||
await db.commit()
|
||||
|
||||
# Attempt to set up Gmail push watch so we start receiving Pub/Sub notifications.
|
||||
try:
|
||||
connector = get_connector("gmail")
|
||||
await connector.setup_watch(scout)
|
||||
await db.commit()
|
||||
except KeyError:
|
||||
logger.warning("gmail connector not registered — skipping setup_watch for scout %s", scout_id)
|
||||
except Exception:
|
||||
logger.exception("setup_watch failed for scout %s", scout_id)
|
||||
|
||||
return {"ok": True, "session_id": None, "gmail_address": gmail_address}
|
||||
|
||||
|
||||
@router.get("/oauth/gmail/session-labels")
|
||||
async def scout_gmail_session_labels(
|
||||
session: str,
|
||||
current_user: UserProfile = Depends(get_current_user),
|
||||
) -> list[dict]:
|
||||
"""List Gmail labels for a pending create-mode OAuth session (no scout row yet).
|
||||
|
||||
Builds a Gmail service from the session's transient decrypted token.
|
||||
Returns [] on any error.
|
||||
"""
|
||||
entry = _pending_scout_oauth_states.get(session)
|
||||
if (
|
||||
entry is None
|
||||
or entry["expires_at"] < time.time()
|
||||
or entry["user_id"] != current_user.id
|
||||
or entry.get("token_encrypted") is None
|
||||
):
|
||||
raise HTTPException(status.HTTP_404_NOT_FOUND, "Session not found or expired")
|
||||
|
||||
try:
|
||||
from app.scouts.connectors.gmail import _gmail_service_from_token
|
||||
|
||||
creds = decrypt_token(entry["token_encrypted"])
|
||||
|
||||
def _sync() -> list[dict]:
|
||||
service = _gmail_service_from_token(creds)
|
||||
resp = service.users().labels().list(userId="me").execute()
|
||||
return [{"id": lbl["id"], "name": lbl["name"]} for lbl in resp.get("labels", [])]
|
||||
|
||||
return await asyncio.to_thread(_sync)
|
||||
except Exception:
|
||||
logger.exception("session-labels failed for session %s", session)
|
||||
return []
|
||||
|
||||
|
||||
@router.post("/cloud/finalize", response_model=CloudScoutResponse, status_code=status.HTTP_201_CREATED)
|
||||
async def finalize_cloud_scout(
|
||||
body: _ScoutGmailFinalizeBody,
|
||||
db: AsyncSession = Depends(get_session),
|
||||
current_user: UserProfile = Depends(get_current_user),
|
||||
):
|
||||
"""Create the cloud scout from a completed create-mode OAuth session.
|
||||
|
||||
This is the only path that persists the Gmail token for a newly-created
|
||||
scout. Abandoned flows never reach here, so they leave no orphan rows.
|
||||
"""
|
||||
entry = _pending_scout_oauth_states.pop(body.session, None)
|
||||
if (
|
||||
entry is None
|
||||
or entry["expires_at"] < time.time()
|
||||
or entry["user_id"] != current_user.id
|
||||
or entry.get("mode") != "create"
|
||||
or entry.get("token_encrypted") is None
|
||||
):
|
||||
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or expired OAuth session")
|
||||
|
||||
draft = entry["draft"] or {}
|
||||
scout = CloudScoutConfig(
|
||||
id=str(uuid.uuid4()),
|
||||
user_id=current_user.id,
|
||||
provider="gmail",
|
||||
name=draft.get("name", ""),
|
||||
data_types=[],
|
||||
prompt_template=draft.get("prompt_template", ""),
|
||||
filter_config=body.filter_config,
|
||||
schedule_cron=_DEFAULT_CLOUD_SCHEDULE,
|
||||
auto_trash_spam=draft.get("auto_trash_spam", False),
|
||||
enabled=True,
|
||||
oauth_token_encrypted=entry["token_encrypted"],
|
||||
gmail_address=entry.get("gmail_address"),
|
||||
)
|
||||
db.add(scout)
|
||||
await db.commit()
|
||||
await db.refresh(scout)
|
||||
|
||||
# Best-effort Gmail push watch — failure must not block scout creation.
|
||||
try:
|
||||
connector = get_connector("gmail")
|
||||
await connector.setup_watch(scout)
|
||||
await db.commit()
|
||||
except KeyError:
|
||||
logger.warning("gmail connector not registered — skipping setup_watch for scout %s", scout.id)
|
||||
except Exception:
|
||||
logger.exception("setup_watch failed for scout %s", scout.id)
|
||||
|
||||
return _to_cloud_response(scout)
|
||||
@@ -58,6 +58,16 @@ class Settings(BaseSettings):
|
||||
# Prod: https://api.adiuvai.com/api/v1/auth/oauth/google/web-callback
|
||||
OAUTH_REDIRECT_URI: str = "http://localhost:8000/api/v1/auth/oauth/google/web-callback"
|
||||
|
||||
# Gmail Pub/Sub topic for push notifications.
|
||||
# Full resource name, e.g. "projects/my-project/topics/gmail-push".
|
||||
# Leave empty in dev — setup_watch will skip registration gracefully.
|
||||
GMAIL_PUBSUB_TOPIC: str = ""
|
||||
# OIDC token audience for Pub/Sub push subscription JWT verification.
|
||||
# Set to the service account email or audience string configured in the
|
||||
# Pub/Sub push subscription. Leave empty in dev to skip verification
|
||||
# (a warning is logged — never silent in production).
|
||||
GMAIL_PUBSUB_AUDIENCE: str = ""
|
||||
|
||||
# Fernet key (URL-safe base64, 32-byte key) for at-rest encryption of OAuth
|
||||
# tokens stored in cloud_agent_configs.oauth_token_encrypted.
|
||||
# Generate with: from cryptography.fernet import Fernet; Fernet.generate_key()
|
||||
@@ -77,8 +77,98 @@ async def _memory_cron_tick() -> None:
|
||||
_log.warning("memory cron tick: failed: %s", exc)
|
||||
|
||||
|
||||
async def _scout_cron_tick() -> None:
|
||||
"""Every-15-min cron: poll enabled cloud scouts (cron-fallback; push is primary).
|
||||
|
||||
Skips any scout whose ``last_run_at`` is within the last 5 minutes so
|
||||
a push notification and the fallback cron don't double-fire within the
|
||||
same window.
|
||||
"""
|
||||
import logging # noqa: PLC0415
|
||||
import uuid # noqa: PLC0415
|
||||
from datetime import datetime, timezone # noqa: PLC0415
|
||||
|
||||
_log = logging.getLogger(__name__)
|
||||
_log.info("scout cron tick: starting")
|
||||
try:
|
||||
from app.db import async_session # noqa: PLC0415
|
||||
from app.models import CloudScoutConfig # noqa: PLC0415
|
||||
from app.scouts.engine import ScoutEngine # noqa: PLC0415
|
||||
from sqlalchemy import select # noqa: PLC0415
|
||||
|
||||
async with async_session() as session:
|
||||
scouts = (await session.execute(
|
||||
select(CloudScoutConfig).where(CloudScoutConfig.enabled == True) # noqa: E712
|
||||
)).scalars().all()
|
||||
|
||||
engine = ScoutEngine()
|
||||
triggered = 0
|
||||
for scout in scouts:
|
||||
# Rate-limit guard: push is primary; skip if ran within 5 minutes.
|
||||
if scout.last_run_at:
|
||||
elapsed = (datetime.now(tz=timezone.utc) - scout.last_run_at).total_seconds()
|
||||
if elapsed < 300:
|
||||
continue
|
||||
try:
|
||||
await engine.trigger_scout(uuid.UUID(str(scout.id)))
|
||||
triggered += 1
|
||||
except Exception as exc:
|
||||
_log.warning("scout cron tick: trigger failed scout=%s: %s", scout.id, exc)
|
||||
|
||||
_log.info("scout cron tick: done triggered=%d total=%d", triggered, len(scouts))
|
||||
except Exception as exc:
|
||||
_log.warning("scout cron tick: failed: %s", exc)
|
||||
|
||||
|
||||
async def _scout_watch_renewal_tick() -> None:
|
||||
"""Every-24-hour cron: re-issue Gmail users.watch for scouts expiring within 24h.
|
||||
|
||||
Handles missing or misconfigured connectors gracefully — logs and continues.
|
||||
"""
|
||||
import logging # noqa: PLC0415
|
||||
from datetime import datetime, timedelta, timezone # noqa: PLC0415
|
||||
|
||||
_log = logging.getLogger(__name__)
|
||||
_log.info("scout watch renewal tick: starting")
|
||||
try:
|
||||
from app.db import async_session # noqa: PLC0415
|
||||
from app.models import CloudScoutConfig # noqa: PLC0415
|
||||
from app.scouts.connectors.registry import get_connector # noqa: PLC0415
|
||||
from sqlalchemy import select # noqa: PLC0415
|
||||
|
||||
threshold = datetime.now(tz=timezone.utc) + timedelta(hours=24)
|
||||
renewed = 0
|
||||
async with async_session() as session:
|
||||
scouts = (await session.execute(
|
||||
select(CloudScoutConfig).where(
|
||||
CloudScoutConfig.enabled == True, # noqa: E712
|
||||
CloudScoutConfig.provider == "gmail",
|
||||
CloudScoutConfig.gmail_watch_expires_at <= threshold,
|
||||
)
|
||||
)).scalars().all()
|
||||
|
||||
for scout in scouts:
|
||||
try:
|
||||
connector = get_connector("gmail")
|
||||
await connector.renew_watch(scout)
|
||||
renewed += 1
|
||||
except Exception:
|
||||
_log.exception("scout watch renewal tick: renew failed scout=%s", scout.id)
|
||||
|
||||
await session.commit()
|
||||
|
||||
_log.info("scout watch renewal tick: done renewed=%d", renewed)
|
||||
except Exception as exc:
|
||||
_log.warning("scout watch renewal tick: failed: %s", exc)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
# Startup: register source connectors.
|
||||
from app.scouts.connectors.gmail import GmailConnector # noqa: PLC0415
|
||||
from app.scouts.connectors.registry import register_connector # noqa: PLC0415
|
||||
register_connector(GmailConnector())
|
||||
|
||||
# Startup: ensure agent tool modules are loaded.
|
||||
import app.agents # noqa: F401
|
||||
|
||||
@@ -89,6 +179,14 @@ async def lifespan(app: FastAPI):
|
||||
scheduler = AsyncIOScheduler()
|
||||
scheduler.add_job(_memory_cron_tick, "interval", hours=1, id="memory_cron")
|
||||
scheduler.add_job(_memory_audit_cron_tick, "interval", weeks=1, id="memory_audit_cron")
|
||||
scheduler.add_job(
|
||||
_scout_cron_tick, "interval", minutes=15,
|
||||
id="scout_cron_tick", replace_existing=True,
|
||||
)
|
||||
scheduler.add_job(
|
||||
_scout_watch_renewal_tick, "interval", hours=24,
|
||||
id="scout_watch_renewal_tick", replace_existing=True,
|
||||
)
|
||||
scheduler.start()
|
||||
logging.getLogger(__name__).info("memory cron scheduler started (interval=1h)")
|
||||
|
||||
@@ -124,12 +222,13 @@ def create_app() -> FastAPI:
|
||||
app.add_middleware(SanitizerMiddleware)
|
||||
app.add_middleware(TierRateLimitMiddleware)
|
||||
|
||||
from app.api.routes import scouts, auth, billing, chat, device_ws, memory
|
||||
from app.api.routes import scouts, auth, billing, chat, device_ws, memory, scout_webhooks
|
||||
|
||||
app.include_router(auth.router, prefix="/api/v1")
|
||||
app.include_router(chat.router, prefix="/api/v1")
|
||||
app.include_router(billing.router, prefix="/api/v1")
|
||||
app.include_router(scouts.router, prefix="/api/v1")
|
||||
app.include_router(scout_webhooks.router, prefix="/api/v1")
|
||||
app.include_router(device_ws.router, prefix="/api/v1")
|
||||
app.include_router(memory.router, prefix="/api/v1")
|
||||
|
||||
@@ -223,6 +223,7 @@ class CloudScoutConfig(Base):
|
||||
gmail_history_id: Mapped[str | None] = mapped_column(String(64), nullable=True)
|
||||
gmail_watch_expires_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
|
||||
device_inactivity_pause_days: Mapped[int] = mapped_column(Integer, nullable=False, default=14, server_default="14")
|
||||
gmail_address: Mapped[str | None] = mapped_column(String(320), nullable=True)
|
||||
|
||||
run_logs: Mapped[list["ScoutRunLog"]] = relationship(
|
||||
back_populates="cloud_scout",
|
||||
@@ -276,6 +276,46 @@ class ScoutRunLogResponse(BaseModel):
|
||||
completed_at: int | None
|
||||
|
||||
|
||||
# ── Cloud Scout CRUD ──────────────────────────────────────────────────
|
||||
|
||||
class CloudScoutCreateRequest(BaseModel):
|
||||
name: str
|
||||
provider: Literal["gmail", "teams", "outlook"]
|
||||
data_types: list[str] = Field(default_factory=list)
|
||||
prompt_template: str = ""
|
||||
schedule_cron: str | None = None # None → server default
|
||||
filter_config: dict | None = None
|
||||
auto_trash_spam: bool = False
|
||||
|
||||
|
||||
class CloudScoutUpdateRequest(BaseModel):
|
||||
name: str | None = None
|
||||
data_types: list[str] | None = None
|
||||
prompt_template: str | None = None
|
||||
schedule_cron: str | None = None
|
||||
filter_config: dict | None = None
|
||||
auto_trash_spam: bool | None = None
|
||||
enabled: bool | None = None
|
||||
|
||||
|
||||
class CloudScoutResponse(BaseModel):
|
||||
id: str
|
||||
user_id: str
|
||||
provider: str
|
||||
name: str
|
||||
data_types: list[str]
|
||||
prompt_template: str
|
||||
schedule_cron: str
|
||||
filter_config: dict | None
|
||||
auto_trash_spam: bool
|
||||
enabled: bool
|
||||
last_run_at: int | None
|
||||
gmail_address: str | None
|
||||
oauth_connected: bool
|
||||
created_at: int
|
||||
updated_at: int
|
||||
|
||||
|
||||
# ── Chatbot Journey ───────────────────────────────────────────────────
|
||||
|
||||
|
||||
248
api/app/scouts/connectors/gmail.py
Normal file
248
api/app/scouts/connectors/gmail.py
Normal file
@@ -0,0 +1,248 @@
|
||||
"""Gmail SourceConnector — wraps the existing GmailClient.
|
||||
|
||||
Responsibilities:
|
||||
* list_new: incremental fetch since the scout's stored gmail_history_id
|
||||
* fetch_metadata: subject + sender + snippet only (Gmail metadata format)
|
||||
* fetch_content: full body text — transient, never persisted by engine
|
||||
* archive: move a message to Gmail Trash (recoverable for 30 days)
|
||||
* setup_watch / renew_watch: Gmail push notifications via Pub/Sub
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from app.config.settings import settings
|
||||
from app.integrations import decrypt_token
|
||||
from app.scouts.connectors.base import ItemContent, ItemMetadata, ItemRef
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _extract_plain_text_body(payload: dict) -> str:
|
||||
"""Recursively walk a Gmail message payload to find text/plain content."""
|
||||
import base64
|
||||
mime_type = payload.get("mimeType", "")
|
||||
if mime_type == "text/plain":
|
||||
data = payload.get("body", {}).get("data", "")
|
||||
if data:
|
||||
return base64.urlsafe_b64decode(data + "==").decode("utf-8", errors="replace")
|
||||
return ""
|
||||
if mime_type.startswith("multipart/"):
|
||||
for part in payload.get("parts", []):
|
||||
text = _extract_plain_text_body(part)
|
||||
if text:
|
||||
return text
|
||||
# text/html fallback: strip tags rudimentarily if no text/plain part
|
||||
if mime_type == "text/html":
|
||||
data = payload.get("body", {}).get("data", "")
|
||||
if data:
|
||||
import re
|
||||
html = base64.urlsafe_b64decode(data + "==").decode("utf-8", errors="replace")
|
||||
return re.sub(r"<[^>]+>", " ", html)
|
||||
return ""
|
||||
|
||||
|
||||
def _gmail_service_from_token(creds_info: dict):
|
||||
"""Build a synchronous Gmail API client from a decrypted credentials dict.
|
||||
|
||||
Shared by ``_get_gmail_service`` (scout-backed) and the pending-session
|
||||
OAuth flow which has a raw token but no scout row yet.
|
||||
"""
|
||||
from googleapiclient.discovery import build
|
||||
from google.oauth2.credentials import Credentials
|
||||
|
||||
credentials = Credentials(
|
||||
token=creds_info.get("token"),
|
||||
refresh_token=creds_info.get("refresh_token"),
|
||||
token_uri=creds_info.get("token_uri", "https://oauth2.googleapis.com/token"),
|
||||
client_id=creds_info.get("client_id"),
|
||||
client_secret=creds_info.get("client_secret"),
|
||||
scopes=creds_info.get("scopes"),
|
||||
)
|
||||
return build("gmail", "v1", credentials=credentials, cache_discovery=False)
|
||||
|
||||
|
||||
def _get_gmail_service(scout):
|
||||
"""Return a synchronous Google API client for low-level metadata/history calls."""
|
||||
creds_info = decrypt_token(scout.oauth_token_encrypted)
|
||||
return _gmail_service_from_token(creds_info)
|
||||
|
||||
|
||||
class GmailConnector:
|
||||
source_type = "gmail"
|
||||
|
||||
# ── list_new ──────────────────────────────────────────────────────────
|
||||
|
||||
async def list_new(self, scout) -> list[ItemRef]:
|
||||
"""Return new message refs since scout.gmail_history_id.
|
||||
|
||||
On first run (gmail_history_id is None/empty), records the current
|
||||
historyId without backfilling — avoids flooding the user with old mail.
|
||||
Updates scout.gmail_history_id in-place (caller must persist to DB).
|
||||
"""
|
||||
def _sync() -> tuple[list[ItemRef], str | None]:
|
||||
service = _get_gmail_service(scout)
|
||||
history_id = scout.gmail_history_id
|
||||
refs: list[ItemRef] = []
|
||||
new_history_id = history_id
|
||||
|
||||
if history_id:
|
||||
resp = (
|
||||
service.users()
|
||||
.history()
|
||||
.list(
|
||||
userId="me",
|
||||
startHistoryId=history_id,
|
||||
historyTypes=["messageAdded"],
|
||||
)
|
||||
.execute()
|
||||
)
|
||||
for entry in resp.get("history", []):
|
||||
for added in entry.get("messagesAdded", []):
|
||||
refs.append(ItemRef(source_msg_ref=added["message"]["id"]))
|
||||
new_history_id = resp.get("historyId", history_id)
|
||||
else:
|
||||
# First run: capture baseline history id without backfilling.
|
||||
profile = service.users().getProfile(userId="me").execute()
|
||||
new_history_id = profile["historyId"]
|
||||
|
||||
return refs, new_history_id
|
||||
|
||||
refs, new_history_id = await asyncio.to_thread(_sync)
|
||||
if new_history_id and new_history_id != scout.gmail_history_id:
|
||||
scout.gmail_history_id = new_history_id
|
||||
return refs
|
||||
|
||||
# ── fetch_metadata ────────────────────────────────────────────────────
|
||||
|
||||
async def fetch_metadata(self, scout, ref: ItemRef) -> ItemMetadata:
|
||||
"""Fetch subject, sender, snippet only — uses Gmail metadata format (no body)."""
|
||||
|
||||
def _sync() -> ItemMetadata:
|
||||
service = _get_gmail_service(scout)
|
||||
msg = (
|
||||
service.users()
|
||||
.messages()
|
||||
.get(
|
||||
userId="me",
|
||||
id=ref.source_msg_ref,
|
||||
format="metadata",
|
||||
metadataHeaders=["Subject", "From", "Date"],
|
||||
)
|
||||
.execute()
|
||||
)
|
||||
headers = {
|
||||
h["name"]: h["value"]
|
||||
for h in msg.get("payload", {}).get("headers", [])
|
||||
}
|
||||
return ItemMetadata(
|
||||
subject=headers.get("Subject"),
|
||||
sender=headers.get("From"),
|
||||
snippet=msg.get("snippet"),
|
||||
received_at=None,
|
||||
)
|
||||
|
||||
return await asyncio.to_thread(_sync)
|
||||
|
||||
# ── fetch_content ─────────────────────────────────────────────────────
|
||||
|
||||
async def fetch_content(self, scout, ref: ItemRef) -> ItemContent:
|
||||
"""Fetch full body text for a single message — transient, must not be persisted."""
|
||||
|
||||
def _sync() -> ItemContent:
|
||||
service = _get_gmail_service(scout)
|
||||
msg = service.users().messages().get(
|
||||
userId="me", id=ref.source_msg_ref, format="full",
|
||||
).execute()
|
||||
headers = {h["name"]: h["value"] for h in msg.get("payload", {}).get("headers", [])}
|
||||
body_text = _extract_plain_text_body(msg.get("payload", {}))
|
||||
return ItemContent(
|
||||
metadata=ItemMetadata(
|
||||
subject=headers.get("Subject"),
|
||||
sender=headers.get("From"),
|
||||
snippet=msg.get("snippet"),
|
||||
received_at=None,
|
||||
),
|
||||
body_text=body_text,
|
||||
raw_headers=headers,
|
||||
)
|
||||
|
||||
return await asyncio.to_thread(_sync)
|
||||
|
||||
# ── archive ───────────────────────────────────────────────────────────
|
||||
|
||||
async def archive(self, scout, ref: ItemRef) -> None:
|
||||
"""Move the message to Gmail Trash (recoverable for 30 days)."""
|
||||
|
||||
def _sync() -> None:
|
||||
service = _get_gmail_service(scout)
|
||||
service.users().messages().trash(
|
||||
userId="me", id=ref.source_msg_ref
|
||||
).execute()
|
||||
|
||||
await asyncio.to_thread(_sync)
|
||||
|
||||
# ── watch management ──────────────────────────────────────────────────
|
||||
|
||||
async def setup_watch(self, scout) -> None:
|
||||
"""Register a Gmail Pub/Sub push watch for the INBOX label.
|
||||
|
||||
Requires ``settings.GMAIL_PUBSUB_TOPIC`` to be set to the full topic
|
||||
resource name (e.g. ``projects/my-project/topics/gmail-push``).
|
||||
Logs a warning and returns without error if the topic is not configured.
|
||||
"""
|
||||
topic = settings.GMAIL_PUBSUB_TOPIC
|
||||
if not topic:
|
||||
logger.warning(
|
||||
"setup_watch: GMAIL_PUBSUB_TOPIC is not configured — skipping watch setup"
|
||||
)
|
||||
return
|
||||
|
||||
def _sync() -> None:
|
||||
service = _get_gmail_service(scout)
|
||||
request_body = {
|
||||
"labelIds": ["INBOX"],
|
||||
"topicName": topic,
|
||||
}
|
||||
resp = service.users().watch(userId="me", body=request_body).execute()
|
||||
scout.gmail_history_id = resp.get("historyId")
|
||||
expiration_ms = resp.get("expiration")
|
||||
if expiration_ms:
|
||||
scout.gmail_watch_expires_at = datetime.fromtimestamp(
|
||||
int(expiration_ms) / 1000, tz=timezone.utc
|
||||
)
|
||||
|
||||
await asyncio.to_thread(_sync)
|
||||
|
||||
async def renew_watch(self, scout) -> None:
|
||||
"""Renew an existing Gmail Pub/Sub watch (same as setup_watch)."""
|
||||
await self.setup_watch(scout)
|
||||
|
||||
async def list_labels(self, scout) -> list[dict]:
|
||||
"""Return the account's Gmail labels as [{id, name}]. Empty if no token."""
|
||||
if not scout.oauth_token_encrypted:
|
||||
return []
|
||||
|
||||
def _sync() -> list[dict]:
|
||||
service = _get_gmail_service(scout)
|
||||
resp = service.users().labels().list(userId="me").execute()
|
||||
return [{"id": lbl["id"], "name": lbl["name"]} for lbl in resp.get("labels", [])]
|
||||
|
||||
return await asyncio.to_thread(_sync)
|
||||
|
||||
async def stop_watch(self, scout) -> None:
|
||||
"""Stop Gmail push notifications. Swallows errors (watch may be gone)."""
|
||||
if not scout.oauth_token_encrypted:
|
||||
return
|
||||
|
||||
def _sync() -> None:
|
||||
service = _get_gmail_service(scout)
|
||||
service.users().stop(userId="me").execute()
|
||||
|
||||
try:
|
||||
await asyncio.to_thread(_sync)
|
||||
except Exception:
|
||||
logger.exception("stop_watch failed for scout %s", scout.id)
|
||||
@@ -28,6 +28,8 @@ from datetime import datetime, timedelta, timezone
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
|
||||
from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback
|
||||
from app.core.llm import get_llm
|
||||
from app.db import async_session
|
||||
from app.models import CloudScoutConfig, ScoutTriageQueue
|
||||
from app.scouts.connectors.base import ItemContent, ItemRef, TriageVerdict
|
||||
@@ -142,6 +144,7 @@ class ScoutEngine:
|
||||
ScoutTriageQueue.status == "queued",
|
||||
)
|
||||
)).scalars().all()
|
||||
logger.info("deliver_pending: user=%s found %d queued rows", user_id, len(rows))
|
||||
|
||||
for row in rows:
|
||||
try:
|
||||
@@ -171,7 +174,9 @@ class ScoutEngine:
|
||||
"payload": None,
|
||||
},
|
||||
}
|
||||
logger.info("deliver_pending: sending proposal id=%s subject=%r", row.id, meta.subject)
|
||||
await ws.send_json(payload)
|
||||
logger.info("deliver_pending: send_json returned for proposal id=%s", row.id)
|
||||
row.status = "delivered"
|
||||
row.delivered_at = datetime.now(tz=timezone.utc)
|
||||
|
||||
@@ -188,5 +193,81 @@ class ScoutEngine:
|
||||
await session.commit()
|
||||
|
||||
async def _triage_llm(self, scout: CloudScoutConfig, content: ItemContent) -> TriageVerdict:
|
||||
"""Stub — real implementation in Task 24."""
|
||||
raise NotImplementedError("Real triage LLM call lands in Task 24")
|
||||
"""Call the scout-triage-system Langfuse prompt to classify an item as relevant or spam.
|
||||
|
||||
Uses gpt-4o-mini with JSON mode. Wraps the LLM call in a Langfuse generation
|
||||
observation when Langfuse is configured.
|
||||
"""
|
||||
import json # noqa: PLC0415
|
||||
|
||||
from langchain_core.messages import HumanMessage, SystemMessage # noqa: PLC0415
|
||||
|
||||
_TRIAGE_FALLBACK = (
|
||||
"You are a triage classifier for an executive-assistant scout that watches a "
|
||||
"{source_type} feed.\n"
|
||||
'The scout\'s purpose is: "{scout_purpose}".\n\n'
|
||||
"Given one item, decide whether it is RELEVANT (worth surfacing to the user as a "
|
||||
"potential task / event / note / project) or SPAM (advertising, mass marketing, "
|
||||
"phishing, bulk notifications with no actionable content).\n\n"
|
||||
"Item:\n"
|
||||
" - Subject: {item_subject}\n"
|
||||
" - From: {item_sender}\n"
|
||||
" - Body (truncated): {item_body_truncated_2k}\n\n"
|
||||
'Return JSON only, matching this schema:\n'
|
||||
' {{"verdict": "relevant" | "spam", "reason": <short string>, "confidence": <0..1>}}\n\n'
|
||||
"Be conservative on \"spam\" — if a message could plausibly be a personal/work "
|
||||
"email, mark it relevant."
|
||||
)
|
||||
|
||||
template, prompt_obj = get_prompt_or_fallback("scout-triage-system", _TRIAGE_FALLBACK)
|
||||
|
||||
body_trunc = (content.body_text or "")[:2000]
|
||||
variables = dict(
|
||||
source_type=scout.provider,
|
||||
scout_purpose=scout.prompt_template or "",
|
||||
item_subject=content.metadata.subject or "",
|
||||
item_sender=content.metadata.sender or "",
|
||||
item_body_truncated_2k=body_trunc,
|
||||
)
|
||||
|
||||
if prompt_obj is not None:
|
||||
try:
|
||||
system_text = prompt_obj.compile(**variables)
|
||||
if isinstance(system_text, list):
|
||||
system_text = "\n".join(
|
||||
m.get("content", "") for m in system_text if isinstance(m, dict)
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("scout triage: compile failed: %s", exc)
|
||||
system_text = template.replace("{{source_type}}", variables["source_type"]) \
|
||||
.replace("{{scout_purpose}}", variables["scout_purpose"]) \
|
||||
.replace("{{item_subject}}", variables["item_subject"]) \
|
||||
.replace("{{item_sender}}", variables["item_sender"]) \
|
||||
.replace("{{item_body_truncated_2k}}", variables["item_body_truncated_2k"])
|
||||
else:
|
||||
system_text = template.format(**variables)
|
||||
|
||||
llm = get_llm(model="gpt-4o-mini", temperature=0)
|
||||
llm_json = llm.bind(response_format={"type": "json_object"}) # type: ignore[attr-defined]
|
||||
|
||||
messages = [
|
||||
SystemMessage(content=system_text),
|
||||
HumanMessage(content="Classify this item."),
|
||||
]
|
||||
|
||||
lf = get_langfuse()
|
||||
if lf:
|
||||
with lf.start_as_current_observation(
|
||||
as_type="generation",
|
||||
name="scout-triage",
|
||||
model="gpt-4o-mini",
|
||||
prompt=prompt_obj,
|
||||
input=messages,
|
||||
) as gen:
|
||||
response = await llm_json.ainvoke(messages)
|
||||
gen.update(output=response.content, usage=extract_usage(response))
|
||||
else:
|
||||
response = await llm_json.ainvoke(messages)
|
||||
|
||||
data = json.loads(response.content)
|
||||
return TriageVerdict(**data)
|
||||
56
api/scripts/inspect_gmail_scout_token.py
Normal file
56
api/scripts/inspect_gmail_scout_token.py
Normal file
@@ -0,0 +1,56 @@
|
||||
"""Decrypt and inspect the Gmail scout's stored OAuth token.
|
||||
|
||||
Shows what scopes were granted at consent time. If gmail.readonly / gmail.modify
|
||||
are missing, the consent screen didn't actually grant them.
|
||||
|
||||
Usage:
|
||||
python scripts/inspect_gmail_scout_token.py
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
_API_ROOT = Path(__file__).resolve().parent.parent
|
||||
if str(_API_ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(_API_ROOT))
|
||||
|
||||
from sqlalchemy import select
|
||||
|
||||
from app.db import async_session
|
||||
from app.integrations import decrypt_token
|
||||
from app.models import CloudScoutConfig
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
async with async_session() as session:
|
||||
scouts = (
|
||||
await session.execute(
|
||||
select(CloudScoutConfig).where(CloudScoutConfig.provider == "gmail")
|
||||
)
|
||||
).scalars().all()
|
||||
|
||||
if not scouts:
|
||||
print("No Gmail scouts found.")
|
||||
return
|
||||
|
||||
for scout in scouts:
|
||||
print(f"\nScout: {scout.name} (id={scout.id})")
|
||||
if not scout.oauth_token_encrypted:
|
||||
print(" (no token stored)")
|
||||
continue
|
||||
try:
|
||||
creds = decrypt_token(scout.oauth_token_encrypted)
|
||||
except Exception as exc:
|
||||
print(f" decrypt failed: {exc}")
|
||||
continue
|
||||
print(f" has refresh_token: {bool(creds.get('refresh_token'))}")
|
||||
print(f" stored scopes: {creds.get('scopes')}")
|
||||
print(f" token_uri: {creds.get('token_uri')}")
|
||||
print(f" client_id (last 8): ...{(creds.get('client_id') or '')[-8:]}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
35
api/scripts/reset_triage_queue_to_queued.py
Normal file
35
api/scripts/reset_triage_queue_to_queued.py
Normal file
@@ -0,0 +1,35 @@
|
||||
"""Re-queue all delivered (but not acked) triage rows so deliver_pending sends them again.
|
||||
|
||||
Usage:
|
||||
python scripts/reset_triage_queue_to_queued.py
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
_API_ROOT = Path(__file__).resolve().parent.parent
|
||||
if str(_API_ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(_API_ROOT))
|
||||
|
||||
from sqlalchemy import update
|
||||
|
||||
from app.db import async_session
|
||||
from app.models import ScoutTriageQueue
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
async with async_session() as session:
|
||||
result = await session.execute(
|
||||
update(ScoutTriageQueue)
|
||||
.where(ScoutTriageQueue.status == "delivered")
|
||||
.values(status="queued", delivered_at=None)
|
||||
)
|
||||
await session.commit()
|
||||
print(f"Reset {result.rowcount} rows from delivered → queued")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
59
api/scripts/show_gmail_scout_state.py
Normal file
59
api/scripts/show_gmail_scout_state.py
Normal file
@@ -0,0 +1,59 @@
|
||||
"""Print Gmail scout state for debugging.
|
||||
|
||||
Usage:
|
||||
python scripts/show_gmail_scout_state.py
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
_API_ROOT = Path(__file__).resolve().parent.parent
|
||||
if str(_API_ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(_API_ROOT))
|
||||
|
||||
from sqlalchemy import select, func
|
||||
|
||||
from app.db import async_session
|
||||
from app.models import CloudScoutConfig, ScoutTriageQueue, ScoutRunLog
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
async with async_session() as session:
|
||||
scouts = (
|
||||
await session.execute(
|
||||
select(CloudScoutConfig).where(CloudScoutConfig.provider == "gmail")
|
||||
)
|
||||
).scalars().all()
|
||||
|
||||
for scout in scouts:
|
||||
print(f"\nScout: {scout.name} (id={scout.id})")
|
||||
print(f" enabled: {scout.enabled}")
|
||||
print(f" gmail_history_id: {scout.gmail_history_id}")
|
||||
print(f" gmail_watch_expires_at: {scout.gmail_watch_expires_at}")
|
||||
print(f" auto_trash_spam: {scout.auto_trash_spam}")
|
||||
print(f" last_run_at: {scout.last_run_at}")
|
||||
|
||||
queued_count = (
|
||||
await session.execute(
|
||||
select(func.count())
|
||||
.select_from(ScoutTriageQueue)
|
||||
.where(ScoutTriageQueue.scout_id == scout.id)
|
||||
)
|
||||
).scalar()
|
||||
print(f" triage_queue rows: {queued_count}")
|
||||
|
||||
run_count = (
|
||||
await session.execute(
|
||||
select(func.count())
|
||||
.select_from(ScoutRunLog)
|
||||
.where(ScoutRunLog.scout_id == scout.id)
|
||||
)
|
||||
).scalar()
|
||||
print(f" scout_run_logs: {run_count}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
74
api/scripts/trigger_gmail_scout.py
Normal file
74
api/scripts/trigger_gmail_scout.py
Normal file
@@ -0,0 +1,74 @@
|
||||
"""Manually trigger the user's Gmail scout for testing.
|
||||
|
||||
Usage:
|
||||
python scripts/trigger_gmail_scout.py [user_email]
|
||||
|
||||
If user_email omitted, picks the first user with a Gmail scout.
|
||||
Runs ScoutEngine.trigger_scout — which calls Gmail history.list since last
|
||||
gmail_history_id, fetches each new message, runs LLM triage, inserts queue rows
|
||||
for relevant items.
|
||||
|
||||
After running, check the queue:
|
||||
psql -d adiuvai -c "select source_msg_ref, triage_verdict, status from scout_triage_queue order by triaged_at desc limit 10"
|
||||
|
||||
Then restart the Electron app to trigger deliver_pending → frames → local
|
||||
scout_suggestions rows.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
|
||||
# Ensure api/ root is importable when running from scripts/ subdir
|
||||
_API_ROOT = Path(__file__).resolve().parent.parent
|
||||
if str(_API_ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(_API_ROOT))
|
||||
|
||||
from sqlalchemy import select
|
||||
|
||||
from app.db import async_session
|
||||
from app.models import CloudScoutConfig, User
|
||||
from app.scouts.connectors.gmail import GmailConnector
|
||||
from app.scouts.connectors.registry import register_connector
|
||||
from app.scouts.engine import ScoutEngine
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
register_connector(GmailConnector())
|
||||
|
||||
target_email = sys.argv[1] if len(sys.argv) > 1 else None
|
||||
|
||||
async with async_session() as session:
|
||||
q = select(CloudScoutConfig).where(
|
||||
CloudScoutConfig.provider == "gmail",
|
||||
CloudScoutConfig.enabled.is_(True),
|
||||
)
|
||||
if target_email:
|
||||
user = (
|
||||
await session.execute(select(User).where(User.email == target_email))
|
||||
).scalar_one_or_none()
|
||||
if user is None:
|
||||
print(f"No user with email {target_email}")
|
||||
return
|
||||
q = q.where(CloudScoutConfig.user_id == user.id)
|
||||
|
||||
scouts = (await session.execute(q)).scalars().all()
|
||||
|
||||
if not scouts:
|
||||
print("No enabled Gmail scouts found. Create one in Settings → Scouts first.")
|
||||
return
|
||||
|
||||
for scout in scouts:
|
||||
print(f"Triggering scout id={scout.id} name={scout.name!r} user={scout.user_id}")
|
||||
try:
|
||||
await ScoutEngine().trigger_scout(uuid.UUID(scout.id))
|
||||
print(" → done")
|
||||
except Exception as exc:
|
||||
print(f" → failed: {exc}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user