step 3.5 complete: chatbot journey endpoint

This commit is contained in:
2026-03-05 17:35:37 +01:00
parent fd1396a710
commit 24772f2b67
5 changed files with 591 additions and 4 deletions

View File

@@ -248,6 +248,8 @@ Tools must use **camelCase** field names (Drizzle maps them to snake_case intern
> **Objective:** Backend manages all agent configuration, scheduling, orchestration, and cloud data fetching. Two agent types: **Local Directory Agent** (backend triggers Electron to read files, then AI analyzes) and **Cloud Connector Agent** (backend fetches Gmail/Teams data directly, AI analyzes, pushes results to Electron via WS tool_call). All extracted items use existing WS tool infrastructure to insert into Electron's local DB with `is_ai_suggested=True`.
>
> **Electron Phase 3 plan:** `../adiuva/AI_REFACTOR_PLAN.md` Phase 3 section.
>
> **Electron UI status (2025):** Steps 3.6, 3.7, 3.8 of the Electron plan are ✅ complete. Agents are configured inside the Settings page (`/settings?section=agents`) — not a standalone route. The `JourneyDialog` (Step 3.8) is embedded inline in the Settings → Agents section. `LocalAgentConfigPanel` and `CloudAgentConfigPanel` (Step 3.7) are also inline. This affects the journey API contract (see Step 3.5 below).
### Architecture
@@ -412,22 +414,27 @@ Cloud Agent:
- **Outcome:** Backend drives all agent execution — both local (via WS file request) and cloud (direct API calls — stub until Step 3.6).
### Step 3.5 — Chatbot Journey endpoint
- [ ] Create `app/api/routes/agent_setup.py`:
- [x] Create `app/api/routes/agent_setup.py`:
- `POST /api/v1/agents/journey/start`:
- Body: `{ agent_type: "local"|"cloud", data_types: ["tasks", "notes", ...] }`
- Body: `{ agent_type: "local"|"cloud", agent_id: str | None }`
- `agent_type`: which kind of agent this journey configures.
- `agent_id`: optional — if provided, the session is pre-seeded with the existing agent's `prompt_template` so the user can refine it. If absent, fresh journey.
- **No `data_types` field** — data types are determined through the conversation itself, not sent upfront.
- Creates a journey session (in-memory or Redis-backed)
- Returns first AI message: contextual question based on agent type
- Local: "What kind of files are in the directories you want to monitor? (emails, documents, logs, etc.)"
- Cloud: "What kind of emails/messages should I look for? (client communications, invoices, meeting notes, etc.)"
- Response: `{ session_id, message, done: false }`
- **Electron note:** `proxyPost` auto-converts camelCase keys to snake_case. Electron sends `{ agentType, agentId }` → backend receives `{ agent_type, agent_id }`.
- `POST /api/v1/agents/journey/message`:
- Body: `{ session_id, message }`
- AI processes user's answer, asks follow-up questions (max 5 turns)
- System prompt: "You are configuring a data extraction agent for a freelancer. Ask about file format, what data to extract (tasks, notes, checkpoints), naming conventions, priority rules, and any special mapping. After 3-5 questions, generate a detailed prompt_template."
- When AI determines enough context: `{ session_id, message: "Here's your configuration...", done: true, prompt_template: "..." }`
- The `prompt_template` is a structured instruction for the extraction LLM (e.g. "Extract tasks from email. Subject becomes task title. If body contains 'urgent' or 'ASAP', set priority to 'high'. Extract due dates if mentioned.")
- **Electron note:** `toCamelCase` converts the response → Electron reads `promptTemplate` from the final message and auto-fills the agent config panel. User clicks "Save & apply" which calls `agent.local.update` / `agent.cloud.update` tRPC mutation.
- **Files:** `app/api/routes/agent_setup.py`, `app/main.py`
- **Outcome:** Users configure AI prompts through guided conversation, not manual text editing.
- **Outcome:** Users configure AI prompts through guided conversation. Journey can refine an existing config when `agent_id` is provided. ✅
### Step 3.6 — Cloud provider integrations
- [ ] Create `app/integrations/gmail.py`:

View File

@@ -0,0 +1,317 @@
"""Chatbot Journey endpoints — guided conversation to build an agent prompt_template.
Endpoints:
POST /agents/journey/start — start a new journey session
POST /agents/journey/message — continue the conversation
Sessions are stored in-memory with a 30-minute TTL. Stale entries are
cleaned up lazily on access. Upgrade to Redis for multi-instance deployments.
Journey flow:
1. Client sends ``{ agent_type, agent_id? }`` to ``/start``.
2. Server creates a session, calls the LLM with a contextual system prompt,
and returns the first question.
3. Client sends follow-up messages to ``/message``.
4. After 3-5 turns the LLM wraps up by emitting a ``prompt_template`` block
delimited by ``PROMPT_TEMPLATE_START`` / ``PROMPT_TEMPLATE_END``.
5. Server parses the block, sets ``done=True``, and returns the template.
The ``prompt_template`` from the final response is meant to be stored in
``LocalAgentConfig.prompt_template`` or ``CloudAgentConfig.prompt_template``
by the Electron client (via the agent CRUD endpoints).
"""
from __future__ import annotations
import logging
import time
import uuid
from dataclasses import dataclass, field
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_current_user
from app.core.llm import get_llm
from app.db import get_session
from app.models import CloudAgentConfig, LocalAgentConfig
from app.schemas import (
JourneyMessageRequest,
JourneyResponse,
JourneyStartRequest,
UserProfile,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/agents/journey", tags=["agents"])
# ── Session TTL ───────────────────────────────────────────────────────────
_SESSION_TTL_SECONDS: int = 1800 # 30 minutes
# Sentinel strings used to delimit the LLM-produced prompt_template.
_TEMPLATE_START = "PROMPT_TEMPLATE_START"
_TEMPLATE_END = "PROMPT_TEMPLATE_END"
# Maximum number of conversation turns before the LLM is nudged to wrap up.
_MAX_TURNS: int = 5
# ── In-memory session store ───────────────────────────────────────────────
@dataclass
class _JourneySession:
session_id: str
user_id: str
agent_type: str # "local" | "cloud"
history: list[dict[str, Any]] = field(default_factory=list)
created_at: float = field(default_factory=time.monotonic)
def is_expired(self) -> bool:
return (time.monotonic() - self.created_at) > _SESSION_TTL_SECONDS
# session_id → session
_sessions: dict[str, _JourneySession] = {}
def _get_session(session_id: str, user_id: str) -> _JourneySession:
"""Retrieve session; raise 404 on missing, expired, or wrong owner."""
s = _sessions.get(session_id)
if s is None or s.is_expired():
_sessions.pop(session_id, None)
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Journey session not found or expired")
if s.user_id != user_id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Journey session not found or expired")
return s
# ── System prompt builder ─────────────────────────────────────────────────
_LOCAL_PREAMBLE = """\
What kind of files are in the directories you want to monitor? \
(for example: emails saved as .eml, documents in .pdf or .txt, markdown notes, etc.)"""
_CLOUD_PREAMBLE = """\
What kind of emails or messages should I look for? \
(for example: client communications, invoices, meeting notes, project updates, etc.)"""
_SYSTEM_PROMPT_TEMPLATE = """\
You are a friendly assistant helping a freelancer configure a data-extraction agent.
Your job is to understand exactly what data the user wants to extract from their {source_description} \
and produce a detailed prompt_template that a separate AI will use as its instruction set.
Ask concise, focused questions one at a time. Cover these topics (not necessarily in this order):
1. The type and format of the source content.
2. Which data types to extract: tasks, notes, checkpoints, and/or projects.
3. How fields should be mapped (e.g. email subject → task title).
4. Priority or status rules (e.g. "urgent" keyword → high priority).
5. Any special handling, date extraction, or exclusions.
After 3-5 questions (when you have enough information), output the final prompt_template between \
these exact markers on their own lines:
{template_start}
<the complete extraction prompt here>
{template_end}
The prompt_template must be a self-contained instruction for an AI that receives a document/email/message \
and must return a JSON array of records in this shape:
[{{ "table": "<tasks|notes|checkpoints|projects>", "data": {{ <field: value> }} }}, ...]
Rules for the generated template:
- Be explicit about field names (camelCase: title, status, priority, dueDate, projectId, content, etc.).
- Include concrete examples of mappings.
- Mention that Electron adds id/createdAt/updatedAt automatically.
- Set isAiSuggested: true and isApproved: false on every record.
{existing_section}\
Do not ask more than {max_turns} questions total. Start with your first question now.\
"""
def _build_system_prompt(agent_type: str, existing_template: str | None) -> str:
source_description = (
"files in local directories" if agent_type == "local" else "emails and messages from cloud providers"
)
existing_section = (
f"\nThe user already has the following prompt_template — refine it based on their answers:\n"
f"---\n{existing_template}\n---\n"
if existing_template
else ""
)
return _SYSTEM_PROMPT_TEMPLATE.format(
source_description=source_description,
template_start=_TEMPLATE_START,
template_end=_TEMPLATE_END,
existing_section=existing_section,
max_turns=_MAX_TURNS,
)
def _first_question(agent_type: str) -> str:
return _LOCAL_PREAMBLE if agent_type == "local" else _CLOUD_PREAMBLE
# ── Template extraction ───────────────────────────────────────────────────
def _extract_template(text: str) -> str | None:
"""Return the text between PROMPT_TEMPLATE_START and PROMPT_TEMPLATE_END, or None."""
if _TEMPLATE_START not in text or _TEMPLATE_END not in text:
return None
start_idx = text.index(_TEMPLATE_START) + len(_TEMPLATE_START)
end_idx = text.index(_TEMPLATE_END)
return text[start_idx:end_idx].strip() or None
# ── LLM call ─────────────────────────────────────────────────────────────
async def _call_llm(system_prompt: str, history: list[dict[str, Any]]) -> str:
"""Build LangChain messages from history and invoke the LLM."""
messages: list[Any] = [SystemMessage(content=system_prompt)]
for turn in history:
if turn["role"] == "user":
messages.append(HumanMessage(content=turn["content"]))
else:
messages.append(AIMessage(content=turn["content"]))
llm = get_llm(model=None, temperature=0.4)
response = await llm.ainvoke(messages)
return response.content # type: ignore[return-value]
# ── Existing-config loader ────────────────────────────────────────────────
async def _load_existing_template(
agent_id: str,
user_id: str,
db: AsyncSession,
) -> str | None:
"""Return the prompt_template of an existing agent config, or None."""
# Try local first, then cloud.
local_result = await db.execute(
select(LocalAgentConfig).where(
LocalAgentConfig.id == agent_id,
LocalAgentConfig.user_id == user_id,
)
)
local = local_result.scalar_one_or_none()
if local is not None:
return local.prompt_template
cloud_result = await db.execute(
select(CloudAgentConfig).where(
CloudAgentConfig.id == agent_id,
CloudAgentConfig.user_id == user_id,
)
)
cloud = cloud_result.scalar_one_or_none()
return cloud.prompt_template if cloud is not None else None
# ── Routes ────────────────────────────────────────────────────────────────
@router.post("/start", response_model=JourneyResponse, status_code=status.HTTP_200_OK)
async def start_journey(
body: JourneyStartRequest,
current_user: UserProfile = Depends(get_current_user),
db: AsyncSession = Depends(get_session),
) -> JourneyResponse:
"""Start a new Chatbot Journey session.
If ``agent_id`` is provided the session is pre-seeded with the existing
agent's ``prompt_template`` so the user can refine it.
"""
# Load existing template (may be None).
existing_template: str | None = None
if body.agent_id:
existing_template = await _load_existing_template(body.agent_id, current_user.id, db)
# If agent_id was given but not found, proceed without seeding (don't 404 —
# the user may be starting a fresh journey for a not-yet-persisted config).
system_prompt = _build_system_prompt(body.agent_type, existing_template)
first_question = _first_question(body.agent_type)
session_id = str(uuid.uuid4())
session = _JourneySession(
session_id=session_id,
user_id=current_user.id,
agent_type=body.agent_type,
# Seed history with the AI's first question so it stays consistent.
history=[{"role": "assistant", "content": first_question}],
)
# Store the system prompt inside the session for reuse in /message.
session.__dict__["_system_prompt"] = system_prompt # type: ignore[index]
_sessions[session_id] = session
logger.info("Journey session %s started for user %s (agent_type=%s)", session_id, current_user.id, body.agent_type)
return JourneyResponse(session_id=session_id, message=first_question, done=False)
@router.post("/message", response_model=JourneyResponse, status_code=status.HTTP_200_OK)
async def send_journey_message(
body: JourneyMessageRequest,
current_user: UserProfile = Depends(get_current_user),
db: AsyncSession = Depends(get_session),
) -> JourneyResponse:
"""Send a message in an existing Chatbot Journey session.
The server appends the user's message to the conversation history,
calls the LLM, and appends the AI reply. When the LLM wraps up with a
``prompt_template`` block the response includes ``done=True`` and the
extracted template.
"""
session = _get_session(body.session_id, current_user.id)
system_prompt: str = session.__dict__.get("_system_prompt", _build_system_prompt(session.agent_type, None)) # type: ignore[assignment]
# Append user turn to history.
session.history.append({"role": "user", "content": body.message})
# Call the LLM with the full conversation so far.
ai_reply = await _call_llm(system_prompt, session.history)
# Append AI turn.
session.history.append({"role": "assistant", "content": ai_reply})
# Check if the LLM produced the final template.
prompt_template = _extract_template(ai_reply)
done = prompt_template is not None
# Strip the sentinel markers from the message shown to the user.
display_message = ai_reply
if done:
display_message = (
ai_reply[: ai_reply.index(_TEMPLATE_START)].strip()
or "Here is your agent configuration. You can save it or continue refining."
)
if done:
logger.info("Journey session %s completed for user %s", body.session_id, current_user.id)
# Clean up the session immediately on completion.
_sessions.pop(body.session_id, None)
else:
# Nudge the LLM to wrap up after max turns.
turns = sum(1 for t in session.history if t["role"] == "user")
if turns >= _MAX_TURNS:
# Add a system-level nudge as a hidden user message.
session.history.append({
"role": "user",
"content": (
"[System: You have enough information. Please generate the final "
f"prompt_template now, wrapped in {_TEMPLATE_START} / {_TEMPLATE_END} markers.]"
),
})
return JourneyResponse(
session_id=body.session_id,
message=display_message,
done=done,
prompt_template=prompt_template,
)

View File

@@ -43,7 +43,7 @@ def create_app() -> FastAPI:
app.add_middleware(SanitizerMiddleware)
app.add_middleware(TierRateLimitMiddleware)
from app.api.routes import agents, auth, backup, billing, chat, device_ws, plans, plugins, storage, vectors
from app.api.routes import agent_setup, agents, auth, backup, billing, chat, device_ws, plans, plugins, storage, vectors
app.include_router(auth.router, prefix="/api/v1")
app.include_router(chat.router, prefix="/api/v1")
@@ -54,6 +54,7 @@ def create_app() -> FastAPI:
app.include_router(plugins.router, prefix="/api/v1")
app.include_router(billing.router, prefix="/api/v1")
app.include_router(agents.router, prefix="/api/v1")
app.include_router(agent_setup.router, prefix="/api/v1")
app.include_router(device_ws.router, prefix="/api/v1")
@app.get("/api/v1/health", tags=["health"])

View File

@@ -347,3 +347,22 @@ class AgentRunLogResponse(BaseModel):
errors: list[str]
started_at: int
completed_at: int | None
# ── Chatbot Journey ───────────────────────────────────────────────────
class JourneyStartRequest(BaseModel):
agent_type: Literal["local", "cloud"]
agent_id: str | None = None
class JourneyMessageRequest(BaseModel):
session_id: str
message: str
class JourneyResponse(BaseModel):
session_id: str
message: str
done: bool
prompt_template: str | None = None

243
tests/test_agent_setup.py Normal file
View File

@@ -0,0 +1,243 @@
"""Tests for the Chatbot Journey endpoints.
Covers:
1. Start journey for local agent → session_id + first question, done=False
2. Start journey for cloud agent → contextual email-focused question
3. Start journey with existing agent_id → session seeded, first question returned
4. Start journey with non-existent agent_id → still succeeds (graceful fallback)
5. Message: continue conversation → done=False, follow-up question returned
6. Message: LLM wraps up → done=True + prompt_template extracted correctly
7. Message with max-turns nudge → no crash, returns response
8. Invalid session_id → 404
9. Expired session → 404
10. Session ownership: user B cannot access user A's session
11. No JWT on /start → 401
12. No JWT on /message → 401
"""
from __future__ import annotations
import time
import uuid
from unittest.mock import AsyncMock, patch
import pytest
from fastapi.testclient import TestClient
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.routes.agent_setup import (
_SESSION_TTL_SECONDS,
_TEMPLATE_END,
_TEMPLATE_START,
_extract_template,
_sessions,
)
from app.models import LocalAgentConfig
from tests.conftest import TEST_USER_IDS, auth_header
# ── Helpers ──────────────────────────────────────────────────────────────
def _start(client: TestClient, agent_type: str = "local", agent_id: str | None = None, tier: str = "power") -> dict:
body: dict = {"agent_type": agent_type}
if agent_id:
body["agent_id"] = agent_id
resp = client.post("/api/v1/agents/journey/start", json=body, headers=auth_header(tier))
return resp
def _message(client: TestClient, session_id: str, message: str, tier: str = "power") -> dict:
return client.post(
"/api/v1/agents/journey/message",
json={"session_id": session_id, "message": message},
headers=auth_header(tier),
)
# ── Unit: _extract_template ───────────────────────────────────────────────
def test_extract_template_present():
text = f"Some preamble.\n{_TEMPLATE_START}\nExtract tasks from emails.\n{_TEMPLATE_END}\nTrailing text."
result = _extract_template(text)
assert result == "Extract tasks from emails."
def test_extract_template_absent():
assert _extract_template("No markers here.") is None
def test_extract_template_empty_content():
text = f"{_TEMPLATE_START}\n{_TEMPLATE_END}"
assert _extract_template(text) is None
# ── Start journey ─────────────────────────────────────────────────────────
def test_start_journey_local(client: TestClient):
resp = _start(client, agent_type="local")
assert resp.status_code == 200
body = resp.json()
assert "session_id" in body
assert body["done"] is False
assert body["prompt_template"] is None
assert len(body["message"]) > 0
# Local question should be about files/directories
assert any(w in body["message"].lower() for w in ("file", "director", "document", "monitor"))
def test_start_journey_cloud(client: TestClient):
resp = _start(client, agent_type="cloud")
assert resp.status_code == 200
body = resp.json()
assert body["done"] is False
# Cloud question should mention emails or messages
assert any(w in body["message"].lower() for w in ("email", "message", "communication"))
def test_start_journey_with_agent_id(client: TestClient, db_session: AsyncSession):
"""When agent_id is provided, session should be created even if agent doesn't exist."""
fake_agent_id = str(uuid.uuid4())
resp = _start(client, agent_type="local", agent_id=fake_agent_id)
# Should succeed gracefully even if the agent_id doesn't exist
assert resp.status_code == 200
body = resp.json()
assert body["done"] is False
def test_start_journey_with_existing_agent(client: TestClient, db_session: AsyncSession):
"""When a real local agent is provided, session is seeded with its prompt_template."""
import asyncio
user_id = TEST_USER_IDS["power"]
agent = LocalAgentConfig(
id=str(uuid.uuid4()),
user_id=user_id,
name="Test Agent",
device_id="device-1",
directory_paths=["/home/user/emails"],
data_types=["tasks"],
prompt_template="Extract tasks from .eml files.",
file_extensions=[".eml"],
schedule_cron="0 */6 * * *",
enabled=True,
)
async def _seed():
db_session.add(agent)
await db_session.commit()
asyncio.get_event_loop().run_until_complete(_seed())
resp = _start(client, agent_type="local", agent_id=agent.id)
assert resp.status_code == 200
body = resp.json()
assert body["done"] is False
# The session should be stored
assert body["session_id"] in _sessions
def test_start_journey_requires_auth(client: TestClient):
resp = client.post("/api/v1/agents/journey/start", json={"agent_type": "local"})
assert resp.status_code == 401
# ── Message ───────────────────────────────────────────────────────────────
def test_message_continues_conversation(client: TestClient):
"""A mid-journey reply (no template markers) returns done=False."""
follow_up = "That looks good. Can you tell me more about priority rules?"
with patch("app.api.routes.agent_setup._call_llm", new=AsyncMock(return_value=follow_up)):
start_resp = _start(client, agent_type="local")
assert start_resp.status_code == 200
session_id = start_resp.json()["session_id"]
msg_resp = _message(client, session_id, "I have .eml and .txt files")
assert msg_resp.status_code == 200
body = msg_resp.json()
assert body["done"] is False
assert body["prompt_template"] is None
assert body["message"] == follow_up
assert body["session_id"] == session_id
def test_message_produces_template(client: TestClient):
"""When the LLM includes PROMPT_TEMPLATE markers, done=True and prompt_template is set."""
final_template = "Extract tasks from email. Subject → title. 'urgent' → high priority."
llm_response = (
"Great, I have all the information I need.\n"
f"{_TEMPLATE_START}\n{final_template}\n{_TEMPLATE_END}\n"
)
with patch("app.api.routes.agent_setup._call_llm", new=AsyncMock(return_value=llm_response)):
start_resp = _start(client, agent_type="cloud")
assert start_resp.status_code == 200
session_id = start_resp.json()["session_id"]
msg_resp = _message(client, session_id, "Only invoices from clients")
assert msg_resp.status_code == 200
body = msg_resp.json()
assert body["done"] is True
assert body["prompt_template"] == final_template
# Session should be cleaned up
assert session_id not in _sessions
def test_message_invalid_session(client: TestClient):
resp = _message(client, "nonexistent-session-id", "hello")
assert resp.status_code == 404
def test_message_wrong_owner(client: TestClient):
"""User B cannot access user A's session."""
start_resp = _start(client, agent_type="local", tier="power")
session_id = start_resp.json()["session_id"]
# user with "pro" tier (different user_id) tries to send a message
resp = client.post(
"/api/v1/agents/journey/message",
json={"session_id": session_id, "message": "hello"},
headers=auth_header("pro"), # different user
)
assert resp.status_code == 404
def test_message_expired_session(client: TestClient):
"""Expired sessions return 404."""
start_resp = _start(client, agent_type="local")
session_id = start_resp.json()["session_id"]
# Manually expire the session
_sessions[session_id].created_at = time.monotonic() - _SESSION_TTL_SECONDS - 1
resp = _message(client, session_id, "hello")
assert resp.status_code == 404
def test_message_requires_auth(client: TestClient):
resp = client.post(
"/api/v1/agents/journey/message",
json={"session_id": "any", "message": "hello"},
)
assert resp.status_code == 401
def test_message_max_turns_nudge(client: TestClient):
"""After _MAX_TURNS user messages, a system nudge is appended but no crash occurs."""
from app.api.routes.agent_setup import _MAX_TURNS
follow_up = "Tell me more about priority rules."
with patch("app.api.routes.agent_setup._call_llm", new=AsyncMock(return_value=follow_up)):
start_resp = _start(client, agent_type="local")
session_id = start_resp.json()["session_id"]
for i in range(_MAX_TURNS):
resp = _message(client, session_id, f"Answer {i + 1}")
assert resp.status_code == 200
# While no template produced, session must still exist
if resp.json()["done"]:
break # LLM decided to wrap up early — also fine