Files
api/tests/test_agent_setup.py
Roberto Musso 7ccdad431f feat(i18n): inject user language into AI agent system prompts
- Add _language_instruction() to deep_agent.py, reads language from core memory
- Append language directive to all 4 run_* functions (task/project/checkpoint/note)
- Minor fixes: alembic env, route imports, test cleanup
2026-04-12 00:35:23 +02:00

243 lines
9.1 KiB
Python

"""Tests for the Chatbot Journey endpoints.
Covers:
1. Start journey for local agent → session_id + first question, done=False
2. Start journey for cloud agent → contextual email-focused question
3. Start journey with existing agent_id → session seeded, first question returned
4. Start journey with non-existent agent_id → still succeeds (graceful fallback)
5. Message: continue conversation → done=False, follow-up question returned
6. Message: LLM wraps up → done=True + prompt_template extracted correctly
7. Message with max-turns nudge → no crash, returns response
8. Invalid session_id → 404
9. Expired session → 404
10. Session ownership: user B cannot access user A's session
11. No JWT on /start → 401
12. No JWT on /message → 401
"""
from __future__ import annotations

import time
import uuid
from typing import Any
from unittest.mock import AsyncMock, patch

from fastapi.testclient import TestClient
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.routes.agent_setup import (
    _SESSION_TTL_SECONDS,
    _TEMPLATE_END,
    _TEMPLATE_START,
    _extract_template,
    _sessions,
)
from app.models import LocalAgentConfig
from tests.conftest import TEST_USER_IDS, auth_header
# ── Helpers ──────────────────────────────────────────────────────────────
def _start(
    client: TestClient,
    agent_type: str = "local",
    agent_id: str | None = None,
    tier: str = "power",
) -> Any:
    """POST to the journey ``/start`` endpoint and return the raw response.

    Args:
        client: Test client used to issue the request.
        agent_type: Value sent as ``agent_type`` in the JSON body.
        agent_id: Optional agent id; included as ``agent_id`` only when truthy.
        tier: Passed to ``auth_header`` to build the request headers.

    Returns:
        The response object produced by ``client.post`` (not a parsed dict),
        so callers can inspect ``status_code`` and call ``.json()`` themselves.
    """
    body: dict = {"agent_type": agent_type}
    if agent_id:
        body["agent_id"] = agent_id
    return client.post(
        "/api/v1/agents/journey/start",
        json=body,
        headers=auth_header(tier),
    )
def _message(
    client: TestClient,
    session_id: str,
    message: str,
    tier: str = "power",
) -> Any:
    """POST a user message to the journey ``/message`` endpoint.

    Args:
        client: Test client used to issue the request.
        session_id: Sent as ``session_id`` in the JSON body.
        message: Sent as ``message`` in the JSON body.
        tier: Passed to ``auth_header`` to build the request headers.

    Returns:
        The response object produced by ``client.post`` (not a parsed dict),
        so callers can inspect ``status_code`` and call ``.json()`` themselves.
    """
    return client.post(
        "/api/v1/agents/journey/message",
        json={"session_id": session_id, "message": message},
        headers=auth_header(tier),
    )
# ── Unit: _extract_template ───────────────────────────────────────────────
def test_extract_template_present():
    """Content enclosed by the template markers is returned on its own."""
    wrapped = (
        f"Some preamble.\n{_TEMPLATE_START}\nExtract tasks from emails.\n{_TEMPLATE_END}\nTrailing text."
    )
    # Preamble, trailing text and the newlines next to the markers are dropped.
    assert _extract_template(wrapped) == "Extract tasks from emails."
def test_extract_template_absent():
    """Text without any template markers yields ``None``."""
    outcome = _extract_template("No markers here.")
    assert outcome is None
def test_extract_template_empty_content():
    """Markers separated only by a newline are treated as 'no template'."""
    markers_only = f"{_TEMPLATE_START}\n{_TEMPLATE_END}"
    outcome = _extract_template(markers_only)
    assert outcome is None
# ── Start journey ─────────────────────────────────────────────────────────
def test_start_journey_local(client: TestClient):
    """Starting a local journey returns a session id and an opening question."""
    response = _start(client, agent_type="local")
    assert response.status_code == 200

    payload = response.json()
    assert "session_id" in payload
    assert payload["done"] is False
    assert payload["prompt_template"] is None
    assert len(payload["message"]) > 0

    # The opening question for a local agent should talk about files/directories.
    question = payload["message"].lower()
    expected_keywords = ("file", "director", "document", "monitor")
    assert any(keyword in question for keyword in expected_keywords)
def test_start_journey_cloud(client: TestClient):
    """Starting a cloud journey returns an opening question about emails/messages."""
    response = _start(client, agent_type="cloud")
    assert response.status_code == 200

    payload = response.json()
    assert payload["done"] is False

    # The opening question for a cloud agent should mention emails or messages.
    question = payload["message"].lower()
    expected_keywords = ("email", "message", "communication")
    assert any(keyword in question for keyword in expected_keywords)
def test_start_journey_with_agent_id(client: TestClient, db_session: AsyncSession):
    """An ``agent_id`` that matches no stored agent must not make ``/start`` fail."""
    unknown_agent_id = str(uuid.uuid4())

    response = _start(client, agent_type="local", agent_id=unknown_agent_id)

    # Graceful fallback: the journey still starts for an unknown agent id.
    assert response.status_code == 200
    payload = response.json()
    assert payload["done"] is False
def test_start_journey_with_existing_agent(client: TestClient, db_session: AsyncSession):
    """Starting a journey for a persisted local agent succeeds and stores a session.

    NOTE(review): only the session's presence in ``_sessions`` is asserted;
    seeding from the agent's ``prompt_template`` is not checked here.
    """
    import asyncio

    agent = LocalAgentConfig(
        id=str(uuid.uuid4()),
        user_id=TEST_USER_IDS["power"],
        name="Test Agent",
        device_id="device-1",
        directory_paths=["/home/user/emails"],
        data_types=["tasks"],
        prompt_template="Extract tasks from .eml files.",
        file_extensions=[".eml"],
        schedule_cron="0 */6 * * *",
        enabled=True,
    )

    async def _persist_agent():
        db_session.add(agent)
        await db_session.commit()

    # Drive the async commit to completion from this synchronous test.
    # NOTE(review): relies on asyncio.get_event_loop() returning a usable,
    # non-running loop — confirm against the fixtures / Python version in use.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(_persist_agent())

    response = _start(client, agent_type="local", agent_id=agent.id)
    assert response.status_code == 200

    payload = response.json()
    assert payload["done"] is False
    # The new session must be registered in the in-memory session store.
    assert payload["session_id"] in _sessions
def test_start_journey_requires_auth(client: TestClient):
    """Calling ``/start`` without auth headers is rejected with 401."""
    unauthenticated = client.post(
        "/api/v1/agents/journey/start",
        json={"agent_type": "local"},
    )
    assert unauthenticated.status_code == 401
# ── Message ───────────────────────────────────────────────────────────────
def test_message_continues_conversation(client: TestClient):
    """An LLM reply without template markers keeps the journey open (done=False)."""
    follow_up = "That looks good. Can you tell me more about priority rules?"
    fake_llm = AsyncMock(return_value=follow_up)

    # Replace the LLM call so the endpoint echoes a canned follow-up question.
    with patch("app.api.routes.agent_setup._call_llm", new=fake_llm):
        started = _start(client, agent_type="local")
        assert started.status_code == 200
        session_id = started.json()["session_id"]

        replied = _message(client, session_id, "I have .eml and .txt files")
        assert replied.status_code == 200

        payload = replied.json()
        assert payload["done"] is False
        assert payload["prompt_template"] is None
        assert payload["message"] == follow_up
        assert payload["session_id"] == session_id
def test_message_produces_template(client: TestClient):
    """A reply containing the template markers ends the journey and yields the template."""
    final_template = "Extract tasks from email. Subject → title. 'urgent' → high priority."
    llm_response = (
        "Great, I have all the information I need.\n"
        f"{_TEMPLATE_START}\n{final_template}\n{_TEMPLATE_END}\n"
    )
    fake_llm = AsyncMock(return_value=llm_response)

    with patch("app.api.routes.agent_setup._call_llm", new=fake_llm):
        started = _start(client, agent_type="cloud")
        assert started.status_code == 200
        session_id = started.json()["session_id"]

        replied = _message(client, session_id, "Only invoices from clients")
        assert replied.status_code == 200

        payload = replied.json()
        assert payload["done"] is True
        assert payload["prompt_template"] == final_template
        # A finished journey must be removed from the in-memory session store.
        assert session_id not in _sessions
def test_message_invalid_session(client: TestClient):
    """Posting to a session id that was never created returns 404."""
    response = _message(client, "nonexistent-session-id", "hello")
    assert response.status_code == 404
def test_message_wrong_owner(client: TestClient):
    """A session started under one tier's credentials is not reachable with another's."""
    started = _start(client, agent_type="local", tier="power")
    session_id = started.json()["session_id"]

    # Send the message with the "pro" auth header, i.e. as a different user
    # than the one who started the session.
    intruder_response = _message(client, session_id, "hello", tier="pro")

    assert intruder_response.status_code == 404
def test_message_expired_session(client: TestClient):
    """A session whose ``created_at`` is older than the TTL returns 404."""
    started = _start(client, agent_type="local")
    session_id = started.json()["session_id"]

    # Backdate the stored session so it is one second past its time-to-live.
    stale_timestamp = time.monotonic() - _SESSION_TTL_SECONDS - 1
    _sessions[session_id].created_at = stale_timestamp

    response = _message(client, session_id, "hello")
    assert response.status_code == 404
def test_message_requires_auth(client: TestClient):
    """Calling ``/message`` without auth headers is rejected with 401."""
    payload = {"session_id": "any", "message": "hello"}
    unauthenticated = client.post("/api/v1/agents/journey/message", json=payload)
    assert unauthenticated.status_code == 401
def test_message_max_turns_nudge(client: TestClient):
    """Sending ``_MAX_TURNS`` user messages in one session never produces an error.

    NOTE(review): the nudge itself and session persistence are not asserted
    here; the test only checks that every turn returns HTTP 200.
    """
    from app.api.routes.agent_setup import _MAX_TURNS

    follow_up = "Tell me more about priority rules."
    fake_llm = AsyncMock(return_value=follow_up)

    with patch("app.api.routes.agent_setup._call_llm", new=fake_llm):
        started = _start(client, agent_type="local")
        session_id = started.json()["session_id"]

        for turn in range(1, _MAX_TURNS + 1):
            response = _message(client, session_id, f"Answer {turn}")
            assert response.status_code == 200
            # Stop as soon as the journey reports completion — wrapping up
            # early is an acceptable outcome as well.
            if response.json()["done"]:
                break