"""Single-agent runners for home and floating chat contexts.""" from __future__ import annotations import json import logging import re from collections.abc import AsyncGenerator from typing import Any, Literal from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage from langchain_core.tools import tool from app.agents.note_agent import NOTE_TOOLS from app.agents.project_agent import PROJECT_TOOLS from app.agents.task_agent import TASK_TOOLS from app.agents.timeline_agent import TIMELINE_TOOLS from app.core.llm import get_llm from app.core.memory_middleware import MemoryMiddleware from app.core.ws_context import clear_tool_result_collector, execute_on_client, set_tool_result_collector from app.db import async_session logger = logging.getLogger(__name__) FloatingDomain = Literal["tasks", "projects", "notes", "timelines"] _HOME_SINGLE_AGENT_SYSTEM = ( "You are the home assistant with direct access to all tools: tasks, projects, notes, timelines, and memory tools. " "Always use tools for factual data retrieval before answering. " "When the user asks to remember, forget, or update what you know about them, use memory tools. " "If context.context.resolved_project_id exists, use it as project_id for scoped list calls. " "Return markdown and embed inline tags when relevant: [ids], [ids], " "[ids], [ids], {json}." ) _FLOATING_SINGLE_AGENT_SYSTEM = ( "You are the floating assistant with direct access to all tools: tasks, projects, notes, timelines, and memory tools. " "Stay focused on the floating scope in context.scope and answer concisely. " "Always use tools for factual data retrieval before answering. " "When the user asks to remember, forget, or update what you know about them, use memory tools. " "If context.context.resolved_project_id exists, use it as project_id for scoped list calls. " "Return markdown and embed inline tags when relevant: [ids], [ids], " "[ids], [ids], {json}." ) def _as_text(content: Any) -> str: if content is None: return "" if isinstance(content, str): return content if isinstance(content, list): parts: list[str] = [] for item in content: if isinstance(item, str): parts.append(item) elif isinstance(item, dict): text = item.get("text") if isinstance(text, str): parts.append(text) return "".join(parts) return str(content) def _candidate_tokens(message: str) -> list[str]: tokens = re.findall(r"[a-zA-Z0-9_-]+", message.lower()) return [token for token in tokens if len(token) >= 3] async def _resolve_project_id_from_message(message: str) -> str | None: """Resolve likely project UUID from user message using client project list.""" try: result = await execute_on_client(action="select", table="projects") except Exception as exc: logger.warning("deep_agent: project resolve select failed: %s", exc) return None rows = result.get("rows", []) if not isinstance(rows, list) or not rows: return None tokens = _candidate_tokens(message) scored: list[tuple[int, dict[str, Any]]] = [] for row in rows: if not isinstance(row, dict): continue name = str(row.get("name", "")).lower() score = sum(1 for token in tokens if token in name) if score > 0: scored.append((score, row)) if not scored: return None scored.sort(key=lambda item: item[0], reverse=True) top_score = scored[0][0] top_rows = [row for score, row in scored if score == top_score] if len(top_rows) != 1: return None project_id = top_rows[0].get("id") return project_id if isinstance(project_id, str) else None def _needs_project_resolution(message: str) -> bool: lowered = message.lower() return any(keyword in lowered for keyword in ["project", "progetto", "progetti", "whitelist"]) async def _prepare_context(message: str, context: dict[str, Any]) -> dict[str, Any]: prepared = dict(context) if _needs_project_resolution(message): resolved_project_id = await _resolve_project_id_from_message(message) if resolved_project_id: prepared["resolved_project_id"] = resolved_project_id logger.info("deep_agent: resolved_project_id=%s", resolved_project_id) return prepared def _all_tools() -> list[Any]: return [*TASK_TOOLS, *PROJECT_TOOLS, *NOTE_TOOLS, *TIMELINE_TOOLS] def _trace_id_from_context(context: dict[str, Any]) -> str | None: debug = context.get("_debug") if isinstance(debug, dict): request_id = debug.get("request_id") if isinstance(request_id, str) and request_id: return request_id return None def _context_for_model(context: dict[str, Any]) -> dict[str, Any]: sanitized = dict(context) sanitized.pop("_debug", None) return sanitized def _normalize_memory_label(path_or_label: str) -> str: value = path_or_label.strip() if value.startswith("/memories/"): value = value[len("/memories/"):] value = value.strip("/") return value def _memory_tools(user_id: str, trace_id: str | None) -> list[Any]: @tool async def memory_list_blocks() -> str: """List all core memory blocks currently stored for the user.""" logger.info("deep_agent: memory_list_blocks trace=%s user=%s", trace_id or "-", user_id) async with async_session() as db: memory = MemoryMiddleware(db) blocks = await memory.list_core_blocks(user_id) if not blocks: return "No memory blocks found." lines = [f"- {b['label']}: {b['value']}" for b in blocks] return "Memory blocks:\n" + "\n".join(lines) @tool async def memory_get(path_or_label: str) -> str: """Get one memory block by label or /memories/