Add _fetch_project_manifest helper that calls read_project_folder_manifest via execute_on_client. Wire it into run_task_brief_research_stream (new optional project_id param) so the <linked_folder> block is prepended to the system prompt when the task belongs to a linked project. Also bind FOLDER_TOOLS into the task-brief tool palette so the agent can read folder files. device_ws extracts project_id / projectId from the task_brief_request frame and forwards it. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1527 lines
61 KiB
Python
1527 lines
61 KiB
Python
"""Single-agent runners for home and floating chat contexts."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import re
|
||
from datetime import date
|
||
from collections.abc import AsyncGenerator
|
||
from typing import Any, Literal
|
||
|
||
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
|
||
from langchain_core.tools import tool
|
||
|
||
from app.agents.client_agent import CLIENT_TOOLS
|
||
from app.agents.note_agent import NOTE_TOOLS
|
||
from app.agents.project_agent import PROJECT_TOOLS
|
||
from app.agents.relations_agent import make_query_relations_tool
|
||
from app.agents.task_agent import TASK_TOOLS
|
||
from app.agents.timeline_agent import TIMELINE_TOOLS
|
||
from app.core.agent_session_buffer import session_buffer
|
||
from app.core.langfuse_client import compile_prompt, extract_usage, get_langfuse, get_prompt_or_fallback, langfuse_context
|
||
from app.core.llm import get_agent_llm, model_for_agent
|
||
from app.core.memory_middleware import MemoryMiddleware
|
||
from app.core.ws_context import clear_tool_result_collector, execute_on_client, set_tool_result_collector
|
||
from app.db import async_session
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
MAX_HISTORY_TURNS = 20
|
||
|
||
FloatingDomainType = Literal["task", "timeline", "project", "node"]
|
||
FloatingDomainSection = Literal["task", "timeline", "note"]
|
||
|
||
# Mapping of core-memory language values to natural-language names for prompts.
|
||
_LANGUAGE_NAMES: dict[str, str] = {
|
||
"en": "English", "it": "Italian", "es": "Spanish",
|
||
"fr": "French", "de": "German",
|
||
"english": "English", "italian": "Italian", "italiano": "Italian",
|
||
"spanish": "Spanish", "español": "Spanish",
|
||
"french": "French", "français": "French",
|
||
"german": "German", "deutsch": "German",
|
||
}
|
||
|
||
|
||
def _language_instruction(context: dict[str, Any]) -> str:
|
||
"""Return a system-prompt suffix that tells the LLM to respond in the user's language.
|
||
|
||
Returns an empty string when the language is English or unknown — saves tokens.
|
||
"""
|
||
core = context.get("core_memory") or {}
|
||
raw = (core.get("language") or "").strip().lower()
|
||
if not raw:
|
||
return ""
|
||
lang = _LANGUAGE_NAMES.get(raw, raw.title()) # best-effort capitalisation
|
||
if lang.lower() == "english":
|
||
return ""
|
||
return (
|
||
f"\n\nIMPORTANT: Always respond in {lang}. "
|
||
f"All your output text must be written in {lang}."
|
||
)
|
||
|
||
MANIFEST_TOKEN_BUDGET = 3000 # rough budget for <linked_folder> block
|
||
|
||
|
||
def format_folder_manifest(manifest: dict | None) -> str:
|
||
"""Format a folder manifest into the <linked_folder> block.
|
||
|
||
Truncates by mtime DESC if estimated tokens exceed MANIFEST_TOKEN_BUDGET.
|
||
Returns empty string if manifest is None or has no files.
|
||
"""
|
||
if not manifest or not manifest.get("files"):
|
||
return ""
|
||
files = list(manifest["files"])
|
||
files.sort(key=lambda f: f.get("mtimeMs", 0), reverse=True)
|
||
|
||
header = (
|
||
f"<linked_folder>\npath: {manifest.get('folderPath', '?')} "
|
||
f"({len(files)} files, scanned {manifest.get('lastScannedAt', '?')})\nfiles:\n"
|
||
)
|
||
footer_template = "… {} more files omitted, use read_project_folder_file to access by path\n</linked_folder>"
|
||
|
||
char_budget = MANIFEST_TOKEN_BUDGET * 4 # ~4 chars/token
|
||
body = ""
|
||
included = 0
|
||
for f in files:
|
||
line = f"- /{f['relPath']} [{f.get('kind','text')}] {f.get('summary','')}\n"
|
||
if len(header) + len(body) + len(line) + len(footer_template.format(0)) > char_budget:
|
||
break
|
||
body += line
|
||
included += 1
|
||
omitted = len(files) - included
|
||
if omitted > 0:
|
||
return header + body + footer_template.format(omitted)
|
||
return header + body + "</linked_folder>"
|
||
|
||
|
||
async def _fetch_project_manifest(project_id: str) -> dict | None:
|
||
"""Fetch manifest from Electron via execute_on_client. Returns None if unlinked or error."""
|
||
from app.core.ws_context import execute_on_client
|
||
try:
|
||
result = await execute_on_client(
|
||
action="read_project_folder_manifest",
|
||
data={"projectId": project_id},
|
||
)
|
||
if not result or not result.get("folderPath"):
|
||
return None
|
||
return result
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def _datetime_context_injection(context: dict[str, Any]) -> str:
|
||
"""Build a comprehensive DATE CONTEXT block with pre-computed ms-epoch boundaries for common ranges."""
|
||
fp = context.get("format_prefs")
|
||
if not fp or not isinstance(fp, dict):
|
||
return ""
|
||
try:
|
||
from zoneinfo import ZoneInfo
|
||
from datetime import datetime as _dt, timezone as _utc, timedelta as _td
|
||
|
||
tz_name: str = str(fp.get("timezone") or "UTC")
|
||
now_iso: str = str(fp.get("now_iso") or "")
|
||
date_fmt: str = str(fp.get("date_format") or "dd/MM/yyyy")
|
||
time_fmt: str = str(fp.get("time_format") or "24h")
|
||
|
||
tz = ZoneInfo(tz_name)
|
||
if now_iso:
|
||
now_utc = _dt.fromisoformat(now_iso.replace("Z", "+00:00"))
|
||
else:
|
||
now_utc = _dt.now(_utc.utc)
|
||
|
||
now_ms = int(now_utc.timestamp() * 1000)
|
||
now_local = now_utc.astimezone(tz)
|
||
now_local_str = now_local.strftime("%Y-%m-%d %H:%M")
|
||
weekday_str = now_local.strftime("%A")
|
||
y, m, d = now_local.year, now_local.month, now_local.day
|
||
|
||
def _day(year: int, month: int, day: int) -> tuple[int, int]:
|
||
s = _dt(year, month, day, tzinfo=tz)
|
||
e = s + _td(days=1)
|
||
return int(s.timestamp() * 1000), int(e.timestamp() * 1000) - 1
|
||
|
||
def _between(start: "_dt", end_excl: "_dt") -> tuple[int, int]:
|
||
return int(start.timestamp() * 1000), int(end_excl.timestamp() * 1000) - 1
|
||
|
||
today_s, today_e = _day(y, m, d)
|
||
yd = now_local - _td(days=1)
|
||
yesterday_s, yesterday_e = _day(yd.year, yd.month, yd.day)
|
||
tm = now_local + _td(days=1)
|
||
tomorrow_s, tomorrow_e = _day(tm.year, tm.month, tm.day)
|
||
|
||
# ISO week (Mon–Sun)
|
||
monday = _dt(y, m, d, tzinfo=tz) - _td(days=now_local.weekday())
|
||
last_monday = monday - _td(weeks=1)
|
||
next_monday = monday + _td(weeks=1)
|
||
this_week_s, this_week_e = _between(monday, next_monday)
|
||
last_week_s, last_week_e = _between(last_monday, monday)
|
||
next_week_s, next_week_e = _between(next_monday, next_monday + _td(weeks=1))
|
||
|
||
# Calendar months
|
||
this_m_start = _dt(y, m, 1, tzinfo=tz)
|
||
next_m_start = _dt(y + (m // 12), m % 12 + 1, 1, tzinfo=tz)
|
||
last_m_start = _dt(y - (1 if m == 1 else 0), 12 if m == 1 else m - 1, 1, tzinfo=tz)
|
||
next2_m = next_m_start.month % 12 + 1
|
||
next2_y = next_m_start.year + (1 if next_m_start.month == 12 else 0)
|
||
next2_m_start = _dt(next2_y, next2_m, 1, tzinfo=tz)
|
||
this_month_s, this_month_e = _between(this_m_start, next_m_start)
|
||
last_month_s, last_month_e = _between(last_m_start, this_m_start)
|
||
next_month_s, next_month_e = _between(next_m_start, next2_m_start)
|
||
|
||
# Calendar years
|
||
this_yr_s, this_yr_e = _between(_dt(y, 1, 1, tzinfo=tz), _dt(y + 1, 1, 1, tzinfo=tz))
|
||
last_yr_s, last_yr_e = _between(_dt(y - 1, 1, 1, tzinfo=tz), _dt(y, 1, 1, tzinfo=tz))
|
||
|
||
sunday = monday + _td(days=6)
|
||
last_sunday = last_monday + _td(days=6)
|
||
next_sunday = next_monday + _td(days=6)
|
||
|
||
return (
|
||
f"\n\nDATE CONTEXT (timezone: {tz_name}, dateFormat: {date_fmt}, timeFormat: {time_fmt})\n"
|
||
f"now_local: {now_local_str} ({weekday_str})\n"
|
||
f"now_ms: {now_ms}\n\n"
|
||
f"today [{today_s}, {today_e}] {y:04d}-{m:02d}-{d:02d}\n"
|
||
f"tomorrow [{tomorrow_s}, {tomorrow_e}] {tm.strftime('%Y-%m-%d')}\n"
|
||
f"yesterday [{yesterday_s}, {yesterday_e}] {yd.strftime('%Y-%m-%d')}\n"
|
||
f"this_week [{this_week_s}, {this_week_e}] {monday.strftime('%Y-%m-%d')} → {sunday.strftime('%Y-%m-%d')} (Mon–Sun)\n"
|
||
f"last_week [{last_week_s}, {last_week_e}] {last_monday.strftime('%Y-%m-%d')} → {last_sunday.strftime('%Y-%m-%d')}\n"
|
||
f"next_week [{next_week_s}, {next_week_e}] {next_monday.strftime('%Y-%m-%d')} → {next_sunday.strftime('%Y-%m-%d')}\n"
|
||
f"this_month [{this_month_s}, {this_month_e}] {y:04d}-{m:02d}\n"
|
||
f"last_month [{last_month_s}, {last_month_e}] {last_m_start.strftime('%Y-%m')}\n"
|
||
f"next_month [{next_month_s}, {next_month_e}] {next_m_start.strftime('%Y-%m')}\n"
|
||
f"this_year [{this_yr_s}, {this_yr_e}] {y:04d}\n"
|
||
f"last_year [{last_yr_s}, {last_yr_e}] {y - 1:04d}\n\n"
|
||
f"When calling list_tasks_due_today or list_timelines_today, always pass user_timezone=\"{tz_name}\".\n"
|
||
f"When presenting dates, format using dateFormat={date_fmt} and timeFormat={time_fmt}."
|
||
)
|
||
except Exception:
|
||
return ""
|
||
|
||
|
||
def _proactive_hints_injection(context: dict[str, Any]) -> str:
|
||
"""Return a system-prompt paragraph listing proactive behavioral hints.
|
||
|
||
Returns empty string when no hints or confidence below threshold.
|
||
Capped at 600 chars.
|
||
"""
|
||
hints: list[str] = context.get("proactive_hints") or []
|
||
if not hints:
|
||
return ""
|
||
body = "\n".join(f"- {h}" for h in hints)
|
||
section = f"\n\nI noticed (behavioral patterns):\n{body}"
|
||
if len(section) > 600:
|
||
section = section[:597] + "..."
|
||
return section
|
||
|
||
|
||
def _relational_memory_injection(context: dict[str, Any]) -> str:
|
||
"""Return a system-prompt paragraph listing known people/projects from relational memory.
|
||
|
||
Returns empty string when no relational rows or tier is Free.
|
||
Capped at 800 chars to control token spend.
|
||
"""
|
||
relations: list[str] = context.get("relational_memory") or []
|
||
if not relations:
|
||
return ""
|
||
body = "\n".join(f"- {r}" for r in relations)
|
||
section = f"\n\nKnown people & projects:\n{body}"
|
||
if len(section) > 800:
|
||
section = section[:797] + "..."
|
||
return section
|
||
|
||
|
||
_IDENTITY_KEYS = ("user_name", "job_role", "industry", "primary_use_case", "tone_preference")
|
||
|
||
|
||
def _user_identity_injection(context: dict[str, Any]) -> str:
|
||
"""Return a compact user-profile block from core memory onboarding fields.
|
||
|
||
Returns empty string when no onboarding keys are present.
|
||
"""
|
||
core = context.get("core_memory") or {}
|
||
parts: list[str] = []
|
||
for key in _IDENTITY_KEYS:
|
||
val = (core.get(key) or "").strip()
|
||
if val:
|
||
parts.append(f"- {key}: {val}")
|
||
if not parts:
|
||
return ""
|
||
return "\n\nUser profile:\n" + "\n".join(parts)
|
||
|
||
|
||
def _request_context_block(context: dict[str, Any]) -> str:
|
||
"""Return a small block with per-request scope and resolved project context."""
|
||
parts: list[str] = []
|
||
scope = context.get("scope")
|
||
if scope and isinstance(scope, dict):
|
||
parts.append(f"scope: {json.dumps(scope, ensure_ascii=True)}")
|
||
resolved = context.get("resolved_project_id")
|
||
if resolved and isinstance(resolved, str):
|
||
parts.append(f"resolved_project_id: {resolved}")
|
||
return "\n".join(parts)
|
||
|
||
|
||
_HOME_SYSTEM_PROMPT = """\
|
||
You are adiuvAI's home executive assistant.{user_identity}
|
||
You are not a chatbot — you are a proactive partner who runs ahead of the user, anticipates what they need next, and closes every reply with a concrete next step or a clarifying question.
|
||
|
||
# How you work
|
||
- Use tools before answering anything factual. Never guess counts, dates, or status.
|
||
- Prefer parallel tool calls when the questions are independent (e.g. counts per status). Chain calls when one result feeds the next.
|
||
- After delivering the answer, propose the next useful action: a follow-up task to draft, a deadline at risk, a project to triage, a person to remind. Use what you know about the user (job role, industry, primary use case) to make the suggestion relevant.
|
||
- Match the user's tone preference. Default to warm-but-direct; stay concise.
|
||
- When the user asks to remember, forget, or update something, use memory tools.
|
||
|
||
# Filter discipline
|
||
- Never set the `assignee` filter on list_tasks/count_tasks unless the user explicitly names a person ("Marco's tasks") or refers to themselves ("my tasks", "assigned to me", "mine").
|
||
- The user's own name in the User profile block is for context only — it is NOT a default filter.
|
||
- When in doubt, omit `assignee` and return the global result.
|
||
|
||
# Output format
|
||
Return markdown. Reference entities with these tags exactly — one id per tag, each tag on its own line, no prefix/suffix text on the same line:
|
||
<project>id</project> <task>id</task> <note>id</note> <timeline>id</timeline>
|
||
|
||
When the answer contains a list of entities (any of the tags above), structure the reply as three blocks separated by blank lines:
|
||
1. One short intro line stating what is coming (count + scope, e.g. "Ecco i tuoi 18 task ad alta priorità:"). Match the user's language.
|
||
2. All entity tags, one per line, consecutive, no prose interleaved. Do NOT put titles, dates, priorities, or any descriptive text on the same line as a tag or between tags.
|
||
3. One short closing recap (1–2 sentences) that points out a pattern, risk, or insight noticed in the list, and ends with a concrete next step or clarifying question.
|
||
|
||
For single-entity answers skip blocks 1 and 3 if they would be redundant; just emit the tag.
|
||
|
||
For analytical answers (status overviews, breakdowns by category/priority/project, comparisons, trends, "resoconto", "panoramica") consider returning a chart block when it communicates the answer faster than prose. The decision is yours — skip charts for trivial single-number answers. Schema:
|
||
<chart>{{"chartType":"pie|bar|line|area|radar|radial","title":"...","data":[{{"name":"...","value":N}},...], "config":{{"value":{{"label":"...","color":"var(--chart-1)"}} }} }}</chart>
|
||
- pie for share-of-total breakdowns; bar for category comparisons; line/area for time series; radar for multi-dimension.
|
||
- data rows must include a "name" field; numeric series keys must match config keys.
|
||
- Use var(--chart-1) through var(--chart-5) for colors, cycling 1-5 in series order. Do NOT wrap in hsl() or oklch() — these are complete CSS values already.
|
||
|
||
For upcoming-timeline questions ("prossimi eventi"), include only future items in the current month unless the user asks otherwise.
|
||
|
||
# Date filtering
|
||
{date_context}
|
||
|
||
When filtering tasks/timelines/notes by date, take dueDateFrom / dueDateTo (ms epoch UTC) verbatim from the DATE CONTEXT boundary table above. Do NOT compute boundaries from now_ms yourself.
|
||
For specific dates not listed, compute local-midnight in the user timezone and convert to UTC ms.
|
||
For "today" / "tomorrow" queries, prefer list_tasks_due_today / list_timelines_today with user_timezone from DATE CONTEXT.
|
||
|
||
# Language
|
||
{language_instruction}
|
||
|
||
# Known people & projects
|
||
{relational_memory}
|
||
|
||
# Behavioral hints
|
||
{proactive_hints}
|
||
|
||
# Request context
|
||
{request_context}\
|
||
"""
|
||
|
||
_FLOATING_SYSTEM_PROMPT = """\
|
||
You are adiuvAI's floating executive assistant.{user_identity}
|
||
You are pinned to a specific entity (task, timeline event, project, or note) and you stay strictly within that scope.
|
||
Be a proactive partner: anticipate the next useful action and close with a concrete suggestion or a clarifying question — but stay terse, one short paragraph at most.
|
||
|
||
# How you work
|
||
- Use tools before answering anything factual. Never guess.
|
||
- Stay in the floating scope (see Request context). If the user asks something outside scope, answer briefly and suggest opening the home assistant.
|
||
- Match the user's tone preference. Default to warm-but-direct.
|
||
- When the user asks to remember, forget, or update something, use memory tools.
|
||
|
||
# Filter discipline
|
||
- Never set the `assignee` filter on list_tasks/count_tasks unless the user explicitly names a person ("Marco's tasks") or refers to themselves ("my tasks", "assigned to me", "mine").
|
||
- The user's own name in the User profile block is for context only — it is NOT a default filter.
|
||
- When in doubt, omit `assignee` and return the global result.
|
||
|
||
# Output format
|
||
Plain text only. Do NOT output XML/HTML-like tags such as <task>, <project>, <note>, <timeline>, or any bracketed-id wrappers, and do NOT output <chart> blocks — those are for the home assistant.
|
||
|
||
# Date filtering
|
||
{date_context}
|
||
|
||
When filtering by date, take dueDateFrom / dueDateTo (ms epoch UTC) verbatim from the DATE CONTEXT boundary table above. Do NOT compute boundaries from now_ms yourself.
|
||
For specific dates not listed, compute local-midnight in the user timezone and convert to UTC ms.
|
||
|
||
# Language
|
||
{language_instruction}
|
||
|
||
# Known people & projects
|
||
{relational_memory}
|
||
|
||
# Behavioral hints
|
||
{proactive_hints}
|
||
|
||
# Request context
|
||
{request_context}\
|
||
"""
|
||
|
||
_TASK_BRIEF_RESEARCH_SYSTEM_PROMPT = """\
|
||
You are an executive assistant preparing a briefing dossier for your principal before they act on a specific task.
|
||
Your job: gather all relevant context, synthesize it into a tight actionable dossier, and — if the task requires writing (email, message, document) — produce a ready-to-use draft.{user_identity}
|
||
|
||
# Research workflow
|
||
Follow these steps in order, using tools:
|
||
1. Read the task fully (title, description, due date, priority, status, project, comments).
|
||
2. Fetch the parent project (`get_project`) to understand scope, aiSummary, and any linked client.
|
||
3. If the project has a clientId: call `get_client(id)` to retrieve full client details.
|
||
4. Call `query_relations` (subject_label=client_name or task subject) to find cross-project connections — e.g. the same client appearing in multiple projects.
|
||
5. Search associative memory (`search_associative`) and archival memory (`archival_memory_search`) using the task title + client name as query phrases to surface relevant past interactions.
|
||
6. Read core memory blocks for tone preference, language, and user style: `memory_get("tone_preference")`, `memory_get("language")`.
|
||
7. Determine task kind: is this a writing task (email reply, message, follow-up, proposal)? If yes, draft a ready-to-send piece.
|
||
|
||
# Output structure
|
||
Write the briefing in the user's language. Use this exact structure:
|
||
|
||
**What needs to be done**
|
||
(1–2 sentences, concrete and specific — what action the user must take)
|
||
|
||
**Context you should know**
|
||
(bullet points covering: client background, related projects, prior interactions, tone/style notes, any relevant deadlines or dependencies)
|
||
|
||
**Suggested first step**
|
||
(one specific, immediately actionable instruction)
|
||
|
||
If this is a writing task, append a canvas block at the very end:
|
||
<canvas kind="email|document|message">
|
||
...ready-to-use draft here...
|
||
</canvas>
|
||
|
||
Do NOT include the canvas block for non-writing tasks.
|
||
Do NOT repeat verbatim task fields the user already sees in the UI.
|
||
Be concrete — no vague advice. Every bullet should be a fact that changes what the user does.
|
||
|
||
# Date context
|
||
{date_context}
|
||
|
||
# Language
|
||
{language_instruction}
|
||
|
||
# Known people & projects
|
||
{relational_memory}
|
||
|
||
# Request context
|
||
{request_context}\
|
||
"""
|
||
|
||
_TASK_BRIEF_FOLLOWUP_SYSTEM_PROMPT = """\
|
||
You are an executive assistant continuing a conversation with your principal.
|
||
You have already prepared and delivered a research briefing for the active task. The user has read it.{user_identity}
|
||
|
||
Your briefing:
|
||
---
|
||
{briefing_context}
|
||
---
|
||
|
||
Continue from here. Do NOT repeat the briefing. Refer to it when relevant.
|
||
Help the user execute: edit drafts, refine wording, look up additional details, plan next steps.
|
||
Stay terse — your principal is a busy executive.
|
||
|
||
# Date context
|
||
{date_context}
|
||
|
||
# Language
|
||
{language_instruction}
|
||
|
||
# Known people & projects
|
||
{relational_memory}
|
||
|
||
# Request context
|
||
{request_context}\
|
||
"""
|
||
|
||
_FLOATING_DOMAIN_CLASSIFIER_PROMPT = (
|
||
"You are a strict domain classifier for websocket floating requests. "
|
||
"Return ONLY a JSON object with keys: type, id, section. "
|
||
"Allowed type values: task, timeline, project, node. "
|
||
"Allowed section values: task, timeline, note, or null. "
|
||
"Rules: infer from user message intent first; do not blindly trust scope.type. "
|
||
"If user asks tasks/timeline/notes for a project, set type=project and section accordingly. "
|
||
"If project id is unknown but context.resolved_project_id exists, use it as id. "
|
||
"If id is unknown, use null. "
|
||
"No markdown, no prose, JSON only."
|
||
)
|
||
|
||
|
||
def _as_text(content: Any) -> str:
|
||
if content is None:
|
||
return ""
|
||
if isinstance(content, str):
|
||
return content
|
||
if isinstance(content, list):
|
||
parts: list[str] = []
|
||
for item in content:
|
||
if isinstance(item, str):
|
||
parts.append(item)
|
||
elif isinstance(item, dict):
|
||
text = item.get("text")
|
||
if isinstance(text, str):
|
||
parts.append(text)
|
||
return "".join(parts)
|
||
return str(content)
|
||
|
||
|
||
def _candidate_tokens(message: str) -> list[str]:
|
||
tokens = re.findall(r"[a-zA-Z0-9_-]+", message.lower())
|
||
return [token for token in tokens if len(token) >= 3]
|
||
|
||
|
||
async def _resolve_project_id_from_message(message: str) -> str | None:
|
||
"""Resolve likely project UUID from user message using client project list."""
|
||
try:
|
||
result = await execute_on_client(action="select", table="projects")
|
||
except Exception as exc:
|
||
logger.warning("deep_agent: project resolve select failed: %s", exc)
|
||
return None
|
||
|
||
rows = result.get("rows", [])
|
||
if not isinstance(rows, list) or not rows:
|
||
return None
|
||
|
||
tokens = _candidate_tokens(message)
|
||
scored: list[tuple[int, dict[str, Any]]] = []
|
||
for row in rows:
|
||
if not isinstance(row, dict):
|
||
continue
|
||
name = str(row.get("name", "")).lower()
|
||
score = sum(1 for token in tokens if token in name)
|
||
if score > 0:
|
||
scored.append((score, row))
|
||
|
||
if not scored:
|
||
return None
|
||
|
||
scored.sort(key=lambda item: item[0], reverse=True)
|
||
top_score = scored[0][0]
|
||
top_rows = [row for score, row in scored if score == top_score]
|
||
if len(top_rows) != 1:
|
||
return None
|
||
|
||
project_id = top_rows[0].get("id")
|
||
return project_id if isinstance(project_id, str) else None
|
||
|
||
|
||
def _needs_project_resolution(message: str) -> bool:
|
||
lowered = message.lower()
|
||
return any(keyword in lowered for keyword in ["project", "progetto", "progetti", "whitelist"])
|
||
|
||
|
||
async def _prepare_context(message: str, context: dict[str, Any]) -> dict[str, Any]:
|
||
prepared = dict(context)
|
||
if _needs_project_resolution(message):
|
||
resolved_project_id = await _resolve_project_id_from_message(message)
|
||
if resolved_project_id:
|
||
prepared["resolved_project_id"] = resolved_project_id
|
||
logger.info("deep_agent: resolved_project_id=%s", resolved_project_id)
|
||
return prepared
|
||
|
||
|
||
def _all_tools() -> list[Any]:
|
||
return [*TASK_TOOLS, *PROJECT_TOOLS, *NOTE_TOOLS, *TIMELINE_TOOLS]
|
||
|
||
|
||
def _trace_id_from_context(context: dict[str, Any]) -> str | None:
|
||
debug = context.get("_debug")
|
||
if isinstance(debug, dict):
|
||
request_id = debug.get("request_id")
|
||
if isinstance(request_id, str) and request_id:
|
||
return request_id
|
||
return None
|
||
|
||
|
||
def _session_id_from_context(context: dict[str, Any]) -> str | None:
|
||
debug = context.get("_debug")
|
||
if isinstance(debug, dict):
|
||
session_id = debug.get("session_id")
|
||
if isinstance(session_id, str) and session_id:
|
||
return session_id
|
||
return None
|
||
|
||
|
||
def _build_system_prompt(name: str, fallback: str, context: dict[str, Any]) -> tuple[str, Any]:
|
||
"""Fetch Langfuse template and compile all per-request slots into one system prompt."""
|
||
template, prompt_obj = get_prompt_or_fallback(name, fallback)
|
||
text = compile_prompt(
|
||
template, prompt_obj,
|
||
date_context=_datetime_context_injection(context).strip(),
|
||
language_instruction=_language_instruction(context).strip(),
|
||
user_identity=_user_identity_injection(context).strip(),
|
||
relational_memory=_relational_memory_injection(context).strip(),
|
||
proactive_hints=_proactive_hints_injection(context).strip(),
|
||
request_context=_request_context_block(context),
|
||
)
|
||
return text, prompt_obj
|
||
|
||
|
||
_TAG_LINE_RE = re.compile(r"<(task|timeline)>\[[^\]]+\]</\1>")
|
||
_TIMELINE_DMY_RE = re.compile(r"(?P<d>\d{2})/(?P<m>\d{2})/(?P<y>\d{4})")
|
||
|
||
|
||
def _is_upcoming_timeline_query(message: str) -> bool:
|
||
lowered = message.lower()
|
||
has_upcoming = "prossim" in lowered or "upcoming" in lowered or "next" in lowered
|
||
has_timeline_topic = any(
|
||
token in lowered
|
||
for token in ("event", "evento", "eventi", "timeline", "milestone", "scaden")
|
||
)
|
||
return has_upcoming and has_timeline_topic
|
||
|
||
|
||
def _timeline_date_in_current_month_or_future(dmy: str) -> bool:
|
||
match = _TIMELINE_DMY_RE.search(dmy)
|
||
if not match:
|
||
return True
|
||
try:
|
||
parsed = date(
|
||
int(match.group("y")),
|
||
int(match.group("m")),
|
||
int(match.group("d")),
|
||
)
|
||
except ValueError:
|
||
return True
|
||
|
||
today = date.today()
|
||
return parsed >= today and parsed.year == today.year and parsed.month == today.month
|
||
|
||
|
||
def _normalize_tagged_list_lines(text: str, message: str) -> str:
|
||
if not text:
|
||
return text
|
||
|
||
upcoming_timeline_only = _is_upcoming_timeline_query(message)
|
||
output_lines: list[str] = []
|
||
|
||
for line in text.splitlines():
|
||
matches = list(_TAG_LINE_RE.finditer(line))
|
||
if not matches:
|
||
output_lines.append(line)
|
||
continue
|
||
|
||
had_non_tag_text = _TAG_LINE_RE.sub("", line).strip(" -\t0123456789.*:)")
|
||
if not had_non_tag_text and len(matches) == 1:
|
||
tag_text = matches[0].group(0)
|
||
if (
|
||
upcoming_timeline_only
|
||
and "<timeline>" in tag_text
|
||
and not _timeline_date_in_current_month_or_future(line)
|
||
):
|
||
continue
|
||
output_lines.append(tag_text)
|
||
continue
|
||
|
||
for match in matches:
|
||
tag_text = match.group(0)
|
||
if (
|
||
upcoming_timeline_only
|
||
and "<timeline>" in tag_text
|
||
and not _timeline_date_in_current_month_or_future(line)
|
||
):
|
||
continue
|
||
output_lines.append(tag_text)
|
||
|
||
return "\n".join(output_lines)
|
||
|
||
|
||
_GENERIC_TAG_RE = re.compile(r"</?(task|project|note|timeline|chart)>", re.IGNORECASE)
|
||
_BRACKETED_ID_RE = re.compile(r"\[(?:[0-9a-fA-F-]{8,}|[A-Za-z0-9_-]{8,})\]")
|
||
_FLOATING_EMPTY_FALLBACK = "No results found."
|
||
|
||
|
||
def _strip_floating_markup_fragment(text: str) -> str:
|
||
if not text:
|
||
return text
|
||
cleaned = _GENERIC_TAG_RE.sub("", text)
|
||
return _BRACKETED_ID_RE.sub("", cleaned)
|
||
|
||
|
||
def _strip_floating_markup(text: str) -> str:
|
||
"""Ensure floating responses stay plain text with no XML-like tag wrappers."""
|
||
if not text:
|
||
return text
|
||
|
||
cleaned = _strip_floating_markup_fragment(text)
|
||
# Collapse excessive spaces introduced by tag/id removal while preserving lines.
|
||
lines = [re.sub(r"[ \t]{2,}", " ", line).strip() for line in cleaned.splitlines()]
|
||
return "\n".join(line for line in lines if line)
|
||
|
||
|
||
def _fallback_from_raw_floating_text(raw_text: str) -> str:
|
||
fallback = _strip_floating_markup_fragment(raw_text or "")
|
||
fallback = re.sub(r"[ \t]{2,}", " ", fallback).strip()
|
||
return fallback or _FLOATING_EMPTY_FALLBACK
|
||
|
||
|
||
class _FloatingStreamSanitizer:
|
||
"""Streaming sanitizer that removes floating markup without buffering the full answer."""
|
||
|
||
def __init__(self) -> None:
|
||
self._pending = ""
|
||
|
||
@staticmethod
|
||
def _split_safe_boundary(text: str) -> tuple[str, str]:
|
||
boundary = len(text)
|
||
|
||
last_lt = text.rfind("<")
|
||
if last_lt != -1 and ">" not in text[last_lt:]:
|
||
boundary = min(boundary, last_lt)
|
||
|
||
last_lb = text.rfind("[")
|
||
if last_lb != -1 and "]" not in text[last_lb:]:
|
||
boundary = min(boundary, last_lb)
|
||
|
||
if boundary == len(text):
|
||
return text, ""
|
||
return text[:boundary], text[boundary:]
|
||
|
||
def feed(self, chunk: str) -> str:
|
||
combined = f"{self._pending}{chunk}"
|
||
safe_text, self._pending = self._split_safe_boundary(combined)
|
||
return _strip_floating_markup_fragment(safe_text)
|
||
|
||
def finalize(self) -> str:
|
||
# Drop dangling unfinished wrappers at the very end.
|
||
tail = re.sub(r"<[^>\n]*$", "", self._pending)
|
||
tail = re.sub(r"\[[^\]\n]*$", "", tail)
|
||
self._pending = ""
|
||
return _strip_floating_markup_fragment(tail)
|
||
|
||
|
||
def _normalize_memory_label(path_or_label: str) -> str:
|
||
value = path_or_label.strip()
|
||
if value.startswith("/memories/"):
|
||
value = value[len("/memories/"):]
|
||
value = value.strip("/")
|
||
return value
|
||
|
||
|
||
def _memory_tools(user_id: str, trace_id: str | None) -> list[Any]:
|
||
@tool
|
||
async def memory_list_blocks() -> str:
|
||
"""List all core memory blocks currently stored for the user."""
|
||
logger.info("deep_agent: memory_list_blocks trace=%s user=%s", trace_id or "-", user_id)
|
||
async with async_session() as db:
|
||
memory = MemoryMiddleware(db)
|
||
blocks = await memory.list_core_blocks(user_id)
|
||
if not blocks:
|
||
return "No memory blocks found."
|
||
lines = [f"- {b['label']}: {b['value']}" for b in blocks]
|
||
return "Memory blocks:\n" + "\n".join(lines)
|
||
|
||
@tool
|
||
async def memory_get(path_or_label: str) -> str:
|
||
"""Get one memory block by label or /memories/<label> path."""
|
||
label = _normalize_memory_label(path_or_label)
|
||
logger.info("deep_agent: memory_get trace=%s user=%s label=%s", trace_id or "-", user_id, label)
|
||
if not label:
|
||
return "Invalid memory label."
|
||
async with async_session() as db:
|
||
memory = MemoryMiddleware(db)
|
||
value = await memory.get_core_block(user_id, label)
|
||
if value is None:
|
||
return f"Memory block '{label}' not found."
|
||
return f"Memory block '{label}':\n{value}"
|
||
|
||
@tool
|
||
async def memory_create(path_or_label: str, value: str) -> str:
|
||
"""Create or overwrite a memory block value by label or /memories/<label> path."""
|
||
label = _normalize_memory_label(path_or_label)
|
||
logger.info("deep_agent: memory_create trace=%s user=%s label=%s", trace_id or "-", user_id, label)
|
||
if not label:
|
||
return "Invalid memory label."
|
||
async with async_session() as db:
|
||
memory = MemoryMiddleware(db)
|
||
await memory.update_core(user_id, label, value, trace_id=trace_id)
|
||
return f"Memory block '{label}' saved."
|
||
|
||
@tool
|
||
async def memory_append(path_or_label: str, content: str) -> str:
|
||
"""Append content to a memory block, creating it if missing."""
|
||
label = _normalize_memory_label(path_or_label)
|
||
logger.info("deep_agent: memory_append trace=%s user=%s label=%s", trace_id or "-", user_id, label)
|
||
if not label:
|
||
return "Invalid memory label."
|
||
async with async_session() as db:
|
||
memory = MemoryMiddleware(db)
|
||
await memory.append_core(user_id, label, content)
|
||
return f"Memory block '{label}' appended."
|
||
|
||
@tool
|
||
async def memory_replace(path_or_label: str, old_string: str, new_string: str) -> str:
|
||
"""Replace one exact string in a memory block."""
|
||
label = _normalize_memory_label(path_or_label)
|
||
logger.info("deep_agent: memory_replace trace=%s user=%s label=%s", trace_id or "-", user_id, label)
|
||
if not label:
|
||
return "Invalid memory label."
|
||
async with async_session() as db:
|
||
memory = MemoryMiddleware(db)
|
||
changed = await memory.replace_core(user_id, label, old_string, new_string)
|
||
if not changed:
|
||
return f"No replacement made in '{label}' (old string not found)."
|
||
return f"Memory block '{label}' updated."
|
||
|
||
@tool
|
||
async def memory_delete(path_or_label: str) -> str:
|
||
"""Delete a memory block by label or /memories/<label> path."""
|
||
label = _normalize_memory_label(path_or_label)
|
||
logger.info("deep_agent: memory_delete trace=%s user=%s label=%s", trace_id or "-", user_id, label)
|
||
if not label:
|
||
return "Invalid memory label."
|
||
async with async_session() as db:
|
||
memory = MemoryMiddleware(db)
|
||
deleted = await memory.delete_core(user_id, label)
|
||
if not deleted:
|
||
return f"Memory block '{label}' not found."
|
||
return f"Memory block '{label}' deleted."
|
||
|
||
@tool
|
||
async def archival_memory_insert(content: str) -> str:
|
||
"""Insert a long-term archival memory entry."""
|
||
logger.info("deep_agent: archival_memory_insert trace=%s user=%s", trace_id or "-", user_id)
|
||
async with async_session() as db:
|
||
memory = MemoryMiddleware(db)
|
||
await memory.insert_archival(user_id, content, source="assistant")
|
||
return "Archival memory saved."
|
||
|
||
@tool
|
||
async def archival_memory_search(query: str, top_k: int = 5) -> str:
|
||
"""Search long-term archival memory by semantic fallback (keyword currently)."""
|
||
logger.info("deep_agent: archival_memory_search trace=%s user=%s query=%s", trace_id or "-", user_id, query[:80])
|
||
async with async_session() as db:
|
||
memory = MemoryMiddleware(db)
|
||
results = await memory.search_archival(user_id, query, top_k=top_k)
|
||
if not results:
|
||
return "No archival memory results found."
|
||
lines = [f"- {item}" for item in results]
|
||
return "Archival memory results:\n" + "\n".join(lines)
|
||
|
||
@tool
|
||
async def conversation_search(query: str, top_k: int = 5) -> str:
|
||
"""Search recall memory from prior episodic conversation summaries."""
|
||
logger.info("deep_agent: conversation_search trace=%s user=%s query=%s", trace_id or "-", user_id, query[:80])
|
||
async with async_session() as db:
|
||
memory = MemoryMiddleware(db)
|
||
results = await memory.search_recall(user_id, query, top_k=top_k)
|
||
if not results:
|
||
return "No recall memory results found."
|
||
lines = [f"- {item}" for item in results]
|
||
return "Recall memory results:\n" + "\n".join(lines)
|
||
|
||
@tool
|
||
async def search_associative(query: str, limit: int = 5) -> str:
|
||
"""Semantic search across associative (archival) memory for a given query.
|
||
|
||
Use this to surface long-term memories related to a topic, client, or task
|
||
that may not appear in recent episodes.
|
||
|
||
query: natural-language search phrase.
|
||
limit: max results (default 5).
|
||
"""
|
||
logger.info("deep_agent: search_associative trace=%s user=%s query=%s", trace_id or "-", user_id, query[:80])
|
||
async with async_session() as db:
|
||
memory = MemoryMiddleware(db)
|
||
results = await memory.search_archival(user_id, query, top_k=limit)
|
||
if not results:
|
||
return "No associative memory results found."
|
||
lines = [f"- {item}" for item in results]
|
||
return "Associative memory results:\n" + "\n".join(lines)
|
||
|
||
return [
|
||
memory_list_blocks,
|
||
memory_get,
|
||
memory_create,
|
||
memory_append,
|
||
memory_replace,
|
||
memory_delete,
|
||
archival_memory_insert,
|
||
archival_memory_search,
|
||
conversation_search,
|
||
search_associative,
|
||
]
|
||
|
||
|
||
def _read_only_memory_tools(user_id: str, trace_id: str | None) -> list[Any]:
|
||
"""Return memory tools that only read — safe for the read-only brief-agent subset."""
|
||
all_mem = _memory_tools(user_id, trace_id)
|
||
_read_names = {
|
||
"memory_list_blocks", "memory_get", "archival_memory_search",
|
||
"conversation_search", "search_associative",
|
||
}
|
||
return [t for t in all_mem if t.name in _read_names]
|
||
|
||
|
||
def _brief_research_tools(user_id: str, trace_id: str | None) -> list[Any]:
|
||
"""Return the full tool palette for Stage-1 task brief research (read-only)."""
|
||
return [
|
||
*TASK_TOOLS,
|
||
*PROJECT_TOOLS,
|
||
*NOTE_TOOLS,
|
||
*TIMELINE_TOOLS,
|
||
*CLIENT_TOOLS,
|
||
*_read_only_memory_tools(user_id, trace_id),
|
||
make_query_relations_tool(user_id, trace_id),
|
||
]
|
||
|
||
|
||
def _all_tools_for_user(user_id: str, trace_id: str | None) -> list[Any]:
|
||
return [*_all_tools(), *_memory_tools(user_id, trace_id)]
|
||
|
||
|
||
def _detect_domain_section(message: str) -> FloatingDomainSection | None:
|
||
lowered = message.lower()
|
||
if any(keyword in lowered for keyword in ["timeline", "milestone", "release", "schedule"]):
|
||
return "timeline"
|
||
if any(keyword in lowered for keyword in ["task", "tasks", "todo", "attivit", "azione"]):
|
||
return "task"
|
||
if any(keyword in lowered for keyword in ["note", "notes", "memo", "document"]):
|
||
return "note"
|
||
return None
|
||
|
||
|
||
def _normalize_domain_payload(payload: dict[str, Any], fallback_id: str | None) -> dict[str, str | None]:
|
||
type_raw = str(payload.get("type") or "").strip().lower()
|
||
domain_type: FloatingDomainType = "task"
|
||
if type_raw in {"task", "timeline", "project", "node"}:
|
||
domain_type = type_raw
|
||
|
||
id_value = payload.get("id")
|
||
domain_id = id_value if isinstance(id_value, str) and id_value.strip() else None
|
||
if domain_type == "project" and not domain_id:
|
||
domain_id = fallback_id
|
||
|
||
section_raw = payload.get("section")
|
||
section: FloatingDomainSection | None = None
|
||
if isinstance(section_raw, str):
|
||
section_candidate = section_raw.strip().lower()
|
||
if section_candidate in {"task", "timeline", "note"}:
|
||
section = section_candidate
|
||
|
||
if domain_type != "project":
|
||
section = None
|
||
|
||
return {
|
||
"type": domain_type,
|
||
"id": domain_id,
|
||
"section": section,
|
||
}
|
||
|
||
|
||
def _parse_json_object(text: str) -> dict[str, Any] | None:
|
||
raw = text.strip()
|
||
if not raw:
|
||
return None
|
||
try:
|
||
parsed = json.loads(raw)
|
||
return parsed if isinstance(parsed, dict) else None
|
||
except json.JSONDecodeError:
|
||
pass
|
||
|
||
match = re.search(r"\{.*\}", raw, re.DOTALL)
|
||
if not match:
|
||
return None
|
||
try:
|
||
parsed = json.loads(match.group(0))
|
||
except json.JSONDecodeError:
|
||
return None
|
||
return parsed if isinstance(parsed, dict) else None
|
||
|
||
|
||
def _infer_floating_domain_rule_based(message: str, context: dict[str, Any]) -> dict[str, str | None]:
|
||
section = _detect_domain_section(message)
|
||
scope = context.get("scope") if isinstance(context, dict) else None
|
||
resolved_project_id = context.get("resolved_project_id") if isinstance(context, dict) else None
|
||
project_id = resolved_project_id if isinstance(resolved_project_id, str) and resolved_project_id else None
|
||
|
||
if isinstance(scope, dict):
|
||
scope_type = str(scope.get("type") or "").strip().lower()
|
||
scope_id = scope.get("id")
|
||
scope_id_value = scope_id if isinstance(scope_id, str) and scope_id else None
|
||
|
||
if scope_type in {"task", "tasks"}:
|
||
return {"type": "task", "id": scope_id_value, "section": None}
|
||
if scope_type in {"project", "projects"}:
|
||
project_scope_id = scope_id_value or project_id
|
||
return {
|
||
"type": "project",
|
||
"id": project_scope_id,
|
||
"section": section,
|
||
}
|
||
if scope_type in {"note", "notes"}:
|
||
return {
|
||
"type": "node",
|
||
"id": scope_id_value,
|
||
"section": None,
|
||
}
|
||
if scope_type in {"timeline", "timelines"}:
|
||
return {"type": "timeline", "id": scope_id_value, "section": None}
|
||
|
||
lowered = message.lower()
|
||
if any(keyword in lowered for keyword in ["project", "progetto", "client"]) or project_id:
|
||
return {
|
||
"type": "project",
|
||
"id": project_id,
|
||
"section": section,
|
||
}
|
||
if section == "timeline":
|
||
return {"type": "timeline", "id": None, "section": None}
|
||
if section == "note":
|
||
return {"type": "node", "id": None, "section": None}
|
||
return {"type": "task", "id": None, "section": None}
|
||
|
||
|
||
async def _infer_floating_domain(message: str, context: dict[str, Any]) -> dict[str, str | None]:
|
||
resolved_project_id = context.get("resolved_project_id") if isinstance(context, dict) else None
|
||
project_id = resolved_project_id if isinstance(resolved_project_id, str) and resolved_project_id else None
|
||
|
||
classifier_context = {
|
||
"scope": context.get("scope") if isinstance(context.get("scope"), dict) else None,
|
||
"resolved_project_id": project_id,
|
||
}
|
||
|
||
try:
|
||
llm = get_agent_llm("classifier")
|
||
classifier_messages = [
|
||
SystemMessage(content=_FLOATING_DOMAIN_CLASSIFIER_PROMPT),
|
||
HumanMessage(
|
||
content=(
|
||
f"Message:\n{message}\n\n"
|
||
f"Context:\n{json.dumps(classifier_context, ensure_ascii=True)}"
|
||
)
|
||
),
|
||
]
|
||
lf = get_langfuse()
|
||
_, classifier_prompt_obj = get_prompt_or_fallback(
|
||
"floating_domain_classifier", _FLOATING_DOMAIN_CLASSIFIER_PROMPT
|
||
)
|
||
|
||
# Extract user/session from context for Langfuse attribution
|
||
_debug = context.get("_debug") if isinstance(context, dict) else None
|
||
_lf_user = (_debug or {}).get("user_id") if isinstance(_debug, dict) else None
|
||
_lf_session = (_debug or {}).get("session_id") if isinstance(_debug, dict) else None
|
||
|
||
with langfuse_context(user_id=_lf_user, session_id=_lf_session):
|
||
if lf:
|
||
with lf.start_as_current_observation(
|
||
as_type="generation",
|
||
name="floating-classifier",
|
||
model=model_for_agent("classifier"),
|
||
prompt=classifier_prompt_obj,
|
||
input=classifier_messages,
|
||
) as gen:
|
||
response = await llm.ainvoke(classifier_messages)
|
||
gen.update(output=_as_text(response.content), usage_details=extract_usage(response))
|
||
else:
|
||
response = await llm.ainvoke(classifier_messages)
|
||
parsed = _parse_json_object(_as_text(response.content))
|
||
if parsed is not None:
|
||
domain = _normalize_domain_payload(parsed, project_id)
|
||
logger.info(
|
||
"deep_agent: floating_domain_classified type=%s id=%s section=%s",
|
||
domain.get("type"),
|
||
domain.get("id"),
|
||
domain.get("section"),
|
||
)
|
||
return domain
|
||
logger.warning("deep_agent: floating_domain classifier returned non-json output")
|
||
except Exception as exc:
|
||
logger.warning("deep_agent: floating_domain classifier failed: %s", exc)
|
||
|
||
return _infer_floating_domain_rule_based(message, context)
|
||
|
||
|
||
def _history_to_messages(history: list[dict[str, str]] | None) -> list[Any]:
|
||
if not history:
|
||
return []
|
||
turns = history[-MAX_HISTORY_TURNS:]
|
||
result: list[Any] = []
|
||
for turn in turns:
|
||
role = turn.get("role", "")
|
||
content = turn.get("content", "")
|
||
if not content:
|
||
continue
|
||
if role == "user":
|
||
result.append(HumanMessage(content=content))
|
||
elif role == "assistant":
|
||
result.append(AIMessage(content=content))
|
||
return result
|
||
|
||
|
||
async def _run_single_agent(
|
||
*,
|
||
user_id: str,
|
||
system_prompt: str,
|
||
message: str,
|
||
context: dict[str, Any],
|
||
max_steps: int = 6,
|
||
langfuse_prompt: Any = None,
|
||
agent_name: str = "agent",
|
||
conversation_history: list[dict[str, str]] | None = None,
|
||
) -> str:
|
||
trace_id = _trace_id_from_context(context)
|
||
session_id = _session_id_from_context(context)
|
||
lf = get_langfuse()
|
||
llm = get_agent_llm(agent_name)
|
||
tools = _all_tools_for_user(user_id, trace_id)
|
||
logger.info("deep_agent: run_single_agent_start trace=%s user=%s", trace_id or "-", user_id)
|
||
llm_with_tools = llm.bind_tools(tools)
|
||
_buffered = session_buffer.get(user_id, session_id) if session_id else None
|
||
history_messages = _buffered if _buffered is not None else _history_to_messages(conversation_history)
|
||
messages: list[Any] = [
|
||
SystemMessage(content=system_prompt),
|
||
*history_messages,
|
||
HumanMessage(content=message),
|
||
]
|
||
|
||
tool_calls_count = 0
|
||
collected: list[dict[str, Any]] = []
|
||
set_tool_result_collector(collected)
|
||
|
||
_lf_ctx = langfuse_context(user_id=user_id, session_id=session_id)
|
||
_lf_ctx.__enter__()
|
||
|
||
_span_ctx = (
|
||
lf.start_as_current_observation(
|
||
as_type="agent",
|
||
name=agent_name,
|
||
metadata={"user_id": user_id, "session_id": trace_id},
|
||
input=message,
|
||
)
|
||
if lf else None
|
||
)
|
||
_span = _span_ctx.__enter__() if _span_ctx else None
|
||
_messages_to_save: list[Any] | None = None
|
||
|
||
try:
|
||
for _ in range(max_steps):
|
||
_gen_ctx = (
|
||
lf.start_as_current_observation(
|
||
as_type="generation",
|
||
name=f"{agent_name}-llm",
|
||
model=model_for_agent(agent_name),
|
||
prompt=langfuse_prompt,
|
||
input=messages,
|
||
)
|
||
if lf else None
|
||
)
|
||
_gen = _gen_ctx.__enter__() if _gen_ctx else None
|
||
response: AIMessage = await llm_with_tools.ainvoke(messages)
|
||
if _gen_ctx:
|
||
_gen.update(output=_as_text(response.content), usage_details=extract_usage(response))
|
||
_gen_ctx.__exit__(None, None, None)
|
||
|
||
messages.append(response)
|
||
|
||
if not response.tool_calls:
|
||
final_text = _as_text(response.content)
|
||
logger.info(
|
||
"deep_agent: run_single_agent_end trace=%s user=%s tool_calls=%d response_chars=%d",
|
||
trace_id or "-",
|
||
user_id,
|
||
tool_calls_count,
|
||
len(final_text),
|
||
)
|
||
if _span:
|
||
_span.update(output=final_text)
|
||
_messages_to_save = messages[1:] # strip SystemMessage; save full tool history
|
||
return final_text
|
||
|
||
tool_map = {tool_def.name: tool_def for tool_def in tools}
|
||
for call in response.tool_calls:
|
||
tool_calls_count += 1
|
||
call_id = str(call.get("id", ""))
|
||
call_name = str(call.get("name", ""))
|
||
call_args = call.get("args", {})
|
||
logger.info(
|
||
"deep_agent: AI->Tool tool_call_id=%s tool=%s args=%s",
|
||
call_id,
|
||
call_name,
|
||
json.dumps(call_args, ensure_ascii=True)[:800],
|
||
)
|
||
|
||
tool_fn = tool_map.get(call_name)
|
||
if tool_fn is None:
|
||
tool_output = f"Unknown tool: {call_name}"
|
||
elif lf:
|
||
with lf.start_as_current_observation(
|
||
as_type="tool",
|
||
name=call_name,
|
||
input=call_args,
|
||
) as tool_obs:
|
||
tool_output = await tool_fn.ainvoke(call_args)
|
||
tool_obs.update(output=str(tool_output)[:8000])
|
||
else:
|
||
tool_output = await tool_fn.ainvoke(call_args)
|
||
|
||
logger.info(
|
||
"deep_agent: Tool->AI tool_call_id=%s tool=%s output=%s",
|
||
call_id,
|
||
call_name,
|
||
str(tool_output)[:1200],
|
||
)
|
||
|
||
messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
|
||
|
||
final = await llm.ainvoke(messages)
|
||
final_text = _as_text(final.content)
|
||
messages.append(AIMessage(content=final_text))
|
||
logger.info(
|
||
"deep_agent: run_single_agent_end trace=%s user=%s tool_calls=%d response_chars=%d fallback=1",
|
||
trace_id or "-",
|
||
user_id,
|
||
tool_calls_count,
|
||
len(final_text),
|
||
)
|
||
if _span:
|
||
_span.update(output=final_text)
|
||
_messages_to_save = messages[1:]
|
||
return final_text
|
||
finally:
|
||
if session_id and _messages_to_save is not None:
|
||
session_buffer.set(user_id, session_id, _messages_to_save)
|
||
clear_tool_result_collector()
|
||
if _span_ctx:
|
||
_span_ctx.__exit__(None, None, None)
|
||
_lf_ctx.__exit__(None, None, None)
|
||
if lf:
|
||
lf.flush()
|
||
|
||
|
||
async def _run_single_agent_stream(
|
||
*,
|
||
user_id: str,
|
||
system_prompt: str,
|
||
message: str,
|
||
context: dict[str, Any],
|
||
max_steps: int = 6,
|
||
langfuse_prompt: Any = None,
|
||
agent_name: str = "agent",
|
||
tools: list[Any] | None = None,
|
||
conversation_history: list[dict[str, str]] | None = None,
|
||
) -> AsyncGenerator[tuple[str, Any], None]:
|
||
trace_id = _trace_id_from_context(context)
|
||
session_id = _session_id_from_context(context)
|
||
lf = get_langfuse()
|
||
llm = get_agent_llm(agent_name)
|
||
if tools is None:
|
||
tools = _all_tools_for_user(user_id, trace_id)
|
||
logger.info("deep_agent: run_single_agent_stream_start trace=%s user=%s", trace_id or "-", user_id)
|
||
llm_with_tools = llm.bind_tools(tools)
|
||
_buffered = session_buffer.get(user_id, session_id) if session_id else None
|
||
history_messages = _buffered if _buffered is not None else _history_to_messages(conversation_history)
|
||
messages: list[Any] = [
|
||
SystemMessage(content=system_prompt),
|
||
*history_messages,
|
||
HumanMessage(content=message),
|
||
]
|
||
|
||
tool_calls_count = 0
|
||
streamed_chars = 0
|
||
collected: list[dict[str, Any]] = []
|
||
set_tool_result_collector(collected)
|
||
|
||
_lf_ctx = langfuse_context(user_id=user_id, session_id=session_id)
|
||
_lf_ctx.__enter__()
|
||
|
||
_span_ctx = (
|
||
lf.start_as_current_observation(
|
||
as_type="agent",
|
||
name=f"{agent_name}-stream",
|
||
metadata={"user_id": user_id, "session_id": trace_id},
|
||
input=message,
|
||
)
|
||
if lf else None
|
||
)
|
||
_span = _span_ctx.__enter__() if _span_ctx else None
|
||
streamed_text: list[str] = []
|
||
_messages_to_save: list[Any] | None = None
|
||
|
||
try:
|
||
for _ in range(max_steps):
|
||
_gen_ctx = (
|
||
lf.start_as_current_observation(
|
||
as_type="generation",
|
||
name=f"{agent_name}-llm",
|
||
model=model_for_agent(agent_name),
|
||
prompt=langfuse_prompt,
|
||
input=messages,
|
||
)
|
||
if lf else None
|
||
)
|
||
_gen = _gen_ctx.__enter__() if _gen_ctx else None
|
||
response: AIMessage = await llm_with_tools.ainvoke(messages)
|
||
if _gen_ctx:
|
||
_gen.update(output=_as_text(response.content), usage_details=extract_usage(response))
|
||
_gen_ctx.__exit__(None, None, None)
|
||
|
||
if not response.tool_calls:
|
||
# Yield the content from the ainvoke response directly — no second LLM call.
|
||
# Previously, messages.append(response) was called first, so the re-stream
|
||
# received [System, Human, AI] and regenerated a response without tools bound.
|
||
final_text = _as_text(response.content)
|
||
if final_text:
|
||
streamed_chars += len(final_text)
|
||
streamed_text.append(final_text)
|
||
yield "token", final_text
|
||
logger.info(
|
||
"deep_agent: run_single_agent_stream_end trace=%s user=%s tool_calls=%d response_chars=%d",
|
||
trace_id or "-",
|
||
user_id,
|
||
tool_calls_count,
|
||
streamed_chars,
|
||
)
|
||
if _span:
|
||
_span.update(output="".join(streamed_text))
|
||
messages.append(response)
|
||
_messages_to_save = messages[1:] # strip SystemMessage
|
||
return
|
||
|
||
messages.append(response)
|
||
tool_map = {tool_def.name: tool_def for tool_def in tools}
|
||
for call in response.tool_calls:
|
||
tool_calls_count += 1
|
||
call_id = str(call.get("id", ""))
|
||
call_name = str(call.get("name", ""))
|
||
call_args = call.get("args", {})
|
||
logger.info(
|
||
"deep_agent: AI->Tool tool_call_id=%s tool=%s args=%s",
|
||
call_id,
|
||
call_name,
|
||
json.dumps(call_args, ensure_ascii=True)[:800],
|
||
)
|
||
|
||
tool_fn = tool_map.get(call_name)
|
||
if tool_fn is None:
|
||
tool_output = f"Unknown tool: {call_name}"
|
||
elif lf:
|
||
with lf.start_as_current_observation(
|
||
as_type="tool",
|
||
name=call_name,
|
||
input=call_args,
|
||
) as tool_obs:
|
||
tool_output = await tool_fn.ainvoke(call_args)
|
||
tool_obs.update(output=str(tool_output)[:8000])
|
||
else:
|
||
tool_output = await tool_fn.ainvoke(call_args)
|
||
|
||
logger.info(
|
||
"deep_agent: Tool->AI tool_call_id=%s tool=%s output=%s",
|
||
call_id,
|
||
call_name,
|
||
str(tool_output)[:1200],
|
||
)
|
||
|
||
messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
|
||
|
||
fallback_chunks: list[str] = []
|
||
async for chunk in llm.astream(messages):
|
||
token = _as_text(getattr(chunk, "content", ""))
|
||
if token:
|
||
streamed_chars += len(token)
|
||
streamed_text.append(token)
|
||
fallback_chunks.append(token)
|
||
yield "token", token
|
||
messages.append(AIMessage(content="".join(fallback_chunks)))
|
||
_messages_to_save = messages[1:]
|
||
logger.info(
|
||
"deep_agent: run_single_agent_stream_end trace=%s user=%s tool_calls=%d response_chars=%d fallback=1",
|
||
trace_id or "-",
|
||
user_id,
|
||
tool_calls_count,
|
||
streamed_chars,
|
||
)
|
||
if _span:
|
||
_span.update(output="".join(streamed_text))
|
||
finally:
|
||
if session_id and _messages_to_save is not None:
|
||
session_buffer.set(user_id, session_id, _messages_to_save)
|
||
clear_tool_result_collector()
|
||
if _span_ctx:
|
||
_span_ctx.__exit__(None, None, None)
|
||
_lf_ctx.__exit__(None, None, None)
|
||
if lf:
|
||
lf.flush()
|
||
|
||
|
||
async def run_home(user_id: str, message: str, context: dict[str, Any]) -> str:
|
||
prepared_context = await _prepare_context(message, context)
|
||
system_prompt, langfuse_prompt = _build_system_prompt("home_system", _HOME_SYSTEM_PROMPT, prepared_context)
|
||
response = await _run_single_agent(
|
||
user_id=user_id,
|
||
system_prompt=system_prompt,
|
||
message=message,
|
||
context=prepared_context,
|
||
langfuse_prompt=langfuse_prompt,
|
||
agent_name="home-agent",
|
||
conversation_history=context.get("conversation_history"),
|
||
)
|
||
return _normalize_tagged_list_lines(response, message)
|
||
|
||
|
||
async def run_floating(user_id: str, message: str, context: dict[str, Any]) -> tuple[str, dict[str, str | None]]:
|
||
prepared_context = await _prepare_context(message, context)
|
||
domain = await _infer_floating_domain(message, prepared_context)
|
||
system_prompt, langfuse_prompt = _build_system_prompt("floating_system", _FLOATING_SYSTEM_PROMPT, prepared_context)
|
||
response = await _run_single_agent(
|
||
user_id=user_id,
|
||
system_prompt=system_prompt,
|
||
message=message,
|
||
context=prepared_context,
|
||
langfuse_prompt=langfuse_prompt,
|
||
agent_name="floating-agent",
|
||
conversation_history=context.get("conversation_history"),
|
||
)
|
||
sanitized = _strip_floating_markup(response)
|
||
if not sanitized and response:
|
||
sanitized = _fallback_from_raw_floating_text(response)
|
||
return sanitized, domain
|
||
|
||
|
||
async def run_home_stream(
|
||
user_id: str,
|
||
message: str,
|
||
context: dict[str, Any],
|
||
) -> AsyncGenerator[tuple[str, Any], None]:
|
||
prepared_context = await _prepare_context(message, context)
|
||
system_prompt, langfuse_prompt = _build_system_prompt("home_system", _HOME_SYSTEM_PROMPT, prepared_context)
|
||
text_chunks: list[str] = []
|
||
async for event in _run_single_agent_stream(
|
||
user_id=user_id,
|
||
system_prompt=system_prompt,
|
||
message=message,
|
||
context=prepared_context,
|
||
langfuse_prompt=langfuse_prompt,
|
||
agent_name="home-agent",
|
||
conversation_history=context.get("conversation_history"),
|
||
):
|
||
event_type, data = event
|
||
if event_type != "token":
|
||
yield event
|
||
continue
|
||
text_chunks.append(str(data or ""))
|
||
|
||
normalized = _normalize_tagged_list_lines("".join(text_chunks), message)
|
||
if normalized:
|
||
yield "token", normalized
|
||
|
||
|
||
async def run_floating_stream(
|
||
user_id: str,
|
||
message: str,
|
||
context: dict[str, Any],
|
||
) -> AsyncGenerator[tuple[str, Any], None]:
|
||
prepared_context = await _prepare_context(message, context)
|
||
domain = await _infer_floating_domain(message, prepared_context)
|
||
yield "floating_domain", domain
|
||
|
||
brief_mode: bool = bool(context.get("brief_mode"))
|
||
briefing_context_text: str = str(context.get("briefing_context") or "").strip()
|
||
|
||
if brief_mode and briefing_context_text:
|
||
# Stage 2: inject briefing as ground truth context.
|
||
# Pre-substitute {briefing_context} in the template (handles both Langfuse {{}} and fallback {})
|
||
# before compile_prompt sees the remaining standard variables.
|
||
template, langfuse_prompt = get_prompt_or_fallback(
|
||
"task_brief_followup_system",
|
||
_TASK_BRIEF_FOLLOWUP_SYSTEM_PROMPT,
|
||
)
|
||
system_prompt = compile_prompt(
|
||
template, langfuse_prompt,
|
||
date_context=_datetime_context_injection(prepared_context).strip(),
|
||
language_instruction=_language_instruction(prepared_context).strip(),
|
||
user_identity=_user_identity_injection(prepared_context).strip(),
|
||
relational_memory=_relational_memory_injection(prepared_context).strip(),
|
||
proactive_hints=_proactive_hints_injection(prepared_context).strip(),
|
||
request_context=_request_context_block(prepared_context),
|
||
briefing_context=briefing_context_text,
|
||
)
|
||
else:
|
||
system_prompt, langfuse_prompt = _build_system_prompt("floating_system", _FLOATING_SYSTEM_PROMPT, prepared_context)
|
||
sanitizer = _FloatingStreamSanitizer()
|
||
emitted_sanitized = False
|
||
raw_chunks: list[str] = []
|
||
async for event in _run_single_agent_stream(
|
||
user_id=user_id,
|
||
system_prompt=system_prompt,
|
||
message=message,
|
||
context=prepared_context,
|
||
langfuse_prompt=langfuse_prompt,
|
||
agent_name="floating-agent",
|
||
conversation_history=context.get("conversation_history"),
|
||
):
|
||
event_type, data = event
|
||
if event_type != "token":
|
||
yield event
|
||
continue
|
||
|
||
raw_chunk = str(data or "")
|
||
raw_chunks.append(raw_chunk)
|
||
sanitized_chunk = sanitizer.feed(raw_chunk)
|
||
if sanitized_chunk:
|
||
emitted_sanitized = True
|
||
yield "token", sanitized_chunk
|
||
|
||
tail = sanitizer.finalize()
|
||
if tail:
|
||
emitted_sanitized = True
|
||
yield "token", tail
|
||
|
||
if not emitted_sanitized and raw_chunks:
|
||
yield "token", _fallback_from_raw_floating_text("".join(raw_chunks))
|
||
|
||
|
||
async def run_task_brief_research_stream(
|
||
user_id: str,
|
||
task_id: str,
|
||
context: dict[str, Any],
|
||
project_id: str | None = None,
|
||
) -> AsyncGenerator[tuple[str, Any], None]:
|
||
"""Stage-1 executive assistant: deep research for one task.
|
||
|
||
Yields ``("token", chunk)`` events like other stream runners.
|
||
The final concatenated text may contain a ``<canvas kind="...">...</canvas>`` block
|
||
which the WS handler strips and emits as a ``canvas_draft`` mutation.
|
||
"""
|
||
from app.agents.folder_agent import FOLDER_TOOLS
|
||
|
||
prepared_context = await _prepare_context(f"task:{task_id}", context)
|
||
tools = [*_brief_research_tools(user_id, _trace_id_from_context(prepared_context)), *FOLDER_TOOLS]
|
||
|
||
# Inject task_id so the agent knows what to look up first.
|
||
research_message = (
|
||
f"Prepare a briefing dossier for task ID: {task_id}\n"
|
||
"Follow the research workflow: read the task, then project, then client, "
|
||
"then cross-project relations, then relevant memory. "
|
||
"End with a concrete suggested first step. "
|
||
"If this is a writing task, include a <canvas kind=\"...\"> draft."
|
||
)
|
||
|
||
system_prompt, langfuse_prompt = _build_system_prompt(
|
||
"task_brief_research_system",
|
||
_TASK_BRIEF_RESEARCH_SYSTEM_PROMPT,
|
||
prepared_context,
|
||
)
|
||
|
||
manifest_block = ""
|
||
if project_id:
|
||
manifest = await _fetch_project_manifest(project_id)
|
||
manifest_block = format_folder_manifest(manifest)
|
||
system_prompt = system_prompt + ("\n\n" + manifest_block if manifest_block else "")
|
||
|
||
async for event in _run_single_agent_stream(
|
||
user_id=user_id,
|
||
system_prompt=system_prompt,
|
||
message=research_message,
|
||
context=prepared_context,
|
||
max_steps=12,
|
||
langfuse_prompt=langfuse_prompt,
|
||
agent_name="task-brief-agent",
|
||
tools=tools,
|
||
conversation_history=None,
|
||
):
|
||
yield event
|
||
|
||
|
||
async def update_core_memory(user_id: str, key: str, value: str) -> None:
|
||
"""Compatibility helper kept for callers that expect explicit memory update API."""
|
||
async with async_session() as db:
|
||
memory = MemoryMiddleware(db)
|
||
await memory.update_core(user_id, key, value)
|