Fix home message tools calls

This commit is contained in:
Roberto
2026-04-29 09:21:41 +02:00
parent 6787e690ba
commit c20c6d7853
7 changed files with 232 additions and 24 deletions

View File

@@ -46,7 +46,9 @@ async def list_tasks(
project_id: UUID of the project to scope results to.
status: filter by status — todo | in_progress | done.
priority: filter by priority — high | medium | low.
assignee: substring to match against assignee names.
assignee: substring to match against assignee names. OMIT unless the user explicitly
names a person or refers to themselves ("my tasks", "assigned to me", "mine").
Do NOT default to the current user.
search: substring search across title and description.
order_by: sort field — dueDate | priority | createdAt | completedAt.
order_dir: asc (default) | desc.
@@ -97,7 +99,8 @@ async def list_tasks(
return "No tasks found matching the given filters."
lines = [
f"- {r['title']} (status: {r['status']}, priority: {r['priority']}, "
f"dueDate: {r.get('dueDate')}, completedAt: {r.get('completedAt')}, id: {r['id']})"
f"dueDate: {r.get('dueDate')}, completedAt: {r.get('completedAt')}, "
f"projectId: {r.get('projectId')}, id: {r['id']})"
for r in rows
]
return f"Found {len(rows)} task(s):\n" + "\n".join(lines)
@@ -122,7 +125,8 @@ async def count_tasks(
Use this instead of list_tasks for "how many" questions — it is much cheaper.
Same filter parameters as list_tasks (no limit/offset/order_by needed).
assignee: OMIT unless the user explicitly names a person or refers to themselves
("my tasks"). Do NOT default to the current user.
due_date_from / due_date_to: ms epoch range for dueDate. Use -1 to omit.
created_at_from / created_at_to: ms epoch range for createdAt. Use -1 to omit.
completed_at_from / completed_at_to: ms epoch range for completedAt. Use -1 to omit.
@@ -197,7 +201,7 @@ async def create_task(
row = result["row"]
return (
f"Task created: '{row['title']}' "
f"(id: {row['id']}, status: {row['status']}, priority: {row['priority']})"
f"(id: {row['id']}, status: {row['status']}, priority: {row['priority']}, projectId: {row.get('projectId')})"
)
@@ -241,7 +245,7 @@ async def update_task(
data={"id": task_id, "updates": updates},
)
row = result["row"]
return f"Task updated: '{row['title']}' (id: {row['id']}, status: {row['status']})"
return f"Task updated: '{row['title']}' (id: {row['id']}, status: {row['status']}, projectId: {row.get('projectId')})"
@tool
@@ -280,7 +284,8 @@ async def list_tasks_due_today(user_timezone: str = "UTC", include_done: bool =
if not rows:
return "No tasks are due today."
lines = [
f"- {r['title']} (priority: {r['priority']}, status: {r['status']}, id: {r['id']})"
f"- {r['title']} (priority: {r['priority']}, status: {r['status']}, "
f"projectId: {r.get('projectId')}, id: {r['id']})"
for r in rows
]
return f"Tasks due today ({len(rows)}):\n" + "\n".join(lines)

View File

@@ -89,7 +89,8 @@ async def list_timelines(
return "No timeline events found."
lines = [
f"- {r['title']} (date: {r['date']}, type: {r.get('type')}, "
f"completed: {bool(r.get('isCompleted'))}, completedAt: {r.get('completedAt')}, id: {r['id']})"
f"completed: {bool(r.get('isCompleted'))}, completedAt: {r.get('completedAt')}, "
f"projectId: {r.get('projectId')}, id: {r['id']})"
for r in rows
]
return f"Found {len(rows)} timeline event(s):\n" + "\n".join(lines)
@@ -246,7 +247,8 @@ async def list_timelines_today(user_timezone: str = "UTC", include_completed: bo
if not rows:
return "No timeline events today."
lines = [
f"- {r['title']} (date: {r['date']}, type: {r.get('type')}, completed: {bool(r.get('isCompleted'))}, id: {r['id']})"
f"- {r['title']} (date: {r['date']}, type: {r.get('type')}, "
f"completed: {bool(r.get('isCompleted'))}, projectId: {r.get('projectId')}, id: {r['id']})"
for r in rows
]
return f"Timeline events today ({len(rows)}):\n" + "\n".join(lines)

View File

@@ -294,6 +294,7 @@ async def _handle_floating_request(
)
context: dict = {
"conversation_history": frame.get("conversation_history", []),
"scope": scope,
"_debug": {"request_id": request_id, "session_id": session_id, "user_id": user_id},
"format_prefs": frame.get("format_prefs"),

View File

@@ -0,0 +1,59 @@
"""In-process TTL buffer for per-session LangChain message history.
Stores the full message list (including AIMessage with tool_calls and ToolMessage)
keyed by (user_id, session_id), so agents can reconstruct tool-call context across
conversation turns without it being lossy through the wire.
Single-process only. For multi-worker deployments, replace the _SessionBuffer
implementation with one backed by Redis (serialize LangChain messages to dicts via
message_to_dict / messages_from_dict from langchain_core.messages).
"""
from __future__ import annotations
import time
from threading import Lock
from langchain_core.messages import BaseMessage
SESSION_TTL_SECONDS = 1800 # 30-minute idle expiry
MAX_MESSAGES_PER_SESSION = 80 # cap to avoid unbounded memory growth
class _SessionBuffer:
def __init__(self) -> None:
self._store: dict[tuple[str, str], tuple[float, list[BaseMessage]]] = {}
self._lock = Lock()
def _evict_stale(self) -> None:
now = time.monotonic()
stale = [k for k, (ts, _) in self._store.items() if now - ts > SESSION_TTL_SECONDS]
for k in stale:
del self._store[k]
def get(self, user_id: str, session_id: str) -> list[BaseMessage] | None:
key = (user_id, session_id)
with self._lock:
entry = self._store.get(key)
if entry is None:
return None
ts, msgs = entry
if time.monotonic() - ts > SESSION_TTL_SECONDS:
del self._store[key]
return None
self._store[key] = (time.monotonic(), msgs)
return list(msgs)
def set(self, user_id: str, session_id: str, messages: list[BaseMessage]) -> None:
key = (user_id, session_id)
capped = messages[-MAX_MESSAGES_PER_SESSION:]
with self._lock:
self._evict_stale()
self._store[key] = (time.monotonic(), capped)
def clear(self, user_id: str, session_id: str) -> None:
with self._lock:
self._store.pop((user_id, session_id), None)
# Module-level singleton — same pattern as _pending_states in api/app/api/routes/auth.py
session_buffer = _SessionBuffer()

View File

@@ -16,6 +16,7 @@ from app.agents.note_agent import NOTE_TOOLS
from app.agents.project_agent import PROJECT_TOOLS
from app.agents.task_agent import TASK_TOOLS
from app.agents.timeline_agent import TIMELINE_TOOLS
from app.core.agent_session_buffer import session_buffer
from app.core.langfuse_client import compile_prompt, extract_usage, get_langfuse, get_prompt_or_fallback, langfuse_context
from app.core.llm import get_agent_llm, model_for_agent
from app.core.memory_middleware import MemoryMiddleware
@@ -24,6 +25,8 @@ from app.db import async_session
logger = logging.getLogger(__name__)
MAX_HISTORY_TURNS = 20
FloatingDomainType = Literal["task", "timeline", "project", "node"]
FloatingDomainSection = Literal["task", "timeline", "note"]
@@ -176,6 +179,25 @@ def _relational_memory_injection(context: dict[str, Any]) -> str:
return section
_IDENTITY_KEYS = ("user_name", "job_role", "industry", "primary_use_case", "tone_preference")
def _user_identity_injection(context: dict[str, Any]) -> str:
"""Return a compact user-profile block from core memory onboarding fields.
Returns empty string when no onboarding keys are present.
"""
core = context.get("core_memory") or {}
parts: list[str] = []
for key in _IDENTITY_KEYS:
val = (core.get(key) or "").strip()
if val:
parts.append(f"- {key}: {val}")
if not parts:
return ""
return "\n\nUser profile:\n" + "\n".join(parts)
def _request_context_block(context: dict[str, Any]) -> str:
"""Return a small block with per-request scope and resolved project context."""
parts: list[str] = []
@@ -189,16 +211,39 @@ def _request_context_block(context: dict[str, Any]) -> str:
_HOME_SYSTEM_PROMPT = """\
You are the home assistant for adiuvAI with direct access to all tools: tasks, projects, notes, timelines, and memory tools.
Always use tools for factual data retrieval before answering.
When the user asks to remember, forget, or update what you know about them, use memory tools.
You are adiuvAI's home executive assistant.{user_identity}
You are not a chatbot — you are a proactive partner who runs ahead of the user, anticipates what they need next, and closes every reply with a concrete next step or a clarifying question.
# How you work
- Use tools before answering anything factual. Never guess counts, dates, or status.
- Prefer parallel tool calls when the questions are independent (e.g. counts per status). Chain calls when one result feeds the next.
- After delivering the answer, propose the next useful action: a follow-up task to draft, a deadline at risk, a project to triage, a person to remind. Use what you know about the user (job role, industry, primary use case) to make the suggestion relevant.
- Match the user's tone preference. Default to warm-but-direct; stay concise.
- When the user asks to remember, forget, or update something, use memory tools.
# Filter discipline
- Never set the `assignee` filter on list_tasks/count_tasks unless the user explicitly names a person ("Marco's tasks") or refers to themselves ("my tasks", "assigned to me", "mine").
- The user's own name in the User profile block is for context only — it is NOT a default filter.
- When in doubt, omit `assignee` and return the global result.
# Output format
Return markdown and use tags when relevant: <project>[ids]</project>, <task>[ids]</task>, <note>[ids]</note>, <timeline>[ids]</timeline>, <chart>{{json}}</chart>.
When listing tasks or timelines, each id tag must be on its own line with no prefix/suffix text.
Never put titles, priorities, or dates on the same line as <task> or <timeline> tags.
For questions about upcoming timelines (e.g. 'prossimi eventi'), include only future items in the current month unless the user asks a different range.
For upcoming tasks, after tag lines add a short recommendation based on due date and priority.
Return markdown. Reference entities with these tags exactly — one id per tag, each tag on its own line, no prefix/suffix text on the same line:
<project>id</project> <task>id</task> <note>id</note> <timeline>id</timeline>
When the answer contains a list of entities (any of the tags above), structure the reply as three blocks separated by blank lines:
1. One short intro line stating what is coming (count + scope, e.g. "Ecco i tuoi 18 task ad alta priorità:"). Match the user's language.
2. All entity tags, one per line, consecutive, no prose interleaved. Do NOT put titles, dates, priorities, or any descriptive text on the same line as a tag or between tags.
3. One short closing recap (12 sentences) that points out a pattern, risk, or insight noticed in the list, and ends with a concrete next step or clarifying question.
For single-entity answers skip blocks 1 and 3 if they would be redundant; just emit the tag.
For analytical answers (status overviews, breakdowns by category/priority/project, comparisons, trends, "resoconto", "panoramica") consider returning a chart block when it communicates the answer faster than prose. The decision is yours — skip charts for trivial single-number answers. Schema:
<chart>{{"chartType":"pie|bar|line|area|radar|radial","title":"...","data":[{{"name":"...","value":N}},...], "config":{{"value":{{"label":"...","color":"var(--chart-1)"}} }} }}</chart>
- pie for share-of-total breakdowns; bar for category comparisons; line/area for time series; radar for multi-dimension.
- data rows must include a "name" field; numeric series keys must match config keys.
- Use var(--chart-1) through var(--chart-5) for colors, cycling 1-5 in series order. Do NOT wrap in hsl() or oklch() — these are complete CSS values already.
For upcoming-timeline questions ("prossimi eventi"), include only future items in the current month unless the user asks otherwise.
# Date filtering
{date_context}
@@ -221,11 +266,23 @@ For "today" / "tomorrow" queries, prefer list_tasks_due_today / list_timelines_t
"""
_FLOATING_SYSTEM_PROMPT = """\
You are the floating assistant for adiuvAI with direct access to all tools: tasks, projects, notes, timelines, and memory tools.
Stay focused on the floating scope and answer concisely.
Return plain text only. Do not output XML/HTML-like tags such as <task>, <project>, <note>, <timeline>, or any bracketed id tag wrappers.
Always use tools for factual data retrieval before answering.
When the user asks to remember, forget, or update what you know about them, use memory tools.
You are adiuvAI's floating executive assistant.{user_identity}
You are pinned to a specific entity (task, timeline event, project, or note) and you stay strictly within that scope.
Be a proactive partner: anticipate the next useful action and close with a concrete suggestion or a clarifying question — but stay terse, one short paragraph at most.
# How you work
- Use tools before answering anything factual. Never guess.
- Stay in the floating scope (see Request context). If the user asks something outside scope, answer briefly and suggest opening the home assistant.
- Match the user's tone preference. Default to warm-but-direct.
- When the user asks to remember, forget, or update something, use memory tools.
# Filter discipline
- Never set the `assignee` filter on list_tasks/count_tasks unless the user explicitly names a person ("Marco's tasks") or refers to themselves ("my tasks", "assigned to me", "mine").
- The user's own name in the User profile block is for context only — it is NOT a default filter.
- When in doubt, omit `assignee` and return the global result.
# Output format
Plain text only. Do NOT output XML/HTML-like tags such as <task>, <project>, <note>, <timeline>, or any bracketed-id wrappers, and do NOT output <chart> blocks — those are for the home assistant.
# Date filtering
{date_context}
@@ -361,6 +418,7 @@ def _build_system_prompt(name: str, fallback: str, context: dict[str, Any]) -> t
template, prompt_obj,
date_context=_datetime_context_injection(context).strip(),
language_instruction=_language_instruction(context).strip(),
user_identity=_user_identity_injection(context).strip(),
relational_memory=_relational_memory_injection(context).strip(),
proactive_hints=_proactive_hints_injection(context).strip(),
request_context=_request_context_block(context),
@@ -807,6 +865,23 @@ async def _infer_floating_domain(message: str, context: dict[str, Any]) -> dict[
return _infer_floating_domain_rule_based(message, context)
def _history_to_messages(history: list[dict[str, str]] | None) -> list[Any]:
if not history:
return []
turns = history[-MAX_HISTORY_TURNS:]
result: list[Any] = []
for turn in turns:
role = turn.get("role", "")
content = turn.get("content", "")
if not content:
continue
if role == "user":
result.append(HumanMessage(content=content))
elif role == "assistant":
result.append(AIMessage(content=content))
return result
async def _run_single_agent(
*,
user_id: str,
@@ -816,6 +891,7 @@ async def _run_single_agent(
max_steps: int = 6,
langfuse_prompt: Any = None,
agent_name: str = "agent",
conversation_history: list[dict[str, str]] | None = None,
) -> str:
trace_id = _trace_id_from_context(context)
session_id = _session_id_from_context(context)
@@ -824,8 +900,11 @@ async def _run_single_agent(
tools = _all_tools_for_user(user_id, trace_id)
logger.info("deep_agent: run_single_agent_start trace=%s user=%s", trace_id or "-", user_id)
llm_with_tools = llm.bind_tools(tools)
_buffered = session_buffer.get(user_id, session_id) if session_id else None
history_messages = _buffered if _buffered is not None else _history_to_messages(conversation_history)
messages: list[Any] = [
SystemMessage(content=system_prompt),
*history_messages,
HumanMessage(content=message),
]
@@ -838,7 +917,7 @@ async def _run_single_agent(
_span_ctx = (
lf.start_as_current_observation(
as_type="span",
as_type="agent",
name=agent_name,
metadata={"user_id": user_id, "session_id": trace_id},
input=message,
@@ -846,6 +925,7 @@ async def _run_single_agent(
if lf else None
)
_span = _span_ctx.__enter__() if _span_ctx else None
_messages_to_save: list[Any] | None = None
try:
for _ in range(max_steps):
@@ -878,6 +958,7 @@ async def _run_single_agent(
)
if _span:
_span.update(output=final_text)
_messages_to_save = messages[1:] # strip SystemMessage; save full tool history
return final_text
tool_map = {tool_def.name: tool_def for tool_def in tools}
@@ -896,6 +977,14 @@ async def _run_single_agent(
tool_fn = tool_map.get(call_name)
if tool_fn is None:
tool_output = f"Unknown tool: {call_name}"
elif lf:
with lf.start_as_current_observation(
as_type="tool",
name=call_name,
input=call_args,
) as tool_obs:
tool_output = await tool_fn.ainvoke(call_args)
tool_obs.update(output=str(tool_output)[:8000])
else:
tool_output = await tool_fn.ainvoke(call_args)
@@ -910,6 +999,7 @@ async def _run_single_agent(
final = await llm.ainvoke(messages)
final_text = _as_text(final.content)
messages.append(AIMessage(content=final_text))
logger.info(
"deep_agent: run_single_agent_end trace=%s user=%s tool_calls=%d response_chars=%d fallback=1",
trace_id or "-",
@@ -919,8 +1009,11 @@ async def _run_single_agent(
)
if _span:
_span.update(output=final_text)
_messages_to_save = messages[1:]
return final_text
finally:
if session_id and _messages_to_save is not None:
session_buffer.set(user_id, session_id, _messages_to_save)
clear_tool_result_collector()
if _span_ctx:
_span_ctx.__exit__(None, None, None)
@@ -939,6 +1032,7 @@ async def _run_single_agent_stream(
langfuse_prompt: Any = None,
agent_name: str = "agent",
tools: list[Any] | None = None,
conversation_history: list[dict[str, str]] | None = None,
) -> AsyncGenerator[tuple[str, Any], None]:
trace_id = _trace_id_from_context(context)
session_id = _session_id_from_context(context)
@@ -948,8 +1042,11 @@ async def _run_single_agent_stream(
tools = _all_tools_for_user(user_id, trace_id)
logger.info("deep_agent: run_single_agent_stream_start trace=%s user=%s", trace_id or "-", user_id)
llm_with_tools = llm.bind_tools(tools)
_buffered = session_buffer.get(user_id, session_id) if session_id else None
history_messages = _buffered if _buffered is not None else _history_to_messages(conversation_history)
messages: list[Any] = [
SystemMessage(content=system_prompt),
*history_messages,
HumanMessage(content=message),
]
@@ -963,7 +1060,7 @@ async def _run_single_agent_stream(
_span_ctx = (
lf.start_as_current_observation(
as_type="span",
as_type="agent",
name=f"{agent_name}-stream",
metadata={"user_id": user_id, "session_id": trace_id},
input=message,
@@ -972,6 +1069,7 @@ async def _run_single_agent_stream(
)
_span = _span_ctx.__enter__() if _span_ctx else None
streamed_text: list[str] = []
_messages_to_save: list[Any] | None = None
try:
for _ in range(max_steps):
@@ -1009,6 +1107,8 @@ async def _run_single_agent_stream(
)
if _span:
_span.update(output="".join(streamed_text))
messages.append(response)
_messages_to_save = messages[1:] # strip SystemMessage
return
messages.append(response)
@@ -1028,6 +1128,14 @@ async def _run_single_agent_stream(
tool_fn = tool_map.get(call_name)
if tool_fn is None:
tool_output = f"Unknown tool: {call_name}"
elif lf:
with lf.start_as_current_observation(
as_type="tool",
name=call_name,
input=call_args,
) as tool_obs:
tool_output = await tool_fn.ainvoke(call_args)
tool_obs.update(output=str(tool_output)[:8000])
else:
tool_output = await tool_fn.ainvoke(call_args)
@@ -1040,12 +1148,16 @@ async def _run_single_agent_stream(
messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
fallback_chunks: list[str] = []
async for chunk in llm.astream(messages):
token = _as_text(getattr(chunk, "content", ""))
if token:
streamed_chars += len(token)
streamed_text.append(token)
fallback_chunks.append(token)
yield "token", token
messages.append(AIMessage(content="".join(fallback_chunks)))
_messages_to_save = messages[1:]
logger.info(
"deep_agent: run_single_agent_stream_end trace=%s user=%s tool_calls=%d response_chars=%d fallback=1",
trace_id or "-",
@@ -1056,6 +1168,8 @@ async def _run_single_agent_stream(
if _span:
_span.update(output="".join(streamed_text))
finally:
if session_id and _messages_to_save is not None:
session_buffer.set(user_id, session_id, _messages_to_save)
clear_tool_result_collector()
if _span_ctx:
_span_ctx.__exit__(None, None, None)
@@ -1074,6 +1188,7 @@ async def run_home(user_id: str, message: str, context: dict[str, Any]) -> str:
context=prepared_context,
langfuse_prompt=langfuse_prompt,
agent_name="home-agent",
conversation_history=context.get("conversation_history"),
)
return _normalize_tagged_list_lines(response, message)
@@ -1089,6 +1204,7 @@ async def run_floating(user_id: str, message: str, context: dict[str, Any]) -> t
context=prepared_context,
langfuse_prompt=langfuse_prompt,
agent_name="floating-agent",
conversation_history=context.get("conversation_history"),
)
sanitized = _strip_floating_markup(response)
if not sanitized and response:
@@ -1111,6 +1227,7 @@ async def run_home_stream(
context=prepared_context,
langfuse_prompt=langfuse_prompt,
agent_name="home-agent",
conversation_history=context.get("conversation_history"),
):
event_type, data = event
if event_type != "token":
@@ -1143,6 +1260,7 @@ async def run_floating_stream(
context=prepared_context,
langfuse_prompt=langfuse_prompt,
agent_name="floating-agent",
conversation_history=context.get("conversation_history"),
):
event_type, data = event
if event_type != "token":

View File

@@ -7,10 +7,32 @@ The callback sends a `tool_call` WS frame and awaits the `tool_result`.
from __future__ import annotations
import re
from contextvars import ContextVar
from typing import Any, Callable, Coroutine
from uuid import uuid4
_SNAKE_TO_CAMEL_RE = re.compile(r"_([a-z])")
def _key_to_camel(key: str) -> str:
return _SNAKE_TO_CAMEL_RE.sub(lambda m: m.group(1).upper(), key)
def _keys_to_camel(obj: Any) -> Any:
"""Recursively convert dict keys from snake_case to camelCase.
Mirrors the JS-side ``toCamelCase`` applied to incoming WS frames in
``adiuvAI/src/main/api/backend-client.ts``. The Electron executor wraps
tool_result payloads in ``toSnakeCase`` before sending; this restores the
camelCase schema property names that the tool code expects to read.
"""
if isinstance(obj, dict):
return {_key_to_camel(k): _keys_to_camel(v) for k, v in obj.items()}
if isinstance(obj, list):
return [_keys_to_camel(v) for v in obj]
return obj
# Holds the execute callback for the current WS session.
# Set by the chat WS handler before the orchestrator runs; cleared after.
_client_executor: ContextVar[Callable[[dict], Coroutine[Any, Any, dict]]] = ContextVar(
@@ -82,6 +104,7 @@ async def execute_on_client(
payload["limit"] = limit
result = await callback(payload)
result = _keys_to_camel(result)
collector = _tool_result_collector.get(None)
if collector is not None:
collector.append({

View File

@@ -33,7 +33,7 @@ google-auth-httplib2>=0.2.0
msal>=1.28.0
cryptography>=42.0.0
pgvector>=0.2.5
langfuse>=2.0.0
langfuse>=3.3.1
beautifulsoup4>=4.12.0
lxml>=5.0.0
PyYAML>=6.0.0