Files
api/app/core/deep_agent.py

1330 lines
54 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Single-agent runners for home and contextual chat contexts."""
from __future__ import annotations
import json
import logging
import re
from datetime import date
from collections.abc import AsyncGenerator
from typing import Any
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
from langchain_core.tools import tool
from app.agents.client_agent import CLIENT_TOOLS
from app.agents.note_agent import NOTE_TOOLS
from app.agents.project_agent import PROJECT_TOOLS
from app.agents.relations_agent import make_query_relations_tool
from app.agents.task_agent import TASK_TOOLS
from app.agents.timeline_agent import TIMELINE_TOOLS
from app.core.scout_session_buffer import session_buffer
from app.core.langfuse_client import compile_prompt, extract_usage, get_langfuse, get_prompt_or_fallback, langfuse_context
from app.core.llm import get_agent_llm, model_for_agent
from app.core.memory_middleware import MemoryMiddleware
from app.core.ws_context import clear_tool_result_collector, execute_on_client, set_tool_result_collector
from app.db import async_session
logger = logging.getLogger(__name__)
# Upper bound on how many trailing history entries are replayed to the model.
# _history_to_messages applies it as a tail slice, so each role/content dict
# counts as one entry.
MAX_HISTORY_TURNS = 20
# Mapping of core-memory language values to natural-language names for prompts.
_LANGUAGE_NAMES: dict[str, str] = {
"en": "English", "it": "Italian", "es": "Spanish",
"fr": "French", "de": "German",
"english": "English", "italian": "Italian", "italiano": "Italian",
"spanish": "Spanish", "español": "Spanish",
"french": "French", "français": "French",
"german": "German", "deutsch": "German",
}
def _language_instruction(context: dict[str, Any]) -> str:
"""Return a system-prompt suffix that tells the LLM to respond in the user's language.
Returns an empty string when the language is English or unknown — saves tokens.
"""
core = context.get("core_memory") or {}
raw = (core.get("language") or "").strip().lower()
if not raw:
return ""
lang = _LANGUAGE_NAMES.get(raw, raw.title()) # best-effort capitalisation
if lang.lower() == "english":
return ""
return (
f"\n\nIMPORTANT: Always respond in {lang}. "
f"All your output text must be written in {lang}."
)
MANIFEST_TOKEN_BUDGET = 3000  # rough budget for <linked_folder> block


def format_folder_manifest(manifest: dict | None, *, token_budget: int | None = None) -> str:
    """Format a folder manifest into the <linked_folder> prompt block.

    Files are listed newest first (``mtimeMs`` descending). Lines are added
    until the estimated size (about 4 characters per token) would exceed the
    budget; the remaining files are summarised in a single "N more files
    omitted" footer.

    Args:
        manifest: Dict with ``files`` (list of dicts carrying ``relPath``,
            ``mtimeMs``, ``kind``, ``summary``) plus ``folderPath`` and
            ``lastScannedAt``. Missing keys are rendered with placeholders.
        token_budget: Optional override for ``MANIFEST_TOKEN_BUDGET``.

    Returns:
        The formatted block, or "" when ``manifest`` is None or has no files.
    """
    if not manifest or not manifest.get("files"):
        return ""

    # `or 0` also covers an explicit `mtimeMs: None`, which would otherwise
    # make the sort raise TypeError when compared against ints.
    files = sorted(manifest["files"], key=lambda f: f.get("mtimeMs") or 0, reverse=True)

    header = (
        f"<linked_folder>\npath: {manifest.get('folderPath', '?')} "
        f"({len(files)} files, scanned {manifest.get('lastScannedAt', '?')})\nfiles:\n"
    )
    footer_template = "{} more files omitted, use read_project_folder_file to access by path\n</linked_folder>"
    budget = MANIFEST_TOKEN_BUDGET if token_budget is None else token_budget
    char_budget = budget * 4  # ~4 chars/token

    # Running size of header + reserved footer + accepted lines, so the budget
    # check is O(1) per file and the body is joined once at the end.
    used = len(header) + len(footer_template.format(0))
    lines: list[str] = []
    for f in files:
        # .get() keeps one malformed entry from aborting the whole block.
        line = f"- /{f.get('relPath', '?')} [{f.get('kind','text')}] {f.get('summary','')}\n"
        if used + len(line) > char_budget:
            break
        lines.append(line)
        used += len(line)

    body = "".join(lines)
    omitted = len(files) - len(lines)
    if omitted > 0:
        return header + body + footer_template.format(omitted)
    return header + body + "</linked_folder>"
async def _fetch_project_manifest(project_id: str) -> dict | None:
    """Fetch a project's folder manifest from the client via execute_on_client.

    Args:
        project_id: Sent to the client as ``projectId``.

    Returns:
        The client's result dict when it carries a truthy ``folderPath``;
        None when the result is empty, has no ``folderPath`` (treated as an
        unlinked project), is not a dict, or the call raised.
    """
    from app.core.ws_context import execute_on_client  # noqa: PLC0415

    try:
        result = await execute_on_client(
            action="read_project_folder_manifest",
            data={"projectId": project_id},
        )
    except Exception as exc:
        # Still best-effort, but leave a trace so a broken client link can be diagnosed.
        logger.warning("deep_agent: manifest fetch failed project=%s: %s", project_id, exc)
        return None
    if not isinstance(result, dict) or not result.get("folderPath"):
        return None
    return result
async def build_brief_multi_project_manifest() -> str:
    """Build a compact multi-project manifest for the daily brief agent.

    Calls execute_on_client('list_projects_with_folder_manifests') and, for
    each returned project, lists its 5 most recently modified files
    (``mtimeMs`` descending) inside a <linked_folders> block.

    Returns:
        The formatted block, or "" when the client call fails or returns no
        projects.
    """
    try:
        result = await execute_on_client(
            action="list_projects_with_folder_manifests",
            data={},
        )
    except Exception as exc:
        # Best-effort: the brief still works without folder context, but log the cause.
        logger.warning("deep_agent: multi-project manifest fetch failed: %s", exc)
        return ""
    projects = (result or {}).get("projects") or []
    if not projects:
        return ""

    blocks: list[str] = ["<linked_folders>"]
    any_entry = False
    for p in projects:
        all_files = p.get("files", []) or []
        # `or 0` tolerates an explicit `mtimeMs: None`, which would break the sort.
        files = sorted(all_files, key=lambda f: f.get("mtimeMs") or 0, reverse=True)[:5]
        blocks.append(f"project: {p.get('projectName','?')} [{p.get('projectId','?')}]")
        blocks.append(f" path: {p.get('folderPath','?')} (scanned {p.get('lastScannedAt','?')})")
        if not all_files:
            blocks.append(" (no indexed files yet — folder is linked but empty or unscanned)")
        else:
            # .get() keeps one malformed file entry from aborting the whole brief.
            blocks.extend(
                f" - /{f.get('relPath', '?')} [{f.get('kind','text')}] {f.get('summary','')}"
                for f in files
            )
            if len(all_files) > 5:
                blocks.append(f"{len(all_files) - 5} more files (use read_project_folder_file by relPath)")
        # NOTE(review): set for every project, so the guard below only matters if
        # the loop body never ran. Confirm whether it was meant to track
        # projects that actually have files.
        any_entry = True
    if not any_entry:
        return ""
    blocks.append("</linked_folders>")
    return "\n".join(blocks)
def _datetime_context_injection(context: dict[str, Any]) -> str:
"""Build a comprehensive DATE CONTEXT block with pre-computed ms-epoch boundaries for common ranges."""
fp = context.get("format_prefs")
if not fp or not isinstance(fp, dict):
return ""
try:
from zoneinfo import ZoneInfo
from datetime import datetime as _dt, timezone as _utc, timedelta as _td
tz_name: str = str(fp.get("timezone") or "UTC")
now_iso: str = str(fp.get("now_iso") or "")
date_fmt: str = str(fp.get("date_format") or "dd/MM/yyyy")
time_fmt: str = str(fp.get("time_format") or "24h")
tz = ZoneInfo(tz_name)
if now_iso:
now_utc = _dt.fromisoformat(now_iso.replace("Z", "+00:00"))
else:
now_utc = _dt.now(_utc.utc)
now_ms = int(now_utc.timestamp() * 1000)
now_local = now_utc.astimezone(tz)
now_local_str = now_local.strftime("%Y-%m-%d %H:%M")
weekday_str = now_local.strftime("%A")
y, m, d = now_local.year, now_local.month, now_local.day
def _day(year: int, month: int, day: int) -> tuple[int, int]:
s = _dt(year, month, day, tzinfo=tz)
e = s + _td(days=1)
return int(s.timestamp() * 1000), int(e.timestamp() * 1000) - 1
def _between(start: "_dt", end_excl: "_dt") -> tuple[int, int]:
return int(start.timestamp() * 1000), int(end_excl.timestamp() * 1000) - 1
today_s, today_e = _day(y, m, d)
yd = now_local - _td(days=1)
yesterday_s, yesterday_e = _day(yd.year, yd.month, yd.day)
tm = now_local + _td(days=1)
tomorrow_s, tomorrow_e = _day(tm.year, tm.month, tm.day)
# ISO week (MonSun)
monday = _dt(y, m, d, tzinfo=tz) - _td(days=now_local.weekday())
last_monday = monday - _td(weeks=1)
next_monday = monday + _td(weeks=1)
this_week_s, this_week_e = _between(monday, next_monday)
last_week_s, last_week_e = _between(last_monday, monday)
next_week_s, next_week_e = _between(next_monday, next_monday + _td(weeks=1))
# Calendar months
this_m_start = _dt(y, m, 1, tzinfo=tz)
next_m_start = _dt(y + (m // 12), m % 12 + 1, 1, tzinfo=tz)
last_m_start = _dt(y - (1 if m == 1 else 0), 12 if m == 1 else m - 1, 1, tzinfo=tz)
next2_m = next_m_start.month % 12 + 1
next2_y = next_m_start.year + (1 if next_m_start.month == 12 else 0)
next2_m_start = _dt(next2_y, next2_m, 1, tzinfo=tz)
this_month_s, this_month_e = _between(this_m_start, next_m_start)
last_month_s, last_month_e = _between(last_m_start, this_m_start)
next_month_s, next_month_e = _between(next_m_start, next2_m_start)
# Calendar years
this_yr_s, this_yr_e = _between(_dt(y, 1, 1, tzinfo=tz), _dt(y + 1, 1, 1, tzinfo=tz))
last_yr_s, last_yr_e = _between(_dt(y - 1, 1, 1, tzinfo=tz), _dt(y, 1, 1, tzinfo=tz))
sunday = monday + _td(days=6)
last_sunday = last_monday + _td(days=6)
next_sunday = next_monday + _td(days=6)
return (
f"\n\nDATE CONTEXT (timezone: {tz_name}, dateFormat: {date_fmt}, timeFormat: {time_fmt})\n"
f"now_local: {now_local_str} ({weekday_str})\n"
f"now_ms: {now_ms}\n\n"
f"today [{today_s}, {today_e}] {y:04d}-{m:02d}-{d:02d}\n"
f"tomorrow [{tomorrow_s}, {tomorrow_e}] {tm.strftime('%Y-%m-%d')}\n"
f"yesterday [{yesterday_s}, {yesterday_e}] {yd.strftime('%Y-%m-%d')}\n"
f"this_week [{this_week_s}, {this_week_e}] {monday.strftime('%Y-%m-%d')}{sunday.strftime('%Y-%m-%d')} (MonSun)\n"
f"last_week [{last_week_s}, {last_week_e}] {last_monday.strftime('%Y-%m-%d')}{last_sunday.strftime('%Y-%m-%d')}\n"
f"next_week [{next_week_s}, {next_week_e}] {next_monday.strftime('%Y-%m-%d')}{next_sunday.strftime('%Y-%m-%d')}\n"
f"this_month [{this_month_s}, {this_month_e}] {y:04d}-{m:02d}\n"
f"last_month [{last_month_s}, {last_month_e}] {last_m_start.strftime('%Y-%m')}\n"
f"next_month [{next_month_s}, {next_month_e}] {next_m_start.strftime('%Y-%m')}\n"
f"this_year [{this_yr_s}, {this_yr_e}] {y:04d}\n"
f"last_year [{last_yr_s}, {last_yr_e}] {y - 1:04d}\n\n"
f"When calling list_tasks_due_today or list_timelines_today, always pass user_timezone=\"{tz_name}\".\n"
f"When presenting dates, format using dateFormat={date_fmt} and timeFormat={time_fmt}."
)
except Exception:
return ""
def _proactive_hints_injection(context: dict[str, Any]) -> str:
"""Return a system-prompt paragraph listing proactive behavioral hints.
Returns empty string when no hints or confidence below threshold.
Capped at 600 chars.
"""
hints: list[str] = context.get("proactive_hints") or []
if not hints:
return ""
body = "\n".join(f"- {h}" for h in hints)
section = f"\n\nI noticed (behavioral patterns):\n{body}"
if len(section) > 600:
section = section[:597] + "..."
return section
def _relational_memory_injection(context: dict[str, Any]) -> str:
"""Return a system-prompt paragraph listing known people/projects from relational memory.
Returns empty string when no relational rows or tier is Free.
Capped at 800 chars to control token spend.
"""
relations: list[str] = context.get("relational_memory") or []
if not relations:
return ""
body = "\n".join(f"- {r}" for r in relations)
section = f"\n\nKnown people & projects:\n{body}"
if len(section) > 800:
section = section[:797] + "..."
return section
_IDENTITY_KEYS = ("user_name", "job_role", "industry", "primary_use_case", "tone_preference")
def _user_identity_injection(context: dict[str, Any]) -> str:
"""Return a compact user-profile block from core memory onboarding fields.
Returns empty string when no onboarding keys are present.
"""
core = context.get("core_memory") or {}
parts: list[str] = []
for key in _IDENTITY_KEYS:
val = (core.get(key) or "").strip()
if val:
parts.append(f"- {key}: {val}")
if not parts:
return ""
return "\n\nUser profile:\n" + "\n".join(parts)
def _request_context_block(context: dict[str, Any]) -> str:
"""Return a small block with per-request scope and resolved project context."""
parts: list[str] = []
scope = context.get("scope")
if scope and isinstance(scope, dict):
parts.append(f"scope: {json.dumps(scope, ensure_ascii=True)}")
resolved = context.get("resolved_project_id")
if resolved and isinstance(resolved, str):
parts.append(f"resolved_project_id: {resolved}")
return "\n".join(parts)
_HOME_SYSTEM_PROMPT = """\
You are adiuvAI's home executive assistant.{user_identity}
You are not a chatbot — you are a proactive partner who runs ahead of the user, anticipates what they need next, and closes every reply with a concrete next step or a clarifying question.
# How you work
- Use tools before answering anything factual. Never guess counts, dates, or status.
- Prefer parallel tool calls when the questions are independent (e.g. counts per status). Chain calls when one result feeds the next.
- After delivering the answer, propose the next useful action: a follow-up task to draft, a deadline at risk, a project to triage, a person to remind. Use what you know about the user (job role, industry, primary use case) to make the suggestion relevant.
- Match the user's tone preference. Default to warm-but-direct; stay concise.
- When the user asks to remember, forget, or update something, use memory tools.
# Filter discipline
- Never set the `assignee` filter on list_tasks/count_tasks unless the user explicitly names a person ("Marco's tasks") or refers to themselves ("my tasks", "assigned to me", "mine").
- The user's own name in the User profile block is for context only — it is NOT a default filter.
- When in doubt, omit `assignee` and return the global result.
# Output format
Return markdown. Reference entities with these tags exactly — one id per tag, each tag on its own line, no prefix/suffix text on the same line:
<project>id</project> <task>id</task> <note>id</note> <timeline>id</timeline>
When the answer contains a list of entities (any of the tags above), structure the reply as three blocks separated by blank lines:
1. One short intro line stating what is coming (count + scope, e.g. "Ecco i tuoi 18 task ad alta priorità:"). Match the user's language.
2. All entity tags, one per line, consecutive, no prose interleaved. Do NOT put titles, dates, priorities, or any descriptive text on the same line as a tag or between tags.
3. One short closing recap (12 sentences) that points out a pattern, risk, or insight noticed in the list, and ends with a concrete next step or clarifying question.
For single-entity answers skip blocks 1 and 3 if they would be redundant; just emit the tag.
For analytical answers (status overviews, breakdowns by category/priority/project, comparisons, trends, "resoconto", "panoramica") consider returning a chart block when it communicates the answer faster than prose. The decision is yours — skip charts for trivial single-number answers. Schema:
<chart>{{"chartType":"pie|bar|line|area|radar|radial","title":"...","data":[{{"name":"...","value":N}},...], "config":{{"value":{{"label":"...","color":"var(--chart-1)"}} }} }}</chart>
- pie for share-of-total breakdowns; bar for category comparisons; line/area for time series; radar for multi-dimension.
- data rows must include a "name" field; numeric series keys must match config keys.
- Use var(--chart-1) through var(--chart-5) for colors, cycling 1-5 in series order. Do NOT wrap in hsl() or oklch() — these are complete CSS values already.
For upcoming-timeline questions ("prossimi eventi"), include only future items in the current month unless the user asks otherwise.
# Date filtering
{date_context}
When filtering tasks/timelines/notes by date, take dueDateFrom / dueDateTo (ms epoch UTC) verbatim from the DATE CONTEXT boundary table above. Do NOT compute boundaries from now_ms yourself.
For specific dates not listed, compute local-midnight in the user timezone and convert to UTC ms.
For "today" / "tomorrow" queries, prefer list_tasks_due_today / list_timelines_today with user_timezone from DATE CONTEXT.
# Language
{language_instruction}
# Known people & projects
{relational_memory}
# Behavioral hints
{proactive_hints}
# Request context
{request_context}\
"""
_CONTEXTUAL_SYSTEM_PROMPT = """You are adiuvAI's contextual assistant. The user is working inside the app and has opened a side chat anchored to a specific view ("current view"). Help them act on that view: recap, plan, create entities, answer questions.
Rules:
1. Base context (current view summary) is provided every turn. Treat it as ground truth for ids and names; never invent them.
2. ALL reads go through `get_page_details`. The legacy tools `list_projects`, `get_project`, `list_tasks`, `get_task`, `list_notes`, `get_note` are NOT available in this channel — do not attempt to call them. To find an entity by name, call `get_page_details({entityType: 'projects_all' | 'tasks_all' | 'timeline_all'})` to list, then `get_page_details({entityType: '<type>', entityId})` for the full snapshot.
3. When the user requests an action that creates or updates an entity:
- If the current view is a project and no project is specified, use the current project automatically.
- If the current view is the global Tasks / Projects / Timeline list and no project is specified, ASK before attaching to any project. Don't silently create orphan entities.
4. The current view can change mid-conversation (user navigates). When you see a system message "User navigated to ...", treat the new view as the active context. Prior turns remain visible but the active scope shifts.
5. Notes: you can read note bodies via `get_page_details({entityType:'note'})`. You CANNOT edit, summarize-to-replace, or append. Tell the user "note editing is coming in a later release" if asked.
6. Be concise. Default to 1-3 short paragraphs. Bullet lists fine. Don't restate the user's request.
7. Never expose ids in prose. Use names. Ids only travel through tool calls.
# Date context
{date_context}
# Language
{language_instruction}
"""
_TASK_BRIEF_RESEARCH_SYSTEM_PROMPT = """\
You are an executive assistant preparing a briefing dossier for your principal before they act on a specific task.
Your job: gather all relevant context, synthesize it into a tight actionable dossier, and — if the task requires writing (email, message, document) — produce a ready-to-use draft.{user_identity}
# Research workflow
Follow these steps in order, using tools:
1. Read the task fully (title, description, due date, priority, status, project, comments).
2. Fetch the parent project (`get_project`) to understand scope, aiSummary, and any linked client.
3. If the project has a clientId: call `get_client(id)` to retrieve full client details.
4. Call `query_relations` (subject_label=client_name or task subject) to find cross-project connections — e.g. the same client appearing in multiple projects.
5. Search associative memory (`search_associative`) and archival memory (`archival_memory_search`) using the task title + client name as query phrases to surface relevant past interactions.
6. Read core memory blocks for tone preference, language, and user style: `memory_get("tone_preference")`, `memory_get("language")`.
7. Determine task kind: is this a writing task (email reply, message, follow-up, proposal)? If yes, draft a ready-to-send piece.
# Output structure
Write the briefing in the user's language. Use this exact structure:
**What needs to be done**
(12 sentences, concrete and specific — what action the user must take)
**Context you should know**
(bullet points covering: client background, related projects, prior interactions, tone/style notes, any relevant deadlines or dependencies)
**Suggested first step**
(one specific, immediately actionable instruction)
If this is a writing task, append a canvas block at the very end:
<canvas kind="email|document|message">
...ready-to-use draft here...
</canvas>
Do NOT include the canvas block for non-writing tasks.
Do NOT repeat verbatim task fields the user already sees in the UI.
Be concrete — no vague advice. Every bullet should be a fact that changes what the user does.
# Date context
{date_context}
# Language
{language_instruction}
# Known people & projects
{relational_memory}
# Request context
{request_context}\
"""
# Fallback system prompt for follow-up turns after a task briefing has been
# delivered. Its slots are {user_identity}, {briefing_context},
# {date_context}, {language_instruction}, {relational_memory} and
# {request_context}.
# NOTE(review): {briefing_context} is not among the slots that
# _build_system_prompt fills. Presumably the caller substitutes it
# separately; confirm.
_TASK_BRIEF_FOLLOWUP_SYSTEM_PROMPT = """\
You are an executive assistant continuing a conversation with your principal.
You have already prepared and delivered a research briefing for the active task. The user has read it.{user_identity}
Your briefing:
---
{briefing_context}
---
Continue from here. Do NOT repeat the briefing. Refer to it when relevant.
Help the user execute: edit drafts, refine wording, look up additional details, plan next steps.
Stay terse — your principal is a busy executive.
# Date context
{date_context}
# Language
{language_instruction}
# Known people & projects
{relational_memory}
# Request context
{request_context}\
"""
def _as_text(content: Any) -> str:
if content is None:
return ""
if isinstance(content, str):
return content
if isinstance(content, list):
parts: list[str] = []
for item in content:
if isinstance(item, str):
parts.append(item)
elif isinstance(item, dict):
text = item.get("text")
if isinstance(text, str):
parts.append(text)
return "".join(parts)
return str(content)
def _candidate_tokens(message: str) -> list[str]:
tokens = re.findall(r"[a-zA-Z0-9_-]+", message.lower())
return [token for token in tokens if len(token) >= 3]
async def _resolve_project_id_from_message(message: str) -> str | None:
    """Resolve the project a message most likely refers to.

    Fetches the project rows from the client, scores each row by how many
    message tokens occur as substrings of its lower-cased name, and accepts
    the result only when exactly one row has the highest non-zero score.

    Returns:
        The winning row's ``id`` when it is a string; None when the client
        call fails or returns no usable rows, nothing matches, the best
        score is tied, or the id is not a string.
    """
    try:
        result = await execute_on_client(action="select", table="projects")
    except Exception as exc:
        logger.warning("deep_agent: project resolve select failed: %s", exc)
        return None

    # The client may hand back None or a non-dict payload; treat it as "no rows".
    rows = result.get("rows", []) if isinstance(result, dict) else None
    if not isinstance(rows, list) or not rows:
        return None

    tokens = _candidate_tokens(message)
    scored: list[tuple[int, dict[str, Any]]] = []
    for row in rows:
        if not isinstance(row, dict):
            continue
        # `or ""` keeps a null name from being matched as the literal "none".
        name = str(row.get("name") or "").lower()
        score = sum(1 for token in tokens if token in name)
        if score > 0:
            scored.append((score, row))
    if not scored:
        return None

    top_score = max(score for score, _ in scored)
    top_rows = [row for score, row in scored if score == top_score]
    if len(top_rows) != 1:
        # Ambiguous match: better to resolve nothing than to guess.
        return None
    project_id = top_rows[0].get("id")
    return project_id if isinstance(project_id, str) else None
def _needs_project_resolution(message: str) -> bool:
lowered = message.lower()
return any(keyword in lowered for keyword in ["project", "progetto", "progetti", "whitelist"])
async def _prepare_context(message: str, context: dict[str, Any]) -> dict[str, Any]:
    """Return a shallow copy of ``context``, enriched with a resolved project id.

    Resolution is attempted only when the message contains a project keyword.
    On success the id is stored under ``"resolved_project_id"`` and logged.
    The caller's dict is never mutated.
    """
    enriched = {**context}
    if not _needs_project_resolution(message):
        return enriched

    project_id = await _resolve_project_id_from_message(message)
    if project_id:
        enriched["resolved_project_id"] = project_id
        logger.info("deep_agent: resolved_project_id=%s", project_id)
    return enriched
def _all_tools() -> list[Any]:
    """Return a new list with the task, project, note and timeline tools, in that order."""
    combined: list[Any] = []
    for group in (TASK_TOOLS, PROJECT_TOOLS, NOTE_TOOLS, TIMELINE_TOOLS):
        combined.extend(group)
    return combined
# ── Contextual sidebar tools ──────────────────────────────────────────
@tool
async def get_page_details(
    entity_type: str = "",
    entity_id: str = "",
) -> str:
    """Fetch full details for the entity currently in view.
    entity_type: one of 'project' | 'task' | 'note' | 'timeline_event' |
    'tasks_all' | 'projects_all' | 'timeline_all'.
    entity_id: UUID of the entity for singular entity views. Omit for list views.
    The Electron drizzle-executor fulfils this op against local SQLite and
    returns the row(s) as a JSON tool result.
    """
    # The docstring above is left unchanged: @tool exposes it to the model as
    # the tool description.
    result = await execute_on_client(
        action="get_page_details",
        # The entity type travels in the `table` field; "unknown" marks a missing type.
        table=entity_type or "unknown",
        # Empty id (list views) is sent as null rather than "".
        data={"entityId": entity_id or None},
    )
    if not result:
        return "No details found."
    if isinstance(result, str):
        return result
    # Emit real JSON, as the description promises, instead of a Python repr
    # (single quotes, None/True). default=str covers non-JSON-native values.
    return json.dumps(result, ensure_ascii=False, default=str)
def _contextual_tools(user_id: str, trace_id: str | None) -> list[Any]:
    """Assemble the tool palette for the contextual sidebar agent.

    Reads are exposed only through ``get_page_details``. Per the original
    author's note, the legacy list_*/get_* tools return shallow snapshots and
    made the agent under-answer (smoke trace
    0b46841484ba7d024ed9f8d5ac8b1df0). Entity writes are limited to
    create_task, update_task, create_note and create_timeline; note edits are
    deferred. The full memory tool set from ``_memory_tools`` is appended.
    """
    from app.agents.note_agent import create_note  # noqa: PLC0415
    from app.agents.task_agent import create_task, update_task  # noqa: PLC0415
    from app.agents.timeline_agent import create_timeline  # noqa: PLC0415

    palette: list[Any] = [
        get_page_details,
        create_task,
        update_task,
        create_note,
        create_timeline,
    ]
    palette.extend(_memory_tools(user_id, trace_id))
    return palette
def _trace_id_from_context(context: dict[str, Any]) -> str | None:
debug = context.get("_debug")
if isinstance(debug, dict):
request_id = debug.get("request_id")
if isinstance(request_id, str) and request_id:
return request_id
return None
def _session_id_from_context(context: dict[str, Any]) -> str | None:
debug = context.get("_debug")
if isinstance(debug, dict):
session_id = debug.get("session_id")
if isinstance(session_id, str) and session_id:
return session_id
return None
def _build_system_prompt(name: str, fallback: str, context: dict[str, Any]) -> tuple[str, Any]:
    """Resolve a prompt template and fill in every per-request slot.

    The template comes from ``get_prompt_or_fallback(name, fallback)``. Each
    injection helper's output is stripped of surrounding whitespace before
    being handed to ``compile_prompt``; the request-context block is passed
    as produced.

    Returns:
        ``(compiled_text, prompt_obj)``, where ``prompt_obj`` is whatever
        ``get_prompt_or_fallback`` returned alongside the template.
    """
    template, prompt_obj = get_prompt_or_fallback(name, fallback)
    slots = {
        "date_context": _datetime_context_injection(context).strip(),
        "language_instruction": _language_instruction(context).strip(),
        "user_identity": _user_identity_injection(context).strip(),
        "relational_memory": _relational_memory_injection(context).strip(),
        "proactive_hints": _proactive_hints_injection(context).strip(),
        "request_context": _request_context_block(context),
    }
    return compile_prompt(template, prompt_obj, **slots), prompt_obj
# Matches a <task>…</task> or <timeline>…</timeline> tag whose content is
# wrapped in square brackets; the \1 backreference forces the closing tag to
# match the opening one.
# NOTE(review): the home prompt in this file shows tags as <task>id</task>
# (no brackets) and also uses <project>/<note>. Confirm the bracketed,
# two-tag form is the one the model actually emits.
_TAG_LINE_RE = re.compile(r"<(task|timeline)>\[[^\]]+\]</\1>")
# First dd/mm/yyyy date in a string, exposed as named groups d, m and y.
_TIMELINE_DMY_RE = re.compile(r"(?P<d>\d{2})/(?P<m>\d{2})/(?P<y>\d{4})")
def _is_upcoming_timeline_query(message: str) -> bool:
lowered = message.lower()
has_upcoming = "prossim" in lowered or "upcoming" in lowered or "next" in lowered
has_timeline_topic = any(
token in lowered
for token in ("event", "evento", "eventi", "timeline", "milestone", "scaden")
)
return has_upcoming and has_timeline_topic
def _timeline_date_in_current_month_or_future(dmy: str) -> bool:
    """Decide whether a line's dd/mm/yyyy date passes the "upcoming" filter.

    The first date found by ``_TIMELINE_DMY_RE`` is kept only when it is
    today or later AND falls in the current year and month, so despite the
    function name a date in a later month is rejected. Lines with no date,
    or with an impossible date such as 31/02, are kept (returns True).
    "Today" is ``date.today()``, i.e. the server's local date.
    """
    found = _TIMELINE_DMY_RE.search(dmy)
    if found is None:
        return True
    day, month, year = (int(found.group(key)) for key in ("d", "m", "y"))
    try:
        candidate = date(year, month, day)
    except ValueError:
        return True
    now = date.today()
    same_month = (candidate.year, candidate.month) == (now.year, now.month)
    return same_month and candidate >= now
def _normalize_tagged_list_lines(text: str, message: str) -> str:
    """Rewrite model output so every matched entity tag sits alone on its line.

    Lines without a ``_TAG_LINE_RE`` match pass through untouched. A line
    with one or more matches is replaced by the matched tags, one per output
    line, and any other text on that line is dropped. When ``message`` is an
    upcoming-timeline query, <timeline> tags on a line whose date fails
    ``_timeline_date_in_current_month_or_future`` are removed.

    Returns:
        The rewritten text joined with "\\n", or ``text`` unchanged when it
        is empty.
    """
    if not text:
        return text

    filter_timelines = _is_upcoming_timeline_query(message)
    kept: list[str] = []
    for raw_line in text.splitlines():
        tags = [found.group(0) for found in _TAG_LINE_RE.finditer(raw_line)]
        if not tags:
            kept.append(raw_line)
            continue
        for tag in tags:
            # The date check looks at the whole source line, not just the tag.
            drop = (
                filter_timelines
                and "<timeline>" in tag
                and not _timeline_date_in_current_month_or_future(raw_line)
            )
            if not drop:
                kept.append(tag)
    return "\n".join(kept)
def _normalize_memory_label(path_or_label: str) -> str:
value = path_or_label.strip()
if value.startswith("/memories/"):
value = value[len("/memories/"):]
value = value.strip("/")
return value
def _memory_tools(user_id: str, trace_id: str | None) -> list[Any]:
    """Build the memory tool set bound to one user and trace.

    Every nested function is wrapped with ``@tool`` and closes over
    ``user_id`` and ``trace_id``, so the model never supplies them. Each call
    logs its invocation, opens its own ``async_session()`` and delegates to
    ``MemoryMiddleware``. The tools return human-readable status strings
    rather than raising for "not found" cases.

    The nested docstrings are left as-is: ``@tool`` exposes them to the model
    as tool descriptions.

    Returns:
        The tools in a fixed order: core block list/get/create/append/
        replace/delete, archival insert/search, conversation search,
        associative search.
    """

    @tool
    async def memory_list_blocks() -> str:
        """List all core memory blocks currently stored for the user."""
        logger.info("deep_agent: memory_list_blocks trace=%s user=%s", trace_id or "-", user_id)
        async with async_session() as db:
            memory = MemoryMiddleware(db)
            blocks = await memory.list_core_blocks(user_id)
        if not blocks:
            return "No memory blocks found."
        # presumably each block is a mapping with 'label' and 'value' keys — verify against MemoryMiddleware
        lines = [f"- {b['label']}: {b['value']}" for b in blocks]
        return "Memory blocks:\n" + "\n".join(lines)

    @tool
    async def memory_get(path_or_label: str) -> str:
        """Get one memory block by label or /memories/<label> path."""
        label = _normalize_memory_label(path_or_label)
        logger.info("deep_agent: memory_get trace=%s user=%s label=%s", trace_id or "-", user_id, label)
        # An input that normalizes to "" (e.g. just "/memories/") is rejected up front.
        if not label:
            return "Invalid memory label."
        async with async_session() as db:
            memory = MemoryMiddleware(db)
            value = await memory.get_core_block(user_id, label)
        if value is None:
            return f"Memory block '{label}' not found."
        return f"Memory block '{label}':\n{value}"

    @tool
    async def memory_create(path_or_label: str, value: str) -> str:
        """Create or overwrite a memory block value by label or /memories/<label> path."""
        label = _normalize_memory_label(path_or_label)
        logger.info("deep_agent: memory_create trace=%s user=%s label=%s", trace_id or "-", user_id, label)
        if not label:
            return "Invalid memory label."
        async with async_session() as db:
            memory = MemoryMiddleware(db)
            # Only this write forwards trace_id to the middleware.
            await memory.update_core(user_id, label, value, trace_id=trace_id)
        return f"Memory block '{label}' saved."

    @tool
    async def memory_append(path_or_label: str, content: str) -> str:
        """Append content to a memory block, creating it if missing."""
        label = _normalize_memory_label(path_or_label)
        logger.info("deep_agent: memory_append trace=%s user=%s label=%s", trace_id or "-", user_id, label)
        if not label:
            return "Invalid memory label."
        async with async_session() as db:
            memory = MemoryMiddleware(db)
            await memory.append_core(user_id, label, content)
        return f"Memory block '{label}' appended."

    @tool
    async def memory_replace(path_or_label: str, old_string: str, new_string: str) -> str:
        """Replace one exact string in a memory block."""
        label = _normalize_memory_label(path_or_label)
        logger.info("deep_agent: memory_replace trace=%s user=%s label=%s", trace_id or "-", user_id, label)
        if not label:
            return "Invalid memory label."
        async with async_session() as db:
            memory = MemoryMiddleware(db)
            changed = await memory.replace_core(user_id, label, old_string, new_string)
        # A falsy result is reported to the model as "old string not found".
        if not changed:
            return f"No replacement made in '{label}' (old string not found)."
        return f"Memory block '{label}' updated."

    @tool
    async def memory_delete(path_or_label: str) -> str:
        """Delete a memory block by label or /memories/<label> path."""
        label = _normalize_memory_label(path_or_label)
        logger.info("deep_agent: memory_delete trace=%s user=%s label=%s", trace_id or "-", user_id, label)
        if not label:
            return "Invalid memory label."
        async with async_session() as db:
            memory = MemoryMiddleware(db)
            deleted = await memory.delete_core(user_id, label)
        if not deleted:
            return f"Memory block '{label}' not found."
        return f"Memory block '{label}' deleted."

    @tool
    async def archival_memory_insert(content: str) -> str:
        """Insert a long-term archival memory entry."""
        logger.info("deep_agent: archival_memory_insert trace=%s user=%s", trace_id or "-", user_id)
        async with async_session() as db:
            memory = MemoryMiddleware(db)
            await memory.insert_archival(user_id, content, source="assistant")
        return "Archival memory saved."

    @tool
    async def archival_memory_search(query: str, top_k: int = 5) -> str:
        """Search long-term archival memory by semantic fallback (keyword currently)."""
        # Only the first 80 characters of the query are logged.
        logger.info("deep_agent: archival_memory_search trace=%s user=%s query=%s", trace_id or "-", user_id, query[:80])
        async with async_session() as db:
            memory = MemoryMiddleware(db)
            results = await memory.search_archival(user_id, query, top_k=top_k)
        if not results:
            return "No archival memory results found."
        lines = [f"- {item}" for item in results]
        return "Archival memory results:\n" + "\n".join(lines)

    @tool
    async def conversation_search(query: str, top_k: int = 5) -> str:
        """Search recall memory from prior episodic conversation summaries."""
        logger.info("deep_agent: conversation_search trace=%s user=%s query=%s", trace_id or "-", user_id, query[:80])
        async with async_session() as db:
            memory = MemoryMiddleware(db)
            results = await memory.search_recall(user_id, query, top_k=top_k)
        if not results:
            return "No recall memory results found."
        lines = [f"- {item}" for item in results]
        return "Recall memory results:\n" + "\n".join(lines)

    @tool
    async def search_associative(query: str, limit: int = 5) -> str:
        """Semantic search across associative (archival) memory for a given query.
        Use this to surface long-term memories related to a topic, client, or task
        that may not appear in recent episodes.
        query: natural-language search phrase.
        limit: max results (default 5).
        """
        logger.info("deep_agent: search_associative trace=%s user=%s query=%s", trace_id or "-", user_id, query[:80])
        async with async_session() as db:
            memory = MemoryMiddleware(db)
            # Same middleware call as archival_memory_search; only the
            # parameter name (limit vs top_k) and the result heading differ.
            results = await memory.search_archival(user_id, query, top_k=limit)
        if not results:
            return "No associative memory results found."
        lines = [f"- {item}" for item in results]
        return "Associative memory results:\n" + "\n".join(lines)

    return [
        memory_list_blocks,
        memory_get,
        memory_create,
        memory_append,
        memory_replace,
        memory_delete,
        archival_memory_insert,
        archival_memory_search,
        conversation_search,
        search_associative,
    ]
def _read_only_memory_tools(user_id: str, trace_id: str | None) -> list[Any]:
    """Build the per-user memory tools and keep only the non-mutating ones.

    The full set comes from ``_memory_tools``; a tool is kept when its ``name``
    is one of the list/get/search tools named below. Used for the read-only
    brief-agent tool subset.
    """
    allowed_names = frozenset(
        (
            "memory_list_blocks",
            "memory_get",
            "archival_memory_search",
            "conversation_search",
            "search_associative",
        )
    )
    return [
        candidate
        for candidate in _memory_tools(user_id, trace_id)
        if candidate.name in allowed_names
    ]
def _brief_research_tools(user_id: str, trace_id: str | None) -> list[Any]:
    """Assemble the tool palette for Stage-1 task brief research (read-only).

    Order: task, project, note, timeline and client tool groups, then the
    read-only memory tools for this user, then the relations query tool.
    """
    palette: list[Any] = []
    for group in (TASK_TOOLS, PROJECT_TOOLS, NOTE_TOOLS, TIMELINE_TOOLS, CLIENT_TOOLS):
        palette.extend(group)
    palette.extend(_read_only_memory_tools(user_id, trace_id))
    palette.append(make_query_relations_tool(user_id, trace_id))
    return palette
def _all_tools_for_user(user_id: str, trace_id: str | None) -> list[Any]:
    """Return the shared tools followed by this user's memory tools."""
    combined: list[Any] = list(_all_tools())
    combined.extend(_memory_tools(user_id, trace_id))
    return combined
def _history_to_messages(history: list[dict[str, str]] | None) -> list[Any]:
    """Convert raw ``{"role", "content"}`` turns into LangChain messages.

    Only the last ``MAX_HISTORY_TURNS`` entries are considered. Entries with
    empty content are dropped, as are entries whose role is neither ``"user"``
    nor ``"assistant"``. ``None`` or an empty history yields an empty list.
    """
    if not history:
        return []

    converted: list[Any] = []
    for entry in history[-MAX_HISTORY_TURNS:]:
        text = entry.get("content", "")
        if not text:
            continue
        speaker = entry.get("role", "")
        if speaker == "assistant":
            converted.append(AIMessage(content=text))
        elif speaker == "user":
            converted.append(HumanMessage(content=text))
    return converted
async def _run_single_agent(
    *,
    user_id: str,
    system_prompt: str,
    message: str,
    context: dict[str, Any],
    max_steps: int = 6,
    langfuse_prompt: Any = None,
    agent_name: str = "agent",
    conversation_history: list[dict[str, str]] | None = None,
) -> str:
    """Run one tool-calling agent turn to completion and return the final text.

    The tool-bound model is invoked for up to ``max_steps`` rounds. A response
    without tool calls ends the turn. Otherwise every requested tool is executed
    and its output is appended as a ``ToolMessage`` before the next round. When
    the step budget is exhausted, the LLM is invoked once more *without* tools
    bound to obtain a plain-text answer.

    History is taken from ``session_buffer`` when the context carries a session
    id and the buffer has an entry; otherwise ``conversation_history`` is
    converted. On success the transcript (system message stripped) is written
    back to the buffer.

    Args:
        user_id: Owner of the turn; used for tools, buffer and tracing.
        system_prompt: Fully rendered system prompt.
        message: The user's message for this turn.
        context: Prepared context; trace and session ids are read from it.
        max_steps: Maximum number of tool-bound LLM rounds.
        langfuse_prompt: Prompt handle attached to generation observations.
        agent_name: Name used for LLM selection and observation names.
        conversation_history: Fallback history when no buffered session exists.

    Returns:
        The final assistant text.

    Raises:
        Any exception from the LLM or a tool propagates unchanged. Cleanup
        (observation exit, collector reset, flush) still runs.
    """
    # Function-scope import so this block does not rely on the module import list.
    from contextlib import ExitStack, nullcontext

    trace_id = _trace_id_from_context(context)
    session_id = _session_id_from_context(context)
    lf = get_langfuse()
    llm = get_agent_llm(agent_name)
    tools = _all_tools_for_user(user_id, trace_id)
    logger.info("deep_agent: run_single_agent_start trace=%s user=%s", trace_id or "-", user_id)
    llm_with_tools = llm.bind_tools(tools)
    # The palette is fixed for the whole turn, so build the lookup once.
    tool_map = {tool_def.name: tool_def for tool_def in tools}

    _buffered = session_buffer.get(user_id, session_id) if session_id else None
    history_messages = _buffered if _buffered is not None else _history_to_messages(conversation_history)
    messages: list[Any] = [
        SystemMessage(content=system_prompt),
        *history_messages,
        HumanMessage(content=message),
    ]
    tool_calls_count = 0
    collected: list[dict[str, Any]] = []
    _messages_to_save: list[Any] | None = None

    set_tool_result_collector(collected)
    try:
        # ExitStack guarantees that every observation entered here is exited,
        # with the real exception info, even if opening a later one fails.
        with ExitStack() as obs_stack:
            obs_stack.enter_context(langfuse_context(user_id=user_id, session_id=session_id))
            _span = None
            if lf:
                _span = obs_stack.enter_context(
                    lf.start_as_current_observation(
                        as_type="agent",
                        name=agent_name,
                        # NOTE(review): "session_id" carries the trace id here — confirm intended.
                        metadata={"user_id": user_id, "session_id": trace_id},
                        input=message,
                    )
                )

            for _ in range(max_steps):
                gen_ctx = (
                    lf.start_as_current_observation(
                        as_type="generation",
                        name=f"{agent_name}-llm",
                        model=model_for_agent(agent_name),
                        prompt=langfuse_prompt,
                        input=messages,
                    )
                    if lf else nullcontext()
                )
                # `with` closes the generation observation even when ainvoke raises.
                with gen_ctx as _gen:
                    response: AIMessage = await llm_with_tools.ainvoke(messages)
                    if _gen is not None:
                        _gen.update(output=_as_text(response.content), usage_details=extract_usage(response))
                messages.append(response)

                if not response.tool_calls:
                    final_text = _as_text(response.content)
                    logger.info(
                        "deep_agent: run_single_agent_end trace=%s user=%s tool_calls=%d response_chars=%d",
                        trace_id or "-",
                        user_id,
                        tool_calls_count,
                        len(final_text),
                    )
                    if _span:
                        _span.update(output=final_text)
                    _messages_to_save = messages[1:]  # strip SystemMessage; save full tool history
                    return final_text

                for call in response.tool_calls:
                    tool_calls_count += 1
                    call_id = str(call.get("id", ""))
                    call_name = str(call.get("name", ""))
                    call_args = call.get("args", {})
                    logger.info(
                        "deep_agent: AI->Tool tool_call_id=%s tool=%s args=%s",
                        call_id,
                        call_name,
                        json.dumps(call_args, ensure_ascii=True)[:800],
                    )
                    tool_fn = tool_map.get(call_name)
                    if tool_fn is None:
                        # Report the miss to the model instead of failing the turn.
                        tool_output = f"Unknown tool: {call_name}"
                    elif lf:
                        with lf.start_as_current_observation(
                            as_type="tool",
                            name=call_name,
                            input=call_args,
                        ) as tool_obs:
                            tool_output = await tool_fn.ainvoke(call_args)
                            tool_obs.update(output=str(tool_output)[:8000])
                    else:
                        tool_output = await tool_fn.ainvoke(call_args)
                    logger.info(
                        "deep_agent: Tool->AI tool_call_id=%s tool=%s output=%s",
                        call_id,
                        call_name,
                        str(tool_output)[:1200],
                    )
                    messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))

            # Step budget exhausted: ask the bare LLM (no tools bound) for a final answer.
            final = await llm.ainvoke(messages)
            final_text = _as_text(final.content)
            messages.append(AIMessage(content=final_text))
            logger.info(
                "deep_agent: run_single_agent_end trace=%s user=%s tool_calls=%d response_chars=%d fallback=1",
                trace_id or "-",
                user_id,
                tool_calls_count,
                len(final_text),
            )
            if _span:
                _span.update(output=final_text)
            _messages_to_save = messages[1:]
            return final_text
    finally:
        # Only a successfully completed turn sets _messages_to_save.
        if session_id and _messages_to_save is not None:
            session_buffer.set(user_id, session_id, _messages_to_save)
        clear_tool_result_collector()
        if lf:
            lf.flush()
async def _run_single_agent_stream(
    *,
    user_id: str,
    system_prompt: str,
    message: str,
    context: dict[str, Any],
    max_steps: int = 6,
    langfuse_prompt: Any = None,
    agent_name: str = "agent",
    tools: list[Any] | None = None,
    conversation_history: list[dict[str, str]] | None = None,
) -> AsyncGenerator[tuple[str, Any], None]:
    """Run one tool-calling agent turn and yield ``("token", text)`` events.

    The tool-bound model is invoked for up to ``max_steps`` rounds. When a
    response carries no tool calls, its text is yielded as a single token event
    and the turn ends. Otherwise every requested tool is executed and its output
    appended as a ``ToolMessage``. Only when the step budget is exhausted is the
    bare LLM (no tools bound) streamed chunk by chunk.

    History is taken from ``session_buffer`` when the context carries a session
    id and the buffer has an entry; otherwise ``conversation_history`` is
    converted. On success the transcript (system message stripped) is written
    back to the buffer.

    Args:
        user_id: Owner of the turn; used for tools, buffer and tracing.
        system_prompt: Fully rendered system prompt.
        message: The user's message for this turn.
        context: Prepared context; trace and session ids are read from it.
        max_steps: Maximum number of tool-bound LLM rounds.
        langfuse_prompt: Prompt handle attached to generation observations.
        agent_name: Name used for LLM selection and observation names.
        tools: Tool palette; defaults to ``_all_tools_for_user`` when ``None``.
        conversation_history: Fallback history when no buffered session exists.

    Yields:
        ``("token", text)`` tuples.

    Raises:
        Any exception from the LLM or a tool propagates unchanged. Cleanup
        (observation exit, collector reset, flush) still runs.
    """
    # Function-scope import so this block does not rely on the module import list.
    from contextlib import ExitStack, nullcontext

    trace_id = _trace_id_from_context(context)
    session_id = _session_id_from_context(context)
    lf = get_langfuse()
    llm = get_agent_llm(agent_name)
    if tools is None:
        tools = _all_tools_for_user(user_id, trace_id)
    logger.info("deep_agent: run_single_agent_stream_start trace=%s user=%s", trace_id or "-", user_id)
    llm_with_tools = llm.bind_tools(tools)
    # The palette is fixed for the whole turn, so build the lookup once.
    tool_map = {tool_def.name: tool_def for tool_def in tools}

    _buffered = session_buffer.get(user_id, session_id) if session_id else None
    history_messages = _buffered if _buffered is not None else _history_to_messages(conversation_history)
    messages: list[Any] = [
        SystemMessage(content=system_prompt),
        *history_messages,
        HumanMessage(content=message),
    ]
    tool_calls_count = 0
    streamed_chars = 0
    collected: list[dict[str, Any]] = []
    streamed_text: list[str] = []
    _messages_to_save: list[Any] | None = None

    set_tool_result_collector(collected)
    try:
        # ExitStack guarantees that every observation entered here is exited,
        # with the real exception info, even if opening a later one fails.
        with ExitStack() as obs_stack:
            obs_stack.enter_context(langfuse_context(user_id=user_id, session_id=session_id))
            _span = None
            if lf:
                _span = obs_stack.enter_context(
                    lf.start_as_current_observation(
                        as_type="agent",
                        name=f"{agent_name}-stream",
                        # NOTE(review): "session_id" carries the trace id here — confirm intended.
                        metadata={"user_id": user_id, "session_id": trace_id},
                        input=message,
                    )
                )

            for _ in range(max_steps):
                gen_ctx = (
                    lf.start_as_current_observation(
                        as_type="generation",
                        name=f"{agent_name}-llm",
                        model=model_for_agent(agent_name),
                        prompt=langfuse_prompt,
                        input=messages,
                    )
                    if lf else nullcontext()
                )
                # `with` closes the generation observation even when ainvoke raises.
                with gen_ctx as _gen:
                    response: AIMessage = await llm_with_tools.ainvoke(messages)
                    if _gen is not None:
                        _gen.update(output=_as_text(response.content), usage_details=extract_usage(response))

                if not response.tool_calls:
                    # Emit the text already produced by ainvoke instead of making
                    # a second LLM call; the response is appended to `messages`
                    # only afterwards, for the saved transcript.
                    final_text = _as_text(response.content)
                    if final_text:
                        streamed_chars += len(final_text)
                        streamed_text.append(final_text)
                        yield "token", final_text
                    logger.info(
                        "deep_agent: run_single_agent_stream_end trace=%s user=%s tool_calls=%d response_chars=%d",
                        trace_id or "-",
                        user_id,
                        tool_calls_count,
                        streamed_chars,
                    )
                    if _span:
                        _span.update(output="".join(streamed_text))
                    messages.append(response)
                    _messages_to_save = messages[1:]  # strip SystemMessage
                    return

                messages.append(response)
                for call in response.tool_calls:
                    tool_calls_count += 1
                    call_id = str(call.get("id", ""))
                    call_name = str(call.get("name", ""))
                    call_args = call.get("args", {})
                    logger.info(
                        "deep_agent: AI->Tool tool_call_id=%s tool=%s args=%s",
                        call_id,
                        call_name,
                        json.dumps(call_args, ensure_ascii=True)[:800],
                    )
                    tool_fn = tool_map.get(call_name)
                    if tool_fn is None:
                        # Report the miss to the model instead of failing the turn.
                        tool_output = f"Unknown tool: {call_name}"
                    elif lf:
                        with lf.start_as_current_observation(
                            as_type="tool",
                            name=call_name,
                            input=call_args,
                        ) as tool_obs:
                            tool_output = await tool_fn.ainvoke(call_args)
                            tool_obs.update(output=str(tool_output)[:8000])
                    else:
                        tool_output = await tool_fn.ainvoke(call_args)
                    logger.info(
                        "deep_agent: Tool->AI tool_call_id=%s tool=%s output=%s",
                        call_id,
                        call_name,
                        str(tool_output)[:1200],
                    )
                    messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))

            # Step budget exhausted: stream a final answer from the bare LLM.
            fallback_chunks: list[str] = []
            async for chunk in llm.astream(messages):
                token = _as_text(getattr(chunk, "content", ""))
                if token:
                    streamed_chars += len(token)
                    streamed_text.append(token)
                    fallback_chunks.append(token)
                    yield "token", token
            messages.append(AIMessage(content="".join(fallback_chunks)))
            _messages_to_save = messages[1:]
            logger.info(
                "deep_agent: run_single_agent_stream_end trace=%s user=%s tool_calls=%d response_chars=%d fallback=1",
                trace_id or "-",
                user_id,
                tool_calls_count,
                streamed_chars,
            )
            if _span:
                _span.update(output="".join(streamed_text))
    finally:
        # Only a successfully completed turn sets _messages_to_save.
        if session_id and _messages_to_save is not None:
            session_buffer.set(user_id, session_id, _messages_to_save)
        clear_tool_result_collector()
        if lf:
            lf.flush()
async def run_home(user_id: str, message: str, context: dict[str, Any]) -> str:
    """Answer one home-chat message (non-streaming) and return the reply text.

    Prepares the context, builds the ``home_system`` prompt, runs the single
    agent as ``home-agent`` and passes the reply through
    ``_normalize_tagged_list_lines`` together with the original message.
    """
    prepared = await _prepare_context(message, context)
    prompt_text, prompt_handle = _build_system_prompt("home_system", _HOME_SYSTEM_PROMPT, prepared)
    # History is read from the caller's context, not from the prepared copy.
    prior_turns = context.get("conversation_history")
    raw_reply = await _run_single_agent(
        user_id=user_id,
        system_prompt=prompt_text,
        message=message,
        context=prepared,
        langfuse_prompt=prompt_handle,
        agent_name="home-agent",
        conversation_history=prior_turns,
    )
    return _normalize_tagged_list_lines(raw_reply, message)
async def run_home_stream(
    user_id: str,
    message: str,
    context: dict[str, Any],
    project_id: str | None = None,
) -> AsyncGenerator[tuple[str, Any], None]:
    """Streaming home-chat runner with folder tools and a folder manifest.

    A folder manifest is appended to the ``home_system`` prompt: the manifest
    of ``project_id`` when given and non-empty, otherwise the multi-project
    manifest. Non-token events from the agent are forwarded as they arrive.
    Token events are buffered, normalised as a whole with
    ``_normalize_tagged_list_lines`` and emitted as one final ``"token"``
    event (nothing is emitted if the normalised text is empty).
    """
    from app.agents.folder_agent import FOLDER_TOOLS

    prepared = await _prepare_context(message, context)
    base_prompt, prompt_handle = _build_system_prompt("home_system", _HOME_SYSTEM_PROMPT, prepared)

    folder_manifest = ""
    if project_id:
        folder_manifest = format_folder_manifest(await _fetch_project_manifest(project_id))
    if not folder_manifest:
        # No usable project manifest: fall back to all linked folders so the
        # agent can still answer questions about a project from its files.
        folder_manifest = await build_brief_multi_project_manifest()
    addendum = "\n\n" + folder_manifest if folder_manifest else ""
    full_prompt = base_prompt + addendum

    trace_id = _trace_id_from_context(prepared)
    palette = [*_all_tools_for_user(user_id, trace_id), *FOLDER_TOOLS]

    buffered_tokens: list[str] = []
    agent_events = _run_single_agent_stream(
        user_id=user_id,
        system_prompt=full_prompt,
        message=message,
        context=prepared,
        langfuse_prompt=prompt_handle,
        agent_name="home-agent",
        tools=palette,
        conversation_history=context.get("conversation_history"),
    )
    async for event in agent_events:
        kind, payload = event
        if kind == "token":
            buffered_tokens.append(str(payload or ""))
        else:
            yield event

    cleaned = _normalize_tagged_list_lines("".join(buffered_tokens), message)
    if cleaned:
        yield "token", cleaned
async def run_contextual_stream(
    user_id: str,
    message: str,
    context: dict[str, Any],
    scope: "ContextualScope",  # type: ignore[name-defined]
) -> AsyncGenerator[tuple[str, Any], None]:
    """Stream the contextual agent's events for a single user turn.

    The rendered scope block is appended to the ``contextual_system`` prompt
    under a "Current view" heading, and the agent runs with the tool set from
    ``_contextual_tools``. Note-edit tools (propose_note_edit) are
    intentionally excluded.

    Context contract: callers MUST provide ``context["_debug"]["session_id"]``
    as a non-empty str so ``_session_id_from_context`` can extract it for
    tracing and episode storage downstream. The WS handler in device_ws.py
    satisfies this by always populating ``_debug`` before calling.
    """
    from app.schemas.contextual import ContextualScope, render_scope_block  # noqa: PLC0415

    prepared = await _prepare_context(message, context)
    trace_id = _trace_id_from_context(prepared)
    base_prompt, prompt_handle = _build_system_prompt(
        "contextual_system",
        _CONTEXTUAL_SYSTEM_PROMPT,
        prepared,
    )
    scope_text = render_scope_block(scope)
    full_prompt = base_prompt + f"\n\n## Current view\n{scope_text}"
    palette = _contextual_tools(user_id, trace_id)

    agent_events = _run_single_agent_stream(
        user_id=user_id,
        system_prompt=full_prompt,
        message=message,
        context=prepared,
        langfuse_prompt=prompt_handle,
        agent_name="contextual-agent",
        tools=palette,
        conversation_history=context.get("conversation_history"),
    )
    # Pure pass-through: every event is forwarded unchanged.
    async for event in agent_events:
        yield event
async def run_task_brief_research_stream(
    user_id: str,
    task_id: str,
    context: dict[str, Any],
    project_id: str | None = None,
) -> AsyncGenerator[tuple[str, Any], None]:
    """Stage-1 executive assistant: stream a research dossier for one task.

    Runs ``task-brief-agent`` with the brief-research tools plus the folder
    tools, a 12-step budget and no conversation history. When ``project_id``
    is given and its folder manifest is non-empty, the manifest is appended to
    the system prompt.

    Yields ``("token", chunk)`` events like the other stream runners. The
    concatenated text may contain a ``<canvas kind="...">...</canvas>`` block,
    which the WS handler strips and emits as a ``canvas_draft`` mutation.
    """
    from app.agents.folder_agent import FOLDER_TOOLS

    prepared = await _prepare_context(f"task:{task_id}", context)
    palette = [*_brief_research_tools(user_id, _trace_id_from_context(prepared)), *FOLDER_TOOLS]

    # The task id goes first so the agent knows what to look up.
    dossier_request = (
        f"Prepare a briefing dossier for task ID: {task_id}\n"
        "Follow the research workflow: read the task, then project, then client, "
        "then cross-project relations, then relevant memory. "
        "End with a concrete suggested first step. "
        "If this is a writing task, include a <canvas kind=\"...\"> draft."
    )
    base_prompt, prompt_handle = _build_system_prompt(
        "task_brief_research_system",
        _TASK_BRIEF_RESEARCH_SYSTEM_PROMPT,
        prepared,
    )

    folder_manifest = ""
    if project_id:
        folder_manifest = format_folder_manifest(await _fetch_project_manifest(project_id))
    addendum = "\n\n" + folder_manifest if folder_manifest else ""
    full_prompt = base_prompt + addendum

    agent_events = _run_single_agent_stream(
        user_id=user_id,
        system_prompt=full_prompt,
        message=dossier_request,
        context=prepared,
        max_steps=12,
        langfuse_prompt=prompt_handle,
        agent_name="task-brief-agent",
        tools=palette,
        conversation_history=None,
    )
    # Pure pass-through: every event is forwarded unchanged.
    async for event in agent_events:
        yield event
async def update_core_memory(user_id: str, key: str, value: str) -> None:
    """Set one core-memory entry for ``user_id`` via ``MemoryMiddleware.update_core``.

    Compatibility helper kept for callers that expect an explicit
    memory-update API. Opens its own database session for the call.
    """
    async with async_session() as session:
        await MemoryMiddleware(session).update_core(user_id, key, value)