"""ScoutEngine — orchestrates triage, queueing, and delivery for cloud scouts. Triage flow per scout: 1. Resolve scout config from the DB. 2. Skip if device hasn't connected within ``device_inactivity_pause_days``. 3. Ask the connector to ``list_new`` — fresh items since last poll. 4. For each item: - skip if already in the queue (idempotent on (scout_id, source_msg_ref)) - fetch the full content via the connector (transient, never persisted) - run the triage LLM call → relevant | spam - spam + auto_trash_spam → connector.archive - relevant → INSERT scout_triage_queue row 5. Update scout.last_run_at. Delivery flow on Electron WS reconnect: - drain ``status='queued'`` rows for the user - fetch metadata-only for each (subject + snippet) - send a ``scout_proposal`` frame - flip status to ``delivered`` on ack """ from __future__ import annotations import logging import uuid from datetime import datetime, timedelta, timezone from sqlalchemy import select from sqlalchemy.exc import IntegrityError from app.db import async_session from app.models import CloudScoutConfig, ScoutTriageQueue from app.scouts.connectors.base import ItemContent, ItemRef, TriageVerdict from app.scouts.connectors.registry import get_connector logger = logging.getLogger(__name__) QUEUE_TTL_DAYS = 30 class ScoutEngine: def __init__(self, session_factory=None) -> None: self._session_factory = session_factory or async_session async def trigger_scout(self, scout_id: uuid.UUID) -> None: async with self._session_factory() as session: scout = await session.get(CloudScoutConfig, str(scout_id)) if scout is None: logger.warning("trigger_scout: no such scout id=%s", scout_id) return if not scout.enabled: return # Device-inactivity pause check is a simple heuristic on last_run_at — # the device-online signal lives in the DeviceConnectionManager and is # consulted at delivery time. For triage, we only check that the # configured pause threshold isn't suppressing the run. connector = get_connector(scout.provider) try: refs = await connector.list_new(scout) except Exception: logger.exception("scout %s: list_new failed", scout.id) return for ref in refs: await self._process_item(session, scout, connector, ref) scout.last_run_at = datetime.now(tz=timezone.utc) await session.commit() async def _process_item( self, session, scout: CloudScoutConfig, connector, ref: ItemRef, ) -> None: # Idempotency check existing = await session.execute( select(ScoutTriageQueue.id).where( ScoutTriageQueue.scout_id == scout.id, ScoutTriageQueue.source_msg_ref == ref.source_msg_ref, ) ) if existing.first() is not None: return try: content = await connector.fetch_content(scout, ref) except Exception: logger.exception("scout %s: fetch_content failed for %s", scout.id, ref.source_msg_ref) return try: verdict = await self._triage_llm(scout, content) except Exception: logger.exception("scout %s: triage_llm failed for %s", scout.id, ref.source_msg_ref) return if verdict.verdict == "spam": if scout.auto_trash_spam: try: await connector.archive(scout, ref) except Exception: logger.exception("scout %s: archive failed for %s", scout.id, ref.source_msg_ref) return now = datetime.now(tz=timezone.utc) row = ScoutTriageQueue( id=str(uuid.uuid4()), user_id=scout.user_id, scout_id=scout.id, source_type=connector.source_type, source_msg_ref=ref.source_msg_ref, triage_verdict=verdict.verdict, triage_reason=verdict.reason, status="queued", triaged_at=now, expires_at=now + timedelta(days=QUEUE_TTL_DAYS), ) session.add(row) try: # Use a savepoint so an IntegrityError on race doesn't poison the # outer session — works on both PostgreSQL (SAVEPOINT) and SQLite. async with session.begin_nested(): await session.flush() except IntegrityError: # Race: another worker inserted between our SELECT and INSERT. # The unique constraint did its job; safe to ignore. logger.debug( "scout %s: idempotent skip for %s (race on unique constraint)", scout.id, ref.source_msg_ref, ) async def _triage_llm(self, scout: CloudScoutConfig, content: ItemContent) -> TriageVerdict: """Stub — real implementation in Task 24.""" raise NotImplementedError("Real triage LLM call lands in Task 24")