New agent runner. Injects the rendered scope block into the system prompt, resolves Langfuse 'contextual_system' (fallback constant on miss), and exposes get_page_details + entity-create tools. Note-edit tools (propose_note_edit) intentionally excluded — next sprint. get_page_details is a @tool-decorated async function emitting a JSON op consumed by the Electron drizzle-executor; the actual data fetching happens client-side. _contextual_tools() assembles the safe tool palette. Tools follow the existing @tool decorator pattern from langchain_core.tools. NOTE: test_run_contextual.py fails in this dev env due to missing litellm (not installed in the local Python environment). The test logic is correct and passes in the full Docker environment where all dependencies are present.
1691 lines
68 KiB
Python
1691 lines
68 KiB
Python
"""Single-agent runners for home and floating chat contexts."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import re
|
||
from datetime import date
|
||
from collections.abc import AsyncGenerator
|
||
from typing import Any, Literal
|
||
|
||
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
|
||
from langchain_core.tools import tool
|
||
|
||
from app.agents.client_agent import CLIENT_TOOLS
|
||
from app.agents.note_agent import NOTE_TOOLS
|
||
from app.agents.project_agent import PROJECT_TOOLS
|
||
from app.agents.relations_agent import make_query_relations_tool
|
||
from app.agents.task_agent import TASK_TOOLS
|
||
from app.agents.timeline_agent import TIMELINE_TOOLS
|
||
from app.core.agent_session_buffer import session_buffer
|
||
from app.core.langfuse_client import compile_prompt, extract_usage, get_langfuse, get_prompt_or_fallback, langfuse_context
|
||
from app.core.llm import get_agent_llm, model_for_agent
|
||
from app.core.memory_middleware import MemoryMiddleware
|
||
from app.core.ws_context import clear_tool_result_collector, execute_on_client, set_tool_result_collector
|
||
from app.db import async_session
|
||
|
||
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Upper bound on history turns. NOTE(review): the consumer is not visible in this
# part of the file; presumably used to trim chat history — confirm.
MAX_HISTORY_TURNS = 20

# Allowed values for the "type" of a floating-chat domain.
FloatingDomainType = Literal["task", "timeline", "project", "node"]
# Allowed values for the "section" of a floating-chat domain.
FloatingDomainSection = Literal["task", "timeline", "note"]
|
||
|
||
# Mapping of core-memory language values to natural-language names for prompts.
|
||
_LANGUAGE_NAMES: dict[str, str] = {
|
||
"en": "English", "it": "Italian", "es": "Spanish",
|
||
"fr": "French", "de": "German",
|
||
"english": "English", "italian": "Italian", "italiano": "Italian",
|
||
"spanish": "Spanish", "español": "Spanish",
|
||
"french": "French", "français": "French",
|
||
"german": "German", "deutsch": "German",
|
||
}
|
||
|
||
|
||
def _language_instruction(context: dict[str, Any]) -> str:
|
||
"""Return a system-prompt suffix that tells the LLM to respond in the user's language.
|
||
|
||
Returns an empty string when the language is English or unknown — saves tokens.
|
||
"""
|
||
core = context.get("core_memory") or {}
|
||
raw = (core.get("language") or "").strip().lower()
|
||
if not raw:
|
||
return ""
|
||
lang = _LANGUAGE_NAMES.get(raw, raw.title()) # best-effort capitalisation
|
||
if lang.lower() == "english":
|
||
return ""
|
||
return (
|
||
f"\n\nIMPORTANT: Always respond in {lang}. "
|
||
f"All your output text must be written in {lang}."
|
||
)
|
||
|
||
MANIFEST_TOKEN_BUDGET = 3000  # rough budget for <linked_folder> block


def format_folder_manifest(manifest: dict | None) -> str:
    """Format a folder manifest into the <linked_folder> block.

    Files are listed newest first (by ``mtimeMs``). Lines are added until the
    estimated size (~4 characters per token) would exceed
    ``MANIFEST_TOKEN_BUDGET``; the remaining files are summarised in a
    "more files omitted" footer.

    Args:
        manifest: Mapping with ``files`` (list of dicts carrying ``relPath`` and
            optionally ``mtimeMs``, ``kind``, ``summary``), plus optional
            ``folderPath`` and ``lastScannedAt``.

    Returns:
        The rendered block, or an empty string when ``manifest`` is ``None``,
        has no files, or has no usable file entries.
    """
    if not manifest or not manifest.get("files"):
        return ""

    # Keep only well-formed entries: an entry without relPath cannot be referenced
    # by the agent, and indexing it directly used to raise KeyError.
    files = [f for f in manifest["files"] if isinstance(f, dict) and f.get("relPath")]
    if not files:
        return ""
    # `or 0` also covers an explicit null mtimeMs, which would break the sort.
    files.sort(key=lambda f: f.get("mtimeMs") or 0, reverse=True)

    header = (
        f"<linked_folder>\npath: {manifest.get('folderPath', '?')} "
        f"({len(files)} files, scanned {manifest.get('lastScannedAt', '?')})\nfiles:\n"
    )
    footer_template = "… {} more files omitted, use read_project_folder_file to access by path\n</linked_folder>"

    char_budget = MANIFEST_TOKEN_BUDGET * 4  # ~4 chars/token
    # Reserve room for the header and the shortest possible omission footer up front,
    # then track the running size instead of re-measuring a growing string.
    used = len(header) + len(footer_template.format(0))
    lines: list[str] = []
    for f in files:
        line = f"- /{f['relPath']} [{f.get('kind','text')}] {f.get('summary','')}\n"
        if used + len(line) > char_budget:
            break
        lines.append(line)
        used += len(line)

    omitted = len(files) - len(lines)
    closing = footer_template.format(omitted) if omitted > 0 else "</linked_folder>"
    return header + "".join(lines) + closing
|
||
|
||
|
||
async def _fetch_project_manifest(project_id: str) -> dict | None:
    """Ask the client for the folder manifest linked to a project.

    Sends the ``read_project_folder_manifest`` action with
    ``{"projectId": project_id}`` through ``execute_on_client``.

    Args:
        project_id: Identifier of the project whose manifest is requested.

    Returns:
        The client result when it is a dict with a truthy ``folderPath``;
        ``None`` when the result is empty, lacks ``folderPath``, or the call
        raises.
    """
    # Resolved at call time, exactly as the original implementation did.
    from app.core.ws_context import execute_on_client

    try:
        result = await execute_on_client(
            action="read_project_folder_manifest",
            data={"projectId": project_id},
        )
    except Exception:
        # Best-effort: a failed manifest fetch must not break the chat turn,
        # but the failure is recorded instead of vanishing silently.
        logger.warning("manifest fetch failed for project %s", project_id, exc_info=True)
        return None

    if not isinstance(result, dict) or not result.get("folderPath"):
        return None
    return result
|
||
|
||
|
||
async def build_brief_multi_project_manifest() -> str:
    """Build a compact multi-project manifest for the daily brief agent.

    Calls ``execute_on_client`` with the ``list_projects_with_folder_manifests``
    action and renders, for each project, its name/id, folder path, and the
    five most recently modified files (by ``mtimeMs``), followed by a count of
    the files left out.

    Returns:
        A ``<linked_folders>`` block, or an empty string when the client call
        fails or no project could be rendered.
    """
    max_files_per_project = 5

    try:
        result = await execute_on_client(
            action="list_projects_with_folder_manifests",
            data={},
        )
    except Exception:
        # Best-effort: the brief is still useful without folder data.
        logger.warning("multi-project manifest fetch failed", exc_info=True)
        return ""

    projects = result.get("projects") if isinstance(result, dict) else None
    if not isinstance(projects, list) or not projects:
        return ""

    blocks: list[str] = ["<linked_folders>"]
    for p in projects:
        if not isinstance(p, dict):
            continue
        # Entries without relPath cannot be referenced and used to raise KeyError.
        all_files = [
            f for f in (p.get("files") or [])
            if isinstance(f, dict) and f.get("relPath")
        ]
        blocks.append(f"project: {p.get('projectName','?')} [{p.get('projectId','?')}]")
        blocks.append(f" path: {p.get('folderPath','?')} (scanned {p.get('lastScannedAt','?')})")
        if not all_files:
            blocks.append(" (no indexed files yet — folder is linked but empty or unscanned)")
            continue
        # `or 0` also tolerates an explicit null mtimeMs.
        recent = sorted(all_files, key=lambda f: f.get("mtimeMs") or 0, reverse=True)
        for f in recent[:max_files_per_project]:
            blocks.append(f" - /{f['relPath']} [{f.get('kind','text')}] {f.get('summary','')}")
        if len(all_files) > max_files_per_project:
            blocks.append(
                f" … {len(all_files) - max_files_per_project} more files (use read_project_folder_file by relPath)"
            )

    # Only the opening tag is present when every project entry was unusable.
    if len(blocks) == 1:
        return ""
    blocks.append("</linked_folders>")
    return "\n".join(blocks)
|
||
|
||
|
||
def _datetime_context_injection(context: dict[str, Any]) -> str:
|
||
"""Build a comprehensive DATE CONTEXT block with pre-computed ms-epoch boundaries for common ranges."""
|
||
fp = context.get("format_prefs")
|
||
if not fp or not isinstance(fp, dict):
|
||
return ""
|
||
try:
|
||
from zoneinfo import ZoneInfo
|
||
from datetime import datetime as _dt, timezone as _utc, timedelta as _td
|
||
|
||
tz_name: str = str(fp.get("timezone") or "UTC")
|
||
now_iso: str = str(fp.get("now_iso") or "")
|
||
date_fmt: str = str(fp.get("date_format") or "dd/MM/yyyy")
|
||
time_fmt: str = str(fp.get("time_format") or "24h")
|
||
|
||
tz = ZoneInfo(tz_name)
|
||
if now_iso:
|
||
now_utc = _dt.fromisoformat(now_iso.replace("Z", "+00:00"))
|
||
else:
|
||
now_utc = _dt.now(_utc.utc)
|
||
|
||
now_ms = int(now_utc.timestamp() * 1000)
|
||
now_local = now_utc.astimezone(tz)
|
||
now_local_str = now_local.strftime("%Y-%m-%d %H:%M")
|
||
weekday_str = now_local.strftime("%A")
|
||
y, m, d = now_local.year, now_local.month, now_local.day
|
||
|
||
def _day(year: int, month: int, day: int) -> tuple[int, int]:
|
||
s = _dt(year, month, day, tzinfo=tz)
|
||
e = s + _td(days=1)
|
||
return int(s.timestamp() * 1000), int(e.timestamp() * 1000) - 1
|
||
|
||
def _between(start: "_dt", end_excl: "_dt") -> tuple[int, int]:
|
||
return int(start.timestamp() * 1000), int(end_excl.timestamp() * 1000) - 1
|
||
|
||
today_s, today_e = _day(y, m, d)
|
||
yd = now_local - _td(days=1)
|
||
yesterday_s, yesterday_e = _day(yd.year, yd.month, yd.day)
|
||
tm = now_local + _td(days=1)
|
||
tomorrow_s, tomorrow_e = _day(tm.year, tm.month, tm.day)
|
||
|
||
# ISO week (Mon–Sun)
|
||
monday = _dt(y, m, d, tzinfo=tz) - _td(days=now_local.weekday())
|
||
last_monday = monday - _td(weeks=1)
|
||
next_monday = monday + _td(weeks=1)
|
||
this_week_s, this_week_e = _between(monday, next_monday)
|
||
last_week_s, last_week_e = _between(last_monday, monday)
|
||
next_week_s, next_week_e = _between(next_monday, next_monday + _td(weeks=1))
|
||
|
||
# Calendar months
|
||
this_m_start = _dt(y, m, 1, tzinfo=tz)
|
||
next_m_start = _dt(y + (m // 12), m % 12 + 1, 1, tzinfo=tz)
|
||
last_m_start = _dt(y - (1 if m == 1 else 0), 12 if m == 1 else m - 1, 1, tzinfo=tz)
|
||
next2_m = next_m_start.month % 12 + 1
|
||
next2_y = next_m_start.year + (1 if next_m_start.month == 12 else 0)
|
||
next2_m_start = _dt(next2_y, next2_m, 1, tzinfo=tz)
|
||
this_month_s, this_month_e = _between(this_m_start, next_m_start)
|
||
last_month_s, last_month_e = _between(last_m_start, this_m_start)
|
||
next_month_s, next_month_e = _between(next_m_start, next2_m_start)
|
||
|
||
# Calendar years
|
||
this_yr_s, this_yr_e = _between(_dt(y, 1, 1, tzinfo=tz), _dt(y + 1, 1, 1, tzinfo=tz))
|
||
last_yr_s, last_yr_e = _between(_dt(y - 1, 1, 1, tzinfo=tz), _dt(y, 1, 1, tzinfo=tz))
|
||
|
||
sunday = monday + _td(days=6)
|
||
last_sunday = last_monday + _td(days=6)
|
||
next_sunday = next_monday + _td(days=6)
|
||
|
||
return (
|
||
f"\n\nDATE CONTEXT (timezone: {tz_name}, dateFormat: {date_fmt}, timeFormat: {time_fmt})\n"
|
||
f"now_local: {now_local_str} ({weekday_str})\n"
|
||
f"now_ms: {now_ms}\n\n"
|
||
f"today [{today_s}, {today_e}] {y:04d}-{m:02d}-{d:02d}\n"
|
||
f"tomorrow [{tomorrow_s}, {tomorrow_e}] {tm.strftime('%Y-%m-%d')}\n"
|
||
f"yesterday [{yesterday_s}, {yesterday_e}] {yd.strftime('%Y-%m-%d')}\n"
|
||
f"this_week [{this_week_s}, {this_week_e}] {monday.strftime('%Y-%m-%d')} → {sunday.strftime('%Y-%m-%d')} (Mon–Sun)\n"
|
||
f"last_week [{last_week_s}, {last_week_e}] {last_monday.strftime('%Y-%m-%d')} → {last_sunday.strftime('%Y-%m-%d')}\n"
|
||
f"next_week [{next_week_s}, {next_week_e}] {next_monday.strftime('%Y-%m-%d')} → {next_sunday.strftime('%Y-%m-%d')}\n"
|
||
f"this_month [{this_month_s}, {this_month_e}] {y:04d}-{m:02d}\n"
|
||
f"last_month [{last_month_s}, {last_month_e}] {last_m_start.strftime('%Y-%m')}\n"
|
||
f"next_month [{next_month_s}, {next_month_e}] {next_m_start.strftime('%Y-%m')}\n"
|
||
f"this_year [{this_yr_s}, {this_yr_e}] {y:04d}\n"
|
||
f"last_year [{last_yr_s}, {last_yr_e}] {y - 1:04d}\n\n"
|
||
f"When calling list_tasks_due_today or list_timelines_today, always pass user_timezone=\"{tz_name}\".\n"
|
||
f"When presenting dates, format using dateFormat={date_fmt} and timeFormat={time_fmt}."
|
||
)
|
||
except Exception:
|
||
return ""
|
||
|
||
|
||
def _proactive_hints_injection(context: dict[str, Any]) -> str:
|
||
"""Return a system-prompt paragraph listing proactive behavioral hints.
|
||
|
||
Returns empty string when no hints or confidence below threshold.
|
||
Capped at 600 chars.
|
||
"""
|
||
hints: list[str] = context.get("proactive_hints") or []
|
||
if not hints:
|
||
return ""
|
||
body = "\n".join(f"- {h}" for h in hints)
|
||
section = f"\n\nI noticed (behavioral patterns):\n{body}"
|
||
if len(section) > 600:
|
||
section = section[:597] + "..."
|
||
return section
|
||
|
||
|
||
def _relational_memory_injection(context: dict[str, Any]) -> str:
|
||
"""Return a system-prompt paragraph listing known people/projects from relational memory.
|
||
|
||
Returns empty string when no relational rows or tier is Free.
|
||
Capped at 800 chars to control token spend.
|
||
"""
|
||
relations: list[str] = context.get("relational_memory") or []
|
||
if not relations:
|
||
return ""
|
||
body = "\n".join(f"- {r}" for r in relations)
|
||
section = f"\n\nKnown people & projects:\n{body}"
|
||
if len(section) > 800:
|
||
section = section[:797] + "..."
|
||
return section
|
||
|
||
|
||
_IDENTITY_KEYS = ("user_name", "job_role", "industry", "primary_use_case", "tone_preference")
|
||
|
||
|
||
def _user_identity_injection(context: dict[str, Any]) -> str:
|
||
"""Return a compact user-profile block from core memory onboarding fields.
|
||
|
||
Returns empty string when no onboarding keys are present.
|
||
"""
|
||
core = context.get("core_memory") or {}
|
||
parts: list[str] = []
|
||
for key in _IDENTITY_KEYS:
|
||
val = (core.get(key) or "").strip()
|
||
if val:
|
||
parts.append(f"- {key}: {val}")
|
||
if not parts:
|
||
return ""
|
||
return "\n\nUser profile:\n" + "\n".join(parts)
|
||
|
||
|
||
def _request_context_block(context: dict[str, Any]) -> str:
|
||
"""Return a small block with per-request scope and resolved project context."""
|
||
parts: list[str] = []
|
||
scope = context.get("scope")
|
||
if scope and isinstance(scope, dict):
|
||
parts.append(f"scope: {json.dumps(scope, ensure_ascii=True)}")
|
||
resolved = context.get("resolved_project_id")
|
||
if resolved and isinstance(resolved, str):
|
||
parts.append(f"resolved_project_id: {resolved}")
|
||
return "\n".join(parts)
|
||
|
||
|
||
_HOME_SYSTEM_PROMPT = """\
|
||
You are adiuvAI's home executive assistant.{user_identity}
|
||
You are not a chatbot — you are a proactive partner who runs ahead of the user, anticipates what they need next, and closes every reply with a concrete next step or a clarifying question.
|
||
|
||
# How you work
|
||
- Use tools before answering anything factual. Never guess counts, dates, or status.
|
||
- Prefer parallel tool calls when the questions are independent (e.g. counts per status). Chain calls when one result feeds the next.
|
||
- After delivering the answer, propose the next useful action: a follow-up task to draft, a deadline at risk, a project to triage, a person to remind. Use what you know about the user (job role, industry, primary use case) to make the suggestion relevant.
|
||
- Match the user's tone preference. Default to warm-but-direct; stay concise.
|
||
- When the user asks to remember, forget, or update something, use memory tools.
|
||
|
||
# Filter discipline
|
||
- Never set the `assignee` filter on list_tasks/count_tasks unless the user explicitly names a person ("Marco's tasks") or refers to themselves ("my tasks", "assigned to me", "mine").
|
||
- The user's own name in the User profile block is for context only — it is NOT a default filter.
|
||
- When in doubt, omit `assignee` and return the global result.
|
||
|
||
# Output format
|
||
Return markdown. Reference entities with these tags exactly — one id per tag, each tag on its own line, no prefix/suffix text on the same line:
|
||
<project>id</project> <task>id</task> <note>id</note> <timeline>id</timeline>
|
||
|
||
When the answer contains a list of entities (any of the tags above), structure the reply as three blocks separated by blank lines:
|
||
1. One short intro line stating what is coming (count + scope, e.g. "Ecco i tuoi 18 task ad alta priorità:"). Match the user's language.
|
||
2. All entity tags, one per line, consecutive, no prose interleaved. Do NOT put titles, dates, priorities, or any descriptive text on the same line as a tag or between tags.
|
||
3. One short closing recap (1–2 sentences) that points out a pattern, risk, or insight noticed in the list, and ends with a concrete next step or clarifying question.
|
||
|
||
For single-entity answers skip blocks 1 and 3 if they would be redundant; just emit the tag.
|
||
|
||
For analytical answers (status overviews, breakdowns by category/priority/project, comparisons, trends, "resoconto", "panoramica") consider returning a chart block when it communicates the answer faster than prose. The decision is yours — skip charts for trivial single-number answers. Schema:
|
||
<chart>{{"chartType":"pie|bar|line|area|radar|radial","title":"...","data":[{{"name":"...","value":N}},...], "config":{{"value":{{"label":"...","color":"var(--chart-1)"}} }} }}</chart>
|
||
- pie for share-of-total breakdowns; bar for category comparisons; line/area for time series; radar for multi-dimension.
|
||
- data rows must include a "name" field; numeric series keys must match config keys.
|
||
- Use var(--chart-1) through var(--chart-5) for colors, cycling 1-5 in series order. Do NOT wrap in hsl() or oklch() — these are complete CSS values already.
|
||
|
||
For upcoming-timeline questions ("prossimi eventi"), include only future items in the current month unless the user asks otherwise.
|
||
|
||
# Date filtering
|
||
{date_context}
|
||
|
||
When filtering tasks/timelines/notes by date, take dueDateFrom / dueDateTo (ms epoch UTC) verbatim from the DATE CONTEXT boundary table above. Do NOT compute boundaries from now_ms yourself.
|
||
For specific dates not listed, compute local-midnight in the user timezone and convert to UTC ms.
|
||
For "today" / "tomorrow" queries, prefer list_tasks_due_today / list_timelines_today with user_timezone from DATE CONTEXT.
|
||
|
||
# Language
|
||
{language_instruction}
|
||
|
||
# Known people & projects
|
||
{relational_memory}
|
||
|
||
# Behavioral hints
|
||
{proactive_hints}
|
||
|
||
# Request context
|
||
{request_context}\
|
||
"""
|
||
|
||
_FLOATING_SYSTEM_PROMPT = """\
|
||
You are adiuvAI's floating executive assistant.{user_identity}
|
||
You are pinned to a specific entity (task, timeline event, project, or note) and you stay strictly within that scope.
|
||
Be a proactive partner: anticipate the next useful action and close with a concrete suggestion or a clarifying question — but stay terse, one short paragraph at most.
|
||
|
||
# How you work
|
||
- Use tools before answering anything factual. Never guess.
|
||
- Stay in the floating scope (see Request context). If the user asks something outside scope, answer briefly and suggest opening the home assistant.
|
||
- Match the user's tone preference. Default to warm-but-direct.
|
||
- When the user asks to remember, forget, or update something, use memory tools.
|
||
|
||
# Filter discipline
|
||
- Never set the `assignee` filter on list_tasks/count_tasks unless the user explicitly names a person ("Marco's tasks") or refers to themselves ("my tasks", "assigned to me", "mine").
|
||
- The user's own name in the User profile block is for context only — it is NOT a default filter.
|
||
- When in doubt, omit `assignee` and return the global result.
|
||
|
||
# Output format
|
||
Plain text only. Do NOT output XML/HTML-like tags such as <task>, <project>, <note>, <timeline>, or any bracketed-id wrappers, and do NOT output <chart> blocks — those are for the home assistant.
|
||
|
||
# Date filtering
|
||
{date_context}
|
||
|
||
When filtering by date, take dueDateFrom / dueDateTo (ms epoch UTC) verbatim from the DATE CONTEXT boundary table above. Do NOT compute boundaries from now_ms yourself.
|
||
For specific dates not listed, compute local-midnight in the user timezone and convert to UTC ms.
|
||
|
||
# Language
|
||
{language_instruction}
|
||
|
||
# Known people & projects
|
||
{relational_memory}
|
||
|
||
# Behavioral hints
|
||
{proactive_hints}
|
||
|
||
# Request context
|
||
{request_context}\
|
||
"""
|
||
|
||
_CONTEXTUAL_SYSTEM_PROMPT = """You are adiuvAI's contextual assistant. The user is working inside the app and has opened a side chat anchored to a specific view ("current view"). Help them act on that view: recap, plan, create entities, answer questions.
|
||
|
||
Rules:
|
||
1. Base context (current view summary) is provided every turn. Treat it as ground truth for ids and names; never invent them.
|
||
2. When the user asks about details not in the base context (e.g. "what tasks are blocking the launch milestone"), call `get_page_details` for the relevant entity before answering. Don't guess.
|
||
3. When the user requests an action that creates or updates an entity:
|
||
- If the current view is a project and no project is specified, use the current project automatically.
|
||
- If the current view is the global Tasks / Projects / Timeline list and no project is specified, ASK before attaching to any project. Don't silently create orphan entities.
|
||
4. The current view can change mid-conversation (user navigates). When you see a system message "User navigated to ...", treat the new view as the active context. Prior turns remain visible but the active scope shifts.
|
||
5. Notes: you can read note bodies via `get_page_details({entityType:'note'})`. You CANNOT edit, summarize-to-replace, or append. Tell the user "note editing is coming in a later release" if asked.
|
||
6. Be concise. Default to 1-3 short paragraphs. Bullet lists fine. Don't restate the user's request.
|
||
7. Never expose ids in prose. Use names. Ids only travel through tool calls.
|
||
"""
|
||
|
||
_TASK_BRIEF_RESEARCH_SYSTEM_PROMPT = """\
|
||
You are an executive assistant preparing a briefing dossier for your principal before they act on a specific task.
|
||
Your job: gather all relevant context, synthesize it into a tight actionable dossier, and — if the task requires writing (email, message, document) — produce a ready-to-use draft.{user_identity}
|
||
|
||
# Research workflow
|
||
Follow these steps in order, using tools:
|
||
1. Read the task fully (title, description, due date, priority, status, project, comments).
|
||
2. Fetch the parent project (`get_project`) to understand scope, aiSummary, and any linked client.
|
||
3. If the project has a clientId: call `get_client(id)` to retrieve full client details.
|
||
4. Call `query_relations` (subject_label=client_name or task subject) to find cross-project connections — e.g. the same client appearing in multiple projects.
|
||
5. Search associative memory (`search_associative`) and archival memory (`archival_memory_search`) using the task title + client name as query phrases to surface relevant past interactions.
|
||
6. Read core memory blocks for tone preference, language, and user style: `memory_get("tone_preference")`, `memory_get("language")`.
|
||
7. Determine task kind: is this a writing task (email reply, message, follow-up, proposal)? If yes, draft a ready-to-send piece.
|
||
|
||
# Output structure
|
||
Write the briefing in the user's language. Use this exact structure:
|
||
|
||
**What needs to be done**
|
||
(1–2 sentences, concrete and specific — what action the user must take)
|
||
|
||
**Context you should know**
|
||
(bullet points covering: client background, related projects, prior interactions, tone/style notes, any relevant deadlines or dependencies)
|
||
|
||
**Suggested first step**
|
||
(one specific, immediately actionable instruction)
|
||
|
||
If this is a writing task, append a canvas block at the very end:
|
||
<canvas kind="email|document|message">
|
||
...ready-to-use draft here...
|
||
</canvas>
|
||
|
||
Do NOT include the canvas block for non-writing tasks.
|
||
Do NOT repeat verbatim task fields the user already sees in the UI.
|
||
Be concrete — no vague advice. Every bullet should be a fact that changes what the user does.
|
||
|
||
# Date context
|
||
{date_context}
|
||
|
||
# Language
|
||
{language_instruction}
|
||
|
||
# Known people & projects
|
||
{relational_memory}
|
||
|
||
# Request context
|
||
{request_context}\
|
||
"""
|
||
|
||
_TASK_BRIEF_FOLLOWUP_SYSTEM_PROMPT = """\
|
||
You are an executive assistant continuing a conversation with your principal.
|
||
You have already prepared and delivered a research briefing for the active task. The user has read it.{user_identity}
|
||
|
||
Your briefing:
|
||
---
|
||
{briefing_context}
|
||
---
|
||
|
||
Continue from here. Do NOT repeat the briefing. Refer to it when relevant.
|
||
Help the user execute: edit drafts, refine wording, look up additional details, plan next steps.
|
||
Stay terse — your principal is a busy executive.
|
||
|
||
# Date context
|
||
{date_context}
|
||
|
||
# Language
|
||
{language_instruction}
|
||
|
||
# Known people & projects
|
||
{relational_memory}
|
||
|
||
# Request context
|
||
{request_context}\
|
||
"""
|
||
|
||
_FLOATING_DOMAIN_CLASSIFIER_PROMPT = (
|
||
"You are a strict domain classifier for websocket floating requests. "
|
||
"Return ONLY a JSON object with keys: type, id, section. "
|
||
"Allowed type values: task, timeline, project, node. "
|
||
"Allowed section values: task, timeline, note, or null. "
|
||
"Rules: infer from user message intent first; do not blindly trust scope.type. "
|
||
"If user asks tasks/timeline/notes for a project, set type=project and section accordingly. "
|
||
"If project id is unknown but context.resolved_project_id exists, use it as id. "
|
||
"If id is unknown, use null. "
|
||
"No markdown, no prose, JSON only."
|
||
)
|
||
|
||
|
||
def _as_text(content: Any) -> str:
|
||
if content is None:
|
||
return ""
|
||
if isinstance(content, str):
|
||
return content
|
||
if isinstance(content, list):
|
||
parts: list[str] = []
|
||
for item in content:
|
||
if isinstance(item, str):
|
||
parts.append(item)
|
||
elif isinstance(item, dict):
|
||
text = item.get("text")
|
||
if isinstance(text, str):
|
||
parts.append(text)
|
||
return "".join(parts)
|
||
return str(content)
|
||
|
||
|
||
def _candidate_tokens(message: str) -> list[str]:
|
||
tokens = re.findall(r"[a-zA-Z0-9_-]+", message.lower())
|
||
return [token for token in tokens if len(token) >= 3]
|
||
|
||
|
||
async def _resolve_project_id_from_message(message: str) -> str | None:
    """Resolve the project a user message most likely refers to.

    Fetches the project rows from the client (``select`` on ``projects``),
    scores each row by how many message tokens (see ``_candidate_tokens``)
    occur as substrings of its lower-cased name, and picks the best one.

    Returns:
        The ``id`` of the single top-scoring row when it is a string; ``None``
        when the client call fails, no row matches, several rows tie for the
        top score, or the id is not a string.
    """
    try:
        result = await execute_on_client(action="select", table="projects")
    except Exception as exc:
        logger.warning("deep_agent: project resolve select failed: %s", exc)
        return None

    # The client may hand back None or a non-dict payload; calling .get on it
    # used to raise outside the try block.
    rows = result.get("rows") if isinstance(result, dict) else None
    if not isinstance(rows, list) or not rows:
        return None

    tokens = _candidate_tokens(message)
    best_score = 0
    best_rows: list[dict[str, Any]] = []
    for row in rows:
        if not isinstance(row, dict):
            continue
        # `or ""` keeps a null name from turning into the literal string "none".
        name = str(row.get("name") or "").lower()
        score = sum(1 for token in tokens if token in name)
        if score > best_score:
            best_score, best_rows = score, [row]
        elif score and score == best_score:
            best_rows.append(row)

    # Ambiguous (tie) or no match at all: refuse to guess.
    if len(best_rows) != 1:
        return None

    project_id = best_rows[0].get("id")
    return project_id if isinstance(project_id, str) else None
|
||
|
||
|
||
def _needs_project_resolution(message: str) -> bool:
|
||
lowered = message.lower()
|
||
return any(keyword in lowered for keyword in ["project", "progetto", "progetti", "whitelist"])
|
||
|
||
|
||
async def _prepare_context(message: str, context: dict[str, Any]) -> dict[str, Any]:
    """Return a shallow copy of ``context``, enriched with ``resolved_project_id`` when possible.

    Project resolution is attempted only when the message mentions a
    project-related keyword; the input dict itself is never modified.
    """
    enriched = {**context}
    if not _needs_project_resolution(message):
        return enriched

    project_id = await _resolve_project_id_from_message(message)
    if project_id:
        enriched["resolved_project_id"] = project_id
        logger.info("deep_agent: resolved_project_id=%s", project_id)
    return enriched
|
||
|
||
|
||
def _all_tools() -> list[Any]:
    """Return one flat list combining the task, project, note and timeline tool collections."""
    return [*TASK_TOOLS, *PROJECT_TOOLS, *NOTE_TOOLS, *TIMELINE_TOOLS]
|
||
|
||
|
||
# ── Contextual sidebar tools ──────────────────────────────────────────
|
||
|
||
|
||
@tool
async def get_page_details(
    entity_type: str = "",
    entity_id: str = "",
) -> str:
    """Fetch full details for the entity currently in view.

    entity_type: one of 'project' | 'task' | 'note' | 'timeline_event' |
    'tasks_all' | 'projects_all' | 'timeline_all'.
    entity_id: UUID of the entity for singular entity views. Omit for list views.

    The Electron drizzle-executor fulfils this op against local SQLite and
    returns the row(s) as a JSON tool result.
    """
    # NOTE: the docstring above is left verbatim on purpose — @tool exposes it to
    # the model as the tool description, so it is runtime text.
    result = await execute_on_client(
        action="get_page_details",
        # An empty entity_type is forwarded as "unknown" rather than as an empty table name.
        table=entity_type or "unknown",
        data={"entityId": entity_id or None},
    )
    if not result:
        return "No details found."
    if isinstance(result, str):
        # Already serialised by the client; pass through untouched.
        return result
    # str() on a dict/list yields a Python repr (single quotes, None/True), which
    # is not the JSON the tool description promises. default=str keeps values
    # json cannot encode natively from raising.
    return json.dumps(result, ensure_ascii=False, default=str)
|
||
|
||
|
||
def _contextual_tools(user_id: str, trace_id: str | None) -> list[Any]:
    """Assemble the tool palette offered to the contextual sidebar agent.

    The palette is, in order: ``get_page_details``, the task tools
    (create/update/list), the note tools (create/list/get), the timeline tools
    (create/list), every project tool, and the memory tools built for
    ``user_id`` / ``trace_id``.

    Note-edit tools (propose_note_edit) are intentionally excluded — next sprint.
    """
    from app.agents.note_agent import create_note, list_notes, get_note  # noqa: PLC0415
    from app.agents.task_agent import create_task, update_task, list_tasks  # noqa: PLC0415
    from app.agents.timeline_agent import create_timeline, list_timelines  # noqa: PLC0415
    from app.agents.project_agent import PROJECT_TOOLS  # noqa: PLC0415

    palette: list[Any] = [
        get_page_details,
        create_task,
        update_task,
        list_tasks,
        create_note,
        list_notes,
        get_note,
        create_timeline,
        list_timelines,
    ]
    palette.extend(PROJECT_TOOLS)
    palette.extend(_memory_tools(user_id, trace_id))
    return palette
|
||
|
||
|
||
def _trace_id_from_context(context: dict[str, Any]) -> str | None:
|
||
debug = context.get("_debug")
|
||
if isinstance(debug, dict):
|
||
request_id = debug.get("request_id")
|
||
if isinstance(request_id, str) and request_id:
|
||
return request_id
|
||
return None
|
||
|
||
|
||
def _session_id_from_context(context: dict[str, Any]) -> str | None:
|
||
debug = context.get("_debug")
|
||
if isinstance(debug, dict):
|
||
session_id = debug.get("session_id")
|
||
if isinstance(session_id, str) and session_id:
|
||
return session_id
|
||
return None
|
||
|
||
|
||
def _build_system_prompt(name: str, fallback: str, context: dict[str, Any]) -> tuple[str, Any]:
    """Resolve the named prompt template and fill in every per-request slot.

    The template comes from ``get_prompt_or_fallback(name, fallback)``; the slot
    values are produced by the injection helpers of this module and passed to
    ``compile_prompt``.

    Returns:
        A ``(compiled_text, prompt_obj)`` pair, where ``prompt_obj`` is whatever
        ``get_prompt_or_fallback`` returned alongside the template.
    """
    template, prompt_obj = get_prompt_or_fallback(name, fallback)
    # All helper outputs are stripped except the request-context block,
    # which is passed through as-is.
    slots = {
        "date_context": _datetime_context_injection(context).strip(),
        "language_instruction": _language_instruction(context).strip(),
        "user_identity": _user_identity_injection(context).strip(),
        "relational_memory": _relational_memory_injection(context).strip(),
        "proactive_hints": _proactive_hints_injection(context).strip(),
        "request_context": _request_context_block(context),
    }
    compiled = compile_prompt(template, prompt_obj, **slots)
    return compiled, prompt_obj
|
||
|
||
|
||
# Matches a task/timeline tag whose payload is wrapped in square brackets,
# e.g. <task>[...]</task>; the backreference forces the closing tag to match the opening one.
_TAG_LINE_RE = re.compile(r"<(task|timeline)>\[[^\]]+\]</\1>")
# Matches a NN/NN/NNNN date and captures it as named groups d (day), m (month), y (year).
_TIMELINE_DMY_RE = re.compile(r"(?P<d>\d{2})/(?P<m>\d{2})/(?P<y>\d{4})")
|
||
|
||
|
||
def _is_upcoming_timeline_query(message: str) -> bool:
|
||
lowered = message.lower()
|
||
has_upcoming = "prossim" in lowered or "upcoming" in lowered or "next" in lowered
|
||
has_timeline_topic = any(
|
||
token in lowered
|
||
for token in ("event", "evento", "eventi", "timeline", "milestone", "scaden")
|
||
)
|
||
return has_upcoming and has_timeline_topic
|
||
|
||
|
||
def _timeline_date_in_current_month_or_future(dmy: str) -> bool:
    """Check the first dd/mm/yyyy date found in ``dmy`` against today's date.

    Returns True when that date is today or later AND falls in the current
    calendar month and year (so, despite the function name, a date in a later
    month yields False). Text with no recognisable date, or with an impossible
    calendar date, also yields True, so such lines are kept rather than filtered.

    "Today" is ``date.today()``, i.e. the date on the machine running this code.
    """
    found = _TIMELINE_DMY_RE.search(dmy)
    if found is None:
        return True
    try:
        year, month, day = (int(found.group(part)) for part in ("y", "m", "d"))
        candidate = date(year, month, day)
    except ValueError:
        return True

    now = date.today()
    same_month = (candidate.year, candidate.month) == (now.year, now.month)
    return candidate >= now and same_month
|
||
|
||
|
||
def _normalize_tagged_list_lines(text: str, message: str) -> str:
    """Reduce every tagged line of *text* to its bare ``<task>``/``<timeline>`` tags.

    Lines without a tag pass through unchanged. A line holding one or more
    tags is replaced by those tags, one per output line; any other text on
    that line is dropped. When *message* is an upcoming-timeline query,
    ``<timeline>`` tags are omitted if their source line fails
    ``_timeline_date_in_current_month_or_future``.
    """
    if not text:
        return text

    filter_timeline = _is_upcoming_timeline_query(message)

    def _is_filtered(tag: str, source_line: str) -> bool:
        # Only timeline tags are ever dropped, and only for upcoming queries.
        if not filter_timeline or "<timeline>" not in tag:
            return False
        # The date is looked up on the whole source line, not just the tag.
        return not _timeline_date_in_current_month_or_future(source_line)

    kept: list[str] = []
    for line in text.splitlines():
        tags = [found.group(0) for found in _TAG_LINE_RE.finditer(line)]
        if not tags:
            kept.append(line)
            continue
        kept.extend(tag for tag in tags if not _is_filtered(tag, line))

    return "\n".join(kept)
|
||
|
||
|
||
_GENERIC_TAG_RE = re.compile(r"</?(task|project|note|timeline|chart)>", re.IGNORECASE)
|
||
_BRACKETED_ID_RE = re.compile(r"\[(?:[0-9a-fA-F-]{8,}|[A-Za-z0-9_-]{8,})\]")
|
||
_FLOATING_EMPTY_FALLBACK = "No results found."
|
||
|
||
|
||
def _strip_floating_markup_fragment(text: str) -> str:
    """Remove entity tags and bracketed ids from *text*, leaving whitespace as is.

    Falsy input is returned unchanged.
    """
    if text:
        without_tags = _GENERIC_TAG_RE.sub("", text)
        return _BRACKETED_ID_RE.sub("", without_tags)
    return text
|
||
|
||
|
||
def _strip_floating_markup(text: str) -> str:
    """Return *text* as plain text: tags and bracketed ids removed, spacing tidied.

    After stripping markup, runs of two or more spaces/tabs on each line are
    collapsed to a single space, each line is trimmed, and lines that end up
    empty are dropped. Falsy input is returned unchanged.
    """
    if not text:
        return text

    stripped = _strip_floating_markup_fragment(text)
    kept: list[str] = []
    for raw_line in stripped.splitlines():
        # Removing a tag or id can leave doubled spaces behind.
        squeezed = re.sub(r"[ \t]{2,}", " ", raw_line).strip()
        if squeezed:
            kept.append(squeezed)
    return "\n".join(kept)
|
||
|
||
|
||
def _fallback_from_raw_floating_text(raw_text: str) -> str:
    """Build a last-resort floating answer from *raw_text*.

    Markup is stripped and space/tab runs collapsed; if nothing remains,
    ``_FLOATING_EMPTY_FALLBACK`` is returned instead.
    """
    stripped = _strip_floating_markup_fragment(raw_text or "")
    collapsed = re.sub(r"[ \t]{2,}", " ", stripped).strip()
    if collapsed:
        return collapsed
    return _FLOATING_EMPTY_FALLBACK
|
||
|
||
|
||
class _FloatingStreamSanitizer:
|
||
"""Streaming sanitizer that removes floating markup without buffering the full answer."""
|
||
|
||
def __init__(self) -> None:
|
||
self._pending = ""
|
||
|
||
@staticmethod
|
||
def _split_safe_boundary(text: str) -> tuple[str, str]:
|
||
boundary = len(text)
|
||
|
||
last_lt = text.rfind("<")
|
||
if last_lt != -1 and ">" not in text[last_lt:]:
|
||
boundary = min(boundary, last_lt)
|
||
|
||
last_lb = text.rfind("[")
|
||
if last_lb != -1 and "]" not in text[last_lb:]:
|
||
boundary = min(boundary, last_lb)
|
||
|
||
if boundary == len(text):
|
||
return text, ""
|
||
return text[:boundary], text[boundary:]
|
||
|
||
def feed(self, chunk: str) -> str:
|
||
combined = f"{self._pending}{chunk}"
|
||
safe_text, self._pending = self._split_safe_boundary(combined)
|
||
return _strip_floating_markup_fragment(safe_text)
|
||
|
||
def finalize(self) -> str:
|
||
# Drop dangling unfinished wrappers at the very end.
|
||
tail = re.sub(r"<[^>\n]*$", "", self._pending)
|
||
tail = re.sub(r"\[[^\]\n]*$", "", tail)
|
||
self._pending = ""
|
||
return _strip_floating_markup_fragment(tail)
|
||
|
||
|
||
def _normalize_memory_label(path_or_label: str) -> str:
|
||
value = path_or_label.strip()
|
||
if value.startswith("/memories/"):
|
||
value = value[len("/memories/"):]
|
||
value = value.strip("/")
|
||
return value
|
||
|
||
|
||
def _memory_tools(user_id: str, trace_id: str | None) -> list[Any]:
    """Build the memory tool set bound to *user_id*.

    Each tool is a closure over ``user_id``/``trace_id``, logs its invocation
    and opens its own ``async_session()`` for the call. The docstrings, names
    and parameters of the nested functions are kept verbatim because they are
    handed to the ``@tool`` decorator.
    """
    trace_label = trace_id or "-"

    def _bulleted(header: str, items: Any) -> str:
        # Shared "header + one '- item' line per result" layout.
        return header + "\n" + "\n".join(f"- {item}" for item in items)

    @tool
    async def memory_list_blocks() -> str:
        """List all core memory blocks currently stored for the user."""
        logger.info("deep_agent: memory_list_blocks trace=%s user=%s", trace_label, user_id)
        async with async_session() as session:
            blocks = await MemoryMiddleware(session).list_core_blocks(user_id)
        if not blocks:
            return "No memory blocks found."
        rendered = "\n".join(f"- {b['label']}: {b['value']}" for b in blocks)
        return "Memory blocks:\n" + rendered

    @tool
    async def memory_get(path_or_label: str) -> str:
        """Get one memory block by label or /memories/<label> path."""
        label = _normalize_memory_label(path_or_label)
        logger.info("deep_agent: memory_get trace=%s user=%s label=%s", trace_label, user_id, label)
        if not label:
            return "Invalid memory label."
        async with async_session() as session:
            value = await MemoryMiddleware(session).get_core_block(user_id, label)
        if value is None:
            return f"Memory block '{label}' not found."
        return f"Memory block '{label}':\n{value}"

    @tool
    async def memory_create(path_or_label: str, value: str) -> str:
        """Create or overwrite a memory block value by label or /memories/<label> path."""
        label = _normalize_memory_label(path_or_label)
        logger.info("deep_agent: memory_create trace=%s user=%s label=%s", trace_label, user_id, label)
        if not label:
            return "Invalid memory label."
        async with async_session() as session:
            await MemoryMiddleware(session).update_core(user_id, label, value, trace_id=trace_id)
        return f"Memory block '{label}' saved."

    @tool
    async def memory_append(path_or_label: str, content: str) -> str:
        """Append content to a memory block, creating it if missing."""
        label = _normalize_memory_label(path_or_label)
        logger.info("deep_agent: memory_append trace=%s user=%s label=%s", trace_label, user_id, label)
        if not label:
            return "Invalid memory label."
        async with async_session() as session:
            await MemoryMiddleware(session).append_core(user_id, label, content)
        return f"Memory block '{label}' appended."

    @tool
    async def memory_replace(path_or_label: str, old_string: str, new_string: str) -> str:
        """Replace one exact string in a memory block."""
        label = _normalize_memory_label(path_or_label)
        logger.info("deep_agent: memory_replace trace=%s user=%s label=%s", trace_label, user_id, label)
        if not label:
            return "Invalid memory label."
        async with async_session() as session:
            changed = await MemoryMiddleware(session).replace_core(user_id, label, old_string, new_string)
        if changed:
            return f"Memory block '{label}' updated."
        return f"No replacement made in '{label}' (old string not found)."

    @tool
    async def memory_delete(path_or_label: str) -> str:
        """Delete a memory block by label or /memories/<label> path."""
        label = _normalize_memory_label(path_or_label)
        logger.info("deep_agent: memory_delete trace=%s user=%s label=%s", trace_label, user_id, label)
        if not label:
            return "Invalid memory label."
        async with async_session() as session:
            deleted = await MemoryMiddleware(session).delete_core(user_id, label)
        if deleted:
            return f"Memory block '{label}' deleted."
        return f"Memory block '{label}' not found."

    @tool
    async def archival_memory_insert(content: str) -> str:
        """Insert a long-term archival memory entry."""
        logger.info("deep_agent: archival_memory_insert trace=%s user=%s", trace_label, user_id)
        async with async_session() as session:
            await MemoryMiddleware(session).insert_archival(user_id, content, source="assistant")
        return "Archival memory saved."

    @tool
    async def archival_memory_search(query: str, top_k: int = 5) -> str:
        """Search long-term archival memory by semantic fallback (keyword currently)."""
        logger.info("deep_agent: archival_memory_search trace=%s user=%s query=%s", trace_label, user_id, query[:80])
        async with async_session() as session:
            results = await MemoryMiddleware(session).search_archival(user_id, query, top_k=top_k)
        if not results:
            return "No archival memory results found."
        return _bulleted("Archival memory results:", results)

    @tool
    async def conversation_search(query: str, top_k: int = 5) -> str:
        """Search recall memory from prior episodic conversation summaries."""
        logger.info("deep_agent: conversation_search trace=%s user=%s query=%s", trace_label, user_id, query[:80])
        async with async_session() as session:
            results = await MemoryMiddleware(session).search_recall(user_id, query, top_k=top_k)
        if not results:
            return "No recall memory results found."
        return _bulleted("Recall memory results:", results)

    @tool
    async def search_associative(query: str, limit: int = 5) -> str:
        """Semantic search across associative (archival) memory for a given query.

        Use this to surface long-term memories related to a topic, client, or task
        that may not appear in recent episodes.

        query: natural-language search phrase.
        limit: max results (default 5).
        """
        logger.info("deep_agent: search_associative trace=%s user=%s query=%s", trace_label, user_id, query[:80])
        async with async_session() as session:
            # Same backend call as archival_memory_search, with ``limit`` as top_k.
            results = await MemoryMiddleware(session).search_archival(user_id, query, top_k=limit)
        if not results:
            return "No associative memory results found."
        return _bulleted("Associative memory results:", results)

    return [
        memory_list_blocks,
        memory_get,
        memory_create,
        memory_append,
        memory_replace,
        memory_delete,
        archival_memory_insert,
        archival_memory_search,
        conversation_search,
        search_associative,
    ]
|
||
|
||
|
||
def _read_only_memory_tools(user_id: str, trace_id: str | None) -> list[Any]:
    """Return the subset of ``_memory_tools`` whose names are on the read-only allow-list."""
    allowed_names = frozenset(
        (
            "memory_list_blocks",
            "memory_get",
            "archival_memory_search",
            "conversation_search",
            "search_associative",
        )
    )
    # Filtering keeps the order in which _memory_tools returns the tools.
    return [candidate for candidate in _memory_tools(user_id, trace_id) if candidate.name in allowed_names]
|
||
|
||
|
||
def _brief_research_tools(user_id: str, trace_id: str | None) -> list[Any]:
    """Assemble the tool palette for Stage-1 task brief research.

    Combines the task, project, note, timeline and client tool lists with the
    read-only memory tools and the relations query tool.

    NOTE(review): the original docstring calls this palette "read-only"; only
    the memory tools are filtered here, so confirm that for the domain tools.
    """
    palette: list[Any] = []
    for group in (TASK_TOOLS, PROJECT_TOOLS, NOTE_TOOLS, TIMELINE_TOOLS, CLIENT_TOOLS):
        palette.extend(group)
    palette.extend(_read_only_memory_tools(user_id, trace_id))
    palette.append(make_query_relations_tool(user_id, trace_id))
    return palette
|
||
|
||
|
||
def _all_tools_for_user(user_id: str, trace_id: str | None) -> list[Any]:
    """Return the shared tools from ``_all_tools()`` followed by the user's memory tools."""
    combined = list(_all_tools())
    combined.extend(_memory_tools(user_id, trace_id))
    return combined
|
||
|
||
|
||
def _detect_domain_section(message: str) -> FloatingDomainSection | None:
|
||
lowered = message.lower()
|
||
if any(keyword in lowered for keyword in ["timeline", "milestone", "release", "schedule"]):
|
||
return "timeline"
|
||
if any(keyword in lowered for keyword in ["task", "tasks", "todo", "attivit", "azione"]):
|
||
return "task"
|
||
if any(keyword in lowered for keyword in ["note", "notes", "memo", "document"]):
|
||
return "note"
|
||
return None
|
||
|
||
|
||
def _normalize_domain_payload(payload: dict[str, Any], fallback_id: str | None) -> dict[str, str | None]:
|
||
type_raw = str(payload.get("type") or "").strip().lower()
|
||
domain_type: FloatingDomainType = "task"
|
||
if type_raw in {"task", "timeline", "project", "node"}:
|
||
domain_type = type_raw
|
||
|
||
id_value = payload.get("id")
|
||
domain_id = id_value if isinstance(id_value, str) and id_value.strip() else None
|
||
if domain_type == "project" and not domain_id:
|
||
domain_id = fallback_id
|
||
|
||
section_raw = payload.get("section")
|
||
section: FloatingDomainSection | None = None
|
||
if isinstance(section_raw, str):
|
||
section_candidate = section_raw.strip().lower()
|
||
if section_candidate in {"task", "timeline", "note"}:
|
||
section = section_candidate
|
||
|
||
if domain_type != "project":
|
||
section = None
|
||
|
||
return {
|
||
"type": domain_type,
|
||
"id": domain_id,
|
||
"section": section,
|
||
}
|
||
|
||
|
||
def _parse_json_object(text: str) -> dict[str, Any] | None:
|
||
raw = text.strip()
|
||
if not raw:
|
||
return None
|
||
try:
|
||
parsed = json.loads(raw)
|
||
return parsed if isinstance(parsed, dict) else None
|
||
except json.JSONDecodeError:
|
||
pass
|
||
|
||
match = re.search(r"\{.*\}", raw, re.DOTALL)
|
||
if not match:
|
||
return None
|
||
try:
|
||
parsed = json.loads(match.group(0))
|
||
except json.JSONDecodeError:
|
||
return None
|
||
return parsed if isinstance(parsed, dict) else None
|
||
|
||
|
||
def _infer_floating_domain_rule_based(message: str, context: dict[str, Any]) -> dict[str, str | None]:
    """Infer the floating domain from the request scope and message keywords.

    An explicit ``context["scope"]`` with a recognised type wins. Otherwise a
    project keyword in the message or a resolved project id selects the
    project domain, then the detected section selects timeline or node, and
    the final default is the task domain.
    """
    section = _detect_domain_section(message)
    has_context = isinstance(context, dict)
    scope = context.get("scope") if has_context else None
    resolved = context.get("resolved_project_id") if has_context else None
    project_id = resolved if isinstance(resolved, str) and resolved else None

    if isinstance(scope, dict):
        scope_type = str(scope.get("type") or "").strip().lower()
        raw_scope_id = scope.get("id")
        scope_id = raw_scope_id if isinstance(raw_scope_id, str) and raw_scope_id else None

        if scope_type in ("project", "projects"):
            # Projects keep the detected section and may borrow the resolved id.
            return {"type": "project", "id": scope_id or project_id, "section": section}

        # Note scopes map to the "node" domain type.
        simple_scopes = {
            "task": "task",
            "tasks": "task",
            "note": "node",
            "notes": "node",
            "timeline": "timeline",
            "timelines": "timeline",
        }
        mapped_type = simple_scopes.get(scope_type)
        if mapped_type is not None:
            return {"type": mapped_type, "id": scope_id, "section": None}

    text = message.lower()
    mentions_project = any(keyword in text for keyword in ["project", "progetto", "client"])
    if mentions_project or project_id:
        return {"type": "project", "id": project_id, "section": section}

    by_section = {"timeline": "timeline", "note": "node"}
    return {"type": by_section.get(section, "task"), "id": None, "section": None}
|
||
|
||
|
||
async def _infer_floating_domain(message: str, context: dict[str, Any]) -> dict[str, str | None]:
    """Classify the floating domain with the classifier LLM, falling back to rules.

    The classifier receives the message plus the request scope and resolved
    project id. If it raises or returns non-JSON output, a warning is logged
    and ``_infer_floating_domain_rule_based`` provides the answer.
    """
    raw_project_id = context.get("resolved_project_id") if isinstance(context, dict) else None
    project_id = raw_project_id if isinstance(raw_project_id, str) and raw_project_id else None

    scope = context.get("scope")
    classifier_context = {
        "scope": scope if isinstance(scope, dict) else None,
        "resolved_project_id": project_id,
    }

    try:
        llm = get_agent_llm("classifier")
        context_json = json.dumps(classifier_context, ensure_ascii=True)
        classifier_messages = [
            SystemMessage(content=_FLOATING_DOMAIN_CLASSIFIER_PROMPT),
            HumanMessage(content=f"Message:\n{message}\n\nContext:\n{context_json}"),
        ]
        lf = get_langfuse()
        # NOTE(review): the fetched template text is discarded; the system
        # message above always uses the local constant. Confirm intended.
        _, classifier_prompt_obj = get_prompt_or_fallback(
            "floating_domain_classifier", _FLOATING_DOMAIN_CLASSIFIER_PROMPT
        )

        # User/session ids for Langfuse attribution come from context["_debug"].
        debug = context.get("_debug") if isinstance(context, dict) else None
        debug_info = debug if isinstance(debug, dict) else {}
        lf_user = debug_info.get("user_id")
        lf_session = debug_info.get("session_id")

        with langfuse_context(user_id=lf_user, session_id=lf_session):
            if lf:
                with lf.start_as_current_observation(
                    as_type="generation",
                    name="floating-classifier",
                    model=model_for_agent("classifier"),
                    prompt=classifier_prompt_obj,
                    input=classifier_messages,
                ) as gen:
                    response = await llm.ainvoke(classifier_messages)
                    gen.update(output=_as_text(response.content), usage_details=extract_usage(response))
            else:
                response = await llm.ainvoke(classifier_messages)

        payload = _parse_json_object(_as_text(response.content))
        if payload is None:
            logger.warning("deep_agent: floating_domain classifier returned non-json output")
        else:
            domain = _normalize_domain_payload(payload, project_id)
            logger.info(
                "deep_agent: floating_domain_classified type=%s id=%s section=%s",
                domain.get("type"),
                domain.get("id"),
                domain.get("section"),
            )
            return domain
    except Exception as exc:
        # Best effort: any classifier failure degrades to the rule-based path.
        logger.warning("deep_agent: floating_domain classifier failed: %s", exc)

    return _infer_floating_domain_rule_based(message, context)
|
||
|
||
|
||
def _history_to_messages(history: list[dict[str, str]] | None) -> list[Any]:
    """Convert the most recent history turns into LangChain message objects.

    Only the last ``MAX_HISTORY_TURNS`` entries are considered. Turns with
    empty content are skipped, as are turns whose role is neither ``"user"``
    nor ``"assistant"``.
    """
    if not history:
        return []

    converted: list[Any] = []
    for turn in history[-MAX_HISTORY_TURNS:]:
        text = turn.get("content", "")
        if not text:
            continue
        speaker = turn.get("role", "")
        if speaker == "user":
            converted.append(HumanMessage(content=text))
        elif speaker == "assistant":
            converted.append(AIMessage(content=text))
    return converted
|
||
|
||
|
||
async def _run_single_agent(
    *,
    user_id: str,
    system_prompt: str,
    message: str,
    context: dict[str, Any],
    max_steps: int = 6,
    langfuse_prompt: Any = None,
    agent_name: str = "agent",
    conversation_history: list[dict[str, str]] | None = None,
) -> str:
    """Run one tool-calling agent turn and return the final answer text.

    The LLM is invoked with all user tools bound for up to *max_steps* rounds.
    Each round's tool calls are executed and their outputs appended as
    ``ToolMessage``s. The first response without tool calls is the answer.
    If every round requested tools, one last call without tools produces it.

    On success the conversation (without the system message) is stored in
    ``session_buffer`` when the context carries a session id.

    Langfuse observations are entered with ``with``/``ExitStack`` so they are
    closed, and see the exception, when the LLM or a tool raises.
    """
    from contextlib import ExitStack, nullcontext

    trace_id = _trace_id_from_context(context)
    session_id = _session_id_from_context(context)
    lf = get_langfuse()
    llm = get_agent_llm(agent_name)
    tools = _all_tools_for_user(user_id, trace_id)
    logger.info("deep_agent: run_single_agent_start trace=%s user=%s", trace_id or "-", user_id)
    llm_with_tools = llm.bind_tools(tools)
    # The tool set is fixed for the whole run, so build the lookup once.
    tool_map = {tool_def.name: tool_def for tool_def in tools}

    # Prefer the buffered message history; fall back to the client-sent one.
    buffered = session_buffer.get(user_id, session_id) if session_id else None
    history_messages = buffered if buffered is not None else _history_to_messages(conversation_history)
    messages: list[Any] = [
        SystemMessage(content=system_prompt),
        *history_messages,
        HumanMessage(content=message),
    ]

    tool_calls_count = 0
    collected: list[dict[str, Any]] = []
    messages_to_save: list[Any] | None = None

    set_tool_result_collector(collected)
    try:
        with ExitStack() as stack:
            stack.enter_context(langfuse_context(user_id=user_id, session_id=session_id))
            span = (
                stack.enter_context(
                    lf.start_as_current_observation(
                        as_type="agent",
                        name=agent_name,
                        # NOTE(review): "session_id" carries the trace id here,
                        # not the session id. Confirm intended.
                        metadata={"user_id": user_id, "session_id": trace_id},
                        input=message,
                    )
                )
                if lf
                else None
            )

            for _ in range(max_steps):
                generation_ctx = (
                    lf.start_as_current_observation(
                        as_type="generation",
                        name=f"{agent_name}-llm",
                        model=model_for_agent(agent_name),
                        prompt=langfuse_prompt,
                        input=messages,
                    )
                    if lf
                    else nullcontext()
                )
                with generation_ctx as gen:
                    response: AIMessage = await llm_with_tools.ainvoke(messages)
                    if gen is not None:
                        gen.update(output=_as_text(response.content), usage_details=extract_usage(response))

                messages.append(response)

                if not response.tool_calls:
                    final_text = _as_text(response.content)
                    logger.info(
                        "deep_agent: run_single_agent_end trace=%s user=%s tool_calls=%d response_chars=%d",
                        trace_id or "-",
                        user_id,
                        tool_calls_count,
                        len(final_text),
                    )
                    if span:
                        span.update(output=final_text)
                    messages_to_save = messages[1:]  # strip SystemMessage; save full tool history
                    return final_text

                for call in response.tool_calls:
                    tool_calls_count += 1
                    call_id = str(call.get("id", ""))
                    call_name = str(call.get("name", ""))
                    call_args = call.get("args", {})
                    logger.info(
                        "deep_agent: AI->Tool tool_call_id=%s tool=%s args=%s",
                        call_id,
                        call_name,
                        json.dumps(call_args, ensure_ascii=True)[:800],
                    )

                    tool_fn = tool_map.get(call_name)
                    if tool_fn is None:
                        # Report the problem to the model instead of failing the run.
                        tool_output = f"Unknown tool: {call_name}"
                    elif lf:
                        with lf.start_as_current_observation(
                            as_type="tool",
                            name=call_name,
                            input=call_args,
                        ) as tool_obs:
                            tool_output = await tool_fn.ainvoke(call_args)
                            tool_obs.update(output=str(tool_output)[:8000])
                    else:
                        tool_output = await tool_fn.ainvoke(call_args)

                    logger.info(
                        "deep_agent: Tool->AI tool_call_id=%s tool=%s output=%s",
                        call_id,
                        call_name,
                        str(tool_output)[:1200],
                    )

                    messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))

            # Step budget exhausted: ask for a final answer without tools bound.
            final = await llm.ainvoke(messages)
            final_text = _as_text(final.content)
            messages.append(AIMessage(content=final_text))
            logger.info(
                "deep_agent: run_single_agent_end trace=%s user=%s tool_calls=%d response_chars=%d fallback=1",
                trace_id or "-",
                user_id,
                tool_calls_count,
                len(final_text),
            )
            if span:
                span.update(output=final_text)
            messages_to_save = messages[1:]
            return final_text
    finally:
        # messages_to_save stays None on failure, so only successful runs are saved.
        if session_id and messages_to_save is not None:
            session_buffer.set(user_id, session_id, messages_to_save)
        clear_tool_result_collector()
        if lf:
            lf.flush()
|
||
|
||
|
||
async def _run_single_agent_stream(
    *,
    user_id: str,
    system_prompt: str,
    message: str,
    context: dict[str, Any],
    max_steps: int = 6,
    langfuse_prompt: Any = None,
    agent_name: str = "agent",
    tools: list[Any] | None = None,
    conversation_history: list[dict[str, str]] | None = None,
) -> AsyncGenerator[tuple[str, Any], None]:
    """Run one tool-calling agent turn, yielding ``("token", text)`` events.

    The LLM is invoked with *tools* bound (defaulting to all user tools) for
    up to *max_steps* rounds, and tool calls are executed between rounds. The
    first response without tool calls is yielded as a single token event. If
    every round requested tools, the final answer is streamed chunk by chunk
    from the LLM without tools bound.

    When the turn completes, the conversation (without the system message) is
    stored in ``session_buffer`` if the context carries a session id.

    Langfuse observations are entered with ``with``/``ExitStack`` so they are
    closed, and see the exception, when the LLM or a tool raises.
    """
    from contextlib import ExitStack, nullcontext

    trace_id = _trace_id_from_context(context)
    session_id = _session_id_from_context(context)
    lf = get_langfuse()
    llm = get_agent_llm(agent_name)
    if tools is None:
        tools = _all_tools_for_user(user_id, trace_id)
    logger.info("deep_agent: run_single_agent_stream_start trace=%s user=%s", trace_id or "-", user_id)
    llm_with_tools = llm.bind_tools(tools)
    # The tool set is fixed for the whole run, so build the lookup once.
    tool_map = {tool_def.name: tool_def for tool_def in tools}

    # Prefer the buffered message history; fall back to the client-sent one.
    buffered = session_buffer.get(user_id, session_id) if session_id else None
    history_messages = buffered if buffered is not None else _history_to_messages(conversation_history)
    messages: list[Any] = [
        SystemMessage(content=system_prompt),
        *history_messages,
        HumanMessage(content=message),
    ]

    tool_calls_count = 0
    streamed_chars = 0
    collected: list[dict[str, Any]] = []
    streamed_text: list[str] = []
    messages_to_save: list[Any] | None = None

    set_tool_result_collector(collected)
    try:
        with ExitStack() as stack:
            stack.enter_context(langfuse_context(user_id=user_id, session_id=session_id))
            span = (
                stack.enter_context(
                    lf.start_as_current_observation(
                        as_type="agent",
                        name=f"{agent_name}-stream",
                        # NOTE(review): "session_id" carries the trace id here,
                        # not the session id. Confirm intended.
                        metadata={"user_id": user_id, "session_id": trace_id},
                        input=message,
                    )
                )
                if lf
                else None
            )

            for _ in range(max_steps):
                generation_ctx = (
                    lf.start_as_current_observation(
                        as_type="generation",
                        name=f"{agent_name}-llm",
                        model=model_for_agent(agent_name),
                        prompt=langfuse_prompt,
                        input=messages,
                    )
                    if lf
                    else nullcontext()
                )
                with generation_ctx as gen:
                    response: AIMessage = await llm_with_tools.ainvoke(messages)
                    if gen is not None:
                        gen.update(output=_as_text(response.content), usage_details=extract_usage(response))

                if not response.tool_calls:
                    # Emit the already-generated answer as-is; re-streaming it
                    # would cost a second LLM call made without tools bound.
                    final_text = _as_text(response.content)
                    if final_text:
                        streamed_chars += len(final_text)
                        streamed_text.append(final_text)
                        yield "token", final_text
                    logger.info(
                        "deep_agent: run_single_agent_stream_end trace=%s user=%s tool_calls=%d response_chars=%d",
                        trace_id or "-",
                        user_id,
                        tool_calls_count,
                        streamed_chars,
                    )
                    if span:
                        span.update(output="".join(streamed_text))
                    messages.append(response)
                    messages_to_save = messages[1:]  # strip SystemMessage
                    return

                messages.append(response)
                for call in response.tool_calls:
                    tool_calls_count += 1
                    call_id = str(call.get("id", ""))
                    call_name = str(call.get("name", ""))
                    call_args = call.get("args", {})
                    logger.info(
                        "deep_agent: AI->Tool tool_call_id=%s tool=%s args=%s",
                        call_id,
                        call_name,
                        json.dumps(call_args, ensure_ascii=True)[:800],
                    )

                    tool_fn = tool_map.get(call_name)
                    if tool_fn is None:
                        # Report the problem to the model instead of failing the run.
                        tool_output = f"Unknown tool: {call_name}"
                    elif lf:
                        with lf.start_as_current_observation(
                            as_type="tool",
                            name=call_name,
                            input=call_args,
                        ) as tool_obs:
                            tool_output = await tool_fn.ainvoke(call_args)
                            tool_obs.update(output=str(tool_output)[:8000])
                    else:
                        tool_output = await tool_fn.ainvoke(call_args)

                    logger.info(
                        "deep_agent: Tool->AI tool_call_id=%s tool=%s output=%s",
                        call_id,
                        call_name,
                        str(tool_output)[:1200],
                    )

                    messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))

            # Step budget exhausted: stream a final answer without tools bound.
            fallback_chunks: list[str] = []
            async for chunk in llm.astream(messages):
                token = _as_text(getattr(chunk, "content", ""))
                if token:
                    streamed_chars += len(token)
                    streamed_text.append(token)
                    fallback_chunks.append(token)
                    yield "token", token
            messages.append(AIMessage(content="".join(fallback_chunks)))
            messages_to_save = messages[1:]
            logger.info(
                "deep_agent: run_single_agent_stream_end trace=%s user=%s tool_calls=%d response_chars=%d fallback=1",
                trace_id or "-",
                user_id,
                tool_calls_count,
                streamed_chars,
            )
            if span:
                span.update(output="".join(streamed_text))
    finally:
        # messages_to_save stays None unless the turn ran to completion.
        if session_id and messages_to_save is not None:
            session_buffer.set(user_id, session_id, messages_to_save)
        clear_tool_result_collector()
        if lf:
            lf.flush()
|
||
|
||
|
||
async def run_home(user_id: str, message: str, context: dict[str, Any]) -> str:
    """Answer one home-chat message and return the tag-normalized reply.

    The context is prepared, the ``home_system`` prompt compiled, the
    ``home-agent`` run to completion, and the reply passed through
    ``_normalize_tagged_list_lines``.
    """
    prepared = await _prepare_context(message, context)
    system_prompt, langfuse_prompt = _build_system_prompt("home_system", _HOME_SYSTEM_PROMPT, prepared)
    # The history is read from the caller's context, not the prepared copy.
    history = context.get("conversation_history")
    raw_answer = await _run_single_agent(
        user_id=user_id,
        system_prompt=system_prompt,
        message=message,
        context=prepared,
        langfuse_prompt=langfuse_prompt,
        agent_name="home-agent",
        conversation_history=history,
    )
    return _normalize_tagged_list_lines(raw_answer, message)
|
||
|
||
|
||
async def run_floating(user_id: str, message: str, context: dict[str, Any]) -> tuple[str, dict[str, str | None]]:
    """Answer one floating-chat message.

    Returns the plain-text reply (markup stripped) together with the inferred
    domain dict. If stripping leaves nothing from a non-empty reply, the
    raw-text fallback is used instead.
    """
    prepared = await _prepare_context(message, context)
    domain = await _infer_floating_domain(message, prepared)
    system_prompt, langfuse_prompt = _build_system_prompt("floating_system", _FLOATING_SYSTEM_PROMPT, prepared)
    # The history is read from the caller's context, not the prepared copy.
    history = context.get("conversation_history")
    raw_answer = await _run_single_agent(
        user_id=user_id,
        system_prompt=system_prompt,
        message=message,
        context=prepared,
        langfuse_prompt=langfuse_prompt,
        agent_name="floating-agent",
        conversation_history=history,
    )
    clean_answer = _strip_floating_markup(raw_answer)
    if raw_answer and not clean_answer:
        clean_answer = _fallback_from_raw_floating_text(raw_answer)
    return clean_answer, domain
|
||
|
||
|
||
async def run_home_stream(
    user_id: str,
    message: str,
    context: dict[str, Any],
    project_id: str | None = None,
) -> AsyncGenerator[tuple[str, Any], None]:
    """Stream the home agent's answer for one message.

    A folder manifest is appended to the system prompt: the given project's
    manifest when *project_id* yields one, otherwise the multi-project
    manifest. Non-token events are forwarded as they arrive. Token events are
    buffered, and the tag-normalized text is emitted as one final token event.
    """
    from app.agents.folder_agent import FOLDER_TOOLS

    prepared = await _prepare_context(message, context)
    system_prompt, langfuse_prompt = _build_system_prompt("home_system", _HOME_SYSTEM_PROMPT, prepared)

    manifest_block = ""
    if project_id:
        manifest_block = format_folder_manifest(await _fetch_project_manifest(project_id))
    if not manifest_block:
        # Without a usable project manifest, expose every linked folder so the
        # agent can still answer questions about any project from its files.
        manifest_block = await build_brief_multi_project_manifest()
    if manifest_block:
        system_prompt = system_prompt + "\n\n" + manifest_block

    trace_id = _trace_id_from_context(prepared)
    tools = [*_all_tools_for_user(user_id, trace_id), *FOLDER_TOOLS]

    buffered_tokens: list[str] = []
    agent_events = _run_single_agent_stream(
        user_id=user_id,
        system_prompt=system_prompt,
        message=message,
        context=prepared,
        langfuse_prompt=langfuse_prompt,
        agent_name="home-agent",
        tools=tools,
        conversation_history=context.get("conversation_history"),
    )
    async for event in agent_events:
        event_type, data = event
        if event_type == "token":
            # Normalization needs whole lines, so tokens are held until the end.
            buffered_tokens.append(str(data or ""))
        else:
            yield event

    normalized = _normalize_tagged_list_lines("".join(buffered_tokens), message)
    if normalized:
        yield "token", normalized
|
||
|
||
|
||
async def run_floating_stream(
    user_id: str,
    message: str,
    context: dict[str, Any],
) -> AsyncGenerator[tuple[str, Any], None]:
    """Stream the floating agent's answer for one message.

    The first event is ``("floating_domain", domain)``. Token events from the
    agent are passed through ``_FloatingStreamSanitizer`` before being
    yielded; other events are forwarded untouched. If sanitizing removed
    everything the agent produced, one fallback token is emitted.
    """
    prepared = await _prepare_context(message, context)
    domain = await _infer_floating_domain(message, prepared)
    yield "floating_domain", domain

    brief_mode: bool = bool(context.get("brief_mode"))
    briefing_context_text: str = str(context.get("briefing_context") or "").strip()

    if not (brief_mode and briefing_context_text):
        system_prompt, langfuse_prompt = _build_system_prompt(
            "floating_system", _FLOATING_SYSTEM_PROMPT, prepared
        )
    else:
        # Stage 2: the follow-up prompt takes the standard per-request slots
        # plus the briefing text as an extra ``briefing_context`` slot.
        template, langfuse_prompt = get_prompt_or_fallback(
            "task_brief_followup_system",
            _TASK_BRIEF_FOLLOWUP_SYSTEM_PROMPT,
        )
        slots = {
            "date_context": _datetime_context_injection(prepared).strip(),
            "language_instruction": _language_instruction(prepared).strip(),
            "user_identity": _user_identity_injection(prepared).strip(),
            "relational_memory": _relational_memory_injection(prepared).strip(),
            "proactive_hints": _proactive_hints_injection(prepared).strip(),
            "request_context": _request_context_block(prepared),
            "briefing_context": briefing_context_text,
        }
        system_prompt = compile_prompt(template, langfuse_prompt, **slots)

    sanitizer = _FloatingStreamSanitizer()
    emitted_any = False
    raw_chunks: list[str] = []
    agent_events = _run_single_agent_stream(
        user_id=user_id,
        system_prompt=system_prompt,
        message=message,
        context=prepared,
        langfuse_prompt=langfuse_prompt,
        agent_name="floating-agent",
        conversation_history=context.get("conversation_history"),
    )
    async for event in agent_events:
        event_type, data = event
        if event_type != "token":
            yield event
            continue

        raw_piece = str(data or "")
        raw_chunks.append(raw_piece)
        clean_piece = sanitizer.feed(raw_piece)
        if clean_piece:
            emitted_any = True
            yield "token", clean_piece

    # Flush whatever the sanitizer was still holding back.
    remainder = sanitizer.finalize()
    if remainder:
        emitted_any = True
        yield "token", remainder

    if raw_chunks and not emitted_any:
        yield "token", _fallback_from_raw_floating_text("".join(raw_chunks))
|
||
|
||
|
||
async def run_contextual_stream(
    user_id: str,
    message: str,
    context: dict[str, Any],
    scope: "ContextualScope",  # type: ignore[name-defined]
) -> AsyncGenerator[tuple[str, Any], None]:
    """Stream one user turn through the contextual agent.

    Uses the same plumbing as run_floating_stream, with two differences:
    the rendered scope block is appended to the system prompt under a
    ``## Current view`` heading, and the agent receives the tool list
    returned by ``_contextual_tools``.
    Note-edit tools (propose_note_edit) are intentionally excluded.

    Events from the underlying agent stream are yielded unchanged.
    """
    from app.schemas.contextual import ContextualScope, render_scope_block  # noqa: PLC0415

    prepared_context = await _prepare_context(message, context)
    trace_id = _trace_id_from_context(prepared_context)

    template, langfuse_prompt = get_prompt_or_fallback(
        "contextual_system",
        _CONTEXTUAL_SYSTEM_PROMPT,
    )

    # The contextual prompt has no per-request slots such as {date_context},
    # so the template (Langfuse or fallback) is used verbatim and the scope
    # section plus the language instruction are appended to it.
    scope_section = f"\n\n## Current view\n{render_scope_block(scope)}"
    system_prompt = template + scope_section + _language_instruction(prepared_context)

    contextual_tools = _contextual_tools(user_id, trace_id)

    agent_events = _run_single_agent_stream(
        user_id=user_id,
        system_prompt=system_prompt,
        message=message,
        context=prepared_context,
        langfuse_prompt=langfuse_prompt,
        agent_name="contextual-agent",
        tools=contextual_tools,
        conversation_history=context.get("conversation_history"),
    )
    async for event in agent_events:
        yield event
|
||
|
||
|
||
async def run_task_brief_research_stream(
    user_id: str,
    task_id: str,
    context: dict[str, Any],
    project_id: str | None = None,
) -> AsyncGenerator[tuple[str, Any], None]:
    """Stage-1 executive assistant: run deep research for a single task.

    Yields ``("token", chunk)`` events like the other stream runners.
    The final concatenated text may contain a ``<canvas kind="...">...</canvas>`` block
    which the WS handler strips and emits as a ``canvas_draft`` mutation.

    When ``project_id`` is given, the project's folder manifest is fetched,
    formatted, and (if non-empty) appended to the system prompt. No
    conversation history is passed to the agent.
    """
    from app.agents.folder_agent import FOLDER_TOOLS

    prepared_context = await _prepare_context(f"task:{task_id}", context)
    trace_id = _trace_id_from_context(prepared_context)
    research_tools = [*_brief_research_tools(user_id, trace_id), *FOLDER_TOOLS]

    # The task id goes into the user message so the agent knows what to look up first.
    research_message = (
        f"Prepare a briefing dossier for task ID: {task_id}\n"
        "Follow the research workflow: read the task, then project, then client, "
        "then cross-project relations, then relevant memory. "
        "End with a concrete suggested first step. "
        'If this is a writing task, include a <canvas kind="..."> draft.'
    )

    system_prompt, langfuse_prompt = _build_system_prompt(
        "task_brief_research_system",
        _TASK_BRIEF_RESEARCH_SYSTEM_PROMPT,
        prepared_context,
    )

    if project_id:
        manifest_block = format_folder_manifest(await _fetch_project_manifest(project_id))
        if manifest_block:
            # Only a non-empty manifest is appended, separated by a blank line.
            system_prompt = system_prompt + ("\n\n" + manifest_block)

    agent_events = _run_single_agent_stream(
        user_id=user_id,
        system_prompt=system_prompt,
        message=research_message,
        context=prepared_context,
        max_steps=12,
        langfuse_prompt=langfuse_prompt,
        agent_name="task-brief-agent",
        tools=research_tools,
        conversation_history=None,
    )
    async for event in agent_events:
        yield event
|
||
|
||
|
||
async def update_core_memory(user_id: str, key: str, value: str) -> None:
    """Write one core-memory entry for a user.

    Compatibility helper kept for callers that expect an explicit memory
    update API: it opens a session via ``async_session`` and delegates to
    ``MemoryMiddleware.update_core``.
    """
    async with async_session() as session:
        await MemoryMiddleware(session).update_core(user_id, key, value)
|