Add task brief research agent: Stage 1 deep-research + canvas draft emission

- run_task_brief_research() runner with brief-specific tool set and max_steps=12
- New agents: client_agent (list_clients, get_client) and relations_agent (query_relations)
- search_associative tool wrapping MemoryMiddleware semantic search
- BRIEF_RESEARCH_TOOLS constant: read-only task/project/note/timeline + memory + client/relations
- canvas block extraction in output_formatter (splits visible text from <canvas> draft)
- device_ws.py: task_brief_research request type; emits canvas_draft mutation on stream_end
- Stage 2 briefMode: briefing_context injected into floating system prompt when present
- briefingContext kwarg wired through compile_prompt call chain

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Roberto
2026-05-04 15:09:58 +02:00
parent 6f4c68b359
commit 67562b8092
9 changed files with 427 additions and 5 deletions

View File

@@ -56,6 +56,10 @@ LLM_MODEL_CLOUD_PROCESSOR=
# A small model (e.g. gpt-4o-mini) is sufficient. # A small model (e.g. gpt-4o-mini) is sufficient.
# LLM_MODEL_BRIEF_AGENT= # LLM_MODEL_BRIEF_AGENT=
# Task-brief-agent — per-task deep research (Stage 1 executive assistant).
# Needs tool-use + reasoning; a capable model recommended (e.g. gpt-4o, gemini-2.5-flash).
# LLM_MODEL_TASK_BRIEF_AGENT=
# Setup-agent — guided journey to build an AgentConfig via WebSocket chat. # Setup-agent — guided journey to build an AgentConfig via WebSocket chat.
LLM_MODEL_SETUP_AGENT= LLM_MODEL_SETUP_AGENT=

View File

@@ -0,0 +1,52 @@
"""Client agent — read-only tools for the clients table."""
from __future__ import annotations
import json
from typing import Any
from langchain_core.tools import tool
from app.core.ws_context import execute_on_client
@tool
async def list_clients(search: str = "", limit: int = 20) -> str:
"""List clients, optionally filtered by a name/email substring search.
search: optional substring to match against client name or email.
limit: max rows to return (default 20).
"""
filters: dict[str, Any] = {"limit": limit}
if search:
filters["search"] = search
result = await execute_on_client(action="select", table="clients", filters=filters)
rows = result.get("rows", [])
if not rows:
return "No clients found."
lines = [
f"- {r.get('name', '?')} (id: {r.get('id')}, email: {r.get('email', '')}, "
f"company: {r.get('company', '')})"
for r in rows
]
return f"Found {len(rows)} client(s):\n" + "\n".join(lines)
@tool
async def get_client(id: str) -> str:
"""Get full details for one client by UUID.
id: the client's UUID.
"""
if not id:
return "Client id is required."
result = await execute_on_client(action="get", table="clients", data={"id": id})
row = result.get("row") or result.get("rows", [None])[0] if result else None
if not row:
return f"Client '{id}' not found."
return f"Client details:\n{json.dumps(row, ensure_ascii=False, indent=2)}"
CLIENT_TOOLS: list[Any] = [list_clients, get_client]

View File

@@ -0,0 +1,63 @@
"""Relations agent — read-only tool wrapping MemoryMiddleware.query_relations."""
from __future__ import annotations
from typing import Any
from langchain_core.tools import tool
from app.core.memory_middleware import MemoryMiddleware
from app.db import async_session
# Injected at tool-factory time by _brief_research_tools(); not a module-level global.
# Each tool closure captures the user_id bound at factory time.
def make_query_relations_tool(user_id: str, trace_id: str | None = None) -> Any:
"""Return a query_relations tool bound to *user_id*."""
@tool
async def query_relations(
subject_label: str = "",
predicate: str = "",
object_label: str = "",
limit: int = 10,
) -> str:
"""Query the relational memory graph for entity relationships.
Returns rows where subject ↔ predicate ↔ object match the given filters.
All parameters are optional — omit to retrieve all relations up to limit.
subject_label: entity label on the left side (e.g. a client name, "Acme Corp").
predicate: relationship type (e.g. "mentioned_in", "works_at", "related_to").
object_label: entity label on the right side (e.g. a project name, "Website Redesign").
limit: max rows to return (default 10).
"""
import logging
logger = logging.getLogger(__name__)
logger.info(
"relations_agent: query_relations trace=%s user=%s subject=%r predicate=%r object=%r",
trace_id or "-", user_id, subject_label, predicate, object_label,
)
async with async_session() as db:
memory = MemoryMiddleware(db)
rows = await memory.query_relations(
user_id=user_id,
subject=subject_label or None,
predicate=predicate or None,
object_=object_label or None,
limit=limit,
)
if not rows:
return "No relational memory entries found for the given filters."
lines = [
f"- {r.subject_label} —[{r.predicate}]→ {r.object_label}"
+ (f" (confidence: {r.confidence:.2f})" if r.confidence is not None else "")
for r in rows
]
return f"Found {len(rows)} relation(s):\n" + "\n".join(lines)
return query_relations

View File

@@ -43,7 +43,8 @@ from app.api.routes.agent_setup import handle_journey_message, handle_journey_st
from app.config.settings import settings from app.config.settings import settings
from app.core.agent_runner import trigger_pending_runs from app.core.agent_runner import trigger_pending_runs
from app.core.brief_agent import run_home_brief, run_project_brief from app.core.brief_agent import run_home_brief, run_project_brief
from app.core.deep_agent import run_floating_stream, run_home_stream from app.core.deep_agent import run_floating_stream, run_home_stream, run_task_brief_research_stream
from app.core.output_formatter import extract_canvas_block
from app.core.device_manager import device_manager from app.core.device_manager import device_manager
from app.core.memory_middleware import MemoryMiddleware from app.core.memory_middleware import MemoryMiddleware
from app.core.output_formatter import StreamFormatter from app.core.output_formatter import StreamFormatter
@@ -164,6 +165,11 @@ async def _message_loop(websocket: WebSocket, user_id: str) -> None:
_handle_brief_request(websocket, user_id, frame) _handle_brief_request(websocket, user_id, frame)
) )
elif frame_type == WsFrameType.task_brief_request:
asyncio.create_task(
_handle_task_brief_request(websocket, user_id, frame)
)
elif frame_type == WsFrameType.journey_start: elif frame_type == WsFrameType.journey_start:
asyncio.create_task( asyncio.create_task(
_handle_journey_start(websocket, user_id, frame) _handle_journey_start(websocket, user_id, frame)
@@ -415,6 +421,97 @@ async def _handle_brief_request(
) )
# ── v6 Task Brief Handler ────────────────────────────────────────────
async def _handle_task_brief_request(
websocket: WebSocket,
user_id: str,
frame: dict,
) -> None:
"""Handle a task_brief_request frame — Stage-1 executive assistant deep research.
Streams the briefing markdown back to the client.
On stream_end, emits a ``canvas_draft`` mutation if the agent produced one.
"""
request_id = frame.get("request_id") or str(uuid4())
session_id = frame.get("session_id") or str(uuid4())
task_id: str = frame.get("task_id") or frame.get("taskId") or ""
logger.info(
"device_ws: task_brief_request_start user=%s req=%s task=%s [cache_miss]",
user_id, request_id, task_id,
)
if not task_id:
await websocket.send_text(
WsStreamEnd(request_id=request_id, error="task_id is required").model_dump_json()
)
return
async with async_session() as db:
memory = MemoryMiddleware(db)
memory_context = await memory.enrich_context(
user_id,
f"task brief: {task_id}",
trace_id=request_id,
session_id=session_id,
)
context: dict = {
"_debug": {"request_id": request_id, "session_id": session_id, "user_id": user_id},
"format_prefs": frame.get("format_prefs"),
**memory_context,
}
executor = await _make_ws_executor(websocket, user_id)
set_client_executor(executor)
response_chunks: list[str] = []
try:
event_stream = run_task_brief_research_stream(user_id, task_id, context)
formatter = StreamFormatter(request_id=request_id)
async for ws_frame in formatter.format(event_stream):
if ws_frame.type == "stream_text": # type: ignore[union-attr]
response_chunks.append(ws_frame.chunk) # type: ignore[union-attr]
await websocket.send_text(ws_frame.model_dump_json())
elif ws_frame.type == "stream_start":
await websocket.send_text(ws_frame.model_dump_json())
# stream_end is emitted below with mutations — skip formatter's version
except Exception as exc:
logger.error(
"device_ws: task_brief_request failed user=%s req=%s task=%s: %s",
user_id, request_id, task_id, exc,
)
await websocket.send_text(
WsStreamEnd(request_id=request_id, error=str(exc)).model_dump_json()
)
return
finally:
clear_client_executor()
# Extract canvas block then emit stream_end with optional mutations.
full_response = "".join(response_chunks)
_visible, canvas_content, canvas_kind = extract_canvas_block(full_response)
mutations: list[dict] = []
if canvas_content:
mutations.append({
"type": "canvas_draft",
"content": canvas_content,
"kind": canvas_kind,
})
await websocket.send_text(
WsStreamEnd(request_id=request_id, mutations=mutations or None).model_dump_json()
)
logger.info(
"device_ws: task_brief_request_end user=%s req=%s task=%s response_chars=%d canvas=%s",
user_id, request_id, task_id, len(full_response), canvas_kind or "none",
)
# ── v4 Journey Handlers ───────────────────────────────────────────── # ── v4 Journey Handlers ─────────────────────────────────────────────

View File

@@ -29,6 +29,7 @@ class Settings(BaseSettings):
LLM_MODEL_UNIFIED_PROCESSOR: str = "" # unified-processor (agent_runner) LLM_MODEL_UNIFIED_PROCESSOR: str = "" # unified-processor (agent_runner)
LLM_MODEL_CLOUD_PROCESSOR: str = "" # cloud-processor (agent_runner) LLM_MODEL_CLOUD_PROCESSOR: str = "" # cloud-processor (agent_runner)
LLM_MODEL_BRIEF_AGENT: str = "" # brief-agent (home + project text briefs) LLM_MODEL_BRIEF_AGENT: str = "" # brief-agent (home + project text briefs)
LLM_MODEL_TASK_BRIEF_AGENT: str = "" # task-brief-agent (per-task deep research)
LLM_MODEL_SETUP_AGENT: str = "" # agent-setup journey LLM_MODEL_SETUP_AGENT: str = "" # agent-setup journey
LLM_MODEL_MEMORY_EXTRACTOR: str = "" # memory-extractor (Phase 2 extract/decide) LLM_MODEL_MEMORY_EXTRACTOR: str = "" # memory-extractor (Phase 2 extract/decide)
LLM_MODEL_MEMORY_MINER: str = "" # memory-miner (Phase 5 proactive mining) LLM_MODEL_MEMORY_MINER: str = "" # memory-miner (Phase 5 proactive mining)

View File

@@ -12,8 +12,10 @@ from typing import Any, Literal
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
from langchain_core.tools import tool from langchain_core.tools import tool
from app.agents.client_agent import CLIENT_TOOLS
from app.agents.note_agent import NOTE_TOOLS from app.agents.note_agent import NOTE_TOOLS
from app.agents.project_agent import PROJECT_TOOLS from app.agents.project_agent import PROJECT_TOOLS
from app.agents.relations_agent import make_query_relations_tool
from app.agents.task_agent import TASK_TOOLS from app.agents.task_agent import TASK_TOOLS
from app.agents.timeline_agent import TIMELINE_TOOLS from app.agents.timeline_agent import TIMELINE_TOOLS
from app.core.agent_session_buffer import session_buffer from app.core.agent_session_buffer import session_buffer
@@ -303,6 +305,80 @@ For specific dates not listed, compute local-midnight in the user timezone and c
{request_context}\ {request_context}\
""" """
_TASK_BRIEF_RESEARCH_SYSTEM_PROMPT = """\
You are an executive assistant preparing a briefing dossier for your principal before they act on a specific task.
Your job: gather all relevant context, synthesize it into a tight actionable dossier, and — if the task requires writing (email, message, document) — produce a ready-to-use draft.{user_identity}
# Research workflow
Follow these steps in order, using tools:
1. Read the task fully (title, description, due date, priority, status, project, comments).
2. Fetch the parent project (`get_project`) to understand scope, aiSummary, and any linked client.
3. If the project has a clientId: call `get_client(id)` to retrieve full client details.
4. Call `query_relations` (subject_label=client_name or task subject) to find cross-project connections — e.g. the same client appearing in multiple projects.
5. Search associative memory (`search_associative`) and archival memory (`archival_memory_search`) using the task title + client name as query phrases to surface relevant past interactions.
6. Read core memory blocks for tone preference, language, and user style: `memory_get("tone_preference")`, `memory_get("language")`.
7. Determine task kind: is this a writing task (email reply, message, follow-up, proposal)? If yes, draft a ready-to-send piece.
# Output structure
Write the briefing in the user's language. Use this exact structure:
**What needs to be done**
(12 sentences, concrete and specific — what action the user must take)
**Context you should know**
(bullet points covering: client background, related projects, prior interactions, tone/style notes, any relevant deadlines or dependencies)
**Suggested first step**
(one specific, immediately actionable instruction)
If this is a writing task, append a canvas block at the very end:
<canvas kind="email|document|message">
...ready-to-use draft here...
</canvas>
Do NOT include the canvas block for non-writing tasks.
Do NOT repeat verbatim task fields the user already sees in the UI.
Be concrete — no vague advice. Every bullet should be a fact that changes what the user does.
# Date context
{date_context}
# Language
{language_instruction}
# Known people & projects
{relational_memory}
# Request context
{request_context}\
"""
_TASK_BRIEF_FOLLOWUP_SYSTEM_PROMPT = """\
You are an executive assistant continuing a conversation with your principal.
You have already prepared and delivered a research briefing for the active task. The user has read it.{user_identity}
Your briefing:
---
{briefing_context}
---
Continue from here. Do NOT repeat the briefing. Refer to it when relevant.
Help the user execute: edit drafts, refine wording, look up additional details, plan next steps.
Stay terse — your principal is a busy executive.
# Date context
{date_context}
# Language
{language_instruction}
# Known people & projects
{relational_memory}
# Request context
{request_context}\
"""
_FLOATING_DOMAIN_CLASSIFIER_PROMPT = ( _FLOATING_DOMAIN_CLASSIFIER_PROMPT = (
"You are a strict domain classifier for websocket floating requests. " "You are a strict domain classifier for websocket floating requests. "
"Return ONLY a JSON object with keys: type, id, section. " "Return ONLY a JSON object with keys: type, id, section. "
@@ -679,6 +755,25 @@ def _memory_tools(user_id: str, trace_id: str | None) -> list[Any]:
lines = [f"- {item}" for item in results] lines = [f"- {item}" for item in results]
return "Recall memory results:\n" + "\n".join(lines) return "Recall memory results:\n" + "\n".join(lines)
@tool
async def search_associative(query: str, limit: int = 5) -> str:
"""Semantic search across associative (archival) memory for a given query.
Use this to surface long-term memories related to a topic, client, or task
that may not appear in recent episodes.
query: natural-language search phrase.
limit: max results (default 5).
"""
logger.info("deep_agent: search_associative trace=%s user=%s query=%s", trace_id or "-", user_id, query[:80])
async with async_session() as db:
memory = MemoryMiddleware(db)
results = await memory.search_archival(user_id, query, top_k=limit)
if not results:
return "No associative memory results found."
lines = [f"- {item}" for item in results]
return "Associative memory results:\n" + "\n".join(lines)
return [ return [
memory_list_blocks, memory_list_blocks,
memory_get, memory_get,
@@ -689,16 +784,33 @@ def _memory_tools(user_id: str, trace_id: str | None) -> list[Any]:
archival_memory_insert, archival_memory_insert,
archival_memory_search, archival_memory_search,
conversation_search, conversation_search,
search_associative,
] ]
def _read_only_memory_tools(user_id: str, trace_id: str | None) -> list[Any]: def _read_only_memory_tools(user_id: str, trace_id: str | None) -> list[Any]:
"""Return memory tools that only read — safe for the read-only brief-agent subset.""" """Return memory tools that only read — safe for the read-only brief-agent subset."""
all_mem = _memory_tools(user_id, trace_id) all_mem = _memory_tools(user_id, trace_id)
_read_names = {"memory_list_blocks", "memory_get", "archival_memory_search", "conversation_search"} _read_names = {
"memory_list_blocks", "memory_get", "archival_memory_search",
"conversation_search", "search_associative",
}
return [t for t in all_mem if t.name in _read_names] return [t for t in all_mem if t.name in _read_names]
def _brief_research_tools(user_id: str, trace_id: str | None) -> list[Any]:
"""Return the full tool palette for Stage-1 task brief research (read-only)."""
return [
*TASK_TOOLS,
*PROJECT_TOOLS,
*NOTE_TOOLS,
*TIMELINE_TOOLS,
*CLIENT_TOOLS,
*_read_only_memory_tools(user_id, trace_id),
make_query_relations_tool(user_id, trace_id),
]
def _all_tools_for_user(user_id: str, trace_id: str | None) -> list[Any]: def _all_tools_for_user(user_id: str, trace_id: str | None) -> list[Any]:
return [*_all_tools(), *_memory_tools(user_id, trace_id)] return [*_all_tools(), *_memory_tools(user_id, trace_id)]
@@ -1249,6 +1361,28 @@ async def run_floating_stream(
domain = await _infer_floating_domain(message, prepared_context) domain = await _infer_floating_domain(message, prepared_context)
yield "floating_domain", domain yield "floating_domain", domain
brief_mode: bool = bool(context.get("brief_mode"))
briefing_context_text: str = str(context.get("briefing_context") or "").strip()
if brief_mode and briefing_context_text:
# Stage 2: inject briefing as ground truth context.
# Pre-substitute {briefing_context} in the template (handles both Langfuse {{}} and fallback {})
# before compile_prompt sees the remaining standard variables.
template, langfuse_prompt = get_prompt_or_fallback(
"task_brief_followup_system",
_TASK_BRIEF_FOLLOWUP_SYSTEM_PROMPT,
)
system_prompt = compile_prompt(
template, langfuse_prompt,
date_context=_datetime_context_injection(prepared_context).strip(),
language_instruction=_language_instruction(prepared_context).strip(),
user_identity=_user_identity_injection(prepared_context).strip(),
relational_memory=_relational_memory_injection(prepared_context).strip(),
proactive_hints=_proactive_hints_injection(prepared_context).strip(),
request_context=_request_context_block(prepared_context),
briefing_context=briefing_context_text,
)
else:
system_prompt, langfuse_prompt = _build_system_prompt("floating_system", _FLOATING_SYSTEM_PROMPT, prepared_context) system_prompt, langfuse_prompt = _build_system_prompt("floating_system", _FLOATING_SYSTEM_PROMPT, prepared_context)
sanitizer = _FloatingStreamSanitizer() sanitizer = _FloatingStreamSanitizer()
emitted_sanitized = False emitted_sanitized = False
@@ -1283,6 +1417,49 @@ async def run_floating_stream(
yield "token", _fallback_from_raw_floating_text("".join(raw_chunks)) yield "token", _fallback_from_raw_floating_text("".join(raw_chunks))
async def run_task_brief_research_stream(
user_id: str,
task_id: str,
context: dict[str, Any],
) -> AsyncGenerator[tuple[str, Any], None]:
"""Stage-1 executive assistant: deep research for one task.
Yields ``("token", chunk)`` events like other stream runners.
The final concatenated text may contain a ``<canvas kind="...">...</canvas>`` block
which the WS handler strips and emits as a ``canvas_draft`` mutation.
"""
prepared_context = await _prepare_context(f"task:{task_id}", context)
tools = _brief_research_tools(user_id, _trace_id_from_context(prepared_context))
# Inject task_id so the agent knows what to look up first.
research_message = (
f"Prepare a briefing dossier for task ID: {task_id}\n"
"Follow the research workflow: read the task, then project, then client, "
"then cross-project relations, then relevant memory. "
"End with a concrete suggested first step. "
"If this is a writing task, include a <canvas kind=\"...\"> draft."
)
system_prompt, langfuse_prompt = _build_system_prompt(
"task_brief_research_system",
_TASK_BRIEF_RESEARCH_SYSTEM_PROMPT,
prepared_context,
)
async for event in _run_single_agent_stream(
user_id=user_id,
system_prompt=system_prompt,
message=research_message,
context=prepared_context,
max_steps=12,
langfuse_prompt=langfuse_prompt,
agent_name="task-brief-agent",
tools=tools,
conversation_history=None,
):
yield event
async def update_core_memory(user_id: str, key: str, value: str) -> None: async def update_core_memory(user_id: str, key: str, value: str) -> None:
"""Compatibility helper kept for callers that expect explicit memory update API.""" """Compatibility helper kept for callers that expect explicit memory update API."""
async with async_session() as db: async with async_session() as db:

View File

@@ -107,6 +107,7 @@ _AGENT_MODEL_SETTINGS: dict[str, Callable[[], str]] = {
"unified-processor": lambda: settings.LLM_MODEL_UNIFIED_PROCESSOR or settings.LLM_MODEL, "unified-processor": lambda: settings.LLM_MODEL_UNIFIED_PROCESSOR or settings.LLM_MODEL,
"cloud-processor": lambda: settings.LLM_MODEL_CLOUD_PROCESSOR or settings.LLM_MODEL, "cloud-processor": lambda: settings.LLM_MODEL_CLOUD_PROCESSOR or settings.LLM_MODEL,
"brief-agent": lambda: settings.LLM_MODEL_BRIEF_AGENT or settings.LLM_MODEL, "brief-agent": lambda: settings.LLM_MODEL_BRIEF_AGENT or settings.LLM_MODEL,
"task-brief-agent": lambda: settings.LLM_MODEL_TASK_BRIEF_AGENT or settings.LLM_MODEL,
"setup": lambda: settings.LLM_MODEL_SETUP_AGENT or settings.LLM_MODEL, "setup": lambda: settings.LLM_MODEL_SETUP_AGENT or settings.LLM_MODEL,
"memory-extractor": lambda: settings.LLM_MODEL_MEMORY_EXTRACTOR or "gpt-4o-mini", "memory-extractor": lambda: settings.LLM_MODEL_MEMORY_EXTRACTOR or "gpt-4o-mini",
"memory-miner": lambda: settings.LLM_MODEL_MEMORY_MINER or "gpt-4o-mini", "memory-miner": lambda: settings.LLM_MODEL_MEMORY_MINER or "gpt-4o-mini",

View File

@@ -2,11 +2,35 @@
from __future__ import annotations from __future__ import annotations
import re
from collections.abc import AsyncGenerator from collections.abc import AsyncGenerator
from typing import Any from typing import Any
from app.schemas import WsFloatingDomain, WsStreamEnd, WsStreamStart, WsStreamText from app.schemas import WsFloatingDomain, WsStreamEnd, WsStreamStart, WsStreamText
# Matches <canvas kind="...">...</canvas> blocks (single-line or multiline).
_CANVAS_BLOCK_RE = re.compile(
r'<canvas\s+kind=["\']([^"\']+)["\']>(.*?)</canvas>',
re.DOTALL | re.IGNORECASE,
)
def extract_canvas_block(text: str) -> tuple[str, str | None, str | None]:
"""Strip the first <canvas kind="...">...</canvas> block from *text*.
Returns ``(visible_text, canvas_content, canvas_kind)``.
``canvas_content`` and ``canvas_kind`` are ``None`` when no block is found.
"""
match = _CANVAS_BLOCK_RE.search(text)
if not match:
return text, None, None
canvas_kind = match.group(1).strip()
canvas_content = match.group(2).strip()
visible = text[: match.start()] + text[match.end() :]
visible = visible.strip()
return visible, canvas_content, canvas_kind
WsFrame = WsStreamStart | WsStreamText | WsStreamEnd | WsFloatingDomain WsFrame = WsStreamStart | WsStreamText | WsStreamEnd | WsFloatingDomain

View File

@@ -87,6 +87,8 @@ class WsFrameType(str, Enum):
journey_reply = "journey_reply" journey_reply = "journey_reply"
# ── v5 brief frame types ────────────────────────────────────────── # ── v5 brief frame types ──────────────────────────────────────────
brief_request = "brief_request" brief_request = "brief_request"
# ── v6 task brief frame types ─────────────────────────────────────
task_brief_request = "task_brief_request"
class WsToolCall(BaseModel): class WsToolCall(BaseModel):
@@ -209,6 +211,7 @@ class WsStreamEnd(BaseModel):
type: Literal[WsFrameType.stream_end] = WsFrameType.stream_end type: Literal[WsFrameType.stream_end] = WsFrameType.stream_end
request_id: str request_id: str
error: str | None = None error: str | None = None
mutations: list[dict[str, Any]] | None = None
class WsDomain(BaseModel): class WsDomain(BaseModel):