diff --git a/services/chat/Dockerfile b/services/chat/Dockerfile new file mode 100644 index 0000000..66ffea1 --- /dev/null +++ b/services/chat/Dockerfile @@ -0,0 +1,36 @@ +# ── builder ────────────────────────────────────────────────────────────────── +FROM python:3.12-slim AS builder + +WORKDIR /build + +COPY services/chat/requirements.txt ./requirements.txt +RUN pip install --upgrade pip && \ + pip install --no-cache-dir --prefix=/install -r requirements.txt + +# ── runtime ────────────────────────────────────────────────────────────────── +FROM python:3.12-slim AS runtime + +RUN addgroup --system appgroup && adduser --system --ingroup appgroup appuser + +WORKDIR /app + +COPY --from=builder /install /usr/local + +# Shared module +COPY shared/ shared/ + +# Service source +COPY services/chat/app/ app/ + +RUN chown -R appuser:appgroup /app + +USER appuser + +EXPOSE 8000 + +# Chat service is CPU-bound (LLM calls) — use multiple workers +CMD ["gunicorn", "app.main:app", \ + "-k", "uvicorn.workers.UvicornWorker", \ + "--bind", "0.0.0.0:8000", \ + "--workers", "2", \ + "--timeout", "120"] diff --git a/services/chat/app/agents/__init__.py b/services/chat/app/agents/__init__.py new file mode 100644 index 0000000..29d6694 --- /dev/null +++ b/services/chat/app/agents/__init__.py @@ -0,0 +1 @@ +"""Chat Service domain agents.""" diff --git a/services/chat/app/agents/note_agent.py b/services/chat/app/agents/note_agent.py new file mode 100644 index 0000000..e296303 --- /dev/null +++ b/services/chat/app/agents/note_agent.py @@ -0,0 +1,142 @@ +"""Note agent — Markdown note management (list, get, create, update, delete). + +Adapted for Chat Service: import from app.ws_context and app.llm. 
+""" + +from __future__ import annotations + +import re +from typing import Any + +from langchain_core.tools import tool + +from app.llm import embed +from app.ws_context import execute_on_client + +_UUID_RE = re.compile( + r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$" +) + + +def _is_uuid(value: str) -> bool: + return bool(_UUID_RE.match(value)) + +NOTE_SYSTEM_PROMPT = ( + "You are a note-taking assistant. You help users create, retrieve, update,\n" + "and delete Markdown notes in their workspace.\n\n" + "Rules:\n" + " - content is always Markdown; preserve formatting when updating\n" + " - project_id is optional; link a note to a project when mentioned\n" + " - When updating, call get_note first if you need to read existing content\n" + " before appending or replacing sections\n" + " - list_notes without project_id returns all notes; scope with project_id\n" + " when the user is working within a specific project\n" + " - project_id must be a UUID; if you only know a project name, do not pass it as project_id\n" + " - Do not fabricate note content — reflect what the user provides or what\n" + " is already in the note (retrieved via get_note)." +) + + +@tool +async def list_notes(project_id: str = "") -> str: + """List notes, optionally scoped to a project by project_id.""" + normalized_project_id = project_id if (project_id and _is_uuid(project_id)) else "" + result = await execute_on_client( + action="select", + table="notes", + filters={"projectId": normalized_project_id or None}, + ) + rows = result.get("rows", []) + if not rows: + return "No notes found." 
+ lines = [f"- {r['title']} (id: {r['id']})" for r in rows] + return f"Found {len(rows)} note(s):\n" + "\n".join(lines) + + +@tool +async def get_note(note_id: str) -> str: + """Fetch a single note by its UUID to read its full Markdown content.""" + result = await execute_on_client(action="get", table="notes", data={"id": note_id}) + row = result.get("row") + if not row: + return f"Note {note_id} not found." + return f"Note '{row['title']}' (id: {row['id']}):\n\n{row['content']}" + + +@tool +async def create_note( + title: str, + content: str, + project_id: str = "", +) -> str: + """Create a new note. + title: note heading (required) + content: Markdown body text (required) + project_id: optional UUID linking this note to a project + """ + result = await execute_on_client( + action="insert", + table="notes", + data={ + "title": title, + "content": content, + "projectId": project_id or None, + }, + ) + row = result["row"] + # Index the note content in the vector store. + vector = await embed(content) + await execute_on_client( + action="vector_upsert", + data={"id": row["id"], "projectId": row.get("projectId"), "content": content}, + vector=vector, + ) + return f"Note created: '{row['title']}' (id: {row['id']})." + + +@tool +async def update_note( + note_id: str, + title: str = "", + content: str = "", +) -> str: + """Update an existing note. Only pass fields that should change. + note_id: UUID of the note (required) + If you need to preserve existing content, call get_note first. + """ + updates: dict[str, Any] = {} + if title: + updates["title"] = title + if content: + updates["content"] = content + result = await execute_on_client( + action="update", + table="notes", + data={"id": note_id, "updates": updates}, + ) + row = result["row"] + # Re-index if content changed. 
+ if content: + vector = await embed(content) + await execute_on_client( + action="vector_upsert", + data={"id": note_id, "projectId": row.get("projectId"), "content": content}, + vector=vector, + ) + return f"Note updated: '{row['title']}' (id: {row['id']})." + + +@tool +async def delete_note(note_id: str) -> str: + """Delete a note permanently by its UUID.""" + await execute_on_client(action="delete", table="notes", data={"id": note_id}) + return f"Note {note_id} deleted." + + +NOTE_TOOLS: list[Any] = [ + list_notes, + get_note, + create_note, + update_note, + delete_note, +] diff --git a/services/chat/app/agents/project_agent.py b/services/chat/app/agents/project_agent.py new file mode 100644 index 0000000..856d650 --- /dev/null +++ b/services/chat/app/agents/project_agent.py @@ -0,0 +1,146 @@ +"""Project agent — full lifecycle management (list, get, create, update, archive, delete). + +Adapted for Chat Service: import from app.ws_context instead of app.core.ws_context. +""" + +from __future__ import annotations + +from typing import Any + +from langchain_core.tools import tool + +from app.ws_context import execute_on_client + +PROJECT_SYSTEM_PROMPT = ( + "You are a project management assistant. 
You help users create, find,\n" + "update, and archive projects in their workspace.\n\n" + "Rules:\n" + " - status must be one of: active, archived\n" + " - client_id is optional; link to a client only when explicitly mentioned\n" + " - ai_summary is populated only when the user asks for a project summary;\n" + " derive it from context data — do not fabricate content\n" + " - Use list_projects for scoped queries; list_all_projects only when the\n" + " user wants a complete cross-client view including archived projects\n" + " - get_project requires a project UUID; resolve the ID first by calling\n" + " list_projects if you only have a project name\n" + " - Prefer archiving (update_project status=archived) over deletion;\n" + " only call delete_project when the user explicitly confirms deletion." +) + + +@tool +async def list_projects( + client_id: str = "", + include_archived: int = 0, +) -> str: + """List projects, optionally filtered by client_id. + include_archived: 1 to include archived projects, 0 for active only (default). + """ + result = await execute_on_client( + action="select", + table="projects", + filters={ + "clientId": client_id or None, + "includeArchived": bool(include_archived), + }, + ) + rows = result.get("rows", []) + if not rows: + return "No projects found." + lines = [f"- {r['name']} (status: {r['status']}, id: {r['id']})" for r in rows] + return f"Found {len(rows)} project(s):\n" + "\n".join(lines) + + +@tool +async def list_all_projects() -> str: + """List every project regardless of client or status. + Use only when the user wants a complete cross-client overview. + """ + result = await execute_on_client(action="select", table="projects") + rows = result.get("rows", []) + if not rows: + return "No projects found." 
+ lines = [f"- {r['name']} (status: {r['status']}, id: {r['id']})" for r in rows] + return f"All projects ({len(rows)}):\n" + "\n".join(lines) + + +@tool +async def get_project(project_id: str) -> str: + """Fetch a single project by its UUID.""" + result = await execute_on_client(action="get", table="projects", data={"id": project_id}) + row = result.get("row") + if not row: + return f"Project {project_id} not found." + return ( + f"Project: '{row['name']}' (id: {row['id']}, status: {row['status']}, " + f"clientId: {row.get('clientId', 'none')})" + ) + + +@tool +async def create_project( + name: str, + client_id: str = "", +) -> str: + """Create a new project. + name: human-readable project name (required) + client_id: optional UUID of the owning client + """ + result = await execute_on_client( + action="insert", + table="projects", + data={"name": name, "clientId": client_id or None}, + ) + row = result["row"] + return f"Project created: '{row['name']}' (id: {row['id']})" + + +@tool +async def update_project( + project_id: str, + name: str = "", + client_id: str = "", + status: str = "", + ai_summary: str = "", +) -> str: + """Update a project. Only pass fields that should change. + project_id: UUID of the project (required) + status: active | archived + ai_summary: AI-generated summary text (populate only when explicitly requested) + """ + updates: dict[str, Any] = {} + if name: + updates["name"] = name + if client_id: + updates["clientId"] = client_id + if status: + updates["status"] = status + if ai_summary: + updates["aiSummary"] = ai_summary + result = await execute_on_client( + action="update", + table="projects", + data={"id": project_id, "updates": updates}, + ) + row = result["row"] + return f"Project updated: '{row['name']}' (id: {row['id']}, status: {row['status']})" + + +@tool +async def delete_project(project_id: str) -> str: + """Permanently delete a project and orphan its tasks. 
+ IMPORTANT: prefer update_project(status='archived') unless the user + has explicitly confirmed they want permanent deletion. + """ + await execute_on_client(action="delete", table="projects", data={"id": project_id}) + return f"Project {project_id} permanently deleted." + + +PROJECT_TOOLS: list[Any] = [ + list_projects, + list_all_projects, + get_project, + create_project, + update_project, + delete_project, +] diff --git a/services/chat/app/agents/task_agent.py b/services/chat/app/agents/task_agent.py new file mode 100644 index 0000000..03d4f32 --- /dev/null +++ b/services/chat/app/agents/task_agent.py @@ -0,0 +1,240 @@ +"""Task agent — full CRUD for tasks and task comments. + +Adapted for Chat Service: import from app.ws_context instead of app.core.ws_context. +""" + +from __future__ import annotations + +from datetime import datetime, timezone +import re +from typing import Any + +from langchain_core.tools import tool + +from app.ws_context import execute_on_client + +_UUID_RE = re.compile( + r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$" +) + + +def _is_uuid(value: str) -> bool: + return bool(_UUID_RE.match(value)) + +TASK_SYSTEM_PROMPT = ( + "You are a task management assistant for a project workspace.\n" + "You create, update, list, and track tasks and their comments.\n\n" + "Rules:\n" + " - status must be one of: todo, in_progress, done\n" + " - priority must be one of: high, medium, low\n" + " - due_date is a Unix timestamp in milliseconds; convert human dates\n" + " - assignees is a JSON-encoded array of strings (e.g. 
'[\"Alice\",\"Bob\"]')\n" + " - project_id is optional; link to a project when the user mentions one\n" + " - is_ai_suggested: 1 only when proactively proposing a task the user\n" + " did not explicitly request; 0 otherwise\n" + " - is_ai_suggested: 1 only when proactively proposing a task the user did not explicitly request; 0 otherwise\n" + " - Use list_tasks_due_today for 'what's due today' queries\n" + " - For update_task, use -1 for integer fields you do not want to change\n" + " - Always confirm the action in plain, user-friendly language." +) + + +# ── Task tools ──────────────────────────────────────────────────────── + + +@tool +async def list_tasks( + project_id: str = "", + status: str = "", + search: str = "", + order_by: str = "", +) -> str: + """List tasks, optionally filtered by project_id, status (todo|in_progress|done), + a search string, or an order_by field name (dueDate|priority|createdAt).""" + normalized_project_id = project_id if (project_id and _is_uuid(project_id)) else "" + result = await execute_on_client( + action="select", + table="tasks", + filters={ + "projectId": normalized_project_id or None, + "status": status or None, + "search": search or None, + "orderBy": order_by or None, + }, + ) + rows = result.get("rows", []) + if not rows: + return "No tasks found matching the given filters." + lines = [ + f"- {r['title']} (status: {r['status']}, priority: {r['priority']}, id: {r['id']})" + for r in rows + ] + return f"Found {len(rows)} task(s):\n" + "\n".join(lines) + + +@tool +async def create_task( + title: str, + description: str = "", + status: str = "todo", + priority: str = "medium", + assignees: str = "[]", + due_date: int = 0, + project_id: str = "", + is_ai_suggested: int = 0, +) -> str: + """Create a new task. + title: task title (required) + description: optional details + status: todo | in_progress | done (default: todo) + priority: high | medium | low (default: medium) + assignees: JSON-encoded array of assignee names, e.g. 
'["Alice"]' + due_date: Unix timestamp in milliseconds; 0 means no due date + project_id: optional UUID of the parent project + is_ai_suggested: 1 if proactively suggested, 0 if user-requested + """ + result = await execute_on_client( + action="insert", + table="tasks", + data={ + "title": title, + "description": description or None, + "status": status, + "priority": priority, + "assignee": assignees, + "dueDate": due_date or None, + "projectId": project_id or None, + "isAiSuggested": is_ai_suggested, + }, + ) + row = result["row"] + return ( + f"Task created: '{row['title']}' " + f"(id: {row['id']}, status: {row['status']}, priority: {row['priority']})" + ) + + +@tool +async def update_task( + task_id: str, + title: str = "", + description: str = "", + status: str = "", + priority: str = "", + assignees: str = "", + due_date: int = -1, + project_id: str = "", +) -> str: + """Update fields on an existing task. Only pass fields you want to change. + task_id: the task's UUID (required) + due_date: -1 means unchanged; 0 clears the due date; any positive value sets it + """ + updates: dict[str, Any] = {} + if title: + updates["title"] = title + if description: + updates["description"] = description + if status: + updates["status"] = status + if priority: + updates["priority"] = priority + if assignees: + updates["assignee"] = assignees + if due_date != -1: + updates["dueDate"] = due_date or None + if project_id: + updates["projectId"] = project_id + result = await execute_on_client( + action="update", + table="tasks", + data={"id": task_id, "updates": updates}, + ) + row = result["row"] + return f"Task updated: '{row['title']}' (id: {row['id']}, status: {row['status']})" + + +@tool +async def delete_task(task_id: str) -> str: + """Delete a task permanently by its UUID.""" + await execute_on_client(action="delete", table="tasks", data={"id": task_id}) + return f"Task {task_id} deleted." 
@tool
async def list_tasks_due_today() -> str:
    """List all tasks whose due date falls on today's date."""
    # The docstring is kept verbatim on purpose: LangChain's @tool presumably
    # surfaces it to the model as the tool description, so it is runtime text.
    # "Today" here means the current UTC calendar day, not the user's local day.
    now = datetime.now(tz=timezone.utc)
    day_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    start_ms = int(day_start.timestamp() * 1000)
    end_ms = start_ms + 86_400_000 - 1  # last ms of today
    result = await execute_on_client(
        action="select",
        table="tasks",
        filters={"dueDateFrom": start_ms, "dueDateTo": end_ms},
    )
    rows = result.get("rows", [])
    if not rows:
        return "No tasks are due today."
    lines = [
        f"- {r['title']} (priority: {r['priority']}, status: {r['status']}, id: {r['id']})"
        for r in rows
    ]
    return f"Tasks due today ({len(rows)}):\n" + "\n".join(lines)


# ── Task comment tools ────────────────────────────────────────────────


@tool
async def list_task_comments(task_id: str) -> str:
    """List all comments on a task by its UUID."""
    # Docstring kept verbatim (tool description); returns one bullet per
    # comment, or a "not found" sentence when the client sends back no rows.
    result = await execute_on_client(
        action="select",
        table="taskComments",
        filters={"taskId": task_id},
    )
    rows = result.get("rows", [])
    if not rows:
        return f"No comments found for task {task_id}."
    lines = [f"- [{r['author']}]: {r['content']} (id: {r['id']})" for r in rows]
    return f"Found {len(rows)} comment(s):\n" + "\n".join(lines)


@tool
async def add_task_comment(task_id: str, author: str, content: str) -> str:
    """Add a comment to a task.
    task_id: UUID of the task to comment on
    author: name or ID of the comment author
    content: comment text
    """
    result = await execute_on_client(
        action="insert",
        table="taskComments",
        data={"taskId": task_id, "author": author, "content": content},
    )
    # dict.get's default only applies when the key is absent; a reply carrying
    # an explicit "row": None would otherwise crash on the .get() calls below.
    row = result.get("row") or {}
    # Fall back to the caller's own arguments when the echoed row omits a
    # field; the task id is looked up under both camelCase and snake_case keys.
    row_author = row.get("author", author)
    row_task_id = row.get("taskId") or row.get("task_id") or task_id
    row_comment_id = row.get("id", "unknown")
    return f"Comment added by {row_author} on task {row_task_id} (comment id: {row_comment_id})."
+ + +@tool +async def delete_task_comment(comment_id: str) -> str: + """Delete a task comment by its UUID.""" + await execute_on_client(action="delete", table="taskComments", data={"id": comment_id}) + return f"Comment {comment_id} deleted." + + +# ── Agent ───────────────────────────────────────────────────────────── + + +TASK_TOOLS: list[Any] = [ + list_tasks, + create_task, + update_task, + delete_task, + list_tasks_due_today, + list_task_comments, + add_task_comment, + delete_task_comment, +] diff --git a/services/chat/app/agents/timeline_agent.py b/services/chat/app/agents/timeline_agent.py new file mode 100644 index 0000000..669c558 --- /dev/null +++ b/services/chat/app/agents/timeline_agent.py @@ -0,0 +1,117 @@ +"""Timeline agent — project milestone management (list, create, update, delete). + +Adapted for Chat Service: import from app.ws_context instead of app.core.ws_context. +""" + +from __future__ import annotations + +import re +from typing import Any + +from langchain_core.tools import tool + +from app.ws_context import execute_on_client + +_UUID_RE = re.compile( + r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$" +) + + +def _is_uuid(value: str) -> bool: + return bool(_UUID_RE.match(value)) + +TIMELINE_SYSTEM_PROMPT = ( + "You are a project timeline assistant. 
Timelines are milestone dates that\n" + "track progress on a project — they are not calendar events.\n\n" + "Rules:\n" + " - project_id is REQUIRED for every create; confirm with the user if unknown\n" + " - For listing, project_id must be a UUID; never pass plain names as project_id\n" + " - date is a Unix timestamp in milliseconds; convert human-readable dates\n" + " - is_ai_suggested: 1 when proactively proposing a timeline, 0 otherwise\n" + " - is_ai_suggested: 1 when proactively proposing a timeline, 0 otherwise\n" + " - For update_timeline, use -1 for integer fields you do not want to change\n" + " - Listing without a project_id returns all timelines across projects\n" + " - Always echo the title and formatted date in your confirmation." +) + + +@tool +async def list_timelines(project_id: str = "") -> str: + """List timelines. Provide project_id to scope to a specific project.""" + normalized_project_id = project_id if (project_id and _is_uuid(project_id)) else "" + result = await execute_on_client( + action="select", + table="timelines", + filters={"projectId": normalized_project_id or None}, + ) + rows = result.get("rows", []) + if not rows: + return "No timelines found." + lines = [f"- {r['title']} (date: {r['date']}, id: {r['id']})" for r in rows] + return f"Found {len(rows)} timeline(s):\n" + "\n".join(lines) + + +@tool +async def create_timeline( + project_id: str, + title: str, + date: int, + is_ai_suggested: int = 0, +) -> str: + """Create a project timeline (milestone). 
+ project_id: REQUIRED UUID of the parent project + title: descriptive name for the milestone + date: Unix timestamp in milliseconds + is_ai_suggested: 1 if proactively suggested, 0 if user-requested + """ + result = await execute_on_client( + action="insert", + table="timelines", + data={ + "projectId": project_id, + "title": title, + "date": date, + "isAiSuggested": is_ai_suggested, + }, + ) + row = result["row"] + return f"Timeline created: '{row['title']}' (id: {row['id']}, date: {row['date']})" + + +@tool +async def update_timeline( + timeline_id: str, + title: str = "", + date: int = -1, +) -> str: + """Update a timeline. Only pass fields that should change. + timeline_id: UUID of the timeline (required) + date: -1 means unchanged; any other value sets the new date (ms timestamp) + """ + updates: dict[str, Any] = {} + if title: + updates["title"] = title + if date != -1: + updates["date"] = date + result = await execute_on_client( + action="update", + table="timelines", + data={"id": timeline_id, "updates": updates}, + ) + row = result["row"] + return f"Timeline updated: '{row['title']}' (id: {row['id']})" + + +@tool +async def delete_timeline(timeline_id: str) -> str: + """Delete a timeline permanently by its UUID.""" + await execute_on_client(action="delete", table="timelines", data={"id": timeline_id}) + return f"Timeline {timeline_id} deleted." + + +TIMELINE_TOOLS: list[Any] = [ + list_timelines, + create_timeline, + update_timeline, + delete_timeline, +] diff --git a/services/chat/app/deep_agent.py b/services/chat/app/deep_agent.py new file mode 100644 index 0000000..6f7254d --- /dev/null +++ b/services/chat/app/deep_agent.py @@ -0,0 +1,847 @@ +"""Single-agent runners for home and floating chat contexts. + +Adapted from app/core/deep_agent.py for the Chat Service. +Import paths changed to use local app modules and shared/. 
+""" + +from __future__ import annotations + +import json +import logging +import re +from datetime import date +from collections.abc import AsyncGenerator +from typing import Any, Literal + +from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage +from langchain_core.tools import tool + +from app.agents.note_agent import NOTE_TOOLS +from app.agents.project_agent import PROJECT_TOOLS +from app.agents.task_agent import TASK_TOOLS +from app.agents.timeline_agent import TIMELINE_TOOLS +from app.llm import get_llm +from app.memory_middleware import MemoryMiddleware +from app.ws_context import clear_tool_result_collector, execute_on_client, set_tool_result_collector +from shared.db import async_session + +logger = logging.getLogger(__name__) + +FloatingDomainType = Literal["task", "timeline", "project", "node"] +FloatingDomainSection = Literal["task", "timeline", "note"] + +_HOME_SINGLE_AGENT_SYSTEM = ( + "You are the home assistant with direct access to all tools: tasks, projects, notes, timelines, and memory tools. " + "Always use tools for factual data retrieval before answering. " + "When the user asks to remember, forget, or update what you know about them, use memory tools. " + "If context.context.resolved_project_id exists, use it as project_id for scoped list calls. " + "Return markdown and use tags when relevant: [ids], [ids], " + "[ids], [ids], {json}. " + "When listing tasks or timelines, each id tag must be on its own line with no prefix/suffix text. " + "Never put titles, priorities, or dates on the same line as or tags. " + "For questions about upcoming timelines (e.g. 'prossimi eventi'), include only future items in the current month unless the user asks a different range. " + "For upcoming tasks, after tag lines add a short recommendation based on due date and priority." 
+) + +_FLOATING_SINGLE_AGENT_SYSTEM = ( + "You are the floating assistant with direct access to all tools: tasks, projects, notes, timelines, and memory tools. " + "Stay focused on the floating scope in context.scope and answer concisely. " + "Return plain text only. Do not output XML/HTML-like tags such as , , , , or any bracketed id tag wrappers. " + "Always use tools for factual data retrieval before answering. " + "When the user asks to remember, forget, or update what you know about them, use memory tools. " + "If context.context.resolved_project_id exists, use it as project_id for scoped list calls. " +) + +_FLOATING_DOMAIN_CLASSIFIER_SYSTEM = ( + "You are a strict domain classifier for websocket floating requests. " + "Return ONLY a JSON object with keys: type, id, section. " + "Allowed type values: task, timeline, project, node. " + "Allowed section values: task, timeline, note, or null. " + "Rules: infer from user message intent first; do not blindly trust scope.type. " + "If user asks tasks/timeline/notes for a project, set type=project and section accordingly. " + "If project id is unknown but context.resolved_project_id exists, use it as id. " + "If id is unknown, use null. " + "No markdown, no prose, JSON only." 
+) + + +def _as_text(content: Any) -> str: + if content is None: + return "" + if isinstance(content, str): + return content + if isinstance(content, list): + parts: list[str] = [] + for item in content: + if isinstance(item, str): + parts.append(item) + elif isinstance(item, dict): + text = item.get("text") + if isinstance(text, str): + parts.append(text) + return "".join(parts) + return str(content) + + +def _candidate_tokens(message: str) -> list[str]: + tokens = re.findall(r"[a-zA-Z0-9_-]+", message.lower()) + return [token for token in tokens if len(token) >= 3] + + +async def _resolve_project_id_from_message(message: str) -> str | None: + """Resolve likely project UUID from user message using client project list.""" + try: + result = await execute_on_client(action="select", table="projects") + except Exception as exc: + logger.warning("deep_agent: project resolve select failed: %s", exc) + return None + + rows = result.get("rows", []) + if not isinstance(rows, list) or not rows: + return None + + tokens = _candidate_tokens(message) + scored: list[tuple[int, dict[str, Any]]] = [] + for row in rows: + if not isinstance(row, dict): + continue + name = str(row.get("name", "")).lower() + score = sum(1 for token in tokens if token in name) + if score > 0: + scored.append((score, row)) + + if not scored: + return None + + scored.sort(key=lambda item: item[0], reverse=True) + top_score = scored[0][0] + top_rows = [row for score, row in scored if score == top_score] + if len(top_rows) != 1: + return None + + project_id = top_rows[0].get("id") + return project_id if isinstance(project_id, str) else None + + +def _needs_project_resolution(message: str) -> bool: + lowered = message.lower() + return any(keyword in lowered for keyword in ["project", "progetto", "progetti", "whitelist"]) + + +async def _prepare_context(message: str, context: dict[str, Any]) -> dict[str, Any]: + prepared = dict(context) + if _needs_project_resolution(message): + resolved_project_id = await 
_resolve_project_id_from_message(message) + if resolved_project_id: + prepared["resolved_project_id"] = resolved_project_id + logger.info("deep_agent: resolved_project_id=%s", resolved_project_id) + return prepared + + +def _all_tools() -> list[Any]: + return [*TASK_TOOLS, *PROJECT_TOOLS, *NOTE_TOOLS, *TIMELINE_TOOLS] + + +def _trace_id_from_context(context: dict[str, Any]) -> str | None: + debug = context.get("_debug") + if isinstance(debug, dict): + request_id = debug.get("request_id") + if isinstance(request_id, str) and request_id: + return request_id + return None + + +def _context_for_model(context: dict[str, Any]) -> dict[str, Any]: + sanitized = dict(context) + sanitized.pop("_debug", None) + return sanitized + + +_TAG_LINE_RE = re.compile(r"<(task|timeline)>\[[^\]]+\]") +_TIMELINE_DMY_RE = re.compile(r"(?P\d{2})/(?P\d{2})/(?P\d{4})") + + +def _is_upcoming_timeline_query(message: str) -> bool: + lowered = message.lower() + has_upcoming = "prossim" in lowered or "upcoming" in lowered or "next" in lowered + has_timeline_topic = any( + token in lowered + for token in ("event", "evento", "eventi", "timeline", "milestone", "scaden") + ) + return has_upcoming and has_timeline_topic + + +def _timeline_date_in_current_month_or_future(dmy: str) -> bool: + match = _TIMELINE_DMY_RE.search(dmy) + if not match: + return True + try: + parsed = date( + int(match.group("y")), + int(match.group("m")), + int(match.group("d")), + ) + except ValueError: + return True + + today = date.today() + return parsed >= today and parsed.year == today.year and parsed.month == today.month + + +def _normalize_tagged_list_lines(text: str, message: str) -> str: + if not text: + return text + + upcoming_timeline_only = _is_upcoming_timeline_query(message) + output_lines: list[str] = [] + + for line in text.splitlines(): + matches = list(_TAG_LINE_RE.finditer(line)) + if not matches: + output_lines.append(line) + continue + + had_non_tag_text = _TAG_LINE_RE.sub("", line).strip(" 
-\t0123456789.*:)") + if not had_non_tag_text and len(matches) == 1: + tag_text = matches[0].group(0) + if ( + upcoming_timeline_only + and "" in tag_text + and not _timeline_date_in_current_month_or_future(line) + ): + continue + output_lines.append(tag_text) + continue + + for match in matches: + tag_text = match.group(0) + if ( + upcoming_timeline_only + and "" in tag_text + and not _timeline_date_in_current_month_or_future(line) + ): + continue + output_lines.append(tag_text) + + return "\n".join(output_lines) + + +_GENERIC_TAG_RE = re.compile(r"", re.IGNORECASE) +_BRACKETED_ID_RE = re.compile(r"\[(?:[0-9a-fA-F-]{8,}|[A-Za-z0-9_-]{8,})\]") +_FLOATING_EMPTY_FALLBACK = "No results found." + + +def _strip_floating_markup_fragment(text: str) -> str: + if not text: + return text + cleaned = _GENERIC_TAG_RE.sub("", text) + return _BRACKETED_ID_RE.sub("", cleaned) + + +def _strip_floating_markup(text: str) -> str: + """Ensure floating responses stay plain text with no XML-like tag wrappers.""" + if not text: + return text + + cleaned = _strip_floating_markup_fragment(text) + lines = [re.sub(r"[ \t]{2,}", " ", line).strip() for line in cleaned.splitlines()] + return "\n".join(line for line in lines if line) + + +def _fallback_from_raw_floating_text(raw_text: str) -> str: + fallback = _strip_floating_markup_fragment(raw_text or "") + fallback = re.sub(r"[ \t]{2,}", " ", fallback).strip() + return fallback or _FLOATING_EMPTY_FALLBACK + + +class _FloatingStreamSanitizer: + """Streaming sanitizer that removes floating markup without buffering the full answer.""" + + def __init__(self) -> None: + self._pending = "" + + @staticmethod + def _split_safe_boundary(text: str) -> tuple[str, str]: + boundary = len(text) + + last_lt = text.rfind("<") + if last_lt != -1 and ">" not in text[last_lt:]: + boundary = min(boundary, last_lt) + + last_lb = text.rfind("[") + if last_lb != -1 and "]" not in text[last_lb:]: + boundary = min(boundary, last_lb) + + if boundary == len(text): + 
return text, "" + return text[:boundary], text[boundary:] + + def feed(self, chunk: str) -> str: + combined = f"{self._pending}{chunk}" + safe_text, self._pending = self._split_safe_boundary(combined) + return _strip_floating_markup_fragment(safe_text) + + def finalize(self) -> str: + tail = re.sub(r"<[^>\n]*$", "", self._pending) + tail = re.sub(r"\[[^\]\n]*$", "", tail) + self._pending = "" + return _strip_floating_markup_fragment(tail) + + +def _normalize_memory_label(path_or_label: str) -> str: + value = path_or_label.strip() + if value.startswith("/memories/"): + value = value[len("/memories/"):] + value = value.strip("/") + return value + + +def _memory_tools(user_id: str, trace_id: str | None) -> list[Any]: + @tool + async def memory_list_blocks() -> str: + """List all core memory blocks currently stored for the user.""" + logger.info("deep_agent: memory_list_blocks trace=%s user=%s", trace_id or "-", user_id) + async with async_session() as db: + memory = MemoryMiddleware(db) + blocks = await memory.list_core_blocks(user_id) + if not blocks: + return "No memory blocks found." + lines = [f"- {b['label']}: {b['value']}" for b in blocks] + return "Memory blocks:\n" + "\n".join(lines) + + @tool + async def memory_get(path_or_label: str) -> str: + """Get one memory block by label or /memories/