diff --git a/app/api/routes/device_ws.py b/app/api/routes/device_ws.py index 4b47f42..5116b8e 100644 --- a/app/api/routes/device_ws.py +++ b/app/api/routes/device_ws.py @@ -41,6 +41,7 @@ from sqlalchemy import update from app.api.routes.scout_setup import handle_journey_message, handle_journey_start from app.config.settings import settings +from app.scouts.engine import ScoutEngine from app.core.scout_runner import trigger_pending_runs from app.core.scout_session_buffer import session_buffer from app.core.brief_agent import run_home_brief, run_project_brief @@ -118,6 +119,16 @@ async def device_ws(websocket: WebSocket) -> None: # Trigger any overdue agent runs now that the device is connected. asyncio.create_task(trigger_pending_runs(user_id, device_id, device_manager)) + # Drain any queued scout proposals and deliver to the client (non-blocking). + async def _deliver_pending_safe() -> None: + import uuid as _uuid # noqa: PLC0415 + try: + await ScoutEngine().deliver_pending(_uuid.UUID(user_id), websocket) + except Exception: + logger.exception("scout deliver_pending failed for user %s", user_id) + + asyncio.create_task(_deliver_pending_safe()) + # ── 4. Concurrent message loop + heartbeat ──────────────────────── try: await asyncio.gather( @@ -204,6 +215,14 @@ async def _message_loop(websocket: WebSocket, user_id: str) -> None: _handle_contextual_scope_update(websocket, user_id, frame) ) + elif frame_type == "scout_proposal_ack": + proposal_id = frame.get("proposal_id") + if proposal_id: + try: + await ScoutEngine().ack_proposal(proposal_id) + except Exception: + logger.exception("scout ack_proposal failed for %s", proposal_id) + elif frame_type == "pong": # Heartbeat ack — nothing to do, connection is alive. pass diff --git a/app/scouts/engine.py b/app/scouts/engine.py index 4cdfd1e..c5c8ccc 100644 --- a/app/scouts/engine.py +++ b/app/scouts/engine.py @@ -132,6 +132,61 @@ class ScoutEngine: ref.source_msg_ref, ) + async def deliver_pending(self, user_id: uuid.UUID, ws) -> None: + """Drain status='queued' rows for user, send scout_proposal WS frames, flip to 'delivered'.""" + from app.scouts.connectors.base import ItemRef # noqa: PLC0415 + async with self._session_factory() as session: + rows = (await session.execute( + select(ScoutTriageQueue).where( + ScoutTriageQueue.user_id == str(user_id), + ScoutTriageQueue.status == "queued", + ) + )).scalars().all() + + for row in rows: + try: + connector = get_connector(row.source_type) + except KeyError: + logger.warning("deliver_pending: no connector for %s", row.source_type) + continue + scout = await session.get(CloudScoutConfig, row.scout_id) + if scout is None: + continue + try: + meta = await connector.fetch_metadata(scout, ItemRef(source_msg_ref=row.source_msg_ref)) + except Exception: + logger.exception("deliver_pending: fetch_metadata failed") + continue + + payload = { + "type": "scout_proposal", + "proposal": { + "id": row.id, + "scout_id": row.scout_id, + "source_type": row.source_type, + "source_msg_ref": row.source_msg_ref, + "raw_subject": meta.subject, + "raw_snippet": meta.snippet, + "category": "unprocessed", + "payload": None, + }, + } + await ws.send_json(payload) + row.status = "delivered" + row.delivered_at = datetime.now(tz=timezone.utc) + + await session.commit() + + async def ack_proposal(self, proposal_id: str) -> None: + """Flip a delivered proposal to acked. Idempotent — no-op if already acked.""" + async with self._session_factory() as session: + row = await session.get(ScoutTriageQueue, proposal_id) + if row is None: + return + row.status = "acked" + row.acked_at = datetime.now(tz=timezone.utc) + await session.commit() + async def _triage_llm(self, scout: CloudScoutConfig, content: ItemContent) -> TriageVerdict: """Stub — real implementation in Task 24.""" raise NotImplementedError("Real triage LLM call lands in Task 24") diff --git a/tests/test_scout_engine.py b/tests/test_scout_engine.py index 2d9d8c8..cb7cddc 100644 --- a/tests/test_scout_engine.py +++ b/tests/test_scout_engine.py @@ -170,3 +170,48 @@ async def test_idempotent_replay(monkeypatch): async with _TestSessionLocal() as session: rows = (await session.execute(select(ScoutTriageQueue))).scalars().all() assert len(rows) == 1, "Replay must not create duplicate queue rows" + + +@pytest.mark.asyncio +async def test_deliver_pending_sends_one_frame_per_queued_row(monkeypatch): + user_id = "00000000-0000-0000-0000-000000000003" + scout_id = str(uuid.uuid4()) + now = datetime.now(tz=timezone.utc) + + async with _TestSessionLocal() as session: + session.add(CloudScoutConfig( + id=scout_id, user_id=user_id, provider="gmail", name="Test", + data_types=[], prompt_template="", schedule_cron="0 * * * *", + enabled=True, auto_trash_spam=False, device_inactivity_pause_days=14, + )) + for i in range(3): + session.add(ScoutTriageQueue( + id=str(uuid.uuid4()), user_id=user_id, scout_id=scout_id, + source_type="gmail", source_msg_ref=f"msg-{i}", + triage_verdict="relevant", status="queued", + triaged_at=now, expires_at=now + timedelta(days=30), + )) + await session.commit() + + connector = AsyncMock() + connector.source_type = "gmail" + connector.fetch_metadata = AsyncMock(side_effect=lambda scout, ref: ItemMetadata( + subject=f"sub-{ref.source_msg_ref}", snippet=f"snip-{ref.source_msg_ref}", + )) + register_connector(connector) + + sent = [] + ws = AsyncMock() + ws.send_json = AsyncMock(side_effect=lambda payload: sent.append(payload)) + + engine = ScoutEngine(session_factory=_TestSessionLocal) + await engine.deliver_pending(uuid.UUID(user_id), ws) + + assert len(sent) == 3 + assert all(s["type"] == "scout_proposal" for s in sent) + subjects = {s["proposal"]["raw_subject"] for s in sent} + assert subjects == {"sub-msg-0", "sub-msg-1", "sub-msg-2"} + async with _TestSessionLocal() as session: + rows = (await session.execute(select(ScoutTriageQueue))).scalars().all() + assert all(r.status == "delivered" for r in rows) + assert all(r.delivered_at is not None for r in rows)