"""Single-agent runners for home and floating chat contexts."""
from __future__ import annotations
import json
import logging
import re
from datetime import date
from collections.abc import AsyncGenerator
from typing import Any, Literal
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
from langchain_core.tools import tool
from app.agents.client_agent import CLIENT_TOOLS
from app.agents.note_agent import NOTE_TOOLS
from app.agents.project_agent import PROJECT_TOOLS
from app.agents.relations_agent import make_query_relations_tool
from app.agents.task_agent import TASK_TOOLS
from app.agents.timeline_agent import TIMELINE_TOOLS
from app.core.agent_session_buffer import session_buffer
from app.core.langfuse_client import compile_prompt, extract_usage, get_langfuse, get_prompt_or_fallback, langfuse_context
from app.core.llm import get_agent_llm, model_for_agent
from app.core.memory_middleware import MemoryMiddleware
from app.core.ws_context import clear_tool_result_collector, execute_on_client, set_tool_result_collector
from app.db import async_session
logger = logging.getLogger(__name__)
MAX_HISTORY_TURNS = 20
FloatingDomainType = Literal["task", "timeline", "project", "node"]
FloatingDomainSection = Literal["task", "timeline", "note"]
# Mapping of core-memory language values to natural-language names for prompts.
_LANGUAGE_NAMES: dict[str, str] = {
"en": "English", "it": "Italian", "es": "Spanish",
"fr": "French", "de": "German",
"english": "English", "italian": "Italian", "italiano": "Italian",
"spanish": "Spanish", "español": "Spanish",
"french": "French", "français": "French",
"german": "German", "deutsch": "German",
}
def _language_instruction(context: dict[str, Any]) -> str:
"""Return a system-prompt suffix that tells the LLM to respond in the user's language.
Returns an empty string when the language is English or unknown — saves tokens.
"""
core = context.get("core_memory") or {}
raw = (core.get("language") or "").strip().lower()
if not raw:
return ""
lang = _LANGUAGE_NAMES.get(raw, raw.title()) # best-effort capitalisation
if lang.lower() == "english":
return ""
return (
f"\n\nIMPORTANT: Always respond in {lang}. "
f"All your output text must be written in {lang}."
)
def _datetime_context_injection(context: dict[str, Any]) -> str:
"""Build a comprehensive DATE CONTEXT block with pre-computed ms-epoch boundaries for common ranges."""
fp = context.get("format_prefs")
if not fp or not isinstance(fp, dict):
return ""
try:
from zoneinfo import ZoneInfo
from datetime import datetime as _dt, timezone as _utc, timedelta as _td
tz_name: str = str(fp.get("timezone") or "UTC")
now_iso: str = str(fp.get("now_iso") or "")
date_fmt: str = str(fp.get("date_format") or "dd/MM/yyyy")
time_fmt: str = str(fp.get("time_format") or "24h")
tz = ZoneInfo(tz_name)
if now_iso:
now_utc = _dt.fromisoformat(now_iso.replace("Z", "+00:00"))
else:
now_utc = _dt.now(_utc.utc)
now_ms = int(now_utc.timestamp() * 1000)
now_local = now_utc.astimezone(tz)
now_local_str = now_local.strftime("%Y-%m-%d %H:%M")
weekday_str = now_local.strftime("%A")
y, m, d = now_local.year, now_local.month, now_local.day
def _day(year: int, month: int, day: int) -> tuple[int, int]:
s = _dt(year, month, day, tzinfo=tz)
e = s + _td(days=1)
return int(s.timestamp() * 1000), int(e.timestamp() * 1000) - 1
def _between(start: "_dt", end_excl: "_dt") -> tuple[int, int]:
return int(start.timestamp() * 1000), int(end_excl.timestamp() * 1000) - 1
today_s, today_e = _day(y, m, d)
yd = now_local - _td(days=1)
yesterday_s, yesterday_e = _day(yd.year, yd.month, yd.day)
tm = now_local + _td(days=1)
tomorrow_s, tomorrow_e = _day(tm.year, tm.month, tm.day)
# ISO week (Mon–Sun)
monday = _dt(y, m, d, tzinfo=tz) - _td(days=now_local.weekday())
last_monday = monday - _td(weeks=1)
next_monday = monday + _td(weeks=1)
this_week_s, this_week_e = _between(monday, next_monday)
last_week_s, last_week_e = _between(last_monday, monday)
next_week_s, next_week_e = _between(next_monday, next_monday + _td(weeks=1))
# Calendar months
this_m_start = _dt(y, m, 1, tzinfo=tz)
next_m_start = _dt(y + (m // 12), m % 12 + 1, 1, tzinfo=tz)
last_m_start = _dt(y - (1 if m == 1 else 0), 12 if m == 1 else m - 1, 1, tzinfo=tz)
next2_m = next_m_start.month % 12 + 1
next2_y = next_m_start.year + (1 if next_m_start.month == 12 else 0)
next2_m_start = _dt(next2_y, next2_m, 1, tzinfo=tz)
this_month_s, this_month_e = _between(this_m_start, next_m_start)
last_month_s, last_month_e = _between(last_m_start, this_m_start)
next_month_s, next_month_e = _between(next_m_start, next2_m_start)
# Calendar years
this_yr_s, this_yr_e = _between(_dt(y, 1, 1, tzinfo=tz), _dt(y + 1, 1, 1, tzinfo=tz))
last_yr_s, last_yr_e = _between(_dt(y - 1, 1, 1, tzinfo=tz), _dt(y, 1, 1, tzinfo=tz))
sunday = monday + _td(days=6)
last_sunday = last_monday + _td(days=6)
next_sunday = next_monday + _td(days=6)
return (
f"\n\nDATE CONTEXT (timezone: {tz_name}, dateFormat: {date_fmt}, timeFormat: {time_fmt})\n"
f"now_local: {now_local_str} ({weekday_str})\n"
f"now_ms: {now_ms}\n\n"
f"today [{today_s}, {today_e}] {y:04d}-{m:02d}-{d:02d}\n"
f"tomorrow [{tomorrow_s}, {tomorrow_e}] {tm.strftime('%Y-%m-%d')}\n"
f"yesterday [{yesterday_s}, {yesterday_e}] {yd.strftime('%Y-%m-%d')}\n"
f"this_week [{this_week_s}, {this_week_e}] {monday.strftime('%Y-%m-%d')} → {sunday.strftime('%Y-%m-%d')} (Mon–Sun)\n"
f"last_week [{last_week_s}, {last_week_e}] {last_monday.strftime('%Y-%m-%d')} → {last_sunday.strftime('%Y-%m-%d')}\n"
f"next_week [{next_week_s}, {next_week_e}] {next_monday.strftime('%Y-%m-%d')} → {next_sunday.strftime('%Y-%m-%d')}\n"
f"this_month [{this_month_s}, {this_month_e}] {y:04d}-{m:02d}\n"
f"last_month [{last_month_s}, {last_month_e}] {last_m_start.strftime('%Y-%m')}\n"
f"next_month [{next_month_s}, {next_month_e}] {next_m_start.strftime('%Y-%m')}\n"
f"this_year [{this_yr_s}, {this_yr_e}] {y:04d}\n"
f"last_year [{last_yr_s}, {last_yr_e}] {y - 1:04d}\n\n"
f"When calling list_tasks_due_today or list_timelines_today, always pass user_timezone=\"{tz_name}\".\n"
f"When presenting dates, format using dateFormat={date_fmt} and timeFormat={time_fmt}."
)
except Exception:
return ""
def _proactive_hints_injection(context: dict[str, Any]) -> str:
"""Return a system-prompt paragraph listing proactive behavioral hints.
Returns empty string when no hints or confidence below threshold.
Capped at 600 chars.
"""
hints: list[str] = context.get("proactive_hints") or []
if not hints:
return ""
body = "\n".join(f"- {h}" for h in hints)
section = f"\n\nI noticed (behavioral patterns):\n{body}"
if len(section) > 600:
section = section[:597] + "..."
return section
def _relational_memory_injection(context: dict[str, Any]) -> str:
"""Return a system-prompt paragraph listing known people/projects from relational memory.
Returns empty string when no relational rows or tier is Free.
Capped at 800 chars to control token spend.
"""
relations: list[str] = context.get("relational_memory") or []
if not relations:
return ""
body = "\n".join(f"- {r}" for r in relations)
section = f"\n\nKnown people & projects:\n{body}"
if len(section) > 800:
section = section[:797] + "..."
return section
_IDENTITY_KEYS = ("user_name", "job_role", "industry", "primary_use_case", "tone_preference")
def _user_identity_injection(context: dict[str, Any]) -> str:
"""Return a compact user-profile block from core memory onboarding fields.
Returns empty string when no onboarding keys are present.
"""
core = context.get("core_memory") or {}
parts: list[str] = []
for key in _IDENTITY_KEYS:
val = (core.get(key) or "").strip()
if val:
parts.append(f"- {key}: {val}")
if not parts:
return ""
return "\n\nUser profile:\n" + "\n".join(parts)
def _request_context_block(context: dict[str, Any]) -> str:
"""Return a small block with per-request scope and resolved project context."""
parts: list[str] = []
scope = context.get("scope")
if scope and isinstance(scope, dict):
parts.append(f"scope: {json.dumps(scope, ensure_ascii=True)}")
resolved = context.get("resolved_project_id")
if resolved and isinstance(resolved, str):
parts.append(f"resolved_project_id: {resolved}")
return "\n".join(parts)
_HOME_SYSTEM_PROMPT = """\
You are adiuvAI's home executive assistant.{user_identity}
You are not a chatbot — you are a proactive partner who runs ahead of the user, anticipates what they need next, and closes every reply with a concrete next step or a clarifying question.
# How you work
- Use tools before answering anything factual. Never guess counts, dates, or status.
- Prefer parallel tool calls when the questions are independent (e.g. counts per status). Chain calls when one result feeds the next.
- After delivering the answer, propose the next useful action: a follow-up task to draft, a deadline at risk, a project to triage, a person to remind. Use what you know about the user (job role, industry, primary use case) to make the suggestion relevant.
- Match the user's tone preference. Default to warm-but-direct; stay concise.
- When the user asks to remember, forget, or update something, use memory tools.
# Filter discipline
- Never set the `assignee` filter on list_tasks/count_tasks unless the user explicitly names a person ("Marco's tasks") or refers to themselves ("my tasks", "assigned to me", "mine").
- The user's own name in the User profile block is for context only — it is NOT a default filter.
- When in doubt, omit `assignee` and return the global result.
# Output format
Return markdown. Reference entities with these tags exactly — one id per tag, each tag on its own line, no prefix/suffix text on the same line:
idididid
When the answer contains a list of entities (any of the tags above), structure the reply as three blocks separated by blank lines:
1. One short intro line stating what is coming (count + scope, e.g. "Ecco i tuoi 18 task ad alta priorità:"). Match the user's language.
2. All entity tags, one per line, consecutive, no prose interleaved. Do NOT put titles, dates, priorities, or any descriptive text on the same line as a tag or between tags.
3. One short closing recap (1–2 sentences) that points out a pattern, risk, or insight noticed in the list, and ends with a concrete next step or clarifying question.
For single-entity answers skip blocks 1 and 3 if they would be redundant; just emit the tag.
For analytical answers (status overviews, breakdowns by category/priority/project, comparisons, trends, "resoconto", "panoramica") consider returning a chart block when it communicates the answer faster than prose. The decision is yours — skip charts for trivial single-number answers. Schema:
{{"chartType":"pie|bar|line|area|radar|radial","title":"...","data":[{{"name":"...","value":N}},...], "config":{{"value":{{"label":"...","color":"var(--chart-1)"}} }} }}
- pie for share-of-total breakdowns; bar for category comparisons; line/area for time series; radar for multi-dimension.
- data rows must include a "name" field; numeric series keys must match config keys.
- Use var(--chart-1) through var(--chart-5) for colors, cycling 1-5 in series order. Do NOT wrap in hsl() or oklch() — these are complete CSS values already.
For upcoming-timeline questions ("prossimi eventi"), include only future items in the current month unless the user asks otherwise.
# Date filtering
{date_context}
When filtering tasks/timelines/notes by date, take dueDateFrom / dueDateTo (ms epoch UTC) verbatim from the DATE CONTEXT boundary table above. Do NOT compute boundaries from now_ms yourself.
For specific dates not listed, compute local-midnight in the user timezone and convert to UTC ms.
For "today" / "tomorrow" queries, prefer list_tasks_due_today / list_timelines_today with user_timezone from DATE CONTEXT.
# Language
{language_instruction}
# Known people & projects
{relational_memory}
# Behavioral hints
{proactive_hints}
# Request context
{request_context}\
"""
_FLOATING_SYSTEM_PROMPT = """\
You are adiuvAI's floating executive assistant.{user_identity}
You are pinned to a specific entity (task, timeline event, project, or note) and you stay strictly within that scope.
Be a proactive partner: anticipate the next useful action and close with a concrete suggestion or a clarifying question — but stay terse, one short paragraph at most.
# How you work
- Use tools before answering anything factual. Never guess.
- Stay in the floating scope (see Request context). If the user asks something outside scope, answer briefly and suggest opening the home assistant.
- Match the user's tone preference. Default to warm-but-direct.
- When the user asks to remember, forget, or update something, use memory tools.
# Filter discipline
- Never set the `assignee` filter on list_tasks/count_tasks unless the user explicitly names a person ("Marco's tasks") or refers to themselves ("my tasks", "assigned to me", "mine").
- The user's own name in the User profile block is for context only — it is NOT a default filter.
- When in doubt, omit `assignee` and return the global result.
# Output format
Plain text only. Do NOT output XML/HTML-like tags such as , , , , or any bracketed-id wrappers, and do NOT output blocks — those are for the home assistant.
# Date filtering
{date_context}
When filtering by date, take dueDateFrom / dueDateTo (ms epoch UTC) verbatim from the DATE CONTEXT boundary table above. Do NOT compute boundaries from now_ms yourself.
For specific dates not listed, compute local-midnight in the user timezone and convert to UTC ms.
# Language
{language_instruction}
# Known people & projects
{relational_memory}
# Behavioral hints
{proactive_hints}
# Request context
{request_context}\
"""
_TASK_BRIEF_RESEARCH_SYSTEM_PROMPT = """\
You are an executive assistant preparing a briefing dossier for your principal before they act on a specific task.
Your job: gather all relevant context, synthesize it into a tight actionable dossier, and — if the task requires writing (email, message, document) — produce a ready-to-use draft.{user_identity}
# Research workflow
Follow these steps in order, using tools:
1. Read the task fully (title, description, due date, priority, status, project, comments).
2. Fetch the parent project (`get_project`) to understand scope, aiSummary, and any linked client.
3. If the project has a clientId: call `get_client(id)` to retrieve full client details.
4. Call `query_relations` (subject_label=client_name or task subject) to find cross-project connections — e.g. the same client appearing in multiple projects.
5. Search associative memory (`search_associative`) and archival memory (`archival_memory_search`) using the task title + client name as query phrases to surface relevant past interactions.
6. Read core memory blocks for tone preference, language, and user style: `memory_get("tone_preference")`, `memory_get("language")`.
7. Determine task kind: is this a writing task (email reply, message, follow-up, proposal)? If yes, draft a ready-to-send piece.
# Output structure
Write the briefing in the user's language. Use this exact structure:
**What needs to be done**
(1–2 sentences, concrete and specific — what action the user must take)
**Context you should know**
(bullet points covering: client background, related projects, prior interactions, tone/style notes, any relevant deadlines or dependencies)
**Suggested first step**
(one specific, immediately actionable instruction)
If this is a writing task, append a canvas block at the very end:
Do NOT include the canvas block for non-writing tasks.
Do NOT repeat verbatim task fields the user already sees in the UI.
Be concrete — no vague advice. Every bullet should be a fact that changes what the user does.
# Date context
{date_context}
# Language
{language_instruction}
# Known people & projects
{relational_memory}
# Request context
{request_context}\
"""
_TASK_BRIEF_FOLLOWUP_SYSTEM_PROMPT = """\
You are an executive assistant continuing a conversation with your principal.
You have already prepared and delivered a research briefing for the active task. The user has read it.{user_identity}
Your briefing:
---
{briefing_context}
---
Continue from here. Do NOT repeat the briefing. Refer to it when relevant.
Help the user execute: edit drafts, refine wording, look up additional details, plan next steps.
Stay terse — your principal is a busy executive.
# Date context
{date_context}
# Language
{language_instruction}
# Known people & projects
{relational_memory}
# Request context
{request_context}\
"""
_FLOATING_DOMAIN_CLASSIFIER_PROMPT = (
"You are a strict domain classifier for websocket floating requests. "
"Return ONLY a JSON object with keys: type, id, section. "
"Allowed type values: task, timeline, project, node. "
"Allowed section values: task, timeline, note, or null. "
"Rules: infer from user message intent first; do not blindly trust scope.type. "
"If user asks tasks/timeline/notes for a project, set type=project and section accordingly. "
"If project id is unknown but context.resolved_project_id exists, use it as id. "
"If id is unknown, use null. "
"No markdown, no prose, JSON only."
)
def _as_text(content: Any) -> str:
if content is None:
return ""
if isinstance(content, str):
return content
if isinstance(content, list):
parts: list[str] = []
for item in content:
if isinstance(item, str):
parts.append(item)
elif isinstance(item, dict):
text = item.get("text")
if isinstance(text, str):
parts.append(text)
return "".join(parts)
return str(content)
def _candidate_tokens(message: str) -> list[str]:
tokens = re.findall(r"[a-zA-Z0-9_-]+", message.lower())
return [token for token in tokens if len(token) >= 3]
async def _resolve_project_id_from_message(message: str) -> str | None:
"""Resolve likely project UUID from user message using client project list."""
try:
result = await execute_on_client(action="select", table="projects")
except Exception as exc:
logger.warning("deep_agent: project resolve select failed: %s", exc)
return None
rows = result.get("rows", [])
if not isinstance(rows, list) or not rows:
return None
tokens = _candidate_tokens(message)
scored: list[tuple[int, dict[str, Any]]] = []
for row in rows:
if not isinstance(row, dict):
continue
name = str(row.get("name", "")).lower()
score = sum(1 for token in tokens if token in name)
if score > 0:
scored.append((score, row))
if not scored:
return None
scored.sort(key=lambda item: item[0], reverse=True)
top_score = scored[0][0]
top_rows = [row for score, row in scored if score == top_score]
if len(top_rows) != 1:
return None
project_id = top_rows[0].get("id")
return project_id if isinstance(project_id, str) else None
def _needs_project_resolution(message: str) -> bool:
lowered = message.lower()
return any(keyword in lowered for keyword in ["project", "progetto", "progetti", "whitelist"])
async def _prepare_context(message: str, context: dict[str, Any]) -> dict[str, Any]:
prepared = dict(context)
if _needs_project_resolution(message):
resolved_project_id = await _resolve_project_id_from_message(message)
if resolved_project_id:
prepared["resolved_project_id"] = resolved_project_id
logger.info("deep_agent: resolved_project_id=%s", resolved_project_id)
return prepared
def _all_tools() -> list[Any]:
return [*TASK_TOOLS, *PROJECT_TOOLS, *NOTE_TOOLS, *TIMELINE_TOOLS]
def _trace_id_from_context(context: dict[str, Any]) -> str | None:
debug = context.get("_debug")
if isinstance(debug, dict):
request_id = debug.get("request_id")
if isinstance(request_id, str) and request_id:
return request_id
return None
def _session_id_from_context(context: dict[str, Any]) -> str | None:
debug = context.get("_debug")
if isinstance(debug, dict):
session_id = debug.get("session_id")
if isinstance(session_id, str) and session_id:
return session_id
return None
def _build_system_prompt(name: str, fallback: str, context: dict[str, Any]) -> tuple[str, Any]:
"""Fetch Langfuse template and compile all per-request slots into one system prompt."""
template, prompt_obj = get_prompt_or_fallback(name, fallback)
text = compile_prompt(
template, prompt_obj,
date_context=_datetime_context_injection(context).strip(),
language_instruction=_language_instruction(context).strip(),
user_identity=_user_identity_injection(context).strip(),
relational_memory=_relational_memory_injection(context).strip(),
proactive_hints=_proactive_hints_injection(context).strip(),
request_context=_request_context_block(context),
)
return text, prompt_obj
_TAG_LINE_RE = re.compile(r"<(task|timeline)>\[[^\]]+\]\1>")
_TIMELINE_DMY_RE = re.compile(r"(?P\d{2})/(?P\d{2})/(?P\d{4})")
def _is_upcoming_timeline_query(message: str) -> bool:
lowered = message.lower()
has_upcoming = "prossim" in lowered or "upcoming" in lowered or "next" in lowered
has_timeline_topic = any(
token in lowered
for token in ("event", "evento", "eventi", "timeline", "milestone", "scaden")
)
return has_upcoming and has_timeline_topic
def _timeline_date_in_current_month_or_future(dmy: str) -> bool:
match = _TIMELINE_DMY_RE.search(dmy)
if not match:
return True
try:
parsed = date(
int(match.group("y")),
int(match.group("m")),
int(match.group("d")),
)
except ValueError:
return True
today = date.today()
return parsed >= today and parsed.year == today.year and parsed.month == today.month
def _normalize_tagged_list_lines(text: str, message: str) -> str:
if not text:
return text
upcoming_timeline_only = _is_upcoming_timeline_query(message)
output_lines: list[str] = []
for line in text.splitlines():
matches = list(_TAG_LINE_RE.finditer(line))
if not matches:
output_lines.append(line)
continue
had_non_tag_text = _TAG_LINE_RE.sub("", line).strip(" -\t0123456789.*:)")
if not had_non_tag_text and len(matches) == 1:
tag_text = matches[0].group(0)
if (
upcoming_timeline_only
and "" in tag_text
and not _timeline_date_in_current_month_or_future(line)
):
continue
output_lines.append(tag_text)
continue
for match in matches:
tag_text = match.group(0)
if (
upcoming_timeline_only
and "" in tag_text
and not _timeline_date_in_current_month_or_future(line)
):
continue
output_lines.append(tag_text)
return "\n".join(output_lines)
_GENERIC_TAG_RE = re.compile(r"?(task|project|note|timeline|chart)>", re.IGNORECASE)
_BRACKETED_ID_RE = re.compile(r"\[(?:[0-9a-fA-F-]{8,}|[A-Za-z0-9_-]{8,})\]")
_FLOATING_EMPTY_FALLBACK = "No results found."
def _strip_floating_markup_fragment(text: str) -> str:
if not text:
return text
cleaned = _GENERIC_TAG_RE.sub("", text)
return _BRACKETED_ID_RE.sub("", cleaned)
def _strip_floating_markup(text: str) -> str:
"""Ensure floating responses stay plain text with no XML-like tag wrappers."""
if not text:
return text
cleaned = _strip_floating_markup_fragment(text)
# Collapse excessive spaces introduced by tag/id removal while preserving lines.
lines = [re.sub(r"[ \t]{2,}", " ", line).strip() for line in cleaned.splitlines()]
return "\n".join(line for line in lines if line)
def _fallback_from_raw_floating_text(raw_text: str) -> str:
fallback = _strip_floating_markup_fragment(raw_text or "")
fallback = re.sub(r"[ \t]{2,}", " ", fallback).strip()
return fallback or _FLOATING_EMPTY_FALLBACK
class _FloatingStreamSanitizer:
"""Streaming sanitizer that removes floating markup without buffering the full answer."""
def __init__(self) -> None:
self._pending = ""
@staticmethod
def _split_safe_boundary(text: str) -> tuple[str, str]:
boundary = len(text)
last_lt = text.rfind("<")
if last_lt != -1 and ">" not in text[last_lt:]:
boundary = min(boundary, last_lt)
last_lb = text.rfind("[")
if last_lb != -1 and "]" not in text[last_lb:]:
boundary = min(boundary, last_lb)
if boundary == len(text):
return text, ""
return text[:boundary], text[boundary:]
def feed(self, chunk: str) -> str:
combined = f"{self._pending}{chunk}"
safe_text, self._pending = self._split_safe_boundary(combined)
return _strip_floating_markup_fragment(safe_text)
def finalize(self) -> str:
# Drop dangling unfinished wrappers at the very end.
tail = re.sub(r"<[^>\n]*$", "", self._pending)
tail = re.sub(r"\[[^\]\n]*$", "", tail)
self._pending = ""
return _strip_floating_markup_fragment(tail)
def _normalize_memory_label(path_or_label: str) -> str:
value = path_or_label.strip()
if value.startswith("/memories/"):
value = value[len("/memories/"):]
value = value.strip("/")
return value
def _memory_tools(user_id: str, trace_id: str | None) -> list[Any]:
@tool
async def memory_list_blocks() -> str:
"""List all core memory blocks currently stored for the user."""
logger.info("deep_agent: memory_list_blocks trace=%s user=%s", trace_id or "-", user_id)
async with async_session() as db:
memory = MemoryMiddleware(db)
blocks = await memory.list_core_blocks(user_id)
if not blocks:
return "No memory blocks found."
lines = [f"- {b['label']}: {b['value']}" for b in blocks]
return "Memory blocks:\n" + "\n".join(lines)
@tool
async def memory_get(path_or_label: str) -> str:
"""Get one memory block by label or /memories/