1 Commits

Author SHA1 Message Date
Roberto Musso
d5fea95561 Phase 3 — WS frame + REST fallbacka 2026-04-18 22:18:53 +02:00
20 changed files with 613 additions and 15 deletions

View File

@@ -50,6 +50,10 @@ LLM_MODEL_UNIFIED_PROCESSOR=
# Cloud-processor — fetches and processes data from cloud connectors.
LLM_MODEL_CLOUD_PROCESSOR=
# Brief-agent — produces home and project text briefs.
# A small model (e.g. gpt-4o-mini) is sufficient.
# LLM_MODEL_BRIEF_AGENT=
# Setup-agent — guided journey to build an AgentConfig via WebSocket chat.
LLM_MODEL_SETUP_AGENT=

3
.gitignore vendored
View File

@@ -28,6 +28,9 @@ tests/fixtures/private*/
# OS
.DS_Store
# Smoke scripts (dev-only, not for CI)
scripts/smoke_*.py
Thumbs.db
# Claude Code

View File

@@ -122,3 +122,8 @@ NOTE_TOOLS: list[Any] = [
update_note,
delete_note,
]
NOTE_READ_TOOLS: list[Any] = [
list_notes,
get_note,
]

View File

@@ -125,3 +125,9 @@ PROJECT_TOOLS: list[Any] = [
update_project,
delete_project,
]
PROJECT_READ_TOOLS: list[Any] = [
list_projects,
list_all_projects,
get_project,
]

View File

@@ -219,3 +219,9 @@ TASK_TOOLS: list[Any] = [
add_task_comment,
delete_task_comment,
]
TASK_READ_TOOLS: list[Any] = [
list_tasks,
list_tasks_due_today,
list_task_comments,
]

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
import re
from datetime import datetime, timezone
from typing import Any
from langchain_core.tools import tool
@@ -92,9 +93,33 @@ async def delete_timeline(timeline_id: str) -> str:
return f"Timeline {timeline_id} deleted."
@tool
async def list_timelines_today() -> str:
"""List all timeline events (milestones) whose date falls on today (UTC)."""
now = datetime.now(tz=timezone.utc)
start_ms = int(datetime(now.year, now.month, now.day, tzinfo=timezone.utc).timestamp() * 1000)
end_ms = start_ms + 86_400_000 - 1
result = await execute_on_client(
action="select",
table="timelines",
filters={"dateFrom": start_ms, "dateTo": end_ms},
)
rows = result.get("rows", [])
if not rows:
return "No timeline events today."
lines = [f"- {r['title']} (date: {r['date']}, id: {r['id']})" for r in rows]
return f"Timeline events today ({len(rows)}):\n" + "\n".join(lines)
TIMELINE_TOOLS: list[Any] = [
list_timelines,
list_timelines_today,
create_timeline,
update_timeline,
delete_timeline,
]
TIMELINE_READ_TOOLS: list[Any] = [
list_timelines,
list_timelines_today,
]

View File

@@ -16,8 +16,6 @@ import logging
import uuid
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
@@ -37,6 +35,8 @@ from app.schemas import (
UserProfile,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/agents", tags=["agents"])

View File

@@ -5,13 +5,19 @@ WebSocket chat is handled by the unified device WS endpoint (/api/v1/ws/device).
from __future__ import annotations
from fastapi import APIRouter, Depends
import uuid
from typing import Literal
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from app.api.deps import get_current_user
from app.core.brief_agent import run_home_brief, run_project_brief
from app.core.deep_agent import run_home
from app.core.llm import embed
from app.core.memory_middleware import MemoryMiddleware
from app.db import async_session
from app.schemas import ChatRequest, UserProfile
router = APIRouter(prefix="/chat", tags=["chat"])
@@ -45,6 +51,57 @@ async def chat(
return JSONResponse(content={"response": response})
class _BriefRequest(BaseModel):
mode: Literal["home", "project"]
project_id: str | None = None
class _BriefResponse(BaseModel):
response: str
@router.post("/brief", response_model=_BriefResponse)
async def brief(
body: _BriefRequest,
current_user: UserProfile = Depends(get_current_user),
) -> _BriefResponse:
"""REST fallback for brief when the device WebSocket is not ready."""
if body.mode == "project":
if not body.project_id:
raise HTTPException(status_code=422, detail="project_id required for project mode")
try:
uuid.UUID(body.project_id)
except ValueError:
raise HTTPException(status_code=422, detail="project_id must be a valid UUID")
request_id = str(uuid.uuid4())
async with async_session() as db:
memory = MemoryMiddleware(db)
memory_context = await memory.enrich_context(
current_user.id,
"",
trace_id=request_id,
session_id=request_id,
)
context: dict = {
"_debug": {"request_id": request_id, "user_id": current_user.id},
**memory_context,
}
chunks: list[str] = []
if body.mode == "project":
stream = run_project_brief(current_user.id, body.project_id, context) # type: ignore[arg-type]
else:
stream = run_home_brief(current_user.id, context)
async for event_type, data in stream:
if event_type == "token" and data:
chunks.append(str(data))
return _BriefResponse(response="".join(chunks))
@router.post("/embed", response_model=_EmbedResponse)
async def embed_text(
body: _EmbedRequest,

View File

@@ -42,6 +42,7 @@ from sqlalchemy import update
from app.api.routes.agent_setup import handle_journey_message, handle_journey_start
from app.config.settings import settings
from app.core.agent_runner import trigger_pending_runs
from app.core.brief_agent import run_home_brief, run_project_brief
from app.core.deep_agent import run_floating_stream, run_home_stream
from app.core.device_manager import device_manager
from app.core.memory_middleware import MemoryMiddleware
@@ -49,7 +50,7 @@ from app.core.output_formatter import StreamFormatter
from app.core.ws_context import clear_client_executor, set_client_executor
from app.db import async_session
from app.models import AgentRunLog
from app.schemas import WsFrameType
from app.schemas import WsFrameType, WsStreamEnd
logger = logging.getLogger(__name__)
@@ -158,6 +159,11 @@ async def _message_loop(websocket: WebSocket, user_id: str) -> None:
_handle_floating_request(websocket, user_id, frame)
)
elif frame_type == WsFrameType.brief_request:
asyncio.create_task(
_handle_brief_request(websocket, user_id, frame)
)
elif frame_type == WsFrameType.journey_start:
asyncio.create_task(
_handle_journey_start(websocket, user_id, frame)
@@ -325,6 +331,86 @@ async def _handle_floating_request(
)
async def _handle_brief_request(
websocket: WebSocket,
user_id: str,
frame: dict,
) -> None:
"""Handle a brief_request frame — streams plain-text brief back on the socket.
No episode storage — briefs are not conversations.
"""
import uuid as _uuid
request_id = frame.get("request_id") or str(uuid4())
session_id = frame.get("session_id") or str(uuid4())
mode: str = frame.get("mode", "home")
project_id: str | None = frame.get("project_id")
logger.info(
"device_ws: brief_request_start user=%s req=%s mode=%s project_id=%s",
user_id, request_id, mode, project_id,
)
# Validate project_id for project mode before touching LLM.
if mode == "project":
try:
if not project_id:
raise ValueError("project_id required for project mode")
_uuid.UUID(project_id)
except (ValueError, AttributeError) as exc:
logger.warning(
"device_ws: brief_request invalid project_id user=%s req=%s: %s",
user_id, request_id, exc,
)
await websocket.send_text(
WsStreamEnd(request_id=request_id, error=str(exc)).model_dump_json()
)
return
# Enrich context with memory (no user message — use empty string as probe).
async with async_session() as db:
memory = MemoryMiddleware(db)
memory_context = await memory.enrich_context(
user_id,
"",
trace_id=request_id,
session_id=session_id,
)
context: dict = {
"_debug": {"request_id": request_id, "session_id": session_id, "user_id": user_id},
**memory_context,
}
executor = await _make_ws_executor(websocket, user_id)
set_client_executor(executor)
try:
if mode == "project":
event_stream = run_project_brief(user_id, project_id, context) # type: ignore[arg-type]
else:
event_stream = run_home_brief(user_id, context)
formatter = StreamFormatter(request_id=request_id)
async for ws_frame in formatter.format(event_stream):
await websocket.send_text(ws_frame.model_dump_json())
except Exception as exc:
logger.error(
"device_ws: brief_request failed user=%s req=%s: %s",
user_id, request_id, exc,
)
await websocket.send_text(
WsStreamEnd(request_id=request_id, error=str(exc)).model_dump_json()
)
finally:
clear_client_executor()
logger.info(
"device_ws: brief_request_end user=%s req=%s mode=%s",
user_id, request_id, mode,
)
# ── v4 Journey Handlers ─────────────────────────────────────────────

View File

@@ -26,6 +26,7 @@ class Settings(BaseSettings):
LLM_MODEL_FLOATING_AGENT: str = "" # floating-agent (contextual chat)
LLM_MODEL_UNIFIED_PROCESSOR: str = "" # unified-processor (agent_runner)
LLM_MODEL_CLOUD_PROCESSOR: str = "" # cloud-processor (agent_runner)
LLM_MODEL_BRIEF_AGENT: str = "" # brief-agent (home + project text briefs)
LLM_MODEL_SETUP_AGENT: str = "" # agent-setup journey
LLM_MODEL_MEMORY_EXTRACTOR: str = "" # memory-extractor (Phase 2 extract/decide)
LLM_MODEL_MEMORY_MINER: str = "" # memory-miner (Phase 5 proactive mining)

View File

@@ -287,7 +287,6 @@ async def _run_agent_with_tools(
return final_text
for call in response.tool_calls:
call_id = str(call.get("id", ""))
call_name = str(call.get("name", ""))
call_args = call.get("args", {})
logger.info(

222
app/core/brief_agent.py Normal file
View File

@@ -0,0 +1,222 @@
"""Brief agent — produces plain-text home and project status briefs.
Read-only tool subset only. Never calls _normalize_tagged_list_lines —
the brief prompt forbids XML tags, so skipping post-processing is intentional.
"""
from __future__ import annotations
from collections.abc import AsyncGenerator
from datetime import date
from typing import Any
from app.agents.note_agent import NOTE_READ_TOOLS
from app.agents.project_agent import PROJECT_READ_TOOLS
from app.agents.task_agent import TASK_READ_TOOLS
from app.agents.timeline_agent import TIMELINE_READ_TOOLS
from app.core.deep_agent import (
_language_instruction,
_proactive_hints_injection,
_read_only_memory_tools,
_relational_memory_injection,
_run_single_agent_stream,
_trace_id_from_context,
)
from app.core.langfuse_client import compile_prompt, get_prompt_or_fallback
_LANGUAGE_NAMES: dict[str, str] = {
"en": "English", "it": "Italian", "es": "Spanish",
"fr": "French", "de": "German",
"english": "English", "italian": "Italian", "italiano": "Italian",
"spanish": "Spanish", "español": "Spanish",
"french": "French", "français": "French",
"german": "German", "deutsch": "German",
}
_HOME_BRIEF_FALLBACK = """\
You are the user's personal assistant producing a short daily brief.
ROLE
Act like a calm, attentive secretary writing a stand-up note for your boss.
Warm and human, never breezy. Never cheerful filler, never emojis, never
"here is your brief" meta-text. The user is opening the app mid-workday and
is probably stressed — your job is to lower cognitive load, not add noise.
TOOLS — always call before writing
Pull fresh data every run. Do not invent counts or titles. Use at minimum:
- list_tasks_due_today — tasks the user owes today
- list_timelines_today — events starting or ending today
- list_all_projects — projects currently in progress or at risk
- memory_list_blocks / memory_get — personal context about people, clients,
payment habits, working preferences
If a tool returns nothing, simply omit that topic. Never report zeros.
WHAT TO INCLUDE
1. Tasks due today (title + priority; group the 1-2 most important).
2. Timeline events starting or ending today (and anything that starts/ends
tomorrow if the user has a very light day).
3. Active projects that need a nudge — stalled, blocked, or awaiting input.
4. Memory-aware colour where it sharpens the brief. Examples:
- "Client Rossi tends to pay late — the Acme invoice is 6 days out."
- "You usually dislike meetings before 10:00 — the call at 09:30 is unusual."
Only add a memory line when it changes what the user does. Do not pad.
WHAT TO OMIT
- Zero-counts ("no overdue items", "0 meetings today").
- Statistics ("2 active projects, 3 completed tasks").
- Headers, titles, greetings, sign-offs, dates, emojis, slang.
- Meta-phrases ("here is", "let me know if", "hope this helps").
- XML/HTML tags of any kind. Plain prose only.
LIGHT-DAY CLAUSE
If tasks + events + active-project-nudges together produce fewer than two
sentences of content, also list 1-2 projects in status on_hold or waiting
and ask a single, specific question about them — e.g. "Is the Bianchi
redesign still paused, or ready to pick back up?" One question max, grounded
in a real project name.
VOICE
- Calm. Concise. Human. Short sentences.
- Use **bold** sparingly for task titles, project names, and people's names.
- No bullet lists. Flow as 2-4 sentences of prose.
LENGTH
2-4 sentences total. Hard cap 4. If the day is truly empty, one sentence.
Respond in the user's language ({language}). Today is {today}.\
"""
_PROJECT_BRIEF_FALLBACK = """\
You are the project assistant producing a short status brief for ONE project.
ROLE
A senior project manager summarising state-of-play for the owner. Factual,
sharp, forward-looking. Never reassuring filler, never emojis.
SCOPE
Work only with project_id = {project_id}. Do not mention or pull data from
other projects. Use tools to fetch fresh data:
- get_project — current status, dates, description
- list_tasks(project_id) — open work, split by status
- list_timelines(project_id) — milestones hit, upcoming, overdue
- list_notes(project_id) — any recent decisions or blockers
- memory_get — relevant context about the client, collaborators, constraints
STRUCTURE — follow exactly, one short paragraph per section, no headers
1. **State.** One sentence: current phase, health (on track / at risk / blocked),
and why. Cite the concrete signal (overdue milestone, stalled tasks, recent
blocker note).
2. **What's moving.** What was completed or progressed recently. Name specific
tasks or milestones.
3. **Next steps.** The 1-3 most important things the user should do next, in
priority order. Be concrete — task name, who owns it, when due if known.
If waiting on someone else, name them and what the ask is.
4. **Risks / memory-flagged items.** One line max. Only include when there is
a real risk or a relevant memory (e.g. late-paying client, tight deadline,
scope change). Omit the section entirely if nothing to say.
WHAT TO OMIT
- Zero-counts ("no overdue tasks").
- Generic advice ("keep up the good work").
- Greetings, headers, bullet lists, emojis, sign-offs, meta-phrases.
- XML/HTML tags or bracketed id lists. Plain prose only.
VOICE
- Direct. Factual. No fluff.
- Use **bold** sparingly for task titles, milestone names, and the owner's name.
- Short sentences. Prefer verbs over nouns ("Client review is blocking release"
not "There is a blocker which is the client review").
LENGTH
4-8 sentences total across the 3-4 sections. Hard cap 8.
Respond in the user's language ({language}). Today is {today}.\
"""
def _resolve_language(context: dict[str, Any]) -> str:
core = context.get("core_memory") or {}
raw = (core.get("language") or "en").strip().lower()
return _LANGUAGE_NAMES.get(raw, raw.title()) or "English"
def _build_read_tools(user_id: str, trace_id: str | None) -> list[Any]:
return [
*TASK_READ_TOOLS,
*PROJECT_READ_TOOLS,
*TIMELINE_READ_TOOLS,
*NOTE_READ_TOOLS,
*_read_only_memory_tools(user_id, trace_id),
]
async def run_home_brief(
user_id: str,
context: dict[str, Any],
) -> AsyncGenerator[tuple[str, Any], None]:
"""Stream a plain-text daily home brief.
Yields (event_type, data) tuples identical to _run_single_agent_stream.
Do NOT post-process output through _normalize_tagged_list_lines.
"""
trace_id = _trace_id_from_context(context)
today = date.today().isoformat()
language = _resolve_language(context)
raw_template, langfuse_prompt = get_prompt_or_fallback("home_brief", _HOME_BRIEF_FALLBACK)
system_prompt = compile_prompt(raw_template, langfuse_prompt, language=language, today=today)
system_prompt += _relational_memory_injection(context)
system_prompt += _proactive_hints_injection(context)
system_prompt += _language_instruction(context)
if today not in system_prompt:
system_prompt += f"\nToday is {today}."
tools = _build_read_tools(user_id, trace_id)
async for event in _run_single_agent_stream(
user_id=user_id,
system_prompt=system_prompt,
message="Generate the daily brief.",
context=context,
langfuse_prompt=langfuse_prompt,
agent_name="brief-agent",
tools=tools,
):
yield event
async def run_project_brief(
user_id: str,
project_id: str,
context: dict[str, Any],
) -> AsyncGenerator[tuple[str, Any], None]:
"""Stream a plain-text project status brief for project_id.
Yields (event_type, data) tuples identical to _run_single_agent_stream.
Do NOT post-process output through _normalize_tagged_list_lines.
"""
trace_id = _trace_id_from_context(context)
today = date.today().isoformat()
language = _resolve_language(context)
raw_template, langfuse_prompt = get_prompt_or_fallback("project_brief", _PROJECT_BRIEF_FALLBACK)
system_prompt = compile_prompt(
raw_template, langfuse_prompt,
language=language, today=today, project_id=project_id,
)
system_prompt += _relational_memory_injection(context)
system_prompt += _proactive_hints_injection(context)
system_prompt += _language_instruction(context)
if today not in system_prompt:
system_prompt += f"\nToday is {today}."
tools = _build_read_tools(user_id, trace_id)
async for event in _run_single_agent_stream(
user_id=user_id,
system_prompt=system_prompt,
message=f"Generate the project status brief for project {project_id}.",
context=context,
langfuse_prompt=langfuse_prompt,
agent_name="brief-agent",
tools=tools,
):
yield event

View File

@@ -489,6 +489,13 @@ def _memory_tools(user_id: str, trace_id: str | None) -> list[Any]:
]
def _read_only_memory_tools(user_id: str, trace_id: str | None) -> list[Any]:
"""Return memory tools that only read — safe for the read-only brief-agent subset."""
all_mem = _memory_tools(user_id, trace_id)
_read_names = {"memory_list_blocks", "memory_get", "archival_memory_search", "conversation_search"}
return [t for t in all_mem if t.name in _read_names]
def _all_tools_for_user(user_id: str, trace_id: str | None) -> list[Any]:
return [*_all_tools(), *_memory_tools(user_id, trace_id)]
@@ -792,12 +799,14 @@ async def _run_single_agent_stream(
max_steps: int = 6,
langfuse_prompt: Any = None,
agent_name: str = "agent",
tools: list[Any] | None = None,
) -> AsyncGenerator[tuple[str, Any], None]:
trace_id = _trace_id_from_context(context)
session_id = _session_id_from_context(context)
lf = get_langfuse()
llm = get_agent_llm(agent_name)
tools = _all_tools_for_user(user_id, trace_id)
if tools is None:
tools = _all_tools_for_user(user_id, trace_id)
model_context = _context_for_model(context)
logger.info("deep_agent: run_single_agent_stream_start trace=%s user=%s", trace_id or "-", user_id)
llm_with_tools = llm.bind_tools(tools)

View File

@@ -102,6 +102,7 @@ _AGENT_MODEL_SETTINGS: dict[str, Callable[[], str]] = {
"floating-agent": lambda: settings.LLM_MODEL_FLOATING_AGENT or settings.LLM_MODEL,
"unified-processor": lambda: settings.LLM_MODEL_UNIFIED_PROCESSOR or settings.LLM_MODEL,
"cloud-processor": lambda: settings.LLM_MODEL_CLOUD_PROCESSOR or settings.LLM_MODEL,
"brief-agent": lambda: settings.LLM_MODEL_BRIEF_AGENT or settings.LLM_MODEL,
"setup": lambda: settings.LLM_MODEL_SETUP_AGENT or settings.LLM_MODEL,
"memory-extractor": lambda: settings.LLM_MODEL_MEMORY_EXTRACTOR or "gpt-4o-mini",
"memory-miner": lambda: settings.LLM_MODEL_MEMORY_MINER or "gpt-4o-mini",

View File

@@ -4,6 +4,10 @@ import logging
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.middleware.rate_limit import TierRateLimitMiddleware
from app.api.middleware.sanitizer import SanitizerMiddleware
from app.config.settings import settings
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
@@ -11,10 +15,6 @@ logging.basicConfig(
logging.getLogger("sqlalchemy.engine").setLevel(logging.WARNING)
logging.getLogger("sqlalchemy.pool").setLevel(logging.WARNING)
from app.api.middleware.rate_limit import TierRateLimitMiddleware
from app.api.middleware.sanitizer import SanitizerMiddleware
from app.config.settings import settings
async def _memory_audit_cron_tick() -> None:
"""Weekly cron: contradiction scan + label canonicalization for all users (Phase 7)."""

View File

@@ -85,6 +85,8 @@ class WsFrameType(str, Enum):
journey_start = "journey_start"
journey_message = "journey_message"
journey_reply = "journey_reply"
# ── v5 brief frame types ──────────────────────────────────────────
brief_request = "brief_request"
class WsToolCall(BaseModel):
@@ -163,6 +165,16 @@ class WsFloatingRequest(BaseModel):
scope: WsFloatingScope
class WsBriefRequest(BaseModel):
"""Client → Server: Request a plain-text brief (home or project)."""
type: Literal[WsFrameType.brief_request] = WsFrameType.brief_request
request_id: str | None = None
session_id: str | None = None
mode: Literal["home", "project"]
project_id: str | None = None
class WsStreamStart(BaseModel):
"""Server → Client: signals start of a streaming response."""
@@ -183,6 +195,7 @@ class WsStreamEnd(BaseModel):
type: Literal[WsFrameType.stream_end] = WsFrameType.stream_end
request_id: str
error: str | None = None
class WsDomain(BaseModel):

View File

@@ -382,7 +382,6 @@ async def test_eval_runner(runner_case, pytestconfig):
await run_local_agent(_USER_ID, config, run_log, mgr)
_, kwargs = mock_fin.call_args
inserts = [c for c in calls if c["action"] == "insert"]
score, comment = _evaluate_case(case, calls, kwargs)
if obs is not None:

163
tests/test_brief_agent.py Normal file
View File

@@ -0,0 +1,163 @@
"""Tests for Phase 3: brief agent WS frame + REST fallback.
Coverage:
- run_home_brief streams non-empty text (mocked _run_single_agent_stream)
- run_project_brief with bogus UUID → WS returns stream_end with error, no crash
- _build_read_tools uses read-only subset only (no mutating tools)
- POST /chat/brief home mode returns {response: "..."}
- POST /chat/brief project mode with invalid UUID → 422
"""
from __future__ import annotations
import uuid
from collections.abc import AsyncGenerator
from typing import Any
from unittest.mock import AsyncMock, patch
import pytest
from tests.conftest import TEST_USER_IDS, auth_header
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
_USER_ID = TEST_USER_IDS["pro"]
_EMPTY_CONTEXT: dict[str, Any] = {"core_memory": {}}
async def _fake_token_stream(*_args, **_kwargs) -> AsyncGenerator[tuple[str, Any], None]:
"""Fake _run_single_agent_stream that yields two token events."""
yield ("token", "Hello")
yield ("token", " world")
# ---------------------------------------------------------------------------
# Unit: run_home_brief streams non-empty text
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_run_home_brief_streams_text():
with patch(
"app.core.brief_agent._run_single_agent_stream",
side_effect=_fake_token_stream,
):
from app.core.brief_agent import run_home_brief
chunks: list[str] = []
async for event_type, data in run_home_brief(_USER_ID, _EMPTY_CONTEXT):
if event_type == "token":
chunks.append(str(data))
assert "".join(chunks) == "Hello world"
# ---------------------------------------------------------------------------
# Unit: run_project_brief streams text with valid UUID
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_run_project_brief_streams_text():
project_id = str(uuid.uuid4())
with patch(
"app.core.brief_agent._run_single_agent_stream",
side_effect=_fake_token_stream,
):
from app.core.brief_agent import run_project_brief
chunks: list[str] = []
async for event_type, data in run_project_brief(_USER_ID, project_id, _EMPTY_CONTEXT):
if event_type == "token":
chunks.append(str(data))
assert "".join(chunks) == "Hello world"
# ---------------------------------------------------------------------------
# Unit: _build_read_tools uses read-only subset (no write tools)
# ---------------------------------------------------------------------------
def test_build_read_tools_read_only_subset():
from app.agents.note_agent import NOTE_READ_TOOLS
from app.agents.project_agent import PROJECT_READ_TOOLS
from app.agents.task_agent import TASK_READ_TOOLS
from app.agents.timeline_agent import TIMELINE_READ_TOOLS
from app.core.brief_agent import _build_read_tools
tools = _build_read_tools(_USER_ID, None)
tool_names = {getattr(t, "name", None) or getattr(t, "__name__", str(t)) for t in tools}
# Read-only exports must be present.
for read_list in (TASK_READ_TOOLS, PROJECT_READ_TOOLS, TIMELINE_READ_TOOLS, NOTE_READ_TOOLS):
for t in read_list:
name = getattr(t, "name", None) or getattr(t, "__name__", str(t))
assert name in tool_names, f"Read tool {name!r} missing from _build_read_tools"
# No mutating tools (e.g. create_task, update_task, delete_task).
mutating = {"create_task", "update_task", "delete_task", "create_project",
"update_project", "delete_project", "create_note", "update_note",
"delete_note", "memory_add", "memory_update", "memory_delete"}
overlap = tool_names & mutating
assert not overlap, f"Mutating tools in brief read-only subset: {overlap}"
# ---------------------------------------------------------------------------
# Integration: POST /chat/brief — home mode
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def _override_db(db_session):
from app.db import get_session
from app.main import app
async def _gen():
yield db_session
app.dependency_overrides[get_session] = _gen
yield
app.dependency_overrides.pop(get_session, None)
@pytest.mark.asyncio
async def test_rest_brief_home_returns_response(client):
async def _fake_home_brief(user_id, context):
yield ("token", "Today looks light.")
with (
patch("app.api.routes.chat.run_home_brief", side_effect=_fake_home_brief),
patch(
"app.api.routes.chat.MemoryMiddleware.enrich_context",
new=AsyncMock(return_value={}),
),
):
res = client.post(
"/api/v1/chat/brief",
json={"mode": "home"},
headers=auth_header("pro"),
)
assert res.status_code == 200
data = res.json()
assert data["response"] == "Today looks light."
@pytest.mark.asyncio
async def test_rest_brief_project_invalid_uuid_returns_422(client):
res = client.post(
"/api/v1/chat/brief",
json={"mode": "project", "project_id": "not-a-uuid"},
headers=auth_header("pro"),
)
assert res.status_code == 422
@pytest.mark.asyncio
async def test_rest_brief_project_missing_uuid_returns_422(client):
res = client.post(
"/api/v1/chat/brief",
json={"mode": "project"},
headers=auth_header("pro"),
)
assert res.status_code == 422

View File

@@ -201,7 +201,6 @@ def test_ws_device_invalid_first_frame_closes(client):
def test_ws_device_tool_result_dispatched(client):
"""tool_result frame is routed to the DeviceConnectionManager."""
token = make_jwt(tier="free")
user_id = TEST_USER_IDS["free"]
from app.core.device_manager import device_manager as dm

View File

@@ -328,7 +328,7 @@ def _make_gmail_message(
class TestGmailClientFetchMessages:
"""GmailClient.fetch_messages tests with mocked Google API."""
def _make_client(self) -> "GmailClient":
def _make_client(self):
from app.integrations.gmail import GmailClient
return GmailClient(_TOKEN_DICT)
@@ -509,7 +509,7 @@ def _make_graph_teams_message(
class TestMSGraphClientFetchEmails:
"""MSGraphClient.fetch_emails tests with mocked httpx."""
def _make_client(self) -> "MSGraphClient":
def _make_client(self):
from app.integrations.ms_graph import MSGraphClient
return MSGraphClient(_MS_TOKEN_DICT)
@@ -608,7 +608,7 @@ class TestMSGraphClientFetchEmails:
class TestMSGraphClientFetchMessages:
"""MSGraphClient.fetch_messages (Teams) tests."""
def _make_client(self) -> "MSGraphClient":
def _make_client(self):
from app.integrations.ms_graph import MSGraphClient
return MSGraphClient(_MS_TOKEN_DICT)