feat(local-agent-v2): step 4 — journey produces structured AgentConfig JSON

Replace freeform prompt_template output with validated AgentConfig JSON:
- agent_setup.py: new system prompt (journey_system_v2), AGENT_CONFIG_START/END
  markers, _extract_agent_config() with Pydantic validation, updated handlers
  returning agent_config key; import AgentConfig from schemas
- tests/test_journey_v2.py: 6 unit tests + 5 parametrized LLM eval cases
  following test_agent_runner_v2.py pattern; _run_journey uses
  set_client_executor/clear_client_executor mirroring device_ws
- tests/fixtures/journey_v2/: cases.yaml + email_action.html + email_info.html
- tests/conftest.py: add --journey-dir CLI option; remove S3/plugin fixtures
  (cleanup from microservices migration, already present in working tree)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Roberto Musso
2026-04-08 00:23:58 +02:00
parent c6c4578f9a
commit d8add7e8cb
6 changed files with 607 additions and 190 deletions

View File

@@ -6,26 +6,21 @@ a per-test session, and a FastAPI ``TestClient`` wired to use it.
from __future__ import annotations
import json
import os
import time
import uuid
from collections.abc import AsyncGenerator, Generator
from unittest.mock import patch
import boto3
import pytest
import pytest_asyncio
from fastapi.testclient import TestClient
from jose import jwt
from moto import mock_aws
from sqlalchemy import StaticPool, event
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from app.config.settings import settings
from app.db import Base, get_session
from app.main import app
from app.models import Plugin, Subscription, User
from app.models import Subscription, User
# ── Fixed test user IDs (one per tier) ───────────────────────────────
@@ -109,79 +104,6 @@ def client(db_session: AsyncSession) -> Generator[TestClient, None, None]: # n
app.dependency_overrides.pop(get_session, None)
# ── Seed data helpers ────────────────────────────────────────────────
_SEED_PLUGINS = [
Plugin(
id="plugin-github-sync",
name="GitHub Sync",
description="Sync tasks with GitHub Issues and pull requests.",
version="1.0.0",
author_name="Adiuva",
category="productivity",
price_cents=0,
permissions=json.dumps(["read:tasks", "write:tasks"]),
status="approved",
s3_package_key="plugins/plugin-github-sync/1.0.0/package.zip",
install_count=0,
avg_rating=0.0,
),
Plugin(
id="plugin-slack-notify",
name="Slack Notifier",
description="Post task and timeline updates to Slack channels.",
version="1.2.0",
author_name="Adiuva",
category="communication",
price_cents=499,
permissions=json.dumps(["read:tasks", "read:timelines"]),
status="approved",
s3_package_key="plugins/plugin-slack-notify/1.2.0/package.zip",
install_count=0,
avg_rating=0.0,
),
Plugin(
id="plugin-time-tracker",
name="Time Tracker",
description="Track time spent on tasks with automatic reporting.",
version="0.9.1",
author_name="Third Party",
category="productivity",
price_cents=999,
permissions=json.dumps(["read:tasks", "write:tasks"]),
status="approved",
s3_package_key="plugins/plugin-time-tracker/0.9.1/package.zip",
install_count=0,
avg_rating=0.0,
),
]
@pytest_asyncio.fixture
async def seed_plugins(db_session: AsyncSession) -> list[Plugin]:
"""Insert the 3 default approved plugins and return them."""
plugins = []
for template in _SEED_PLUGINS:
p = Plugin(
id=template.id,
name=template.name,
description=template.description,
version=template.version,
author_name=template.author_name,
category=template.category,
price_cents=template.price_cents,
permissions=template.permissions,
status=template.status,
s3_package_key=template.s3_package_key,
install_count=template.install_count,
avg_rating=template.avg_rating,
)
db_session.add(p)
plugins.append(p)
await db_session.commit()
return plugins
# ── JWT helpers ──────────────────────────────────────────────────────
@@ -212,29 +134,6 @@ def auth_header(tier: str = "power", user_id: str | None = None) -> dict[str, st
return {"Authorization": f"Bearer {make_jwt(tier, user_id)}"}
# ── S3 mock fixture ──────────────────────────────────────────────────
S3_TEST_BUCKET = "test-bucket"
S3_TEST_REGION = "us-east-1"
@pytest.fixture
def s3_bucket():
"""Create a mocked S3 bucket via moto and patch BlobStore settings."""
with mock_aws():
os.environ.setdefault("AWS_ACCESS_KEY_ID", "testing")
os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "testing")
os.environ.setdefault("AWS_DEFAULT_REGION", S3_TEST_REGION)
client = boto3.client("s3", region_name=S3_TEST_REGION)
client.create_bucket(Bucket=S3_TEST_BUCKET)
with patch("app.storage.blob_store.settings") as mock_settings:
mock_settings.S3_BUCKET = S3_TEST_BUCKET
mock_settings.S3_REGION = S3_TEST_REGION
mock_settings.AWS_ACCESS_KEY_ID = "testing"
mock_settings.AWS_SECRET_ACCESS_KEY = "testing"
yield S3_TEST_BUCKET
# ── CLI options ───────────────────────────────────────────────────────
def pytest_addoption(parser):
@@ -243,3 +142,13 @@ def pytest_addoption(parser):
default=None,
help="Override fixture folder for preprocessor tests (must contain cases.yaml + data/)",
)
parser.addoption(
"--runner-dir",
default=None,
help="Override fixture folder for agent_runner_v2 eval tests (must contain cases.yaml + data/)",
)
parser.addoption(
"--journey-dir",
default=None,
help="Override fixture folder for journey_v2 eval tests (must contain cases.yaml + data/)",
)

87
tests/fixtures/journey_v2/cases.yaml vendored Normal file
View File

@@ -0,0 +1,87 @@
# Journey V2 eval test cases — Step 4
#
# Each case simulates a complete journey session:
# 1. handle_journey_start is called with directory + data_types
# 2. handle_journey_message is called for each entry in user_messages
# 3. Assertions are evaluated on the final reply
#
# directory_files: list of {path, content_file} — content_file is relative to data/
#
# Assertion keys:
# expect_question: true → first reply must contain "?"
# expect_done: true → final reply must have done=True
# expect_valid_config: true → agent_config must be parseable as AgentConfig with content_types > 0
# expect_content_type_id: <str> → AgentConfig.content_types must contain an entry with this id
# expect_extraction_contains: <str> → first content_type extraction_prompt must contain this word
# expect_global_rules: true → AgentConfig.global_rules must be non-empty
- id: "4.1"
description: "Journey start explores directory, first reply contains a question"
directory: "/test/emails"
data_types: ["tasks", "notes", "timelines"]
directory_files:
- path: "/test/emails/outlook_export_2024.html"
content_file: "email_action.html"
user_messages: []
score_name: "journey.start"
expect_question: true
- id: "4.2"
description: "Full 3-turn conversation produces a valid AgentConfig JSON"
directory: "/test/emails"
data_types: ["tasks", "notes", "timelines"]
directory_files:
- path: "/test/emails/email_backup.html"
content_file: "email_action.html"
user_messages:
- "These are email exports from Outlook in HTML format"
- "Create tasks for emails with direct action requests, notes for informational emails"
- "Yes, that looks correct. No other rules."
score_name: "journey.valid_json"
expect_done: true
expect_valid_config: true
- id: "4.3"
description: "Journey detects email_html content type from directory exploration"
directory: "/test/emails"
data_types: ["tasks", "notes"]
directory_files:
- path: "/test/emails/message.html"
content_file: "email_action.html"
user_messages:
- "HTML email backups from my mail client, exported from Outlook"
- "Create tasks from emails that contain assignments or direct action items"
- "Correct, no other rules needed"
score_name: "journey.detect_email"
expect_done: true
expect_content_type_id: "email_html"
- id: "4.4"
description: "Custom user rule (only notes, no tasks) reflected in extraction_prompt"
directory: "/test/emails"
data_types: ["notes"]
directory_files:
- path: "/test/emails/email.html"
content_file: "email_info.html"
user_messages:
- "HTML emails from my work inbox"
- "Create only notes from all emails — I do not want tasks or timelines to be created"
- "Yes, exactly"
score_name: "journey.custom_rules"
expect_done: true
expect_extraction_contains: "note"
- id: "4.5"
description: "Global rule (no project = no entity) appears in AgentConfig.global_rules"
directory: "/test/emails"
data_types: ["tasks", "notes"]
directory_files:
- path: "/test/emails/email.html"
content_file: "email_action.html"
user_messages:
- "Email backups from Outlook"
- "Create tasks from action request emails, notes from informational emails"
- "If the email cannot be matched to any project, do not create any entity at all"
score_name: "journey.global_rules"
expect_done: true
expect_global_rules: true

View File

@@ -0,0 +1,23 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Email: Fix the login bug</title>
<style>body { font-family: Arial; } .header { color: #666; }</style>
</head>
<body>
<div class="header">
<p><strong>From:</strong> boss@company.com</p>
<p><strong>To:</strong> dev@company.com</p>
<p><strong>Subject:</strong> Fix the login bug</p>
<p><strong>Date:</strong> Mon, 7 Apr 2026 09:15:00 +0000</p>
</div>
<div class="body">
<p>Hi,</p>
<p>Please fix the login bug in Project Alpha as soon as possible.
Users are reporting that they can't log in with their Google accounts.
This is blocking the whole team. Please resolve it by Friday.</p>
<p>Thanks,<br>Boss</p>
</div>
</body>
</html>

View File

@@ -0,0 +1,23 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Email: New policy update</title>
<style>body { font-family: Arial; }</style>
</head>
<body>
<div class="header">
<p><strong>From:</strong> hr@company.com</p>
<p><strong>To:</strong> all@company.com</p>
<p><strong>Subject:</strong> FYI: New remote work policy effective May 1</p>
<p><strong>Date:</strong> Tue, 8 Apr 2026 10:00:00 +0000</p>
</div>
<div class="body">
<p>Hi everyone,</p>
<p>Just a heads-up that starting May 1, 2026 the company will be moving to
a hybrid work model. You will be expected to come into the office at least
two days per week. More details will follow in the employee handbook.</p>
<p>Best,<br>HR Team</p>
</div>
</body>
</html>

349
tests/test_journey_v2.py Normal file
View File

@@ -0,0 +1,349 @@
"""Tests for Local Agent V2 journey setup (Step 4).
Covers the chatbot journey that produces a structured AgentConfig JSON
instead of a freeform prompt_template string.
Unit tests (no LLM)
--------------------
4.6a _extract_agent_config: valid JSON → returns serialised config
4.6b _extract_agent_config: invalid JSON → returns None
4.6c _extract_agent_config: markers absent → returns None
4.6d _extract_agent_config: only START marker → returns None
4.6e Session not found → done=True, agent_config=None
4.6f Nudge uses AGENT_CONFIG_START/END markers (not old PROMPT_TEMPLATE)
Eval tests (real LLM + Langfuse scoring)
-----------------------------------------
Cases are defined in tests/fixtures/journey_v2/cases.yaml.
Email HTML files live in tests/fixtures/journey_v2/data/.
Use --journey-dir to point at a custom folder (same structure required).
Run:
pytest tests/test_journey_v2.py -v
pytest tests/test_journey_v2.py -v -k "4_6" # unit only
pytest tests/test_journey_v2.py -v -k "eval" # LLM evals only
pytest tests/test_journey_v2.py -v --journey-dir /p # custom fixtures
"""
from __future__ import annotations
import uuid
from contextlib import nullcontext
from pathlib import Path
from typing import Any
from unittest.mock import patch
import pytest
import yaml
from app.api.routes.agent_setup import (
_CONFIG_END,
_CONFIG_START,
_MAX_TURNS,
_extract_agent_config,
_sessions,
handle_journey_message,
handle_journey_start,
)
from app.core.langfuse_client import get_langfuse
from app.core.ws_context import clear_client_executor, set_client_executor
from app.schemas import AgentConfig
from tests.conftest import TEST_USER_IDS
# ── Constants ─────────────────────────────────────────────────────────────
_USER_ID = TEST_USER_IDS["power"]
_DEFAULT_FIXTURE_DIR = Path(__file__).parent / "fixtures" / "journey_v2"
# ── Fixture loading ───────────────────────────────────────────────────────
def _fixtures_dir(config) -> Path:
override = config.getoption("--journey-dir")
return Path(override) if override else _DEFAULT_FIXTURE_DIR
def _load_cases(config) -> list[dict]:
return yaml.safe_load(
(_fixtures_dir(config) / "cases.yaml").read_text(encoding="utf-8")
)
def _read_data_file(filename: str, fixtures_dir: Path) -> str:
return (fixtures_dir / "data" / filename).read_text(encoding="utf-8")
# ── pytest_generate_tests ─────────────────────────────────────────────────
def pytest_generate_tests(metafunc):
if "journey_case" not in metafunc.fixturenames:
return
cases = _load_cases(metafunc.config)
metafunc.parametrize("journey_case", cases, ids=[c["id"] for c in cases])
# ── Executor builder ──────────────────────────────────────────────────────
def _make_fs_executor(directory_files: list[dict], fixtures_dir: Path):
"""Return an async callback that simulates filesystem tool responses.
Matches the signature expected by ``set_client_executor`` / ``execute_on_client``:
receives the full ``payload`` dict and returns a result dict.
``directory_files`` is a list of ``{path, content_file}`` dicts;
``content_file`` is relative to ``fixtures_dir/data/``.
"""
file_map: dict[str, str] = {
entry["path"]: _read_data_file(entry["content_file"], fixtures_dir)
for entry in directory_files
}
async def _executor(payload: dict) -> dict:
action = payload.get("action", "")
data = payload.get("data") or {}
if action == "list_directory":
return {"entries": [
{"type": "file", "name": p.split("/")[-1], "path": p}
for p in file_map
]}
if action == "read_file_content":
path = data.get("path", "")
return {"content": file_map.get(path, "")}
if action == "get_file_metadata":
path = data.get("path", "")
name = path.split("/")[-1]
ext = "." + name.rsplit(".", 1)[-1] if "." in name else ""
return {"name": name, "extension": ext, "size": 1024,
"createdAt": None, "modifiedAt": None}
return {}
return _executor
# ── Journey runner helper ─────────────────────────────────────────────────
async def _run_journey(user_id: str, case: dict, executor) -> dict[str, Any]:
"""Drive start + all user_messages for a case. Returns the final reply dict.
Mirrors ``device_ws._handle_journey_start/message``: sets the client
executor (so filesystem tools work) before each handler call.
"""
session_id = str(uuid.uuid4())
try:
set_client_executor(executor)
reply = await handle_journey_start(user_id, {
"agent_type": "local",
"directory": case["directory"],
"data_types": case["data_types"],
"session_id": session_id,
})
for msg in case.get("user_messages", []):
if reply.get("done"):
break
set_client_executor(executor)
reply = await handle_journey_message(user_id, {
"session_id": reply["session_id"],
"message": msg,
})
finally:
clear_client_executor()
_sessions.pop(session_id, None)
return reply
# ── Assertion helper ──────────────────────────────────────────────────────
def _evaluate_case(case: dict, reply: dict) -> tuple[float, str]:
"""Return (score, comment) for a journey case given the final reply dict."""
if case.get("expect_question"):
has_q = "?" in reply.get("message", "")
return (1.0 if has_q else 0.0), f"first_reply_has_question={has_q}"
if case.get("expect_done") and not reply.get("done"):
return 0.0, "expected done=True but journey did not complete"
agent_config_raw = reply.get("agent_config")
if case.get("expect_valid_config"):
if not agent_config_raw:
return 0.0, "agent_config is None"
try:
parsed = AgentConfig.model_validate_json(agent_config_raw)
valid = len(parsed.content_types) > 0
return (1.0 if valid else 0.0), f"content_types={len(parsed.content_types)}"
except Exception as exc:
return 0.0, f"parse error: {exc}"
if case.get("expect_content_type_id"):
expected_id = case["expect_content_type_id"]
if not agent_config_raw:
return 0.0, "agent_config is None"
try:
parsed = AgentConfig.model_validate_json(agent_config_raw)
ids = [ct.id for ct in parsed.content_types]
found = expected_id in ids
return (1.0 if found else 0.0), f"content_type_ids={ids}, expected={expected_id}"
except Exception as exc:
return 0.0, f"parse error: {exc}"
if case.get("expect_extraction_contains"):
keyword = case["expect_extraction_contains"].lower()
if not agent_config_raw:
return 0.0, "agent_config is None"
try:
parsed = AgentConfig.model_validate_json(agent_config_raw)
if not parsed.content_types:
return 0.0, "no content_types in config"
prompt = parsed.content_types[0].extraction_prompt.lower()
found = keyword in prompt
return (1.0 if found else 0.0), f"keyword='{keyword}' in extraction_prompt={found}"
except Exception as exc:
return 0.0, f"parse error: {exc}"
if case.get("expect_global_rules"):
if not agent_config_raw:
return 0.0, "agent_config is None"
try:
parsed = AgentConfig.model_validate_json(agent_config_raw)
has_rules = len(parsed.global_rules) > 0
return (1.0 if has_rules else 0.0), f"global_rules={parsed.global_rules}"
except Exception as exc:
return 0.0, f"parse error: {exc}"
return 1.0, "no specific assertion"
# ── Unit tests ────────────────────────────────────────────────────────────
def test_4_6a_extract_valid_json():
"""_extract_agent_config: valid JSON between markers → returns serialised config."""
config = AgentConfig(
content_types=[],
global_rules=["No project = no entity"],
data_types=["tasks"],
)
text = f"Some preamble\n{_CONFIG_START}\n{config.model_dump_json()}\n{_CONFIG_END}\nTrailing"
result = _extract_agent_config(text)
assert result is not None
parsed = AgentConfig.model_validate_json(result)
assert parsed.global_rules == ["No project = no entity"]
def test_4_6b_extract_invalid_json():
"""_extract_agent_config: malformed JSON between markers → returns None."""
text = f"{_CONFIG_START}\n{{not: valid json\n{_CONFIG_END}"
assert _extract_agent_config(text) is None
def test_4_6c_extract_markers_absent():
"""_extract_agent_config: no markers at all → returns None."""
assert _extract_agent_config("No markers here at all") is None
def test_4_6d_extract_only_start_marker():
"""_extract_agent_config: START without END → returns None."""
assert _extract_agent_config(f"text {_CONFIG_START} no end marker") is None
@pytest.mark.asyncio
async def test_4_6e_session_not_found():
"""4.6e Session not found → done=True, agent_config=None, informative message."""
reply = await handle_journey_message(_USER_ID, {
"session_id": "nonexistent-session-id",
"message": "Hello",
})
assert reply["done"] is True
assert reply["agent_config"] is None
assert "not found" in reply["message"].lower() or "expired" in reply["message"].lower()
@pytest.mark.asyncio
async def test_4_6f_nudge_uses_new_markers():
"""4.6f Nudge injected after max turns uses AGENT_CONFIG markers, not PROMPT_TEMPLATE."""
session_id = str(uuid.uuid4())
captured_histories: list[list[dict]] = []
async def _mock_llm(system_prompt, history, tools, **kwargs) -> str:
captured_histories.append(list(history))
# Return plain text — no markers — to trigger the nudge path.
return "I still need more information from you."
from app.api.routes.agent_setup import JourneySession
fake_session = JourneySession(
session_id=session_id,
user_id=_USER_ID,
agent_type="local",
directory="/test",
data_types=["tasks"],
system_prompt="system",
langfuse_prompt=None,
)
# Fill history to the turn limit so the next message triggers the nudge.
for i in range(_MAX_TURNS):
fake_session.history.append({"role": "user", "content": f"msg {i}"})
fake_session.history.append({"role": "assistant", "content": "ok"})
_sessions[session_id] = fake_session
try:
with patch("app.api.routes.agent_setup._call_llm_with_tools", side_effect=_mock_llm):
await handle_journey_message(_USER_ID, {
"session_id": session_id,
"message": "one more message to trigger nudge",
})
finally:
_sessions.pop(session_id, None)
# Second LLM call receives the nudge appended to history.
assert len(captured_histories) >= 2, "Expected ≥ 2 LLM calls (main reply + nudge)"
nudge_history = captured_histories[1]
user_msgs = " ".join(t["content"] for t in nudge_history if t["role"] == "user")
assert _CONFIG_START in user_msgs, f"Nudge must reference {_CONFIG_START}"
assert _CONFIG_END in user_msgs, f"Nudge must reference {_CONFIG_END}"
assert "PROMPT_TEMPLATE" not in user_msgs, "Old PROMPT_TEMPLATE markers must not appear in nudge"
# ── Eval tests (real LLM + Langfuse) ─────────────────────────────────────
@pytest.mark.asyncio
@pytest.mark.eval
async def test_eval_journey(journey_case, pytestconfig):
"""Parametrized eval test — one invocation per YAML case."""
case: dict = journey_case
fixtures_dir = _fixtures_dir(pytestconfig)
executor = _make_fs_executor(case.get("directory_files", []), fixtures_dir)
lf = get_langfuse()
obs_ctx = lf.start_as_current_observation(
name=f"eval-journey-{case['id']}-{case.get('score_name', 'unknown').replace('.', '-')}",
metadata={"step": "4", "case_id": case["id"]},
) if lf else nullcontext()
with obs_ctx as obs:
reply = await _run_journey(_USER_ID, case, executor)
score, comment = _evaluate_case(case, reply)
if obs is not None:
obs.score(
name=case.get("score_name", f"journey.case_{case['id']}"),
value=score,
comment=comment,
)
if lf:
lf.flush()
assert score == 1.0, f"[{case['id']}] {case.get('description', '')}{comment}"