271 lines
11 KiB
Python
271 lines
11 KiB
Python
"""ScoutEngine — orchestrates triage, queueing, and delivery for cloud scouts.
|
|
|
|
Triage flow per scout:
|
|
1. Resolve scout config from the DB.
|
|
2. Skip if the device hasn't connected within ``device_inactivity_pause_days`` (not yet enforced by ``trigger_scout``).
|
|
3. Ask the connector to ``list_new`` — fresh items since last poll.
|
|
4. For each item:
|
|
- skip if already in the queue (idempotent on (scout_id, source_msg_ref))
|
|
- fetch the full content via the connector (transient, never persisted)
|
|
- run the triage LLM call → relevant | spam
|
|
- spam + auto_trash_spam → connector.archive
|
|
- relevant → INSERT scout_triage_queue row
|
|
5. Update scout.last_run_at.
|
|
|
|
Delivery flow on Electron WS reconnect:
|
|
- drain ``status='queued'`` rows for the user
|
|
- fetch metadata-only for each (subject + snippet)
|
|
- send a ``scout_proposal`` frame
|
|
- flip status to ``delivered`` once the frame has been sent; ``ack_proposal`` later flips it to ``acked``
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.exc import IntegrityError
|
|
|
|
from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback
|
|
from app.core.llm import get_llm
|
|
from app.db import async_session
|
|
from app.models import CloudScoutConfig, ScoutTriageQueue
|
|
from app.scouts.connectors.base import ItemContent, ItemRef, TriageVerdict
|
|
from app.scouts.connectors.registry import get_connector
|
|
|
|
# Module-wide logger, named after this module's dotted import path.
logger = logging.getLogger(name=__name__)

# Lifetime of a queued triage row: ``expires_at`` is stamped this many days
# after ``triaged_at`` when the row is inserted.
QUEUE_TTL_DAYS: int = 30
|
|
|
|
|
|
class ScoutEngine:
    """Orchestrates scout triage (poll -> classify -> queue) and proposal delivery.

    All database work goes through ``session_factory``: a zero-argument callable
    whose result is used as an async context manager yielding a session.
    """

    def __init__(self, session_factory=None) -> None:
        """Create an engine.

        Args:
            session_factory: Zero-argument callable returning an async session
                context manager. Falls back to ``async_session`` when omitted
                (or falsy).
        """
        self._session_factory = session_factory or async_session

    async def trigger_scout(self, scout_id: uuid.UUID) -> None:
        """Run one triage pass for a single scout.

        Loads the scout config, asks its connector for new items, triages each
        one via ``_process_item``, then stamps ``last_run_at`` and commits.

        Returns early, without touching ``last_run_at``, when the scout does not
        exist, is disabled, or ``list_new`` raises (the error is logged).

        Args:
            scout_id: Primary key of the ``CloudScoutConfig`` row; converted to
                ``str`` for the lookup.
        """
        async with self._session_factory() as session:
            scout = await session.get(CloudScoutConfig, str(scout_id))
            if scout is None:
                logger.warning("trigger_scout: no such scout id=%s", scout_id)
                return
            if not scout.enabled:
                return
            # NOTE(review): the ``device_inactivity_pause_days`` skip described in
            # the module docstring is not enforced here; no device-activity signal
            # is consulted before polling. TODO: implement it or drop it from the docs.
            connector = get_connector(scout.provider)
            try:
                refs = await connector.list_new(scout)
            except Exception:
                logger.exception("scout %s: list_new failed", scout.id)
                return

            for ref in refs:
                await self._process_item(session, scout, connector, ref)

            scout.last_run_at = datetime.now(tz=timezone.utc)
            await session.commit()

    async def _process_item(
        self,
        session,
        scout: CloudScoutConfig,
        connector,
        ref: ItemRef,
    ) -> None:
        """Triage one item and, if it is relevant, enqueue it.

        Per-item failures (fetch, LLM, archive) are logged and the item is
        skipped, so one bad item does not abort the whole scout run. This method
        flushes but never commits ``session``; the caller commits.

        Args:
            session: Open async session shared with ``trigger_scout``.
            scout: Scout whose connector produced ``ref``.
            connector: Connector resolved from ``scout.provider``.
            ref: Reference to the item to triage.
        """
        # Capture identifiers up front so the race-path log below does not read
        # ORM attributes after a savepoint rollback.
        scout_id = scout.id
        msg_ref = ref.source_msg_ref

        # Idempotency check: skip items already queued for this scout.
        existing = await session.execute(
            select(ScoutTriageQueue.id).where(
                ScoutTriageQueue.scout_id == scout_id,
                ScoutTriageQueue.source_msg_ref == msg_ref,
            )
        )
        if existing.first() is not None:
            return

        try:
            content = await connector.fetch_content(scout, ref)
        except Exception:
            logger.exception("scout %s: fetch_content failed for %s", scout_id, msg_ref)
            return

        try:
            verdict = await self._triage_llm(scout, content)
        except Exception:
            logger.exception("scout %s: triage_llm failed for %s", scout_id, msg_ref)
            return

        if verdict.verdict == "spam":
            if scout.auto_trash_spam:
                try:
                    await connector.archive(scout, ref)
                except Exception:
                    logger.exception("scout %s: archive failed for %s", scout_id, msg_ref)
            return

        now = datetime.now(tz=timezone.utc)
        row = ScoutTriageQueue(
            id=str(uuid.uuid4()),
            user_id=scout.user_id,
            scout_id=scout_id,
            source_type=connector.source_type,
            source_msg_ref=msg_ref,
            triage_verdict=verdict.verdict,
            triage_reason=verdict.reason,
            status="queued",
            triaged_at=now,
            expires_at=now + timedelta(days=QUEUE_TTL_DAYS),
        )
        try:
            # The row must be added *inside* the savepoint. begin_nested() flushes
            # pending state before emitting SAVEPOINT, so a row added beforehand
            # would hit the unique constraint outside the savepoint and poison the
            # outer transaction. Adding it inside also means a rolled-back
            # savepoint expunges the row instead of leaving it pending for the
            # final commit.
            async with session.begin_nested():
                session.add(row)
                await session.flush()
        except IntegrityError:
            # Race: another worker inserted between our SELECT and INSERT.
            # The unique constraint did its job; safe to ignore.
            logger.debug(
                "scout %s: idempotent skip for %s (race on unique constraint)",
                scout_id,
                msg_ref,
            )

    async def deliver_pending(self, user_id: uuid.UUID, ws) -> None:
        """Drain status='queued' rows for user, send scout_proposal WS frames, flip to 'delivered'.

        Rows whose connector is unknown, whose scout no longer exists, or whose
        metadata fetch fails are skipped and stay ``queued``. If sending a frame
        raises, rows already sent in this call are committed as ``delivered``
        before the exception propagates, so they are not re-sent next time. The
        failing row stays ``queued``.

        Args:
            user_id: Owner of the queue rows; compared as ``str``.
            ws: Object exposing an awaitable ``send_json(payload)``.
        """
        async with self._session_factory() as session:
            rows = (
                await session.execute(
                    select(ScoutTriageQueue).where(
                        ScoutTriageQueue.user_id == str(user_id),
                        ScoutTriageQueue.status == "queued",
                    )
                )
            ).scalars().all()

            for row in rows:
                try:
                    connector = get_connector(row.source_type)
                except KeyError:
                    logger.warning("deliver_pending: no connector for %s", row.source_type)
                    continue
                scout = await session.get(CloudScoutConfig, row.scout_id)
                if scout is None:
                    continue
                try:
                    meta = await connector.fetch_metadata(
                        scout, ItemRef(source_msg_ref=row.source_msg_ref)
                    )
                except Exception:
                    logger.exception("deliver_pending: fetch_metadata failed")
                    continue

                payload = {
                    "type": "scout_proposal",
                    "proposal": {
                        "id": row.id,
                        "scout_id": row.scout_id,
                        "source_type": row.source_type,
                        "source_msg_ref": row.source_msg_ref,
                        "raw_subject": meta.subject,
                        "raw_snippet": meta.snippet,
                        "category": "unprocessed",
                        "payload": None,
                    },
                }
                try:
                    await ws.send_json(payload)
                except Exception:
                    # Persist the 'delivered' flips for frames that already went
                    # out before the socket failed, then let the error propagate.
                    await session.commit()
                    raise
                row.status = "delivered"
                row.delivered_at = datetime.now(tz=timezone.utc)

            await session.commit()

    async def ack_proposal(self, proposal_id: str) -> None:
        """Flip a delivered proposal to acked. Idempotent — no-op if already acked.

        Unknown ids are ignored. A repeat ack leaves the first ``acked_at``
        timestamp untouched. A row that is still ``queued`` is also acked; this
        covers frames that were sent but whose ``delivered`` flip was never
        committed.

        Args:
            proposal_id: Primary key of the ``ScoutTriageQueue`` row.
        """
        async with self._session_factory() as session:
            row = await session.get(ScoutTriageQueue, proposal_id)
            if row is None or row.status == "acked":
                return
            row.status = "acked"
            row.acked_at = datetime.now(tz=timezone.utc)
            await session.commit()

    async def _triage_llm(self, scout: CloudScoutConfig, content: ItemContent) -> TriageVerdict:
        """Call the scout-triage-system Langfuse prompt to classify an item as relevant or spam.

        Uses gpt-4o-mini with JSON mode. Wraps the LLM call in a Langfuse generation
        observation when Langfuse is configured.

        The returned ``verdict`` label is normalised to stripped lower case.

        Raises:
            Whatever the LLM call, ``json.loads`` or ``TriageVerdict`` raise on a
            bad response; the caller logs and skips the item.
        """
        import json  # noqa: PLC0415

        from langchain_core.messages import HumanMessage, SystemMessage  # noqa: PLC0415

        _TRIAGE_FALLBACK = (
            "You are a triage classifier for an executive-assistant scout that watches a "
            "{source_type} feed.\n"
            'The scout\'s purpose is: "{scout_purpose}".\n\n'
            "Given one item, decide whether it is RELEVANT (worth surfacing to the user as a "
            "potential task / event / note / project) or SPAM (advertising, mass marketing, "
            "phishing, bulk notifications with no actionable content).\n\n"
            "Item:\n"
            "  - Subject: {item_subject}\n"
            "  - From: {item_sender}\n"
            "  - Body (truncated): {item_body_truncated_2k}\n\n"
            'Return JSON only, matching this schema:\n'
            '  {{"verdict": "relevant" | "spam", "reason": <short string>, "confidence": <0..1>}}\n\n'
            "Be conservative on \"spam\" — if a message could plausibly be a personal/work "
            "email, mark it relevant."
        )

        template, prompt_obj = get_prompt_or_fallback("scout-triage-system", _TRIAGE_FALLBACK)

        body_trunc = (content.body_text or "")[:2000]
        variables = dict(
            source_type=scout.provider,
            scout_purpose=scout.prompt_template or "",
            item_subject=content.metadata.subject or "",
            item_sender=content.metadata.sender or "",
            item_body_truncated_2k=body_trunc,
        )

        if prompt_obj is not None:
            try:
                system_text = prompt_obj.compile(**variables)
                if isinstance(system_text, list):
                    # Chat-style prompts compile to a list of message dicts.
                    system_text = "\n".join(
                        m.get("content", "") for m in system_text if isinstance(m, dict)
                    )
            except Exception as exc:
                logger.warning("scout triage: compile failed: %s", exc)
                # Manual mustache-style substitution, in the same order as the
                # keys of ``variables``.
                system_text = template
                for key, value in variables.items():
                    system_text = system_text.replace("{{" + key + "}}", value)
        else:
            system_text = template.format(**variables)

        llm = get_llm(model="gpt-4o-mini", temperature=0)
        llm_json = llm.bind(response_format={"type": "json_object"})  # type: ignore[attr-defined]

        messages = [
            SystemMessage(content=system_text),
            HumanMessage(content="Classify this item."),
        ]

        lf = get_langfuse()
        if lf:
            with lf.start_as_current_observation(
                as_type="generation",
                name="scout-triage",
                model="gpt-4o-mini",
                prompt=prompt_obj,
                input=messages,
            ) as gen:
                response = await llm_json.ainvoke(messages)
                gen.update(output=response.content, usage=extract_usage(response))
        else:
            response = await llm_json.ainvoke(messages)

        data = json.loads(response.content)
        # The prompt prose names the labels in upper case (RELEVANT / SPAM) while
        # the schema asks for lower case. Normalise so a "SPAM" reply is not
        # silently treated as relevant by the ``== "spam"`` check in _process_item.
        if isinstance(data, dict) and isinstance(data.get("verdict"), str):
            data["verdict"] = data["verdict"].strip().lower()
        return TriageVerdict(**data)
|