Files
api/app/core/execution_plan.py

223 lines
8.6 KiB
Python

"""Execution Plan generator — builder, template registry, and LRU plan cache."""
from __future__ import annotations
from collections import OrderedDict
from typing import Any
from app.schemas import ExecutionPlan, PlanStep
# ── Prompt Template Registry ──────────────────────────────────────────
class PromptTemplateRegistry:
    """Server-side lookup table from template IDs to prompt text.

    API responses only ever carry template IDs (e.g.
    ``"tpl_task_agent_default"``). The prompt text behind an ID is resolved
    through this registry on the server, which keeps prompt IP out of
    anything returned to clients.
    """

    def __init__(self) -> None:
        # template ID -> prompt text
        self._templates: dict[str, str] = {}

    def register(self, template_id: str, prompt_text: str) -> None:
        """Store *prompt_text* under *template_id*.

        Registering an ID that already exists replaces its text.
        """
        self._templates[template_id] = prompt_text

    def get(self, template_id: str) -> str:
        """Return the prompt text registered for *template_id*.

        Raises ``KeyError`` when no template is registered under that ID.
        """
        prompt = self._templates.get(template_id)
        if prompt is not None:
            return prompt
        raise KeyError(f"Template not found: {template_id!r}")

    def has(self, template_id: str) -> bool:
        """Return ``True`` when *template_id* is registered."""
        return template_id in self._templates

    def list_ids(self) -> list[str]:
        """Return every registered template ID (the text is never included)."""
        return list(self._templates)
# ── Execution Plan Builder ────────────────────────────────────────────
class ExecutionPlanBuilder:
    """Chainable builder that assembles an ``ExecutionPlan`` step by step.

    Every ``add_*`` method returns the builder itself so calls can be
    chained; ``build()`` validates the steps and produces the plan.

    Example::

        plan = (
            ExecutionPlanBuilder("task_agent")
            .add_llm_step("tpl_task_agent_default", {"message": user_msg})
            .add_data_step("create_record", data_from_step=0)
            .build()
        )
    """

    def __init__(self, agent: str) -> None:
        self._agent = agent
        # Steps in the order they were added; positions double as step indices.
        self._steps: list[PlanStep] = []

    def _push(self, step: PlanStep) -> ExecutionPlanBuilder:
        """Record *step* and hand back the builder for chaining."""
        self._steps.append(step)
        return self

    # ── step adders ──────────────────────────────────────────────────

    def add_step(
        self, action: str, params: dict[str, Any] | None = None
    ) -> ExecutionPlanBuilder:
        """Add a generic *action* step; *params* is stored as its variables."""
        return self._push(PlanStep(action=action, variables=params))

    def add_llm_step(
        self, template_id: str, variables: dict[str, Any] | None = None
    ) -> ExecutionPlanBuilder:
        """Add an ``"llm"`` step that points at a server-side template ID."""
        llm_step = PlanStep(
            action="llm", prompt_template=template_id, variables=variables
        )
        return self._push(llm_step)

    def add_data_step(self, action: str, data_from_step: int) -> ExecutionPlanBuilder:
        """Add a step that takes its input from the step at *data_from_step*."""
        return self._push(PlanStep(action=action, data_from_step=data_from_step))

    # ── build ────────────────────────────────────────────────────────

    def build(self) -> ExecutionPlan:
        """Check step references, then return the finished ``ExecutionPlan``.

        Raises ``ValueError`` when a step's ``data_from_step`` is negative,
        points at the step itself, or points at a later step.
        """
        for index, step in enumerate(self._steps):
            source = step.data_from_step
            # Steps without a reference, or with a valid backward one, pass.
            if source is None or 0 <= source < index:
                continue
            raise ValueError(
                f"Step {index}: data_from_step={source} must "
                f"reference a preceding step index in range 0..{index - 1}"
            )
        # Copy the list so later builder calls cannot alter the returned plan's steps.
        return ExecutionPlan(agent=self._agent, steps=list(self._steps))
# ── Plan Cache (LRU) ──────────────────────────────────────────────────
class PlanCache:
    """In-memory LRU cache for ``ExecutionPlan`` objects.

    Plans stored here are accessible as playbooks via ``get_all_playbooks()``.
    The cache also serves as a runtime memoisation layer so that repeated
    identical intent classifications can skip re-building the plan.

    A *maxsize* of zero or less disables storage: ``cache_plan`` becomes a
    no-op and every lookup misses (the same convention as
    ``functools.lru_cache(maxsize=0)``).
    """

    def __init__(self, maxsize: int = 1000) -> None:
        self._maxsize = maxsize
        # Ordered oldest-used first, most-recently-used last.
        self._cache: OrderedDict[str, ExecutionPlan] = OrderedDict()

    def cache_plan(self, key: str, plan: ExecutionPlan) -> None:
        """Store *plan* under *key*, evicting the LRU entry if at capacity.

        Re-caching an existing key replaces its plan and marks it as
        most-recently used without evicting anything.
        """
        if self._maxsize <= 0:
            # No capacity: storing nothing avoids popping from an empty dict,
            # which would raise KeyError.
            return
        if key in self._cache:
            self._cache.move_to_end(key)
        elif len(self._cache) >= self._maxsize:
            self._cache.popitem(last=False)  # drop the least-recently-used entry
        self._cache[key] = plan

    def get_plan(self, key: str) -> ExecutionPlan | None:
        """Return the cached plan for *key*, or ``None`` if not present.

        Accessing a plan marks it as most-recently used.
        """
        try:
            self._cache.move_to_end(key)
        except KeyError:
            return None
        return self._cache[key]

    def get_all_playbooks(self) -> list[ExecutionPlan]:
        """Return all cached plans (most-recently used last)."""
        return list(self._cache.values())
# ── Module-level singletons ───────────────────────────────────────────
# Module-level template registry; filled by ``_register_builtin_templates()``
# when this module is imported.
template_registry = PromptTemplateRegistry()
# Module-level plan cache (default capacity); seeded by ``_load_playbooks()``
# when this module is imported.
plan_cache = PlanCache()
def _register_builtin_templates() -> None:
    """Load the built-in prompt templates into ``template_registry``.

    The prompt text stays on the server; only the template IDs are meant to
    be handed to clients.
    """
    builtin_templates: tuple[tuple[str, str], ...] = (
        (
            "tpl_task_agent_default",
            "You are a task management assistant. Help the user create, update, "
            "list, and track tasks. Use correct status values (todo, in_progress, "
            "done) and priority values (high, medium, low) from the workspace model.",
        ),
        (
            "tpl_timeline_agent_default",
            "You are a project timeline assistant. Help the user create and manage "
            "milestone timelines on their projects. Every timeline requires a "
            "project_id and a date expressed as a Unix timestamp in milliseconds.",
        ),
        (
            "tpl_project_agent_default",
            "You are a project management assistant. Help the user create, find, "
            "update, and archive projects. Projects have a name, an optional client, "
            "and a status of either active or archived.",
        ),
        (
            "tpl_note_agent_default",
            "You are a note-taking assistant. Help the user create, retrieve, update, "
            "and delete Markdown notes. Notes can optionally be linked to a project.",
        ),
        (
            "tpl_task_extract_from_project",
            "Extract all actionable tasks from the provided project context. "
            "Return a structured list of tasks, each with a title, inferred priority "
            "(high, medium, or low), suggested status (todo), and a due_date in "
            "milliseconds where a deadline can be inferred.",
        ),
        (
            "tpl_note_weekly_summary",
            "Generate a weekly project summary note from the provided workspace data. "
            "Include: tasks completed this week, tasks due soon, active projects, "
            "and upcoming timelines. Format the output as clean Markdown.",
        ),
    )
    register = template_registry.register
    for template_id, prompt_text in builtin_templates:
        register(template_id, prompt_text)
def _load_playbooks() -> None:
    """Build the built-in playbook plans and store them in ``plan_cache``.

    Both plans are built before either is cached, so a failure while
    building leaves the cache untouched.
    """
    # LLM extraction of tasks from project context, then a record-creation
    # step fed by the output of step 0.
    tasks_from_project = (
        ExecutionPlanBuilder("project_agent")
        .add_llm_step("tpl_task_extract_from_project", {"source": "project_context"})
        .add_data_step("create_record", data_from_step=0)
        .build()
    )
    # LLM weekly summary, then a record-creation step fed by the output of
    # step 0.
    weekly_note = (
        ExecutionPlanBuilder("note_agent")
        .add_llm_step("tpl_note_weekly_summary", {"period": "last_7_days"})
        .add_data_step("create_record", data_from_step=0)
        .build()
    )

    plan_cache.cache_plan("create_tasks_from_project", tasks_from_project)
    plan_cache.cache_plan("generate_weekly_note", weekly_note)
# Initialise the module-level singletons on import: the built-in templates
# are registered first, then the built-in playbooks are cached.
_register_builtin_templates()
_load_playbooks()