"""Execution Plan generator — builder, template registry, and LRU plan cache.""" from __future__ import annotations from collections import OrderedDict from typing import Any from app.schemas import ExecutionPlan, PlanStep # ── Prompt Template Registry ────────────────────────────────────────── class PromptTemplateRegistry: """Server-side store mapping template IDs to prompt text. Clients only ever receive template IDs (e.g. ``"tpl_task_agent_default"``). The actual prompt text is resolved here on the server, keeping prompt IP out of API responses. """ def __init__(self) -> None: self._templates: dict[str, str] = {} def register(self, template_id: str, prompt_text: str) -> None: self._templates[template_id] = prompt_text def get(self, template_id: str) -> str: """Resolve a template ID to its prompt text. Raises ``KeyError`` if the template is not registered. """ text = self._templates.get(template_id) if text is None: raise KeyError(f"Template not found: {template_id!r}") return text def has(self, template_id: str) -> bool: return template_id in self._templates def list_ids(self) -> list[str]: """Return all registered template IDs (never the text).""" return list(self._templates.keys()) # ── Execution Plan Builder ──────────────────────────────────────────── class ExecutionPlanBuilder: """Fluent builder for ``ExecutionPlan`` objects. Example:: plan = ( ExecutionPlanBuilder("task_agent") .add_llm_step("tpl_task_agent_default", {"message": user_msg}) .add_data_step("create_record", data_from_step=0) .build() ) """ def __init__(self, agent: str) -> None: self._agent = agent self._steps: list[PlanStep] = [] # ── step adders ────────────────────────────────────────────────── def add_step( self, action: str, params: dict[str, Any] | None = None ) -> ExecutionPlanBuilder: """Append a generic action step with optional parameters.""" self._steps.append(PlanStep(action=action, variables=params)) return self def add_llm_step( self, template_id: str, variables: dict[str, Any] | None = None ) -> ExecutionPlanBuilder: """Append an LLM step referencing a server-side template by ID.""" self._steps.append( PlanStep(action="llm", prompt_template=template_id, variables=variables) ) return self def add_data_step(self, action: str, data_from_step: int) -> ExecutionPlanBuilder: """Append a step whose input comes from the output of an earlier step.""" self._steps.append(PlanStep(action=action, data_from_step=data_from_step)) return self # ── build ──────────────────────────────────────────────────────── def build(self) -> ExecutionPlan: """Validate step references and return the ``ExecutionPlan``. Raises ``ValueError`` if any ``data_from_step`` references a non-existent or future step index. """ for i, step in enumerate(self._steps): if step.data_from_step is not None: if not (0 <= step.data_from_step < i): raise ValueError( f"Step {i}: data_from_step={step.data_from_step} must " f"reference a preceding step index in range 0..{i - 1}" ) return ExecutionPlan(agent=self._agent, steps=list(self._steps)) # ── Plan Cache (LRU) ────────────────────────────────────────────────── class PlanCache: """In-memory LRU cache for ``ExecutionPlan`` objects. Plans stored here are accessible as playbooks via ``get_all_playbooks()``. The cache also serves as a runtime memoisation layer so that repeated identical intent classifications can skip re-building the plan. """ def __init__(self, maxsize: int = 1000) -> None: self._maxsize = maxsize self._cache: OrderedDict[str, ExecutionPlan] = OrderedDict() def cache_plan(self, key: str, plan: ExecutionPlan) -> None: """Store *plan* under *key*, evicting the LRU entry if at capacity.""" if key in self._cache: del self._cache[key] # remove so re-insertion places it at the end elif len(self._cache) >= self._maxsize: self._cache.popitem(last=False) # evict least-recently-used self._cache[key] = plan def get_plan(self, key: str) -> ExecutionPlan | None: """Return the cached plan for *key*, or ``None`` if not present. Accessing a plan marks it as most-recently used. """ if key not in self._cache: return None self._cache.move_to_end(key) return self._cache[key] def get_all_playbooks(self) -> list[ExecutionPlan]: """Return all cached plans (most-recently used last).""" return list(self._cache.values()) # ── Module-level singletons ─────────────────────────────────────────── template_registry = PromptTemplateRegistry() plan_cache = PlanCache() def _register_builtin_templates() -> None: """Register the built-in server-side prompt templates. These strings never leave the server. Clients only receive the IDs. """ _tpls: dict[str, str] = { "tpl_task_agent_default": ( "You are a task management assistant. Help the user create, update, " "and prioritize tasks based on their message and context." ), "tpl_calendar_agent_default": ( "You are a calendar assistant. Help manage events, detect scheduling " "conflicts, and suggest improvements based on the provided context." ), "tpl_email_agent_default": ( "You are an email analysis assistant. Classify emails, extract action " "items, and draft responses using only the metadata provided." ), "tpl_analytics_agent_default": ( "You are a workspace analytics assistant. Calculate metrics, generate " "reports, and surface trends from the data provided in context." ), "tpl_email_extract_action_items": ( "Extract all action items from the provided email metadata. " "Return a structured list of tasks, each with a title, inferred " "priority, and suggested due date where possible." ), "tpl_analytics_weekly_summary": ( "Generate a weekly performance summary from the provided analytics " "data. Include task completion rate, overdue item count, top " "priorities for the coming week, and notable trends." ), } for tid, text in _tpls.items(): template_registry.register(tid, text) def _load_playbooks() -> None: """Pre-build and cache the built-in playbooks.""" playbooks: list[tuple[str, ExecutionPlan]] = [ ( "create_task_from_email", ExecutionPlanBuilder("email_agent") .add_llm_step( "tpl_email_extract_action_items", {"source": "email_metadata"}, ) .add_data_step("create_record", data_from_step=0) .build(), ), ( "generate_weekly_report", ExecutionPlanBuilder("analytics_agent") .add_llm_step( "tpl_analytics_weekly_summary", {"period": "last_7_days"}, ) .add_data_step("create_record", data_from_step=0) .build(), ), ] for key, plan in playbooks: plan_cache.cache_plan(key, plan) # Initialise on module load _register_builtin_templates() _load_playbooks()