"""ScoutEngine — orchestrates triage, queueing, and delivery for cloud scouts. Triage flow per scout: 1. Resolve scout config from the DB. 2. Skip if device hasn't connected within ``device_inactivity_pause_days``. 3. Ask the connector to ``list_new`` — fresh items since last poll. 4. For each item: - skip if already in the queue (idempotent on (scout_id, source_msg_ref)) - fetch the full content via the connector (transient, never persisted) - run the triage LLM call → relevant | spam - spam + auto_trash_spam → connector.archive - relevant → INSERT scout_triage_queue row 5. Update scout.last_run_at. Delivery flow on Electron WS reconnect: - drain ``status='queued'`` rows for the user - fetch metadata-only for each (subject + snippet) - send a ``scout_proposal`` frame - flip status to ``delivered`` on ack """ from __future__ import annotations import logging import uuid from datetime import datetime, timedelta, timezone from sqlalchemy import select from sqlalchemy.exc import IntegrityError from app.db import async_session from app.models import CloudScoutConfig, ScoutTriageQueue from app.scouts.connectors.base import ItemContent, ItemRef, TriageVerdict from app.scouts.connectors.registry import get_connector logger = logging.getLogger(__name__) QUEUE_TTL_DAYS = 30 class ScoutEngine: def __init__(self, session_factory=None) -> None: self._session_factory = session_factory or async_session async def trigger_scout(self, scout_id: uuid.UUID) -> None: async with self._session_factory() as session: scout = await session.get(CloudScoutConfig, str(scout_id)) if scout is None: logger.warning("trigger_scout: no such scout id=%s", scout_id) return if not scout.enabled: return # Device-inactivity pause check is a simple heuristic on last_run_at — # the device-online signal lives in the DeviceConnectionManager and is # consulted at delivery time. For triage, we only check that the # configured pause threshold isn't suppressing the run. connector = get_connector(scout.provider) try: refs = await connector.list_new(scout) except Exception: logger.exception("scout %s: list_new failed", scout.id) return for ref in refs: await self._process_item(session, scout, connector, ref) scout.last_run_at = datetime.now(tz=timezone.utc) await session.commit() async def _process_item( self, session, scout: CloudScoutConfig, connector, ref: ItemRef, ) -> None: # Idempotency check existing = await session.execute( select(ScoutTriageQueue.id).where( ScoutTriageQueue.scout_id == scout.id, ScoutTriageQueue.source_msg_ref == ref.source_msg_ref, ) ) if existing.first() is not None: return try: content = await connector.fetch_content(scout, ref) except Exception: logger.exception("scout %s: fetch_content failed for %s", scout.id, ref.source_msg_ref) return try: verdict = await self._triage_llm(scout, content) except Exception: logger.exception("scout %s: triage_llm failed for %s", scout.id, ref.source_msg_ref) return if verdict.verdict == "spam": if scout.auto_trash_spam: try: await connector.archive(scout, ref) except Exception: logger.exception("scout %s: archive failed for %s", scout.id, ref.source_msg_ref) return now = datetime.now(tz=timezone.utc) row = ScoutTriageQueue( id=str(uuid.uuid4()), user_id=scout.user_id, scout_id=scout.id, source_type=connector.source_type, source_msg_ref=ref.source_msg_ref, triage_verdict=verdict.verdict, triage_reason=verdict.reason, status="queued", triaged_at=now, expires_at=now + timedelta(days=QUEUE_TTL_DAYS), ) session.add(row) try: # Use a savepoint so an IntegrityError on race doesn't poison the # outer session — works on both PostgreSQL (SAVEPOINT) and SQLite. async with session.begin_nested(): await session.flush() except IntegrityError: # Race: another worker inserted between our SELECT and INSERT. # The unique constraint did its job; safe to ignore. logger.debug( "scout %s: idempotent skip for %s (race on unique constraint)", scout.id, ref.source_msg_ref, ) async def deliver_pending(self, user_id: uuid.UUID, ws) -> None: """Drain status='queued' rows for user, send scout_proposal WS frames, flip to 'delivered'.""" from app.scouts.connectors.base import ItemRef # noqa: PLC0415 async with self._session_factory() as session: rows = (await session.execute( select(ScoutTriageQueue).where( ScoutTriageQueue.user_id == str(user_id), ScoutTriageQueue.status == "queued", ) )).scalars().all() for row in rows: try: connector = get_connector(row.source_type) except KeyError: logger.warning("deliver_pending: no connector for %s", row.source_type) continue scout = await session.get(CloudScoutConfig, row.scout_id) if scout is None: continue try: meta = await connector.fetch_metadata(scout, ItemRef(source_msg_ref=row.source_msg_ref)) except Exception: logger.exception("deliver_pending: fetch_metadata failed") continue payload = { "type": "scout_proposal", "proposal": { "id": row.id, "scout_id": row.scout_id, "source_type": row.source_type, "source_msg_ref": row.source_msg_ref, "raw_subject": meta.subject, "raw_snippet": meta.snippet, "category": "unprocessed", "payload": None, }, } await ws.send_json(payload) row.status = "delivered" row.delivered_at = datetime.now(tz=timezone.utc) await session.commit() async def ack_proposal(self, proposal_id: str) -> None: """Flip a delivered proposal to acked. Idempotent — no-op if already acked.""" async with self._session_factory() as session: row = await session.get(ScoutTriageQueue, proposal_id) if row is None: return row.status = "acked" row.acked_at = datetime.now(tz=timezone.utc) await session.commit() async def _triage_llm(self, scout: CloudScoutConfig, content: ItemContent) -> TriageVerdict: """Stub — real implementation in Task 24.""" raise NotImplementedError("Real triage LLM call lands in Task 24")