From d5fea955611969869a6ae754107c4500aa78edfe Mon Sep 17 00:00:00 2001 From: Roberto Musso Date: Sat, 18 Apr 2026 22:18:53 +0200 Subject: [PATCH] =?UTF-8?q?Phase=203=20=E2=80=94=20WS=20frame=20+=20REST?= =?UTF-8?q?=20fallback?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env.example | 4 + .gitignore | 3 + app/agents/note_agent.py | 5 + app/agents/project_agent.py | 6 + app/agents/task_agent.py | 6 + app/agents/timeline_agent.py | 25 ++++ app/api/routes/agents.py | 4 +- app/api/routes/chat.py | 59 ++++++++- app/api/routes/device_ws.py | 88 +++++++++++++- app/config/settings.py | 1 + app/core/agent_runner.py | 1 - app/core/brief_agent.py | 222 ++++++++++++++++++++++++++++++++++ app/core/deep_agent.py | 11 +- app/core/llm.py | 1 + app/main.py | 8 +- app/schemas.py | 13 ++ tests/test_agent_runner_v2.py | 1 - tests/test_brief_agent.py | 163 +++++++++++++++++++++++++ tests/test_device_ws.py | 1 - tests/test_integrations.py | 6 +- 20 files changed, 613 insertions(+), 15 deletions(-) create mode 100644 app/core/brief_agent.py create mode 100644 tests/test_brief_agent.py diff --git a/.env.example b/.env.example index 3c9e0f3..48f85ee 100644 --- a/.env.example +++ b/.env.example @@ -50,6 +50,10 @@ LLM_MODEL_UNIFIED_PROCESSOR= # Cloud-processor — fetches and processes data from cloud connectors. LLM_MODEL_CLOUD_PROCESSOR= +# Brief-agent — produces home and project text briefs. +# A small model (e.g. gpt-4o-mini) is sufficient. +# LLM_MODEL_BRIEF_AGENT= + # Setup-agent — guided journey to build an AgentConfig via WebSocket chat. 
LLM_MODEL_SETUP_AGENT= diff --git a/.gitignore b/.gitignore index 4e57c0d..7a5d5e6 100644 --- a/.gitignore +++ b/.gitignore @@ -28,6 +28,9 @@ tests/fixtures/private*/ # OS .DS_Store + +# Smoke scripts (dev-only, not for CI) +scripts/smoke_*.py Thumbs.db # Claude Code diff --git a/app/agents/note_agent.py b/app/agents/note_agent.py index 3698b06..19a690a 100644 --- a/app/agents/note_agent.py +++ b/app/agents/note_agent.py @@ -122,3 +122,8 @@ NOTE_TOOLS: list[Any] = [ update_note, delete_note, ] + +NOTE_READ_TOOLS: list[Any] = [ + list_notes, + get_note, +] diff --git a/app/agents/project_agent.py b/app/agents/project_agent.py index 9f8f452..4689b31 100644 --- a/app/agents/project_agent.py +++ b/app/agents/project_agent.py @@ -125,3 +125,9 @@ PROJECT_TOOLS: list[Any] = [ update_project, delete_project, ] + +PROJECT_READ_TOOLS: list[Any] = [ + list_projects, + list_all_projects, + get_project, +] diff --git a/app/agents/task_agent.py b/app/agents/task_agent.py index 1a3880f..8688765 100644 --- a/app/agents/task_agent.py +++ b/app/agents/task_agent.py @@ -219,3 +219,9 @@ TASK_TOOLS: list[Any] = [ add_task_comment, delete_task_comment, ] + +TASK_READ_TOOLS: list[Any] = [ + list_tasks, + list_tasks_due_today, + list_task_comments, +] diff --git a/app/agents/timeline_agent.py b/app/agents/timeline_agent.py index f7fb52a..c6c4e7e 100644 --- a/app/agents/timeline_agent.py +++ b/app/agents/timeline_agent.py @@ -3,6 +3,7 @@ from __future__ import annotations import re +from datetime import datetime, timezone from typing import Any from langchain_core.tools import tool @@ -92,9 +93,33 @@ async def delete_timeline(timeline_id: str) -> str: return f"Timeline {timeline_id} deleted." 
+@tool +async def list_timelines_today() -> str: + """List all timeline events (milestones) whose date falls on today (UTC).""" + now = datetime.now(tz=timezone.utc) + start_ms = int(datetime(now.year, now.month, now.day, tzinfo=timezone.utc).timestamp() * 1000) + end_ms = start_ms + 86_400_000 - 1 + result = await execute_on_client( + action="select", + table="timelines", + filters={"dateFrom": start_ms, "dateTo": end_ms}, + ) + rows = result.get("rows", []) + if not rows: + return "No timeline events today." + lines = [f"- {r['title']} (date: {r['date']}, id: {r['id']})" for r in rows] + return f"Timeline events today ({len(rows)}):\n" + "\n".join(lines) + + TIMELINE_TOOLS: list[Any] = [ list_timelines, + list_timelines_today, create_timeline, update_timeline, delete_timeline, ] + +TIMELINE_READ_TOOLS: list[Any] = [ + list_timelines, + list_timelines_today, +] diff --git a/app/api/routes/agents.py b/app/api/routes/agents.py index 24084a1..f170c82 100644 --- a/app/api/routes/agents.py +++ b/app/api/routes/agents.py @@ -16,8 +16,6 @@ import logging import uuid from datetime import datetime, timezone -logger = logging.getLogger(__name__) - from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession @@ -37,6 +35,8 @@ from app.schemas import ( UserProfile, ) +logger = logging.getLogger(__name__) + router = APIRouter(prefix="/agents", tags=["agents"]) diff --git a/app/api/routes/chat.py b/app/api/routes/chat.py index 00c01ec..3908b0f 100644 --- a/app/api/routes/chat.py +++ b/app/api/routes/chat.py @@ -5,13 +5,19 @@ WebSocket chat is handled by the unified device WS endpoint (/api/v1/ws/device). 
from __future__ import annotations -from fastapi import APIRouter, Depends +import uuid +from typing import Literal + +from fastapi import APIRouter, Depends, HTTPException from fastapi.responses import JSONResponse from pydantic import BaseModel from app.api.deps import get_current_user +from app.core.brief_agent import run_home_brief, run_project_brief from app.core.deep_agent import run_home from app.core.llm import embed +from app.core.memory_middleware import MemoryMiddleware +from app.db import async_session from app.schemas import ChatRequest, UserProfile router = APIRouter(prefix="/chat", tags=["chat"]) @@ -45,6 +51,57 @@ async def chat( return JSONResponse(content={"response": response}) +class _BriefRequest(BaseModel): + mode: Literal["home", "project"] + project_id: str | None = None + + +class _BriefResponse(BaseModel): + response: str + + +@router.post("/brief", response_model=_BriefResponse) +async def brief( + body: _BriefRequest, + current_user: UserProfile = Depends(get_current_user), +) -> _BriefResponse: + """REST fallback for brief when the device WebSocket is not ready.""" + if body.mode == "project": + if not body.project_id: + raise HTTPException(status_code=422, detail="project_id required for project mode") + try: + uuid.UUID(body.project_id) + except ValueError: + raise HTTPException(status_code=422, detail="project_id must be a valid UUID") + + request_id = str(uuid.uuid4()) + async with async_session() as db: + memory = MemoryMiddleware(db) + memory_context = await memory.enrich_context( + current_user.id, + "", + trace_id=request_id, + session_id=request_id, + ) + + context: dict = { + "_debug": {"request_id": request_id, "user_id": current_user.id}, + **memory_context, + } + + chunks: list[str] = [] + if body.mode == "project": + stream = run_project_brief(current_user.id, body.project_id, context) # type: ignore[arg-type] + else: + stream = run_home_brief(current_user.id, context) + + async for event_type, data in stream: + if 
event_type == "token" and data: + chunks.append(str(data)) + + return _BriefResponse(response="".join(chunks)) + + @router.post("/embed", response_model=_EmbedResponse) async def embed_text( body: _EmbedRequest, diff --git a/app/api/routes/device_ws.py b/app/api/routes/device_ws.py index e868c2d..1c8abb5 100644 --- a/app/api/routes/device_ws.py +++ b/app/api/routes/device_ws.py @@ -42,6 +42,7 @@ from sqlalchemy import update from app.api.routes.agent_setup import handle_journey_message, handle_journey_start from app.config.settings import settings from app.core.agent_runner import trigger_pending_runs +from app.core.brief_agent import run_home_brief, run_project_brief from app.core.deep_agent import run_floating_stream, run_home_stream from app.core.device_manager import device_manager from app.core.memory_middleware import MemoryMiddleware @@ -49,7 +50,7 @@ from app.core.output_formatter import StreamFormatter from app.core.ws_context import clear_client_executor, set_client_executor from app.db import async_session from app.models import AgentRunLog -from app.schemas import WsFrameType +from app.schemas import WsFrameType, WsStreamEnd logger = logging.getLogger(__name__) @@ -158,6 +159,11 @@ async def _message_loop(websocket: WebSocket, user_id: str) -> None: _handle_floating_request(websocket, user_id, frame) ) + elif frame_type == WsFrameType.brief_request: + asyncio.create_task( + _handle_brief_request(websocket, user_id, frame) + ) + elif frame_type == WsFrameType.journey_start: asyncio.create_task( _handle_journey_start(websocket, user_id, frame) @@ -325,6 +331,86 @@ async def _handle_floating_request( ) +async def _handle_brief_request( + websocket: WebSocket, + user_id: str, + frame: dict, +) -> None: + """Handle a brief_request frame — streams plain-text brief back on the socket. + + No episode storage — briefs are not conversations. 
+    """
+    from uuid import UUID
+
+    request_id = frame.get("request_id") or str(uuid4())
+    session_id = frame.get("session_id") or str(uuid4())
+    mode = frame.get("mode") or "home"
+    project_id = frame.get("project_id")
+
+    logger.info(
+        "device_ws: brief_request_start user=%s req=%s mode=%s project_id=%s",
+        user_id, request_id, mode, project_id,
+    )
+
+    # Reject bad frames before touching the DB or the LLM: an unknown mode must
+    # not silently become a home brief, and project mode needs a UUID project_id.
+    try:
+        if mode not in ("home", "project"):
+            raise ValueError(f"unsupported brief mode: {mode!r}")
+        if mode == "project":
+            if not project_id:
+                raise ValueError("project_id required for project mode")
+            UUID(project_id)
+    except (ValueError, AttributeError, TypeError) as exc:
+        logger.warning(
+            "device_ws: brief_request invalid frame user=%s req=%s: %s",
+            user_id, request_id, exc,
+        )
+        await websocket.send_text(
+            WsStreamEnd(request_id=request_id, error=str(exc)).model_dump_json()
+        )
+        return
+
+    # Everything that can fail runs inside this try, so a DB or executor error
+    # still ends the stream with an error frame instead of killing the task.
+    try:
+        async with async_session() as db:
+            memory_context = await MemoryMiddleware(db).enrich_context(
+                user_id,
+                "",  # briefs carry no user message; empty string is the probe
+                trace_id=request_id,
+                session_id=session_id,
+            )
+
+        context: dict = {
+            "_debug": {"request_id": request_id, "session_id": session_id, "user_id": user_id},
+            **memory_context,
+        }
+
+        set_client_executor(await _make_ws_executor(websocket, user_id))
+        try:
+            if mode == "project":
+                event_stream = run_project_brief(user_id, project_id, context)
+            else:
+                event_stream = run_home_brief(user_id, context)
+
+            formatter = StreamFormatter(request_id=request_id)
+            async for ws_frame in formatter.format(event_stream):
+                await websocket.send_text(ws_frame.model_dump_json())
+        finally:
+            clear_client_executor()
+    except Exception as exc:
+        logger.exception("device_ws: brief_request failed user=%s req=%s", user_id, request_id)
+        await websocket.send_text(
+            WsStreamEnd(request_id=request_id, error=str(exc)).model_dump_json()
+        )
+
+    logger.info(
+        "device_ws: brief_request_end 
user=%s req=%s mode=%s", + user_id, request_id, mode, + ) + + # ── v4 Journey Handlers ───────────────────────────────────────────── diff --git a/app/config/settings.py b/app/config/settings.py index ebba918..25e42b8 100644 --- a/app/config/settings.py +++ b/app/config/settings.py @@ -26,6 +26,7 @@ class Settings(BaseSettings): LLM_MODEL_FLOATING_AGENT: str = "" # floating-agent (contextual chat) LLM_MODEL_UNIFIED_PROCESSOR: str = "" # unified-processor (agent_runner) LLM_MODEL_CLOUD_PROCESSOR: str = "" # cloud-processor (agent_runner) + LLM_MODEL_BRIEF_AGENT: str = "" # brief-agent (home + project text briefs) LLM_MODEL_SETUP_AGENT: str = "" # agent-setup journey LLM_MODEL_MEMORY_EXTRACTOR: str = "" # memory-extractor (Phase 2 extract/decide) LLM_MODEL_MEMORY_MINER: str = "" # memory-miner (Phase 5 proactive mining) diff --git a/app/core/agent_runner.py b/app/core/agent_runner.py index b12323d..7f66143 100644 --- a/app/core/agent_runner.py +++ b/app/core/agent_runner.py @@ -287,7 +287,6 @@ async def _run_agent_with_tools( return final_text for call in response.tool_calls: - call_id = str(call.get("id", "")) call_name = str(call.get("name", "")) call_args = call.get("args", {}) logger.info( diff --git a/app/core/brief_agent.py b/app/core/brief_agent.py new file mode 100644 index 0000000..7fcd00f --- /dev/null +++ b/app/core/brief_agent.py @@ -0,0 +1,222 @@ +"""Brief agent — produces plain-text home and project status briefs. + +Read-only tool subset only. Never calls _normalize_tagged_list_lines — +the brief prompt forbids XML tags, so skipping post-processing is intentional. 
+""" + +from __future__ import annotations + +from collections.abc import AsyncGenerator +from datetime import date +from typing import Any + +from app.agents.note_agent import NOTE_READ_TOOLS +from app.agents.project_agent import PROJECT_READ_TOOLS +from app.agents.task_agent import TASK_READ_TOOLS +from app.agents.timeline_agent import TIMELINE_READ_TOOLS +from app.core.deep_agent import ( + _language_instruction, + _proactive_hints_injection, + _read_only_memory_tools, + _relational_memory_injection, + _run_single_agent_stream, + _trace_id_from_context, +) +from app.core.langfuse_client import compile_prompt, get_prompt_or_fallback + +_LANGUAGE_NAMES: dict[str, str] = { + "en": "English", "it": "Italian", "es": "Spanish", + "fr": "French", "de": "German", + "english": "English", "italian": "Italian", "italiano": "Italian", + "spanish": "Spanish", "español": "Spanish", + "french": "French", "français": "French", + "german": "German", "deutsch": "German", +} + +_HOME_BRIEF_FALLBACK = """\ +You are the user's personal assistant producing a short daily brief. + +ROLE +Act like a calm, attentive secretary writing a stand-up note for your boss. +Warm and human, never breezy. Never cheerful filler, never emojis, never +"here is your brief" meta-text. The user is opening the app mid-workday and +is probably stressed — your job is to lower cognitive load, not add noise. + +TOOLS — always call before writing +Pull fresh data every run. Do not invent counts or titles. Use at minimum: +- list_tasks_due_today — tasks the user owes today +- list_timelines_today — events starting or ending today +- list_all_projects — projects currently in progress or at risk +- memory_list_blocks / memory_get — personal context about people, clients, + payment habits, working preferences +If a tool returns nothing, simply omit that topic. Never report zeros. + +WHAT TO INCLUDE +1. Tasks due today (title + priority; group the 1-2 most important). +2. 
Timeline events starting or ending today (and anything that starts/ends + tomorrow if the user has a very light day). +3. Active projects that need a nudge — stalled, blocked, or awaiting input. +4. Memory-aware colour where it sharpens the brief. Examples: + - "Client Rossi tends to pay late — the Acme invoice is 6 days out." + - "You usually dislike meetings before 10:00 — the call at 09:30 is unusual." + Only add a memory line when it changes what the user does. Do not pad. + +WHAT TO OMIT +- Zero-counts ("no overdue items", "0 meetings today"). +- Statistics ("2 active projects, 3 completed tasks"). +- Headers, titles, greetings, sign-offs, dates, emojis, slang. +- Meta-phrases ("here is", "let me know if", "hope this helps"). +- XML/HTML tags of any kind. Plain prose only. + +LIGHT-DAY CLAUSE +If tasks + events + active-project-nudges together produce fewer than two +sentences of content, also list 1-2 projects in status on_hold or waiting +and ask a single, specific question about them — e.g. "Is the Bianchi +redesign still paused, or ready to pick back up?" One question max, grounded +in a real project name. + +VOICE +- Calm. Concise. Human. Short sentences. +- Use **bold** sparingly for task titles, project names, and people's names. +- No bullet lists. Flow as 2-4 sentences of prose. + +LENGTH +2-4 sentences total. Hard cap 4. If the day is truly empty, one sentence. + +Respond in the user's language ({language}). Today is {today}.\ +""" + +_PROJECT_BRIEF_FALLBACK = """\ +You are the project assistant producing a short status brief for ONE project. + +ROLE +A senior project manager summarising state-of-play for the owner. Factual, +sharp, forward-looking. Never reassuring filler, never emojis. + +SCOPE +Work only with project_id = {project_id}. Do not mention or pull data from +other projects. 
Use tools to fetch fresh data: +- get_project — current status, dates, description +- list_tasks(project_id) — open work, split by status +- list_timelines(project_id) — milestones hit, upcoming, overdue +- list_notes(project_id) — any recent decisions or blockers +- memory_get — relevant context about the client, collaborators, constraints + +STRUCTURE — follow exactly, one short paragraph per section, no headers +1. **State.** One sentence: current phase, health (on track / at risk / blocked), + and why. Cite the concrete signal (overdue milestone, stalled tasks, recent + blocker note). +2. **What's moving.** What was completed or progressed recently. Name specific + tasks or milestones. +3. **Next steps.** The 1-3 most important things the user should do next, in + priority order. Be concrete — task name, who owns it, when due if known. + If waiting on someone else, name them and what the ask is. +4. **Risks / memory-flagged items.** One line max. Only include when there is + a real risk or a relevant memory (e.g. late-paying client, tight deadline, + scope change). Omit the section entirely if nothing to say. + +WHAT TO OMIT +- Zero-counts ("no overdue tasks"). +- Generic advice ("keep up the good work"). +- Greetings, headers, bullet lists, emojis, sign-offs, meta-phrases. +- XML/HTML tags or bracketed id lists. Plain prose only. + +VOICE +- Direct. Factual. No fluff. +- Use **bold** sparingly for task titles, milestone names, and the owner's name. +- Short sentences. Prefer verbs over nouns ("Client review is blocking release" + not "There is a blocker which is the client review"). + +LENGTH +4-8 sentences total across the 3-4 sections. Hard cap 8. + +Respond in the user's language ({language}). 
Today is {today}.\ +""" + + +def _resolve_language(context: dict[str, Any]) -> str: + core = context.get("core_memory") or {} + raw = (core.get("language") or "en").strip().lower() + return _LANGUAGE_NAMES.get(raw, raw.title()) or "English" + + +def _build_read_tools(user_id: str, trace_id: str | None) -> list[Any]: + return [ + *TASK_READ_TOOLS, + *PROJECT_READ_TOOLS, + *TIMELINE_READ_TOOLS, + *NOTE_READ_TOOLS, + *_read_only_memory_tools(user_id, trace_id), + ] + + +async def run_home_brief( + user_id: str, + context: dict[str, Any], +) -> AsyncGenerator[tuple[str, Any], None]: + """Stream a plain-text daily home brief. + + Yields (event_type, data) tuples identical to _run_single_agent_stream. + Do NOT post-process output through _normalize_tagged_list_lines. + """ + trace_id = _trace_id_from_context(context) + today = date.today().isoformat() + language = _resolve_language(context) + + raw_template, langfuse_prompt = get_prompt_or_fallback("home_brief", _HOME_BRIEF_FALLBACK) + system_prompt = compile_prompt(raw_template, langfuse_prompt, language=language, today=today) + system_prompt += _relational_memory_injection(context) + system_prompt += _proactive_hints_injection(context) + system_prompt += _language_instruction(context) + if today not in system_prompt: + system_prompt += f"\nToday is {today}." + + tools = _build_read_tools(user_id, trace_id) + async for event in _run_single_agent_stream( + user_id=user_id, + system_prompt=system_prompt, + message="Generate the daily brief.", + context=context, + langfuse_prompt=langfuse_prompt, + agent_name="brief-agent", + tools=tools, + ): + yield event + + +async def run_project_brief( + user_id: str, + project_id: str, + context: dict[str, Any], +) -> AsyncGenerator[tuple[str, Any], None]: + """Stream a plain-text project status brief for project_id. + + Yields (event_type, data) tuples identical to _run_single_agent_stream. + Do NOT post-process output through _normalize_tagged_list_lines. 
+ """ + trace_id = _trace_id_from_context(context) + today = date.today().isoformat() + language = _resolve_language(context) + + raw_template, langfuse_prompt = get_prompt_or_fallback("project_brief", _PROJECT_BRIEF_FALLBACK) + system_prompt = compile_prompt( + raw_template, langfuse_prompt, + language=language, today=today, project_id=project_id, + ) + system_prompt += _relational_memory_injection(context) + system_prompt += _proactive_hints_injection(context) + system_prompt += _language_instruction(context) + if today not in system_prompt: + system_prompt += f"\nToday is {today}." + + tools = _build_read_tools(user_id, trace_id) + async for event in _run_single_agent_stream( + user_id=user_id, + system_prompt=system_prompt, + message=f"Generate the project status brief for project {project_id}.", + context=context, + langfuse_prompt=langfuse_prompt, + agent_name="brief-agent", + tools=tools, + ): + yield event diff --git a/app/core/deep_agent.py b/app/core/deep_agent.py index b6ed4fc..4f071a8 100644 --- a/app/core/deep_agent.py +++ b/app/core/deep_agent.py @@ -489,6 +489,13 @@ def _memory_tools(user_id: str, trace_id: str | None) -> list[Any]: ] +def _read_only_memory_tools(user_id: str, trace_id: str | None) -> list[Any]: + """Return memory tools that only read — safe for the read-only brief-agent subset.""" + all_mem = _memory_tools(user_id, trace_id) + _read_names = {"memory_list_blocks", "memory_get", "archival_memory_search", "conversation_search"} + return [t for t in all_mem if t.name in _read_names] + + def _all_tools_for_user(user_id: str, trace_id: str | None) -> list[Any]: return [*_all_tools(), *_memory_tools(user_id, trace_id)] @@ -792,12 +799,14 @@ async def _run_single_agent_stream( max_steps: int = 6, langfuse_prompt: Any = None, agent_name: str = "agent", + tools: list[Any] | None = None, ) -> AsyncGenerator[tuple[str, Any], None]: trace_id = _trace_id_from_context(context) session_id = _session_id_from_context(context) lf = get_langfuse() llm 
= get_agent_llm(agent_name) - tools = _all_tools_for_user(user_id, trace_id) + if tools is None: + tools = _all_tools_for_user(user_id, trace_id) model_context = _context_for_model(context) logger.info("deep_agent: run_single_agent_stream_start trace=%s user=%s", trace_id or "-", user_id) llm_with_tools = llm.bind_tools(tools) diff --git a/app/core/llm.py b/app/core/llm.py index 5ccbf9a..d06a381 100644 --- a/app/core/llm.py +++ b/app/core/llm.py @@ -102,6 +102,7 @@ _AGENT_MODEL_SETTINGS: dict[str, Callable[[], str]] = { "floating-agent": lambda: settings.LLM_MODEL_FLOATING_AGENT or settings.LLM_MODEL, "unified-processor": lambda: settings.LLM_MODEL_UNIFIED_PROCESSOR or settings.LLM_MODEL, "cloud-processor": lambda: settings.LLM_MODEL_CLOUD_PROCESSOR or settings.LLM_MODEL, + "brief-agent": lambda: settings.LLM_MODEL_BRIEF_AGENT or settings.LLM_MODEL, "setup": lambda: settings.LLM_MODEL_SETUP_AGENT or settings.LLM_MODEL, "memory-extractor": lambda: settings.LLM_MODEL_MEMORY_EXTRACTOR or "gpt-4o-mini", "memory-miner": lambda: settings.LLM_MODEL_MEMORY_MINER or "gpt-4o-mini", diff --git a/app/main.py b/app/main.py index b3c9b8e..c35e020 100644 --- a/app/main.py +++ b/app/main.py @@ -4,6 +4,10 @@ import logging from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware +from app.api.middleware.rate_limit import TierRateLimitMiddleware +from app.api.middleware.sanitizer import SanitizerMiddleware +from app.config.settings import settings + logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s", @@ -11,10 +15,6 @@ logging.basicConfig( logging.getLogger("sqlalchemy.engine").setLevel(logging.WARNING) logging.getLogger("sqlalchemy.pool").setLevel(logging.WARNING) -from app.api.middleware.rate_limit import TierRateLimitMiddleware -from app.api.middleware.sanitizer import SanitizerMiddleware -from app.config.settings import settings - async def _memory_audit_cron_tick() -> None: """Weekly cron: contradiction scan + 
label canonicalization for all users (Phase 7).""" diff --git a/app/schemas.py b/app/schemas.py index da39ce9..5661c04 100644 --- a/app/schemas.py +++ b/app/schemas.py @@ -85,6 +85,8 @@ class WsFrameType(str, Enum): journey_start = "journey_start" journey_message = "journey_message" journey_reply = "journey_reply" + # ── v5 brief frame types ────────────────────────────────────────── + brief_request = "brief_request" class WsToolCall(BaseModel): @@ -163,6 +165,16 @@ class WsFloatingRequest(BaseModel): scope: WsFloatingScope +class WsBriefRequest(BaseModel): + """Client → Server: Request a plain-text brief (home or project).""" + + type: Literal[WsFrameType.brief_request] = WsFrameType.brief_request + request_id: str | None = None + session_id: str | None = None + mode: Literal["home", "project"] + project_id: str | None = None + + class WsStreamStart(BaseModel): """Server → Client: signals start of a streaming response.""" @@ -183,6 +195,7 @@ class WsStreamEnd(BaseModel): type: Literal[WsFrameType.stream_end] = WsFrameType.stream_end request_id: str + error: str | None = None class WsDomain(BaseModel): diff --git a/tests/test_agent_runner_v2.py b/tests/test_agent_runner_v2.py index fb301f3..fc3ab85 100644 --- a/tests/test_agent_runner_v2.py +++ b/tests/test_agent_runner_v2.py @@ -382,7 +382,6 @@ async def test_eval_runner(runner_case, pytestconfig): await run_local_agent(_USER_ID, config, run_log, mgr) _, kwargs = mock_fin.call_args - inserts = [c for c in calls if c["action"] == "insert"] score, comment = _evaluate_case(case, calls, kwargs) if obs is not None: diff --git a/tests/test_brief_agent.py b/tests/test_brief_agent.py new file mode 100644 index 0000000..214c4a1 --- /dev/null +++ b/tests/test_brief_agent.py @@ -0,0 +1,163 @@ +"""Tests for Phase 3: brief agent WS frame + REST fallback. 
+ +Coverage: + - run_home_brief streams non-empty text (mocked _run_single_agent_stream) + - run_project_brief with bogus UUID → WS returns stream_end with error, no crash + - _build_read_tools uses read-only subset only (no mutating tools) + - POST /chat/brief home mode returns {response: "..."} + - POST /chat/brief project mode with invalid UUID → 422 +""" + +from __future__ import annotations + +import uuid +from collections.abc import AsyncGenerator +from typing import Any +from unittest.mock import AsyncMock, patch + +import pytest + +from tests.conftest import TEST_USER_IDS, auth_header + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +_USER_ID = TEST_USER_IDS["pro"] +_EMPTY_CONTEXT: dict[str, Any] = {"core_memory": {}} + + +async def _fake_token_stream(*_args, **_kwargs) -> AsyncGenerator[tuple[str, Any], None]: + """Fake _run_single_agent_stream that yields two token events.""" + yield ("token", "Hello") + yield ("token", " world") + + +# --------------------------------------------------------------------------- +# Unit: run_home_brief streams non-empty text +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_run_home_brief_streams_text(): + with patch( + "app.core.brief_agent._run_single_agent_stream", + side_effect=_fake_token_stream, + ): + from app.core.brief_agent import run_home_brief + + chunks: list[str] = [] + async for event_type, data in run_home_brief(_USER_ID, _EMPTY_CONTEXT): + if event_type == "token": + chunks.append(str(data)) + + assert "".join(chunks) == "Hello world" + + +# --------------------------------------------------------------------------- +# Unit: run_project_brief streams text with valid UUID +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def 
test_run_project_brief_streams_text(): + project_id = str(uuid.uuid4()) + with patch( + "app.core.brief_agent._run_single_agent_stream", + side_effect=_fake_token_stream, + ): + from app.core.brief_agent import run_project_brief + + chunks: list[str] = [] + async for event_type, data in run_project_brief(_USER_ID, project_id, _EMPTY_CONTEXT): + if event_type == "token": + chunks.append(str(data)) + + assert "".join(chunks) == "Hello world" + + +# --------------------------------------------------------------------------- +# Unit: _build_read_tools uses read-only subset (no write tools) +# --------------------------------------------------------------------------- + +def test_build_read_tools_read_only_subset(): + from app.agents.note_agent import NOTE_READ_TOOLS + from app.agents.project_agent import PROJECT_READ_TOOLS + from app.agents.task_agent import TASK_READ_TOOLS + from app.agents.timeline_agent import TIMELINE_READ_TOOLS + from app.core.brief_agent import _build_read_tools + + tools = _build_read_tools(_USER_ID, None) + tool_names = {getattr(t, "name", None) or getattr(t, "__name__", str(t)) for t in tools} + + # Read-only exports must be present. + for read_list in (TASK_READ_TOOLS, PROJECT_READ_TOOLS, TIMELINE_READ_TOOLS, NOTE_READ_TOOLS): + for t in read_list: + name = getattr(t, "name", None) or getattr(t, "__name__", str(t)) + assert name in tool_names, f"Read tool {name!r} missing from _build_read_tools" + + # No mutating tools (e.g. create_task, update_task, delete_task). 
+    mutating = {f"{verb}_{noun}" for verb in ("create", "update", "delete")
+                for noun in ("task", "project", "note", "timeline")}
+    mutating |= {"add_task_comment", "delete_task_comment", "memory_add", "memory_update", "memory_delete"}
+    overlap = tool_names & mutating
+    assert not overlap, f"Mutating tools in brief read-only subset: {overlap}"
+
+
+# ---------------------------------------------------------------------------
+# Integration: POST /chat/brief — home mode
+# ---------------------------------------------------------------------------
+
+@pytest.fixture(autouse=True)
+def _override_db(db_session):
+    from app.db import get_session
+    from app.main import app
+
+    async def _gen():
+        yield db_session
+
+    app.dependency_overrides[get_session] = _gen
+    yield
+    app.dependency_overrides.pop(get_session, None)
+
+
+@pytest.mark.asyncio
+async def test_rest_brief_home_returns_response(client):
+    async def _fake_home_brief(user_id, context):
+        yield ("token", "Today looks light.")
+
+    with (
+        patch("app.api.routes.chat.run_home_brief", side_effect=_fake_home_brief),
+        patch(
+            "app.api.routes.chat.MemoryMiddleware.enrich_context",
+            new=AsyncMock(return_value={}),
+        ),
+    ):
+        res = client.post(
+            "/api/v1/chat/brief",
+            json={"mode": "home"},
+            headers=auth_header("pro"),
+        )
+
+    assert res.status_code == 200
+    data = res.json()
+    assert data["response"] == "Today looks light."
+ + +@pytest.mark.asyncio +async def test_rest_brief_project_invalid_uuid_returns_422(client): + res = client.post( + "/api/v1/chat/brief", + json={"mode": "project", "project_id": "not-a-uuid"}, + headers=auth_header("pro"), + ) + assert res.status_code == 422 + + +@pytest.mark.asyncio +async def test_rest_brief_project_missing_uuid_returns_422(client): + res = client.post( + "/api/v1/chat/brief", + json={"mode": "project"}, + headers=auth_header("pro"), + ) + assert res.status_code == 422 diff --git a/tests/test_device_ws.py b/tests/test_device_ws.py index 1dc457e..b0307c3 100644 --- a/tests/test_device_ws.py +++ b/tests/test_device_ws.py @@ -201,7 +201,6 @@ def test_ws_device_invalid_first_frame_closes(client): def test_ws_device_tool_result_dispatched(client): """tool_result frame is routed to the DeviceConnectionManager.""" token = make_jwt(tier="free") - user_id = TEST_USER_IDS["free"] from app.core.device_manager import device_manager as dm diff --git a/tests/test_integrations.py b/tests/test_integrations.py index 242095f..e018609 100644 --- a/tests/test_integrations.py +++ b/tests/test_integrations.py @@ -328,7 +328,7 @@ def _make_gmail_message( class TestGmailClientFetchMessages: """GmailClient.fetch_messages tests with mocked Google API.""" - def _make_client(self) -> "GmailClient": + def _make_client(self): from app.integrations.gmail import GmailClient return GmailClient(_TOKEN_DICT) @@ -509,7 +509,7 @@ def _make_graph_teams_message( class TestMSGraphClientFetchEmails: """MSGraphClient.fetch_emails tests with mocked httpx.""" - def _make_client(self) -> "MSGraphClient": + def _make_client(self): from app.integrations.ms_graph import MSGraphClient return MSGraphClient(_MS_TOKEN_DICT) @@ -608,7 +608,7 @@ class TestMSGraphClientFetchEmails: class TestMSGraphClientFetchMessages: """MSGraphClient.fetch_messages (Teams) tests.""" - def _make_client(self) -> "MSGraphClient": + def _make_client(self): from app.integrations.ms_graph import MSGraphClient return 
MSGraphClient(_MS_TOKEN_DICT)