Files
api/app/api/routes/scouts.py

570 lines
20 KiB
Python

"""Scout routes.
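Backend responsibilities are intentionally minimal:
    GET  /scouts/catalog          — static catalog for UI display
    POST /scouts/can-create       — billing eligibility check
    POST /scouts/trigger          — trigger a local scout run
    POST /scouts/notes/summarize  — generate an AI summary for a note

Local scout configuration is owned by the Electron app and is not persisted
in backend scout-config tables; the client sends it with each trigger request.

Cloud scouts are the exception — their configuration is stored by the backend:
    GET    /scouts/cloud             — list the caller's cloud scouts
    POST   /scouts/cloud             — create a cloud scout
    PUT    /scouts/cloud/{scout_id}  — update a cloud scout
    DELETE /scouts/cloud/{scout_id}  — delete a cloud scout
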
Gmail OAuth setup (scout-specific consent):
GET /scouts/oauth/gmail/authorize — returns consent-screen URL
GET /scouts/oauth/gmail/web-callback — bounces to deep link (excluded from schema)
POST /scouts/oauth/gmail/callback — exchanges code, stores encrypted token
"""
from __future__ import annotations
import asyncio
import logging
import secrets
import time
import urllib.parse
import uuid
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import RedirectResponse
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel
from app.api.deps import get_current_user
from app.auth.oauth_providers import generate_pkce_pair
from app.billing.tier_manager import FEATURES
from app.config.settings import settings
from app.core.scout_runner import is_agent_running, run_local_agent
from app.core.device_manager import device_manager
from app.core.note_summarizer import generate_note_summary
from app.db import get_session
from app.integrations import encrypt_token
from app.models import CloudScoutConfig, ScoutRunLog, LocalScoutConfig
from app.schemas import (
CloudScoutCreateRequest,
CloudScoutResponse,
CloudScoutUpdateRequest,
ScoutCatalogItem,
ScoutCreationCheckRequest,
ScoutCreationCheckResponse,
ScoutRunLogResponse,
ScoutTriggerRequest,
UserProfile,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/scouts", tags=["scouts"])
# ── Datetime helpers ──────────────────────────────────────────────────
def _dt_ms(dt: datetime) -> int:
return int(dt.timestamp() * 1000)
def _dt_ms_opt(dt: datetime | None) -> int | None:
return int(dt.timestamp() * 1000) if dt else None
def _to_data_types(values: list[str]) -> list[str]:
normalize = {
"task": "tasks", "tasks": "tasks",
"note": "notes", "notes": "notes",
"timeline": "timelines", "timelines": "timelines", "timelineEvents": "timelines",
"project": "projects", "projects": "projects",
}
seen: set[str] = set()
result: list[str] = []
for v in values:
mapped = normalize.get(v)
if mapped and mapped not in seen:
seen.add(mapped)
result.append(mapped)
return result
def _to_run_log_response(log: ScoutRunLog) -> ScoutRunLogResponse:
    """Convert a ``ScoutRunLog`` row into its API response schema.

    The row's ``scout_id`` / ``scout_type`` are exposed as ``agent_id`` /
    ``agent_type``, timestamps are converted to epoch milliseconds, and a
    falsy ``errors`` value is reported as an empty list.
    """
    started_ms = _dt_ms(log.started_at)
    completed_ms = _dt_ms_opt(log.completed_at)
    error_list = log.errors or []
    return ScoutRunLogResponse(
        id=log.id,
        agent_id=log.scout_id,
        agent_type=log.scout_type,  # type: ignore[arg-type]
        status=log.status,  # type: ignore[arg-type]
        items_processed=log.items_processed,
        items_created=log.items_created,
        errors=error_list,
        started_at=started_ms,
        completed_at=completed_ms,
    )
def _enforce_agent_limit(tier: str, current_count: int) -> int:
    """Return the tier's active-agent limit, rejecting callers at or over it.

    Unknown tiers fall back to the ``free`` tier's features. A limit of ``-1``
    means unlimited.

    Raises:
        HTTPException: 403 when ``current_count`` has reached the limit.
    """
    tier_features = FEATURES.get(tier, FEATURES["free"])
    limit: int = tier_features["batch_active"]
    unlimited = limit == -1
    if unlimited or current_count < limit:
        return limit
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail=f"Agent limit ({limit}) reached for your tier. Upgrade to create more.",
    )
async def _enforce_run_frequency(
    tier: str,
    user_id: str,
    db: AsyncSession,
) -> None:
    """Raise HTTP 402 if the user has exceeded their daily batch run limit.

    The day starts at 00:00 UTC. Every ``ScoutRunLog`` row for the user that
    started since then counts towards the limit, whatever its status. Unknown
    tiers fall back to the ``free`` tier, and a limit of ``-1`` means unlimited.
    """
    tier_features = FEATURES.get(tier, FEATURES["free"])
    limit: int = tier_features["batch_runs_per_day"]
    if limit == -1:
        return  # unlimited

    utc_now = datetime.now(timezone.utc)
    today_start = utc_now.replace(hour=0, minute=0, second=0, microsecond=0)

    count_query = select(func.count(ScoutRunLog.id)).where(
        ScoutRunLog.user_id == user_id,
        ScoutRunLog.started_at >= today_start,
    )
    result = await db.execute(count_query)
    runs_today: int = result.scalar_one()

    if runs_today < limit:
        return
    raise HTTPException(
        status_code=status.HTTP_402_PAYMENT_REQUIRED,
        detail=f"Daily batch run limit ({limit}) reached for your tier. Upgrade for more runs.",
    )
# ── Catalog ───────────────────────────────────────────────────────────
@router.get("/catalog", response_model=list[ScoutCatalogItem])
async def get_agent_catalog(
    current_user: UserProfile = Depends(get_current_user),
) -> list[ScoutCatalogItem]:
    """Return the static list of available agent types and their descriptions.

    ``current_user`` is not read; the dependency is declared so the route
    goes through ``get_current_user``.
    """
    # (type, name, description) for each catalog entry, in display order.
    catalog_rows = (
        (
            "local_directory",
            "Local Directory Monitor",
            "Watches local directories, extracts data from files using AI",
        ),
        (
            "gmail",
            "Gmail Connector",
            "Scans Gmail inbox, extracts tasks/notes from emails",
        ),
        (
            "teams",
            "Microsoft Teams Connector",
            "Monitors Teams messages, extracts action items",
        ),
        (
            "outlook",
            "Outlook Connector",
            "Scans Outlook inbox, extracts tasks/notes",
        ),
    )
    return [
        ScoutCatalogItem(type=kind, name=label, description=blurb)
        for kind, label, blurb in catalog_rows
    ]
@router.post("/can-create", response_model=ScoutCreationCheckResponse)
async def can_create_agent(
    body: ScoutCreationCheckRequest,
    current_user: UserProfile = Depends(get_current_user),
) -> ScoutCreationCheckResponse:
    """Report whether the user's billing tier allows one more agent.

    Configuration is client-owned, so the Electron app sends its current
    active agent count and the backend only applies the tier limit. A limit
    of ``-1`` means unlimited; unknown tiers fall back to the ``free`` tier.
    """
    tier = current_user.tier
    tier_features = FEATURES.get(tier, FEATURES["free"])
    limit: int = tier_features["batch_active"]

    if limit == -1:
        allowed = True
    else:
        allowed = body.active_agents < limit

    return ScoutCreationCheckResponse(
        allowed=allowed,
        tier=tier,
        active_agents=body.active_agents,
        limit=limit,
    )
# Strong references to in-flight background run tasks. The event loop only
# keeps weak references to tasks, so a fire-and-forget task that nobody else
# references may be garbage-collected before it finishes.
_background_run_tasks: set[asyncio.Task] = set()


@router.post("/trigger", response_model=ScoutRunLogResponse, status_code=status.HTTP_202_ACCEPTED)
async def trigger_agent_run(
    body: ScoutTriggerRequest,
    current_user: UserProfile = Depends(get_current_user),
    db: AsyncSession = Depends(get_session),
) -> ScoutRunLogResponse:
    """Trigger a local agent run using client-provided configuration.

    The scout configuration is built from the request body and is not added
    to the database session; only the ``ScoutRunLog`` row is persisted. The
    run itself is started as a background task and the freshly created run
    log (status ``running``) is returned immediately.

    Raises:
        HTTPException: 403 when the tier's active-agent limit is reached,
            402 when the daily run limit is reached, 409 when a run for the
            same agent id is already in progress.
    """
    # NOTE(review): _enforce_agent_limit rejects when active_agents >= limit,
    # so a user exactly at their limit cannot trigger runs for existing
    # agents — confirm this is intended.
    _enforce_agent_limit(current_user.tier, body.active_agents)
    await _enforce_run_frequency(current_user.tier, current_user.id, db)

    # last_run_at arrives as epoch milliseconds; a missing/zero value means
    # the agent has no previous run.
    last_run_dt = (
        datetime.fromtimestamp(body.last_run_at / 1000, tz=timezone.utc)
        if body.last_run_at
        else None
    )
    config = LocalScoutConfig(
        id=str(uuid.uuid4()),
        user_id=current_user.id,
        device_id=body.device_id,
        name="Local Directory Monitor",
        directory_paths=[body.directory],
        data_types=_to_data_types(body.what_to_extract),
        prompt_template=body.custom_agent_prompt or "",
        scout_config=body.agent_config,
        file_extensions=[],
        schedule_cron=body.batch_interval,
        enabled=True,
        last_run_at=last_run_dt,
    )

    # Use the FE's stable agent_id if provided, fall back to the ephemeral config id.
    stable_agent_id = body.agent_id or config.id
    if is_agent_running(stable_agent_id):
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Agent is already running. Only one run per agent is allowed at a time.",
        )

    run_log = ScoutRunLog(
        scout_id=stable_agent_id,
        scout_type="local",
        user_id=current_user.id,
        status="running",
    )
    db.add(run_log)
    await db.commit()
    await db.refresh(run_log)

    run_context = {
        "type": "agent_batch",
        "run_id": run_log.id,
        "agent_id": stable_agent_id,
    }
    task = asyncio.create_task(
        run_local_agent(current_user.id, config, run_log, device_manager, run_context)
    )
    # Hold a strong reference until the task completes, then drop it.
    _background_run_tasks.add(task)
    task.add_done_callback(_background_run_tasks.discard)

    return _to_run_log_response(run_log)
# ── Note summary endpoint ──────────────────────────────────────────────────────
class NoteSummarizeRequest(BaseModel):
    """Request body for ``POST /scouts/notes/summarize``."""

    # Both fields are passed to ``generate_note_summary`` by the route.
    title: str
    content: str
class NoteSummarizeResponse(BaseModel):
    """Response body for ``POST /scouts/notes/summarize``."""

    # Value returned by ``generate_note_summary`` for the submitted note.
    summary: str
@router.post("/notes/summarize", response_model=NoteSummarizeResponse)
async def summarize_note(
    body: NoteSummarizeRequest,
    current_user: UserProfile = Depends(get_current_user),
) -> NoteSummarizeResponse:
    """Generate an AI summary for a note. Used by the Electron backfill on startup.

    ``current_user`` is not read; the dependency is declared so the route
    goes through ``get_current_user``.
    """
    return NoteSummarizeResponse(
        summary=await generate_note_summary(body.title, body.content)
    )
# ── Cloud scout CRUD ──────────────────────────────────────────────────────────
# Cron expression used when a cloud-scout create request supplies no
# schedule: minute 0 of every 6th hour.
_DEFAULT_CLOUD_SCHEDULE = "0 */6 * * *"
def _to_cloud_response(scout: CloudScoutConfig) -> dict:
    """Serialise a ``CloudScoutConfig`` row into the response payload dict.

    Timestamps are converted to epoch milliseconds. The encrypted OAuth token
    itself is never included — only an ``oauth_connected`` flag saying whether
    one is stored.
    """
    has_token = scout.oauth_token_encrypted is not None
    data_types = scout.data_types or []
    prompt_template = scout.prompt_template or ""
    return dict(
        id=scout.id,
        user_id=scout.user_id,
        provider=scout.provider,
        name=scout.name,
        data_types=data_types,
        prompt_template=prompt_template,
        schedule_cron=scout.schedule_cron,
        filter_config=scout.filter_config,
        auto_trash_spam=scout.auto_trash_spam,
        enabled=scout.enabled,
        last_run_at=_dt_ms_opt(scout.last_run_at),
        gmail_address=scout.gmail_address,
        oauth_connected=has_token,
        created_at=_dt_ms(scout.created_at),
        updated_at=_dt_ms(scout.updated_at),
    )
@router.get("/cloud", response_model=list[CloudScoutResponse])
async def list_cloud_scouts(
    db: AsyncSession = Depends(get_session),
    current_user: UserProfile = Depends(get_current_user),
):
    """Return every cloud scout owned by the authenticated user."""
    owned_query = select(CloudScoutConfig).where(
        CloudScoutConfig.user_id == current_user.id
    )
    result = await db.execute(owned_query)
    scouts = result.scalars().all()
    return [_to_cloud_response(scout) for scout in scouts]
@router.post("/cloud", response_model=CloudScoutResponse, status_code=status.HTTP_201_CREATED)
async def create_cloud_scout(
    body: CloudScoutCreateRequest,
    db: AsyncSession = Depends(get_session),
    current_user: UserProfile = Depends(get_current_user),
):
    """Create and persist a new cloud scout for the authenticated user.

    The scout is created enabled. When the request carries no schedule, the
    module default ``_DEFAULT_CLOUD_SCHEDULE`` is used.
    """
    schedule = body.schedule_cron or _DEFAULT_CLOUD_SCHEDULE
    new_id = str(uuid.uuid4())
    scout = CloudScoutConfig(
        id=new_id,
        user_id=current_user.id,
        provider=body.provider,
        name=body.name,
        data_types=body.data_types,
        prompt_template=body.prompt_template,
        filter_config=body.filter_config,
        schedule_cron=schedule,
        auto_trash_spam=body.auto_trash_spam,
        enabled=True,
    )
    db.add(scout)
    await db.commit()
    # Reload the row before serialising it.
    await db.refresh(scout)
    return _to_cloud_response(scout)
@router.put("/cloud/{scout_id}", response_model=CloudScoutResponse)
async def update_cloud_scout(
    scout_id: str,
    body: CloudScoutUpdateRequest,
    db: AsyncSession = Depends(get_session),
    current_user: UserProfile = Depends(get_current_user),
):
    """Partially update a cloud scout owned by the authenticated user.

    Only fields whose request value is not ``None`` are written. A scout that
    does not exist and a scout owned by someone else both yield 404.
    """
    scout = await db.get(CloudScoutConfig, scout_id)
    owned = scout is not None and scout.user_id == current_user.id
    if not owned:
        raise HTTPException(status.HTTP_404_NOT_FOUND, "Scout not found")

    # Same-named attributes exist on both the request body and the model.
    updatable_fields = (
        "name",
        "data_types",
        "prompt_template",
        "schedule_cron",
        "filter_config",
        "auto_trash_spam",
        "enabled",
    )
    for field_name in updatable_fields:
        new_value = getattr(body, field_name)
        if new_value is not None:
            setattr(scout, field_name, new_value)

    await db.commit()
    await db.refresh(scout)
    return _to_cloud_response(scout)
@router.delete("/cloud/{scout_id}")
async def delete_cloud_scout(
    scout_id: str,
    db: AsyncSession = Depends(get_session),
    current_user: UserProfile = Depends(get_current_user),
):
    """Delete a cloud scout owned by the authenticated user.

    A scout that does not exist and a scout owned by someone else both
    yield 404. Returns ``{"ok": True}`` on success.
    """
    scout = await db.get(CloudScoutConfig, scout_id)
    owned = scout is not None and scout.user_id == current_user.id
    if not owned:
        raise HTTPException(status.HTTP_404_NOT_FOUND, "Scout not found")

    await db.delete(scout)
    await db.commit()
    return {"ok": True}
# ── Gmail OAuth setup (scout-specific) ───────────────────────────────────────
# Scopes requested on the Google consent screen for Gmail scout connectivity
# (joined with spaces into the ``scope`` query parameter by the authorize route).
_GMAIL_SCOUT_SCOPES = [
    "openid",
    "email",
    "https://www.googleapis.com/auth/gmail.readonly",
    "https://www.googleapis.com/auth/gmail.modify",
]
# Google OAuth endpoints: the consent screen and the code-for-token exchange.
_GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
_GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token"
# In-memory pending OAuth states for scout Gmail consent:
# state → (code_verifier, scout_id, user_id, expires_at_epoch_s)
# Entries are added by the authorize route (which also purges expired ones)
# and popped by the callback route.
# Production note: replace with Redis for multi-process deployments.
_pending_scout_oauth_states: dict[str, tuple[str, str, str, float]] = {}
# Lifetime of a pending state, in seconds, added to time.time() at creation.
_SCOUT_OAUTH_TTL_SECONDS = 600  # 10 minutes
def _scout_gmail_redirect_uri() -> str:
    """Build the scout Gmail web-callback URI from ``settings.OAUTH_REDIRECT_URI``.

    ``OAUTH_REDIRECT_URI`` is the full login-OAuth callback URL
    (e.g. http://localhost:8000/api/v1/auth/oauth/google/web-callback).
    Only its scheme and host[:port] are kept; the scout callback path is
    appended to that base.
    """
    login_callback = urllib.parse.urlsplit(settings.OAUTH_REDIRECT_URI)
    return (
        f"{login_callback.scheme}://{login_callback.netloc}"
        "/api/v1/scouts/oauth/gmail/web-callback"
    )
class _ScoutGmailAuthorizeResponse(BaseModel):
    """Response body of ``GET /scouts/oauth/gmail/authorize``."""

    # Full Google consent-screen URL, including the query string.
    authorize_url: str
class _ScoutGmailCallbackBody(BaseModel):
    """Request body of ``POST /scouts/oauth/gmail/callback``."""

    # Authorization code to exchange at the Google token endpoint.
    code: str
    # Opaque state value issued by the authorize route; used to look up the
    # pending PKCE verifier, scout id and user id.
    state: str
@router.get("/oauth/gmail/authorize", response_model=_ScoutGmailAuthorizeResponse)
async def scout_gmail_oauth_authorize(
    scout_id: str,
    current_user: UserProfile = Depends(get_current_user),
) -> _ScoutGmailAuthorizeResponse:
    """Start the Gmail OAuth flow for a specific cloud scout.

    Returns the Google consent-screen URL. The client opens this URL in the
    system browser; after consent Google redirects to web-callback which bounces
    to the ``adiuvai://scout/oauth/gmail/callback`` deep link.

    A fresh PKCE pair and random state are generated per call; the verifier,
    scout id and user id are remembered under the state until it expires.

    Raises:
        HTTPException: 503 when the Google client id or secret is not configured.
    """
    client_id = settings.GOOGLE_AUTH_CLIENT_ID
    if not client_id or not settings.GOOGLE_AUTH_CLIENT_SECRET:
        raise HTTPException(
            status.HTTP_503_SERVICE_UNAVAILABLE,
            "Google OAuth is not configured on this server",
        )

    code_verifier, code_challenge = generate_pkce_pair()
    state = secrets.token_urlsafe(32)

    # Purge expired states to prevent unbounded growth. Keys are collected
    # first so the dict is not mutated while being iterated.
    now = time.time()
    stale_states = [
        key for key, entry in _pending_scout_oauth_states.items() if entry[3] < now
    ]
    for key in stale_states:
        del _pending_scout_oauth_states[key]

    expires_at = now + _SCOUT_OAUTH_TTL_SECONDS
    _pending_scout_oauth_states[state] = (
        code_verifier,
        scout_id,
        current_user.id,
        expires_at,
    )

    query = urllib.parse.urlencode(
        {
            "client_id": client_id,
            "redirect_uri": _scout_gmail_redirect_uri(),
            "response_type": "code",
            "scope": " ".join(_GMAIL_SCOUT_SCOPES),
            "state": state,
            "code_challenge": code_challenge,
            "code_challenge_method": "S256",
            "access_type": "offline",
            "prompt": "consent",
        }
    )
    return _ScoutGmailAuthorizeResponse(authorize_url=f"{_GOOGLE_AUTH_URL}?{query}")
@router.get("/oauth/gmail/web-callback", include_in_schema=False)
async def scout_gmail_oauth_web_callback(code: str, state: str) -> RedirectResponse:
    """Google redirects here after Gmail consent.

    Nothing is validated or exchanged here: the ``code`` and ``state`` query
    parameters are URL-encoded onto the Electron deep link and the browser is
    redirected (302) so the desktop app receives them.
    """
    query = urllib.parse.urlencode({"code": code, "state": state})
    return RedirectResponse(
        url="adiuvai://scout/oauth/gmail/callback?" + query,
        status_code=302,
    )
@router.post("/oauth/gmail/callback")
async def scout_gmail_oauth_callback(
    body: _ScoutGmailCallbackBody,
    db: AsyncSession = Depends(get_session),
    current_user: UserProfile = Depends(get_current_user),
) -> dict:
    """Exchange the Gmail authorization code and store the encrypted token on the scout.

    Called by the Electron app after it receives the deep-link callback with
    the ``code`` and ``state`` params.

    Steps:
      1. Consume the pending state and check it is unexpired and was issued
         to the calling user.
      2. Exchange the code (with the PKCE verifier) at the Google token endpoint.
      3. Encrypt the credentials and store them on the scout.
      4. Best-effort: look up the connected Gmail address for display.
      5. Best-effort: set up the Gmail push watch via the connector.

    Returns:
        ``{"ok": True}`` once the token has been stored. Failures in the two
        best-effort steps are logged and do not fail the request.

    Raises:
        HTTPException: 401 for an unknown, expired or foreign state; 502 when
            the token exchange fails (HTTP error status, transport error, or
            a response without a usable ``access_token``); 404 when the scout
            does not exist or is not owned by the caller.
    """
    # pop() makes the state single-use, whether or not validation succeeds.
    entry = _pending_scout_oauth_states.pop(body.state, None)
    if entry is None or entry[3] < time.time() or entry[2] != current_user.id:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or expired OAuth state")
    code_verifier, scout_id, _, _ = entry
    redirect_uri = _scout_gmail_redirect_uri()

    import httpx

    exchange_failed_detail = "Failed to exchange Gmail authorization code"
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                _GOOGLE_TOKEN_URL,
                data={
                    "client_id": settings.GOOGLE_AUTH_CLIENT_ID,
                    "client_secret": settings.GOOGLE_AUTH_CLIENT_SECRET,
                    "code": body.code,
                    "code_verifier": code_verifier,
                    "grant_type": "authorization_code",
                    "redirect_uri": redirect_uri,
                },
            )
        response.raise_for_status()
    except httpx.HTTPStatusError as exc:
        logger.error("Gmail token exchange failed: %s", exc.response.text)
        raise HTTPException(status.HTTP_502_BAD_GATEWAY, exchange_failed_detail) from exc
    except httpx.RequestError as exc:
        # Transport-level failure (DNS, connect, timeout, ...): there is no
        # response to log, and it must not surface as an unhandled 500.
        logger.error("Gmail token exchange request failed: %s", exc)
        raise HTTPException(status.HTTP_502_BAD_GATEWAY, exchange_failed_detail) from exc

    try:
        token_data = response.json()
        access_token = token_data["access_token"]
    except (ValueError, KeyError, TypeError) as exc:
        # Non-JSON body, or JSON without an access_token.
        logger.error("Gmail token exchange returned an unexpected payload")
        raise HTTPException(status.HTTP_502_BAD_GATEWAY, exchange_failed_detail) from exc

    creds_dict: dict = {
        "token": access_token,
        "refresh_token": token_data.get("refresh_token"),
        "token_uri": _GOOGLE_TOKEN_URL,
        "client_id": settings.GOOGLE_AUTH_CLIENT_ID,
        "client_secret": settings.GOOGLE_AUTH_CLIENT_SECRET,
        "scopes": [
            "https://www.googleapis.com/auth/gmail.readonly",
            "https://www.googleapis.com/auth/gmail.modify",
        ],
    }
    encrypted = encrypt_token(creds_dict)

    scout = await db.get(CloudScoutConfig, scout_id)
    if scout is None or scout.user_id != current_user.id:
        raise HTTPException(status.HTTP_404_NOT_FOUND, "Scout not found")
    scout.oauth_token_encrypted = encrypted

    # Fetch the connected Gmail address for display. Deliberately best-effort:
    # any failure (including a missing Google client library) is logged and
    # the token is still stored.
    try:
        from googleapiclient.discovery import build
        from google.oauth2.credentials import Credentials

        def _fetch_email() -> str | None:
            creds = Credentials(
                token=creds_dict["token"],
                refresh_token=creds_dict.get("refresh_token"),
                token_uri=creds_dict["token_uri"],
                client_id=creds_dict["client_id"],
                client_secret=creds_dict["client_secret"],
                scopes=creds_dict["scopes"],
            )
            service = build("gmail", "v1", credentials=creds, cache_discovery=False)
            profile = service.users().getProfile(userId="me").execute()
            return profile.get("emailAddress")

        # The Google client call is synchronous; run it off the event loop.
        scout.gmail_address = await asyncio.to_thread(_fetch_email)
    except Exception:
        logger.exception("failed to fetch gmail address for scout %s", scout_id)
    await db.commit()

    # Attempt to set up Gmail push watch so we start receiving Pub/Sub notifications.
    from app.scouts.connectors.registry import get_connector

    try:
        connector = get_connector("gmail")
        await connector.setup_watch(scout)
        await db.commit()
    except KeyError:
        logger.warning("gmail connector not registered — skipping setup_watch for scout %s", scout_id)
    except Exception:
        logger.exception("setup_watch failed for scout %s", scout_id)
    return {"ok": True}