feat(scouts): real triage LLM call via scout-triage-system prompt

This commit is contained in:
Roberto
2026-05-16 04:26:16 +02:00
parent d1016fd65a
commit 0c0299808c
2 changed files with 133 additions and 2 deletions

View File

@@ -28,6 +28,8 @@ from datetime import datetime, timedelta, timezone
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback
from app.core.llm import get_llm
from app.db import async_session from app.db import async_session
from app.models import CloudScoutConfig, ScoutTriageQueue from app.models import CloudScoutConfig, ScoutTriageQueue
from app.scouts.connectors.base import ItemContent, ItemRef, TriageVerdict from app.scouts.connectors.base import ItemContent, ItemRef, TriageVerdict
@@ -188,5 +190,81 @@ class ScoutEngine:
await session.commit() await session.commit()
async def _triage_llm(self, scout: CloudScoutConfig, content: ItemContent) -> TriageVerdict: async def _triage_llm(self, scout: CloudScoutConfig, content: ItemContent) -> TriageVerdict:
"""Stub — real implementation in Task 24.""" """Call the scout-triage-system Langfuse prompt to classify an item as relevant or spam.
raise NotImplementedError("Real triage LLM call lands in Task 24")
Uses gpt-4o-mini with JSON mode. Wraps the LLM call in a Langfuse generation
observation when Langfuse is configured.
"""
import json # noqa: PLC0415
from langchain_core.messages import HumanMessage, SystemMessage # noqa: PLC0415
_TRIAGE_FALLBACK = (
"You are a triage classifier for an executive-assistant scout that watches a "
"{source_type} feed.\n"
'The scout\'s purpose is: "{scout_purpose}".\n\n'
"Given one item, decide whether it is RELEVANT (worth surfacing to the user as a "
"potential task / event / note / project) or SPAM (advertising, mass marketing, "
"phishing, bulk notifications with no actionable content).\n\n"
"Item:\n"
" - Subject: {item_subject}\n"
" - From: {item_sender}\n"
" - Body (truncated): {item_body_truncated_2k}\n\n"
'Return JSON only, matching this schema:\n'
' {{"verdict": "relevant" | "spam", "reason": <short string>, "confidence": <0..1>}}\n\n'
"Be conservative on \"spam\" — if a message could plausibly be a personal/work "
"email, mark it relevant."
)
template, prompt_obj = get_prompt_or_fallback("scout-triage-system", _TRIAGE_FALLBACK)
body_trunc = (content.body_text or "")[:2000]
variables = dict(
source_type=scout.provider,
scout_purpose=scout.prompt_template or "",
item_subject=content.metadata.subject or "",
item_sender=content.metadata.sender or "",
item_body_truncated_2k=body_trunc,
)
if prompt_obj is not None:
try:
system_text = prompt_obj.compile(**variables)
if isinstance(system_text, list):
system_text = "\n".join(
m.get("content", "") for m in system_text if isinstance(m, dict)
)
except Exception as exc:
logger.warning("scout triage: compile failed: %s", exc)
system_text = template.replace("{{source_type}}", variables["source_type"]) \
.replace("{{scout_purpose}}", variables["scout_purpose"]) \
.replace("{{item_subject}}", variables["item_subject"]) \
.replace("{{item_sender}}", variables["item_sender"]) \
.replace("{{item_body_truncated_2k}}", variables["item_body_truncated_2k"])
else:
system_text = template.format(**variables)
llm = get_llm(model="gpt-4o-mini", temperature=0)
llm_json = llm.bind(response_format={"type": "json_object"}) # type: ignore[attr-defined]
messages = [
SystemMessage(content=system_text),
HumanMessage(content="Classify this item."),
]
lf = get_langfuse()
if lf:
with lf.start_as_current_observation(
as_type="generation",
name="scout-triage",
model="gpt-4o-mini",
prompt=prompt_obj,
input=messages,
) as gen:
response = await llm_json.ainvoke(messages)
gen.update(output=response.content, usage=extract_usage(response))
else:
response = await llm_json.ainvoke(messages)
data = json.loads(response.content)
return TriageVerdict(**data)

View File

@@ -172,6 +172,59 @@ async def test_idempotent_replay(monkeypatch):
assert len(rows) == 1, "Replay must not create duplicate queue rows" assert len(rows) == 1, "Replay must not create duplicate queue rows"
@pytest.mark.asyncio
async def test_triage_llm_parses_json_response(monkeypatch):
"""Real _triage_llm path: mock the LLM ainvoke, verify TriageVerdict parsed correctly."""
from unittest.mock import MagicMock # noqa: PLC0415
from app.models import CloudScoutConfig # noqa: PLC0415
scout = CloudScoutConfig(
id=str(uuid.uuid4()),
user_id="00000000-0000-0000-0000-000000000003",
provider="gmail",
name="test-scout",
data_types=[],
prompt_template="watch invoices and project updates",
schedule_cron="0 * * * *",
enabled=True,
auto_trash_spam=False,
device_inactivity_pause_days=14,
)
content = ItemContent(
metadata=ItemMetadata(subject="Invoice 42", sender="billing@acme.com"),
body_text="Payment of €1 200 is due on 2026-06-01. Please confirm receipt.",
)
# Build a fake LangChain response whose .content is valid JSON.
fake_response = MagicMock()
fake_response.content = '{"verdict": "relevant", "reason": "invoice due", "confidence": 0.92}'
fake_response.usage_metadata = {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}
# Fake LLM: .bind() returns self (or another mock with ainvoke).
fake_llm = MagicMock()
fake_llm.bind.return_value = fake_llm
fake_llm.ainvoke = AsyncMock(return_value=fake_response)
# Patch get_llm inside app.scouts.engine so our fake is used.
monkeypatch.setattr("app.scouts.engine.get_llm", lambda **kwargs: fake_llm)
# Disable Langfuse for this test.
monkeypatch.setattr("app.scouts.engine.get_langfuse", lambda: None)
# Use fallback prompt (no Langfuse) — patch get_prompt_or_fallback to return fallback.
monkeypatch.setattr(
"app.scouts.engine.get_prompt_or_fallback",
lambda name, fallback: (fallback, None),
)
engine = ScoutEngine(session_factory=_TestSessionLocal)
verdict = await engine._triage_llm(scout, content)
assert verdict.verdict == "relevant"
assert verdict.reason == "invoice due"
assert abs(verdict.confidence - 0.92) < 1e-6
fake_llm.ainvoke.assert_awaited_once()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_deliver_pending_sends_one_frame_per_queued_row(monkeypatch): async def test_deliver_pending_sends_one_frame_per_queued_row(monkeypatch):
user_id = "00000000-0000-0000-0000-000000000003" user_id = "00000000-0000-0000-0000-000000000003"