"""Tests for the Chatbot Journey endpoints. Covers: 1. Start journey for local agent → session_id + first question, done=False 2. Start journey for cloud agent → contextual email-focused question 3. Start journey with existing agent_id → session seeded, first question returned 4. Start journey with non-existent agent_id → still succeeds (graceful fallback) 5. Message: continue conversation → done=False, follow-up question returned 6. Message: LLM wraps up → done=True + prompt_template extracted correctly 7. Message with max-turns nudge → no crash, returns response 8. Invalid session_id → 404 9. Expired session → 404 10. Session ownership: user B cannot access user A's session 11. No JWT on /start → 401 12. No JWT on /message → 401 """ from __future__ import annotations import time import uuid from unittest.mock import AsyncMock, patch import pytest from fastapi.testclient import TestClient from sqlalchemy.ext.asyncio import AsyncSession from app.api.routes.agent_setup import ( _SESSION_TTL_SECONDS, _TEMPLATE_END, _TEMPLATE_START, _extract_template, _sessions, ) from app.models import LocalAgentConfig from tests.conftest import TEST_USER_IDS, auth_header # ── Helpers ────────────────────────────────────────────────────────────── def _start(client: TestClient, agent_type: str = "local", agent_id: str | None = None, tier: str = "power") -> dict: body: dict = {"agent_type": agent_type} if agent_id: body["agent_id"] = agent_id resp = client.post("/api/v1/agents/journey/start", json=body, headers=auth_header(tier)) return resp def _message(client: TestClient, session_id: str, message: str, tier: str = "power") -> dict: return client.post( "/api/v1/agents/journey/message", json={"session_id": session_id, "message": message}, headers=auth_header(tier), ) # ── Unit: _extract_template ─────────────────────────────────────────────── def test_extract_template_present(): text = f"Some preamble.\n{_TEMPLATE_START}\nExtract tasks from emails.\n{_TEMPLATE_END}\nTrailing text." result = _extract_template(text) assert result == "Extract tasks from emails." def test_extract_template_absent(): assert _extract_template("No markers here.") is None def test_extract_template_empty_content(): text = f"{_TEMPLATE_START}\n{_TEMPLATE_END}" assert _extract_template(text) is None # ── Start journey ───────────────────────────────────────────────────────── def test_start_journey_local(client: TestClient): resp = _start(client, agent_type="local") assert resp.status_code == 200 body = resp.json() assert "session_id" in body assert body["done"] is False assert body["prompt_template"] is None assert len(body["message"]) > 0 # Local question should be about files/directories assert any(w in body["message"].lower() for w in ("file", "director", "document", "monitor")) def test_start_journey_cloud(client: TestClient): resp = _start(client, agent_type="cloud") assert resp.status_code == 200 body = resp.json() assert body["done"] is False # Cloud question should mention emails or messages assert any(w in body["message"].lower() for w in ("email", "message", "communication")) def test_start_journey_with_agent_id(client: TestClient, db_session: AsyncSession): """When agent_id is provided, session should be created even if agent doesn't exist.""" fake_agent_id = str(uuid.uuid4()) resp = _start(client, agent_type="local", agent_id=fake_agent_id) # Should succeed gracefully even if the agent_id doesn't exist assert resp.status_code == 200 body = resp.json() assert body["done"] is False def test_start_journey_with_existing_agent(client: TestClient, db_session: AsyncSession): """When a real local agent is provided, session is seeded with its prompt_template.""" import asyncio user_id = TEST_USER_IDS["power"] agent = LocalAgentConfig( id=str(uuid.uuid4()), user_id=user_id, name="Test Agent", device_id="device-1", directory_paths=["/home/user/emails"], data_types=["tasks"], prompt_template="Extract tasks from .eml files.", file_extensions=[".eml"], schedule_cron="0 */6 * * *", enabled=True, ) async def _seed(): db_session.add(agent) await db_session.commit() asyncio.get_event_loop().run_until_complete(_seed()) resp = _start(client, agent_type="local", agent_id=agent.id) assert resp.status_code == 200 body = resp.json() assert body["done"] is False # The session should be stored assert body["session_id"] in _sessions def test_start_journey_requires_auth(client: TestClient): resp = client.post("/api/v1/agents/journey/start", json={"agent_type": "local"}) assert resp.status_code == 401 # ── Message ─────────────────────────────────────────────────────────────── def test_message_continues_conversation(client: TestClient): """A mid-journey reply (no template markers) returns done=False.""" follow_up = "That looks good. Can you tell me more about priority rules?" with patch("app.api.routes.agent_setup._call_llm", new=AsyncMock(return_value=follow_up)): start_resp = _start(client, agent_type="local") assert start_resp.status_code == 200 session_id = start_resp.json()["session_id"] msg_resp = _message(client, session_id, "I have .eml and .txt files") assert msg_resp.status_code == 200 body = msg_resp.json() assert body["done"] is False assert body["prompt_template"] is None assert body["message"] == follow_up assert body["session_id"] == session_id def test_message_produces_template(client: TestClient): """When the LLM includes PROMPT_TEMPLATE markers, done=True and prompt_template is set.""" final_template = "Extract tasks from email. Subject → title. 'urgent' → high priority." llm_response = ( "Great, I have all the information I need.\n" f"{_TEMPLATE_START}\n{final_template}\n{_TEMPLATE_END}\n" ) with patch("app.api.routes.agent_setup._call_llm", new=AsyncMock(return_value=llm_response)): start_resp = _start(client, agent_type="cloud") assert start_resp.status_code == 200 session_id = start_resp.json()["session_id"] msg_resp = _message(client, session_id, "Only invoices from clients") assert msg_resp.status_code == 200 body = msg_resp.json() assert body["done"] is True assert body["prompt_template"] == final_template # Session should be cleaned up assert session_id not in _sessions def test_message_invalid_session(client: TestClient): resp = _message(client, "nonexistent-session-id", "hello") assert resp.status_code == 404 def test_message_wrong_owner(client: TestClient): """User B cannot access user A's session.""" start_resp = _start(client, agent_type="local", tier="power") session_id = start_resp.json()["session_id"] # user with "pro" tier (different user_id) tries to send a message resp = client.post( "/api/v1/agents/journey/message", json={"session_id": session_id, "message": "hello"}, headers=auth_header("pro"), # different user ) assert resp.status_code == 404 def test_message_expired_session(client: TestClient): """Expired sessions return 404.""" start_resp = _start(client, agent_type="local") session_id = start_resp.json()["session_id"] # Manually expire the session _sessions[session_id].created_at = time.monotonic() - _SESSION_TTL_SECONDS - 1 resp = _message(client, session_id, "hello") assert resp.status_code == 404 def test_message_requires_auth(client: TestClient): resp = client.post( "/api/v1/agents/journey/message", json={"session_id": "any", "message": "hello"}, ) assert resp.status_code == 401 def test_message_max_turns_nudge(client: TestClient): """After _MAX_TURNS user messages, a system nudge is appended but no crash occurs.""" from app.api.routes.agent_setup import _MAX_TURNS follow_up = "Tell me more about priority rules." with patch("app.api.routes.agent_setup._call_llm", new=AsyncMock(return_value=follow_up)): start_resp = _start(client, agent_type="local") session_id = start_resp.json()["session_id"] for i in range(_MAX_TURNS): resp = _message(client, session_id, f"Answer {i + 1}") assert resp.status_code == 200 # While no template produced, session must still exist if resp.json()["done"]: break # LLM decided to wrap up early — also fine