feat(scouts): add cron-fallback poll + gmail watch renewal ticks
This commit is contained in:
93
app/main.py
93
app/main.py
@@ -77,6 +77,91 @@ async def _memory_cron_tick() -> None:
|
|||||||
_log.warning("memory cron tick: failed: %s", exc)
|
_log.warning("memory cron tick: failed: %s", exc)
|
||||||
|
|
||||||
|
|
||||||
|
async def _scout_cron_tick() -> None:
|
||||||
|
"""Every-15-min cron: poll enabled cloud scouts (cron-fallback; push is primary).
|
||||||
|
|
||||||
|
Skips any scout whose ``last_run_at`` is within the last 5 minutes so
|
||||||
|
a push notification and the fallback cron don't double-fire within the
|
||||||
|
same window.
|
||||||
|
"""
|
||||||
|
import logging # noqa: PLC0415
|
||||||
|
import uuid # noqa: PLC0415
|
||||||
|
from datetime import datetime, timezone # noqa: PLC0415
|
||||||
|
|
||||||
|
_log = logging.getLogger(__name__)
|
||||||
|
_log.info("scout cron tick: starting")
|
||||||
|
try:
|
||||||
|
from app.db import async_session # noqa: PLC0415
|
||||||
|
from app.models import CloudScoutConfig # noqa: PLC0415
|
||||||
|
from app.scouts.engine import ScoutEngine # noqa: PLC0415
|
||||||
|
from sqlalchemy import select # noqa: PLC0415
|
||||||
|
|
||||||
|
async with async_session() as session:
|
||||||
|
scouts = (await session.execute(
|
||||||
|
select(CloudScoutConfig).where(CloudScoutConfig.enabled == True) # noqa: E712
|
||||||
|
)).scalars().all()
|
||||||
|
|
||||||
|
engine = ScoutEngine()
|
||||||
|
triggered = 0
|
||||||
|
for scout in scouts:
|
||||||
|
# Rate-limit guard: push is primary; skip if ran within 5 minutes.
|
||||||
|
if scout.last_run_at:
|
||||||
|
elapsed = (datetime.now(tz=timezone.utc) - scout.last_run_at).total_seconds()
|
||||||
|
if elapsed < 300:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
await engine.trigger_scout(uuid.UUID(str(scout.id)))
|
||||||
|
triggered += 1
|
||||||
|
except Exception as exc:
|
||||||
|
_log.warning("scout cron tick: trigger failed scout=%s: %s", scout.id, exc)
|
||||||
|
|
||||||
|
_log.info("scout cron tick: done triggered=%d total=%d", triggered, len(scouts))
|
||||||
|
except Exception as exc:
|
||||||
|
_log.warning("scout cron tick: failed: %s", exc)
|
||||||
|
|
||||||
|
|
||||||
|
async def _scout_watch_renewal_tick() -> None:
|
||||||
|
"""Every-24-hour cron: re-issue Gmail users.watch for scouts expiring within 24h.
|
||||||
|
|
||||||
|
Handles missing or misconfigured connectors gracefully — logs and continues.
|
||||||
|
"""
|
||||||
|
import logging # noqa: PLC0415
|
||||||
|
from datetime import datetime, timedelta, timezone # noqa: PLC0415
|
||||||
|
|
||||||
|
_log = logging.getLogger(__name__)
|
||||||
|
_log.info("scout watch renewal tick: starting")
|
||||||
|
try:
|
||||||
|
from app.db import async_session # noqa: PLC0415
|
||||||
|
from app.models import CloudScoutConfig # noqa: PLC0415
|
||||||
|
from app.scouts.connectors.registry import get_connector # noqa: PLC0415
|
||||||
|
from sqlalchemy import select # noqa: PLC0415
|
||||||
|
|
||||||
|
threshold = datetime.now(tz=timezone.utc) + timedelta(hours=24)
|
||||||
|
renewed = 0
|
||||||
|
async with async_session() as session:
|
||||||
|
scouts = (await session.execute(
|
||||||
|
select(CloudScoutConfig).where(
|
||||||
|
CloudScoutConfig.enabled == True, # noqa: E712
|
||||||
|
CloudScoutConfig.provider == "gmail",
|
||||||
|
CloudScoutConfig.gmail_watch_expires_at <= threshold,
|
||||||
|
)
|
||||||
|
)).scalars().all()
|
||||||
|
|
||||||
|
for scout in scouts:
|
||||||
|
try:
|
||||||
|
connector = get_connector("gmail")
|
||||||
|
await connector.renew_watch(scout)
|
||||||
|
renewed += 1
|
||||||
|
except Exception:
|
||||||
|
_log.exception("scout watch renewal tick: renew failed scout=%s", scout.id)
|
||||||
|
|
||||||
|
await session.commit()
|
||||||
|
|
||||||
|
_log.info("scout watch renewal tick: done renewed=%d", renewed)
|
||||||
|
except Exception as exc:
|
||||||
|
_log.warning("scout watch renewal tick: failed: %s", exc)
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI):
|
||||||
# Startup: register source connectors.
|
# Startup: register source connectors.
|
||||||
@@ -94,6 +179,14 @@ async def lifespan(app: FastAPI):
|
|||||||
scheduler = AsyncIOScheduler()
|
scheduler = AsyncIOScheduler()
|
||||||
scheduler.add_job(_memory_cron_tick, "interval", hours=1, id="memory_cron")
|
scheduler.add_job(_memory_cron_tick, "interval", hours=1, id="memory_cron")
|
||||||
scheduler.add_job(_memory_audit_cron_tick, "interval", weeks=1, id="memory_audit_cron")
|
scheduler.add_job(_memory_audit_cron_tick, "interval", weeks=1, id="memory_audit_cron")
|
||||||
|
scheduler.add_job(
|
||||||
|
_scout_cron_tick, "interval", minutes=15,
|
||||||
|
id="scout_cron_tick", replace_existing=True,
|
||||||
|
)
|
||||||
|
scheduler.add_job(
|
||||||
|
_scout_watch_renewal_tick, "interval", hours=24,
|
||||||
|
id="scout_watch_renewal_tick", replace_existing=True,
|
||||||
|
)
|
||||||
scheduler.start()
|
scheduler.start()
|
||||||
logging.getLogger(__name__).info("memory cron scheduler started (interval=1h)")
|
logging.getLogger(__name__).info("memory cron scheduler started (interval=1h)")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user