"""ScoutEngine — orchestrates triage, queueing, and delivery for cloud scouts. Triage flow per scout: 1. Resolve scout config from the DB. 2. Skip if device hasn't connected within ``device_inactivity_pause_days``. 3. Ask the connector to ``list_new`` — fresh items since last poll. 4. For each item: - skip if already in the queue (idempotent on (scout_id, source_msg_ref)) - fetch the full content via the connector (transient, never persisted) - run the triage LLM call → relevant | spam - spam + auto_trash_spam → connector.archive - relevant → INSERT scout_triage_queue row 5. Update scout.last_run_at. Delivery flow on Electron WS reconnect: - drain ``status='queued'`` rows for the user - fetch metadata-only for each (subject + snippet) - send a ``scout_proposal`` frame - flip status to ``delivered`` on ack """ from __future__ import annotations import logging import uuid from datetime import datetime, timedelta, timezone from sqlalchemy import select from sqlalchemy.exc import IntegrityError from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback from app.core.llm import get_llm from app.db import async_session from app.models import CloudScoutConfig, ScoutTriageQueue from app.scouts.connectors.base import ItemContent, ItemRef, TriageVerdict from app.scouts.connectors.registry import get_connector logger = logging.getLogger(__name__) QUEUE_TTL_DAYS = 30 class ScoutEngine: def __init__(self, session_factory=None) -> None: self._session_factory = session_factory or async_session async def trigger_scout(self, scout_id: uuid.UUID) -> None: async with self._session_factory() as session: scout = await session.get(CloudScoutConfig, str(scout_id)) if scout is None: logger.warning("trigger_scout: no such scout id=%s", scout_id) return if not scout.enabled: return # Device-inactivity pause check is a simple heuristic on last_run_at — # the device-online signal lives in the DeviceConnectionManager and is # consulted at delivery time. For triage, we only check that the # configured pause threshold isn't suppressing the run. connector = get_connector(scout.provider) try: refs = await connector.list_new(scout) except Exception: logger.exception("scout %s: list_new failed", scout.id) return for ref in refs: await self._process_item(session, scout, connector, ref) scout.last_run_at = datetime.now(tz=timezone.utc) await session.commit() async def _process_item( self, session, scout: CloudScoutConfig, connector, ref: ItemRef, ) -> None: # Idempotency check existing = await session.execute( select(ScoutTriageQueue.id).where( ScoutTriageQueue.scout_id == scout.id, ScoutTriageQueue.source_msg_ref == ref.source_msg_ref, ) ) if existing.first() is not None: return try: content = await connector.fetch_content(scout, ref) except Exception: logger.exception("scout %s: fetch_content failed for %s", scout.id, ref.source_msg_ref) return try: verdict = await self._triage_llm(scout, content) except Exception: logger.exception("scout %s: triage_llm failed for %s", scout.id, ref.source_msg_ref) return if verdict.verdict == "spam": if scout.auto_trash_spam: try: await connector.archive(scout, ref) except Exception: logger.exception("scout %s: archive failed for %s", scout.id, ref.source_msg_ref) return now = datetime.now(tz=timezone.utc) row = ScoutTriageQueue( id=str(uuid.uuid4()), user_id=scout.user_id, scout_id=scout.id, source_type=connector.source_type, source_msg_ref=ref.source_msg_ref, triage_verdict=verdict.verdict, triage_reason=verdict.reason, status="queued", triaged_at=now, expires_at=now + timedelta(days=QUEUE_TTL_DAYS), ) session.add(row) try: # Use a savepoint so an IntegrityError on race doesn't poison the # outer session — works on both PostgreSQL (SAVEPOINT) and SQLite. async with session.begin_nested(): await session.flush() except IntegrityError: # Race: another worker inserted between our SELECT and INSERT. # The unique constraint did its job; safe to ignore. logger.debug( "scout %s: idempotent skip for %s (race on unique constraint)", scout.id, ref.source_msg_ref, ) async def deliver_pending(self, user_id: uuid.UUID, ws) -> None: """Drain status='queued' rows for user, send scout_proposal WS frames, flip to 'delivered'.""" from app.scouts.connectors.base import ItemRef # noqa: PLC0415 async with self._session_factory() as session: rows = (await session.execute( select(ScoutTriageQueue).where( ScoutTriageQueue.user_id == str(user_id), ScoutTriageQueue.status == "queued", ) )).scalars().all() for row in rows: try: connector = get_connector(row.source_type) except KeyError: logger.warning("deliver_pending: no connector for %s", row.source_type) continue scout = await session.get(CloudScoutConfig, row.scout_id) if scout is None: continue try: meta = await connector.fetch_metadata(scout, ItemRef(source_msg_ref=row.source_msg_ref)) except Exception: logger.exception("deliver_pending: fetch_metadata failed") continue payload = { "type": "scout_proposal", "proposal": { "id": row.id, "scout_id": row.scout_id, "source_type": row.source_type, "source_msg_ref": row.source_msg_ref, "raw_subject": meta.subject, "raw_snippet": meta.snippet, "category": "unprocessed", "payload": None, }, } await ws.send_json(payload) row.status = "delivered" row.delivered_at = datetime.now(tz=timezone.utc) await session.commit() async def ack_proposal(self, proposal_id: str) -> None: """Flip a delivered proposal to acked. Idempotent — no-op if already acked.""" async with self._session_factory() as session: row = await session.get(ScoutTriageQueue, proposal_id) if row is None: return row.status = "acked" row.acked_at = datetime.now(tz=timezone.utc) await session.commit() async def _triage_llm(self, scout: CloudScoutConfig, content: ItemContent) -> TriageVerdict: """Call the scout-triage-system Langfuse prompt to classify an item as relevant or spam. Uses gpt-4o-mini with JSON mode. Wraps the LLM call in a Langfuse generation observation when Langfuse is configured. """ import json # noqa: PLC0415 from langchain_core.messages import HumanMessage, SystemMessage # noqa: PLC0415 _TRIAGE_FALLBACK = ( "You are a triage classifier for an executive-assistant scout that watches a " "{source_type} feed.\n" 'The scout\'s purpose is: "{scout_purpose}".\n\n' "Given one item, decide whether it is RELEVANT (worth surfacing to the user as a " "potential task / event / note / project) or SPAM (advertising, mass marketing, " "phishing, bulk notifications with no actionable content).\n\n" "Item:\n" " - Subject: {item_subject}\n" " - From: {item_sender}\n" " - Body (truncated): {item_body_truncated_2k}\n\n" 'Return JSON only, matching this schema:\n' ' {{"verdict": "relevant" | "spam", "reason": , "confidence": <0..1>}}\n\n' "Be conservative on \"spam\" — if a message could plausibly be a personal/work " "email, mark it relevant." ) template, prompt_obj = get_prompt_or_fallback("scout-triage-system", _TRIAGE_FALLBACK) body_trunc = (content.body_text or "")[:2000] variables = dict( source_type=scout.provider, scout_purpose=scout.prompt_template or "", item_subject=content.metadata.subject or "", item_sender=content.metadata.sender or "", item_body_truncated_2k=body_trunc, ) if prompt_obj is not None: try: system_text = prompt_obj.compile(**variables) if isinstance(system_text, list): system_text = "\n".join( m.get("content", "") for m in system_text if isinstance(m, dict) ) except Exception as exc: logger.warning("scout triage: compile failed: %s", exc) system_text = template.replace("{{source_type}}", variables["source_type"]) \ .replace("{{scout_purpose}}", variables["scout_purpose"]) \ .replace("{{item_subject}}", variables["item_subject"]) \ .replace("{{item_sender}}", variables["item_sender"]) \ .replace("{{item_body_truncated_2k}}", variables["item_body_truncated_2k"]) else: system_text = template.format(**variables) llm = get_llm(model="gpt-4o-mini", temperature=0) llm_json = llm.bind(response_format={"type": "json_object"}) # type: ignore[attr-defined] messages = [ SystemMessage(content=system_text), HumanMessage(content="Classify this item."), ] lf = get_langfuse() if lf: with lf.start_as_current_observation( as_type="generation", name="scout-triage", model="gpt-4o-mini", prompt=prompt_obj, input=messages, ) as gen: response = await llm_json.ainvoke(messages) gen.update(output=response.content, usage=extract_usage(response)) else: response = await llm_json.ainvoke(messages) data = json.loads(response.content) return TriageVerdict(**data)