22 KiB
Local Agent V2 — Piano Implementativo
Riferimento architetturale:
local_agent_v2_mem.md
Panoramica
Il Local Agent V2 sostituisce il flusso a 3 call LLM (classification + processing separati) con un'architettura a 2 fasi:
- Detect + Preprocess (Python puro, zero LLM) — identifica il tipo di contenuto e lo pulisce
- Single LLM call (classify + extract + create) — una sola call agentiva con tool calling
Langfuse: Scoring + Prompt Management (hot-swap)
Ogni step include un test set con eval che inviano score a Langfuse. I prompt sono gestiti da Langfuse Prompt Management — modificabili dalla UI senza toccare codice. Ogni score è collegato alla versione esatta del prompt che lo ha prodotto, permettendo confronto A/B tra versioni.
Workflow iterativo:
- Scrivi/modifica il prompt nella UI di Langfuse (es.
unified_processingv3) - Lancia gli eval:
pytest tests/test_agent_runner_v2.py -k eval - Vedi in Langfuse: prompt v3 → score 0.6
- Modifica il prompt → v4
- Ri-lancia gli eval → prompt v4 → score 0.9
- Promuovi v4 a
productionlabel
Prompt Langfuse da creare (con fallback hardcoded nel codice):
| Nome Langfuse | Usato in | Descrizione |
|---|---|---|
unified_processing |
Step 2 (runner) | Prompt unico: classify + extract + create |
journey_system_v2 |
Step 4 (journey) | Journey chatbot → produce AgentConfig JSON |
Pattern di scoring con prompt version linking:
from app.core.langfuse_client import get_langfuse, get_prompt_or_fallback
def run_eval_with_prompt(prompt_name: str, fallback: str, eval_name: str, run_fn):
"""Esegue un eval collegando score ↔ prompt version."""
lf = get_langfuse()
template, prompt_obj = get_prompt_or_fallback(prompt_name, fallback)
# Crea trace per l'eval
trace = lf.trace(name=f"eval-{eval_name}") if lf else None
# Esegui la call LLM dentro una generation linkata al prompt
if lf and trace:
with lf.start_as_current_observation(
as_type="generation",
name=eval_name,
prompt=prompt_obj, # ← linka alla versione esatta del prompt
trace_id=trace.id,
) as gen:
result, score = run_fn(template)
gen.update(output=str(result))
else:
result, score = run_fn(template)
# Score collegato al trace → visibile per prompt version in Langfuse
if lf and trace:
lf.score(
trace_id=trace.id,
name=eval_name,
value=score,
data_type="NUMERIC",
)
lf.flush()
return result, score
In Langfuse vedrai:
Prompt: unified_processing
├── v3 (2026-04-05) → avg score: 0.62 (12 evals)
├── v4 (2026-04-07) → avg score: 0.85 (12 evals) ← production
└── v5 (2026-04-08) → avg score: 0.91 (12 evals) ← candidate
Step 1 — Preprocessor: email HTML handler ✅ DONE
Step 3 — Model e schema: prompt_template → agent_config ✅ DONE
Aggiunto in parallelo a Step 2 come prerequisito:
app/schemas.py:ContentTypeConfig,AgentConfigapp/models.py:agent_config: JSON(nullable, accanto aprompt_template)alembic/versions/a3b9c0d1e2f3_add_agent_config_to_local_agents.py
Step 2 — Refactor agent_runner.py: nuovo flusso per file ✅ DONE
File da creare:
app/core/preprocessors/__init__.py— registry, detect, dispatchapp/core/preprocessors/base.py— dataclassPreprocessResult, classe baseapp/core/preprocessors/email_html.py— BeautifulSoup handler
Cosa fa:
detect_content_type(filename, raw_content) -> str— heuristic basata su extension + pattern nel contenutopreprocess(content_type, raw_content) -> PreprocessResult— dispatch al handler correttoPreprocessResult:{ content_type, clean_text, metadata: {subject, from, to, date, ...} }
Handler email_html:
- Strip
<style>,<script>, HTML tags → testo pulito (BeautifulSoup) - Estrai metadata: Subject, From, To, Date (da
<meta>, header pattern, o content heuristic) - Split thread: identifica quote markers (
>,On ... wrote:,---Original Message---) → isola l'ultimo messaggio - Fallback: se non riesce a splittare, restituisce tutto il testo pulito
Handler fallback (generic):
- Strip HTML tags se presenti
- Restituisce testo as-is con metadata minime (filename, extension)
Dipendenze da aggiungere:
beautifulsoup4(già probabilmente installata, verificare)lxml(parser veloce per BS4, opzionale)
Test set — Step 1
File: tests/test_preprocessors.py
| # | Test case | Input | Expected | Score name |
|---|---|---|---|---|
| 1.1 | Detect email HTML | .html con From:, To:, Subject: |
content_type == "email_html" |
preprocess.detect_email |
| 1.2 | Detect generic HTML | .html con <nav>, <main> |
content_type == "generic_html" |
preprocess.detect_generic |
| 1.3 | Detect plain text | .txt |
content_type == "plain_text" |
preprocess.detect_text |
| 1.4 | Detect unknown | .xyz binario |
content_type == "unknown" |
preprocess.detect_unknown |
| 1.5 | Email: strip HTML | Email con <style>, CSS inline |
clean_text senza tag HTML |
preprocess.email_strip |
| 1.6 | Email: extract metadata | Email con Subject/From/Date | metadata corretti | preprocess.email_metadata |
| 1.7 | Email: split thread | Email con 3 risposte nested | clean_text = solo ultimo msg |
preprocess.email_thread |
| 1.8 | Email: singolo messaggio | Email senza thread | clean_text = intero body |
preprocess.email_single |
| 1.9 | Email: HTML pesante | Email con molto CSS/table layout | testo leggibile estratto | preprocess.email_heavy_html |
| 1.10 | Fallback: file sconosciuto | File binario | clean_text con fallback |
preprocess.fallback |
Eval con Langfuse:
@pytest.mark.asyncio
async def test_email_html_strip(sample_email_html):
lf = get_langfuse()
trace = lf.trace(name="eval-preprocess-email-strip") if lf else None
result = preprocess("email_html", sample_email_html)
# Assertions
has_no_tags = "<" not in result.clean_text
has_content = len(result.clean_text) > 50
ratio = len(result.clean_text) / len(sample_email_html) # compression ratio
score = 1.0 if (has_no_tags and has_content and ratio < 0.5) else 0.0
if trace:
lf.score(trace_id=trace.id, name="preprocess.email_strip", value=score,
comment=f"ratio={ratio:.2f}, len={len(result.clean_text)}")
lf.flush()
assert has_no_tags
assert has_content
Criteri di successo: tutti i 10 test passano, score medio ≥ 0.9
Step 2 — Refactor agent_runner.py: nuovo flusso per file ✅ DONE
File da modificare:
app/core/agent_runner.py
Cosa cambia:
- Rimuovere
_classify_file()(Step 1 LLM separato) - Rimuovere
_BATCH_FILE_CLASSIFIER_PROMPT - Aggiungere import del preprocessor
- Nuovo flusso in
run_local_agent():
for file_path in file_paths:
# 1. Leggi file raw
raw_content = await execute_on_client(action="read_file_content", ...)
# 2. Detect + Preprocess (Python, zero LLM)
content_type = detect_content_type(file_path, raw_content)
preprocessed = preprocess(content_type, raw_content)
# 3. Fetch prompt da Langfuse (hot-swappable dalla UI) con fallback locale
template, prompt_obj = get_prompt_or_fallback(
"unified_processing", _UNIFIED_PROCESSING_PROMPT
)
extraction_rules = _get_extraction_rules(config.agent_config, content_type)
system_prompt = template.format(
extraction_rules=extraction_rules,
global_rules="\n".join(config.agent_config.get("global_rules", [])),
projects_list=_format_projects(projects),
data_types=", ".join(config.data_types),
filename=os.path.basename(file_path),
metadata_section=_format_metadata(preprocessed.metadata),
no_match_behavior=_get_no_match_behavior(config.agent_config),
)
# 4. Single LLM call con tools (classify + extract + create)
# La generation è linkata al prompt_obj → score visibili per versione
user_message = _build_user_message(file_path, preprocessed)
result = await _run_agent_with_tools(
system_prompt=system_prompt,
user_message=user_message,
tools=processing_tools,
max_steps=_MAX_PROCESSING_STEPS,
langfuse_prompt=prompt_obj, # ← linka alla versione del prompt
)
Prompt unified_processing (fallback locale, editabile da Langfuse UI):
You are a data extraction assistant for a freelance project management tool.
## Your process (follow this exact order)
### 1. Identify the project
File: {filename}
{metadata_section}
Existing projects:
{projects_list}
Match this file to an existing project using the filename and content.
If no project matches, {no_match_behavior}.
### 2. Check existing records
Once you identify the project, use list_tasks/list_notes/list_timelines
to see what already exists. NEVER create duplicates.
### 3. Extract and create/update
{extraction_rules}
### Rules
- Set isAiSuggested=1 on every new record
- Set projectId on every record
- Update existing records when a match is found by title/topic
{global_rules}
Fix items_created: contare i create_* tool calls nei risultati.
Test set — Step 2
File: tests/test_agent_runner_v2.py
| # | Test case | Input | Expected | Score name |
|---|---|---|---|---|
| 2.1 | Happy path: email → task | Email preprocessata con azione | create_task tool chiamato |
runner.email_to_task |
| 2.2 | Happy path: email → nota | Email informativa | create_note tool chiamato |
runner.email_to_note |
| 2.3 | Happy path: email → timeline | Email con data evento | create_timeline tool chiamato |
runner.email_to_timeline |
| 2.4 | Project matching: filename | File ProjectX_report.html |
progetto ProjectX selezionato | runner.project_filename |
| 2.5 | Project matching: contenuto | File con menzione progetto nel body | progetto corretto | runner.project_content |
| 2.6 | No project match → regola globale | File senza match progetto | comportamento da global_rules | runner.no_project |
| 2.7 | Deduplicazione | Task esistente + email simile | update_task, non create_task |
runner.dedup |
| 2.8 | items_created conteggio | 2 create + 1 update | items_created == 2 |
runner.items_count |
| 2.9 | Device offline | No device | status=error | runner.offline |
| 2.10 | File vuoto | Contenuto vuoto | skip senza errori | runner.empty_file |
Eval con Langfuse (prompt hot-swap + score per versione):
@pytest.mark.asyncio
async def test_email_to_task_e2e(mock_ws_executor):
lf = get_langfuse()
# Il prompt viene da Langfuse → puoi cambiarlo dalla UI e ri-lanciare il test
template, prompt_obj = get_prompt_or_fallback(
"unified_processing", _UNIFIED_PROCESSING_PROMPT
)
trace = lf.trace(
name="eval-runner-email-to-task",
metadata={"step": "2", "prompt_version": getattr(prompt_obj, "version", "fallback")},
) if lf else None
config = _make_config(agent_config={
"content_types": [{
"id": "email_html",
"extraction_prompt": "Azione diretta → task. Informativa → nota."
}],
"global_rules": [],
"data_types": ["tasks", "notes"]
})
# Mock preprocessed email with action request
mock_file_content = "Subject: Fix the bug\nFrom: boss@co.com\n\nPlease fix the login bug by Friday."
tool_calls_made = []
# ... setup mock that captures tool calls ...
await run_local_agent(user_id, config, run_log, device_mgr)
created_tasks = [c for c in tool_calls_made if c["name"] == "create_task"]
score = 1.0 if len(created_tasks) == 1 else 0.0
title_match = 1.0 if any("bug" in c["args"].get("title", "").lower() for c in created_tasks) else 0.0
if trace:
# Score collegato al trace → Langfuse lo linka alla prompt version automaticamente
lf.score(trace_id=trace.id, name="runner.email_to_task", value=score,
comment=f"tasks_created={len(created_tasks)}")
lf.score(trace_id=trace.id, name="runner.email_to_task.title", value=title_match)
lf.flush()
assert score == 1.0
assert title_match == 1.0
Criteri di successo: tutti i 10 test passano, score medio ≥ 0.8
Step 3 — Model e schema: prompt_template → agent_config ✅ DONE (vedi sopra)
File da modificare:
app/models.py—LocalAgentConfig.prompt_template: Text→agent_config: JSONapp/schemas.py— Pydantic schema perAgentConfigalembic/versions/— nuova migrationapp/api/routes/agents.py— aggiornaretrigger_agent_runper leggereagent_config
Pydantic schema:
class ContentTypeConfig(BaseModel):
id: str
label: str = ""
detection_hint: str = ""
preprocessing: str = "generic" # nome handler: "email_html", "generic", ...
extraction_prompt: str
class AgentConfig(BaseModel):
content_types: list[ContentTypeConfig] = []
global_rules: list[str] = []
data_types: list[str] = []
Test set — Step 3
File: tests/test_agent_config_schema.py
| # | Test case | Input | Expected | Score name |
|---|---|---|---|---|
| 3.1 | Schema valida | JSON completo | parsing OK | schema.valid |
| 3.2 | Schema minima | Solo data_types |
default applicati | schema.minimal |
| 3.3 | Content type sconosciuto | preprocessing: "pdf" |
accettato (futuro) | schema.unknown_type |
| 3.4 | Migration up/down | Alembic migrate | nessun errore | schema.migration |
| 3.5 | Trigger con agent_config | POST /agents/trigger | config parsata | schema.trigger |
Criteri di successo: tutti i 5 test passano
Step 4 — Journey setup: output strutturato ✅ DONE
File da modificare:
app/api/routes/agent_setup.py—_JOURNEY_SYSTEM_PROMPTriscrittaapp/api/routes/agent_setup.py— parsing output JSON invece di marker di testo
Cosa cambia:
- Il journey produce un
AgentConfigJSON, non unprompt_templatein prosa - Il system prompt viene da Langfuse (
journey_system_v2) con fallback locale → modificabile dalla UI senza toccare codice per iterare sulla qualità del journey - Il system prompt istruisce l'LLM a:
- Esplorare la directory
- Identificare i tipi di contenuto presenti
- Per ogni tipo, chiedere all'utente le regole di estrazione
- Produrre un JSON strutturato conforme allo schema
AgentConfig
- I marker
PROMPT_TEMPLATE_START/ENDdiventanoAGENT_CONFIG_START/END - Il parsing estrae e valida JSON con Pydantic
- Ogni call LLM del journey è linkata al
prompt_obj→ score per versione
Test set — Step 4
File: tests/test_journey_v2.py
| # | Test case | Input | Expected | Score name |
|---|---|---|---|---|
| 4.1 | Journey start: esplora directory | Directory con email HTML | prima domanda pertinente | journey.start |
| 4.2 | Journey: produce JSON valido | 3-5 turni di conversazione | AgentConfig valido |
journey.valid_json |
| 4.3 | Journey: rileva email HTML | Directory con .html email |
content_type email_html presente |
journey.detect_email |
| 4.4 | Journey: regole custom utente | "crea solo note, no task" | extraction_prompt riflette la regola |
journey.custom_rules |
| 4.5 | Journey: global rules | "no progetto = no entità" | presente in global_rules |
journey.global_rules |
| 4.6 | Journey: nudge dopo max turns | Raggiunto limite turni | JSON prodotto comunque | journey.nudge |
Eval con Langfuse (esempio LLM-as-judge):
@pytest.mark.asyncio
async def test_journey_produces_valid_config(mock_ws_executor):
lf = get_langfuse()
trace = lf.trace(name="eval-journey-valid-config") if lf else None
# Simula journey completo: start + 3 messaggi
reply = await handle_journey_start(user_id, {
"agent_type": "local",
"directory": "/test/emails",
"data_types": ["tasks", "notes"],
})
# Simula risposte utente
for msg in ["They are email exports from Outlook", "Extract tasks from action items", "Yes, that looks correct"]:
reply = await handle_journey_message(user_id, {
"session_id": reply["session_id"],
"message": msg,
})
if reply.get("done"):
break
config_json = reply.get("agent_config")
is_valid = False
try:
parsed = AgentConfig.model_validate_json(config_json)
is_valid = len(parsed.content_types) > 0
except Exception:
pass
if trace:
lf.score(trace_id=trace.id, name="journey.valid_json", value=1.0 if is_valid else 0.0,
comment=f"config={config_json[:200] if config_json else 'None'}")
lf.flush()
assert is_valid
Criteri di successo: tutti i 6 test passano, score LLM ≥ 0.8
Step 5 — Frontend: Electron store + scheduler + UI ✅ DONE
File da modificare:
src/main/store.ts— campopromptTemplate→agentConfigsrc/main/agents/agent-scheduler.ts— passaagentConfigal triggersrc/renderer/components/settings/JourneyDialog.tsx— parsing JSON da replysrc/renderer/components/settings/LocalAgentConfigPanel.tsx— mostra configsrc/renderer/components/settings/types.ts— typeLocalAgentConfigaggiornatosrc/shared/api-types.ts— frame type aggiornato (se impatta WS)
Cosa cambia:
- Lo store salva
agentConfig: AgentConfig(oggetto) invece dipromptTemplate: string - Lo scheduler manda
agent_confignel body del trigger (noncustom_agent_prompt) - Il JourneyDialog riceve JSON e lo mostra in modo human-readable
- Il config panel mostra i content types configurati e le regole
Test set — Step 5
| # | Test case | Verifica | Score name |
|---|---|---|---|
| 5.1 | Store: salva/legge agentConfig | round-trip JSON | fe.store |
| 5.2 | Scheduler: passa config al trigger | body POST corretto | fe.scheduler |
| 5.3 | Journey: parsing reply JSON | agentConfig popolato |
fe.journey_parse |
Nota: test frontend sono manuali/Playwright. Score inviati solo per i test backend.
Criteri di successo: round-trip completo funzionante
Step 6 — Test end-to-end con file reali
File da creare:
tests/test_local_agent_e2e.pytests/fixtures/emails/— 5-10 email HTML di esempio (anonimizzate)
Scenari E2E:
| # | Scenario | Input | Expected | Score name |
|---|---|---|---|---|
| 6.1 | Email con azione → task | "Please review the PR by Friday" | task creato con dueDate | e2e.action_email |
| 6.2 | Email informativa → nota | "FYI: new policy effective May 1" | nota + timeline creati | e2e.info_email |
| 6.3 | Email thread nested | 4 livelli di reply | solo ultimo msg processato | e2e.thread |
| 6.4 | Newsletter → skip | Newsletter marketing | nessuna entità creata | e2e.newsletter_skip |
| 6.5 | Progetto da filename | ProjectX_update.html |
assegnato a ProjectX | e2e.project_filename |
| 6.6 | Progetto da contenuto | Email menziona "Project Alpha" | assegnato a Project Alpha | e2e.project_content |
| 6.7 | Nessun progetto + regola | No match + "no project = no entity" | nessuna entità creata | e2e.no_project_rule |
| 6.8 | Deduplicazione update | Task esiste + email simile | update, non create | e2e.dedup |
| 6.9 | Multi-entità da 1 email | Email con task + meeting date | task + timeline creati | e2e.multi_entity |
| 6.10 | Batch 5 file misti | 3 email + 1 newsletter + 1 info | 3 processati, 1 skippato, 1 nota | e2e.batch_mixed |
Eval con Langfuse (esempio con scoring multiplo):
@pytest.mark.asyncio
async def test_e2e_action_email(real_email_fixtures):
lf = get_langfuse()
trace = lf.trace(name="eval-e2e-action-email", metadata={"step": "6"}) if lf else None
# Setup completo: config → preprocess → LLM → tool calls
tool_calls = await run_full_pipeline(
file_path="fixtures/emails/action_request.html",
agent_config=STANDARD_EMAIL_CONFIG,
existing_projects=[{"id": "p1", "name": "Project Alpha"}],
)
# Score multipli per aspetto
scores = {
"task_created": 1.0 if any(c["name"] == "create_task" for c in tool_calls) else 0.0,
"correct_project": 1.0 if any(c["args"].get("project_id") == "p1" for c in tool_calls) else 0.0,
"has_due_date": 1.0 if any(c["args"].get("due_date", 0) > 0 for c in tool_calls) else 0.0,
"is_ai_suggested": 1.0 if any(c["args"].get("is_ai_suggested") == 1 for c in tool_calls) else 0.0,
}
if trace:
for name, value in scores.items():
lf.score(trace_id=trace.id, name=f"e2e.action_email.{name}", value=value)
lf.flush()
assert all(v == 1.0 for v in scores.values())
Criteri di successo: ≥ 8/10 test passano, score medio ≥ 0.8
Ordine di implementazione
Step 1 (preprocessor) ← nessuna dipendenza, partire qui
↓
Step 3 (model/schema) ← parallelo a Step 1
↓
Step 2 (agent_runner) ← dipende da Step 1 + Step 3
↓
Step 4 (journey setup) ← dipende da Step 3 (schema AgentConfig)
↓
Step 5 (frontend) ← dipende da Step 3 + Step 4
↓
Step 6 (E2E) ← dipende da tutto
Step 1 e 3 possono essere sviluppati in parallelo.
Riepilogo score Langfuse
| Step | Score prefix | # test | Soglia minima |
|---|---|---|---|
| 1 | preprocess.* |
10 | ≥ 0.9 |
| 2 | runner.* |
10 | ≥ 0.8 |
| 3 | schema.* |
5 | 1.0 (deterministici) |
| 4 | journey.* |
6 | ≥ 0.8 |
| 5 | fe.* |
3 | 1.0 (deterministici) |
| 6 | e2e.* |
10 | ≥ 0.8 |
Totale: 44 test, di cui ~26 con scoring LLM su Langfuse.