"""Single-agent runners for home and floating chat contexts.""" from __future__ import annotations import json import logging import re from datetime import date from collections.abc import AsyncGenerator from typing import Any, Literal from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage from langchain_core.tools import tool from app.agents.note_agent import NOTE_TOOLS from app.agents.project_agent import PROJECT_TOOLS from app.agents.task_agent import TASK_TOOLS from app.agents.timeline_agent import TIMELINE_TOOLS from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback, langfuse_context from app.core.llm import get_agent_llm, model_for_agent from app.core.memory_middleware import MemoryMiddleware from app.core.ws_context import clear_tool_result_collector, execute_on_client, set_tool_result_collector from app.db import async_session logger = logging.getLogger(__name__) FloatingDomainType = Literal["task", "timeline", "project", "node"] FloatingDomainSection = Literal["task", "timeline", "note"] # Mapping of core-memory language values to natural-language names for prompts. _LANGUAGE_NAMES: dict[str, str] = { "en": "English", "it": "Italian", "es": "Spanish", "fr": "French", "de": "German", "english": "English", "italian": "Italian", "italiano": "Italian", "spanish": "Spanish", "español": "Spanish", "french": "French", "français": "French", "german": "German", "deutsch": "German", } def _language_instruction(context: dict[str, Any]) -> str: """Return a system-prompt suffix that tells the LLM to respond in the user's language. Returns an empty string when the language is English or unknown — saves tokens. """ core = context.get("core_memory") or {} raw = (core.get("language") or "").strip().lower() if not raw: return "" lang = _LANGUAGE_NAMES.get(raw, raw.title()) # best-effort capitalisation if lang.lower() == "english": return "" return ( f"\n\nIMPORTANT: Always respond in {lang}. 
" f"All your output text must be written in {lang}." ) def _proactive_hints_injection(context: dict[str, Any]) -> str: """Return a system-prompt paragraph listing proactive behavioral hints. Returns empty string when no hints or confidence below threshold. Capped at 600 chars. """ hints: list[str] = context.get("proactive_hints") or [] if not hints: return "" body = "\n".join(f"- {h}" for h in hints) section = f"\n\nI noticed (behavioral patterns):\n{body}" if len(section) > 600: section = section[:597] + "..." return section def _relational_memory_injection(context: dict[str, Any]) -> str: """Return a system-prompt paragraph listing known people/projects from relational memory. Returns empty string when no relational rows or tier is Free. Capped at 800 chars to control token spend. """ relations: list[str] = context.get("relational_memory") or [] if not relations: return "" body = "\n".join(f"- {r}" for r in relations) section = f"\n\nKnown people & projects:\n{body}" if len(section) > 800: section = section[:797] + "..." return section _HOME_SYSTEM_PROMPT = ( "You are the home assistant with direct access to all tools: tasks, projects, notes, timelines, and memory tools. " "Always use tools for factual data retrieval before answering. " "When the user asks to remember, forget, or update what you know about them, use memory tools. " "If context.context.resolved_project_id exists, use it as project_id for scoped list calls. " "Return markdown and use tags when relevant: [ids], [ids], " "[ids], [ids], {json}. " "When listing tasks or timelines, each id tag must be on its own line with no prefix/suffix text. " "Never put titles, priorities, or dates on the same line as or tags. " "For questions about upcoming timelines (e.g. 'prossimi eventi'), include only future items in the current month unless the user asks a different range. " "For upcoming tasks, after tag lines add a short recommendation based on due date and priority." 
) _FLOATING_SYSTEM_PROMPT = ( "You are the floating assistant with direct access to all tools: tasks, projects, notes, timelines, and memory tools. " "Stay focused on the floating scope in context.scope and answer concisely. " "Return plain text only. Do not output XML/HTML-like tags such as , , , , or any bracketed id tag wrappers. " "Always use tools for factual data retrieval before answering. " "When the user asks to remember, forget, or update what you know about them, use memory tools. " "If context.context.resolved_project_id exists, use it as project_id for scoped list calls. " ) _FLOATING_DOMAIN_CLASSIFIER_PROMPT = ( "You are a strict domain classifier for websocket floating requests. " "Return ONLY a JSON object with keys: type, id, section. " "Allowed type values: task, timeline, project, node. " "Allowed section values: task, timeline, note, or null. " "Rules: infer from user message intent first; do not blindly trust scope.type. " "If user asks tasks/timeline/notes for a project, set type=project and section accordingly. " "If project id is unknown but context.resolved_project_id exists, use it as id. " "If id is unknown, use null. " "No markdown, no prose, JSON only." 
def _as_text(content: Any) -> str:
    """Flatten a message ``content`` payload into plain text.

    ``None`` becomes ``""`` and strings pass through. For lists, string items
    and the string ``"text"`` values of dict items are concatenated; every
    other item is ignored. Any other value is converted with ``str()``.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts: list[str] = []
        for item in content:
            if isinstance(item, str):
                parts.append(item)
            elif isinstance(item, dict):
                text = item.get("text")
                if isinstance(text, str):
                    parts.append(text)
        return "".join(parts)
    return str(content)


_TOKEN_RE = re.compile(r"[a-zA-Z0-9_-]+")


def _candidate_tokens(message: str) -> list[str]:
    """Return lower-cased ASCII word tokens of *message* with 3+ characters."""
    return [token for token in _TOKEN_RE.findall(message.lower()) if len(token) >= 3]


def _pick_unambiguous_project_id(rows: list[Any], tokens: list[str]) -> str | None:
    """Return the id of the single best-matching project row, if any.

    A row scores one point per token that is a substring of its lower-cased
    name. ``None`` is returned when nothing scores, when several rows tie
    for the top score, or when the winning row's ``id`` is not a string.
    """
    best_score = 0
    best_rows: list[dict[str, Any]] = []
    for row in rows:
        if not isinstance(row, dict):
            continue
        raw_name = row.get("name")
        # A null name must not become the literal text "none".
        name = "" if raw_name is None else str(raw_name).lower()
        score = sum(1 for token in tokens if token in name)
        if score > best_score:
            best_score, best_rows = score, [row]
        elif score and score == best_score:
            best_rows.append(row)
    if len(best_rows) != 1:
        return None
    project_id = best_rows[0].get("id")
    return project_id if isinstance(project_id, str) else None


async def _resolve_project_id_from_message(message: str) -> str | None:
    """Resolve likely project UUID from user message using client project list.

    Selects the ``projects`` table on the client and matches message tokens
    against project names. Returns ``None`` when the select fails, the reply
    has no usable ``rows`` list, or the match is missing or ambiguous.
    """
    try:
        result = await execute_on_client(action="select", table="projects")
    except Exception as exc:
        # Best effort: resolution is optional, so log and carry on without it.
        logger.warning("deep_agent: project resolve select failed: %s", exc)
        return None

    rows = result.get("rows", []) if isinstance(result, dict) else None
    if not isinstance(rows, list) or not rows:
        return None
    return _pick_unambiguous_project_id(rows, _candidate_tokens(message))


_PROJECT_KEYWORDS = ("project", "progetto", "progetti", "whitelist")


def _needs_project_resolution(message: str) -> bool:
    """Return True when *message* mentions one of the project keywords."""
    lowered = message.lower()
    return any(keyword in lowered for keyword in _PROJECT_KEYWORDS)


async def _prepare_context(message: str, context: dict[str, Any]) -> dict[str, Any]:
    """Return a shallow copy of *context*, enriched with a resolved project id.

    ``resolved_project_id`` is added only when the message mentions a project
    keyword and exactly one project matches. The input dict is not mutated.
    """
    prepared = dict(context)
    if _needs_project_resolution(message):
        resolved_project_id = await _resolve_project_id_from_message(message)
        if resolved_project_id:
            prepared["resolved_project_id"] = resolved_project_id
            logger.info("deep_agent: resolved_project_id=%s", resolved_project_id)
    return prepared
def _all_tools() -> list[Any]:
    """Return the task, project, note and timeline tools as one flat list."""
    return [*TASK_TOOLS, *PROJECT_TOOLS, *NOTE_TOOLS, *TIMELINE_TOOLS]


def _debug_string(context: dict[str, Any], key: str) -> str | None:
    """Return ``context["_debug"][key]`` when it is a non-empty string."""
    debug = context.get("_debug")
    if not isinstance(debug, dict):
        return None
    value = debug.get(key)
    return value if isinstance(value, str) and value else None


def _trace_id_from_context(context: dict[str, Any]) -> str | None:
    """Return the ``request_id`` from the ``_debug`` entry, or ``None``."""
    return _debug_string(context, "request_id")


def _session_id_from_context(context: dict[str, Any]) -> str | None:
    """Return the ``session_id`` from the ``_debug`` entry, or ``None``."""
    return _debug_string(context, "session_id")


def _context_for_model(context: dict[str, Any]) -> dict[str, Any]:
    """Return a shallow copy of *context* without the ``_debug`` entry."""
    return {key: value for key, value in context.items() if key != "_debug"}


# Matches an id tag such as "<task>[id]" or "<timeline>[id1,id2]";
# group 1 is the tag name.
_TAG_LINE_RE = re.compile(r"<(task|timeline)>\[[^\]]+\]")
# dd/mm/yyyy date. The group names are required by the ``group("d"/"m"/"y")``
# lookups below; without them the pattern does not compile.
_TIMELINE_DMY_RE = re.compile(r"(?P<d>\d{2})/(?P<m>\d{2})/(?P<y>\d{4})")


def _is_upcoming_timeline_query(message: str) -> bool:
    """Return True when *message* asks about upcoming timeline-like items.

    Requires both an "upcoming" marker ("prossim", "upcoming", "next") and a
    timeline topic word; both are matched as substrings of the lower-cased
    message.
    """
    lowered = message.lower()
    has_upcoming = any(marker in lowered for marker in ("prossim", "upcoming", "next"))
    has_timeline_topic = any(
        token in lowered
        for token in ("event", "evento", "eventi", "timeline", "milestone", "scaden")
    )
    return has_upcoming and has_timeline_topic


def _timeline_date_in_current_month_or_future(dmy: str, today: date | None = None) -> bool:
    """Return True when the dd/mm/yyyy date in *dmy* is upcoming this month.

    "Upcoming" means on or after *today* and within the same year and month
    as *today*. Text with no date, or with an impossible date, returns True
    so that unparseable lines are never filtered out.

    Args:
        dmy: Text searched for the first dd/mm/yyyy occurrence.
        today: Reference day; defaults to ``date.today()``.
    """
    match = _TIMELINE_DMY_RE.search(dmy)
    if not match:
        return True
    try:
        parsed = date(int(match.group("y")), int(match.group("m")), int(match.group("d")))
    except ValueError:
        return True
    if today is None:
        today = date.today()
    return parsed >= today and (parsed.year, parsed.month) == (today.year, today.month)


def _normalize_tagged_list_lines(text: str, message: str) -> str:
    """Put every task/timeline id tag on its own line, dropping text around it.

    Lines without tags are kept unchanged. A line with tags is replaced by
    one output line per tag. When *message* is an upcoming-timeline query,
    timeline tags whose line carries a date outside the upcoming part of the
    current month are dropped; task tags are never date-filtered.
    """
    if not text:
        return text

    upcoming_timeline_only = _is_upcoming_timeline_query(message)
    output_lines: list[str] = []
    for line in text.splitlines():
        matches = list(_TAG_LINE_RE.finditer(line))
        if not matches:
            output_lines.append(line)
            continue
        # The date belongs to the line, so decide once for all its tags.
        drop_timelines = (
            upcoming_timeline_only
            and not _timeline_date_in_current_month_or_future(line)
        )
        for match in matches:
            if drop_timelines and match.group(1) == "timeline":
                continue
            output_lines.append(match.group(0))
    return "\n".join(output_lines)
# XML/HTML-like tag such as <task>, </task>, <br/> or <a href="x">.
# NOTE(review): the pattern text was empty in the reviewed source, which made
# tag stripping a no-op; confirm this reconstruction against version control.
_GENERIC_TAG_RE = re.compile(r"</?[a-z][a-z0-9_:-]*(?:\s[^<>\n]*)?/?>", re.IGNORECASE)
_BRACKETED_ID_RE = re.compile(r"\[(?:[0-9a-fA-F-]{8,}|[A-Za-z0-9_-]{8,})\]")
# Trailing text that could still grow into a tag / bracketed id once more
# streamed characters arrive (used with ``fullmatch`` from the opener).
_PARTIAL_TAG_RE = re.compile(r"</?(?:[a-z][^<>\n]*)?", re.IGNORECASE)
_PARTIAL_ID_RE = re.compile(r"\[[A-Za-z0-9_-]*")
_FLOATING_EMPTY_FALLBACK = "No results found."


def _strip_floating_markup_fragment(text: str) -> str:
    """Remove tag wrappers and bracketed ids from *text*, leaving spacing as-is."""
    if not text:
        return text
    cleaned = _GENERIC_TAG_RE.sub("", text)
    return _BRACKETED_ID_RE.sub("", cleaned)


def _strip_floating_markup(text: str) -> str:
    """Ensure floating responses stay plain text with no XML-like tag wrappers."""
    if not text:
        return text
    cleaned = _strip_floating_markup_fragment(text)
    # Collapse excessive spaces introduced by tag/id removal while preserving lines.
    lines = [re.sub(r"[ \t]{2,}", " ", line).strip() for line in cleaned.splitlines()]
    return "\n".join(line for line in lines if line)


def _fallback_from_raw_floating_text(raw_text: str) -> str:
    """Return *raw_text* without markup, or the fallback message if nothing is left."""
    fallback = _strip_floating_markup_fragment(raw_text or "")
    fallback = re.sub(r"[ \t]{2,}", " ", fallback).strip()
    return fallback or _FLOATING_EMPTY_FALLBACK


class _FloatingStreamSanitizer:
    """Streaming sanitizer that removes floating markup without buffering the full answer.

    Call ``feed`` for every chunk and ``finalize`` once at the end. Text is
    held back only while it may still turn into a tag or bracketed id.
    """

    def __init__(self) -> None:
        self._pending = ""

    @staticmethod
    def _split_safe_boundary(text: str) -> tuple[str, str]:
        """Split *text* into ``(safe_to_emit, held_back)``.

        The held-back part starts at the last ``<`` or ``[`` whose trailing
        text can still become a wrapper. An opener followed by text that can
        never form one (``"x < 5"``, a newline) is released immediately, so
        ordinary prose is not buffered until the end of the stream.
        """
        boundary = len(text)
        for opener, partial_re in (("<", _PARTIAL_TAG_RE), ("[", _PARTIAL_ID_RE)):
            start = text.rfind(opener)
            if start != -1 and partial_re.fullmatch(text, start):
                boundary = min(boundary, start)
        return text[:boundary], text[boundary:]

    def feed(self, chunk: str) -> str:
        """Consume *chunk* and return the sanitized text that is safe to emit."""
        combined = f"{self._pending}{chunk}"
        safe_text, self._pending = self._split_safe_boundary(combined)
        return _strip_floating_markup_fragment(safe_text)

    def finalize(self) -> str:
        """Flush the held-back text and reset the sanitizer."""
        # Drop dangling unfinished wrappers at the very end.
        tail = re.sub(r"<[^>\n]*$", "", self._pending)
        tail = re.sub(r"\[[^\]\n]*$", "", tail)
        self._pending = ""
        return _strip_floating_markup_fragment(tail)


def _normalize_memory_label(path_or_label: str) -> str:
    """Turn a label or ``/memories/<label>`` path into a bare label."""
    value = path_or_label.strip()
    if value.startswith("/memories/"):
        value = value[len("/memories/"):]
    return value.strip("/")


def _memory_tools(user_id: str, trace_id: str | None) -> list[Any]:
    @tool
    async def memory_list_blocks() -> str:
        """List all core memory blocks currently stored for the user."""
        logger.info("deep_agent: memory_list_blocks trace=%s user=%s", trace_id or "-", user_id)
        async with async_session() as db:
            memory = MemoryMiddleware(db)
            blocks = await memory.list_core_blocks(user_id)
        if not blocks:
            return "No memory blocks found."
        # NOTE(review): this function continues beyond the reviewed lines;
        # the code above is reproduced unchanged.
lines = [f"- {b['label']}: {b['value']}" for b in blocks] return "Memory blocks:\n" + "\n".join(lines) @tool async def memory_get(path_or_label: str) -> str: """Get one memory block by label or /memories/