3 Commits

Author SHA1 Message Date
Roberto Musso
1a8bf11f90 update migration plan 2026-03-20 23:48:36 +01:00
Roberto Musso
e7cdce8287 Improve Step 1 project matching and Step 2 update-first enforcement
- Rewrite _STEP1_SYSTEM_PROMPT: lower matching threshold (no longer requires
  "clear" match), strongly prefer existing projects over creating new ones,
  use structured id=|name=|status= format with aiSummary for richer context
- Add code-level UUID validation: reject hallucinated ids not in the fetched
  projects list, fall back to "new" instead of creating a bad link
- Rewrite _PROCESSING_SYSTEM_PROMPT: enforce explicit scan-before-create
  process (read existing → search → update if found → create only if not)
  with hard rule against calling create_* without checking existing records

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-20 23:45:29 +01:00
Roberto Musso
58bc6efd4b Rewrite run_local_agent: code-based flow, concurrency guard, remove isApproved
- Replace LLM-driven triage with code-based directory scan and project fetch
- Two-step LLM approach: Step 1 classifies file→project+domains, Step 2 processes with tools
- Add domain descriptions to Step 1 prompt for better extraction accuracy
- Add _running_agents set for per-agent concurrency guard (one running instance per agent)
- Return 409 from route before DB write when agent already running
- Remove is_approved from task_agent create/update tools and system prompt
- Remove is_approved from timeline_agent create/update tools and system prompt

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-20 22:21:30 +01:00
5 changed files with 736 additions and 456 deletions
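
Review note: the "code-level UUID validation" bullet in e7cdce8287 amounts to a set-membership check on the id the model returns. Below is a minimal sketch of that idea, assuming each fetched project is a dict with an `id` key. The helper name `resolve_project_id` is hypothetical and does not appear in the commit.

```python
from typing import Any


def resolve_project_id(raw_id: Any, projects: list[dict[str, Any]]) -> str:
    """Return raw_id only if it names a fetched project; otherwise "new"."""
    # Build the allow-list from the rows that were actually fetched.
    valid_ids = {str(p["id"]) for p in projects if "id" in p}
    candidate = str(raw_id or "new").strip()
    # Anything the model made up (or left empty) falls back to "new".
    return candidate if candidate in valid_ids else "new"


projects = [
    {"id": "a1", "name": "Website redesign"},
    {"id": "b2", "name": "Tax filing"},
]
print(resolve_project_id("a1", projects))
print(resolve_project_id("zz-hallucinated", projects))
print(resolve_project_id(None, projects))
```

Falling back to "new" avoids linking records to a project that does not exist, though a wrong answer from the model then shows up as an unnecessary new project instead.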

View File

@@ -29,7 +29,7 @@ TASK_SYSTEM_PROMPT = (
     " - project_id is optional; link to a project when the user mentions one\n"
     " - is_ai_suggested: 1 only when proactively proposing a task the user\n"
     " did not explicitly request; 0 otherwise\n"
-    " - is_approved defaults to 0; set to 1 only when the user confirms\n"
+    " - is_ai_suggested: 1 only when proactively proposing a task the user did not explicitly request; 0 otherwise\n"
     " - Use list_tasks_due_today for 'what's due today' queries\n"
     " - For update_task, use -1 for integer fields you do not want to change\n"
     " - Always confirm the action in plain, user-friendly language."
@@ -79,7 +79,6 @@ async def create_task(
     due_date: int = 0,
     project_id: str = "",
     is_ai_suggested: int = 0,
-    is_approved: int = 0,
 ) -> str:
     """Create a new task.
     title: task title (required)
@@ -90,7 +89,6 @@ async def create_task(
     due_date: Unix timestamp in milliseconds; 0 means no due date
     project_id: optional UUID of the parent project
     is_ai_suggested: 1 if proactively suggested, 0 if user-requested
-    is_approved: 0 until the user confirms; 1 when confirmed
     """
     result = await execute_on_client(
         action="insert",
@@ -104,7 +102,6 @@ async def create_task(
             "dueDate": due_date or None,
             "projectId": project_id or None,
             "isAiSuggested": is_ai_suggested,
-            "isApproved": is_approved,
         },
     )
     row = result["row"]
@@ -124,12 +121,10 @@ async def update_task(
     assignees: str = "",
     due_date: int = -1,
     project_id: str = "",
-    is_approved: int = -1,
 ) -> str:
     """Update fields on an existing task. Only pass fields you want to change.
     task_id: the task's UUID (required)
     due_date: -1 means unchanged; 0 clears the due date; any positive value sets it
-    is_approved: -1 means unchanged; 0 or 1 sets the value
     """
     updates: dict[str, Any] = {}
     if title:
@@ -146,8 +141,6 @@ async def update_task(
         updates["dueDate"] = due_date or None
     if project_id:
         updates["projectId"] = project_id
-    if is_approved != -1:
-        updates["isApproved"] = is_approved
     result = await execute_on_client(
         action="update",
         table="tasks",
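
Review note: `update_task` relies on sentinel defaults (`-1` for integers, `""` for strings) to tell "leave unchanged" from "set a value". The sketch below isolates that pattern for three of the fields. `build_task_updates` and `UNCHANGED` are hypothetical names used only for illustration, and the function covers only the behaviour visible in the hunks above.

```python
from typing import Any

UNCHANGED = -1  # sentinel for integer fields, as in the tool docstring


def build_task_updates(
    title: str = "",
    due_date: int = UNCHANGED,
    project_id: str = "",
) -> dict[str, Any]:
    """Collect only the fields the caller actually wants to change."""
    updates: dict[str, Any] = {}
    if title:
        updates["title"] = title
    if due_date != UNCHANGED:
        # 0 clears the due date (stored as None); positive values set it.
        updates["dueDate"] = due_date or None
    if project_id:
        updates["projectId"] = project_id
    return updates


print(build_task_updates())
print(build_task_updates(due_date=0))
print(build_task_updates(title="Send invoice", due_date=1_700_000_000_000))
```

One consequence of this scheme, visible in the diff as well, is that an empty string cannot be used to clear `title` or `project_id`, because the empty string already means "unchanged".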

View File

@@ -25,7 +25,7 @@ TIMELINE_SYSTEM_PROMPT = (
     " - For listing, project_id must be a UUID; never pass plain names as project_id\n"
     " - date is a Unix timestamp in milliseconds; convert human-readable dates\n"
     " - is_ai_suggested: 1 when proactively proposing a timeline, 0 otherwise\n"
-    " - is_approved: 0 until the user explicitly confirms; then 1\n"
+    " - is_ai_suggested: 1 when proactively proposing a timeline, 0 otherwise\n"
     " - For update_timeline, use -1 for integer fields you do not want to change\n"
     " - Listing without a project_id returns all timelines across projects\n"
     " - Always echo the title and formatted date in your confirmation."
@@ -54,14 +54,12 @@ async def create_timeline(
     title: str,
     date: int,
     is_ai_suggested: int = 0,
-    is_approved: int = 0,
 ) -> str:
     """Create a project timeline (milestone).
     project_id: REQUIRED UUID of the parent project
     title: descriptive name for the milestone
     date: Unix timestamp in milliseconds
     is_ai_suggested: 1 if proactively suggested, 0 if user-requested
-    is_approved: 0 until the user confirms
     """
     result = await execute_on_client(
         action="insert",
@@ -71,7 +69,6 @@ async def create_timeline(
             "title": title,
             "date": date,
             "isAiSuggested": is_ai_suggested,
-            "isApproved": is_approved,
         },
     )
     row = result["row"]
@@ -83,20 +80,16 @@ async def update_timeline(
     timeline_id: str,
     title: str = "",
     date: int = -1,
-    is_approved: int = -1,
 ) -> str:
     """Update a timeline. Only pass fields that should change.
     timeline_id: UUID of the timeline (required)
     date: -1 means unchanged; any other value sets the new date (ms timestamp)
-    is_approved: -1 means unchanged; 0 or 1 sets the approval state
     """
     updates: dict[str, Any] = {}
     if title:
         updates["title"] = title
     if date != -1:
         updates["date"] = date
-    if is_approved != -1:
-        updates["isApproved"] = is_approved
     result = await execute_on_client(
         action="update",
         table="timelines",

View File

@@ -21,7 +21,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
 from app.api.deps import get_current_user
 from app.billing.tier_manager import FEATURES
-from app.core.agent_runner import run_local_agent
+from app.core.agent_runner import is_agent_running, run_local_agent
 from app.core.device_manager import device_manager
 from app.db import get_session
 from app.models import AgentRunLog, LocalAgentConfig
@@ -193,6 +193,12 @@ async def trigger_agent_run(
     # Use the FE's stable agent_id if provided, fall back to the ephemeral config id.
     stable_agent_id = body.agent_id or config.id
+    if is_agent_running(stable_agent_id):
+        raise HTTPException(
+            status_code=status.HTTP_409_CONFLICT,
+            detail="Agent is already running. Only one run per agent is allowed at a time.",
+        )
     run_log = AgentRunLog(
         agent_id=stable_agent_id,
         agent_type="local",

View File

@@ -2,11 +2,12 @@
 Drives two agent types:
-* **Local directory agent** — two-phase execution that mirrors the
-  ``deep_agent.py`` tool-calling pattern. Phase 1 (Triage) explores the
-  user's directory via file-system tools and groups files by project.
-  Phase 2 (Processing) reads full file contents and performs CRUD
-  operations using the standard entity tools (tasks, notes, etc.).
+* **Local directory agent** — two-step execution per file:
+  Step 1 (Classification) uses code to fetch all projects and asks the LLM
+  to identify which project the file belongs to and which domains are relevant.
+  Step 2 (Processing) fetches existing entities for that project/domains via
+  code and runs an LLM with tools — existing data in context enforces
+  update-first naturally.
 * **Cloud connector agent** — fetches data from third-party APIs (Gmail,
   Teams, Outlook) and pushes extracted items to Electron.
@@ -43,19 +44,30 @@ from app.agents.task_agent import TASK_TOOLS
 from app.agents.timeline_agent import TIMELINE_TOOLS
 from app.core.device_manager import DeviceConnectionManager
 from app.core.llm import get_llm
-from app.core.ws_context import clear_client_executor, set_client_executor
+from app.core.ws_context import clear_client_executor, execute_on_client, set_client_executor
 from app.db import async_session
 from app.models import AgentRunLog, CloudAgentConfig, LocalAgentConfig
 logger = logging.getLogger(__name__)
+# ── Concurrency guard ─────────────────────────────────────────────────────
+# Tracks agent IDs that currently have a run in progress.
+# Prevents multiple simultaneous runs of the same agent within a single process.
+_running_agents: set[str] = set()
+def is_agent_running(agent_id: str) -> bool:
+    """Return ``True`` if *agent_id* already has a run in progress."""
+    return agent_id in _running_agents
 # ── Timeouts ───────────────────────────────────────────────────────────────
 # Max seconds to wait for a single tool-call round-trip (FE → BE).
 _TOOL_CALL_TIMEOUT: int = 30
-# Max LLM reasoning steps per phase.
-_MAX_TRIAGE_STEPS: int = 10
+# Max LLM reasoning steps for Step 2 processing.
 _MAX_PROCESSING_STEPS: int = 12
+# Max directory recursion depth during scan.
+_MAX_SCAN_DEPTH: int = 5
 # ── Data-type to tool mapping ─────────────────────────────────────────────
@@ -66,46 +78,88 @@ _DATA_TYPE_TOOLS: dict[str, list[Any]] = {
     "timelines": TIMELINE_TOOLS,
 }
-# ── Triage prompt ─────────────────────────────────────────────────────────
+# ── Step 1: Classification prompt ─────────────────────────────────────────
-_TRIAGE_SYSTEM_PROMPT = """\
-You are a file triage assistant for a freelance project management tool.
-Your job is to explore a local directory on the user's device, understand its
-structure, and group files by project context.
+_DOMAIN_DESCRIPTIONS: dict[str, str] = {
+    "tasks": (
+        "Action items, to-dos, deliverables — anything that describes work to be done, "
+        "assigned to someone, or tracked with a due date or status."
+    ),
+    "notes": (
+        "Documentation, meeting notes, summaries, reference material — "
+        "written content meant to be read and referenced rather than acted on."
+    ),
+    "timelines": (
+        "Project milestones, deadlines, scheduled events — "
+        "specific dates that mark a point in the progress of a project."
+    ),
+    "projects": (
+        "High-level project entities — only relevant if the file clearly introduces "
+        "a new project or updates the scope of an existing one."
+    ),
+}
-You have access to these tools:
-- list_directory: to map folder structure
-- get_file_metadata: to check creation/modification dates
-- read_file_content: to read brief snippets when needed for categorisation
-- list_projects / list_all_projects / get_project: to fetch existing projects
-from the user's workspace and match files to them
+_STEP1_SYSTEM_PROMPT = """\
+You are a file classifier for a freelance project management tool.
-Instructions:
-1. Start by calling list_directory on the configured root path.
-2. Explore subdirectories as needed to understand the structure.
-3. Use get_file_metadata to check modification dates. Skip files that have
-NOT been modified since: {last_run_at}.
-4. Call list_all_projects to get the user's existing projects.
-5. Match files to existing projects by name, folder structure, or content hints.
-6. If files don't match any existing project, group them under "standalone".
+Your job is to match a file to an existing project and identify which data domains to extract.
-{custom_prompt_section}
-Target entity types to extract: {data_types}
-File extensions to consider: {file_extensions}
+## Project matching rules (STRICT — follow in order)
+1. Search the file content for any mention of a project name, client name, acronym, or topic
+that overlaps with the existing projects listed below.
+2. The match does NOT need to be exact — partial name, abbreviation, or topic similarity is enough.
+3. STRONGLY PREFER matching an existing project. Only return "new" as an absolute last resort
+when the file has zero meaningful connection to any listed project.
+4. When in doubt, pick the closest match from the list.
-When you have finished exploring, output ONLY a JSON object (no markdown
-fences, no explanation) mapping project IDs or "standalone" to file path
-arrays:
+## Response format
-{{"<project_id>": ["<file_path>", ...], "standalone": ["<file_path>", ...]}}
-Return ONLY the JSON object as your final message.
+Respond ONLY with a JSON object — no markdown, no explanation:
+{{"project_id": "<exact id from the list below, or new>", "new_project_name": "<concise 2-5 word name, only when project_id is new>", "domains": ["tasks", "notes"]}}
+## Domain definitions (only consider domains in the allowed list)
+{domain_definitions}
+## Existing projects
+{projects_list}
 """
-# ── Processing prompt ─────────────────────────────────────────────────────
+# ── Step 2: Processing prompt ─────────────────────────────────────────────
-_PROCESSING_BASE_PROMPT = """\
+_PROCESSING_SYSTEM_PROMPT = """\
+You are a data extraction assistant for a freelance project management tool.
+Your task: extract structured data from the file content and persist it using the available tools.
+## Mandatory process — follow this order for EVERY item you extract
+1. READ the existing records listed below for the relevant domain.
+2. SEARCH for a match by title, topic, or semantic similarity.
+3. If a match exists → call the update_* tool with the existing record's id.
+4. If no match exists → call the create_* tool and set isAiSuggested=1.
+NEVER call create_* without first checking the existing records.
+NEVER duplicate a record that already exists under a different wording.
+## Existing records (source of truth)
+{existing_context}
+## Context
+Project: {project_context}
+Domains to extract: {data_types}
+{custom_prompt_section}
+"""
+# ── Cloud processing prompt (kept separate for cloud agent) ───────────────
+_CLOUD_PROCESSING_PROMPT = """\
 You are a data extraction and management assistant for a freelance project
 management tool.
@@ -124,26 +178,6 @@ Your task:
 4. Do NOT invent data. Only extract what is clearly present in the files.
 5. If a file contains no relevant data for the target entity types, skip it.
-Update-first rules (apply in this order):
-Tasks:
-- Call list_tasks to find a match by title or context.
-- If found: call add_task_comment (author "Adiuva"), update_task to set
-assignees, state (ToDo / In Progress / Completed), or other fields.
-- If NOT found: call create_task with isAiSuggested=1, isApproved=0.
-Timelines:
-- Call list_timelines to find a match by title or date.
-- If found: call update_timeline to edit fields or mark it complete.
-- If NOT found: call create_timeline with isAiSuggested=1, isApproved=0.
-Notes:
-- Call list_notes to find a match by title or topic, then get_note to
-read its current content.
-- If found: call update_note with the merged content.
-- If NOT found: call create_note with isAiSuggested=1, isApproved=0.
-Projects:
-- Call list_all_projects to check for a match first.
-- Only call create_project if the information is clearly significant and
-no existing project matches. Set isAiSuggested=1, isApproved=0.
 {project_context}
 Files to process:
@@ -168,7 +202,6 @@ def _is_overdue(schedule_cron: str, last_run_at: datetime | None) -> bool:
     try:
         now = datetime.now(timezone.utc)
         if last_run_at is None:
-            # Validate the expression before deciding this is overdue.
             croniter(schedule_cron, now)
             return True
         ts = last_run_at
@@ -179,7 +212,7 @@ def _is_overdue(schedule_cron: str, last_run_at: datetime | None) -> bool:
         return now >= next_run
     except Exception as exc:
         logger.warning("agent_runner: cannot parse cron %r: %s", schedule_cron, exc)
-        return False  # Fail-safe: don't trigger if expression is invalid.
+        return False
 # ── WS executor for agent context ─────────────────────────────────────────
@@ -207,7 +240,7 @@ def _make_agent_executor(
     return _executor
-# ── LLM tool-calling loop (mirrors deep_agent._run_single_agent) ──────────
+# ── LLM tool-calling loop ─────────────────────────────────────────────────
 def _as_text(content: Any) -> str:
@@ -235,11 +268,7 @@ async def _run_agent_with_tools(
     tools: list[Any],
     max_steps: int,
 ) -> str:
-    """Run an LLM agent with tool-calling, returning the final text response.
-    Follows the same pattern as ``deep_agent._run_single_agent``:
-    bind tools → invoke → handle tool calls → repeat until final text.
-    """
+    """Run an LLM agent with tool-calling, returning the final text response."""
     llm = get_llm()
     llm_with_tools = llm.bind_tools(tools)
     messages: list[Any] = [
@@ -247,7 +276,6 @@ async def _run_agent_with_tools(
         HumanMessage(content=user_message),
     ]
-    tool_calls_count = 0
     tool_map = {tool_def.name: tool_def for tool_def in tools}
     for _ in range(max_steps):
@@ -258,7 +286,6 @@ async def _run_agent_with_tools(
             return _as_text(response.content)
         for call in response.tool_calls:
-            tool_calls_count += 1
             call_id = str(call.get("id", ""))
             call_name = str(call.get("name", ""))
             call_args = call.get("args", {})
@@ -277,47 +304,19 @@ async def _run_agent_with_tools(
             logger.info(
                 "agent_runner: tool_result name=%s output=%s",
                 call_name,
-                str(tool_output)[:1200],
+                str(tool_output)[:200],
             )
             messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
-    # Fallback: exceeded max steps, get final response without tools.
     final = await llm.ainvoke(messages)
     return _as_text(final.content)
-# ── Triage map parser ─────────────────────────────────────────────────────
-def _parse_triage_map(raw: str) -> dict[str, list[str]] | None:
-    """Extract the JSON triage map from the LLM's final response."""
-    text = raw.strip()
-    # Try direct parse first.
-    try:
-        parsed = json.loads(text)
-        if isinstance(parsed, dict):
-            return {k: v for k, v in parsed.items() if isinstance(v, list)}
-    except json.JSONDecodeError:
-        pass
-    # Try extracting JSON from markdown fences or surrounding text.
-    import re
-    match = re.search(r"\{[\s\S]*\}", text)
-    if match:
-        try:
-            parsed = json.loads(match.group(0))
-            if isinstance(parsed, dict):
-                return {k: v for k, v in parsed.items() if isinstance(v, list)}
-        except json.JSONDecodeError:
-            pass
-    return None
 # ── Tool list builder ─────────────────────────────────────────────────────
 def _build_processing_tools(data_types: list[str]) -> list[Any]:
-    """Build the tool list for Phase 2 based on user's data_types selection."""
+    """Build the tool list for processing based on user's data_types selection."""
     tools: list[Any] = list(FILESYSTEM_TOOLS)
     for dt in data_types:
         dt_tools = _DATA_TYPE_TOOLS.get(dt)
@@ -326,7 +325,236 @@ def _build_processing_tools(data_types: list[str]) -> list[Any]:
     return tools
-# ── Local agent runner (two-phase) ─────────────────────────────────────────
+# ── Code-based directory scanner ─────────────────────────────────────────
+async def _scan_directories(
+    paths: list[str],
+    extensions: list[str],
+    last_run_at: datetime | None,
+) -> list[str]:
+    """Walk directories via WS tool calls and return filtered file paths.
+    Recursion is capped at ``_MAX_SCAN_DEPTH``. Files are filtered by
+    extension (if configured) and by modification date (if ``last_run_at``
+    is set). Fails open: if metadata cannot be read, the file is included.
+    """
+    all_files: list[str] = []
+    ext_set = {e.lstrip(".").lower() for e in extensions} if extensions else set()
+    async def _walk(path: str, depth: int) -> None:
+        if depth > _MAX_SCAN_DEPTH:
+            return
+        try:
+            result = await execute_on_client(action="list_directory", data={"path": path})
+        except Exception as exc:
+            logger.warning("agent_runner: list_directory failed %r: %s", path, exc)
+            return
+        for entry in result.get("entries", []):
+            entry_path = entry.get("path", "")
+            if not entry_path:
+                continue
+            if entry.get("type") == "directory":
+                await _walk(entry_path, depth + 1)
+            elif entry.get("type") == "file":
+                if ext_set:
+                    dot_pos = entry_path.rfind(".")
+                    file_ext = entry_path[dot_pos + 1:].lower() if dot_pos != -1 else ""
+                    if file_ext not in ext_set:
+                        continue
+                all_files.append(entry_path)
+    for root in paths:
+        await _walk(root, depth=0)
+    if last_run_at is None:
+        return all_files
+    # Filter by modification date.
+    last_run_ms = int(last_run_at.timestamp() * 1000)
+    filtered: list[str] = []
+    for file_path in all_files:
+        try:
+            meta = await execute_on_client(action="get_file_metadata", data={"path": file_path})
+            modified_at = meta.get("modifiedAt")
+            if modified_at is None:
+                filtered.append(file_path)
+                continue
+            if isinstance(modified_at, (int, float)):
+                mod_ms = int(modified_at)
+            else:
+                mod_ms = int(datetime.fromisoformat(str(modified_at)).timestamp() * 1000)
+            if mod_ms > last_run_ms:
+                filtered.append(file_path)
+        except Exception:
+            filtered.append(file_path)  # fail-open
+    return filtered
+# ── Code-based entity fetchers ────────────────────────────────────────────
+async def _fetch_projects() -> list[dict]:
+    """Fetch all projects from the Electron client via WS."""
+    try:
+        result = await execute_on_client(action="select", table="projects")
+        return result.get("rows", [])
+    except Exception as exc:
+        logger.warning("agent_runner: failed to fetch projects: %s", exc)
+        return []
+_DOMAIN_TABLE: dict[str, str] = {
+    "tasks": "tasks",
+    "notes": "notes",
+    "timelines": "timelines",
+    "projects": "projects",
+}
+async def _fetch_domain_entities(domain: str, project_id: str) -> list[dict]:
+    """Fetch existing rows for a domain, scoped to a project where applicable."""
+    table = _DOMAIN_TABLE.get(domain)
+    if not table:
+        return []
+    filters: dict[str, Any] = {}
+    if project_id != "standalone" and domain != "projects":
+        filters["projectId"] = project_id
+    try:
+        result = await execute_on_client(
+            action="select",
+            table=table,
+            filters=filters if filters else None,
+        )
+        return result.get("rows", [])
+    except Exception as exc:
+        logger.warning("agent_runner: failed to fetch %s: %s", domain, exc)
+        return []
+def _format_entities_for_context(domain: str, rows: list[dict]) -> str:
+    """Format existing entity rows as a readable context block for the LLM.
+    Includes enough detail per record for the LLM to make a confident
+    update-vs-create decision without overwhelming the context.
+    Note content is truncated to 200 chars to stay within token budget.
+    """
+    if not rows:
+        return f"No existing {domain}."
+    lines: list[str] = []
+    for r in rows:
+        if domain == "tasks":
+            desc = r.get("description") or ""
+            desc_part = f"{desc[:120]}" if desc else ""
+            assignee = r.get("assignee") or r.get("assignees") or ""
+            due = r.get("dueDate") or r.get("due_date") or ""
+            meta = ", ".join(filter(None, [
+                f"priority: {r.get('priority', '')}" if r.get("priority") else "",
+                f"assignee: {assignee}" if assignee else "",
+                f"due: {due}" if due else "",
+            ]))
+            lines.append(
+                f" - [{r.get('status', '?')}] {r.get('title', '')}{desc_part}"
+                f" ({meta}, id: {r['id']})"
+            )
+        elif domain == "notes":
+            snippet = (r.get("content") or "")[:200].replace("\n", " ")
+            snippet_part = f"\n Preview: {snippet}" if snippet else ""
+            lines.append(
+                f" - {r.get('title', '')} (id: {r['id']}){snippet_part}"
+            )
+        elif domain == "timelines":
+            lines.append(
+                f" - {r.get('title', '')} date={r.get('date', '')} (id: {r['id']})"
+            )
+        elif domain == "projects":
+            summary = (r.get("aiSummary") or r.get("ai_summary") or "")[:120]
+            summary_part = f"{summary}" if summary else ""
+            lines.append(
+                f" - {r.get('name', '')} [{r.get('status', '')}]{summary_part}"
+                f" (id: {r['id']})"
+            )
+    return f"Existing {domain}:\n" + "\n".join(lines)
+# ── Step 1: LLM file classifier ───────────────────────────────────────────
+async def _classify_file(
+    file_path: str,
+    file_content: str,
+    projects: list[dict],
+    config_data_types: list[str],
+) -> tuple[str, list[str], str | None]:
+    """Call the LLM to classify a file by project and relevant domains.
+    Returns ``(project_id_or_"new", domains, new_project_name_or_None)``.
+    - ``project_id`` is an existing project UUID, or ``"new"`` when no match found.
+    - ``new_project_name`` is only set when ``project_id == "new"``.
+    Falls back to ``("new", config_data_types, None)`` on any error.
+    """
+    fallback: tuple[str, list[str], str | None] = ("new", list(config_data_types), None)
+    if not file_content.strip():
+        return fallback
+    valid_project_ids = {p["id"] for p in projects}
+    def _fmt_project(p: dict) -> str:
+        summary = (p.get("aiSummary") or p.get("ai_summary") or "").strip()
+        summary_part = f"{summary[:100]}" if summary else ""
+        return f" - id={p['id']} | name={p.get('name', '')} | status={p.get('status', '')}{summary_part}"
+    projects_list = "\n".join(_fmt_project(p) for p in projects) or " (none yet)"
+    domain_definitions = "\n".join(
+        f" - {d}: {_DOMAIN_DESCRIPTIONS[d]}"
+        for d in config_data_types
+        if d in _DOMAIN_DESCRIPTIONS
+    )
+    system = _STEP1_SYSTEM_PROMPT.format(
+        domain_definitions=domain_definitions,
+        projects_list=projects_list,
+    )
+    llm = get_llm()
+    try:
+        response = await llm.ainvoke([
+            SystemMessage(content=system),
+            HumanMessage(content=f"File: {file_path}\n\nContent:\n{file_content[:4000]}"),
+        ])
+        raw = _as_text(response.content).strip()
+        # Strip markdown fences if the model wraps the JSON.
+        if raw.startswith("```"):
+            raw = raw.split("```")[1]
+            if raw.startswith("json"):
+                raw = raw[4:]
+        parsed = json.loads(raw.strip())
+        raw_project_id: str = str(parsed.get("project_id") or "new")
+        # Reject hallucinated UUIDs — only accept ids that exist in the fetched list.
+        project_id = raw_project_id if raw_project_id in valid_project_ids else "new"
+        new_project_name: str | None = (
+            str(parsed["new_project_name"]).strip() or None
+            if project_id == "new" and parsed.get("new_project_name")
+            else None
+        )
+        domains: list[str] = [
+            d for d in parsed.get("domains", [])
+            if d in config_data_types
+        ]
+        if not domains:
+            domains = list(config_data_types)
+        return project_id, domains, new_project_name
+    except Exception as exc:
+        logger.warning(
+            "agent_runner: step1 classification failed for %r: %s", file_path, exc
+        )
+        return fallback
+# ── Local agent runner (two-step per file) ────────────────────────────────
 async def run_local_agent(
@@ -336,24 +564,28 @@ async def run_local_agent(
     device_mgr: DeviceConnectionManager,
     run_context: dict | None = None,
 ) -> None:
-    """Execute a local directory agent run using two-phase LLM-with-tools.
+    """Execute a local directory agent run using a two-step approach per file.
-    Phase 1 — Triage:
-        Explore the directory structure, check metadata, match files to
-        existing projects. Output: a JSON map of project → file paths.
+    Step 1 — Classification (code + 1 LLM call per file, no tools):
+        Code scans directories and fetches all projects via WS.
+        For each file, LLM identifies the project and relevant domains.
-    Phase 2 — Processing:
-        For each project group, read full file contents and perform CRUD
-        operations using the standard entity tools.
+    Step 2 — Processing (code + 1 LLM call per file, with tools):
+        Code fetches existing entities for the identified project/domains.
+        LLM receives file content + existing entities in context and uses
+        tools to update existing records or create new ones.
     """
     run_id = run_log.id
+    agent_id = (run_context or {}).get("agent_id") or config.id
+    _running_agents.add(agent_id)
     # ── Device online check ─────────────────────────────────────────
     target_device_id = config.device_id.strip() if isinstance(config.device_id, str) else ""
-    if target_device_id:
-        is_online = device_mgr.is_online(user_id, target_device_id)
-    else:
-        is_online = device_mgr.is_online(user_id)
+    is_online = (
+        device_mgr.is_online(user_id, target_device_id)
+        if target_device_id
+        else device_mgr.is_online(user_id)
+    )
     if not is_online:
         logger.info(
@@ -377,116 +609,124 @@ async def run_local_agent(
items_processed = 0 items_processed = 0
items_created = 0 items_created = 0
custom_section = (
f"User instructions:\n{config.prompt_template}"
if config.prompt_template
else ""
)
try: try:
# ── Phase 1: Triage ───────────────────────────────────────── # ── Code: scan directories ───────────────────────────────────
logger.info("agent_runner: run=%s phase=triage start user=%s", run_id, user_id) logger.info("agent_runner: run=%s scanning directories user=%s", run_id, user_id)
file_paths = await _scan_directories(
last_run_str = "never (process all files)" paths=config.directory_paths,
if config.last_run_at: extensions=config.file_extensions or [],
last_run_str = config.last_run_at.isoformat() last_run_at=config.last_run_at,
)
custom_section = "" logger.info(
if config.prompt_template: "agent_runner: run=%s found %d file(s) after filtering", run_id, len(file_paths)
custom_section = f"User instructions:\n{config.prompt_template}"
file_ext_str = ", ".join(config.file_extensions) if config.file_extensions else "all"
triage_prompt = _TRIAGE_SYSTEM_PROMPT.format(
last_run_at=last_run_str,
custom_prompt_section=custom_section,
data_types=", ".join(config.data_types),
file_extensions=file_ext_str,
) )
directory_paths = config.directory_paths if not file_paths:
triage_user_msg = ( await _finalize_run(run_log, status="success", items_processed=0, items_created=0)
f"Explore these directories and produce the triage map:\n"
f"{json.dumps(directory_paths, ensure_ascii=False)}"
)
triage_tools: list[Any] = list(FILESYSTEM_TOOLS) + list(PROJECT_TOOLS)
triage_response = await _run_agent_with_tools(
system_prompt=triage_prompt,
user_message=triage_user_msg,
tools=triage_tools,
max_steps=_MAX_TRIAGE_STEPS,
)
triage_map = _parse_triage_map(triage_response)
if not triage_map:
errors.append(f"Triage phase failed to produce a valid file map: {triage_response[:500]}")
await _finalize_run(run_log, status="error", errors=errors)
return return
logger.info( # ── Code: fetch all projects once ────────────────────────────
"agent_runner: run=%s triage complete groups=%d total_files=%d", projects = await _fetch_projects()
run_id,
len(triage_map), for file_path in file_paths:
sum(len(files) for files in triage_map.values()), try:
# Read file content via code.
file_result = await execute_on_client(
action="read_file_content", data={"path": file_path}
) )
file_content: str = file_result.get("content", "")
# ── Phase 2: Processing (per group) ───────────────────────── if not file_content:
processing_tools = _build_processing_tools(config.data_types) logger.debug("agent_runner: run=%s skipping empty file %r", run_id, file_path)
for group_key, file_paths in triage_map.items():
if not file_paths:
continue continue
items_processed += 1
# Step 1 — classify file.
project_id, domains, new_project_name = await _classify_file(
file_path=file_path,
file_content=file_content,
projects=projects,
config_data_types=config.data_types,
)
logger.info( logger.info(
"agent_runner: run=%s phase=processing group=%s files=%d", "agent_runner: run=%s file=%r → project=%s new_name=%r domains=%s",
run_id, run_id,
group_key, file_path,
len(file_paths), project_id,
new_project_name,
domains,
) )
# Build project context for the LLM. # Step 2 — fetch existing entities for this project + domains.
if group_key == "standalone": # When project_id is "new", entities are fetched without a project
project_context = "These files are not associated with any existing project." # filter; the LLM will create the project and link records to it.
effective_project_id = project_id if project_id != "new" else "standalone"
existing_blocks: list[str] = []
for domain in domains:
rows = await _fetch_domain_entities(domain, effective_project_id)
existing_blocks.append(_format_entities_for_context(domain, rows))
existing_context = "\n\n".join(existing_blocks)
if project_id == "new":
name_hint = f' Use "{new_project_name}" as the project name.' if new_project_name else ""
project_context = (
f"No existing project matches this file. "
f"Create a new project first using the create_project tool, "
f"then link all extracted records to its id.{name_hint}"
)
# Ensure the LLM has the project tools available.
if "projects" not in domains:
domains = ["projects"] + domains
else: else:
project_context = f"These files belong to project ID: {group_key}. Use this project_id when creating records." project_context = (
f"This file belongs to project ID: {project_id}. "
"Use this project_id when creating records."
)
file_list_str = "\n".join(f"- {fp}" for fp in file_paths) system_prompt = _PROCESSING_SYSTEM_PROMPT.format(
existing_context=existing_context,
processing_prompt = _PROCESSING_BASE_PROMPT.format(
data_types=", ".join(config.data_types),
project_context=project_context, project_context=project_context,
file_list=file_list_str, data_types=", ".join(domains),
custom_prompt_section=custom_section, custom_prompt_section=custom_section,
) )
items_processed += len(file_paths) processing_tools = _build_processing_tools(domains)
try:
result_text = await _run_agent_with_tools( result_text = await _run_agent_with_tools(
system_prompt=processing_prompt, system_prompt=system_prompt,
user_message="Process the listed files now.", user_message=(
f"Process this file and extract relevant information.\n\n"
f"File: {file_path}\n\nContent:\n{file_content}"
),
tools=processing_tools, tools=processing_tools,
max_steps=_MAX_PROCESSING_STEPS, max_steps=_MAX_PROCESSING_STEPS,
) )
logger.info( logger.info(
"agent_runner: run=%s group=%s processing_result=%s", "agent_runner: run=%s file=%r result=%s",
run_id, run_id,
group_key, file_path,
result_text[:500], result_text[:200],
) )
# Count created items by scanning tool call results.
# The tools themselves handle creation; we estimate from the
# summary. A more precise count would require intercepting
# tool results, but the summary is sufficient for the run log.
except Exception as exc: except Exception as exc:
errors.append(f"Processing error for group '{group_key}': {exc}") errors.append(f"Error processing '{file_path}': {exc}")
logger.error( logger.error(
"agent_runner: run=%s group=%s processing failed: %s", "agent_runner: run=%s file=%r failed: %s", run_id, file_path, exc
run_id,
group_key,
exc,
) )
except Exception as exc: except Exception as exc:
errors.append(f"Agent run failed: {exc}") errors.append(f"Agent run failed: {exc}")
logger.error("agent_runner: run=%s failed: %s", run_id, exc) logger.error("agent_runner: run=%s failed: %s", run_id, exc)
finally: finally:
_running_agents.discard(agent_id)
clear_client_executor() clear_client_executor()
# ── Finalise ──────────────────────────────────────────────────── # ── Finalise ────────────────────────────────────────────────────
@@ -503,9 +743,6 @@ async def run_local_agent(
items_processed=items_processed, items_processed=items_processed,
items_created=items_created, items_created=items_created,
errors=errors, errors=errors,
update_config_last_run=False,
config_id=config.id,
config_type="local",
) )
logger.info( logger.info(
"agent_runner: run=%s done status=%s processed=%d errors=%d", "agent_runner: run=%s done status=%s processed=%d errors=%d",
@@ -515,8 +752,7 @@ async def run_local_agent(
len(errors), len(errors),
) )
# Notify the Electron client that the run is complete so it can close # Notify Electron that the run is complete.
# the run record in its local SQLite.
if run_context and device_mgr.is_online(user_id): if run_context and device_mgr.is_online(user_id):
try: try:
await device_mgr.send_frame(user_id, { await device_mgr.send_frame(user_id, {
@@ -525,12 +761,13 @@ async def run_local_agent(
"status": final_status, "status": final_status,
}) })
except Exception as exc: except Exception as exc:
logger.warning("agent_runner: run=%s failed to send run_complete: %s", run_id, exc) logger.warning(
"agent_runner: run=%s failed to send run_complete: %s", run_id, exc
)
# ── Cloud agent runner ───────────────────────────────────────────────────── # ── Cloud agent runner ─────────────────────────────────────────────────────
# Default lookback window when an agent has never run before.
_CLOUD_DEFAULT_LOOKBACK_DAYS: int = 7 _CLOUD_DEFAULT_LOOKBACK_DAYS: int = 7
@@ -544,8 +781,7 @@ async def run_cloud_agent(
Steps: Steps:
1. Verify the user's device is online — results are pushed to Electron 1. Verify the user's device is online.
via WS tool-call frames. If no device is connected, abort.
2. Decrypt the stored OAuth token from ``config.oauth_token_encrypted``. 2. Decrypt the stored OAuth token from ``config.oauth_token_encrypted``.
3. Instantiate the provider client (Gmail or MS Graph). 3. Instantiate the provider client (Gmail or MS Graph).
4. Fetch messages/emails since ``config.last_run_at`` (or 7 days ago for 4. Fetch messages/emails since ``config.last_run_at`` (or 7 days ago for
@@ -598,11 +834,7 @@ async def run_cloud_agent(
try: try:
provider = get_provider(config.provider, credentials_info) provider = get_provider(config.provider, credentials_info)
except ValueError as exc: except ValueError as exc:
await _finalize_run( await _finalize_run(run_log, status="error", errors=[str(exc)])
run_log,
status="error",
errors=[str(exc)],
)
return return
# ── 4. Fetch messages ───────────────────────────────────────────── # ── 4. Fetch messages ─────────────────────────────────────────────
@@ -636,9 +868,7 @@ async def run_cloud_agent(
raw_messages = [] raw_messages = []
except RuntimeError as exc: except RuntimeError as exc:
logger.error( logger.error(
"agent_runner: provider fetch failed for cloud agent %s: %s", "agent_runner: provider fetch failed for cloud agent %s: %s", config.id, exc
config.id,
exc,
) )
await _finalize_run( await _finalize_run(
run_log, run_log,
@@ -664,9 +894,11 @@ async def run_cloud_agent(
try: try:
processing_tools = _build_processing_tools(config.data_types) processing_tools = _build_processing_tools(config.data_types)
custom_section = "" custom_section = (
if config.prompt_template: f"User instructions:\n{config.prompt_template}"
custom_section = f"User instructions:\n{config.prompt_template}" if config.prompt_template
else ""
)
for msg in raw_messages: for msg in raw_messages:
content_text = msg.as_text content_text = msg.as_text
@@ -674,7 +906,7 @@ async def run_cloud_agent(
continue continue
items_processed += 1 items_processed += 1
processing_prompt = _PROCESSING_BASE_PROMPT.format( processing_prompt = _CLOUD_PROCESSING_PROMPT.format(
data_types=", ".join(config.data_types), data_types=", ".join(config.data_types),
project_context="Determine the appropriate project from the message context.", project_context="Determine the appropriate project from the message context.",
file_list=f"Message from {config.provider} (id: {msg.id})", file_list=f"Message from {config.provider} (id: {msg.id})",
@@ -708,7 +940,11 @@ async def run_cloud_agent(
await db.commit() await db.commit()
logger.debug("agent_runner: refreshed OAuth token persisted for agent %s", config.id) logger.debug("agent_runner: refreshed OAuth token persisted for agent %s", config.id)
except Exception as exc: except Exception as exc:
logger.warning("agent_runner: failed to persist refreshed token for agent %s: %s", config.id, exc) logger.warning(
"agent_runner: failed to persist refreshed token for agent %s: %s",
config.id,
exc,
)
# ── 8. Finalise ──────────────────────────────────────────────────── # ── 8. Finalise ────────────────────────────────────────────────────
if errors and items_created == 0: if errors and items_created == 0:
@@ -749,12 +985,6 @@ async def trigger_pending_runs(
"""Dispatch any overdue agent runs after an Electron device connects. """Dispatch any overdue agent runs after an Electron device connects.
Called as a background task from the device WS endpoint on ``device_hello``. Called as a background task from the device WS endpoint on ``device_hello``.
Scheduling rules:
* **Local agents**: only triggered when ``config.device_id == device_id``.
* **Cloud agents**: triggered on any connected device (no device binding).
* Runs execute **sequentially** to avoid flooding the WS connection.
""" """
logger.info( logger.info(
"agent_runner: pending-run scan skipped for user=%s device=%s (client-owned agent config)", "agent_runner: pending-run scan skipped for user=%s device=%s (client-owned agent config)",
@@ -778,11 +1008,7 @@ async def _finalize_run(
config_id: str | None = None, config_id: str | None = None,
config_type: str | None = None, config_type: str | None = None,
) -> None: ) -> None:
"""Persist the run outcome and optionally update ``LocalAgentConfig.last_run_at``. """Persist the run outcome and optionally update ``last_run_at`` on the config."""
Uses a fresh DB session so this is safe to call from background tasks
after the original request session has closed.
"""
now = datetime.now(timezone.utc) now = datetime.now(timezone.utc)
try: try:
async with async_session() as db: async with async_session() as db:


@@ -1,8 +1,10 @@
# Adiuva: Microservices Architecture # Adiuva: Microservices Architecture (MVP)
## Overview ## Overview
The current monolith is split into **5 services** + an **API Gateway**, orchestrated with Docker Compose and reachable through a domain on Cloudflare. The monolith is split into **4 MVP services** + an **API Gateway (Traefik)**, orchestrated with Docker Compose on a single VPS reachable via Cloudflare.
> **Out of MVP scope**: Storage Service (S3/backup CRUD) and Plugin Service (marketplace). They will be added as independent services in a later phase.
``` ```
┌──────────────┐ ┌──────────────┐
@@ -14,20 +16,21 @@ Il monolite attuale viene suddiviso in **5 servizi** + un **API Gateway**, orche
│ Traefik │ │ Traefik │
│ API Gateway │ │ API Gateway │
│ (routing, │ │ (routing, │
│ TLS term.) │ TLS, rate
│ limiting) │
└──────┬───────┘ └──────┬───────┘
┌──────────┬───────────┼───────────┬────────── ┌──────────┬───────────┼───────────┐
│ │ │ │ │ │ │ │
┌─────▼────┐ ┌───▼───┐ ┌────▼────┐ ┌────▼───┐ ┌───▼─────┐ ┌─────▼────┐ ┌───▼───┐ ┌────▼────┐ ┌────▼───┐
│ Auth │ │ Chat │ │ Storage │ │Billing │ │ Plugins │ │ Auth │ │ Chat │ │ Agent │ │Billing │
│ Service │ │Service│ │ Service │ │Service │ │ Service │ │ Service │ │Service│ │ Service │ │Service │
└─────┬────┘ └───┬───┘ └────┬────┘ └────┬───┘ └───┬─────┘ └─────┬────┘ └───┬───┘ └────┬────┘ └────┬───┘
│ │ │ │ │ │ │ │
┌─────▼──────────▼──────────▼───────────▼──────────▼───── ┌─────▼──────────▼──────────▼───────────▼────┐
│ Infrastruttura │ │ Infrastruttura │
│ PostgreSQL Redis │ MinIO (S3) │ Qdrant │ (Pinecone) │ PostgreSQL Redis │ Qdrant
└──────────────────────────────────────────────────────── └─────────────────────────────────────────────┘
``` ```
--- ---
@@ -83,46 +86,68 @@ def verify_token(token: str) -> dict:
--- ---
### 1.2 Chat Service (`chat-service`) ⭐ Core ### 1.2 Chat Service (`chat-service`) ⭐ Real-time
**Responsibilities**: WebSocket device, home chat, floating chat, agent runner, memory middleware, agent setup journeys. **Responsibilities**: WebSocket device connection, home chat, floating chat, memory middleware, streaming LLM responses to the client.
| Original endpoint | Type | This service handles the **persistent connection** with the Electron app and the user's **real-time** interactions (home chat, floating chat). It owns the WebSocket.
| Endpoint | Type |
|---|---| |---|---|
| `/api/v1/ws/device` | WebSocket | | `/api/v1/ws/device` | WebSocket (persistent connection) |
| `/api/v1/chat` | POST (REST fallback) | | `/api/v1/chat` | POST (REST fallback) |
| `/api/v1/agents/catalog` | GET |
| `/api/v1/agents/can-create` | POST |
| `/api/v1/agents/trigger` | POST |
**Included modules**: `deep_agent`, `agent_runner`, `agent_registry`, `memory_middleware`, `ws_context`, `device_manager`, all agent tools (`task_agent`, `project_agent`, `note_agent`, `timeline_agent`, `filesystem_agent`). **Included modules**: `deep_agent`, `memory_middleware`, `ws_context`, `device_manager` (Redis-backed), `output_formatter`, `llm`, all agent tools (`task_agent`, `project_agent`, `note_agent`, `timeline_agent`).
**This is the beast that has to scale horizontally**: it is the most CPU/memory-intensive service (LLM calls, tool loops, persistent WebSockets). **Why it is separate from the Agent Service**: The Chat Service keeps the WebSocket open and responds in real time (streaming). Scaling by adding replicas is straightforward with sticky sessions + Redis pub/sub for cross-instance routing of tool_call frames.
**Scaling**: 2 to N replicas. Sticky cookies for the WS + Redis for cross-instance routing.
--- ---
### 1.3 Storage Service (`storage-service`) ### 1.3 Agent Service (`agent-service`) ⭐ Batch
**Responsibilities**: CRUD of encrypted records on S3, vector operations, backup. **Responsibilities**: Batch agent processing (directory scanning, file classification, entity extraction), agent setup journeys, agent configuration CRUD.
| Original endpoint | Method | This service handles **long-running**, **CPU-intensive** processes: filesystem scanning, LLM-based file classification, batch entity extraction. It does not own the WebSocket; it communicates with the user's device through **Redis pub/sub**, going through the Chat Service.
| Endpoint | Type |
|---|---| |---|---|
| `/api/v1/storage/records` | POST / GET | | `/api/v1/agents/catalog` | GET |
| `/api/v1/storage/records/{id}` | GET / PUT / DELETE | | `/api/v1/agents/can-create` | POST |
| `/api/v1/vectors/upsert` | POST | | `/api/v1/agents/trigger` | POST |
| `/api/v1/vectors/search` | POST | | `/api/v1/agents/journey/start` | POST (or WS relay) |
| `/api/v1/vectors/embed` | POST | | `/api/v1/agents/journey/message` | POST (or WS relay) |
| `/api/v1/vectors` | DELETE |
| `/api/v1/backup` | PUT / GET / DELETE |
| `/api/v1/backup/history` | GET |
**Scaling**: 2 to 3 replicas. I/O bound (S3, Qdrant). Stateless. **Included modules**: `agent_runner`, `agent_registry`, `filesystem_agent`, `llm`.
**Cross-service tool-call flow** (the Agent Service has no WS):
```
┌──────────────┐ ┌──────────────┐ ┌──────────┐
│ Agent Service│ │ Redis │ │ Chat │
│ (batch run) │ │ │ │ Service │
│ │ │ │ │ (ha WS) │
│ 1. Needs to │ PUBLISH │ │ SUBSCRIBE │ │
│ read file ├───────────►│tool_call:u123├───────────►│ 2. Invia │
│ from │ │ │ │ al │
│ device │ │ │ │ device│
│ │ │ │ │ via WS│
│ │ SUBSCRIBE │ │ PUBLISH │ │
│ 4. Riceve ◄────────────┤tool_result:id│◄───────────┤ 3. Device│
│ risultato │ │ │ │ reply │
└──────────────┘ └──────────────┘ └──────────┘
```
**Scaling**: 1 to N replicas. Fully stateless; it scales independently of the chat. Each replica processes different batch jobs. It can be scaled to 0 when no agents are active (saving resources).
**Benefit of the split**: If 50 users trigger batch agents at the same time, the Chat Service is unaffected and real-time responses stay fast.
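The request/response flow in the diagram above can be sketched as follows. This is a minimal illustration, not the project's implementation: `InMemoryBroker` is a hypothetical stand-in for Redis pub/sub so the example runs without a server, `fake_device` stands in for the Electron client, and the message shape (`call_id`, `action`, `data`) is an assumption. Only the channel names `tool_call:{user_id}` and `tool_result:{id}` come from the diagram.

```python
import asyncio
import uuid
from collections import defaultdict
from typing import Awaitable, Callable


class InMemoryBroker:
    """Stand-in for Redis pub/sub: fan each message out to channel subscribers."""

    def __init__(self) -> None:
        self._subs: dict[str, list[asyncio.Queue]] = defaultdict(list)

    def subscribe(self, channel: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._subs[channel].append(queue)
        return queue

    async def publish(self, channel: str, message: dict) -> None:
        for queue in self._subs[channel]:
            await queue.put(message)


async def execute_on_client(
    broker: InMemoryBroker, user_id: str, action: str, data: dict, timeout: float = 5.0
) -> dict:
    """Agent Service side: publish a tool call and wait for the matching result."""
    call_id = uuid.uuid4().hex
    # Subscribe before publishing so a fast reply cannot be missed.
    results = broker.subscribe(f"tool_result:{call_id}")
    await broker.publish(
        f"tool_call:{user_id}",
        {"call_id": call_id, "action": action, "data": data},
    )
    return await asyncio.wait_for(results.get(), timeout)


async def chat_service_relay(
    broker: InMemoryBroker,
    user_id: str,
    device: Callable[[str, dict], Awaitable[dict]],
) -> None:
    """Chat Service side: forward tool calls to the device and publish replies."""
    calls = broker.subscribe(f"tool_call:{user_id}")
    while True:
        call = await calls.get()
        reply = await device(call["action"], call["data"])  # a WS round trip in practice
        await broker.publish(f"tool_result:{call['call_id']}", reply)


async def fake_device(action: str, data: dict) -> dict:
    """Hypothetical device that answers every request with dummy file content."""
    return {"content": f"contents of {data['path']}"}


async def demo() -> dict:
    broker = InMemoryBroker()
    relay = asyncio.create_task(chat_service_relay(broker, "u123", fake_device))
    await asyncio.sleep(0)  # let the relay subscribe before the first publish
    try:
        return await execute_on_client(
            broker, "u123", "read_file_content", {"path": "/notes/a.md"}
        )
    finally:
        relay.cancel()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

One design point worth keeping from the sketch: Redis pub/sub does not store messages, so a call published while no Chat Service replica is subscribed is simply lost. The timeout on the waiting side is what turns that case into an error instead of a hang.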
--- ---
### 1.4 Billing Service (`billing-service`) ### 1.4 Billing Service (`billing-service`)
**Responsibilities**: Stripe checkout, webhook, subscription management, tier enforcement. **Responsibilities**: Stripe checkout, webhook, subscription management.
| Original endpoint | Method | | Original endpoint | Method |
|---|---| |---|---|
@@ -132,31 +157,125 @@ def verify_token(token: str) -> dict:
**Database**: `subscriptions` tables (`billing` schema). **Database**: `subscriptions` tables (`billing` schema).
**Inter-service communication**: When Stripe sends a webhook and the tier changes, the Billing Service publishes an event on the **Redis pub/sub** channel `tier_changed:{user_id}`. The Auth Service updates the `tier` field in the users table (or the services read the tier directly from the JWT, updated at the next refresh). **Inter-service communication**: When Stripe sends a webhook and the tier changes, the Billing Service publishes an event on the **Redis pub/sub** channel `tier_changed:{user_id}`. The Auth Service updates the `tier` field in the users table. At the next token refresh the JWT will carry the updated tier.
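A possible shape for the Auth Service side of this event is sketched below. The channel format `tier_changed:{user_id}` is taken from the text; the JSON payload `{"tier": "..."}`, the function names, and the dict standing in for the users table are assumptions made for illustration.

```python
import json

VALID_TIERS = {"free", "pro", "power", "team"}
CHANNEL_PREFIX = "tier_changed:"


def parse_tier_event(channel: str, payload: str) -> tuple[str, str]:
    """Extract (user_id, new_tier) from a tier_changed message."""
    if not channel.startswith(CHANNEL_PREFIX):
        raise ValueError(f"unexpected channel: {channel!r}")
    user_id = channel[len(CHANNEL_PREFIX):]
    tier = json.loads(payload)["tier"]  # payload shape is an assumption
    if tier not in VALID_TIERS:
        raise ValueError(f"unknown tier: {tier!r}")
    return user_id, tier


def apply_tier_event(users: dict[str, dict], channel: str, payload: str) -> None:
    """Update the stored tier; `users` stands in for the users table."""
    user_id, tier = parse_tier_event(channel, payload)
    users[user_id]["tier"] = tier


users = {"user_123": {"tier": "pro"}}
apply_tier_event(users, "tier_changed:user_123", json.dumps({"tier": "power"}))
```

Because Redis pub/sub delivers only to subscribers that are connected at publish time, an event sent while the Auth Service is down would be missed. A periodic reconciliation against the `subscriptions` table is one way to cover that gap.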
**Scaling**: 1 replica is enough. Low traffic. **Scaling**: 1 replica is enough. Low traffic.
--- ---
### 1.5 Plugin Service (`plugin-service`) ### 1.5 Services excluded from the MVP
**Responsibilities**: Marketplace, plugin installation, revenue split. The following services will be added post-MVP as independent services:
| Original endpoint | Method | | Service | Responsibilities | Notes |
|---|---| |---|---|---|
| `/api/v1/plugins` | GET | | **Storage Service** | S3 blobs CRUD, vector ops, backup | Vector/embed features can stay in the Chat Service for the MVP |
| `/api/v1/plugins/{id}` | GET | | **Plugin Service** | Marketplace, install, revenue split | Not a launch-critical feature |
| `/api/v1/plugins/{id}/install` | POST / DELETE |
**Database**: Tables `plugins`, `plugin_installations`, `revenue_events`.
**Scaling**: 1 replica. Low traffic.
--- ---
## 2. WebSocket with Horizontal Scaling: The Key Problem ## 2. Tier Check: Where and How
### The current problem The user's tier (free/pro/power/team) determines rate limiting, quotas, and feature access. With microservices, **each service checks the tier on its own** without calling the Auth Service.
### Strategy: Tier in the JWT
The Auth Service includes `tier` as a claim in the JWT at login/refresh time:
```json
{
"sub": "user_123",
"tier": "pro",
"exp": 1742515200,
"iat": 1742511600
}
```
Each service:
1. Decodes the JWT with the public key (it already does this for auth)
2. Reads `payload["tier"]`: **zero extra calls**
3. Applies its own enforcement rules locally
```python
# shared/auth.py: shared FastAPI dependency
from fastapi import Depends, HTTPException, Request
from jose import JWTError, jwt

PUBLIC_KEY = ...


class CurrentUser:
    def __init__(self, user_id: str, tier: str):
        self.user_id = user_id
        self.tier = tier


async def get_current_user(request: Request) -> CurrentUser:
    token = request.headers.get("Authorization", "").removeprefix("Bearer ")
    try:
        payload = jwt.decode(token, PUBLIC_KEY, algorithms=["RS256"])
    except JWTError as exc:
        # Missing, malformed, or expired token: answer 401 instead of a 500.
        raise HTTPException(401, "Invalid or expired token") from exc
    return CurrentUser(user_id=payload["sub"], tier=payload["tier"])


def require_tier(*allowed_tiers: str):
    """Dependency that rejects the request if the tier is not among the allowed ones."""

    async def check(user: CurrentUser = Depends(get_current_user)):
        if user.tier not in allowed_tiers:
            raise HTTPException(403, "Tier insufficient")
        return user

    return check
```
### What happens when the tier changes (upgrade/downgrade)?
```
┌──────────┐ Stripe webhook ┌──────────┐ tier_changed ┌──────────┐
│ Stripe │ ─────────────────►│ Billing │ ───────────────►│ Auth │
│ │ │ Service │ (Redis pub/sub) │ Service │
└──────────┘ └──────────┘ └────┬─────┘
UPDATE users
SET tier = 'power'
At the next /refresh
the JWT will carry tier='power'
```
**Change latency**: The tier propagates at the next token refresh (typically 15 to 30 min, or the client can force an immediate refresh after checkout). For the billing webhook, a downgrade can be enforced by invalidating the refresh token on Redis, which forces the client to re-authenticate.
### Where it applies in each service
| Service | Enforcement |
|---|---|
| **Auth Service** | None (it is the one that writes the tier) |
| **Chat Service** | Per-tier rate limit (req/min), message quota |
| **Agent Service** | Max agent configs, max runs/day, max concurrent batches |
| **Billing Service** | None (it manages tiers, it does not consume them) |
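To make the Agent Service row concrete, the check below shows one way the three limits could be evaluated before a run starts. All numbers are hypothetical placeholders, as are the names `AgentLimits`, `AGENT_LIMITS`, and `can_start_run`; only the tier names and the kinds of limit come from the text.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AgentLimits:
    max_configs: int
    max_runs_per_day: int
    max_concurrent: int


# Hypothetical values, chosen only to make the example concrete.
AGENT_LIMITS: dict[str, AgentLimits] = {
    "free": AgentLimits(max_configs=1, max_runs_per_day=5, max_concurrent=1),
    "pro": AgentLimits(max_configs=5, max_runs_per_day=50, max_concurrent=2),
    "power": AgentLimits(max_configs=20, max_runs_per_day=200, max_concurrent=5),
    "team": AgentLimits(max_configs=50, max_runs_per_day=500, max_concurrent=10),
}


def can_start_run(tier: str, runs_today: int, running_now: int) -> tuple[bool, str]:
    """Return (allowed, reason); unknown tiers fall back to the free limits."""
    limits = AGENT_LIMITS.get(tier, AGENT_LIMITS["free"])
    if runs_today >= limits.max_runs_per_day:
        return False, "daily run quota reached"
    if running_now >= limits.max_concurrent:
        return False, "too many concurrent batches"
    return True, "ok"
```

In a multi-replica deployment the `runs_today` and `running_now` counters would have to live in shared storage such as Redis, for the same reason the rate limiter does.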
### Distributed rate limiting via Redis
Because each service has its own replicas, rate limiting must be **shared** via Redis:
```python
# shared/middleware/rate_limit.py
import redis.asyncio as aioredis


class DistributedRateLimiter:
    def __init__(self, redis: aioredis.Redis):
        self._redis = redis

    async def check(self, user_id: str, tier: str, service: str) -> bool:
        limits = {"free": 20, "pro": 60, "power": 120, "team": 200}
        max_req = limits.get(tier, 20)
        key = f"rate:{service}:{user_id}"
        count = await self._redis.incr(key)
        if count == 1:
            # Start the 60 s window only when the key is created; refreshing
            # the TTL on every request would keep an active client's counter
            # from ever resetting.
            await self._redis.expire(key, 60)
        return count <= max_req
```
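One subtlety of INCR + EXPIRE counters is when the expiry is set: if it is refreshed on every request, a client that keeps sending requests never gets a fresh window. The in-memory model below, with an injected clock instead of Redis, illustrates the fixed-window variant in which the expiry is set only when the key is created. `FixedWindowCounter` and `allowed` are hypothetical names used for this sketch.

```python
class FixedWindowCounter:
    """In-memory model of INCR plus an expiry that is set only on key creation."""

    def __init__(self, window_seconds: float = 60.0) -> None:
        self._window = window_seconds
        # key -> (count, expires_at)
        self._state: dict[str, tuple[int, float]] = {}

    def hit(self, key: str, now: float) -> int:
        count, expires_at = self._state.get(key, (0, 0.0))
        if now >= expires_at:
            # Key missing or expired: start a new window at this request.
            count, expires_at = 0, now + self._window
        count += 1
        self._state[key] = (count, expires_at)
        return count


def allowed(counter: FixedWindowCounter, key: str, limit: int, now: float) -> bool:
    """True while the request count in the current window is within the limit."""
    return counter.hit(key, now) <= limit


counter = FixedWindowCounter(window_seconds=60.0)
decisions = [
    allowed(counter, "rate:chat:u1", 2, now=t) for t in (0.0, 10.0, 20.0, 61.0)
]
```

With a limit of 2, the third request inside the window is rejected and the request at `t=61` is accepted again, because the window that opened at `t=0` has expired by then.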
---
## 3. WebSocket with Horizontal Scaling: The Key Problem
`DeviceConnectionManager` is an **in-memory singleton**: `DeviceConnectionManager` is an **in-memory singleton**:
@@ -354,7 +473,7 @@ class RedisDeviceManager:
--- ---
## 3. Proposed Directory Structure ## 4. Proposed Directory Structure (MVP)
``` ```
adiuva-api/ adiuva-api/
@@ -364,7 +483,7 @@ adiuva-api/
│ ├── auth.py # JWT verification (public key) │ ├── auth.py # JWT verification (public key)
│ ├── schemas.py # Shared Pydantic schemas │ ├── schemas.py # Shared Pydantic schemas
│ ├── middleware/ │ ├── middleware/
│ │ ├── rate_limit.py │ │ ├── rate_limit.py # DistributedRateLimiter (Redis)
│ │ └── sanitizer.py │ │ └── sanitizer.py
│ └── models/ │ └── models/
│ └── base.py # Shared SQLAlchemy base │ └── base.py # Shared SQLAlchemy base
@@ -390,42 +509,45 @@ adiuva-api/
│ ├── main.py │ ├── main.py
│ ├── config.py │ ├── config.py
│ ├── db.py │ ├── db.py
│ ├── models.py # agent_run_logs, memory_* │ ├── models.py # memory_*
│ ├── routes/ │ ├── routes/
│ │ ├── device_ws.py │ │ ├── device_ws.py # WS connection owner
│ │ ├── chat.py │ │ └── chat.py # REST fallback
│ │ └── agents.py
│ ├── core/ │ ├── core/
│ │ ├── device_manager.py # RedisDeviceManager │ │ ├── device_manager.py # RedisDeviceManager
│ │ ├── deep_agent.py │ │ ├── deep_agent.py # Home + floating chat
│ │ ├── agent_runner.py
│ │ ├── agent_registry.py
│ │ ├── memory_middleware.py │ │ ├── memory_middleware.py
│ │ ├── ws_context.py │ │ ├── ws_context.py
│ │ ├── output_formatter.py │ │ ├── output_formatter.py
│ │ └── llm.py │ │ └── llm.py
│ └── agents/ │ └── agents/ # Tool definitions (used by deep_agent)
│ ├── task_agent.py │ ├── task_agent.py
│ ├── project_agent.py │ ├── project_agent.py
│ ├── note_agent.py │ ├── note_agent.py
│ ├── timeline_agent.py │ └── timeline_agent.py
│ └── filesystem_agent.py
├── storage-service/ ├── agent-service/
│ ├── Dockerfile │ ├── Dockerfile
│ ├── requirements.txt │ ├── requirements.txt
│ └── app/ │ └── app/
│ ├── main.py │ ├── main.py
│ ├── config.py │ ├── config.py
│ ├── db.py │ ├── db.py
│ ├── models.py # storage_records, backup_metadata │ ├── models.py # agent_run_logs, local/cloud_agent_configs
│ ├── routes/ │ ├── routes/
│ │ ├── storage.py │ │ ├── agents.py # catalog, can-create, trigger
│ │ ├── vectors.py │ │ └── agent_setup.py # journey start/message
│ │ └── backup.py │ ├── core/
│ └── services/ │ │ ├── agent_runner.py # Batch classify → process
│ ├── blob_store.py │ │ ├── agent_registry.py
│ └── vector_store.py │ │ ├── redis_executor.py # execute_on_client via Redis pub/sub
│ │ └── llm.py
│ └── agents/
│ ├── task_agent.py # Tool definitions (batch context)
│ ├── project_agent.py
│ ├── note_agent.py
│ ├── timeline_agent.py
│ └── filesystem_agent.py
├── billing-service/ ├── billing-service/
│ ├── Dockerfile │ ├── Dockerfile
@@ -441,26 +563,18 @@ adiuva-api/
│ ├── stripe_service.py │ ├── stripe_service.py
│ └── tier_manager.py │ └── tier_manager.py
├── plugin-service/
│ ├── Dockerfile
│ ├── requirements.txt
│ └── app/
│ ├── main.py
│ ├── config.py
│ ├── db.py
│ ├── models.py # plugins, installations, revenue
│ └── routes/
│ └── plugins.py
└── infra/ └── infra/
├── traefik/ ├── traefik/
│ └── traefik.yml │ └── traefik.yml
├── keys/
│ ├── jwt_private.pem # auth-service only
│ └── jwt_public.pem # All services
└── alembic/ # Shared or per-service migrations └── alembic/ # Shared or per-service migrations
``` ```
--- ---
## 4. Docker Compose: Full Configuration ## 5. Docker Compose: MVP Configuration
```yaml ```yaml
# docker-compose.yml # docker-compose.yml
@@ -478,14 +592,14 @@ services:
- "--providers.docker.exposedbydefault=false" - "--providers.docker.exposedbydefault=false"
- "--entrypoints.web.address=:80" - "--entrypoints.web.address=:80"
- "--entrypoints.websecure.address=:443" - "--entrypoints.websecure.address=:443"
# Cloudflare handles TLS; Traefik receives HTTP from the proxy
- "--entrypoints.web.http.redirections.entrypoint.to=websecure" - "--entrypoints.web.http.redirections.entrypoint.to=websecure"
ports: ports:
- "80:80" - "80:80"
- "443:443" - "443:443"
- "8080:8080" # Traefik dashboard - "8080:8080" # Traefik dashboard (disable in prod)
volumes: volumes:
- /var/run/docker.sock:/var/run/docker.sock:ro - /var/run/docker.sock:/var/run/docker.sock:ro
- ./infra/certs:/certs:ro
restart: unless-stopped restart: unless-stopped
# ══════════════════════════════════════════════════════════ # ══════════════════════════════════════════════════════════
@@ -498,10 +612,12 @@ services:
env_file: .env env_file: .env
environment: environment:
DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/adiuva DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/adiuva
REDIS_URL: redis://redis:6379
JWT_PRIVATE_KEY_FILE: /run/secrets/jwt_private_key JWT_PRIVATE_KEY_FILE: /run/secrets/jwt_private_key
SERVICE_NAME: auth SERVICE_NAME: auth
secrets: secrets:
- jwt_private_key - jwt_private_key
- jwt_public_key
labels: labels:
- "traefik.enable=true" - "traefik.enable=true"
- "traefik.http.routers.auth.rule=PathPrefix(`/api/v1/auth`)" - "traefik.http.routers.auth.rule=PathPrefix(`/api/v1/auth`)"
@@ -509,14 +625,16 @@ services:
depends_on: depends_on:
db: db:
condition: service_healthy condition: service_healthy
redis:
condition: service_healthy
# ══════════════════════════════════════════════════════════ # ══════════════════════════════════════════════════════════
# Chat Service (scalable, N replicas) # Chat Service: Real-time WS + Chat (scalable)
# ══════════════════════════════════════════════════════════ # ══════════════════════════════════════════════════════════
chat-service: chat-service:
build: ./chat-service build: ./chat-service
deploy: deploy:
replicas: 3 replicas: 2
env_file: .env env_file: .env
environment: environment:
DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/adiuva DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/adiuva
@@ -527,8 +645,8 @@ services:
- jwt_public_key - jwt_public_key
labels: labels:
- "traefik.enable=true" - "traefik.enable=true"
# REST routes # REST chat endpoint
- "traefik.http.routers.chat.rule=PathPrefix(`/api/v1/chat`) || PathPrefix(`/api/v1/agents`)" - "traefik.http.routers.chat.rule=PathPrefix(`/api/v1/chat`)"
- "traefik.http.services.chat.loadbalancer.server.port=8000" - "traefik.http.services.chat.loadbalancer.server.port=8000"
# WebSocket route with sticky session # WebSocket route with sticky session
- "traefik.http.routers.ws.rule=PathPrefix(`/api/v1/ws`)" - "traefik.http.routers.ws.rule=PathPrefix(`/api/v1/ws`)"
@@ -543,26 +661,29 @@ services:
condition: service_healthy condition: service_healthy
# ══════════════════════════════════════════════════════════ # ══════════════════════════════════════════════════════════
# Storage Service (2 replicas) # Agent Service: Batch processing (independently scalable)
# ══════════════════════════════════════════════════════════ # ══════════════════════════════════════════════════════════
storage-service: agent-service:
build: ./storage-service build: ./agent-service
deploy: deploy:
replicas: 2 replicas: 2
env_file: .env env_file: .env
environment: environment:
DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/adiuva DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/adiuva
REDIS_URL: redis://redis:6379
JWT_PUBLIC_KEY_FILE: /run/secrets/jwt_public_key JWT_PUBLIC_KEY_FILE: /run/secrets/jwt_public_key
SERVICE_NAME: storage SERVICE_NAME: agent
secrets: secrets:
- jwt_public_key - jwt_public_key
labels: labels:
- "traefik.enable=true" - "traefik.enable=true"
- "traefik.http.routers.storage.rule=PathPrefix(`/api/v1/storage`) || PathPrefix(`/api/v1/vectors`) || PathPrefix(`/api/v1/backup`)" - "traefik.http.routers.agents.rule=PathPrefix(`/api/v1/agents`)"
- "traefik.http.services.storage.loadbalancer.server.port=8000" - "traefik.http.services.agents.loadbalancer.server.port=8000"
depends_on: depends_on:
db: db:
condition: service_healthy condition: service_healthy
redis:
condition: service_healthy
# ══════════════════════════════════════════════════════════ # ══════════════════════════════════════════════════════════
# Billing Service (1 replica) # Billing Service (1 replica)
@@ -589,28 +710,6 @@ services:
redis: redis:
condition: service_healthy condition: service_healthy
# ══════════════════════════════════════════════════════════
# Plugin Service (1 replica)
# ══════════════════════════════════════════════════════════
plugin-service:
build: ./plugin-service
deploy:
replicas: 1
env_file: .env
environment:
DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/adiuva
JWT_PUBLIC_KEY_FILE: /run/secrets/jwt_public_key
SERVICE_NAME: plugins
secrets:
- jwt_public_key
labels:
- "traefik.enable=true"
- "traefik.http.routers.plugins.rule=PathPrefix(`/api/v1/plugins`)"
- "traefik.http.services.plugins.loadbalancer.server.port=8000"
depends_on:
db:
condition: service_healthy
# ══════════════════════════════════════════════════════════ # ══════════════════════════════════════════════════════════
# Infrastructure # Infrastructure
# ══════════════════════════════════════════════════════════ # ══════════════════════════════════════════════════════════
@@ -641,19 +740,6 @@ services:
retries: 5 retries: 5
restart: unless-stopped restart: unless-stopped
minio:
image: minio/minio:latest
command: server /data --console-address ":9001"
ports:
- "9000:9000"
- "9001:9001"
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
volumes:
- minio_data:/data
restart: unless-stopped
qdrant: qdrant:
image: qdrant/qdrant:latest image: qdrant/qdrant:latest
volumes: volumes:
@@ -669,22 +755,21 @@ secrets:
volumes: volumes:
postgres_data: postgres_data:
redis_data: redis_data:
minio_data:
qdrant_data: qdrant_data:
``` ```
--- ---
## 5. Cloudflare + VPS Configuration ## 6. Cloudflare + VPS Configuration
### 5.1 DNS ### 6.1 DNS
``` ```
api.tuodominio.com → A record → VPS IP api.tuodominio.com → A record → VPS IP
→ Proxy: ON (orange cloud) → Proxy: ON (orange cloud)
``` ```
### 5.2 Cloudflare Settings ### 6.2 Cloudflare Settings
| Setting | Value | Reason | | Setting | Value | Reason |
|---------|--------|--------| |---------|--------|--------|
@@ -693,7 +778,7 @@ api.tuodominio.com → A record → VPS IP
| Proxy timeout | **100s** (Enterprise) or default | LLM calls can take 30s+ | | Proxy timeout | **100s** (Enterprise) or default | LLM calls can take 30s+ |
| Under Attack Mode | Off (enable if needed) | | | Under Attack Mode | Off (enable if needed) | |
### 5.3 TLS on the VPS ### 6.3 TLS on the VPS
Two options: Two options:
- **Option A (recommended)**: Cloudflare Origin Certificate → mounted in Traefik - **Option A (recommended)**: Cloudflare Origin Certificate → mounted in Traefik
@@ -711,7 +796,7 @@ tls:
keyFile: /certs/origin-key.pem keyFile: /certs/origin-key.pem
``` ```
### 5.4 VPS Network ### 6.4 VPS Network
```bash ```bash
# UFW firewall: only Cloudflare can reach ports 80/443 # UFW firewall: only Cloudflare can reach ports 80/443
@@ -726,9 +811,9 @@ ufw enable
--- ---
## 6. Inter-Service Communication ## 7. Inter-Service Communication
### 6.1 Pattern: Event Bus via Redis Pub/Sub ### 7.1 Redis Pub/Sub — Event Bus
``` ```
┌──────────┐ tier_changed:user_123 ┌──────────┐ ┌──────────┐ tier_changed:user_123 ┌──────────┐
@@ -736,87 +821,55 @@ ufw enable
│ Service │ │ Service │ │ Service │ │ Service │
└──────────┘ └──────────┘ └──────────┘ └──────────┘
┌──────────┐ agent_triggered:user_123 ┌──────────┐ ┌──────────┐ tool_call:user_123 ┌──────────┐
Chat ──────────────────────── │ Any Agent │ ──────────────────────── Chat
│ Service │ │ Service │ │ Service │ │ Service │
└──────────┘ └──────────┘ │ (batch) │ ◄────────────────────────│ (has WS) │
└──────────┘ tool_result:{call_id} └──────────┘
``` ```
### 6.2 Pattern: Synchronous HTTP (for simple queries) ### 7.2 Health Checks and Service Discovery
The Chat Service may need the user's tier to rate-limit agents. Two strategies:
- **Strategy A (preferred)**: The tier is in the JWT. When the tier changes, the Billing Service forces a token refresh by invalidating the old tokens in Redis.
- **Strategy B**: The Chat Service calls `http://auth-service:8000/internal/user/{id}/tier` (internal Docker network, not exposed).
### 6.3 Health Checks and Service Discovery
Traefik handles service discovery automatically via Docker labels. Services do not need to know about each other; they communicate only via: Traefik handles service discovery automatically via Docker labels. Services do not need to know about each other; they communicate only via:
- **Redis pub/sub** (asynchronous events) - **Redis pub/sub** (cross-instance tool calls, tier events)
- **Redis hash** (shared state, e.g. `ws:connections`) - **Redis hash** (shared state: `ws:connections`, rate-limit counters)
- **PostgreSQL** (shared persistent data) - **PostgreSQL** (shared persistent data)
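The channel names in the diagram above (`tool_call:user_123`, `tool_result:{call_id}`) suggest a small naming and envelope convention. The sketch below is one possible shape for it, not the project's actual code: the helper names and the JSON fields (`call_id`, `tool`, `args`) are assumptions made for illustration.

```python
import json
import uuid


def tool_call_channel(user_id: str) -> str:
    # Channel the Chat Service would subscribe to for a given user.
    return f"tool_call:{user_id}"


def tool_result_channel(call_id: str) -> str:
    # Per-call channel on which the caller waits for the answer.
    return f"tool_result:{call_id}"


def encode_tool_call(tool: str, args: dict) -> tuple[str, str]:
    # Hypothetical envelope: a fresh call_id plus the tool name and arguments.
    call_id = uuid.uuid4().hex
    payload = json.dumps({"call_id": call_id, "tool": tool, "args": args})
    return call_id, payload


def decode_tool_call(payload: str) -> dict:
    # Reject messages that lack any of the assumed envelope fields.
    msg = json.loads(payload)
    missing = {"call_id", "tool", "args"} - msg.keys()
    if missing:
        raise ValueError(f"malformed tool call, missing: {sorted(missing)}")
    return msg
```

Keying the result channel on a unique `call_id` rather than on the user lets several tool calls for the same user be in flight at once without their answers getting mixed up.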
--- ---
## 7. Incremental Migration Plan ## 8. Incremental Migration Plan (MVP)
### Phase 1: Preparation (without breaking anything) ### Phase 1: Preparation (in the current monolith)
1. Add Redis to the current `docker-compose.yml` 1. Add Redis to the current `docker-compose.yml`
2. Migrate JWT from HS256 → RS256 (backward-compatible: accept both) 2. Migrate JWT from HS256 → RS256 (backward-compatible: accept both for a transition period)
3. Implement `RedisDeviceManager` as a drop-in replacement 3. Implement `RedisDeviceManager` as a drop-in replacement for the in-memory singleton
4. Extract `shared/` with auth verification, schemas, middleware 4. Extract `shared/` with auth verification, schemas, middleware
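Accepting both HS256 and RS256 during the transition is safest when the verification key is chosen from the token's declared algorithm and nothing else is allowed. The following is a minimal sketch under that assumption. The function names are hypothetical, and it only selects the key; the signature check itself is left to whichever JWT library the project uses.

```python
import base64
import json

# Algorithms accepted during the transition period (assumption: exactly these two).
ACCEPTED_ALGS = {"HS256", "RS256"}


def jwt_header(token: str) -> dict:
    # The first dot-separated segment is the base64url-encoded JSON header.
    header_b64 = token.split(".")[0]
    padded = header_b64 + "=" * (-len(header_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def select_verification_key(
    token: str, hs256_secret: str, rs256_public_pem: str
) -> tuple[str, str]:
    # Returns (algorithm, key). Anything outside the allow-list is rejected,
    # including "none".
    alg = jwt_header(token).get("alg")
    if alg not in ACCEPTED_ALGS:
        raise ValueError(f"unsupported JWT algorithm: {alg!r}")
    key = hs256_secret if alg == "HS256" else rs256_public_pem
    return alg, key
```

Passing the returned algorithm to the JWT library as the only allowed one keeps an HS256 token from ever being verified against the RSA public key. Once the transition period ends, removing `"HS256"` from the allow-list is the only change needed.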
### Phase 2: First split (Auth Service) ### Phase 2: Auth Service (first split)
1. Extract `auth.py` routes + models into `auth-service/` 1. Extract `auth.py` routes + models into `auth-service/`
2. Verify that JWTs signed by `auth-service` are validated by the monolith 2. Verify that JWTs signed by `auth-service` are validated by the monolith
3. Update Traefik to route `/api/v1/auth/*` to the new service 3. Add Traefik and route `/api/v1/auth/*` to the new service
4. The monolith continues to serve everything else 4. The monolith continues to serve everything else
### Phase 3: Storage + Billing + Plugins ### Phase 3: Billing Service
1. Stateless services without WebSocket → easy to extract 1. Extract billing routes, Stripe service, tier manager
2. Extract one at a time, test, route via Traefik 2. Configure Redis pub/sub for `tier_changed` events
3. The monolith gets progressively leaner 3. Route via Traefik
### Phase 4: Chat Service (the most delicate) ### Phase 4: Chat + Agent split (the most delicate)
1. The remaining monolith **becomes** the Chat Service 1. The remaining monolith contains WS + chat + agents
2. Remove the migrated routes, keep only WS + chat + agents 2. Split out the Agent Service: extract `agent_runner`, `agent_registry`, `agent_setup`, and the `/agents/*` routes
3. Test scaling to 2+ instances with `RedisDeviceManager` 3. Implement `redis_executor.py` in the Agent Service for tool calls via Redis
4. Verify cross-instance tool calls 4. The Chat Service remains the owner of the WS and subscribes to the `tool_call:{user_id}` channels
5. Test: trigger an agent from the Agent Service → tool_call via Redis → Chat Service → WS → device → response
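The round trip in the last step can be rehearsed without any infrastructure. The sketch below simulates it with an in-memory stand-in for Redis pub/sub. `InMemoryBus`, `fake_device` and the message fields are hypothetical; only the channel names come from the plan. Unlike real Redis pub/sub, the stand-in buffers messages, so it does not reproduce the case where nobody is subscribed yet.

```python
import asyncio
import json
from collections import defaultdict


class InMemoryBus:
    """Stand-in for Redis pub/sub: one asyncio.Queue per channel."""

    def __init__(self) -> None:
        self._channels = defaultdict(asyncio.Queue)

    async def publish(self, channel: str, message: str) -> None:
        await self._channels[channel].put(message)

    async def receive(self, channel: str) -> str:
        return await self._channels[channel].get()


async def fake_device(tool: str, args: dict) -> dict:
    # Hypothetical device behind the WebSocket; it simply echoes the call.
    return {"ok": True, "tool": tool, "args": args}


async def chat_service_relay(bus: InMemoryBus, user_id: str) -> None:
    # The Chat Service owns the WS: it listens on tool_call:{user_id},
    # forwards the call to the device, and publishes on tool_result:{call_id}.
    call = json.loads(await bus.receive(f"tool_call:{user_id}"))
    result = await fake_device(call["tool"], call["args"])
    await bus.publish(f"tool_result:{call['call_id']}", json.dumps(result))


async def agent_tool_call(bus, user_id, call_id, tool, args, timeout=5.0) -> dict:
    # The Agent Service publishes the call, then waits for the matching result.
    message = json.dumps({"call_id": call_id, "tool": tool, "args": args})
    await bus.publish(f"tool_call:{user_id}", message)
    raw = await asyncio.wait_for(bus.receive(f"tool_result:{call_id}"), timeout)
    return json.loads(raw)


async def demo() -> dict:
    bus = InMemoryBus()
    relay = asyncio.create_task(chat_service_relay(bus, "user_123"))
    result = await agent_tool_call(bus, "user_123", "call-1", "list_dir", {"path": "."})
    await relay
    return result
```

Running `asyncio.run(demo())` returns the echoed call. The timeout on the agent side matters in the real setup: if the device is offline or no Chat Service replica holds the user's WebSocket, the agent should fail fast rather than wait forever.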
### Phase 5: Cleanup ### Phase 5: Scaling test
1. Remove the original monolith 1. Scale the Chat Service to 2 replicas, verify sticky sessions
2. CI/CD pipeline for separate build/push 2. Scale the Agent Service to 2 replicas, verify distributed batch processing
3. Monitoring (Prometheus + Grafana) for each service 3. Monitoring (Prometheus + Grafana) for each service
--- ---
## 8. Distributed Rate Limiting
The current middleware uses an in-memory counter for rate limiting. With microservices:
```python
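# shared/middleware/rate_limit.py
import redis.asyncio as aioredis


class DistributedRateLimiter:
    """Fixed-window request counter shared by all replicas via Redis."""

    def __init__(self, redis: aioredis.Redis):
        self._redis = redis

    async def check(self, user_id: str, tier: str) -> bool:
        # Requests allowed per 60-second window, by tier
        limits = {"free": 20, "pro": 60, "power": 120, "team": 200}
        max_req = limits.get(tier, 20)

        key = f"rate:{user_id}"
        count = await self._redis.incr(key)
        if count == 1:
            # Start the 60-second window on the first request only; refreshing
            # the TTL on every call would keep the counter alive for as long
            # as the user stays active
            await self._redis.expire(key, 60)
        return count <= max_req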
```
---
## 9. Monitoring and Logging ## 9. Monitoring and Logging
```yaml ```yaml
@@ -845,35 +898,44 @@ Ogni servizio espone `/metrics` (Prometheus) e scrive log strutturati (JSON) rac
--- ---
## 10. Recommended Minimum VPS Sizing ## 10. Recommended Minimum VPS Sizing (MVP)
| Component | CPU | RAM | Notes | | Component | CPU | RAM | Notes |
|---|---|---|---| |---|---|---|---|
| Traefik | 0.25 | 128MB | | | Traefik | 0.25 | 128MB | |
| Auth Service ×2 | 0.25 ×2 | 128MB ×2 | | | Auth Service ×2 | 0.25 ×2 | 128MB ×2 | Stateless, lightweight |
| Chat Service ×2 | 1.0 ×2 | 1GB ×2 | The heaviest (LLM calls) | | Chat Service ×2 | 1.0 ×2 | 1GB ×2 | WS + streaming LLM |
| Storage Service ×2 | 0.5 ×2 | 256MB ×2 | I/O bound | | Agent Service ×2 | 0.75 ×2 | 512MB ×2 | Batch LLM, CPU-bound |
| Billing Service | 0.25 | 128MB | | | Billing Service | 0.25 | 128MB | |
| Plugin Service | 0.25 | 128MB | |
| PostgreSQL | 1.0 | 1GB | | | PostgreSQL | 1.0 | 1GB | |
| Redis | 0.25 | 256MB | | | Redis | 0.25 | 256MB | |
| Qdrant | 0.5 | 512MB | | | Qdrant | 0.5 | 512MB | |
| MinIO | 0.25 | 256MB | | | **MVP total** | **~6.25 vCPU** | **~5.25 GB** | |
| **Total** | **~6 vCPU** | **~5.5 GB** | |
**Recommendation**: a VPS with **8 vCPU / 16 GB RAM** to leave headroom. Hetzner CPX41 (~€30/month) or equivalent. **Recommendation**: a VPS with **8 vCPU / 16 GB RAM** to leave headroom. Hetzner CPX41 (~€30/month) or equivalent. Without Storage/Plugin this saves ~1 vCPU and 512MB compared with the full version.
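The totals are easy to re-derive whenever a replica count or per-service budget changes. This short sketch sums the per-row figures of the MVP column; the tuple layout is an illustrative choice, and the numbers are copied from the table.

```python
# (component, replicas, vCPU per replica, RAM in MB per replica), from the MVP table
MVP_COMPONENTS = [
    ("Traefik", 1, 0.25, 128),
    ("Auth Service", 2, 0.25, 128),
    ("Chat Service", 2, 1.0, 1024),
    ("Agent Service", 2, 0.75, 512),
    ("Billing Service", 1, 0.25, 128),
    ("PostgreSQL", 1, 1.0, 1024),
    ("Redis", 1, 0.25, 256),
    ("Qdrant", 1, 0.5, 512),
]


def totals(components):
    # Multiply each per-replica budget by its replica count and sum.
    cpu = sum(replicas * vcpu for _, replicas, vcpu, _ in components)
    ram_mb = sum(replicas * ram for _, replicas, _, ram in components)
    return cpu, ram_mb


cpu, ram_mb = totals(MVP_COMPONENTS)
print(f"{cpu:.2f} vCPU, {ram_mb / 1024:.2f} GB")  # 6.25 vCPU, 5.25 GB
```

Against the recommended 8 vCPU / 16 GB machine, that leaves 1.75 vCPU and well over 10 GB of RAM as headroom.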
--- ---
## Summary of Architectural Decisions ## MVP Architecture Summary
| Service | Replicas | Owner of |
|---|---|---|
| **Traefik** | 1 | Routing, TLS, sticky sessions |
| **Auth Service** | 2 | JWT RS256, registration, login, profile |
| **Chat Service** | 2N | WebSocket, home/floating chat, streaming |
| **Agent Service** | 2N | Batch processing, directory scan, agent setup |
| **Billing Service** | 1 | Stripe, subscriptions, tier management |
| Decision | Choice | Rationale | | Decision | Choice | Rationale |
|---|---|---| |---|---|---|
| API Gateway | Traefik | Docker-native, WebSocket support, automatic service discovery | | API Gateway | Traefik | Docker-native, WebSocket support, automatic service discovery |
| JWT | RS256 (asymmetric) | Distributed verification without contacting the Auth Service | | JWT | RS256 (asymmetric) | Distributed verification without contacting the Auth Service |
| WebSocket scaling | Redis pub/sub + registry | Cross-instance tool-call routing | | Tier check | Claim in the JWT | Each service verifies locally, zero round trips |
| Rate limiting | Redis counters | Distributed, sliding window | | WebSocket scaling | Redis pub/sub + sticky cookies | Cross-instance tool-call routing |
| Service communication | Redis pub/sub + internal HTTP | Asynchronous for events, synchronous for queries | | Chat ↔ Agent split | Separate services | CPU-bound batch work does not affect real-time chat |
| Database | Shared PostgreSQL (one DB, optional schema separation) | Simplicity; easy future DB split | | Agent → Device comms | Redis pub/sub via Chat Service | The Agent does not own the WS; it uses a relay |
| TLS | Cloudflare Origin Certificate | Zero maintenance, trust Cloudflare | | Rate limiting | Distributed Redis counters | Sliding window shared across replicas |
| Database | Shared PostgreSQL | MVP simplicity; easy future DB split |
| TLS | Cloudflare Origin Certificate | Zero maintenance |
| Orchestration | Docker Compose | Sufficient for a single VPS | | Orchestration | Docker Compose | Sufficient for a single VPS |
| Storage / Plugin | Post-MVP | Not critical for launch |
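The "Tier check" decision means a service can derive a user's limits from the already-verified token without calling anyone. A minimal sketch, assuming the claim is named `tier` (the plan does not state the claim name) and reusing the per-tier limits from the `DistributedRateLimiter` snippet in this plan:

```python
# Requests per 60-second window, as in the DistributedRateLimiter snippet.
TIER_LIMITS = {"free": 20, "pro": 60, "power": 120, "team": 200}


def requests_per_window(claims: dict) -> int:
    # `claims` is the payload of a JWT whose signature was already verified.
    # The "tier" claim name is an assumption; unknown or missing tiers fall
    # back to the free limit.
    return TIER_LIMITS.get(claims.get("tier"), TIER_LIMITS["free"])
```

For example, `requests_per_window({"sub": "user_123", "tier": "pro"})` evaluates to `60`, while a token without the claim gets the free-tier limit of `20`.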