diff --git a/app/main.py b/app/main.py index 62b6954..b6bc9a1 100644 --- a/app/main.py +++ b/app/main.py @@ -77,6 +77,91 @@ async def _memory_cron_tick() -> None: _log.warning("memory cron tick: failed: %s", exc) +async def _scout_cron_tick() -> None: + """Every-15-min cron: poll enabled cloud scouts (cron-fallback; push is primary). + + Skips any scout whose ``last_run_at`` is within the last 5 minutes so + a push notification and the fallback cron don't double-fire within the + same window. + """ + import logging # noqa: PLC0415 + import uuid # noqa: PLC0415 + from datetime import datetime, timezone # noqa: PLC0415 + + _log = logging.getLogger(__name__) + _log.info("scout cron tick: starting") + try: + from app.db import async_session # noqa: PLC0415 + from app.models import CloudScoutConfig # noqa: PLC0415 + from app.scouts.engine import ScoutEngine # noqa: PLC0415 + from sqlalchemy import select # noqa: PLC0415 + + async with async_session() as session: + scouts = (await session.execute( + select(CloudScoutConfig).where(CloudScoutConfig.enabled == True) # noqa: E712 + )).scalars().all() + + engine = ScoutEngine() + triggered = 0 + for scout in scouts: + # Rate-limit guard: push is primary; skip if ran within 5 minutes. + if scout.last_run_at: + elapsed = (datetime.now(tz=timezone.utc) - scout.last_run_at).total_seconds() + if elapsed < 300: + continue + try: + await engine.trigger_scout(uuid.UUID(str(scout.id))) + triggered += 1 + except Exception as exc: + _log.warning("scout cron tick: trigger failed scout=%s: %s", scout.id, exc) + + _log.info("scout cron tick: done triggered=%d total=%d", triggered, len(scouts)) + except Exception as exc: + _log.warning("scout cron tick: failed: %s", exc) + + +async def _scout_watch_renewal_tick() -> None: + """Every-24-hour cron: re-issue Gmail users.watch for scouts expiring within 24h. + + Handles missing or misconfigured connectors gracefully — logs and continues. + """ + import logging # noqa: PLC0415 + from datetime import datetime, timedelta, timezone # noqa: PLC0415 + + _log = logging.getLogger(__name__) + _log.info("scout watch renewal tick: starting") + try: + from app.db import async_session # noqa: PLC0415 + from app.models import CloudScoutConfig # noqa: PLC0415 + from app.scouts.connectors.registry import get_connector # noqa: PLC0415 + from sqlalchemy import select # noqa: PLC0415 + + threshold = datetime.now(tz=timezone.utc) + timedelta(hours=24) + renewed = 0 + async with async_session() as session: + scouts = (await session.execute( + select(CloudScoutConfig).where( + CloudScoutConfig.enabled == True, # noqa: E712 + CloudScoutConfig.provider == "gmail", + CloudScoutConfig.gmail_watch_expires_at <= threshold, + ) + )).scalars().all() + + for scout in scouts: + try: + connector = get_connector("gmail") + await connector.renew_watch(scout) + renewed += 1 + except Exception: + _log.exception("scout watch renewal tick: renew failed scout=%s", scout.id) + + await session.commit() + + _log.info("scout watch renewal tick: done renewed=%d", renewed) + except Exception as exc: + _log.warning("scout watch renewal tick: failed: %s", exc) + + @asynccontextmanager async def lifespan(app: FastAPI): # Startup: register source connectors. @@ -94,6 +179,14 @@ async def lifespan(app: FastAPI): scheduler = AsyncIOScheduler() scheduler.add_job(_memory_cron_tick, "interval", hours=1, id="memory_cron") scheduler.add_job(_memory_audit_cron_tick, "interval", weeks=1, id="memory_audit_cron") + scheduler.add_job( + _scout_cron_tick, "interval", minutes=15, + id="scout_cron_tick", replace_existing=True, + ) + scheduler.add_job( + _scout_watch_renewal_tick, "interval", hours=24, + id="scout_watch_renewal_tick", replace_existing=True, + ) scheduler.start() logging.getLogger(__name__).info("memory cron scheduler started (interval=1h)")