From c20c6d7853ec5259a92ab7c01841c90e39382fee Mon Sep 17 00:00:00 2001 From: Roberto Date: Wed, 29 Apr 2026 09:21:41 +0200 Subject: [PATCH] Fix home message tools calls --- app/agents/task_agent.py | 17 ++-- app/agents/timeline_agent.py | 6 +- app/api/routes/device_ws.py | 1 + app/core/agent_session_buffer.py | 59 ++++++++++++ app/core/deep_agent.py | 148 +++++++++++++++++++++++++++---- app/core/ws_context.py | 23 +++++ requirements.txt | 2 +- 7 files changed, 232 insertions(+), 24 deletions(-) create mode 100644 app/core/agent_session_buffer.py diff --git a/app/agents/task_agent.py b/app/agents/task_agent.py index 9dd85dd..7761122 100644 --- a/app/agents/task_agent.py +++ b/app/agents/task_agent.py @@ -46,7 +46,9 @@ async def list_tasks( project_id: UUID of the project to scope results to. status: filter by status — todo | in_progress | done. priority: filter by priority — high | medium | low. - assignee: substring to match against assignee names. + assignee: substring to match against assignee names. OMIT unless the user explicitly + names a person or refers to themselves ("my tasks", "assigned to me", "mine"). + Do NOT default to the current user. search: substring search across title and description. order_by: sort field — dueDate | priority | createdAt | completedAt. order_dir: asc (default) | desc. @@ -97,7 +99,8 @@ async def list_tasks( return "No tasks found matching the given filters." lines = [ f"- {r['title']} (status: {r['status']}, priority: {r['priority']}, " - f"dueDate: {r.get('dueDate')}, completedAt: {r.get('completedAt')}, id: {r['id']})" + f"dueDate: {r.get('dueDate')}, completedAt: {r.get('completedAt')}, " + f"projectId: {r.get('projectId')}, id: {r['id']})" for r in rows ] return f"Found {len(rows)} task(s):\n" + "\n".join(lines) @@ -122,7 +125,8 @@ async def count_tasks( Use this instead of list_tasks for "how many" questions — it is much cheaper. Same filter parameters as list_tasks (no limit/offset/order_by needed). - + assignee: OMIT unless the user explicitly names a person or refers to themselves + ("my tasks"). Do NOT default to the current user. due_date_from / due_date_to: ms epoch range for dueDate. Use -1 to omit. created_at_from / created_at_to: ms epoch range for createdAt. Use -1 to omit. completed_at_from / completed_at_to: ms epoch range for completedAt. Use -1 to omit. @@ -197,7 +201,7 @@ async def create_task( row = result["row"] return ( f"Task created: '{row['title']}' " - f"(id: {row['id']}, status: {row['status']}, priority: {row['priority']})" + f"(id: {row['id']}, status: {row['status']}, priority: {row['priority']}, projectId: {row.get('projectId')})" ) @@ -241,7 +245,7 @@ async def update_task( data={"id": task_id, "updates": updates}, ) row = result["row"] - return f"Task updated: '{row['title']}' (id: {row['id']}, status: {row['status']})" + return f"Task updated: '{row['title']}' (id: {row['id']}, status: {row['status']}, projectId: {row.get('projectId')})" @tool @@ -280,7 +284,8 @@ async def list_tasks_due_today(user_timezone: str = "UTC", include_done: bool = if not rows: return "No tasks are due today." lines = [ - f"- {r['title']} (priority: {r['priority']}, status: {r['status']}, id: {r['id']})" + f"- {r['title']} (priority: {r['priority']}, status: {r['status']}, " + f"projectId: {r.get('projectId')}, id: {r['id']})" for r in rows ] return f"Tasks due today ({len(rows)}):\n" + "\n".join(lines) diff --git a/app/agents/timeline_agent.py b/app/agents/timeline_agent.py index 0f777a1..beeedb1 100644 --- a/app/agents/timeline_agent.py +++ b/app/agents/timeline_agent.py @@ -89,7 +89,8 @@ async def list_timelines( return "No timeline events found." lines = [ f"- {r['title']} (date: {r['date']}, type: {r.get('type')}, " - f"completed: {bool(r.get('isCompleted'))}, completedAt: {r.get('completedAt')}, id: {r['id']})" + f"completed: {bool(r.get('isCompleted'))}, completedAt: {r.get('completedAt')}, " + f"projectId: {r.get('projectId')}, id: {r['id']})" for r in rows ] return f"Found {len(rows)} timeline event(s):\n" + "\n".join(lines) @@ -246,7 +247,8 @@ async def list_timelines_today(user_timezone: str = "UTC", include_completed: bo if not rows: return "No timeline events today." lines = [ - f"- {r['title']} (date: {r['date']}, type: {r.get('type')}, completed: {bool(r.get('isCompleted'))}, id: {r['id']})" + f"- {r['title']} (date: {r['date']}, type: {r.get('type')}, " + f"completed: {bool(r.get('isCompleted'))}, projectId: {r.get('projectId')}, id: {r['id']})" for r in rows ] return f"Timeline events today ({len(rows)}):\n" + "\n".join(lines) diff --git a/app/api/routes/device_ws.py b/app/api/routes/device_ws.py index 47f8511..e66304a 100644 --- a/app/api/routes/device_ws.py +++ b/app/api/routes/device_ws.py @@ -294,6 +294,7 @@ async def _handle_floating_request( ) context: dict = { + "conversation_history": frame.get("conversation_history", []), "scope": scope, "_debug": {"request_id": request_id, "session_id": session_id, "user_id": user_id}, "format_prefs": frame.get("format_prefs"), diff --git a/app/core/agent_session_buffer.py b/app/core/agent_session_buffer.py new file mode 100644 index 0000000..87cdd03 --- /dev/null +++ b/app/core/agent_session_buffer.py @@ -0,0 +1,59 @@ +"""In-process TTL buffer for per-session LangChain message history. + +Stores the full message list (including AIMessage with tool_calls and ToolMessage) +keyed by (user_id, session_id), so agents can reconstruct tool-call context across +conversation turns without it being lossy through the wire. + +Single-process only. For multi-worker deployments, replace the _SessionBuffer +implementation with one backed by Redis (serialize LangChain messages to dicts via +message_to_dict / messages_from_dict from langchain_core.messages). +""" +from __future__ import annotations + +import time +from threading import Lock + +from langchain_core.messages import BaseMessage + +SESSION_TTL_SECONDS = 1800 # 30-minute idle expiry +MAX_MESSAGES_PER_SESSION = 80 # cap to avoid unbounded memory growth + + +class _SessionBuffer: + def __init__(self) -> None: + self._store: dict[tuple[str, str], tuple[float, list[BaseMessage]]] = {} + self._lock = Lock() + + def _evict_stale(self) -> None: + now = time.monotonic() + stale = [k for k, (ts, _) in self._store.items() if now - ts > SESSION_TTL_SECONDS] + for k in stale: + del self._store[k] + + def get(self, user_id: str, session_id: str) -> list[BaseMessage] | None: + key = (user_id, session_id) + with self._lock: + entry = self._store.get(key) + if entry is None: + return None + ts, msgs = entry + if time.monotonic() - ts > SESSION_TTL_SECONDS: + del self._store[key] + return None + self._store[key] = (time.monotonic(), msgs) + return list(msgs) + + def set(self, user_id: str, session_id: str, messages: list[BaseMessage]) -> None: + key = (user_id, session_id) + capped = messages[-MAX_MESSAGES_PER_SESSION:] + with self._lock: + self._evict_stale() + self._store[key] = (time.monotonic(), capped) + + def clear(self, user_id: str, session_id: str) -> None: + with self._lock: + self._store.pop((user_id, session_id), None) + + +# Module-level singleton — same pattern as _pending_states in api/app/api/routes/auth.py +session_buffer = _SessionBuffer() diff --git a/app/core/deep_agent.py b/app/core/deep_agent.py index 252cb72..a3a7c7d 100644 --- a/app/core/deep_agent.py +++ b/app/core/deep_agent.py @@ -16,6 +16,7 @@ from app.agents.note_agent import NOTE_TOOLS from app.agents.project_agent import PROJECT_TOOLS from app.agents.task_agent import TASK_TOOLS from app.agents.timeline_agent import TIMELINE_TOOLS +from app.core.agent_session_buffer import session_buffer from app.core.langfuse_client import compile_prompt, extract_usage, get_langfuse, get_prompt_or_fallback, langfuse_context from app.core.llm import get_agent_llm, model_for_agent from app.core.memory_middleware import MemoryMiddleware @@ -24,6 +25,8 @@ from app.db import async_session logger = logging.getLogger(__name__) +MAX_HISTORY_TURNS = 20 + FloatingDomainType = Literal["task", "timeline", "project", "node"] FloatingDomainSection = Literal["task", "timeline", "note"] @@ -176,6 +179,25 @@ def _relational_memory_injection(context: dict[str, Any]) -> str: return section +_IDENTITY_KEYS = ("user_name", "job_role", "industry", "primary_use_case", "tone_preference") + + +def _user_identity_injection(context: dict[str, Any]) -> str: + """Return a compact user-profile block from core memory onboarding fields. + + Returns empty string when no onboarding keys are present. + """ + core = context.get("core_memory") or {} + parts: list[str] = [] + for key in _IDENTITY_KEYS: + val = (core.get(key) or "").strip() + if val: + parts.append(f"- {key}: {val}") + if not parts: + return "" + return "\n\nUser profile:\n" + "\n".join(parts) + + def _request_context_block(context: dict[str, Any]) -> str: """Return a small block with per-request scope and resolved project context.""" parts: list[str] = [] @@ -189,16 +211,39 @@ def _request_context_block(context: dict[str, Any]) -> str: _HOME_SYSTEM_PROMPT = """\ -You are the home assistant for adiuvAI with direct access to all tools: tasks, projects, notes, timelines, and memory tools. -Always use tools for factual data retrieval before answering. -When the user asks to remember, forget, or update what you know about them, use memory tools. +You are adiuvAI's home executive assistant.{user_identity} +You are not a chatbot — you are a proactive partner who runs ahead of the user, anticipates what they need next, and closes every reply with a concrete next step or a clarifying question. + +# How you work +- Use tools before answering anything factual. Never guess counts, dates, or status. +- Prefer parallel tool calls when the questions are independent (e.g. counts per status). Chain calls when one result feeds the next. +- After delivering the answer, propose the next useful action: a follow-up task to draft, a deadline at risk, a project to triage, a person to remind. Use what you know about the user (job role, industry, primary use case) to make the suggestion relevant. +- Match the user's tone preference. Default to warm-but-direct; stay concise. +- When the user asks to remember, forget, or update something, use memory tools. + +# Filter discipline +- Never set the `assignee` filter on list_tasks/count_tasks unless the user explicitly names a person ("Marco's tasks") or refers to themselves ("my tasks", "assigned to me", "mine"). +- The user's own name in the User profile block is for context only — it is NOT a default filter. +- When in doubt, omit `assignee` and return the global result. # Output format -Return markdown and use tags when relevant: [ids], [ids], [ids], [ids], {{json}}. -When listing tasks or timelines, each id tag must be on its own line with no prefix/suffix text. -Never put titles, priorities, or dates on the same line as or tags. -For questions about upcoming timelines (e.g. 'prossimi eventi'), include only future items in the current month unless the user asks a different range. -For upcoming tasks, after tag lines add a short recommendation based on due date and priority. +Return markdown. Reference entities with these tags exactly — one id per tag, each tag on its own line, no prefix/suffix text on the same line: + id id id id + +When the answer contains a list of entities (any of the tags above), structure the reply as three blocks separated by blank lines: + 1. One short intro line stating what is coming (count + scope, e.g. "Ecco i tuoi 18 task ad alta priorità:"). Match the user's language. + 2. All entity tags, one per line, consecutive, no prose interleaved. Do NOT put titles, dates, priorities, or any descriptive text on the same line as a tag or between tags. + 3. One short closing recap (1–2 sentences) that points out a pattern, risk, or insight noticed in the list, and ends with a concrete next step or clarifying question. + +For single-entity answers skip blocks 1 and 3 if they would be redundant; just emit the tag. + +For analytical answers (status overviews, breakdowns by category/priority/project, comparisons, trends, "resoconto", "panoramica") consider returning a chart block when it communicates the answer faster than prose. The decision is yours — skip charts for trivial single-number answers. Schema: + {{"chartType":"pie|bar|line|area|radar|radial","title":"...","data":[{{"name":"...","value":N}},...], "config":{{"value":{{"label":"...","color":"var(--chart-1)"}} }} }} +- pie for share-of-total breakdowns; bar for category comparisons; line/area for time series; radar for multi-dimension. +- data rows must include a "name" field; numeric series keys must match config keys. +- Use var(--chart-1) through var(--chart-5) for colors, cycling 1-5 in series order. Do NOT wrap in hsl() or oklch() — these are complete CSS values already. + +For upcoming-timeline questions ("prossimi eventi"), include only future items in the current month unless the user asks otherwise. # Date filtering {date_context} @@ -221,11 +266,23 @@ For "today" / "tomorrow" queries, prefer list_tasks_due_today / list_timelines_t """ _FLOATING_SYSTEM_PROMPT = """\ -You are the floating assistant for adiuvAI with direct access to all tools: tasks, projects, notes, timelines, and memory tools. -Stay focused on the floating scope and answer concisely. -Return plain text only. Do not output XML/HTML-like tags such as , , , , or any bracketed id tag wrappers. -Always use tools for factual data retrieval before answering. -When the user asks to remember, forget, or update what you know about them, use memory tools. +You are adiuvAI's floating executive assistant.{user_identity} +You are pinned to a specific entity (task, timeline event, project, or note) and you stay strictly within that scope. +Be a proactive partner: anticipate the next useful action and close with a concrete suggestion or a clarifying question — but stay terse, one short paragraph at most. + +# How you work +- Use tools before answering anything factual. Never guess. +- Stay in the floating scope (see Request context). If the user asks something outside scope, answer briefly and suggest opening the home assistant. +- Match the user's tone preference. Default to warm-but-direct. +- When the user asks to remember, forget, or update something, use memory tools. + +# Filter discipline +- Never set the `assignee` filter on list_tasks/count_tasks unless the user explicitly names a person ("Marco's tasks") or refers to themselves ("my tasks", "assigned to me", "mine"). +- The user's own name in the User profile block is for context only — it is NOT a default filter. +- When in doubt, omit `assignee` and return the global result. + +# Output format +Plain text only. Do NOT output XML/HTML-like tags such as , , , , or any bracketed-id wrappers, and do NOT output blocks — those are for the home assistant. # Date filtering {date_context} @@ -361,6 +418,7 @@ def _build_system_prompt(name: str, fallback: str, context: dict[str, Any]) -> t template, prompt_obj, date_context=_datetime_context_injection(context).strip(), language_instruction=_language_instruction(context).strip(), + user_identity=_user_identity_injection(context).strip(), relational_memory=_relational_memory_injection(context).strip(), proactive_hints=_proactive_hints_injection(context).strip(), request_context=_request_context_block(context), @@ -807,6 +865,23 @@ async def _infer_floating_domain(message: str, context: dict[str, Any]) -> dict[ return _infer_floating_domain_rule_based(message, context) +def _history_to_messages(history: list[dict[str, str]] | None) -> list[Any]: + if not history: + return [] + turns = history[-MAX_HISTORY_TURNS:] + result: list[Any] = [] + for turn in turns: + role = turn.get("role", "") + content = turn.get("content", "") + if not content: + continue + if role == "user": + result.append(HumanMessage(content=content)) + elif role == "assistant": + result.append(AIMessage(content=content)) + return result + + async def _run_single_agent( *, user_id: str, @@ -816,6 +891,7 @@ async def _run_single_agent( max_steps: int = 6, langfuse_prompt: Any = None, agent_name: str = "agent", + conversation_history: list[dict[str, str]] | None = None, ) -> str: trace_id = _trace_id_from_context(context) session_id = _session_id_from_context(context) @@ -824,8 +900,11 @@ async def _run_single_agent( tools = _all_tools_for_user(user_id, trace_id) logger.info("deep_agent: run_single_agent_start trace=%s user=%s", trace_id or "-", user_id) llm_with_tools = llm.bind_tools(tools) + _buffered = session_buffer.get(user_id, session_id) if session_id else None + history_messages = _buffered if _buffered is not None else _history_to_messages(conversation_history) messages: list[Any] = [ SystemMessage(content=system_prompt), + *history_messages, HumanMessage(content=message), ] @@ -838,7 +917,7 @@ async def _run_single_agent( _span_ctx = ( lf.start_as_current_observation( - as_type="span", + as_type="agent", name=agent_name, metadata={"user_id": user_id, "session_id": trace_id}, input=message, @@ -846,6 +925,7 @@ async def _run_single_agent( if lf else None ) _span = _span_ctx.__enter__() if _span_ctx else None + _messages_to_save: list[Any] | None = None try: for _ in range(max_steps): @@ -878,6 +958,7 @@ async def _run_single_agent( ) if _span: _span.update(output=final_text) + _messages_to_save = messages[1:] # strip SystemMessage; save full tool history return final_text tool_map = {tool_def.name: tool_def for tool_def in tools} @@ -896,6 +977,14 @@ async def _run_single_agent( tool_fn = tool_map.get(call_name) if tool_fn is None: tool_output = f"Unknown tool: {call_name}" + elif lf: + with lf.start_as_current_observation( + as_type="tool", + name=call_name, + input=call_args, + ) as tool_obs: + tool_output = await tool_fn.ainvoke(call_args) + tool_obs.update(output=str(tool_output)[:8000]) else: tool_output = await tool_fn.ainvoke(call_args) @@ -910,6 +999,7 @@ async def _run_single_agent( final = await llm.ainvoke(messages) final_text = _as_text(final.content) + messages.append(AIMessage(content=final_text)) logger.info( "deep_agent: run_single_agent_end trace=%s user=%s tool_calls=%d response_chars=%d fallback=1", trace_id or "-", @@ -919,8 +1009,11 @@ async def _run_single_agent( ) if _span: _span.update(output=final_text) + _messages_to_save = messages[1:] return final_text finally: + if session_id and _messages_to_save is not None: + session_buffer.set(user_id, session_id, _messages_to_save) clear_tool_result_collector() if _span_ctx: _span_ctx.__exit__(None, None, None) @@ -939,6 +1032,7 @@ async def _run_single_agent_stream( langfuse_prompt: Any = None, agent_name: str = "agent", tools: list[Any] | None = None, + conversation_history: list[dict[str, str]] | None = None, ) -> AsyncGenerator[tuple[str, Any], None]: trace_id = _trace_id_from_context(context) session_id = _session_id_from_context(context) @@ -948,8 +1042,11 @@ async def _run_single_agent_stream( tools = _all_tools_for_user(user_id, trace_id) logger.info("deep_agent: run_single_agent_stream_start trace=%s user=%s", trace_id or "-", user_id) llm_with_tools = llm.bind_tools(tools) + _buffered = session_buffer.get(user_id, session_id) if session_id else None + history_messages = _buffered if _buffered is not None else _history_to_messages(conversation_history) messages: list[Any] = [ SystemMessage(content=system_prompt), + *history_messages, HumanMessage(content=message), ] @@ -963,7 +1060,7 @@ async def _run_single_agent_stream( _span_ctx = ( lf.start_as_current_observation( - as_type="span", + as_type="agent", name=f"{agent_name}-stream", metadata={"user_id": user_id, "session_id": trace_id}, input=message, @@ -972,6 +1069,7 @@ async def _run_single_agent_stream( ) _span = _span_ctx.__enter__() if _span_ctx else None streamed_text: list[str] = [] + _messages_to_save: list[Any] | None = None try: for _ in range(max_steps): @@ -1009,6 +1107,8 @@ async def _run_single_agent_stream( ) if _span: _span.update(output="".join(streamed_text)) + messages.append(response) + _messages_to_save = messages[1:] # strip SystemMessage return messages.append(response) @@ -1028,6 +1128,14 @@ async def _run_single_agent_stream( tool_fn = tool_map.get(call_name) if tool_fn is None: tool_output = f"Unknown tool: {call_name}" + elif lf: + with lf.start_as_current_observation( + as_type="tool", + name=call_name, + input=call_args, + ) as tool_obs: + tool_output = await tool_fn.ainvoke(call_args) + tool_obs.update(output=str(tool_output)[:8000]) else: tool_output = await tool_fn.ainvoke(call_args) @@ -1040,12 +1148,16 @@ async def _run_single_agent_stream( messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"])) + fallback_chunks: list[str] = [] async for chunk in llm.astream(messages): token = _as_text(getattr(chunk, "content", "")) if token: streamed_chars += len(token) streamed_text.append(token) + fallback_chunks.append(token) yield "token", token + messages.append(AIMessage(content="".join(fallback_chunks))) + _messages_to_save = messages[1:] logger.info( "deep_agent: run_single_agent_stream_end trace=%s user=%s tool_calls=%d response_chars=%d fallback=1", trace_id or "-", @@ -1056,6 +1168,8 @@ async def _run_single_agent_stream( if _span: _span.update(output="".join(streamed_text)) finally: + if session_id and _messages_to_save is not None: + session_buffer.set(user_id, session_id, _messages_to_save) clear_tool_result_collector() if _span_ctx: _span_ctx.__exit__(None, None, None) @@ -1074,6 +1188,7 @@ async def run_home(user_id: str, message: str, context: dict[str, Any]) -> str: context=prepared_context, langfuse_prompt=langfuse_prompt, agent_name="home-agent", + conversation_history=context.get("conversation_history"), ) return _normalize_tagged_list_lines(response, message) @@ -1089,6 +1204,7 @@ async def run_floating(user_id: str, message: str, context: dict[str, Any]) -> t context=prepared_context, langfuse_prompt=langfuse_prompt, agent_name="floating-agent", + conversation_history=context.get("conversation_history"), ) sanitized = _strip_floating_markup(response) if not sanitized and response: @@ -1111,6 +1227,7 @@ async def run_home_stream( context=prepared_context, langfuse_prompt=langfuse_prompt, agent_name="home-agent", + conversation_history=context.get("conversation_history"), ): event_type, data = event if event_type != "token": @@ -1143,6 +1260,7 @@ async def run_floating_stream( context=prepared_context, langfuse_prompt=langfuse_prompt, agent_name="floating-agent", + conversation_history=context.get("conversation_history"), ): event_type, data = event if event_type != "token": diff --git a/app/core/ws_context.py b/app/core/ws_context.py index 14ac879..36f8a5a 100644 --- a/app/core/ws_context.py +++ b/app/core/ws_context.py @@ -7,10 +7,32 @@ The callback sends a `tool_call` WS frame and awaits the `tool_result`. from __future__ import annotations +import re from contextvars import ContextVar from typing import Any, Callable, Coroutine from uuid import uuid4 +_SNAKE_TO_CAMEL_RE = re.compile(r"_([a-z])") + + +def _key_to_camel(key: str) -> str: + return _SNAKE_TO_CAMEL_RE.sub(lambda m: m.group(1).upper(), key) + + +def _keys_to_camel(obj: Any) -> Any: + """Recursively convert dict keys from snake_case to camelCase. + + Mirrors the JS-side ``toCamelCase`` applied to incoming WS frames in + ``adiuvAI/src/main/api/backend-client.ts``. The Electron executor wraps + tool_result payloads in ``toSnakeCase`` before sending; this restores the + camelCase schema property names that the tool code expects to read. + """ + if isinstance(obj, dict): + return {_key_to_camel(k): _keys_to_camel(v) for k, v in obj.items()} + if isinstance(obj, list): + return [_keys_to_camel(v) for v in obj] + return obj + # Holds the execute callback for the current WS session. # Set by the chat WS handler before the orchestrator runs; cleared after. _client_executor: ContextVar[Callable[[dict], Coroutine[Any, Any, dict]]] = ContextVar( @@ -82,6 +104,7 @@ async def execute_on_client( payload["limit"] = limit result = await callback(payload) + result = _keys_to_camel(result) collector = _tool_result_collector.get(None) if collector is not None: collector.append({ diff --git a/requirements.txt b/requirements.txt index 5fddc64..6934c7c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -33,7 +33,7 @@ google-auth-httplib2>=0.2.0 msal>=1.28.0 cryptography>=42.0.0 pgvector>=0.2.5 -langfuse>=2.0.0 +langfuse>=3.3.1 beautifulsoup4>=4.12.0 lxml>=5.0.0 PyYAML>=6.0.0