Compare commits
1 Commits
6787e690ba
...
c20c6d7853
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c20c6d7853 |
@@ -46,7 +46,9 @@ async def list_tasks(
|
|||||||
project_id: UUID of the project to scope results to.
|
project_id: UUID of the project to scope results to.
|
||||||
status: filter by status — todo | in_progress | done.
|
status: filter by status — todo | in_progress | done.
|
||||||
priority: filter by priority — high | medium | low.
|
priority: filter by priority — high | medium | low.
|
||||||
assignee: substring to match against assignee names.
|
assignee: substring to match against assignee names. OMIT unless the user explicitly
|
||||||
|
names a person or refers to themselves ("my tasks", "assigned to me", "mine").
|
||||||
|
Do NOT default to the current user.
|
||||||
search: substring search across title and description.
|
search: substring search across title and description.
|
||||||
order_by: sort field — dueDate | priority | createdAt | completedAt.
|
order_by: sort field — dueDate | priority | createdAt | completedAt.
|
||||||
order_dir: asc (default) | desc.
|
order_dir: asc (default) | desc.
|
||||||
@@ -97,7 +99,8 @@ async def list_tasks(
|
|||||||
return "No tasks found matching the given filters."
|
return "No tasks found matching the given filters."
|
||||||
lines = [
|
lines = [
|
||||||
f"- {r['title']} (status: {r['status']}, priority: {r['priority']}, "
|
f"- {r['title']} (status: {r['status']}, priority: {r['priority']}, "
|
||||||
f"dueDate: {r.get('dueDate')}, completedAt: {r.get('completedAt')}, id: {r['id']})"
|
f"dueDate: {r.get('dueDate')}, completedAt: {r.get('completedAt')}, "
|
||||||
|
f"projectId: {r.get('projectId')}, id: {r['id']})"
|
||||||
for r in rows
|
for r in rows
|
||||||
]
|
]
|
||||||
return f"Found {len(rows)} task(s):\n" + "\n".join(lines)
|
return f"Found {len(rows)} task(s):\n" + "\n".join(lines)
|
||||||
@@ -122,7 +125,8 @@ async def count_tasks(
|
|||||||
|
|
||||||
Use this instead of list_tasks for "how many" questions — it is much cheaper.
|
Use this instead of list_tasks for "how many" questions — it is much cheaper.
|
||||||
Same filter parameters as list_tasks (no limit/offset/order_by needed).
|
Same filter parameters as list_tasks (no limit/offset/order_by needed).
|
||||||
|
assignee: OMIT unless the user explicitly names a person or refers to themselves
|
||||||
|
("my tasks"). Do NOT default to the current user.
|
||||||
due_date_from / due_date_to: ms epoch range for dueDate. Use -1 to omit.
|
due_date_from / due_date_to: ms epoch range for dueDate. Use -1 to omit.
|
||||||
created_at_from / created_at_to: ms epoch range for createdAt. Use -1 to omit.
|
created_at_from / created_at_to: ms epoch range for createdAt. Use -1 to omit.
|
||||||
completed_at_from / completed_at_to: ms epoch range for completedAt. Use -1 to omit.
|
completed_at_from / completed_at_to: ms epoch range for completedAt. Use -1 to omit.
|
||||||
@@ -197,7 +201,7 @@ async def create_task(
|
|||||||
row = result["row"]
|
row = result["row"]
|
||||||
return (
|
return (
|
||||||
f"Task created: '{row['title']}' "
|
f"Task created: '{row['title']}' "
|
||||||
f"(id: {row['id']}, status: {row['status']}, priority: {row['priority']})"
|
f"(id: {row['id']}, status: {row['status']}, priority: {row['priority']}, projectId: {row.get('projectId')})"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -241,7 +245,7 @@ async def update_task(
|
|||||||
data={"id": task_id, "updates": updates},
|
data={"id": task_id, "updates": updates},
|
||||||
)
|
)
|
||||||
row = result["row"]
|
row = result["row"]
|
||||||
return f"Task updated: '{row['title']}' (id: {row['id']}, status: {row['status']})"
|
return f"Task updated: '{row['title']}' (id: {row['id']}, status: {row['status']}, projectId: {row.get('projectId')})"
|
||||||
|
|
||||||
|
|
||||||
@tool
|
@tool
|
||||||
@@ -280,7 +284,8 @@ async def list_tasks_due_today(user_timezone: str = "UTC", include_done: bool =
|
|||||||
if not rows:
|
if not rows:
|
||||||
return "No tasks are due today."
|
return "No tasks are due today."
|
||||||
lines = [
|
lines = [
|
||||||
f"- {r['title']} (priority: {r['priority']}, status: {r['status']}, id: {r['id']})"
|
f"- {r['title']} (priority: {r['priority']}, status: {r['status']}, "
|
||||||
|
f"projectId: {r.get('projectId')}, id: {r['id']})"
|
||||||
for r in rows
|
for r in rows
|
||||||
]
|
]
|
||||||
return f"Tasks due today ({len(rows)}):\n" + "\n".join(lines)
|
return f"Tasks due today ({len(rows)}):\n" + "\n".join(lines)
|
||||||
|
|||||||
@@ -89,7 +89,8 @@ async def list_timelines(
|
|||||||
return "No timeline events found."
|
return "No timeline events found."
|
||||||
lines = [
|
lines = [
|
||||||
f"- {r['title']} (date: {r['date']}, type: {r.get('type')}, "
|
f"- {r['title']} (date: {r['date']}, type: {r.get('type')}, "
|
||||||
f"completed: {bool(r.get('isCompleted'))}, completedAt: {r.get('completedAt')}, id: {r['id']})"
|
f"completed: {bool(r.get('isCompleted'))}, completedAt: {r.get('completedAt')}, "
|
||||||
|
f"projectId: {r.get('projectId')}, id: {r['id']})"
|
||||||
for r in rows
|
for r in rows
|
||||||
]
|
]
|
||||||
return f"Found {len(rows)} timeline event(s):\n" + "\n".join(lines)
|
return f"Found {len(rows)} timeline event(s):\n" + "\n".join(lines)
|
||||||
@@ -246,7 +247,8 @@ async def list_timelines_today(user_timezone: str = "UTC", include_completed: bo
|
|||||||
if not rows:
|
if not rows:
|
||||||
return "No timeline events today."
|
return "No timeline events today."
|
||||||
lines = [
|
lines = [
|
||||||
f"- {r['title']} (date: {r['date']}, type: {r.get('type')}, completed: {bool(r.get('isCompleted'))}, id: {r['id']})"
|
f"- {r['title']} (date: {r['date']}, type: {r.get('type')}, "
|
||||||
|
f"completed: {bool(r.get('isCompleted'))}, projectId: {r.get('projectId')}, id: {r['id']})"
|
||||||
for r in rows
|
for r in rows
|
||||||
]
|
]
|
||||||
return f"Timeline events today ({len(rows)}):\n" + "\n".join(lines)
|
return f"Timeline events today ({len(rows)}):\n" + "\n".join(lines)
|
||||||
|
|||||||
@@ -294,6 +294,7 @@ async def _handle_floating_request(
|
|||||||
)
|
)
|
||||||
|
|
||||||
context: dict = {
|
context: dict = {
|
||||||
|
"conversation_history": frame.get("conversation_history", []),
|
||||||
"scope": scope,
|
"scope": scope,
|
||||||
"_debug": {"request_id": request_id, "session_id": session_id, "user_id": user_id},
|
"_debug": {"request_id": request_id, "session_id": session_id, "user_id": user_id},
|
||||||
"format_prefs": frame.get("format_prefs"),
|
"format_prefs": frame.get("format_prefs"),
|
||||||
|
|||||||
59
app/core/agent_session_buffer.py
Normal file
59
app/core/agent_session_buffer.py
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
"""In-process TTL buffer for per-session LangChain message history.
|
||||||
|
|
||||||
|
Stores the full message list (including AIMessage with tool_calls and ToolMessage)
|
||||||
|
keyed by (user_id, session_id), so agents can reconstruct tool-call context across
|
||||||
|
conversation turns without that context being lost when history travels over the wire.
|
||||||
|
|
||||||
|
Single-process only. For multi-worker deployments, replace the _SessionBuffer
|
||||||
|
implementation with one backed by Redis (serialize LangChain messages to dicts via
|
||||||
|
message_to_dict / messages_from_dict from langchain_core.messages).
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import time
|
||||||
|
from threading import Lock
|
||||||
|
|
||||||
|
from langchain_core.messages import BaseMessage
|
||||||
|
|
||||||
|
SESSION_TTL_SECONDS = 1800  # 30-minute idle expiry
MAX_MESSAGES_PER_SESSION = 80  # cap to avoid unbounded memory growth


class _SessionBuffer:
    """Lock-guarded, in-memory store of message history keyed by (user_id, session_id).

    Each entry carries the monotonic timestamp of its last read or write;
    entries idle for longer than ``SESSION_TTL_SECONDS`` are dropped lazily
    (on ``get`` of that key, and on every ``set``).
    """

    def __init__(self) -> None:
        # (user_id, session_id) -> (last-touched monotonic timestamp, messages)
        self._store: dict[tuple[str, str], tuple[float, list[BaseMessage]]] = {}
        self._lock = Lock()

    @staticmethod
    def _cap_messages(messages: list[BaseMessage]) -> list[BaseMessage]:
        """Return a copy of *messages* limited to ``MAX_MESSAGES_PER_SESSION``.

        A blind tail slice can begin in the middle of a tool-call exchange,
        leaving tool results whose originating AI message was cut off. Replaying
        such a history is typically rejected by chat-completion APIs, so when
        truncation happens the window is advanced to the first human message.
        If the window holds no human message, only the leading orphaned tool
        results are dropped.
        """
        window = messages[-MAX_MESSAGES_PER_SESSION:]
        if len(messages) <= MAX_MESSAGES_PER_SESSION:
            # Nothing was cut, so the history is stored exactly as given.
            return window
        for idx, msg in enumerate(window):
            if getattr(msg, "type", None) == "human":
                return window[idx:]
        start = 0
        while start < len(window) and getattr(window[start], "type", None) == "tool":
            start += 1
        return window[start:]

    def _evict_stale(self) -> None:
        """Delete every entry idle past the TTL.

        Does not take the lock itself; the caller must already hold it.
        """
        now = time.monotonic()
        stale = [k for k, (ts, _) in self._store.items() if now - ts > SESSION_TTL_SECONDS]
        for k in stale:
            del self._store[k]

    def get(self, user_id: str, session_id: str) -> list[BaseMessage] | None:
        """Return a copy of the stored history, or ``None`` if absent or expired.

        A successful read refreshes the entry's idle timer.
        """
        key = (user_id, session_id)
        with self._lock:
            entry = self._store.get(key)
            if entry is None:
                return None
            ts, msgs = entry
            if time.monotonic() - ts > SESSION_TTL_SECONDS:
                del self._store[key]
                return None
            self._store[key] = (time.monotonic(), msgs)
            # Copy so callers can extend the list without mutating the buffer.
            return list(msgs)

    def set(self, user_id: str, session_id: str, messages: list[BaseMessage]) -> None:
        """Store *messages* for the session, capped and aligned to a turn boundary."""
        key = (user_id, session_id)
        capped = self._cap_messages(messages)
        with self._lock:
            self._evict_stale()
            self._store[key] = (time.monotonic(), capped)

    def clear(self, user_id: str, session_id: str) -> None:
        """Forget the session's history; a missing key is not an error."""
        with self._lock:
            self._store.pop((user_id, session_id), None)


# Module-level singleton — same pattern as _pending_states in api/app/api/routes/auth.py
session_buffer = _SessionBuffer()
|
||||||
@@ -16,6 +16,7 @@ from app.agents.note_agent import NOTE_TOOLS
|
|||||||
from app.agents.project_agent import PROJECT_TOOLS
|
from app.agents.project_agent import PROJECT_TOOLS
|
||||||
from app.agents.task_agent import TASK_TOOLS
|
from app.agents.task_agent import TASK_TOOLS
|
||||||
from app.agents.timeline_agent import TIMELINE_TOOLS
|
from app.agents.timeline_agent import TIMELINE_TOOLS
|
||||||
|
from app.core.agent_session_buffer import session_buffer
|
||||||
from app.core.langfuse_client import compile_prompt, extract_usage, get_langfuse, get_prompt_or_fallback, langfuse_context
|
from app.core.langfuse_client import compile_prompt, extract_usage, get_langfuse, get_prompt_or_fallback, langfuse_context
|
||||||
from app.core.llm import get_agent_llm, model_for_agent
|
from app.core.llm import get_agent_llm, model_for_agent
|
||||||
from app.core.memory_middleware import MemoryMiddleware
|
from app.core.memory_middleware import MemoryMiddleware
|
||||||
@@ -24,6 +25,8 @@ from app.db import async_session
|
|||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
MAX_HISTORY_TURNS = 20
|
||||||
|
|
||||||
FloatingDomainType = Literal["task", "timeline", "project", "node"]
|
FloatingDomainType = Literal["task", "timeline", "project", "node"]
|
||||||
FloatingDomainSection = Literal["task", "timeline", "note"]
|
FloatingDomainSection = Literal["task", "timeline", "note"]
|
||||||
|
|
||||||
@@ -176,6 +179,25 @@ def _relational_memory_injection(context: dict[str, Any]) -> str:
|
|||||||
return section
|
return section
|
||||||
|
|
||||||
|
|
||||||
|
_IDENTITY_KEYS = ("user_name", "job_role", "industry", "primary_use_case", "tone_preference")


def _user_identity_injection(context: dict[str, Any]) -> str:
    """Return a compact user-profile block from core memory onboarding fields.

    Reads ``context["core_memory"]`` and emits one ``- key: value`` line for
    each key in ``_IDENTITY_KEYS`` that has a non-empty value, in that order.
    Non-string values are rendered with ``str()`` rather than raising, and a
    ``core_memory`` that is not a dict is treated as empty.

    Returns empty string when no onboarding keys are present.
    """
    core = context.get("core_memory") or {}
    if not isinstance(core, dict):
        return ""
    parts: list[str] = []
    for key in _IDENTITY_KEYS:
        raw = core.get(key)
        if not raw:
            # Missing, None, or otherwise empty: nothing to show for this key.
            continue
        val = (raw if isinstance(raw, str) else str(raw)).strip()
        if val:
            parts.append(f"- {key}: {val}")
    if not parts:
        return ""
    return "\n\nUser profile:\n" + "\n".join(parts)
|
||||||
|
|
||||||
|
|
||||||
def _request_context_block(context: dict[str, Any]) -> str:
|
def _request_context_block(context: dict[str, Any]) -> str:
|
||||||
"""Return a small block with per-request scope and resolved project context."""
|
"""Return a small block with per-request scope and resolved project context."""
|
||||||
parts: list[str] = []
|
parts: list[str] = []
|
||||||
@@ -189,16 +211,39 @@ def _request_context_block(context: dict[str, Any]) -> str:
|
|||||||
|
|
||||||
|
|
||||||
_HOME_SYSTEM_PROMPT = """\
|
_HOME_SYSTEM_PROMPT = """\
|
||||||
You are the home assistant for adiuvAI with direct access to all tools: tasks, projects, notes, timelines, and memory tools.
|
You are adiuvAI's home executive assistant.{user_identity}
|
||||||
Always use tools for factual data retrieval before answering.
|
You are not a chatbot — you are a proactive partner who runs ahead of the user, anticipates what they need next, and closes every reply with a concrete next step or a clarifying question.
|
||||||
When the user asks to remember, forget, or update what you know about them, use memory tools.
|
|
||||||
|
# How you work
|
||||||
|
- Use tools before answering anything factual. Never guess counts, dates, or status.
|
||||||
|
- Prefer parallel tool calls when the questions are independent (e.g. counts per status). Chain calls when one result feeds the next.
|
||||||
|
- After delivering the answer, propose the next useful action: a follow-up task to draft, a deadline at risk, a project to triage, a person to remind. Use what you know about the user (job role, industry, primary use case) to make the suggestion relevant.
|
||||||
|
- Match the user's tone preference. Default to warm-but-direct; stay concise.
|
||||||
|
- When the user asks to remember, forget, or update something, use memory tools.
|
||||||
|
|
||||||
|
# Filter discipline
|
||||||
|
- Never set the `assignee` filter on list_tasks/count_tasks unless the user explicitly names a person ("Marco's tasks") or refers to themselves ("my tasks", "assigned to me", "mine").
|
||||||
|
- The user's own name in the User profile block is for context only — it is NOT a default filter.
|
||||||
|
- When in doubt, omit `assignee` and return the global result.
|
||||||
|
|
||||||
# Output format
|
# Output format
|
||||||
Return markdown and use tags when relevant: <project>[ids]</project>, <task>[ids]</task>, <note>[ids]</note>, <timeline>[ids]</timeline>, <chart>{{json}}</chart>.
|
Return markdown. Reference entities with these tags exactly — one id per tag, each tag on its own line, no prefix/suffix text on the same line:
|
||||||
When listing tasks or timelines, each id tag must be on its own line with no prefix/suffix text.
|
<project>id</project> <task>id</task> <note>id</note> <timeline>id</timeline>
|
||||||
Never put titles, priorities, or dates on the same line as <task> or <timeline> tags.
|
|
||||||
For questions about upcoming timelines (e.g. 'prossimi eventi'), include only future items in the current month unless the user asks a different range.
|
When the answer contains a list of entities (any of the tags above), structure the reply as three blocks separated by blank lines:
|
||||||
For upcoming tasks, after tag lines add a short recommendation based on due date and priority.
|
1. One short intro line stating what is coming (count + scope, e.g. "Ecco i tuoi 18 task ad alta priorità:"). Match the user's language.
|
||||||
|
2. All entity tags, one per line, consecutive, no prose interleaved. Do NOT put titles, dates, priorities, or any descriptive text on the same line as a tag or between tags.
|
||||||
|
3. One short closing recap (1–2 sentences) that points out a pattern, risk, or insight noticed in the list, and ends with a concrete next step or clarifying question.
|
||||||
|
|
||||||
|
For single-entity answers skip blocks 1 and 3 if they would be redundant; just emit the tag.
|
||||||
|
|
||||||
|
For analytical answers (status overviews, breakdowns by category/priority/project, comparisons, trends, "resoconto", "panoramica") consider returning a chart block when it communicates the answer faster than prose. The decision is yours — skip charts for trivial single-number answers. Schema:
|
||||||
|
<chart>{{"chartType":"pie|bar|line|area|radar|radial","title":"...","data":[{{"name":"...","value":N}},...], "config":{{"value":{{"label":"...","color":"var(--chart-1)"}} }} }}</chart>
|
||||||
|
- pie for share-of-total breakdowns; bar for category comparisons; line/area for time series; radar for multi-dimension.
|
||||||
|
- data rows must include a "name" field; numeric series keys must match config keys.
|
||||||
|
- Use var(--chart-1) through var(--chart-5) for colors, cycling 1-5 in series order. Do NOT wrap in hsl() or oklch() — these are complete CSS values already.
|
||||||
|
|
||||||
|
For upcoming-timeline questions ("prossimi eventi"), include only future items in the current month unless the user asks otherwise.
|
||||||
|
|
||||||
# Date filtering
|
# Date filtering
|
||||||
{date_context}
|
{date_context}
|
||||||
@@ -221,11 +266,23 @@ For "today" / "tomorrow" queries, prefer list_tasks_due_today / list_timelines_t
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
_FLOATING_SYSTEM_PROMPT = """\
|
_FLOATING_SYSTEM_PROMPT = """\
|
||||||
You are the floating assistant for adiuvAI with direct access to all tools: tasks, projects, notes, timelines, and memory tools.
|
You are adiuvAI's floating executive assistant.{user_identity}
|
||||||
Stay focused on the floating scope and answer concisely.
|
You are pinned to a specific entity (task, timeline event, project, or note) and you stay strictly within that scope.
|
||||||
Return plain text only. Do not output XML/HTML-like tags such as <task>, <project>, <note>, <timeline>, or any bracketed id tag wrappers.
|
Be a proactive partner: anticipate the next useful action and close with a concrete suggestion or a clarifying question — but stay terse, one short paragraph at most.
|
||||||
Always use tools for factual data retrieval before answering.
|
|
||||||
When the user asks to remember, forget, or update what you know about them, use memory tools.
|
# How you work
|
||||||
|
- Use tools before answering anything factual. Never guess.
|
||||||
|
- Stay in the floating scope (see Request context). If the user asks something outside scope, answer briefly and suggest opening the home assistant.
|
||||||
|
- Match the user's tone preference. Default to warm-but-direct.
|
||||||
|
- When the user asks to remember, forget, or update something, use memory tools.
|
||||||
|
|
||||||
|
# Filter discipline
|
||||||
|
- Never set the `assignee` filter on list_tasks/count_tasks unless the user explicitly names a person ("Marco's tasks") or refers to themselves ("my tasks", "assigned to me", "mine").
|
||||||
|
- The user's own name in the User profile block is for context only — it is NOT a default filter.
|
||||||
|
- When in doubt, omit `assignee` and return the global result.
|
||||||
|
|
||||||
|
# Output format
|
||||||
|
Plain text only. Do NOT output XML/HTML-like tags such as <task>, <project>, <note>, <timeline>, or any bracketed-id wrappers, and do NOT output <chart> blocks — those are for the home assistant.
|
||||||
|
|
||||||
# Date filtering
|
# Date filtering
|
||||||
{date_context}
|
{date_context}
|
||||||
@@ -361,6 +418,7 @@ def _build_system_prompt(name: str, fallback: str, context: dict[str, Any]) -> t
|
|||||||
template, prompt_obj,
|
template, prompt_obj,
|
||||||
date_context=_datetime_context_injection(context).strip(),
|
date_context=_datetime_context_injection(context).strip(),
|
||||||
language_instruction=_language_instruction(context).strip(),
|
language_instruction=_language_instruction(context).strip(),
|
||||||
|
user_identity=_user_identity_injection(context).strip(),
|
||||||
relational_memory=_relational_memory_injection(context).strip(),
|
relational_memory=_relational_memory_injection(context).strip(),
|
||||||
proactive_hints=_proactive_hints_injection(context).strip(),
|
proactive_hints=_proactive_hints_injection(context).strip(),
|
||||||
request_context=_request_context_block(context),
|
request_context=_request_context_block(context),
|
||||||
@@ -807,6 +865,23 @@ async def _infer_floating_domain(message: str, context: dict[str, Any]) -> dict[
|
|||||||
return _infer_floating_domain_rule_based(message, context)
|
return _infer_floating_domain_rule_based(message, context)
|
||||||
|
|
||||||
|
|
||||||
|
def _history_to_messages(history: list[dict[str, str]] | None) -> list[Any]:
    """Convert wire-format conversation turns into LangChain messages.

    Only the last ``MAX_HISTORY_TURNS`` entries are considered. Turns with
    role ``"user"`` become ``HumanMessage`` and ``"assistant"`` become
    ``AIMessage``; turns with any other role or with empty content are
    dropped. Malformed input (a non-sequence history, or a turn that is not a
    dict) is skipped instead of raising, since the history is supplied by the
    client.
    """
    if not history or not isinstance(history, (list, tuple)):
        return []
    result: list[Any] = []
    for turn in history[-MAX_HISTORY_TURNS:]:
        if not isinstance(turn, dict):
            continue
        role = turn.get("role", "")
        content = turn.get("content", "")
        if not content:
            continue
        if role == "user":
            result.append(HumanMessage(content=content))
        elif role == "assistant":
            result.append(AIMessage(content=content))
    return result
|
||||||
|
|
||||||
|
|
||||||
async def _run_single_agent(
|
async def _run_single_agent(
|
||||||
*,
|
*,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
@@ -816,6 +891,7 @@ async def _run_single_agent(
|
|||||||
max_steps: int = 6,
|
max_steps: int = 6,
|
||||||
langfuse_prompt: Any = None,
|
langfuse_prompt: Any = None,
|
||||||
agent_name: str = "agent",
|
agent_name: str = "agent",
|
||||||
|
conversation_history: list[dict[str, str]] | None = None,
|
||||||
) -> str:
|
) -> str:
|
||||||
trace_id = _trace_id_from_context(context)
|
trace_id = _trace_id_from_context(context)
|
||||||
session_id = _session_id_from_context(context)
|
session_id = _session_id_from_context(context)
|
||||||
@@ -824,8 +900,11 @@ async def _run_single_agent(
|
|||||||
tools = _all_tools_for_user(user_id, trace_id)
|
tools = _all_tools_for_user(user_id, trace_id)
|
||||||
logger.info("deep_agent: run_single_agent_start trace=%s user=%s", trace_id or "-", user_id)
|
logger.info("deep_agent: run_single_agent_start trace=%s user=%s", trace_id or "-", user_id)
|
||||||
llm_with_tools = llm.bind_tools(tools)
|
llm_with_tools = llm.bind_tools(tools)
|
||||||
|
_buffered = session_buffer.get(user_id, session_id) if session_id else None
|
||||||
|
history_messages = _buffered if _buffered is not None else _history_to_messages(conversation_history)
|
||||||
messages: list[Any] = [
|
messages: list[Any] = [
|
||||||
SystemMessage(content=system_prompt),
|
SystemMessage(content=system_prompt),
|
||||||
|
*history_messages,
|
||||||
HumanMessage(content=message),
|
HumanMessage(content=message),
|
||||||
]
|
]
|
||||||
|
|
||||||
@@ -838,7 +917,7 @@ async def _run_single_agent(
|
|||||||
|
|
||||||
_span_ctx = (
|
_span_ctx = (
|
||||||
lf.start_as_current_observation(
|
lf.start_as_current_observation(
|
||||||
as_type="span",
|
as_type="agent",
|
||||||
name=agent_name,
|
name=agent_name,
|
||||||
metadata={"user_id": user_id, "session_id": trace_id},
|
metadata={"user_id": user_id, "session_id": trace_id},
|
||||||
input=message,
|
input=message,
|
||||||
@@ -846,6 +925,7 @@ async def _run_single_agent(
|
|||||||
if lf else None
|
if lf else None
|
||||||
)
|
)
|
||||||
_span = _span_ctx.__enter__() if _span_ctx else None
|
_span = _span_ctx.__enter__() if _span_ctx else None
|
||||||
|
_messages_to_save: list[Any] | None = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
for _ in range(max_steps):
|
for _ in range(max_steps):
|
||||||
@@ -878,6 +958,7 @@ async def _run_single_agent(
|
|||||||
)
|
)
|
||||||
if _span:
|
if _span:
|
||||||
_span.update(output=final_text)
|
_span.update(output=final_text)
|
||||||
|
_messages_to_save = messages[1:] # strip SystemMessage; save full tool history
|
||||||
return final_text
|
return final_text
|
||||||
|
|
||||||
tool_map = {tool_def.name: tool_def for tool_def in tools}
|
tool_map = {tool_def.name: tool_def for tool_def in tools}
|
||||||
@@ -896,6 +977,14 @@ async def _run_single_agent(
|
|||||||
tool_fn = tool_map.get(call_name)
|
tool_fn = tool_map.get(call_name)
|
||||||
if tool_fn is None:
|
if tool_fn is None:
|
||||||
tool_output = f"Unknown tool: {call_name}"
|
tool_output = f"Unknown tool: {call_name}"
|
||||||
|
elif lf:
|
||||||
|
with lf.start_as_current_observation(
|
||||||
|
as_type="tool",
|
||||||
|
name=call_name,
|
||||||
|
input=call_args,
|
||||||
|
) as tool_obs:
|
||||||
|
tool_output = await tool_fn.ainvoke(call_args)
|
||||||
|
tool_obs.update(output=str(tool_output)[:8000])
|
||||||
else:
|
else:
|
||||||
tool_output = await tool_fn.ainvoke(call_args)
|
tool_output = await tool_fn.ainvoke(call_args)
|
||||||
|
|
||||||
@@ -910,6 +999,7 @@ async def _run_single_agent(
|
|||||||
|
|
||||||
final = await llm.ainvoke(messages)
|
final = await llm.ainvoke(messages)
|
||||||
final_text = _as_text(final.content)
|
final_text = _as_text(final.content)
|
||||||
|
messages.append(AIMessage(content=final_text))
|
||||||
logger.info(
|
logger.info(
|
||||||
"deep_agent: run_single_agent_end trace=%s user=%s tool_calls=%d response_chars=%d fallback=1",
|
"deep_agent: run_single_agent_end trace=%s user=%s tool_calls=%d response_chars=%d fallback=1",
|
||||||
trace_id or "-",
|
trace_id or "-",
|
||||||
@@ -919,8 +1009,11 @@ async def _run_single_agent(
|
|||||||
)
|
)
|
||||||
if _span:
|
if _span:
|
||||||
_span.update(output=final_text)
|
_span.update(output=final_text)
|
||||||
|
_messages_to_save = messages[1:]
|
||||||
return final_text
|
return final_text
|
||||||
finally:
|
finally:
|
||||||
|
if session_id and _messages_to_save is not None:
|
||||||
|
session_buffer.set(user_id, session_id, _messages_to_save)
|
||||||
clear_tool_result_collector()
|
clear_tool_result_collector()
|
||||||
if _span_ctx:
|
if _span_ctx:
|
||||||
_span_ctx.__exit__(None, None, None)
|
_span_ctx.__exit__(None, None, None)
|
||||||
@@ -939,6 +1032,7 @@ async def _run_single_agent_stream(
|
|||||||
langfuse_prompt: Any = None,
|
langfuse_prompt: Any = None,
|
||||||
agent_name: str = "agent",
|
agent_name: str = "agent",
|
||||||
tools: list[Any] | None = None,
|
tools: list[Any] | None = None,
|
||||||
|
conversation_history: list[dict[str, str]] | None = None,
|
||||||
) -> AsyncGenerator[tuple[str, Any], None]:
|
) -> AsyncGenerator[tuple[str, Any], None]:
|
||||||
trace_id = _trace_id_from_context(context)
|
trace_id = _trace_id_from_context(context)
|
||||||
session_id = _session_id_from_context(context)
|
session_id = _session_id_from_context(context)
|
||||||
@@ -948,8 +1042,11 @@ async def _run_single_agent_stream(
|
|||||||
tools = _all_tools_for_user(user_id, trace_id)
|
tools = _all_tools_for_user(user_id, trace_id)
|
||||||
logger.info("deep_agent: run_single_agent_stream_start trace=%s user=%s", trace_id or "-", user_id)
|
logger.info("deep_agent: run_single_agent_stream_start trace=%s user=%s", trace_id or "-", user_id)
|
||||||
llm_with_tools = llm.bind_tools(tools)
|
llm_with_tools = llm.bind_tools(tools)
|
||||||
|
_buffered = session_buffer.get(user_id, session_id) if session_id else None
|
||||||
|
history_messages = _buffered if _buffered is not None else _history_to_messages(conversation_history)
|
||||||
messages: list[Any] = [
|
messages: list[Any] = [
|
||||||
SystemMessage(content=system_prompt),
|
SystemMessage(content=system_prompt),
|
||||||
|
*history_messages,
|
||||||
HumanMessage(content=message),
|
HumanMessage(content=message),
|
||||||
]
|
]
|
||||||
|
|
||||||
@@ -963,7 +1060,7 @@ async def _run_single_agent_stream(
|
|||||||
|
|
||||||
_span_ctx = (
|
_span_ctx = (
|
||||||
lf.start_as_current_observation(
|
lf.start_as_current_observation(
|
||||||
as_type="span",
|
as_type="agent",
|
||||||
name=f"{agent_name}-stream",
|
name=f"{agent_name}-stream",
|
||||||
metadata={"user_id": user_id, "session_id": trace_id},
|
metadata={"user_id": user_id, "session_id": trace_id},
|
||||||
input=message,
|
input=message,
|
||||||
@@ -972,6 +1069,7 @@ async def _run_single_agent_stream(
|
|||||||
)
|
)
|
||||||
_span = _span_ctx.__enter__() if _span_ctx else None
|
_span = _span_ctx.__enter__() if _span_ctx else None
|
||||||
streamed_text: list[str] = []
|
streamed_text: list[str] = []
|
||||||
|
_messages_to_save: list[Any] | None = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
for _ in range(max_steps):
|
for _ in range(max_steps):
|
||||||
@@ -1009,6 +1107,8 @@ async def _run_single_agent_stream(
|
|||||||
)
|
)
|
||||||
if _span:
|
if _span:
|
||||||
_span.update(output="".join(streamed_text))
|
_span.update(output="".join(streamed_text))
|
||||||
|
messages.append(response)
|
||||||
|
_messages_to_save = messages[1:] # strip SystemMessage
|
||||||
return
|
return
|
||||||
|
|
||||||
messages.append(response)
|
messages.append(response)
|
||||||
@@ -1028,6 +1128,14 @@ async def _run_single_agent_stream(
|
|||||||
tool_fn = tool_map.get(call_name)
|
tool_fn = tool_map.get(call_name)
|
||||||
if tool_fn is None:
|
if tool_fn is None:
|
||||||
tool_output = f"Unknown tool: {call_name}"
|
tool_output = f"Unknown tool: {call_name}"
|
||||||
|
elif lf:
|
||||||
|
with lf.start_as_current_observation(
|
||||||
|
as_type="tool",
|
||||||
|
name=call_name,
|
||||||
|
input=call_args,
|
||||||
|
) as tool_obs:
|
||||||
|
tool_output = await tool_fn.ainvoke(call_args)
|
||||||
|
tool_obs.update(output=str(tool_output)[:8000])
|
||||||
else:
|
else:
|
||||||
tool_output = await tool_fn.ainvoke(call_args)
|
tool_output = await tool_fn.ainvoke(call_args)
|
||||||
|
|
||||||
@@ -1040,12 +1148,16 @@ async def _run_single_agent_stream(
|
|||||||
|
|
||||||
messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
|
messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
|
||||||
|
|
||||||
|
fallback_chunks: list[str] = []
|
||||||
async for chunk in llm.astream(messages):
|
async for chunk in llm.astream(messages):
|
||||||
token = _as_text(getattr(chunk, "content", ""))
|
token = _as_text(getattr(chunk, "content", ""))
|
||||||
if token:
|
if token:
|
||||||
streamed_chars += len(token)
|
streamed_chars += len(token)
|
||||||
streamed_text.append(token)
|
streamed_text.append(token)
|
||||||
|
fallback_chunks.append(token)
|
||||||
yield "token", token
|
yield "token", token
|
||||||
|
messages.append(AIMessage(content="".join(fallback_chunks)))
|
||||||
|
_messages_to_save = messages[1:]
|
||||||
logger.info(
|
logger.info(
|
||||||
"deep_agent: run_single_agent_stream_end trace=%s user=%s tool_calls=%d response_chars=%d fallback=1",
|
"deep_agent: run_single_agent_stream_end trace=%s user=%s tool_calls=%d response_chars=%d fallback=1",
|
||||||
trace_id or "-",
|
trace_id or "-",
|
||||||
@@ -1056,6 +1168,8 @@ async def _run_single_agent_stream(
|
|||||||
if _span:
|
if _span:
|
||||||
_span.update(output="".join(streamed_text))
|
_span.update(output="".join(streamed_text))
|
||||||
finally:
|
finally:
|
||||||
|
if session_id and _messages_to_save is not None:
|
||||||
|
session_buffer.set(user_id, session_id, _messages_to_save)
|
||||||
clear_tool_result_collector()
|
clear_tool_result_collector()
|
||||||
if _span_ctx:
|
if _span_ctx:
|
||||||
_span_ctx.__exit__(None, None, None)
|
_span_ctx.__exit__(None, None, None)
|
||||||
@@ -1074,6 +1188,7 @@ async def run_home(user_id: str, message: str, context: dict[str, Any]) -> str:
|
|||||||
context=prepared_context,
|
context=prepared_context,
|
||||||
langfuse_prompt=langfuse_prompt,
|
langfuse_prompt=langfuse_prompt,
|
||||||
agent_name="home-agent",
|
agent_name="home-agent",
|
||||||
|
conversation_history=context.get("conversation_history"),
|
||||||
)
|
)
|
||||||
return _normalize_tagged_list_lines(response, message)
|
return _normalize_tagged_list_lines(response, message)
|
||||||
|
|
||||||
@@ -1089,6 +1204,7 @@ async def run_floating(user_id: str, message: str, context: dict[str, Any]) -> t
|
|||||||
context=prepared_context,
|
context=prepared_context,
|
||||||
langfuse_prompt=langfuse_prompt,
|
langfuse_prompt=langfuse_prompt,
|
||||||
agent_name="floating-agent",
|
agent_name="floating-agent",
|
||||||
|
conversation_history=context.get("conversation_history"),
|
||||||
)
|
)
|
||||||
sanitized = _strip_floating_markup(response)
|
sanitized = _strip_floating_markup(response)
|
||||||
if not sanitized and response:
|
if not sanitized and response:
|
||||||
@@ -1111,6 +1227,7 @@ async def run_home_stream(
|
|||||||
context=prepared_context,
|
context=prepared_context,
|
||||||
langfuse_prompt=langfuse_prompt,
|
langfuse_prompt=langfuse_prompt,
|
||||||
agent_name="home-agent",
|
agent_name="home-agent",
|
||||||
|
conversation_history=context.get("conversation_history"),
|
||||||
):
|
):
|
||||||
event_type, data = event
|
event_type, data = event
|
||||||
if event_type != "token":
|
if event_type != "token":
|
||||||
@@ -1143,6 +1260,7 @@ async def run_floating_stream(
|
|||||||
context=prepared_context,
|
context=prepared_context,
|
||||||
langfuse_prompt=langfuse_prompt,
|
langfuse_prompt=langfuse_prompt,
|
||||||
agent_name="floating-agent",
|
agent_name="floating-agent",
|
||||||
|
conversation_history=context.get("conversation_history"),
|
||||||
):
|
):
|
||||||
event_type, data = event
|
event_type, data = event
|
||||||
if event_type != "token":
|
if event_type != "token":
|
||||||
|
|||||||
@@ -7,10 +7,32 @@ The callback sends a `tool_call` WS frame and awaits the `tool_result`.
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import re
|
||||||
from contextvars import ContextVar
|
from contextvars import ContextVar
|
||||||
from typing import Any, Callable, Coroutine
|
from typing import Any, Callable, Coroutine
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
|
# Matches an underscore followed by a lowercase ASCII letter; the letter is captured.
_SNAKE_TO_CAMEL_RE = re.compile(r"_([a-z])")


def _key_to_camel(key: str) -> str:
    """Return *key* with every ``_x`` (x in a-z) replaced by uppercase ``X``."""
    return _SNAKE_TO_CAMEL_RE.sub(lambda m: m.group(1).upper(), key)


def _keys_to_camel(obj: Any) -> Any:
    """Recursively convert dict keys from snake_case to camelCase.

    Mirrors the JS-side ``toCamelCase`` applied to incoming WS frames in
    ``adiuvAI/src/main/api/backend-client.ts``. The Electron executor wraps
    tool_result payloads in ``toSnakeCase`` before sending; this restores the
    camelCase schema property names that the tool code expects to read.

    Dicts and lists are rebuilt (the input is not mutated); every other value
    is returned as-is. Non-string dict keys are kept unchanged rather than
    being passed to the regex, which would raise ``TypeError``.
    """
    if isinstance(obj, dict):
        return {
            (_key_to_camel(k) if isinstance(k, str) else k): _keys_to_camel(v)
            for k, v in obj.items()
        }
    if isinstance(obj, list):
        return [_keys_to_camel(v) for v in obj]
    return obj
|
||||||
|
|
||||||
# Holds the execute callback for the current WS session.
|
# Holds the execute callback for the current WS session.
|
||||||
# Set by the chat WS handler before the orchestrator runs; cleared after.
|
# Set by the chat WS handler before the orchestrator runs; cleared after.
|
||||||
_client_executor: ContextVar[Callable[[dict], Coroutine[Any, Any, dict]]] = ContextVar(
|
_client_executor: ContextVar[Callable[[dict], Coroutine[Any, Any, dict]]] = ContextVar(
|
||||||
@@ -82,6 +104,7 @@ async def execute_on_client(
|
|||||||
payload["limit"] = limit
|
payload["limit"] = limit
|
||||||
|
|
||||||
result = await callback(payload)
|
result = await callback(payload)
|
||||||
|
result = _keys_to_camel(result)
|
||||||
collector = _tool_result_collector.get(None)
|
collector = _tool_result_collector.get(None)
|
||||||
if collector is not None:
|
if collector is not None:
|
||||||
collector.append({
|
collector.append({
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ google-auth-httplib2>=0.2.0
|
|||||||
msal>=1.28.0
|
msal>=1.28.0
|
||||||
cryptography>=42.0.0
|
cryptography>=42.0.0
|
||||||
pgvector>=0.2.5
|
pgvector>=0.2.5
|
||||||
langfuse>=2.0.0
|
langfuse>=3.3.1
|
||||||
beautifulsoup4>=4.12.0
|
beautifulsoup4>=4.12.0
|
||||||
lxml>=5.0.0
|
lxml>=5.0.0
|
||||||
PyYAML>=6.0.0
|
PyYAML>=6.0.0
|
||||||
|
|||||||
Reference in New Issue
Block a user