diff --git a/app/agents/note_agent.py b/app/agents/note_agent.py
index b8a6f18..cae644b 100644
--- a/app/agents/note_agent.py
+++ b/app/agents/note_agent.py
@@ -2,6 +2,7 @@
from __future__ import annotations
+import re
from typing import Any
from langchain_core.tools import tool
@@ -9,6 +10,14 @@ from langchain_core.tools import tool
from app.core.llm import embed
from app.core.ws_context import execute_on_client
+_UUID_RE = re.compile(
+ r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$"
+)
+
+
+def _is_uuid(value: str) -> bool:
+ return bool(_UUID_RE.match(value))
+
NOTE_SYSTEM_PROMPT = (
"You are a note-taking assistant. You help users create, retrieve, update,\n"
"and delete Markdown notes in their workspace.\n\n"
@@ -19,6 +28,7 @@ NOTE_SYSTEM_PROMPT = (
" before appending or replacing sections\n"
" - list_notes without project_id returns all notes; scope with project_id\n"
" when the user is working within a specific project\n"
+ " - project_id must be a UUID; if you only know a project name, do not pass it as project_id\n"
" - Do not fabricate note content — reflect what the user provides or what\n"
" is already in the note (retrieved via get_note)."
)
@@ -27,10 +37,11 @@ NOTE_SYSTEM_PROMPT = (
@tool
async def list_notes(project_id: str = "") -> str:
"""List notes, optionally scoped to a project by project_id."""
+ normalized_project_id = project_id if (project_id and _is_uuid(project_id)) else ""
result = await execute_on_client(
action="select",
table="notes",
- filters={"projectId": project_id or None},
+ filters={"projectId": normalized_project_id or None},
)
rows = result.get("rows", [])
if not rows:
diff --git a/app/agents/task_agent.py b/app/agents/task_agent.py
index 3f8ab95..0259a0f 100644
--- a/app/agents/task_agent.py
+++ b/app/agents/task_agent.py
@@ -3,12 +3,21 @@
from __future__ import annotations
from datetime import datetime, timezone
+import re
from typing import Any
from langchain_core.tools import tool
from app.core.ws_context import execute_on_client
+_UUID_RE = re.compile(
+ r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$"
+)
+
+
+def _is_uuid(value: str) -> bool:
+ return bool(_UUID_RE.match(value))
+
TASK_SYSTEM_PROMPT = (
"You are a task management assistant for a project workspace.\n"
"You create, update, list, and track tasks and their comments.\n\n"
@@ -39,11 +48,12 @@ async def list_tasks(
) -> str:
"""List tasks, optionally filtered by project_id, status (todo|in_progress|done),
a search string, or an order_by field name (dueDate|priority|createdAt)."""
+ normalized_project_id = project_id if (project_id and _is_uuid(project_id)) else ""
result = await execute_on_client(
action="select",
table="tasks",
filters={
- "projectId": project_id or None,
+ "projectId": normalized_project_id or None,
"status": status or None,
"search": search or None,
"orderBy": order_by or None,
@@ -205,8 +215,12 @@ async def add_task_comment(task_id: str, author: str, content: str) -> str:
table="taskComments",
data={"taskId": task_id, "author": author, "content": content},
)
- row = result["row"]
- return f"Comment added by {row['author']} on task {row['taskId']} (comment id: {row['id']})."
+ row = result.get("row", {})
+ row_author = row.get("author", author)
+ # Electron payloads can vary (taskId vs task_id). Fall back to input task_id.
+ row_task_id = row.get("taskId") or row.get("task_id") or task_id
+ row_comment_id = row.get("id", "unknown")
+ return f"Comment added by {row_author} on task {row_task_id} (comment id: {row_comment_id})."
@tool
diff --git a/app/agents/timeline_agent.py b/app/agents/timeline_agent.py
index 19708e9..f9b5652 100644
--- a/app/agents/timeline_agent.py
+++ b/app/agents/timeline_agent.py
@@ -2,17 +2,27 @@
from __future__ import annotations
+import re
from typing import Any
from langchain_core.tools import tool
from app.core.ws_context import execute_on_client
+_UUID_RE = re.compile(
+ r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$"
+)
+
+
+def _is_uuid(value: str) -> bool:
+ return bool(_UUID_RE.match(value))
+
TIMELINE_SYSTEM_PROMPT = (
"You are a project timeline assistant. Timelines are milestone dates that\n"
"track progress on a project — they are not calendar events.\n\n"
"Rules:\n"
" - project_id is REQUIRED for every create; confirm with the user if unknown\n"
+ " - For listing, project_id must be a UUID; never pass plain names as project_id\n"
" - date is a Unix timestamp in milliseconds; convert human-readable dates\n"
" - is_ai_suggested: 1 when proactively proposing a timeline, 0 otherwise\n"
" - is_approved: 0 until the user explicitly confirms; then 1\n"
@@ -25,10 +35,11 @@ TIMELINE_SYSTEM_PROMPT = (
@tool
async def list_timelines(project_id: str = "") -> str:
"""List timelines. Provide project_id to scope to a specific project."""
+ normalized_project_id = project_id if (project_id and _is_uuid(project_id)) else ""
result = await execute_on_client(
action="select",
table="timelines",
- filters={"projectId": project_id or None},
+ filters={"projectId": normalized_project_id or None},
)
rows = result.get("rows", [])
if not rows:
diff --git a/app/core/deep_agent.py b/app/core/deep_agent.py
index b64624c..52f5166 100644
--- a/app/core/deep_agent.py
+++ b/app/core/deep_agent.py
@@ -5,8 +5,10 @@ from __future__ import annotations
import asyncio
import json
import logging
+import re
from collections.abc import AsyncGenerator, Awaitable, Callable
-from typing import Any, Literal, TypedDict
+import operator
+from typing import Annotated, Any, Literal, TypedDict
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
from langchain_core.tools import tool
@@ -21,11 +23,14 @@ from app.agents.task_agent import TASK_SYSTEM_PROMPT, TASK_TOOLS
from app.agents.timeline_agent import TIMELINE_SYSTEM_PROMPT, TIMELINE_TOOLS
from app.core.llm import get_llm
from app.core.memory_middleware import MemoryMiddleware
-from app.core.ws_context import clear_tool_result_collector, set_tool_result_collector
+from app.core.ws_context import clear_tool_result_collector, execute_on_client, set_tool_result_collector
from app.db import async_session
logger = logging.getLogger(__name__)
+# Quick test switch: home requests run as one agent with all tools.
+HOME_SINGLE_AGENT_TEST_MODE = True
+
WorkerName = Literal["task_agent", "project_agent", "note_agent", "timeline_agent"]
FloatingDomain = Literal["tasks", "projects", "notes", "timelines"]
@@ -55,6 +60,7 @@ class WorkerResult(TypedDict):
instruction: str
response: str
entity_ids: dict[str, list[str]]
+ facts: dict[str, Any]
class OrchestratorState(TypedDict, total=False):
@@ -70,7 +76,7 @@ class OrchestratorState(TypedDict, total=False):
class GraphState(OrchestratorState):
- worker_results: list[WorkerResult]
+ worker_results: Annotated[list[WorkerResult], operator.add]
class ReducerState(OrchestratorState):
@@ -127,7 +133,9 @@ _HOME_SYNTH_SYSTEM = (
"You are the final response synthesizer. Return markdown only. "
"Embed inline component tags when relevant: [ids], [ids], "
"[ids], [ids], and {json}. "
- "Only include IDs that are truly relevant to the request."
+ "Only include IDs that are truly relevant to the request. "
+ "Never invent missing values. If facts include a non-null clientId for a project, "
+ "do not claim that the project has no owner/client."
)
_FLOATING_SYNTH_SYSTEM = (
@@ -135,6 +143,14 @@ _FLOATING_SYNTH_SYSTEM = (
"Return concise markdown and stay focused on the requested scope."
)
+_HOME_SINGLE_AGENT_SYSTEM = (
+ "You are the home assistant with direct access to all tools: tasks, projects, notes, timelines. "
+ "Always use tools for factual data retrieval before answering. "
+ "If context.context.resolved_project_id exists, use it as project_id for scoped list calls. "
+ "Return markdown and embed inline tags when relevant: [ids], [ids], "
+ "[ids], [ids], {json}."
+)
+
def _as_text(content: Any) -> str:
if content is None:
@@ -249,7 +265,171 @@ def _coerce_plan(payload: dict[str, Any], message: str, floating: bool) -> Worke
)
+def _needs_full_project_snapshot(message: str, floating: bool) -> bool:
+ """Detect project status/update requests that should query all workers."""
+ if floating:
+ return False
+ lowered = message.lower()
+ has_project = any(k in lowered for k in ["project", "progetto", "progetto", "progetti", "progetto", "whitelist"])
+ has_status_intent = any(k in lowered for k in ["status", "stato", "aggiorn", "update", "situazione", "riepilogo", "summary"])
+ return has_project and has_status_intent
+
+
+def _build_full_project_snapshot_plan(message: str) -> WorkerPlan:
+ """Build a deterministic all-workers plan for project status snapshots."""
+ project_hint = (
+ "Use context.context.resolved_project_id when present as project_id. "
+ "Do not pass project names as project_id."
+ )
+ return WorkerPlan(
+ tasks=[
+ WorkerTask(worker="project_agent", instruction=f"Resolve the target project from this request and return core fields including id, name, status, clientId. {project_hint} Request: {message}"),
+ WorkerTask(worker="task_agent", instruction=f"Collect tasks relevant to the project in this request; include pending/blocked highlights and IDs. {project_hint} Request: {message}"),
+ WorkerTask(worker="timeline_agent", instruction=f"Collect timeline/milestone items relevant to the project in this request; include upcoming items and IDs. {project_hint} Request: {message}"),
+ WorkerTask(worker="note_agent", instruction=f"Collect notes relevant to the project in this request; include latest useful notes and IDs. {project_hint} Request: {message}"),
+ ]
+ )
+
+
+def _candidate_tokens(message: str) -> list[str]:
+ tokens = re.findall(r"[a-zA-Z0-9_-]+", message.lower())
+ return [t for t in tokens if len(t) >= 3]
+
+
+async def _resolve_project_id_from_message(message: str) -> str | None:
+ """Resolve likely project UUID from user message using client project list."""
+ try:
+ result = await execute_on_client(action="select", table="projects")
+ except Exception as exc:
+ logger.warning("deep_agent: project resolve select failed: %s", exc)
+ return None
+
+ rows = result.get("rows", [])
+ if not isinstance(rows, list) or not rows:
+ return None
+
+ tokens = _candidate_tokens(message)
+ scored: list[tuple[int, dict[str, Any]]] = []
+ for row in rows:
+ if not isinstance(row, dict):
+ continue
+ name = str(row.get("name", "")).lower()
+ score = sum(1 for token in tokens if token in name)
+ if score > 0:
+ scored.append((score, row))
+
+ if not scored:
+ return None
+
+ scored.sort(key=lambda item: item[0], reverse=True)
+ top_score = scored[0][0]
+ top_rows = [row for score, row in scored if score == top_score]
+ if len(top_rows) != 1:
+ return None
+
+ project_id = top_rows[0].get("id")
+ return project_id if isinstance(project_id, str) else None
+
+
+async def _prepare_home_context(message: str, context: dict[str, Any]) -> dict[str, Any]:
+ """Resolve and inject project_id hints for home flows."""
+ prepared = dict(context)
+ if _needs_full_project_snapshot(message, floating=False):
+ resolved_project_id = await _resolve_project_id_from_message(message)
+ if resolved_project_id:
+ prepared["resolved_project_id"] = resolved_project_id
+ logger.info("deep_agent: resolved_project_id=%s for message=%s", resolved_project_id, message[:200])
+ return prepared
+
+
+def _all_tools() -> list[Any]:
+ tools: list[Any] = []
+ for config in WORKER_CONFIG.values():
+ tools.extend(config["tools"])
+ return tools
+
+
+async def _run_home_single_agent(
+ user_id: str,
+ message: str,
+ context: dict[str, Any],
+) -> str:
+ """Single-agent test mode: one loop with all tools."""
+ prepared_context = await _prepare_home_context(message, context)
+
+ llm = get_llm()
+ tools = _all_tools()
+ llm_with_tools = llm.bind_tools(tools)
+ messages: list[Any] = [
+ SystemMessage(content=_HOME_SINGLE_AGENT_SYSTEM),
+ HumanMessage(content=f"User message:\n{message}\n\nContext:\n{json.dumps({'context': prepared_context}, ensure_ascii=True)[:3500]}"),
+ ]
+
+ for _ in range(6):
+ response: AIMessage = await llm_with_tools.ainvoke(messages)
+ messages.append(response)
+ if not response.tool_calls:
+ return _as_text(response.content)
+
+ tool_map = {t.name: t for t in tools}
+ for call in response.tool_calls:
+ tool_fn = tool_map.get(call["name"])
+ if tool_fn is None:
+ tool_output = f"Unknown tool: {call['name']}"
+ else:
+ tool_output = await tool_fn.ainvoke(call.get("args", {}))
+ messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
+
+ final = await llm.ainvoke(messages)
+ return _as_text(final.content)
+
+
+async def _run_home_single_agent_stream(
+ user_id: str,
+ message: str,
+ context: dict[str, Any],
+) -> AsyncGenerator[tuple[str, Any], None]:
+ """Streaming variant for single-agent home test mode."""
+ prepared_context = await _prepare_home_context(message, context)
+
+ llm = get_llm()
+ tools = _all_tools()
+ llm_with_tools = llm.bind_tools(tools)
+ messages: list[Any] = [
+ SystemMessage(content=_HOME_SINGLE_AGENT_SYSTEM),
+ HumanMessage(content=f"User message:\n{message}\n\nContext:\n{json.dumps({'context': prepared_context}, ensure_ascii=True)[:3500]}"),
+ ]
+
+ for _ in range(6):
+ response: AIMessage = await llm_with_tools.ainvoke(messages)
+ messages.append(response)
+ if not response.tool_calls:
+ async for chunk in llm.astream(messages):
+ token = _as_text(getattr(chunk, "content", ""))
+ if token:
+ yield "token", token
+ return
+
+ tool_map = {t.name: t for t in tools}
+ for call in response.tool_calls:
+ tool_fn = tool_map.get(call["name"])
+ if tool_fn is None:
+ tool_output = f"Unknown tool: {call['name']}"
+ else:
+ tool_output = await tool_fn.ainvoke(call.get("args", {}))
+ messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
+
+ async for chunk in llm.astream(messages):
+ token = _as_text(getattr(chunk, "content", ""))
+ if token:
+ yield "token", token
+
+
async def _plan_with_llm(message: str, context: dict[str, Any], floating: bool) -> WorkerPlan:
+ if _needs_full_project_snapshot(message, floating):
+ logger.info("deep_agent: forcing full project snapshot plan for message=%s", message[:200])
+ return _build_full_project_snapshot_plan(message)
+
llm = get_llm()
system = _FLOATING_ORCHESTRATOR_SYSTEM if floating else _HOME_ORCHESTRATOR_SYSTEM
@@ -279,7 +459,13 @@ async def _plan_with_llm(message: str, context: dict[str, Any], floating: bool)
payload = _extract_json_object(_as_text(response.content))
if payload is None:
raise ValueError("planner returned non-JSON output")
- return _coerce_plan(payload, message, floating)
+ plan = _coerce_plan(payload, message, floating)
+ logger.info(
+ "deep_agent: planner produced tasks=%s floating=%s",
+ [t.worker for t in plan.tasks],
+ plan.floating_domain,
+ )
+ return plan
except Exception as exc:
logger.warning("deep_agent: planner failed, using fallback: %s", exc)
@@ -324,6 +510,64 @@ def _extract_entity_ids(tool_results: list[dict[str, Any]]) -> dict[str, list[st
return out
+def _extract_facts(tool_results: list[dict[str, Any]]) -> dict[str, Any]:
+ """Extract small, structured facts for the synthesizer to avoid hallucinations."""
+ facts: dict[str, Any] = {"projects": [], "tasks": [], "notes": [], "timelines": []}
+
+ for item in tool_results:
+ table = item.get("table")
+ payload = item.get("data") or {}
+
+ rows: list[dict[str, Any]] = []
+ row = payload.get("row")
+ if isinstance(row, dict):
+ rows.append(row)
+ if isinstance(payload.get("rows"), list):
+ rows.extend([r for r in payload["rows"] if isinstance(r, dict)])
+
+ if table == "projects":
+ for r in rows:
+ facts["projects"].append(
+ {
+ "id": r.get("id"),
+ "name": r.get("name"),
+ "status": r.get("status"),
+ "clientId": r.get("clientId"),
+ }
+ )
+ elif table == "tasks":
+ for r in rows:
+ facts["tasks"].append(
+ {
+ "id": r.get("id"),
+ "title": r.get("title"),
+ "status": r.get("status"),
+ "projectId": r.get("projectId"),
+ }
+ )
+ elif table == "notes":
+ for r in rows:
+ facts["notes"].append(
+ {
+ "id": r.get("id"),
+ "title": r.get("title"),
+ "projectId": r.get("projectId"),
+ }
+ )
+ elif table == "timelines":
+ for r in rows:
+ facts["timelines"].append(
+ {
+ "id": r.get("id"),
+ "title": r.get("title"),
+ "date": r.get("date"),
+ "projectId": r.get("projectId"),
+ }
+ )
+
+ return facts
+
+
async def _run_tool_loop(
worker: WorkerName,
instruction: str,
@@ -335,10 +579,45 @@ async def _run_tool_loop(
llm = get_llm()
llm_with_tools = llm.bind_tools(tools) if tools else llm
+ resolved_project_id = None
+ ctx = context.get("context", {}) if isinstance(context, dict) else {}
+ if isinstance(ctx, dict):
+ rpid = ctx.get("resolved_project_id")
+ if isinstance(rpid, str) and rpid:
+ resolved_project_id = rpid
+
+ mandatory_tool_policy = ""
+ if resolved_project_id:
+ if worker == "project_agent":
+ mandatory_tool_policy = (
+ "MANDATORY TOOL POLICY:\n"
+ f"- You MUST call get_project(project_id=\"{resolved_project_id}\") before final answer.\n"
+ "- Optionally call list_projects afterward only if needed for disambiguation.\n\n"
+ )
+ elif worker == "task_agent":
+ mandatory_tool_policy = (
+ "MANDATORY TOOL POLICY:\n"
+ f"- You MUST call list_tasks(project_id=\"{resolved_project_id}\") before final answer.\n"
+ "- Do not use project name as project_id.\n\n"
+ )
+ elif worker == "timeline_agent":
+ mandatory_tool_policy = (
+ "MANDATORY TOOL POLICY:\n"
+ f"- You MUST call list_timelines(project_id=\"{resolved_project_id}\") before final answer.\n"
+ "- Do not use project name as project_id.\n\n"
+ )
+ elif worker == "note_agent":
+ mandatory_tool_policy = (
+ "MANDATORY TOOL POLICY:\n"
+ f"- You MUST call list_notes(project_id=\"{resolved_project_id}\") before final answer.\n"
+ "- Do not use project name as project_id.\n\n"
+ )
+
messages: list[Any] = [
SystemMessage(content=worker_prompt),
HumanMessage(
content=(
+ mandatory_tool_policy +
"Worker instruction:\n"
f"{instruction}\n\n"
"Conversation context:\n"
@@ -359,12 +638,38 @@ async def _run_tool_loop(
tool_map = {t.name: t for t in tools}
for call in response.tool_calls:
+ call_id = str(call.get("id", ""))
+ call_name = str(call.get("name", ""))
+ call_args = call.get("args", {})
+ logger.info(
+ "deep_agent: worker=%s AI->Tool tool_call_id=%s tool=%s args=%s",
+ worker,
+ call_id,
+ call_name,
+ json.dumps(call_args, ensure_ascii=True)[:800],
+ )
+
tool_fn = tool_map.get(call["name"])
if tool_fn is None:
tool_output = f"Unknown tool: {call['name']}"
else:
tool_output = await tool_fn.ainvoke(call.get("args", {}))
+
+ tool_output_text = str(tool_output)
+ logger.info(
+ "deep_agent: worker=%s Tool->AI tool_call_id=%s tool=%s output=%s",
+ worker,
+ call_id,
+ call_name,
+ tool_output_text[:1200],
+ )
+
messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
+ logger.info(
+ "deep_agent: worker=%s appended ToolMessage tool_call_id=%s",
+ worker,
+ call_id,
+ )
structured_llm = llm.with_structured_output(WorkerSummary)
messages.append(SystemMessage(content="You have finished using tools. Summarize findings in max 3 sentences."))
@@ -384,11 +689,18 @@ def _worker_node(worker: WorkerName):
return {"worker_results": []}
instruction = str(task_payload.get("instruction") or state.get("user_message") or "")
+ logger.info("deep_agent: worker=%s start instruction=%s", worker, instruction[:240])
worker_context = {
"memory": state.get("memory_context", {}),
"context": state.get("context", {}),
}
response, tool_results = await _run_tool_loop(worker, instruction, worker_context)
+ logger.info(
+ "deep_agent: worker=%s complete tool_calls=%d entity_counts=%s",
+ worker,
+ len(tool_results),
+ {k: len(v) for k, v in _extract_entity_ids(tool_results).items()},
+ )
return {
"worker_results": [
@@ -397,6 +709,7 @@ def _worker_node(worker: WorkerName):
"instruction": instruction,
"response": response,
"entity_ids": _extract_entity_ids(tool_results),
+ "facts": _extract_facts(tool_results),
}
]
}
@@ -414,6 +727,7 @@ def _build_synthesis_prompt(state: GraphState, floating: bool) -> str:
"instruction": result.get("instruction"),
"response": result.get("response"),
"entity_ids": result.get("entity_ids", {}),
+ "facts": result.get("facts", {}),
}
)
@@ -480,14 +794,25 @@ async def _orchestrator_node_home(state: GraphState) -> GraphState:
if state.get("plan"):
return {}
- context = {**state.get("context", {}), **state.get("memory_context", {})}
- plan = await _plan_with_llm(str(state.get("user_message", "")), context, floating=False)
+ user_message = str(state.get("user_message", ""))
+ base_context = dict(state.get("context", {}))
+ context = {**base_context, **state.get("memory_context", {})}
+
+ if _needs_full_project_snapshot(user_message, floating=False):
+ resolved_project_id = await _resolve_project_id_from_message(user_message)
+ if resolved_project_id:
+ base_context["resolved_project_id"] = resolved_project_id
+ logger.info("deep_agent: resolved_project_id=%s for message=%s", resolved_project_id, user_message[:200])
+ plan = _build_full_project_snapshot_plan(user_message)
+ else:
+ plan = await _plan_with_llm(user_message, context, floating=False)
new_memory = await _apply_memory_updates(str(state.get("user_id", "")), plan.memory_updates, state.get("memory_context", {}))
return {
"plan": [task.model_dump() for task in plan.tasks],
- "memory_context": new_memory
+ "memory_context": new_memory,
+ "context": base_context,
}
@@ -551,6 +876,9 @@ FLOATING_GRAPH = _build_graph(floating=True)
async def run_home(user_id: str, message: str, context: dict[str, Any]) -> str:
+ if HOME_SINGLE_AGENT_TEST_MODE:
+ return await _run_home_single_agent(user_id, message, context)
+
state = await HOME_GRAPH.ainvoke(
{
"user_id": user_id,
@@ -586,6 +914,11 @@ async def run_home_stream(
message: str,
context: dict[str, Any],
) -> AsyncGenerator[tuple[str, Any], None]:
+ if HOME_SINGLE_AGENT_TEST_MODE:
+ async for event in _run_home_single_agent_stream(user_id, message, context):
+ yield event
+ return
+
state_input = {
"user_id": user_id,
"user_message": message,