- Updated `TestModuleSingletons` in `test_execution_plan.py` to reflect new agent templates and playbook names. - Changed assertions in playbook tests to match updated templates and agents. - Introduced `test_storage.py` to cover the storage layer, including encryption, BlobStore, and VectorStore functionalities. - Added tests for S3 interactions, ensuring upload, download, delete, and list operations work as expected. - Implemented mock tests for Pinecone and Qdrant vector stores to validate upsert, search, and delete operations.
287 lines
11 KiB
Python
287 lines
11 KiB
Python
"""Tests for execution_plan: PromptTemplateRegistry, ExecutionPlanBuilder, PlanCache."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
|
|
from app.core.execution_plan import (
|
|
ExecutionPlanBuilder,
|
|
PlanCache,
|
|
PromptTemplateRegistry,
|
|
plan_cache,
|
|
template_registry,
|
|
)
|
|
from app.schemas import ExecutionPlan
|
|
|
|
|
|
# ── PromptTemplateRegistry ────────────────────────────────────────────
|
|
|
|
|
|
class TestPromptTemplateRegistry:
|
|
def test_register_and_get(self) -> None:
|
|
reg = PromptTemplateRegistry()
|
|
reg.register("tpl_foo", "You are a foo agent.")
|
|
assert reg.get("tpl_foo") == "You are a foo agent."
|
|
|
|
def test_get_unknown_raises_key_error(self) -> None:
|
|
reg = PromptTemplateRegistry()
|
|
with pytest.raises(KeyError, match="tpl_missing"):
|
|
reg.get("tpl_missing")
|
|
|
|
def test_has_returns_true_for_registered(self) -> None:
|
|
reg = PromptTemplateRegistry()
|
|
reg.register("tpl_x", "prompt text")
|
|
assert reg.has("tpl_x") is True
|
|
|
|
def test_has_returns_false_for_unregistered(self) -> None:
|
|
reg = PromptTemplateRegistry()
|
|
assert reg.has("tpl_missing") is False
|
|
|
|
def test_list_ids_returns_all_registered_ids(self) -> None:
|
|
reg = PromptTemplateRegistry()
|
|
reg.register("tpl_a", "a")
|
|
reg.register("tpl_b", "b")
|
|
assert set(reg.list_ids()) == {"tpl_a", "tpl_b"}
|
|
|
|
def test_list_ids_does_not_return_prompt_text(self) -> None:
|
|
reg = PromptTemplateRegistry()
|
|
reg.register("tpl_secret", "top secret prompt")
|
|
ids = reg.list_ids()
|
|
assert "top secret prompt" not in ids
|
|
|
|
def test_overwrite_existing_template(self) -> None:
|
|
reg = PromptTemplateRegistry()
|
|
reg.register("tpl_x", "v1")
|
|
reg.register("tpl_x", "v2")
|
|
assert reg.get("tpl_x") == "v2"
|
|
|
|
def test_empty_registry_has_no_ids(self) -> None:
|
|
reg = PromptTemplateRegistry()
|
|
assert reg.list_ids() == []
|
|
|
|
|
|
# ── ExecutionPlanBuilder ──────────────────────────────────────────────
|
|
|
|
|
|
class TestExecutionPlanBuilder:
|
|
def test_builds_empty_plan(self) -> None:
|
|
plan = ExecutionPlanBuilder("task_agent").build()
|
|
assert plan.agent == "task_agent"
|
|
assert plan.steps == []
|
|
|
|
def test_add_step_basic(self) -> None:
|
|
plan = (
|
|
ExecutionPlanBuilder("task_agent")
|
|
.add_step("create_task", {"priority": "high"})
|
|
.build()
|
|
)
|
|
assert len(plan.steps) == 1
|
|
assert plan.steps[0].action == "create_task"
|
|
assert plan.steps[0].variables == {"priority": "high"}
|
|
assert plan.steps[0].prompt_template is None
|
|
assert plan.steps[0].data_from_step is None
|
|
|
|
def test_add_step_no_params(self) -> None:
|
|
plan = ExecutionPlanBuilder("task_agent").add_step("fetch").build()
|
|
assert plan.steps[0].variables is None
|
|
|
|
def test_add_llm_step(self) -> None:
|
|
plan = (
|
|
ExecutionPlanBuilder("task_agent")
|
|
.add_llm_step("tpl_task_default", {"message": "hi"})
|
|
.build()
|
|
)
|
|
assert plan.steps[0].action == "llm"
|
|
assert plan.steps[0].prompt_template == "tpl_task_default"
|
|
assert plan.steps[0].variables == {"message": "hi"}
|
|
|
|
def test_add_llm_step_no_variables(self) -> None:
|
|
plan = ExecutionPlanBuilder("task_agent").add_llm_step("tpl_x").build()
|
|
assert plan.steps[0].variables is None
|
|
|
|
def test_add_data_step(self) -> None:
|
|
plan = (
|
|
ExecutionPlanBuilder("task_agent")
|
|
.add_step("fetch_data")
|
|
.add_data_step("transform", data_from_step=0)
|
|
.build()
|
|
)
|
|
assert plan.steps[1].action == "transform"
|
|
assert plan.steps[1].data_from_step == 0
|
|
|
|
def test_fluent_chaining_returns_builder(self) -> None:
|
|
builder = ExecutionPlanBuilder("analytics_agent")
|
|
result = builder.add_step("a")
|
|
assert result is builder
|
|
|
|
def test_fluent_chain_multiple_steps(self) -> None:
|
|
plan = (
|
|
ExecutionPlanBuilder("analytics_agent")
|
|
.add_llm_step("tpl_analytics_default")
|
|
.add_step("format_output")
|
|
.add_data_step("store", data_from_step=0)
|
|
.build()
|
|
)
|
|
assert len(plan.steps) == 3
|
|
|
|
def test_build_validates_data_from_step_out_of_range(self) -> None:
|
|
with pytest.raises(ValueError, match="data_from_step"):
|
|
ExecutionPlanBuilder("task_agent").add_data_step("bad", data_from_step=5).build()
|
|
|
|
def test_build_validates_data_from_step_self_reference(self) -> None:
|
|
"""data_from_step=0 on the first step (index 0) is invalid."""
|
|
with pytest.raises(ValueError, match="data_from_step"):
|
|
ExecutionPlanBuilder("task_agent").add_data_step("bad", data_from_step=0).build()
|
|
|
|
def test_build_validates_data_from_step_negative(self) -> None:
|
|
with pytest.raises(ValueError, match="data_from_step"):
|
|
ExecutionPlanBuilder("task_agent").add_data_step("bad", data_from_step=-1).build()
|
|
|
|
def test_valid_data_from_step_at_index_two(self) -> None:
|
|
plan = (
|
|
ExecutionPlanBuilder("task_agent")
|
|
.add_step("step0")
|
|
.add_step("step1")
|
|
.add_data_step("step2", data_from_step=1)
|
|
.build()
|
|
)
|
|
assert plan.steps[2].data_from_step == 1
|
|
|
|
def test_data_from_step_zero_valid_at_index_one(self) -> None:
|
|
plan = (
|
|
ExecutionPlanBuilder("task_agent")
|
|
.add_step("step0")
|
|
.add_data_step("step1", data_from_step=0)
|
|
.build()
|
|
)
|
|
assert plan.steps[1].data_from_step == 0
|
|
|
|
def test_build_returns_new_plan_each_call(self) -> None:
|
|
builder = ExecutionPlanBuilder("task_agent").add_step("do_thing")
|
|
plan1 = builder.build()
|
|
plan2 = builder.build()
|
|
assert plan1 is not plan2
|
|
assert plan1.steps == plan2.steps
|
|
|
|
def test_plan_is_execution_plan_instance(self) -> None:
|
|
plan = ExecutionPlanBuilder("task_agent").build()
|
|
assert isinstance(plan, ExecutionPlan)
|
|
|
|
|
|
# ── PlanCache ─────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestPlanCache:
|
|
def _plan(self, agent: str = "a") -> ExecutionPlan:
|
|
return ExecutionPlanBuilder(agent).build()
|
|
|
|
def test_cache_and_get(self) -> None:
|
|
cache = PlanCache()
|
|
plan = self._plan()
|
|
cache.cache_plan("key1", plan)
|
|
assert cache.get_plan("key1") is plan
|
|
|
|
def test_get_missing_returns_none(self) -> None:
|
|
cache = PlanCache()
|
|
assert cache.get_plan("nonexistent") is None
|
|
|
|
def test_get_all_playbooks_empty(self) -> None:
|
|
cache = PlanCache()
|
|
assert cache.get_all_playbooks() == []
|
|
|
|
def test_get_all_playbooks_returns_all_stored(self) -> None:
|
|
cache = PlanCache()
|
|
p1, p2 = self._plan("a"), self._plan("b")
|
|
cache.cache_plan("k1", p1)
|
|
cache.cache_plan("k2", p2)
|
|
playbooks = cache.get_all_playbooks()
|
|
assert len(playbooks) == 2
|
|
assert p1 in playbooks
|
|
assert p2 in playbooks
|
|
|
|
def test_lru_evicts_oldest_entry(self) -> None:
|
|
cache = PlanCache(maxsize=2)
|
|
p1, p2, p3 = self._plan("a"), self._plan("b"), self._plan("c")
|
|
cache.cache_plan("k1", p1)
|
|
cache.cache_plan("k2", p2)
|
|
cache.cache_plan("k3", p3) # k1 should be evicted
|
|
assert cache.get_plan("k1") is None
|
|
assert cache.get_plan("k2") is p2
|
|
assert cache.get_plan("k3") is p3
|
|
|
|
def test_lru_access_updates_recency(self) -> None:
|
|
cache = PlanCache(maxsize=2)
|
|
p1, p2, p3 = self._plan("a"), self._plan("b"), self._plan("c")
|
|
cache.cache_plan("k1", p1)
|
|
cache.cache_plan("k2", p2)
|
|
cache.get_plan("k1") # k1 is now most-recently used
|
|
cache.cache_plan("k3", p3) # k2 should be evicted (LRU)
|
|
assert cache.get_plan("k1") is p1
|
|
assert cache.get_plan("k2") is None
|
|
assert cache.get_plan("k3") is p3
|
|
|
|
def test_overwrite_existing_key(self) -> None:
|
|
cache = PlanCache()
|
|
p1, p2 = self._plan("a"), self._plan("b")
|
|
cache.cache_plan("same_key", p1)
|
|
cache.cache_plan("same_key", p2)
|
|
assert cache.get_plan("same_key") is p2
|
|
assert len(cache.get_all_playbooks()) == 1
|
|
|
|
def test_overwrite_does_not_consume_capacity(self) -> None:
|
|
cache = PlanCache(maxsize=2)
|
|
p1, p2 = self._plan("a"), self._plan("b")
|
|
cache.cache_plan("k1", p1)
|
|
cache.cache_plan("k1", p2) # overwrite, not a new slot
|
|
cache.cache_plan("k2", p1) # should fit without eviction
|
|
assert cache.get_plan("k1") is p2
|
|
assert cache.get_plan("k2") is p1
|
|
|
|
|
|
# ── Module-level singletons ───────────────────────────────────────────
|
|
|
|
|
|
class TestModuleSingletons:
|
|
def test_template_registry_has_all_agent_defaults(self) -> None:
|
|
for agent in ("task_agent", "checkpoint_agent", "project_agent", "note_agent"):
|
|
assert template_registry.has(f"tpl_{agent}_default"), (
|
|
f"Missing template: tpl_{agent}_default"
|
|
)
|
|
|
|
def test_template_registry_has_operation_templates(self) -> None:
|
|
assert template_registry.has("tpl_task_extract_from_project")
|
|
assert template_registry.has("tpl_note_weekly_summary")
|
|
|
|
def test_template_registry_get_returns_non_empty_string(self) -> None:
|
|
text = template_registry.get("tpl_task_agent_default")
|
|
assert isinstance(text, str)
|
|
assert len(text) > 0
|
|
|
|
def test_plan_cache_has_prebuilt_playbooks(self) -> None:
|
|
assert len(plan_cache.get_all_playbooks()) >= 2
|
|
|
|
def test_playbook_create_tasks_from_project(self) -> None:
|
|
plan = plan_cache.get_plan("create_tasks_from_project")
|
|
assert plan is not None
|
|
assert plan.agent == "project_agent"
|
|
assert len(plan.steps) == 2
|
|
assert plan.steps[0].prompt_template == "tpl_task_extract_from_project"
|
|
assert plan.steps[1].data_from_step == 0
|
|
|
|
def test_playbook_generate_weekly_note(self) -> None:
|
|
plan = plan_cache.get_plan("generate_weekly_note")
|
|
assert plan is not None
|
|
assert plan.agent == "note_agent"
|
|
assert len(plan.steps) == 2
|
|
assert plan.steps[0].prompt_template == "tpl_note_weekly_summary"
|
|
assert plan.steps[1].data_from_step == 0
|
|
|
|
def test_playbook_steps_have_no_raw_prompt_text(self) -> None:
|
|
"""Plans must not embed prompt text — only template IDs."""
|
|
for plan in plan_cache.get_all_playbooks():
|
|
for step in plan.steps:
|
|
if step.prompt_template is not None:
|
|
assert step.prompt_template.startswith("tpl_"), (
|
|
f"prompt_template looks like raw text: {step.prompt_template!r}"
|
|
)
|