Compare commits
3 Commits
6c450805cb
...
1a8bf11f90
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1a8bf11f90 | ||
|
|
e7cdce8287 | ||
|
|
58bc6efd4b |
@@ -29,7 +29,7 @@ TASK_SYSTEM_PROMPT = (
|
|||||||
" - project_id is optional; link to a project when the user mentions one\n"
|
" - project_id is optional; link to a project when the user mentions one\n"
|
||||||
" - is_ai_suggested: 1 only when proactively proposing a task the user\n"
|
" - is_ai_suggested: 1 only when proactively proposing a task the user\n"
|
||||||
" did not explicitly request; 0 otherwise\n"
|
" did not explicitly request; 0 otherwise\n"
|
||||||
" - is_approved defaults to 0; set to 1 only when the user confirms\n"
|
" - is_ai_suggested: 1 only when proactively proposing a task the user did not explicitly request; 0 otherwise\n"
|
||||||
" - Use list_tasks_due_today for 'what's due today' queries\n"
|
" - Use list_tasks_due_today for 'what's due today' queries\n"
|
||||||
" - For update_task, use -1 for integer fields you do not want to change\n"
|
" - For update_task, use -1 for integer fields you do not want to change\n"
|
||||||
" - Always confirm the action in plain, user-friendly language."
|
" - Always confirm the action in plain, user-friendly language."
|
||||||
@@ -79,7 +79,6 @@ async def create_task(
|
|||||||
due_date: int = 0,
|
due_date: int = 0,
|
||||||
project_id: str = "",
|
project_id: str = "",
|
||||||
is_ai_suggested: int = 0,
|
is_ai_suggested: int = 0,
|
||||||
is_approved: int = 0,
|
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Create a new task.
|
"""Create a new task.
|
||||||
title: task title (required)
|
title: task title (required)
|
||||||
@@ -90,7 +89,6 @@ async def create_task(
|
|||||||
due_date: Unix timestamp in milliseconds; 0 means no due date
|
due_date: Unix timestamp in milliseconds; 0 means no due date
|
||||||
project_id: optional UUID of the parent project
|
project_id: optional UUID of the parent project
|
||||||
is_ai_suggested: 1 if proactively suggested, 0 if user-requested
|
is_ai_suggested: 1 if proactively suggested, 0 if user-requested
|
||||||
is_approved: 0 until the user confirms; 1 when confirmed
|
|
||||||
"""
|
"""
|
||||||
result = await execute_on_client(
|
result = await execute_on_client(
|
||||||
action="insert",
|
action="insert",
|
||||||
@@ -104,7 +102,6 @@ async def create_task(
|
|||||||
"dueDate": due_date or None,
|
"dueDate": due_date or None,
|
||||||
"projectId": project_id or None,
|
"projectId": project_id or None,
|
||||||
"isAiSuggested": is_ai_suggested,
|
"isAiSuggested": is_ai_suggested,
|
||||||
"isApproved": is_approved,
|
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
row = result["row"]
|
row = result["row"]
|
||||||
@@ -124,12 +121,10 @@ async def update_task(
|
|||||||
assignees: str = "",
|
assignees: str = "",
|
||||||
due_date: int = -1,
|
due_date: int = -1,
|
||||||
project_id: str = "",
|
project_id: str = "",
|
||||||
is_approved: int = -1,
|
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Update fields on an existing task. Only pass fields you want to change.
|
"""Update fields on an existing task. Only pass fields you want to change.
|
||||||
task_id: the task's UUID (required)
|
task_id: the task's UUID (required)
|
||||||
due_date: -1 means unchanged; 0 clears the due date; any positive value sets it
|
due_date: -1 means unchanged; 0 clears the due date; any positive value sets it
|
||||||
is_approved: -1 means unchanged; 0 or 1 sets the value
|
|
||||||
"""
|
"""
|
||||||
updates: dict[str, Any] = {}
|
updates: dict[str, Any] = {}
|
||||||
if title:
|
if title:
|
||||||
@@ -146,8 +141,6 @@ async def update_task(
|
|||||||
updates["dueDate"] = due_date or None
|
updates["dueDate"] = due_date or None
|
||||||
if project_id:
|
if project_id:
|
||||||
updates["projectId"] = project_id
|
updates["projectId"] = project_id
|
||||||
if is_approved != -1:
|
|
||||||
updates["isApproved"] = is_approved
|
|
||||||
result = await execute_on_client(
|
result = await execute_on_client(
|
||||||
action="update",
|
action="update",
|
||||||
table="tasks",
|
table="tasks",
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ TIMELINE_SYSTEM_PROMPT = (
|
|||||||
" - For listing, project_id must be a UUID; never pass plain names as project_id\n"
|
" - For listing, project_id must be a UUID; never pass plain names as project_id\n"
|
||||||
" - date is a Unix timestamp in milliseconds; convert human-readable dates\n"
|
" - date is a Unix timestamp in milliseconds; convert human-readable dates\n"
|
||||||
" - is_ai_suggested: 1 when proactively proposing a timeline, 0 otherwise\n"
|
" - is_ai_suggested: 1 when proactively proposing a timeline, 0 otherwise\n"
|
||||||
" - is_approved: 0 until the user explicitly confirms; then 1\n"
|
" - is_ai_suggested: 1 when proactively proposing a timeline, 0 otherwise\n"
|
||||||
" - For update_timeline, use -1 for integer fields you do not want to change\n"
|
" - For update_timeline, use -1 for integer fields you do not want to change\n"
|
||||||
" - Listing without a project_id returns all timelines across projects\n"
|
" - Listing without a project_id returns all timelines across projects\n"
|
||||||
" - Always echo the title and formatted date in your confirmation."
|
" - Always echo the title and formatted date in your confirmation."
|
||||||
@@ -54,14 +54,12 @@ async def create_timeline(
|
|||||||
title: str,
|
title: str,
|
||||||
date: int,
|
date: int,
|
||||||
is_ai_suggested: int = 0,
|
is_ai_suggested: int = 0,
|
||||||
is_approved: int = 0,
|
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Create a project timeline (milestone).
|
"""Create a project timeline (milestone).
|
||||||
project_id: REQUIRED UUID of the parent project
|
project_id: REQUIRED UUID of the parent project
|
||||||
title: descriptive name for the milestone
|
title: descriptive name for the milestone
|
||||||
date: Unix timestamp in milliseconds
|
date: Unix timestamp in milliseconds
|
||||||
is_ai_suggested: 1 if proactively suggested, 0 if user-requested
|
is_ai_suggested: 1 if proactively suggested, 0 if user-requested
|
||||||
is_approved: 0 until the user confirms
|
|
||||||
"""
|
"""
|
||||||
result = await execute_on_client(
|
result = await execute_on_client(
|
||||||
action="insert",
|
action="insert",
|
||||||
@@ -71,7 +69,6 @@ async def create_timeline(
|
|||||||
"title": title,
|
"title": title,
|
||||||
"date": date,
|
"date": date,
|
||||||
"isAiSuggested": is_ai_suggested,
|
"isAiSuggested": is_ai_suggested,
|
||||||
"isApproved": is_approved,
|
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
row = result["row"]
|
row = result["row"]
|
||||||
@@ -83,20 +80,16 @@ async def update_timeline(
|
|||||||
timeline_id: str,
|
timeline_id: str,
|
||||||
title: str = "",
|
title: str = "",
|
||||||
date: int = -1,
|
date: int = -1,
|
||||||
is_approved: int = -1,
|
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Update a timeline. Only pass fields that should change.
|
"""Update a timeline. Only pass fields that should change.
|
||||||
timeline_id: UUID of the timeline (required)
|
timeline_id: UUID of the timeline (required)
|
||||||
date: -1 means unchanged; any other value sets the new date (ms timestamp)
|
date: -1 means unchanged; any other value sets the new date (ms timestamp)
|
||||||
is_approved: -1 means unchanged; 0 or 1 sets the approval state
|
|
||||||
"""
|
"""
|
||||||
updates: dict[str, Any] = {}
|
updates: dict[str, Any] = {}
|
||||||
if title:
|
if title:
|
||||||
updates["title"] = title
|
updates["title"] = title
|
||||||
if date != -1:
|
if date != -1:
|
||||||
updates["date"] = date
|
updates["date"] = date
|
||||||
if is_approved != -1:
|
|
||||||
updates["isApproved"] = is_approved
|
|
||||||
result = await execute_on_client(
|
result = await execute_on_client(
|
||||||
action="update",
|
action="update",
|
||||||
table="timelines",
|
table="timelines",
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
|||||||
|
|
||||||
from app.api.deps import get_current_user
|
from app.api.deps import get_current_user
|
||||||
from app.billing.tier_manager import FEATURES
|
from app.billing.tier_manager import FEATURES
|
||||||
from app.core.agent_runner import run_local_agent
|
from app.core.agent_runner import is_agent_running, run_local_agent
|
||||||
from app.core.device_manager import device_manager
|
from app.core.device_manager import device_manager
|
||||||
from app.db import get_session
|
from app.db import get_session
|
||||||
from app.models import AgentRunLog, LocalAgentConfig
|
from app.models import AgentRunLog, LocalAgentConfig
|
||||||
@@ -193,6 +193,12 @@ async def trigger_agent_run(
|
|||||||
# Use the FE's stable agent_id if provided, fall back to the ephemeral config id.
|
# Use the FE's stable agent_id if provided, fall back to the ephemeral config id.
|
||||||
stable_agent_id = body.agent_id or config.id
|
stable_agent_id = body.agent_id or config.id
|
||||||
|
|
||||||
|
if is_agent_running(stable_agent_id):
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_409_CONFLICT,
|
||||||
|
detail="Agent is already running. Only one run per agent is allowed at a time.",
|
||||||
|
)
|
||||||
|
|
||||||
run_log = AgentRunLog(
|
run_log = AgentRunLog(
|
||||||
agent_id=stable_agent_id,
|
agent_id=stable_agent_id,
|
||||||
agent_type="local",
|
agent_type="local",
|
||||||
|
|||||||
@@ -2,11 +2,12 @@
|
|||||||
|
|
||||||
Drives two agent types:
|
Drives two agent types:
|
||||||
|
|
||||||
* **Local directory agent** — two-phase execution that mirrors the
|
* **Local directory agent** — two-step execution per file:
|
||||||
``deep_agent.py`` tool-calling pattern. Phase 1 (Triage) explores the
|
Step 1 (Classification) uses code to fetch all projects and asks the LLM
|
||||||
user's directory via file-system tools and groups files by project.
|
to identify which project the file belongs to and which domains are relevant.
|
||||||
Phase 2 (Processing) reads full file contents and performs CRUD
|
Step 2 (Processing) fetches existing entities for that project/domains via
|
||||||
operations using the standard entity tools (tasks, notes, etc.).
|
code and runs an LLM with tools — existing data in context enforces
|
||||||
|
update-first naturally.
|
||||||
|
|
||||||
* **Cloud connector agent** — fetches data from third-party APIs (Gmail,
|
* **Cloud connector agent** — fetches data from third-party APIs (Gmail,
|
||||||
Teams, Outlook) and pushes extracted items to Electron.
|
Teams, Outlook) and pushes extracted items to Electron.
|
||||||
@@ -43,19 +44,30 @@ from app.agents.task_agent import TASK_TOOLS
|
|||||||
from app.agents.timeline_agent import TIMELINE_TOOLS
|
from app.agents.timeline_agent import TIMELINE_TOOLS
|
||||||
from app.core.device_manager import DeviceConnectionManager
|
from app.core.device_manager import DeviceConnectionManager
|
||||||
from app.core.llm import get_llm
|
from app.core.llm import get_llm
|
||||||
from app.core.ws_context import clear_client_executor, set_client_executor
|
from app.core.ws_context import clear_client_executor, execute_on_client, set_client_executor
|
||||||
from app.db import async_session
|
from app.db import async_session
|
||||||
from app.models import AgentRunLog, CloudAgentConfig, LocalAgentConfig
|
from app.models import AgentRunLog, CloudAgentConfig, LocalAgentConfig
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# ── Concurrency guard ─────────────────────────────────────────────────────
|
||||||
|
# Tracks agent IDs that currently have a run in progress.
|
||||||
|
# Prevents multiple simultaneous runs of the same agent within a single process.
|
||||||
|
_running_agents: set[str] = set()
|
||||||
|
|
||||||
|
|
||||||
|
def is_agent_running(agent_id: str) -> bool:
|
||||||
|
"""Return ``True`` if *agent_id* already has a run in progress."""
|
||||||
|
return agent_id in _running_agents
|
||||||
|
|
||||||
# ── Timeouts ───────────────────────────────────────────────────────────────
|
# ── Timeouts ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
# Max seconds to wait for a single tool-call round-trip (FE → BE).
|
# Max seconds to wait for a single tool-call round-trip (FE → BE).
|
||||||
_TOOL_CALL_TIMEOUT: int = 30
|
_TOOL_CALL_TIMEOUT: int = 30
|
||||||
# Max LLM reasoning steps per phase.
|
# Max LLM reasoning steps for Step 2 processing.
|
||||||
_MAX_TRIAGE_STEPS: int = 10
|
|
||||||
_MAX_PROCESSING_STEPS: int = 12
|
_MAX_PROCESSING_STEPS: int = 12
|
||||||
|
# Max directory recursion depth during scan.
|
||||||
|
_MAX_SCAN_DEPTH: int = 5
|
||||||
|
|
||||||
# ── Data-type to tool mapping ─────────────────────────────────────────────
|
# ── Data-type to tool mapping ─────────────────────────────────────────────
|
||||||
|
|
||||||
@@ -66,46 +78,88 @@ _DATA_TYPE_TOOLS: dict[str, list[Any]] = {
|
|||||||
"timelines": TIMELINE_TOOLS,
|
"timelines": TIMELINE_TOOLS,
|
||||||
}
|
}
|
||||||
|
|
||||||
# ── Triage prompt ─────────────────────────────────────────────────────────
|
# ── Step 1: Classification prompt ─────────────────────────────────────────
|
||||||
|
|
||||||
_TRIAGE_SYSTEM_PROMPT = """\
|
_DOMAIN_DESCRIPTIONS: dict[str, str] = {
|
||||||
You are a file triage assistant for a freelance project management tool.
|
"tasks": (
|
||||||
Your job is to explore a local directory on the user's device, understand its
|
"Action items, to-dos, deliverables — anything that describes work to be done, "
|
||||||
structure, and group files by project context.
|
"assigned to someone, or tracked with a due date or status."
|
||||||
|
),
|
||||||
|
"notes": (
|
||||||
|
"Documentation, meeting notes, summaries, reference material — "
|
||||||
|
"written content meant to be read and referenced rather than acted on."
|
||||||
|
),
|
||||||
|
"timelines": (
|
||||||
|
"Project milestones, deadlines, scheduled events — "
|
||||||
|
"specific dates that mark a point in the progress of a project."
|
||||||
|
),
|
||||||
|
"projects": (
|
||||||
|
"High-level project entities — only relevant if the file clearly introduces "
|
||||||
|
"a new project or updates the scope of an existing one."
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
You have access to these tools:
|
_STEP1_SYSTEM_PROMPT = """\
|
||||||
- list_directory: to map folder structure
|
You are a file classifier for a freelance project management tool.
|
||||||
- get_file_metadata: to check creation/modification dates
|
|
||||||
- read_file_content: to read brief snippets when needed for categorisation
|
|
||||||
- list_projects / list_all_projects / get_project: to fetch existing projects
|
|
||||||
from the user's workspace and match files to them
|
|
||||||
|
|
||||||
Instructions:
|
Your job is to match a file to an existing project and identify which data domains to extract.
|
||||||
1. Start by calling list_directory on the configured root path.
|
|
||||||
2. Explore subdirectories as needed to understand the structure.
|
|
||||||
3. Use get_file_metadata to check modification dates. Skip files that have
|
|
||||||
NOT been modified since: {last_run_at}.
|
|
||||||
4. Call list_all_projects to get the user's existing projects.
|
|
||||||
5. Match files to existing projects by name, folder structure, or content hints.
|
|
||||||
6. If files don't match any existing project, group them under "standalone".
|
|
||||||
|
|
||||||
{custom_prompt_section}
|
## Project matching rules (STRICT — follow in order)
|
||||||
|
|
||||||
Target entity types to extract: {data_types}
|
1. Search the file content for any mention of a project name, client name, acronym, or topic
|
||||||
File extensions to consider: {file_extensions}
|
that overlaps with the existing projects listed below.
|
||||||
|
2. The match does NOT need to be exact — partial name, abbreviation, or topic similarity is enough.
|
||||||
|
3. STRONGLY PREFER matching an existing project. Only return "new" as an absolute last resort
|
||||||
|
when the file has zero meaningful connection to any listed project.
|
||||||
|
4. When in doubt, pick the closest match from the list.
|
||||||
|
|
||||||
When you have finished exploring, output ONLY a JSON object (no markdown
|
## Response format
|
||||||
fences, no explanation) mapping project IDs or "standalone" to file path
|
|
||||||
arrays:
|
|
||||||
|
|
||||||
{{"<project_id>": ["<file_path>", ...], "standalone": ["<file_path>", ...]}}
|
Respond ONLY with a JSON object — no markdown, no explanation:
|
||||||
|
|
||||||
Return ONLY the JSON object as your final message.
|
{{"project_id": "<exact id from the list below, or new>", "new_project_name": "<concise 2-5 word name, only when project_id is new>", "domains": ["tasks", "notes"]}}
|
||||||
|
|
||||||
|
## Domain definitions (only consider domains in the allowed list)
|
||||||
|
|
||||||
|
{domain_definitions}
|
||||||
|
|
||||||
|
## Existing projects
|
||||||
|
|
||||||
|
{projects_list}
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# ── Processing prompt ─────────────────────────────────────────────────────
|
# ── Step 2: Processing prompt ─────────────────────────────────────────────
|
||||||
|
|
||||||
_PROCESSING_BASE_PROMPT = """\
|
_PROCESSING_SYSTEM_PROMPT = """\
|
||||||
|
You are a data extraction assistant for a freelance project management tool.
|
||||||
|
|
||||||
|
Your task: extract structured data from the file content and persist it using the available tools.
|
||||||
|
|
||||||
|
## Mandatory process — follow this order for EVERY item you extract
|
||||||
|
|
||||||
|
1. READ the existing records listed below for the relevant domain.
|
||||||
|
2. SEARCH for a match by title, topic, or semantic similarity.
|
||||||
|
3. If a match exists → call the update_* tool with the existing record's id.
|
||||||
|
4. If no match exists → call the create_* tool and set isAiSuggested=1.
|
||||||
|
|
||||||
|
NEVER call create_* without first checking the existing records.
|
||||||
|
NEVER duplicate a record that already exists under a different wording.
|
||||||
|
|
||||||
|
## Existing records (source of truth)
|
||||||
|
|
||||||
|
{existing_context}
|
||||||
|
|
||||||
|
## Context
|
||||||
|
|
||||||
|
Project: {project_context}
|
||||||
|
Domains to extract: {data_types}
|
||||||
|
|
||||||
|
{custom_prompt_section}
|
||||||
|
"""
|
||||||
|
|
||||||
|
# ── Cloud processing prompt (kept separate for cloud agent) ───────────────
|
||||||
|
|
||||||
|
_CLOUD_PROCESSING_PROMPT = """\
|
||||||
You are a data extraction and management assistant for a freelance project
|
You are a data extraction and management assistant for a freelance project
|
||||||
management tool.
|
management tool.
|
||||||
|
|
||||||
@@ -124,26 +178,6 @@ Your task:
|
|||||||
4. Do NOT invent data. Only extract what is clearly present in the files.
|
4. Do NOT invent data. Only extract what is clearly present in the files.
|
||||||
5. If a file contains no relevant data for the target entity types, skip it.
|
5. If a file contains no relevant data for the target entity types, skip it.
|
||||||
|
|
||||||
Update-first rules (apply in this order):
|
|
||||||
Tasks:
|
|
||||||
- Call list_tasks to find a match by title or context.
|
|
||||||
- If found: call add_task_comment (author "Adiuva"), update_task to set
|
|
||||||
assignees, state (ToDo / In Progress / Completed), or other fields.
|
|
||||||
- If NOT found: call create_task with isAiSuggested=1, isApproved=0.
|
|
||||||
Timelines:
|
|
||||||
- Call list_timelines to find a match by title or date.
|
|
||||||
- If found: call update_timeline to edit fields or mark it complete.
|
|
||||||
- If NOT found: call create_timeline with isAiSuggested=1, isApproved=0.
|
|
||||||
Notes:
|
|
||||||
- Call list_notes to find a match by title or topic, then get_note to
|
|
||||||
read its current content.
|
|
||||||
- If found: call update_note with the merged content.
|
|
||||||
- If NOT found: call create_note with isAiSuggested=1, isApproved=0.
|
|
||||||
Projects:
|
|
||||||
- Call list_all_projects to check for a match first.
|
|
||||||
- Only call create_project if the information is clearly significant and
|
|
||||||
no existing project matches. Set isAiSuggested=1, isApproved=0.
|
|
||||||
|
|
||||||
{project_context}
|
{project_context}
|
||||||
|
|
||||||
Files to process:
|
Files to process:
|
||||||
@@ -168,7 +202,6 @@ def _is_overdue(schedule_cron: str, last_run_at: datetime | None) -> bool:
|
|||||||
try:
|
try:
|
||||||
now = datetime.now(timezone.utc)
|
now = datetime.now(timezone.utc)
|
||||||
if last_run_at is None:
|
if last_run_at is None:
|
||||||
# Validate the expression before deciding this is overdue.
|
|
||||||
croniter(schedule_cron, now)
|
croniter(schedule_cron, now)
|
||||||
return True
|
return True
|
||||||
ts = last_run_at
|
ts = last_run_at
|
||||||
@@ -179,7 +212,7 @@ def _is_overdue(schedule_cron: str, last_run_at: datetime | None) -> bool:
|
|||||||
return now >= next_run
|
return now >= next_run
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.warning("agent_runner: cannot parse cron %r: %s", schedule_cron, exc)
|
logger.warning("agent_runner: cannot parse cron %r: %s", schedule_cron, exc)
|
||||||
return False # Fail-safe: don't trigger if expression is invalid.
|
return False
|
||||||
|
|
||||||
|
|
||||||
# ── WS executor for agent context ─────────────────────────────────────────
|
# ── WS executor for agent context ─────────────────────────────────────────
|
||||||
@@ -207,7 +240,7 @@ def _make_agent_executor(
|
|||||||
return _executor
|
return _executor
|
||||||
|
|
||||||
|
|
||||||
# ── LLM tool-calling loop (mirrors deep_agent._run_single_agent) ──────────
|
# ── LLM tool-calling loop ─────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
def _as_text(content: Any) -> str:
|
def _as_text(content: Any) -> str:
|
||||||
@@ -235,11 +268,7 @@ async def _run_agent_with_tools(
|
|||||||
tools: list[Any],
|
tools: list[Any],
|
||||||
max_steps: int,
|
max_steps: int,
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Run an LLM agent with tool-calling, returning the final text response.
|
"""Run an LLM agent with tool-calling, returning the final text response."""
|
||||||
|
|
||||||
Follows the same pattern as ``deep_agent._run_single_agent``:
|
|
||||||
bind tools → invoke → handle tool calls → repeat until final text.
|
|
||||||
"""
|
|
||||||
llm = get_llm()
|
llm = get_llm()
|
||||||
llm_with_tools = llm.bind_tools(tools)
|
llm_with_tools = llm.bind_tools(tools)
|
||||||
messages: list[Any] = [
|
messages: list[Any] = [
|
||||||
@@ -247,7 +276,6 @@ async def _run_agent_with_tools(
|
|||||||
HumanMessage(content=user_message),
|
HumanMessage(content=user_message),
|
||||||
]
|
]
|
||||||
|
|
||||||
tool_calls_count = 0
|
|
||||||
tool_map = {tool_def.name: tool_def for tool_def in tools}
|
tool_map = {tool_def.name: tool_def for tool_def in tools}
|
||||||
|
|
||||||
for _ in range(max_steps):
|
for _ in range(max_steps):
|
||||||
@@ -258,7 +286,6 @@ async def _run_agent_with_tools(
|
|||||||
return _as_text(response.content)
|
return _as_text(response.content)
|
||||||
|
|
||||||
for call in response.tool_calls:
|
for call in response.tool_calls:
|
||||||
tool_calls_count += 1
|
|
||||||
call_id = str(call.get("id", ""))
|
call_id = str(call.get("id", ""))
|
||||||
call_name = str(call.get("name", ""))
|
call_name = str(call.get("name", ""))
|
||||||
call_args = call.get("args", {})
|
call_args = call.get("args", {})
|
||||||
@@ -277,47 +304,19 @@ async def _run_agent_with_tools(
|
|||||||
logger.info(
|
logger.info(
|
||||||
"agent_runner: tool_result name=%s output=%s",
|
"agent_runner: tool_result name=%s output=%s",
|
||||||
call_name,
|
call_name,
|
||||||
str(tool_output)[:1200],
|
str(tool_output)[:200],
|
||||||
)
|
)
|
||||||
messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
|
messages.append(ToolMessage(content=str(tool_output), tool_call_id=call["id"]))
|
||||||
|
|
||||||
# Fallback: exceeded max steps, get final response without tools.
|
|
||||||
final = await llm.ainvoke(messages)
|
final = await llm.ainvoke(messages)
|
||||||
return _as_text(final.content)
|
return _as_text(final.content)
|
||||||
|
|
||||||
|
|
||||||
# ── Triage map parser ─────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
|
|
||||||
def _parse_triage_map(raw: str) -> dict[str, list[str]] | None:
|
|
||||||
"""Extract the JSON triage map from the LLM's final response."""
|
|
||||||
text = raw.strip()
|
|
||||||
# Try direct parse first.
|
|
||||||
try:
|
|
||||||
parsed = json.loads(text)
|
|
||||||
if isinstance(parsed, dict):
|
|
||||||
return {k: v for k, v in parsed.items() if isinstance(v, list)}
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Try extracting JSON from markdown fences or surrounding text.
|
|
||||||
import re
|
|
||||||
match = re.search(r"\{[\s\S]*\}", text)
|
|
||||||
if match:
|
|
||||||
try:
|
|
||||||
parsed = json.loads(match.group(0))
|
|
||||||
if isinstance(parsed, dict):
|
|
||||||
return {k: v for k, v in parsed.items() if isinstance(v, list)}
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
pass
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
# ── Tool list builder ─────────────────────────────────────────────────────
|
# ── Tool list builder ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
def _build_processing_tools(data_types: list[str]) -> list[Any]:
|
def _build_processing_tools(data_types: list[str]) -> list[Any]:
|
||||||
"""Build the tool list for Phase 2 based on user's data_types selection."""
|
"""Build the tool list for processing based on user's data_types selection."""
|
||||||
tools: list[Any] = list(FILESYSTEM_TOOLS)
|
tools: list[Any] = list(FILESYSTEM_TOOLS)
|
||||||
for dt in data_types:
|
for dt in data_types:
|
||||||
dt_tools = _DATA_TYPE_TOOLS.get(dt)
|
dt_tools = _DATA_TYPE_TOOLS.get(dt)
|
||||||
@@ -326,7 +325,236 @@ def _build_processing_tools(data_types: list[str]) -> list[Any]:
|
|||||||
return tools
|
return tools
|
||||||
|
|
||||||
|
|
||||||
# ── Local agent runner (two-phase) ─────────────────────────────────────────
|
# ── Code-based directory scanner ─────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
async def _scan_directories(
|
||||||
|
paths: list[str],
|
||||||
|
extensions: list[str],
|
||||||
|
last_run_at: datetime | None,
|
||||||
|
) -> list[str]:
|
||||||
|
"""Walk directories via WS tool calls and return filtered file paths.
|
||||||
|
|
||||||
|
Recursion is capped at ``_MAX_SCAN_DEPTH``. Files are filtered by
|
||||||
|
extension (if configured) and by modification date (if ``last_run_at``
|
||||||
|
is set). Fails open: if metadata cannot be read, the file is included.
|
||||||
|
"""
|
||||||
|
all_files: list[str] = []
|
||||||
|
ext_set = {e.lstrip(".").lower() for e in extensions} if extensions else set()
|
||||||
|
|
||||||
|
async def _walk(path: str, depth: int) -> None:
|
||||||
|
if depth > _MAX_SCAN_DEPTH:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
result = await execute_on_client(action="list_directory", data={"path": path})
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("agent_runner: list_directory failed %r: %s", path, exc)
|
||||||
|
return
|
||||||
|
for entry in result.get("entries", []):
|
||||||
|
entry_path = entry.get("path", "")
|
||||||
|
if not entry_path:
|
||||||
|
continue
|
||||||
|
if entry.get("type") == "directory":
|
||||||
|
await _walk(entry_path, depth + 1)
|
||||||
|
elif entry.get("type") == "file":
|
||||||
|
if ext_set:
|
||||||
|
dot_pos = entry_path.rfind(".")
|
||||||
|
file_ext = entry_path[dot_pos + 1:].lower() if dot_pos != -1 else ""
|
||||||
|
if file_ext not in ext_set:
|
||||||
|
continue
|
||||||
|
all_files.append(entry_path)
|
||||||
|
|
||||||
|
for root in paths:
|
||||||
|
await _walk(root, depth=0)
|
||||||
|
|
||||||
|
if last_run_at is None:
|
||||||
|
return all_files
|
||||||
|
|
||||||
|
# Filter by modification date.
|
||||||
|
last_run_ms = int(last_run_at.timestamp() * 1000)
|
||||||
|
filtered: list[str] = []
|
||||||
|
for file_path in all_files:
|
||||||
|
try:
|
||||||
|
meta = await execute_on_client(action="get_file_metadata", data={"path": file_path})
|
||||||
|
modified_at = meta.get("modifiedAt")
|
||||||
|
if modified_at is None:
|
||||||
|
filtered.append(file_path)
|
||||||
|
continue
|
||||||
|
if isinstance(modified_at, (int, float)):
|
||||||
|
mod_ms = int(modified_at)
|
||||||
|
else:
|
||||||
|
mod_ms = int(datetime.fromisoformat(str(modified_at)).timestamp() * 1000)
|
||||||
|
if mod_ms > last_run_ms:
|
||||||
|
filtered.append(file_path)
|
||||||
|
except Exception:
|
||||||
|
filtered.append(file_path) # fail-open
|
||||||
|
|
||||||
|
return filtered
|
||||||
|
|
||||||
|
|
||||||
|
# ── Code-based entity fetchers ────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
async def _fetch_projects() -> list[dict]:
|
||||||
|
"""Fetch all projects from the Electron client via WS."""
|
||||||
|
try:
|
||||||
|
result = await execute_on_client(action="select", table="projects")
|
||||||
|
return result.get("rows", [])
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("agent_runner: failed to fetch projects: %s", exc)
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
_DOMAIN_TABLE: dict[str, str] = {
|
||||||
|
"tasks": "tasks",
|
||||||
|
"notes": "notes",
|
||||||
|
"timelines": "timelines",
|
||||||
|
"projects": "projects",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async def _fetch_domain_entities(domain: str, project_id: str) -> list[dict]:
|
||||||
|
"""Fetch existing rows for a domain, scoped to a project where applicable."""
|
||||||
|
table = _DOMAIN_TABLE.get(domain)
|
||||||
|
if not table:
|
||||||
|
return []
|
||||||
|
filters: dict[str, Any] = {}
|
||||||
|
if project_id != "standalone" and domain != "projects":
|
||||||
|
filters["projectId"] = project_id
|
||||||
|
try:
|
||||||
|
result = await execute_on_client(
|
||||||
|
action="select",
|
||||||
|
table=table,
|
||||||
|
filters=filters if filters else None,
|
||||||
|
)
|
||||||
|
return result.get("rows", [])
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("agent_runner: failed to fetch %s: %s", domain, exc)
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
def _format_entities_for_context(domain: str, rows: list[dict]) -> str:
|
||||||
|
"""Format existing entity rows as a readable context block for the LLM.
|
||||||
|
|
||||||
|
Includes enough detail per record for the LLM to make a confident
|
||||||
|
update-vs-create decision without overwhelming the context.
|
||||||
|
Note content is truncated to 200 chars to stay within token budget.
|
||||||
|
"""
|
||||||
|
if not rows:
|
||||||
|
return f"No existing {domain}."
|
||||||
|
lines: list[str] = []
|
||||||
|
for r in rows:
|
||||||
|
if domain == "tasks":
|
||||||
|
desc = r.get("description") or ""
|
||||||
|
desc_part = f" — {desc[:120]}" if desc else ""
|
||||||
|
assignee = r.get("assignee") or r.get("assignees") or ""
|
||||||
|
due = r.get("dueDate") or r.get("due_date") or ""
|
||||||
|
meta = ", ".join(filter(None, [
|
||||||
|
f"priority: {r.get('priority', '')}" if r.get("priority") else "",
|
||||||
|
f"assignee: {assignee}" if assignee else "",
|
||||||
|
f"due: {due}" if due else "",
|
||||||
|
]))
|
||||||
|
lines.append(
|
||||||
|
f" - [{r.get('status', '?')}] {r.get('title', '')}{desc_part}"
|
||||||
|
f" ({meta}, id: {r['id']})"
|
||||||
|
)
|
||||||
|
elif domain == "notes":
|
||||||
|
snippet = (r.get("content") or "")[:200].replace("\n", " ")
|
||||||
|
snippet_part = f"\n Preview: {snippet}" if snippet else ""
|
||||||
|
lines.append(
|
||||||
|
f" - {r.get('title', '')} (id: {r['id']}){snippet_part}"
|
||||||
|
)
|
||||||
|
elif domain == "timelines":
|
||||||
|
lines.append(
|
||||||
|
f" - {r.get('title', '')} date={r.get('date', '')} (id: {r['id']})"
|
||||||
|
)
|
||||||
|
elif domain == "projects":
|
||||||
|
summary = (r.get("aiSummary") or r.get("ai_summary") or "")[:120]
|
||||||
|
summary_part = f" — {summary}" if summary else ""
|
||||||
|
lines.append(
|
||||||
|
f" - {r.get('name', '')} [{r.get('status', '')}]{summary_part}"
|
||||||
|
f" (id: {r['id']})"
|
||||||
|
)
|
||||||
|
return f"Existing {domain}:\n" + "\n".join(lines)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Step 1: LLM file classifier ───────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
async def _classify_file(
|
||||||
|
file_path: str,
|
||||||
|
file_content: str,
|
||||||
|
projects: list[dict],
|
||||||
|
config_data_types: list[str],
|
||||||
|
) -> tuple[str, list[str], str | None]:
|
||||||
|
"""Call the LLM to classify a file by project and relevant domains.
|
||||||
|
|
||||||
|
Returns ``(project_id_or_"new", domains, new_project_name_or_None)``.
|
||||||
|
- ``project_id`` is an existing project UUID, or ``"new"`` when no match found.
|
||||||
|
- ``new_project_name`` is only set when ``project_id == "new"``.
|
||||||
|
Falls back to ``("new", config_data_types, None)`` on any error.
|
||||||
|
"""
|
||||||
|
fallback: tuple[str, list[str], str | None] = ("new", list(config_data_types), None)
|
||||||
|
|
||||||
|
if not file_content.strip():
|
||||||
|
return fallback
|
||||||
|
|
||||||
|
valid_project_ids = {p["id"] for p in projects}
|
||||||
|
|
||||||
|
def _fmt_project(p: dict) -> str:
|
||||||
|
summary = (p.get("aiSummary") or p.get("ai_summary") or "").strip()
|
||||||
|
summary_part = f" — {summary[:100]}" if summary else ""
|
||||||
|
return f" - id={p['id']} | name={p.get('name', '')} | status={p.get('status', '')}{summary_part}"
|
||||||
|
|
||||||
|
projects_list = "\n".join(_fmt_project(p) for p in projects) or " (none yet)"
|
||||||
|
|
||||||
|
domain_definitions = "\n".join(
|
||||||
|
f" - {d}: {_DOMAIN_DESCRIPTIONS[d]}"
|
||||||
|
for d in config_data_types
|
||||||
|
if d in _DOMAIN_DESCRIPTIONS
|
||||||
|
)
|
||||||
|
|
||||||
|
system = _STEP1_SYSTEM_PROMPT.format(
|
||||||
|
domain_definitions=domain_definitions,
|
||||||
|
projects_list=projects_list,
|
||||||
|
)
|
||||||
|
|
||||||
|
llm = get_llm()
|
||||||
|
try:
|
||||||
|
response = await llm.ainvoke([
|
||||||
|
SystemMessage(content=system),
|
||||||
|
HumanMessage(content=f"File: {file_path}\n\nContent:\n{file_content[:4000]}"),
|
||||||
|
])
|
||||||
|
raw = _as_text(response.content).strip()
|
||||||
|
# Strip markdown fences if the model wraps the JSON.
|
||||||
|
if raw.startswith("```"):
|
||||||
|
raw = raw.split("```")[1]
|
||||||
|
if raw.startswith("json"):
|
||||||
|
raw = raw[4:]
|
||||||
|
parsed = json.loads(raw.strip())
|
||||||
|
raw_project_id: str = str(parsed.get("project_id") or "new")
|
||||||
|
# Reject hallucinated UUIDs — only accept ids that exist in the fetched list.
|
||||||
|
project_id = raw_project_id if raw_project_id in valid_project_ids else "new"
|
||||||
|
new_project_name: str | None = (
|
||||||
|
str(parsed["new_project_name"]).strip() or None
|
||||||
|
if project_id == "new" and parsed.get("new_project_name")
|
||||||
|
else None
|
||||||
|
)
|
||||||
|
domains: list[str] = [
|
||||||
|
d for d in parsed.get("domains", [])
|
||||||
|
if d in config_data_types
|
||||||
|
]
|
||||||
|
if not domains:
|
||||||
|
domains = list(config_data_types)
|
||||||
|
return project_id, domains, new_project_name
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning(
|
||||||
|
"agent_runner: step1 classification failed for %r: %s", file_path, exc
|
||||||
|
)
|
||||||
|
return fallback
|
||||||
|
|
||||||
|
|
||||||
|
# ── Local agent runner (two-step per file) ────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
async def run_local_agent(
|
async def run_local_agent(
|
||||||
@@ -336,24 +564,28 @@ async def run_local_agent(
|
|||||||
device_mgr: DeviceConnectionManager,
|
device_mgr: DeviceConnectionManager,
|
||||||
run_context: dict | None = None,
|
run_context: dict | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Execute a local directory agent run using two-phase LLM-with-tools.
|
"""Execute a local directory agent run using a two-step approach per file.
|
||||||
|
|
||||||
Phase 1 — Triage:
|
Step 1 — Classification (code + 1 LLM call per file, no tools):
|
||||||
Explore the directory structure, check metadata, match files to
|
Code scans directories and fetches all projects via WS.
|
||||||
existing projects. Output: a JSON map of project → file paths.
|
For each file, LLM identifies the project and relevant domains.
|
||||||
|
|
||||||
Phase 2 — Processing:
|
Step 2 — Processing (code + 1 LLM call per file, with tools):
|
||||||
For each project group, read full file contents and perform CRUD
|
Code fetches existing entities for the identified project/domains.
|
||||||
operations using the standard entity tools.
|
LLM receives file content + existing entities in context and uses
|
||||||
|
tools to update existing records or create new ones.
|
||||||
"""
|
"""
|
||||||
run_id = run_log.id
|
run_id = run_log.id
|
||||||
|
agent_id = (run_context or {}).get("agent_id") or config.id
|
||||||
|
_running_agents.add(agent_id)
|
||||||
|
|
||||||
# ── Device online check ─────────────────────────────────────────
|
# ── Device online check ─────────────────────────────────────────
|
||||||
target_device_id = config.device_id.strip() if isinstance(config.device_id, str) else ""
|
target_device_id = config.device_id.strip() if isinstance(config.device_id, str) else ""
|
||||||
if target_device_id:
|
is_online = (
|
||||||
is_online = device_mgr.is_online(user_id, target_device_id)
|
device_mgr.is_online(user_id, target_device_id)
|
||||||
else:
|
if target_device_id
|
||||||
is_online = device_mgr.is_online(user_id)
|
else device_mgr.is_online(user_id)
|
||||||
|
)
|
||||||
|
|
||||||
if not is_online:
|
if not is_online:
|
||||||
logger.info(
|
logger.info(
|
||||||
@@ -377,116 +609,124 @@ async def run_local_agent(
|
|||||||
items_processed = 0
|
items_processed = 0
|
||||||
items_created = 0
|
items_created = 0
|
||||||
|
|
||||||
|
custom_section = (
|
||||||
|
f"User instructions:\n{config.prompt_template}"
|
||||||
|
if config.prompt_template
|
||||||
|
else ""
|
||||||
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# ── Phase 1: Triage ─────────────────────────────────────────
|
# ── Code: scan directories ───────────────────────────────────
|
||||||
logger.info("agent_runner: run=%s phase=triage start user=%s", run_id, user_id)
|
logger.info("agent_runner: run=%s scanning directories user=%s", run_id, user_id)
|
||||||
|
file_paths = await _scan_directories(
|
||||||
last_run_str = "never (process all files)"
|
paths=config.directory_paths,
|
||||||
if config.last_run_at:
|
extensions=config.file_extensions or [],
|
||||||
last_run_str = config.last_run_at.isoformat()
|
last_run_at=config.last_run_at,
|
||||||
|
)
|
||||||
custom_section = ""
|
logger.info(
|
||||||
if config.prompt_template:
|
"agent_runner: run=%s found %d file(s) after filtering", run_id, len(file_paths)
|
||||||
custom_section = f"User instructions:\n{config.prompt_template}"
|
|
||||||
|
|
||||||
file_ext_str = ", ".join(config.file_extensions) if config.file_extensions else "all"
|
|
||||||
|
|
||||||
triage_prompt = _TRIAGE_SYSTEM_PROMPT.format(
|
|
||||||
last_run_at=last_run_str,
|
|
||||||
custom_prompt_section=custom_section,
|
|
||||||
data_types=", ".join(config.data_types),
|
|
||||||
file_extensions=file_ext_str,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
directory_paths = config.directory_paths
|
if not file_paths:
|
||||||
triage_user_msg = (
|
await _finalize_run(run_log, status="success", items_processed=0, items_created=0)
|
||||||
f"Explore these directories and produce the triage map:\n"
|
|
||||||
f"{json.dumps(directory_paths, ensure_ascii=False)}"
|
|
||||||
)
|
|
||||||
|
|
||||||
triage_tools: list[Any] = list(FILESYSTEM_TOOLS) + list(PROJECT_TOOLS)
|
|
||||||
|
|
||||||
triage_response = await _run_agent_with_tools(
|
|
||||||
system_prompt=triage_prompt,
|
|
||||||
user_message=triage_user_msg,
|
|
||||||
tools=triage_tools,
|
|
||||||
max_steps=_MAX_TRIAGE_STEPS,
|
|
||||||
)
|
|
||||||
|
|
||||||
triage_map = _parse_triage_map(triage_response)
|
|
||||||
if not triage_map:
|
|
||||||
errors.append(f"Triage phase failed to produce a valid file map: {triage_response[:500]}")
|
|
||||||
await _finalize_run(run_log, status="error", errors=errors)
|
|
||||||
return
|
return
|
||||||
|
|
||||||
logger.info(
|
# ── Code: fetch all projects once ────────────────────────────
|
||||||
"agent_runner: run=%s triage complete groups=%d total_files=%d",
|
projects = await _fetch_projects()
|
||||||
run_id,
|
|
||||||
len(triage_map),
|
for file_path in file_paths:
|
||||||
sum(len(files) for files in triage_map.values()),
|
try:
|
||||||
|
# Read file content via code.
|
||||||
|
file_result = await execute_on_client(
|
||||||
|
action="read_file_content", data={"path": file_path}
|
||||||
)
|
)
|
||||||
|
file_content: str = file_result.get("content", "")
|
||||||
# ── Phase 2: Processing (per group) ─────────────────────────
|
if not file_content:
|
||||||
processing_tools = _build_processing_tools(config.data_types)
|
logger.debug("agent_runner: run=%s skipping empty file %r", run_id, file_path)
|
||||||
|
|
||||||
for group_key, file_paths in triage_map.items():
|
|
||||||
if not file_paths:
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
items_processed += 1
|
||||||
|
|
||||||
|
# Step 1 — classify file.
|
||||||
|
project_id, domains, new_project_name = await _classify_file(
|
||||||
|
file_path=file_path,
|
||||||
|
file_content=file_content,
|
||||||
|
projects=projects,
|
||||||
|
config_data_types=config.data_types,
|
||||||
|
)
|
||||||
logger.info(
|
logger.info(
|
||||||
"agent_runner: run=%s phase=processing group=%s files=%d",
|
"agent_runner: run=%s file=%r → project=%s new_name=%r domains=%s",
|
||||||
run_id,
|
run_id,
|
||||||
group_key,
|
file_path,
|
||||||
len(file_paths),
|
project_id,
|
||||||
|
new_project_name,
|
||||||
|
domains,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Build project context for the LLM.
|
# Step 2 — fetch existing entities for this project + domains.
|
||||||
if group_key == "standalone":
|
# When project_id is "new", entities are fetched without a project
|
||||||
project_context = "These files are not associated with any existing project."
|
# filter; the LLM will create the project and link records to it.
|
||||||
|
effective_project_id = project_id if project_id != "new" else "standalone"
|
||||||
|
|
||||||
|
existing_blocks: list[str] = []
|
||||||
|
for domain in domains:
|
||||||
|
rows = await _fetch_domain_entities(domain, effective_project_id)
|
||||||
|
existing_blocks.append(_format_entities_for_context(domain, rows))
|
||||||
|
|
||||||
|
existing_context = "\n\n".join(existing_blocks)
|
||||||
|
|
||||||
|
if project_id == "new":
|
||||||
|
name_hint = f' Use "{new_project_name}" as the project name.' if new_project_name else ""
|
||||||
|
project_context = (
|
||||||
|
f"No existing project matches this file. "
|
||||||
|
f"Create a new project first using the create_project tool, "
|
||||||
|
f"then link all extracted records to its id.{name_hint}"
|
||||||
|
)
|
||||||
|
# Ensure the LLM has the project tools available.
|
||||||
|
if "projects" not in domains:
|
||||||
|
domains = ["projects"] + domains
|
||||||
else:
|
else:
|
||||||
project_context = f"These files belong to project ID: {group_key}. Use this project_id when creating records."
|
project_context = (
|
||||||
|
f"This file belongs to project ID: {project_id}. "
|
||||||
|
"Use this project_id when creating records."
|
||||||
|
)
|
||||||
|
|
||||||
file_list_str = "\n".join(f"- {fp}" for fp in file_paths)
|
system_prompt = _PROCESSING_SYSTEM_PROMPT.format(
|
||||||
|
existing_context=existing_context,
|
||||||
processing_prompt = _PROCESSING_BASE_PROMPT.format(
|
|
||||||
data_types=", ".join(config.data_types),
|
|
||||||
project_context=project_context,
|
project_context=project_context,
|
||||||
file_list=file_list_str,
|
data_types=", ".join(domains),
|
||||||
custom_prompt_section=custom_section,
|
custom_prompt_section=custom_section,
|
||||||
)
|
)
|
||||||
|
|
||||||
items_processed += len(file_paths)
|
processing_tools = _build_processing_tools(domains)
|
||||||
|
|
||||||
try:
|
|
||||||
result_text = await _run_agent_with_tools(
|
result_text = await _run_agent_with_tools(
|
||||||
system_prompt=processing_prompt,
|
system_prompt=system_prompt,
|
||||||
user_message="Process the listed files now.",
|
user_message=(
|
||||||
|
f"Process this file and extract relevant information.\n\n"
|
||||||
|
f"File: {file_path}\n\nContent:\n{file_content}"
|
||||||
|
),
|
||||||
tools=processing_tools,
|
tools=processing_tools,
|
||||||
max_steps=_MAX_PROCESSING_STEPS,
|
max_steps=_MAX_PROCESSING_STEPS,
|
||||||
)
|
)
|
||||||
logger.info(
|
logger.info(
|
||||||
"agent_runner: run=%s group=%s processing_result=%s",
|
"agent_runner: run=%s file=%r result=%s",
|
||||||
run_id,
|
run_id,
|
||||||
group_key,
|
file_path,
|
||||||
result_text[:500],
|
result_text[:200],
|
||||||
)
|
)
|
||||||
# Count created items by scanning tool call results.
|
|
||||||
# The tools themselves handle creation; we estimate from the
|
|
||||||
# summary. A more precise count would require intercepting
|
|
||||||
# tool results, but the summary is sufficient for the run log.
|
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
errors.append(f"Processing error for group '{group_key}': {exc}")
|
errors.append(f"Error processing '{file_path}': {exc}")
|
||||||
logger.error(
|
logger.error(
|
||||||
"agent_runner: run=%s group=%s processing failed: %s",
|
"agent_runner: run=%s file=%r failed: %s", run_id, file_path, exc
|
||||||
run_id,
|
|
||||||
group_key,
|
|
||||||
exc,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
errors.append(f"Agent run failed: {exc}")
|
errors.append(f"Agent run failed: {exc}")
|
||||||
logger.error("agent_runner: run=%s failed: %s", run_id, exc)
|
logger.error("agent_runner: run=%s failed: %s", run_id, exc)
|
||||||
finally:
|
finally:
|
||||||
|
_running_agents.discard(agent_id)
|
||||||
clear_client_executor()
|
clear_client_executor()
|
||||||
|
|
||||||
# ── Finalise ────────────────────────────────────────────────────
|
# ── Finalise ────────────────────────────────────────────────────
|
||||||
@@ -503,9 +743,6 @@ async def run_local_agent(
|
|||||||
items_processed=items_processed,
|
items_processed=items_processed,
|
||||||
items_created=items_created,
|
items_created=items_created,
|
||||||
errors=errors,
|
errors=errors,
|
||||||
update_config_last_run=False,
|
|
||||||
config_id=config.id,
|
|
||||||
config_type="local",
|
|
||||||
)
|
)
|
||||||
logger.info(
|
logger.info(
|
||||||
"agent_runner: run=%s done status=%s processed=%d errors=%d",
|
"agent_runner: run=%s done status=%s processed=%d errors=%d",
|
||||||
@@ -515,8 +752,7 @@ async def run_local_agent(
|
|||||||
len(errors),
|
len(errors),
|
||||||
)
|
)
|
||||||
|
|
||||||
# Notify the Electron client that the run is complete so it can close
|
# Notify Electron that the run is complete.
|
||||||
# the run record in its local SQLite.
|
|
||||||
if run_context and device_mgr.is_online(user_id):
|
if run_context and device_mgr.is_online(user_id):
|
||||||
try:
|
try:
|
||||||
await device_mgr.send_frame(user_id, {
|
await device_mgr.send_frame(user_id, {
|
||||||
@@ -525,12 +761,13 @@ async def run_local_agent(
|
|||||||
"status": final_status,
|
"status": final_status,
|
||||||
})
|
})
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.warning("agent_runner: run=%s failed to send run_complete: %s", run_id, exc)
|
logger.warning(
|
||||||
|
"agent_runner: run=%s failed to send run_complete: %s", run_id, exc
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# ── Cloud agent runner ─────────────────────────────────────────────────────
|
# ── Cloud agent runner ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
# Default lookback window when an agent has never run before.
|
|
||||||
_CLOUD_DEFAULT_LOOKBACK_DAYS: int = 7
|
_CLOUD_DEFAULT_LOOKBACK_DAYS: int = 7
|
||||||
|
|
||||||
|
|
||||||
@@ -544,8 +781,7 @@ async def run_cloud_agent(
|
|||||||
|
|
||||||
Steps:
|
Steps:
|
||||||
|
|
||||||
1. Verify the user's device is online — results are pushed to Electron
|
1. Verify the user's device is online.
|
||||||
via WS tool-call frames. If no device is connected, abort.
|
|
||||||
2. Decrypt the stored OAuth token from ``config.oauth_token_encrypted``.
|
2. Decrypt the stored OAuth token from ``config.oauth_token_encrypted``.
|
||||||
3. Instantiate the provider client (Gmail or MS Graph).
|
3. Instantiate the provider client (Gmail or MS Graph).
|
||||||
4. Fetch messages/emails since ``config.last_run_at`` (or 7 days ago for
|
4. Fetch messages/emails since ``config.last_run_at`` (or 7 days ago for
|
||||||
@@ -598,11 +834,7 @@ async def run_cloud_agent(
|
|||||||
try:
|
try:
|
||||||
provider = get_provider(config.provider, credentials_info)
|
provider = get_provider(config.provider, credentials_info)
|
||||||
except ValueError as exc:
|
except ValueError as exc:
|
||||||
await _finalize_run(
|
await _finalize_run(run_log, status="error", errors=[str(exc)])
|
||||||
run_log,
|
|
||||||
status="error",
|
|
||||||
errors=[str(exc)],
|
|
||||||
)
|
|
||||||
return
|
return
|
||||||
|
|
||||||
# ── 4. Fetch messages ─────────────────────────────────────────────
|
# ── 4. Fetch messages ─────────────────────────────────────────────
|
||||||
@@ -636,9 +868,7 @@ async def run_cloud_agent(
|
|||||||
raw_messages = []
|
raw_messages = []
|
||||||
except RuntimeError as exc:
|
except RuntimeError as exc:
|
||||||
logger.error(
|
logger.error(
|
||||||
"agent_runner: provider fetch failed for cloud agent %s: %s",
|
"agent_runner: provider fetch failed for cloud agent %s: %s", config.id, exc
|
||||||
config.id,
|
|
||||||
exc,
|
|
||||||
)
|
)
|
||||||
await _finalize_run(
|
await _finalize_run(
|
||||||
run_log,
|
run_log,
|
||||||
@@ -664,9 +894,11 @@ async def run_cloud_agent(
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
processing_tools = _build_processing_tools(config.data_types)
|
processing_tools = _build_processing_tools(config.data_types)
|
||||||
custom_section = ""
|
custom_section = (
|
||||||
if config.prompt_template:
|
f"User instructions:\n{config.prompt_template}"
|
||||||
custom_section = f"User instructions:\n{config.prompt_template}"
|
if config.prompt_template
|
||||||
|
else ""
|
||||||
|
)
|
||||||
|
|
||||||
for msg in raw_messages:
|
for msg in raw_messages:
|
||||||
content_text = msg.as_text
|
content_text = msg.as_text
|
||||||
@@ -674,7 +906,7 @@ async def run_cloud_agent(
|
|||||||
continue
|
continue
|
||||||
items_processed += 1
|
items_processed += 1
|
||||||
|
|
||||||
processing_prompt = _PROCESSING_BASE_PROMPT.format(
|
processing_prompt = _CLOUD_PROCESSING_PROMPT.format(
|
||||||
data_types=", ".join(config.data_types),
|
data_types=", ".join(config.data_types),
|
||||||
project_context="Determine the appropriate project from the message context.",
|
project_context="Determine the appropriate project from the message context.",
|
||||||
file_list=f"Message from {config.provider} (id: {msg.id})",
|
file_list=f"Message from {config.provider} (id: {msg.id})",
|
||||||
@@ -708,7 +940,11 @@ async def run_cloud_agent(
|
|||||||
await db.commit()
|
await db.commit()
|
||||||
logger.debug("agent_runner: refreshed OAuth token persisted for agent %s", config.id)
|
logger.debug("agent_runner: refreshed OAuth token persisted for agent %s", config.id)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.warning("agent_runner: failed to persist refreshed token for agent %s: %s", config.id, exc)
|
logger.warning(
|
||||||
|
"agent_runner: failed to persist refreshed token for agent %s: %s",
|
||||||
|
config.id,
|
||||||
|
exc,
|
||||||
|
)
|
||||||
|
|
||||||
# ── 8. Finalise ────────────────────────────────────────────────────
|
# ── 8. Finalise ────────────────────────────────────────────────────
|
||||||
if errors and items_created == 0:
|
if errors and items_created == 0:
|
||||||
@@ -749,12 +985,6 @@ async def trigger_pending_runs(
|
|||||||
"""Dispatch any overdue agent runs after an Electron device connects.
|
"""Dispatch any overdue agent runs after an Electron device connects.
|
||||||
|
|
||||||
Called as a background task from the device WS endpoint on ``device_hello``.
|
Called as a background task from the device WS endpoint on ``device_hello``.
|
||||||
|
|
||||||
Scheduling rules:
|
|
||||||
|
|
||||||
* **Local agents**: only triggered when ``config.device_id == device_id``.
|
|
||||||
* **Cloud agents**: triggered on any connected device (no device binding).
|
|
||||||
* Runs execute **sequentially** to avoid flooding the WS connection.
|
|
||||||
"""
|
"""
|
||||||
logger.info(
|
logger.info(
|
||||||
"agent_runner: pending-run scan skipped for user=%s device=%s (client-owned agent config)",
|
"agent_runner: pending-run scan skipped for user=%s device=%s (client-owned agent config)",
|
||||||
@@ -778,11 +1008,7 @@ async def _finalize_run(
|
|||||||
config_id: str | None = None,
|
config_id: str | None = None,
|
||||||
config_type: str | None = None,
|
config_type: str | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Persist the run outcome and optionally update ``LocalAgentConfig.last_run_at``.
|
"""Persist the run outcome and optionally update ``last_run_at`` on the config."""
|
||||||
|
|
||||||
Uses a fresh DB session so this is safe to call from background tasks
|
|
||||||
after the original request session has closed.
|
|
||||||
"""
|
|
||||||
now = datetime.now(timezone.utc)
|
now = datetime.now(timezone.utc)
|
||||||
try:
|
try:
|
||||||
async with async_session() as db:
|
async with async_session() as db:
|
||||||
|
|||||||
@@ -1,8 +1,10 @@
|
|||||||
# Adiuva — Architettura Microservizi
|
# Adiuva — Architettura Microservizi (MVP)
|
||||||
|
|
||||||
## Panoramica
|
## Panoramica
|
||||||
|
|
||||||
Il monolite attuale viene suddiviso in **5 servizi** + un **API Gateway**, orchestrati con Docker Compose e raggiungibili tramite dominio su Cloudflare.
|
Il monolite viene suddiviso in **4 servizi MVP** + un **API Gateway (Traefik)**, orchestrati con Docker Compose su un singolo VPS raggiungibile via Cloudflare.
|
||||||
|
|
||||||
|
> **Fuori dall'MVP**: Storage Service (S3/backup CRUD) e Plugin Service (marketplace). Verranno aggiunti come servizi indipendenti in una fase successiva.
|
||||||
|
|
||||||
```
|
```
|
||||||
┌──────────────┐
|
┌──────────────┐
|
||||||
@@ -14,20 +16,21 @@ Il monolite attuale viene suddiviso in **5 servizi** + un **API Gateway**, orche
|
|||||||
│ Traefik │
|
│ Traefik │
|
||||||
│ API Gateway │
|
│ API Gateway │
|
||||||
│ (routing, │
|
│ (routing, │
|
||||||
│ TLS term.) │
|
│ TLS, rate │
|
||||||
|
│ limiting) │
|
||||||
└──────┬───────┘
|
└──────┬───────┘
|
||||||
│
|
│
|
||||||
┌──────────┬───────────┼───────────┬──────────┐
|
┌──────────┬───────────┼───────────┐
|
||||||
│ │ │ │ │
|
│ │ │ │
|
||||||
┌─────▼────┐ ┌───▼───┐ ┌────▼────┐ ┌────▼───┐ ┌───▼─────┐
|
┌─────▼────┐ ┌───▼───┐ ┌────▼────┐ ┌────▼───┐
|
||||||
│ Auth │ │ Chat │ │ Storage │ │Billing │ │ Plugins │
|
│ Auth │ │ Chat │ │ Agent │ │Billing │
|
||||||
│ Service │ │Service│ │ Service │ │Service │ │ Service │
|
│ Service │ │Service│ │ Service │ │Service │
|
||||||
└─────┬────┘ └───┬───┘ └────┬────┘ └────┬───┘ └───┬─────┘
|
└─────┬────┘ └───┬───┘ └────┬────┘ └────┬───┘
|
||||||
│ │ │ │ │
|
│ │ │ │
|
||||||
┌─────▼──────────▼──────────▼───────────▼──────────▼─────┐
|
┌─────▼──────────▼──────────▼───────────▼────┐
|
||||||
│ Infrastruttura │
|
│ Infrastruttura │
|
||||||
│ PostgreSQL │ Redis │ MinIO (S3) │ Qdrant │ (Pinecone) │
|
│ PostgreSQL │ Redis │ Qdrant │
|
||||||
└────────────────────────────────────────────────────────┘
|
└─────────────────────────────────────────────┘
|
||||||
```
|
```
|
||||||
|
|
||||||
---
|
---
|
||||||
@@ -83,46 +86,68 @@ def verify_token(token: str) -> dict:
|
|||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
### 1.2 Chat Service (`chat-service`) ⭐ Core
|
### 1.2 Chat Service (`chat-service`) ⭐ Real-time
|
||||||
|
|
||||||
**Responsabilità**: WebSocket device, home chat, floating chat, agent runner, memory middleware, agent setup journeys.
|
**Responsabilità**: WebSocket device connection, home chat, floating chat, memory middleware, streaming LLM responses verso il client.
|
||||||
|
|
||||||
| Endpoint originale | Tipo |
|
Questo servizio gestisce la **connessione persistente** con l'app Electron e le interazioni **real-time** dell'utente (chat home, floating chat). È il proprietario della WebSocket.
|
||||||
|
|
||||||
|
| Endpoint | Tipo |
|
||||||
|---|---|
|
|---|---|
|
||||||
| `/api/v1/ws/device` | WebSocket |
|
| `/api/v1/ws/device` | WebSocket (connessione persistente) |
|
||||||
| `/api/v1/chat` | POST (REST fallback) |
|
| `/api/v1/chat` | POST (REST fallback) |
|
||||||
| `/api/v1/agents/catalog` | GET |
|
|
||||||
| `/api/v1/agents/can-create` | POST |
|
|
||||||
| `/api/v1/agents/trigger` | POST |
|
|
||||||
|
|
||||||
**Moduli inclusi**: `deep_agent`, `agent_runner`, `agent_registry`, `memory_middleware`, `ws_context`, `device_manager`, tutti gli agent tools (`task_agent`, `project_agent`, `note_agent`, `timeline_agent`, `filesystem_agent`).
|
**Moduli inclusi**: `deep_agent`, `memory_middleware`, `ws_context`, `device_manager` (Redis-backed), `output_formatter`, `llm`, tutti gli agent tools (`task_agent`, `project_agent`, `note_agent`, `timeline_agent`).
|
||||||
|
|
||||||
**Questa è la bestia che deve scalare orizzontalmente** — è il servizio più CPU/memory intensive (LLM calls, tool loops, WebSocket persistenti).
|
**Perché separato dall'Agent Service**: Il Chat Service tiene la WebSocket aperta e risponde in tempo reale (streaming). Scalare aggiungendo repliche è semplice con sticky sessions + Redis pub/sub per il cross-instance routing dei tool_call.
|
||||||
|
|
||||||
|
**Scaling**: 2–N repliche. Sticky cookies per le WS + Redis per cross-instance.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
### 1.3 Storage Service (`storage-service`)
|
### 1.3 Agent Service (`agent-service`) ⭐ Batch
|
||||||
|
|
||||||
**Responsabilità**: CRUD record crittografati su S3, vector operations, backup.
|
**Responsabilità**: Batch agent processing (directory scanning, file classification, entity extraction), agent setup journeys, agent configuration CRUD.
|
||||||
|
|
||||||
| Endpoint originale | Metodo |
|
Questo servizio gestisce i processi **long-running** e **CPU-intensive**: scansione filesystem, classificazione file con LLM, estrazione entità in batch. Non possiede la WebSocket — comunica con il device dell'utente tramite **Redis pub/sub** passando per il Chat Service.
|
||||||
|
|
||||||
|
| Endpoint | Tipo |
|
||||||
|---|---|
|
|---|---|
|
||||||
| `/api/v1/storage/records` | POST / GET |
|
| `/api/v1/agents/catalog` | GET |
|
||||||
| `/api/v1/storage/records/{id}` | GET / PUT / DELETE |
|
| `/api/v1/agents/can-create` | POST |
|
||||||
| `/api/v1/vectors/upsert` | POST |
|
| `/api/v1/agents/trigger` | POST |
|
||||||
| `/api/v1/vectors/search` | POST |
|
| `/api/v1/agents/journey/start` | POST (o WS relay) |
|
||||||
| `/api/v1/vectors/embed` | POST |
|
| `/api/v1/agents/journey/message` | POST (o WS relay) |
|
||||||
| `/api/v1/vectors` | DELETE |
|
|
||||||
| `/api/v1/backup` | PUT / GET / DELETE |
|
|
||||||
| `/api/v1/backup/history` | GET |
|
|
||||||
|
|
||||||
**Scaling**: 2–3 repliche. I/O bound (S3, Qdrant). Stateless.
|
**Moduli inclusi**: `agent_runner`, `agent_registry`, `filesystem_agent`, `llm`.
|
||||||
|
|
||||||
|
**Flusso tool-call cross-service** (l'Agent Service non ha la WS):
|
||||||
|
|
||||||
|
```
|
||||||
|
┌──────────────┐ ┌──────────────┐ ┌──────────┐
|
||||||
|
│ Agent Service│ │ Redis │ │ Chat │
|
||||||
|
│ (batch run) │ │ │ │ Service │
|
||||||
|
│ │ │ │ │ (ha WS) │
|
||||||
|
│ 1. Needs to │ PUBLISH │ │ SUBSCRIBE │ │
|
||||||
|
│ read file ├───────────►│tool_call:u123├───────────►│ 2. Invia │
|
||||||
|
│ from │ │ │ │ al │
|
||||||
|
│ device │ │ │ │ device│
|
||||||
|
│ │ │ │ │ via WS│
|
||||||
|
│ │ SUBSCRIBE │ │ PUBLISH │ │
|
||||||
|
│ 4. Riceve ◄────────────┤tool_result:id│◄───────────┤ 3. Device│
|
||||||
|
│ risultato │ │ │ │ reply │
|
||||||
|
└──────────────┘ └──────────────┘ └──────────┘
|
||||||
|
```
|
||||||
|
|
||||||
|
**Scaling**: 1–N repliche. Completamente stateless, scala indipendentemente dalla chat. Ogni replica processa batch job diversi. Può essere scalato a 0 se non ci sono agent attivi (risparmio risorse).
|
||||||
|
|
||||||
|
**Vantaggio dello split**: Se 50 utenti triggerano agenti batch contemporaneamente, il Chat Service non ne risente — le risposte real-time rimangono veloci.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
### 1.4 Billing Service (`billing-service`)
|
### 1.4 Billing Service (`billing-service`)
|
||||||
|
|
||||||
**Responsabilità**: Stripe checkout, webhook, subscription management, tier enforcement.
|
**Responsabilità**: Stripe checkout, webhook, subscription management.
|
||||||
|
|
||||||
| Endpoint originale | Metodo |
|
| Endpoint originale | Metodo |
|
||||||
|---|---|
|
|---|---|
|
||||||
@@ -132,31 +157,125 @@ def verify_token(token: str) -> dict:
|
|||||||
|
|
||||||
**Database**: Tabelle `subscriptions` (schema `billing`).
|
**Database**: Tabelle `subscriptions` (schema `billing`).
|
||||||
|
|
||||||
**Comunicazione inter-servizio**: Quando Stripe invia un webhook e il tier cambia, il Billing Service pubblica un evento su **Redis pub/sub** channel `tier_changed:{user_id}`. L'Auth Service aggiorna il campo `tier` nella tabella users (oppure i servizi leggono il tier direttamente dal JWT, aggiornato al prossimo refresh).
|
**Comunicazione inter-servizio**: Quando Stripe invia un webhook e il tier cambia, il Billing Service pubblica un evento su **Redis pub/sub** channel `tier_changed:{user_id}`. L'Auth Service aggiorna il campo `tier` nella tabella users. Al prossimo token refresh il JWT conterrà il tier aggiornato.
|
||||||
|
|
||||||
**Scaling**: 1 replica sufficiente. Basso traffico.
|
**Scaling**: 1 replica sufficiente. Basso traffico.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
### 1.5 Plugin Service (`plugin-service`)
|
### 1.5 Servizi esclusi dall'MVP
|
||||||
|
|
||||||
**Responsabilità**: Marketplace, installazione plugin, revenue split.
|
I seguenti servizi verranno aggiunti post-MVP come servizi indipendenti:
|
||||||
|
|
||||||
| Endpoint originale | Metodo |
|
| Servizio | Responsabilità | Note |
|
||||||
|---|---|
|
|---|---|---|
|
||||||
| `/api/v1/plugins` | GET |
|
| **Storage Service** | S3 blobs CRUD, vector ops, backup | Le funzionalità vector/embed possono restare nel Chat Service per il MVP |
|
||||||
| `/api/v1/plugins/{id}` | GET |
|
| **Plugin Service** | Marketplace, install, revenue split | Feature non critica per il lancio |
|
||||||
| `/api/v1/plugins/{id}/install` | POST / DELETE |
|
|
||||||
|
|
||||||
**Database**: Tabelle `plugins`, `plugin_installations`, `revenue_events`.
|
|
||||||
|
|
||||||
**Scaling**: 1 replica. Basso traffico.
|
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## 2. WebSocket con Scaling Orizzontale — Il Problema Chiave
|
## 2. Tier Check — Dove e Come
|
||||||
|
|
||||||
### Il problema attuale
|
Il tier dell'utente (free/pro/power/team) determina rate-limiting, quote e accesso a funzionalità. Con i microservizi, **ogni servizio controlla il tier autonomamente** senza chiamare l'Auth Service.
|
||||||
|
|
||||||
|
### Strategia: Tier nel JWT
|
||||||
|
|
||||||
|
L'Auth Service include il `tier` come claim nel JWT al momento del login/refresh:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"sub": "user_123",
|
||||||
|
"tier": "pro",
|
||||||
|
"exp": 1742515200,
|
||||||
|
"iat": 1742511600
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Ogni servizio:
|
||||||
|
1. Decodifica il JWT con la chiave pubblica (già lo fa per l'auth)
|
||||||
|
2. Legge `payload["tier"]` — **zero chiamate extra**
|
||||||
|
3. Applica le sue regole di enforcement localmente
|
||||||
|
|
||||||
|
```python
|
||||||
|
# shared/auth.py — dependency FastAPI condivisa
|
||||||
|
from fastapi import Depends, HTTPException, Request
|
||||||
|
from jose import jwt
|
||||||
|
|
||||||
|
PUBLIC_KEY = ...
|
||||||
|
|
||||||
|
class CurrentUser:
|
||||||
|
def __init__(self, user_id: str, tier: str):
|
||||||
|
self.user_id = user_id
|
||||||
|
self.tier = tier
|
||||||
|
|
||||||
|
async def get_current_user(request: Request) -> CurrentUser:
|
||||||
|
token = request.headers.get("Authorization", "").removeprefix("Bearer ")
|
||||||
|
payload = jwt.decode(token, PUBLIC_KEY, algorithms=["RS256"])
|
||||||
|
return CurrentUser(user_id=payload["sub"], tier=payload["tier"])
|
||||||
|
|
||||||
|
def require_tier(*allowed_tiers: str):
|
||||||
|
"""Dependency che blocca se il tier non è tra quelli ammessi."""
|
||||||
|
async def check(user: CurrentUser = Depends(get_current_user)):
|
||||||
|
if user.tier not in allowed_tiers:
|
||||||
|
raise HTTPException(403, "Tier insufficient")
|
||||||
|
return user
|
||||||
|
return check
|
||||||
|
```
|
||||||
|
|
||||||
|
### Cosa succede quando il tier cambia (upgrade/downgrade)?
|
||||||
|
|
||||||
|
```
|
||||||
|
┌──────────┐ Stripe webhook ┌──────────┐ tier_changed ┌──────────┐
|
||||||
|
│ Stripe │ ─────────────────►│ Billing │ ───────────────►│ Auth │
|
||||||
|
│ │ │ Service │ (Redis pub/sub) │ Service │
|
||||||
|
└──────────┘ └──────────┘ └────┬─────┘
|
||||||
|
│
|
||||||
|
UPDATE users
|
||||||
|
SET tier = 'power'
|
||||||
|
│
|
||||||
|
Al prossimo /refresh
|
||||||
|
il JWT conterrà tier='power'
|
||||||
|
```
|
||||||
|
|
||||||
|
**Latenza del cambio**: Il tier si propaga al prossimo token refresh (tipicamente 15–30 min, o il client può forzare un refresh immediato dopo il checkout). Per il billing webhook, il downgrade può essere forzato invalidando il refresh token su Redis → il client è obbligato a ri-autenticarsi.
|
||||||
|
|
||||||
|
### Dove si applica in ciascun servizio
|
||||||
|
|
||||||
|
| Servizio | Enforcement |
|
||||||
|
|---|---|
|
||||||
|
| **Auth Service** | Nessuno (è lui che scrive il tier) |
|
||||||
|
| **Chat Service** | Rate-limit per tier (req/min), quota messaggi |
|
||||||
|
| **Agent Service** | Max agent configs, max runs/day, max concurrent batches |
|
||||||
|
| **Billing Service** | Nessuno (gestisce i tier, non li consuma) |
|
||||||
|
|
||||||
|
### Rate-limit distribuito via Redis
|
||||||
|
|
||||||
|
Poiché ogni servizio ha le sue repliche, il rate-limiting deve essere **condiviso** via Redis:
|
||||||
|
|
||||||
|
```python
|
||||||
|
# shared/middleware/rate_limit.py
|
||||||
|
import redis.asyncio as aioredis
|
||||||
|
|
||||||
|
class DistributedRateLimiter:
|
||||||
|
def __init__(self, redis: aioredis.Redis):
|
||||||
|
self._redis = redis
|
||||||
|
|
||||||
|
async def check(self, user_id: str, tier: str, service: str) -> bool:
|
||||||
|
limits = {"free": 20, "pro": 60, "power": 120, "team": 200}
|
||||||
|
max_req = limits.get(tier, 20)
|
||||||
|
key = f"rate:{service}:{user_id}"
|
||||||
|
|
||||||
|
pipe = self._redis.pipeline()
|
||||||
|
pipe.incr(key)
|
||||||
|
pipe.expire(key, 60)
|
||||||
|
count, _ = await pipe.execute()
|
||||||
|
|
||||||
|
return count <= max_req
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 3. WebSocket con Scaling Orizzontale — Il Problema Chiave
|
||||||
|
|
||||||
`DeviceConnectionManager` è un **singleton in-memory**:
|
`DeviceConnectionManager` è un **singleton in-memory**:
|
||||||
|
|
||||||
@@ -354,7 +473,7 @@ class RedisDeviceManager:
|
|||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## 3. Struttura Directory Proposta
|
## 4. Struttura Directory Proposta (MVP)
|
||||||
|
|
||||||
```
|
```
|
||||||
adiuva-api/
|
adiuva-api/
|
||||||
@@ -364,7 +483,7 @@ adiuva-api/
|
|||||||
│ ├── auth.py # JWT verification (chiave pubblica)
|
│ ├── auth.py # JWT verification (chiave pubblica)
|
||||||
│ ├── schemas.py # Pydantic schemas condivisi
|
│ ├── schemas.py # Pydantic schemas condivisi
|
||||||
│ ├── middleware/
|
│ ├── middleware/
|
||||||
│ │ ├── rate_limit.py
|
│ │ ├── rate_limit.py # DistributedRateLimiter (Redis)
|
||||||
│ │ └── sanitizer.py
|
│ │ └── sanitizer.py
|
||||||
│ └── models/
|
│ └── models/
|
||||||
│ └── base.py # SQLAlchemy base condivisa
|
│ └── base.py # SQLAlchemy base condivisa
|
||||||
@@ -390,42 +509,45 @@ adiuva-api/
|
|||||||
│ ├── main.py
|
│ ├── main.py
|
||||||
│ ├── config.py
|
│ ├── config.py
|
||||||
│ ├── db.py
|
│ ├── db.py
|
||||||
│ ├── models.py # agent_run_logs, memory_*
|
│ ├── models.py # memory_*
|
||||||
│ ├── routes/
|
│ ├── routes/
|
||||||
│ │ ├── device_ws.py
|
│ │ ├── device_ws.py # WS connection owner
|
||||||
│ │ ├── chat.py
|
│ │ └── chat.py # REST fallback
|
||||||
│ │ └── agents.py
|
|
||||||
│ ├── core/
|
│ ├── core/
|
||||||
│ │ ├── device_manager.py # RedisDeviceManager
|
│ │ ├── device_manager.py # RedisDeviceManager
|
||||||
│ │ ├── deep_agent.py
|
│ │ ├── deep_agent.py # Home + floating chat
|
||||||
│ │ ├── agent_runner.py
|
|
||||||
│ │ ├── agent_registry.py
|
|
||||||
│ │ ├── memory_middleware.py
|
│ │ ├── memory_middleware.py
|
||||||
│ │ ├── ws_context.py
|
│ │ ├── ws_context.py
|
||||||
│ │ ├── output_formatter.py
|
│ │ ├── output_formatter.py
|
||||||
│ │ └── llm.py
|
│ │ └── llm.py
|
||||||
│ └── agents/
|
│ └── agents/ # Tool definitions (used by deep_agent)
|
||||||
│ ├── task_agent.py
|
│ ├── task_agent.py
|
||||||
│ ├── project_agent.py
|
│ ├── project_agent.py
|
||||||
│ ├── note_agent.py
|
│ ├── note_agent.py
|
||||||
│ ├── timeline_agent.py
|
│ └── timeline_agent.py
|
||||||
│ └── filesystem_agent.py
|
|
||||||
│
|
│
|
||||||
├── storage-service/
|
├── agent-service/
|
||||||
│ ├── Dockerfile
|
│ ├── Dockerfile
|
||||||
│ ├── requirements.txt
|
│ ├── requirements.txt
|
||||||
│ └── app/
|
│ └── app/
|
||||||
│ ├── main.py
|
│ ├── main.py
|
||||||
│ ├── config.py
|
│ ├── config.py
|
||||||
│ ├── db.py
|
│ ├── db.py
|
||||||
│ ├── models.py # storage_records, backup_metadata
|
│ ├── models.py # agent_run_logs, local/cloud_agent_configs
|
||||||
│ ├── routes/
|
│ ├── routes/
|
||||||
│ │ ├── storage.py
|
│ │ ├── agents.py # catalog, can-create, trigger
|
||||||
│ │ ├── vectors.py
|
│ │ └── agent_setup.py # journey start/message
|
||||||
│ │ └── backup.py
|
│ ├── core/
|
||||||
│ └── services/
|
│ │ ├── agent_runner.py # Batch classify → process
|
||||||
│ ├── blob_store.py
|
│ │ ├── agent_registry.py
|
||||||
│ └── vector_store.py
|
│ │ ├── redis_executor.py # execute_on_client via Redis pub/sub
|
||||||
|
│ │ └── llm.py
|
||||||
|
│ └── agents/
|
||||||
|
│ ├── task_agent.py # Tool definitions (batch context)
|
||||||
|
│ ├── project_agent.py
|
||||||
|
│ ├── note_agent.py
|
||||||
|
│ ├── timeline_agent.py
|
||||||
|
│ └── filesystem_agent.py
|
||||||
│
|
│
|
||||||
├── billing-service/
|
├── billing-service/
|
||||||
│ ├── Dockerfile
|
│ ├── Dockerfile
|
||||||
@@ -441,26 +563,18 @@ adiuva-api/
|
|||||||
│ ├── stripe_service.py
|
│ ├── stripe_service.py
|
||||||
│ └── tier_manager.py
|
│ └── tier_manager.py
|
||||||
│
|
│
|
||||||
├── plugin-service/
|
|
||||||
│ ├── Dockerfile
|
|
||||||
│ ├── requirements.txt
|
|
||||||
│ └── app/
|
|
||||||
│ ├── main.py
|
|
||||||
│ ├── config.py
|
|
||||||
│ ├── db.py
|
|
||||||
│ ├── models.py # plugins, installations, revenue
|
|
||||||
│ └── routes/
|
|
||||||
│ └── plugins.py
|
|
||||||
│
|
|
||||||
└── infra/
|
└── infra/
|
||||||
├── traefik/
|
├── traefik/
|
||||||
│ └── traefik.yml
|
│ └── traefik.yml
|
||||||
|
├── keys/
|
||||||
|
│ ├── jwt_private.pem # Solo auth-service
|
||||||
|
│ └── jwt_public.pem # Tutti i servizi
|
||||||
└── alembic/ # Migrazioni condivise o per-servizio
|
└── alembic/ # Migrazioni condivise o per-servizio
|
||||||
```
|
```
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## 4. Docker Compose — Configurazione Completa
|
## 5. Docker Compose — Configurazione MVP
|
||||||
|
|
||||||
```yaml
|
```yaml
|
||||||
# docker-compose.yml
|
# docker-compose.yml
|
||||||
@@ -478,14 +592,14 @@ services:
|
|||||||
- "--providers.docker.exposedbydefault=false"
|
- "--providers.docker.exposedbydefault=false"
|
||||||
- "--entrypoints.web.address=:80"
|
- "--entrypoints.web.address=:80"
|
||||||
- "--entrypoints.websecure.address=:443"
|
- "--entrypoints.websecure.address=:443"
|
||||||
# Cloudflare gestisce TLS, Traefik riceve HTTP dal proxy
|
|
||||||
- "--entrypoints.web.http.redirections.entrypoint.to=websecure"
|
- "--entrypoints.web.http.redirections.entrypoint.to=websecure"
|
||||||
ports:
|
ports:
|
||||||
- "80:80"
|
- "80:80"
|
||||||
- "443:443"
|
- "443:443"
|
||||||
- "8080:8080" # Dashboard Traefik
|
- "8080:8080" # Dashboard Traefik (disabilitare in prod)
|
||||||
volumes:
|
volumes:
|
||||||
- /var/run/docker.sock:/var/run/docker.sock:ro
|
- /var/run/docker.sock:/var/run/docker.sock:ro
|
||||||
|
- ./infra/certs:/certs:ro
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
|
|
||||||
# ══════════════════════════════════════════════════════════
|
# ══════════════════════════════════════════════════════════
|
||||||
@@ -498,10 +612,12 @@ services:
|
|||||||
env_file: .env
|
env_file: .env
|
||||||
environment:
|
environment:
|
||||||
DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/adiuva
|
DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/adiuva
|
||||||
|
REDIS_URL: redis://redis:6379
|
||||||
JWT_PRIVATE_KEY_FILE: /run/secrets/jwt_private_key
|
JWT_PRIVATE_KEY_FILE: /run/secrets/jwt_private_key
|
||||||
SERVICE_NAME: auth
|
SERVICE_NAME: auth
|
||||||
secrets:
|
secrets:
|
||||||
- jwt_private_key
|
- jwt_private_key
|
||||||
|
- jwt_public_key
|
||||||
labels:
|
labels:
|
||||||
- "traefik.enable=true"
|
- "traefik.enable=true"
|
||||||
- "traefik.http.routers.auth.rule=PathPrefix(`/api/v1/auth`)"
|
- "traefik.http.routers.auth.rule=PathPrefix(`/api/v1/auth`)"
|
||||||
@@ -509,14 +625,16 @@ services:
|
|||||||
depends_on:
|
depends_on:
|
||||||
db:
|
db:
|
||||||
condition: service_healthy
|
condition: service_healthy
|
||||||
|
redis:
|
||||||
|
condition: service_healthy
|
||||||
|
|
||||||
# ══════════════════════════════════════════════════════════
|
# ══════════════════════════════════════════════════════════
|
||||||
# Chat Service (scalabile, N repliche)
|
# Chat Service — Real-time WS + Chat (scalabile)
|
||||||
# ══════════════════════════════════════════════════════════
|
# ══════════════════════════════════════════════════════════
|
||||||
chat-service:
|
chat-service:
|
||||||
build: ./chat-service
|
build: ./chat-service
|
||||||
deploy:
|
deploy:
|
||||||
replicas: 3
|
replicas: 2
|
||||||
env_file: .env
|
env_file: .env
|
||||||
environment:
|
environment:
|
||||||
DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/adiuva
|
DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/adiuva
|
||||||
@@ -527,8 +645,8 @@ services:
|
|||||||
- jwt_public_key
|
- jwt_public_key
|
||||||
labels:
|
labels:
|
||||||
- "traefik.enable=true"
|
- "traefik.enable=true"
|
||||||
# REST routes
|
# REST chat endpoint
|
||||||
- "traefik.http.routers.chat.rule=PathPrefix(`/api/v1/chat`) || PathPrefix(`/api/v1/agents`)"
|
- "traefik.http.routers.chat.rule=PathPrefix(`/api/v1/chat`)"
|
||||||
- "traefik.http.services.chat.loadbalancer.server.port=8000"
|
- "traefik.http.services.chat.loadbalancer.server.port=8000"
|
||||||
# WebSocket route con sticky session
|
# WebSocket route con sticky session
|
||||||
- "traefik.http.routers.ws.rule=PathPrefix(`/api/v1/ws`)"
|
- "traefik.http.routers.ws.rule=PathPrefix(`/api/v1/ws`)"
|
||||||
@@ -543,26 +661,29 @@ services:
|
|||||||
condition: service_healthy
|
condition: service_healthy
|
||||||
|
|
||||||
# ══════════════════════════════════════════════════════════
|
# ══════════════════════════════════════════════════════════
|
||||||
# Storage Service (2 repliche)
|
# Agent Service — Batch processing (scalabile indipendentemente)
|
||||||
# ══════════════════════════════════════════════════════════
|
# ══════════════════════════════════════════════════════════
|
||||||
storage-service:
|
agent-service:
|
||||||
build: ./storage-service
|
build: ./agent-service
|
||||||
deploy:
|
deploy:
|
||||||
replicas: 2
|
replicas: 2
|
||||||
env_file: .env
|
env_file: .env
|
||||||
environment:
|
environment:
|
||||||
DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/adiuva
|
DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/adiuva
|
||||||
|
REDIS_URL: redis://redis:6379
|
||||||
JWT_PUBLIC_KEY_FILE: /run/secrets/jwt_public_key
|
JWT_PUBLIC_KEY_FILE: /run/secrets/jwt_public_key
|
||||||
SERVICE_NAME: storage
|
SERVICE_NAME: agent
|
||||||
secrets:
|
secrets:
|
||||||
- jwt_public_key
|
- jwt_public_key
|
||||||
labels:
|
labels:
|
||||||
- "traefik.enable=true"
|
- "traefik.enable=true"
|
||||||
- "traefik.http.routers.storage.rule=PathPrefix(`/api/v1/storage`) || PathPrefix(`/api/v1/vectors`) || PathPrefix(`/api/v1/backup`)"
|
- "traefik.http.routers.agents.rule=PathPrefix(`/api/v1/agents`)"
|
||||||
- "traefik.http.services.storage.loadbalancer.server.port=8000"
|
- "traefik.http.services.agents.loadbalancer.server.port=8000"
|
||||||
depends_on:
|
depends_on:
|
||||||
db:
|
db:
|
||||||
condition: service_healthy
|
condition: service_healthy
|
||||||
|
redis:
|
||||||
|
condition: service_healthy
|
||||||
|
|
||||||
# ══════════════════════════════════════════════════════════
|
# ══════════════════════════════════════════════════════════
|
||||||
# Billing Service (1 replica)
|
# Billing Service (1 replica)
|
||||||
@@ -589,28 +710,6 @@ services:
|
|||||||
redis:
|
redis:
|
||||||
condition: service_healthy
|
condition: service_healthy
|
||||||
|
|
||||||
# ══════════════════════════════════════════════════════════
|
|
||||||
# Plugin Service (1 replica)
|
|
||||||
# ══════════════════════════════════════════════════════════
|
|
||||||
plugin-service:
|
|
||||||
build: ./plugin-service
|
|
||||||
deploy:
|
|
||||||
replicas: 1
|
|
||||||
env_file: .env
|
|
||||||
environment:
|
|
||||||
DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/adiuva
|
|
||||||
JWT_PUBLIC_KEY_FILE: /run/secrets/jwt_public_key
|
|
||||||
SERVICE_NAME: plugins
|
|
||||||
secrets:
|
|
||||||
- jwt_public_key
|
|
||||||
labels:
|
|
||||||
- "traefik.enable=true"
|
|
||||||
- "traefik.http.routers.plugins.rule=PathPrefix(`/api/v1/plugins`)"
|
|
||||||
- "traefik.http.services.plugins.loadbalancer.server.port=8000"
|
|
||||||
depends_on:
|
|
||||||
db:
|
|
||||||
condition: service_healthy
|
|
||||||
|
|
||||||
# ══════════════════════════════════════════════════════════
|
# ══════════════════════════════════════════════════════════
|
||||||
# Infrastruttura
|
# Infrastruttura
|
||||||
# ══════════════════════════════════════════════════════════
|
# ══════════════════════════════════════════════════════════
|
||||||
@@ -641,19 +740,6 @@ services:
|
|||||||
retries: 5
|
retries: 5
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
|
|
||||||
minio:
|
|
||||||
image: minio/minio:latest
|
|
||||||
command: server /data --console-address ":9001"
|
|
||||||
ports:
|
|
||||||
- "9000:9000"
|
|
||||||
- "9001:9001"
|
|
||||||
environment:
|
|
||||||
MINIO_ROOT_USER: minioadmin
|
|
||||||
MINIO_ROOT_PASSWORD: minioadmin
|
|
||||||
volumes:
|
|
||||||
- minio_data:/data
|
|
||||||
restart: unless-stopped
|
|
||||||
|
|
||||||
qdrant:
|
qdrant:
|
||||||
image: qdrant/qdrant:latest
|
image: qdrant/qdrant:latest
|
||||||
volumes:
|
volumes:
|
||||||
@@ -669,22 +755,21 @@ secrets:
|
|||||||
volumes:
|
volumes:
|
||||||
postgres_data:
|
postgres_data:
|
||||||
redis_data:
|
redis_data:
|
||||||
minio_data:
|
|
||||||
qdrant_data:
|
qdrant_data:
|
||||||
```
|
```
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## 5. Configurazione Cloudflare + VPS
|
## 6. Configurazione Cloudflare + VPS
|
||||||
|
|
||||||
### 5.1 DNS
|
### 6.1 DNS
|
||||||
|
|
||||||
```
|
```
|
||||||
api.tuodominio.com → A record → IP del VPS
|
api.tuodominio.com → A record → IP del VPS
|
||||||
→ Proxy: ON (orange cloud)
|
→ Proxy: ON (orange cloud)
|
||||||
```
|
```
|
||||||
|
|
||||||
### 5.2 Cloudflare Settings
|
### 6.2 Cloudflare Settings
|
||||||
|
|
||||||
| Setting | Valore | Motivo |
|
| Setting | Valore | Motivo |
|
||||||
|---------|--------|--------|
|
|---------|--------|--------|
|
||||||
@@ -693,7 +778,7 @@ api.tuodominio.com → A record → IP del VPS
|
|||||||
| Proxy timeout | **100s** (Enterprise) o default | Le LLM calls possono durare 30s+ |
|
| Proxy timeout | **100s** (Enterprise) o default | Le LLM calls possono durare 30s+ |
|
||||||
| Under Attack Mode | Off (attivare se necessario) | |
|
| Under Attack Mode | Off (attivare se necessario) | |
|
||||||
|
|
||||||
### 5.3 TLS sul VPS
|
### 6.3 TLS sul VPS
|
||||||
|
|
||||||
Due opzioni:
|
Due opzioni:
|
||||||
- **Opzione A (consigliata)**: Cloudflare Origin Certificate → montato in Traefik
|
- **Opzione A (consigliata)**: Cloudflare Origin Certificate → montato in Traefik
|
||||||
@@ -711,7 +796,7 @@ tls:
|
|||||||
keyFile: /certs/origin-key.pem
|
keyFile: /certs/origin-key.pem
|
||||||
```
|
```
|
||||||
|
|
||||||
### 5.4 Rete VPS
|
### 6.4 Rete VPS
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
# UFW firewall — solo Cloudflare può raggiungere le porte 80/443
|
# UFW firewall — solo Cloudflare può raggiungere le porte 80/443
|
||||||
@@ -726,9 +811,9 @@ ufw enable
|
|||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## 6. Comunicazione Inter-Servizio
|
## 7. Comunicazione Inter-Servizio
|
||||||
|
|
||||||
### 6.1 Pattern: Event Bus via Redis Pub/Sub
|
### 7.1 Redis Pub/Sub — Event Bus
|
||||||
|
|
||||||
```
|
```
|
||||||
┌──────────┐ tier_changed:user_123 ┌──────────┐
|
┌──────────┐ tier_changed:user_123 ┌──────────┐
|
||||||
@@ -736,87 +821,55 @@ ufw enable
|
|||||||
│ Service │ │ Service │
|
│ Service │ │ Service │
|
||||||
└──────────┘ └──────────┘
|
└──────────┘ └──────────┘
|
||||||
|
|
||||||
┌──────────┐ agent_triggered:user_123 ┌──────────┐
|
┌──────────┐ tool_call:user_123 ┌──────────┐
|
||||||
│ Chat │ ◄──────────────────────── │ Any │
|
│ Agent │ ────────────────────────► │ Chat │
|
||||||
│ Service │ │ Service │
|
│ Service │ │ Service │
|
||||||
└──────────┘ └──────────┘
|
│ (batch) │ ◄────────────────────────│ (ha WS) │
|
||||||
|
└──────────┘ tool_result:{call_id} └──────────┘
|
||||||
```
|
```
|
||||||
|
|
||||||
### 6.2 Pattern: HTTP Sincrono (per query semplici)
|
### 7.2 Health Checks e Service Discovery
|
||||||
|
|
||||||
Il Chat Service può avere bisogno del tier dell'utente per il rate-limiting degli agent. Due strategie:
|
|
||||||
|
|
||||||
- **Strategia A (preferita)**: Il tier è nel JWT. All'aggiornamento, il Billing Service forza token refresh invalidando i vecchi token su Redis.
|
|
||||||
- **Strategia B**: Il Chat Service chiama `http://auth-service:8000/internal/user/{id}/tier` (rete Docker interna, non esposta).
|
|
||||||
|
|
||||||
### 6.3 Health Checks e Service Discovery
|
|
||||||
|
|
||||||
Traefik gestisce automaticamente il service discovery via Docker labels. I servizi non devono conoscersi tra loro — comunicano solo via:
|
Traefik gestisce automaticamente il service discovery via Docker labels. I servizi non devono conoscersi tra loro — comunicano solo via:
|
||||||
- **Redis pub/sub** (eventi asincroni)
|
- **Redis pub/sub** (tool-call cross-instance, tier events)
|
||||||
- **Redis hash** (stato condiviso, es. `ws:connections`)
|
- **Redis hash** (stato condiviso: `ws:connections`, rate-limit counters)
|
||||||
- **PostgreSQL** (dati persistenti condivisi)
|
- **PostgreSQL** (dati persistenti condivisi)
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## 7. Piano di Migrazione Incrementale
|
## 8. Piano di Migrazione Incrementale (MVP)
|
||||||
|
|
||||||
### Fase 1 — Preparazione (senza rompere nulla)
|
### Fase 1 — Preparazione (nel monolite attuale)
|
||||||
1. Aggiungere Redis al `docker-compose.yml` attuale
|
1. Aggiungere Redis al `docker-compose.yml` attuale
|
||||||
2. Migrare JWT da HS256 → RS256 (backward-compatible: accetta entrambi)
|
2. Migrare JWT da HS256 → RS256 (backward-compatible: accetta entrambi per un periodo)
|
||||||
3. Implementare `RedisDeviceManager` come drop-in replacement
|
3. Implementare `RedisDeviceManager` come drop-in replacement del singleton in-memory
|
||||||
4. Estrarre `shared/` con auth verification, schemas, middleware
|
4. Estrarre `shared/` con auth verification, schemas, middleware
|
||||||
|
|
||||||
### Fase 2 — Primo split: Auth Service
|
### Fase 2 — Auth Service (primo split)
|
||||||
1. Estrarre `auth.py` routes + models in `auth-service/`
|
1. Estrarre `auth.py` routes + models in `auth-service/`
|
||||||
2. Verificare che i JWT firmati da `auth-service` vengano validati dal monolite
|
2. Verificare che i JWT firmati da `auth-service` vengano validati dal monolite
|
||||||
3. Aggiornare Traefik per routare `/api/v1/auth/*` al nuovo servizio
|
3. Aggiungere Traefik e routare `/api/v1/auth/*` al nuovo servizio
|
||||||
4. Il monolite continua a servire tutto il resto
|
4. Il monolite continua a servire tutto il resto
|
||||||
|
|
||||||
### Fase 3 — Storage + Billing + Plugins
|
### Fase 3 — Billing Service
|
||||||
1. Servizi stateless e senza WebSocket → facili da estrarre
|
1. Estrarre billing routes, Stripe service, tier manager
|
||||||
2. Estrarre uno alla volta, testare, routare via Traefik
|
2. Configurare Redis pub/sub per `tier_changed` events
|
||||||
3. Il monolite diventa sempre più magro
|
3. Routare via Traefik
|
||||||
|
|
||||||
### Fase 4 — Chat Service (il più delicato)
|
### Fase 4 — Split Chat + Agent (il più delicato)
|
||||||
1. Il monolite residuo **diventa** il Chat Service
|
1. Il monolite residuo contiene WS + chat + agents
|
||||||
2. Rimuovere i route migrati, tenere solo WS + chat + agents
|
2. Separare Agent Service: estrarre `agent_runner`, `agent_registry`, `agent_setup`, route `/agents/*`
|
||||||
3. Testare lo scaling a 2+ istanze con `RedisDeviceManager`
|
3. Implementare `redis_executor.py` nell'Agent Service per tool-call via Redis
|
||||||
4. Verificare tool-call cross-instance
|
4. Il Chat Service resta proprietario della WS e sottoscrive i canali `tool_call:{user_id}`
|
||||||
|
5. Testare: trigger agent dall'Agent Service → tool_call via Redis → Chat Service → WS → device → risposta
|
||||||
|
|
||||||
### Fase 5 — Cleanup
|
### Fase 5 — Scaling test
|
||||||
1. Rimuovere il monolite originale
|
1. Scalare Chat Service a 2 repliche, verificare sticky sessions
|
||||||
2. CI/CD pipeline per build/push separati
|
2. Scalare Agent Service a 2 repliche, verificare batch processing distribuito
|
||||||
3. Monitoring (Prometheus + Grafana) per ogni servizio
|
3. Monitoring (Prometheus + Grafana) per ogni servizio
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## 8. Rate Limiting Distribuito
|
|
||||||
|
|
||||||
Il middleware attuale usa un contatore in-memory per il rate-limiting. Con i microservizi:
|
|
||||||
|
|
||||||
```python
|
|
||||||
# shared/middleware/rate_limit.py
|
|
||||||
import redis.asyncio as aioredis
|
|
||||||
|
|
||||||
class DistributedRateLimiter:
|
|
||||||
def __init__(self, redis: aioredis.Redis):
|
|
||||||
self._redis = redis
|
|
||||||
|
|
||||||
async def check(self, user_id: str, tier: str) -> bool:
|
|
||||||
limits = {"free": 20, "pro": 60, "power": 120, "team": 200}
|
|
||||||
max_req = limits.get(tier, 20)
|
|
||||||
key = f"rate:{user_id}"
|
|
||||||
|
|
||||||
pipe = self._redis.pipeline()
|
|
||||||
pipe.incr(key)
|
|
||||||
pipe.expire(key, 60) # Finestra di 60 secondi
|
|
||||||
count, _ = await pipe.execute()
|
|
||||||
|
|
||||||
return count <= max_req
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## 9. Monitoraggio e Logging
|
## 9. Monitoraggio e Logging
|
||||||
|
|
||||||
```yaml
|
```yaml
|
||||||
@@ -845,35 +898,44 @@ Ogni servizio espone `/metrics` (Prometheus) e scrive log strutturati (JSON) rac
|
|||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## 10. Sizing VPS Minimo Consigliato
|
## 10. Sizing VPS Minimo Consigliato (MVP)
|
||||||
|
|
||||||
| Componente | CPU | RAM | Note |
|
| Componente | CPU | RAM | Note |
|
||||||
|---|---|---|---|
|
|---|---|---|---|
|
||||||
| Traefik | 0.25 | 128MB | |
|
| Traefik | 0.25 | 128MB | |
|
||||||
| Auth Service ×2 | 0.25 ×2 | 128MB ×2 | |
|
| Auth Service ×2 | 0.25 ×2 | 128MB ×2 | Stateless, leggero |
|
||||||
| Chat Service ×2 | 1.0 ×2 | 1GB ×2 | Il più pesante (LLM calls) |
|
| Chat Service ×2 | 1.0 ×2 | 1GB ×2 | WS + streaming LLM |
|
||||||
| Storage Service ×2 | 0.5 ×2 | 256MB ×2 | I/O bound |
|
| Agent Service ×2 | 0.75 ×2 | 512MB ×2 | Batch LLM, CPU-bound |
|
||||||
| Billing Service | 0.25 | 128MB | |
|
| Billing Service | 0.25 | 128MB | |
|
||||||
| Plugin Service | 0.25 | 128MB | |
|
|
||||||
| PostgreSQL | 1.0 | 1GB | |
|
| PostgreSQL | 1.0 | 1GB | |
|
||||||
| Redis | 0.25 | 256MB | |
|
| Redis | 0.25 | 256MB | |
|
||||||
| Qdrant | 0.5 | 512MB | |
|
| Qdrant | 0.5 | 512MB | |
|
||||||
| MinIO | 0.25 | 256MB | |
|
| **Totale MVP** | **~5.5 vCPU** | **~5 GB** | |
|
||||||
| **Totale** | **~6 vCPU** | **~5.5 GB** | |
|
|
||||||
|
|
||||||
**Raccomandazione**: VPS con **8 vCPU / 16 GB RAM** per avere margine. Hetzner CPX41 (~€30/mese) o equivalente.
|
**Raccomandazione**: VPS con **8 vCPU / 16 GB RAM** per avere margine. Hetzner CPX41 (~€30/mese) o equivalente. Senza Storage/Plugin si risparmia ~1 vCPU e 512MB rispetto alla versione completa.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## Riepilogo Decisioni Architetturali
|
## Riepilogo Architettura MVP
|
||||||
|
|
||||||
|
| Servizio | Repliche | Proprietario di |
|
||||||
|
|---|---|---|
|
||||||
|
| **Traefik** | 1 | Routing, TLS, sticky sessions |
|
||||||
|
| **Auth Service** | 2 | JWT RS256, registrazione, login, profilo |
|
||||||
|
| **Chat Service** | 2–N | WebSocket, home/floating chat, streaming |
|
||||||
|
| **Agent Service** | 2–N | Batch processing, directory scan, agent setup |
|
||||||
|
| **Billing Service** | 1 | Stripe, subscriptions, tier management |
|
||||||
|
|
||||||
| Decisione | Scelta | Motivazione |
|
| Decisione | Scelta | Motivazione |
|
||||||
|---|---|---|
|
|---|---|---|
|
||||||
| API Gateway | Traefik | Nativo Docker, WebSocket support, service discovery automatico |
|
| API Gateway | Traefik | Nativo Docker, WebSocket support, service discovery automatico |
|
||||||
| JWT | RS256 (asimmetrico) | Verifica distribuita senza contattare Auth Service |
|
| JWT | RS256 (asimmetrico) | Verifica distribuita senza contattare Auth Service |
|
||||||
| WebSocket scaling | Redis pub/sub + registry | Cross-instance tool-call routing |
|
| Tier check | Claim nel JWT | Ogni servizio verifica localmente, zero roundtrip |
|
||||||
| Rate limiting | Redis contatori | Distribuito, sliding window |
|
| WebSocket scaling | Redis pub/sub + sticky cookies | Cross-instance tool-call routing |
|
||||||
| Service communication | Redis pub/sub + HTTP interno | Asincrono per eventi, sincrono per query |
|
| Chat ↔ Agent split | Servizi separati | Batch CPU-bound non impatta real-time chat |
|
||||||
| Database | PostgreSQL condiviso (un DB, schema separation opzionale) | Semplicità; split DB futuro facile |
|
| Agent → Device comms | Redis pub/sub via Chat Service | Agent non possiede la WS, usa un relay |
|
||||||
| TLS | Cloudflare Origin Certificate | Zero maintenance, trust Cloudflare |
|
| Rate limiting | Redis contatori distribuiti | Sliding window condivisa tra repliche |
|
||||||
|
| Database | PostgreSQL condiviso | Semplicità MVP; split DB futuro facile |
|
||||||
|
| TLS | Cloudflare Origin Certificate | Zero maintenance |
|
||||||
| Orchestrazione | Docker Compose | Sufficiente per un singolo VPS |
|
| Orchestrazione | Docker Compose | Sufficiente per un singolo VPS |
|
||||||
|
| Storage / Plugin | Post-MVP | Non critici per il lancio |
|
||||||
|
|||||||
Reference in New Issue
Block a user