workspace/docs/LOCAL_AGENT_V2_PLAN.md
Roberto Musso 1f1ce7d40e first commit
2026-04-08 22:55:08 +02:00

# Local Agent V2: Implementation Plan
> Architectural reference: [`local_agent_v2_mem.md`](local_agent_v2_mem.md)
---
## Overview
Local Agent V2 replaces the 3-LLM-call flow (separate classification + processing)
with a two-phase architecture:
1. **Detect + Preprocess** (pure Python, zero LLM calls): identifies the content type and cleans it
2. **Single LLM call** (classify + extract + create): one agentic call with tool calling
### Langfuse: Scoring + Prompt Management (hot-swap)
Every step includes a test set whose evals send scores to Langfuse.
**Prompts are managed through Langfuse Prompt Management**, so they can be edited from the UI
without touching code. Each score is linked to the **exact prompt version**
that produced it, which enables A/B comparison between versions.
**Iterative workflow:**
1. Write or edit the prompt in the Langfuse UI (e.g. `unified_processing` v3)
2. Run the evals: `pytest tests/test_agent_runner_v2.py -k eval`
3. Check Langfuse: prompt v3 → score 0.6
4. Edit the prompt → v4
5. Re-run the evals → prompt v4 → score 0.9
6. Promote v4 to the `production` label
**Langfuse prompts to create (each with a hardcoded fallback in the code):**
| Langfuse name | Used in | Description |
|---|---|---|
| `unified_processing` | Step 2 (runner) | Single prompt: classify + extract + create |
| `journey_system_v2` | Step 4 (journey) | Journey chatbot → produces the `AgentConfig` JSON |
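The "hardcoded fallback" behavior can be sketched as follows. This is only a guess at what `get_prompt_or_fallback` might do: the `client` and `label` parameters are hypothetical additions for testability (the plan calls the helper with two arguments), and the only client call assumed is a Langfuse-style `get_prompt(name, label=...)` returning an object with `.prompt` and `.version`.

```python
from typing import Any, Optional, Tuple


def get_prompt_or_fallback(
    name: str,
    fallback: str,
    client: Optional[Any] = None,  # hypothetical: injected Langfuse-like client
    label: str = "production",     # hypothetical: which prompt label to fetch
) -> Tuple[str, Optional[Any]]:
    """Return (template, prompt_obj); prompt_obj is None when the fallback is used."""
    if client is None:
        # Langfuse not configured: use the copy shipped with the code.
        return fallback, None
    try:
        prompt_obj = client.get_prompt(name, label=label)
        return prompt_obj.prompt, prompt_obj
    except Exception:
        # Network error, missing prompt, bad credentials: degrade to the local copy.
        return fallback, None
```

Returning `None` as the prompt object lets callers record `"fallback"` as the version, which keeps fallback runs distinguishable from real prompt versions when comparing scores.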
**Scoring pattern with prompt version linking:**
```python
from app.core.langfuse_client import get_langfuse, get_prompt_or_fallback


def run_eval_with_prompt(prompt_name: str, fallback: str, eval_name: str, run_fn):
    """Run an eval, linking the score to the prompt version that produced it."""
    lf = get_langfuse()
    template, prompt_obj = get_prompt_or_fallback(prompt_name, fallback)
    # Create a trace for the eval (skipped when Langfuse is not configured)
    trace = lf.trace(name=f"eval-{eval_name}") if lf else None
    # Run the LLM call inside a generation linked to the prompt
    if lf and trace:
        with lf.start_as_current_observation(
            as_type="generation",
            name=eval_name,
            prompt=prompt_obj,  # links to the exact prompt version
            trace_id=trace.id,
        ) as gen:
            result, score = run_fn(template)
            gen.update(output=str(result))
    else:
        result, score = run_fn(template)
    # Score attached to the trace, so it is visible per prompt version in Langfuse
    if lf and trace:
        lf.score(
            trace_id=trace.id,
            name=eval_name,
            value=score,
            data_type="NUMERIC",
        )
        lf.flush()
    return result, score
```
**In Langfuse you will see:**
```
Prompt: unified_processing
├── v3 (2026-04-05) → avg score: 0.62 (12 evals)
├── v4 (2026-04-07) → avg score: 0.85 (12 evals) ← production
└── v5 (2026-04-08) → avg score: 0.91 (12 evals) ← candidate
```
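The per-version averages in the view above amount to grouping scores by prompt version. A minimal local sketch of that aggregation, assuming scores are available as `(prompt_version, value)` pairs (both function names are illustrative and not part of Langfuse or the codebase):

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, Iterable, Tuple


def average_score_by_version(scores: Iterable[Tuple[int, float]]) -> Dict[int, float]:
    """Group (prompt_version, score) pairs and average each group."""
    by_version = defaultdict(list)
    for version, value in scores:
        by_version[version].append(value)
    return {v: round(mean(vals), 2) for v, vals in sorted(by_version.items())}


def best_version(averages: Dict[int, float]) -> int:
    """Pick the version with the highest average as the promotion candidate."""
    return max(averages, key=averages.get)
```

A check like this could be used to decide which version to promote to `production`, although with only a dozen evals per version small differences are likely noise.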
---
## Step 3: Model and schema (`prompt_template` → `agent_config`) ✅ DONE
Added in parallel with Step 2, as a prerequisite (details in the Step 3 section further down):
- `app/schemas.py`: `ContentTypeConfig`, `AgentConfig`
- `app/models.py`: `agent_config: JSON` (nullable, alongside `prompt_template`)
- `alembic/versions/a3b9c0d1e2f3_add_agent_config_to_local_agents.py`
## Step 1: Preprocessor (email HTML handler) ✅ DONE
**Files to create:**
- `app/core/preprocessors/__init__.py`: registry, detect, dispatch
- `app/core/preprocessors/base.py`: `PreprocessResult` dataclass, base class
- `app/core/preprocessors/email_html.py`: BeautifulSoup handler
**What it does:**
- `detect_content_type(filename, raw_content) -> str`: heuristic based on the file extension plus patterns in the content
- `preprocess(content_type, raw_content) -> PreprocessResult`: dispatches to the matching handler
- `PreprocessResult`: `{ content_type, clean_text, metadata: {subject, from, to, date, ...} }`
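A minimal sketch of how detection and dispatch could fit together. The header regex, the "at least two distinct headers" rule, and the `register` decorator are assumptions made for illustration; the real heuristics in `app/core/preprocessors/` may differ.

```python
import os
import re
from dataclasses import dataclass, field
from typing import Callable, Dict


@dataclass
class PreprocessResult:
    content_type: str
    clean_text: str
    metadata: Dict[str, str] = field(default_factory=dict)


# Assumed heuristic: an HTML file exposing two or more mail headers is an email.
_EMAIL_HEADER = re.compile(r"\b(From|To|Subject):", re.IGNORECASE)


def detect_content_type(filename: str, raw_content: str) -> str:
    ext = os.path.splitext(filename)[1].lower()
    if ext in {".html", ".htm"}:
        headers = {m.group(1).lower() for m in _EMAIL_HEADER.finditer(raw_content)}
        return "email_html" if len(headers) >= 2 else "generic_html"
    if ext == ".txt":
        return "plain_text"
    return "unknown"


_HANDLERS: Dict[str, Callable[[str], PreprocessResult]] = {}


def register(content_type: str):
    """Decorator that adds a handler to the registry."""
    def decorator(fn):
        _HANDLERS[content_type] = fn
        return fn
    return decorator


@register("generic")
def _generic(raw_content: str) -> PreprocessResult:
    return PreprocessResult("generic", raw_content.strip())


def preprocess(content_type: str, raw_content: str) -> PreprocessResult:
    # Unknown content types fall back to the generic handler.
    handler = _HANDLERS.get(content_type, _HANDLERS["generic"])
    return handler(raw_content)
```

Keeping the registry as a plain dict means a new handler (for example a future PDF one) only needs a decorated function, with no change to `preprocess`.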
**`email_html` handler:**
- Strips `<style>`, `<script>` and HTML tags → clean text (BeautifulSoup)
- Extracts metadata: Subject, From, To, Date (from `<meta>`, header patterns, or content heuristics)
- Splits the thread: finds quote markers (`>`, `On ... wrote:`, `---Original Message---`) → isolates the latest message
- Fallback: if the split fails, returns the whole cleaned text
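The thread-splitting step can be sketched with plain regular expressions. The three patterns below are one possible reading of the quote markers listed above, and `split_latest_message` is a hypothetical name; real-world mail clients produce more variants than these.

```python
import re

# Assumed patterns for the three quote markers named in the plan.
_QUOTE_MARKERS = [
    re.compile(r"^On .+ wrote:\s*$", re.MULTILINE),
    re.compile(r"^-{2,}\s*Original Message\s*-{2,}\s*$", re.MULTILINE | re.IGNORECASE),
    re.compile(r"^>", re.MULTILINE),
]


def split_latest_message(clean_text: str) -> str:
    """Return the text before the earliest quote marker (the latest message)."""
    cut = len(clean_text)
    for marker in _QUOTE_MARKERS:
        match = marker.search(clean_text)
        if match:
            cut = min(cut, match.start())
    latest = clean_text[:cut].strip()
    # Fallback: if nothing precedes the first marker, keep the whole text.
    return latest if latest else clean_text.strip()
```

This assumes top-posted replies, where the newest message comes first; bottom-posted or interleaved threads would need a different strategy.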
**Fallback handler (`generic`):**
- Strips HTML tags if present
- Returns the text as-is with minimal metadata (filename, extension)
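A sketch of the generic fallback using only the standard library. It uses `html.parser` as a stand-in for BeautifulSoup, returns a plain dict instead of `PreprocessResult` to stay self-contained, and the name `generic_preprocess` is hypothetical.

```python
import os
from html.parser import HTMLParser


class _TextExtractor(HTMLParser):
    """Collects visible text, skipping the contents of <style> and <script>."""

    _SKIP = {"style", "script"}

    def __init__(self):
        super().__init__(convert_charrefs=True)
        self._chunks = []
        self._skip_depth = 0

    def handle_starttag(self, tag, attrs):
        if tag in self._SKIP:
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in self._SKIP and self._skip_depth > 0:
            self._skip_depth -= 1

    def handle_data(self, data):
        if self._skip_depth == 0 and data.strip():
            self._chunks.append(data.strip())

    def text(self) -> str:
        return "\n".join(self._chunks)


def generic_preprocess(filename: str, raw_content: str) -> dict:
    if "<" in raw_content and ">" in raw_content:
        parser = _TextExtractor()
        parser.feed(raw_content)
        parser.close()  # flush any buffered text
        clean_text = parser.text()
    else:
        clean_text = raw_content  # no markup detected: return the text as-is
    metadata = {
        "filename": os.path.basename(filename),
        "extension": os.path.splitext(filename)[1].lower(),
    }
    return {"content_type": "generic", "clean_text": clean_text, "metadata": metadata}
```

The `<` / `>` check is a deliberately crude markup test; plain text that happens to contain both characters would also go through the parser, which is harmless but may reflow line breaks.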
**Dependencies to add:**
- `beautifulsoup4` (probably already installed; verify)
- `lxml` (fast parser for BS4, optional)
### Test set: Step 1
**File:** `tests/test_preprocessors.py`
| # | Test case | Input | Expected | Score name |
|---|-----------|-------|----------|------------|
| 1.1 | Detect email HTML | `.html` with `From:`, `To:`, `Subject:` | `content_type == "email_html"` | `preprocess.detect_email` |
| 1.2 | Detect generic HTML | `.html` with `<nav>`, `<main>` | `content_type == "generic_html"` | `preprocess.detect_generic` |
| 1.3 | Detect plain text | `.txt` | `content_type == "plain_text"` | `preprocess.detect_text` |
| 1.4 | Detect unknown | binary `.xyz` | `content_type == "unknown"` | `preprocess.detect_unknown` |
| 1.5 | Email: strip HTML | Email with `<style>`, inline CSS | `clean_text` without HTML tags | `preprocess.email_strip` |
| 1.6 | Email: extract metadata | Email with Subject/From/Date | correct metadata | `preprocess.email_metadata` |
| 1.7 | Email: split thread | Email with 3 nested replies | `clean_text` = latest message only | `preprocess.email_thread` |
| 1.8 | Email: single message | Email without a thread | `clean_text` = entire body | `preprocess.email_single` |
| 1.9 | Email: heavy HTML | Email with heavy CSS/table layout | readable text extracted | `preprocess.email_heavy_html` |
| 1.10 | Fallback: unknown file | Binary file | `clean_text` via fallback | `preprocess.fallback` |
**Eval with Langfuse:**
```python
import pytest

from app.core.langfuse_client import get_langfuse
from app.core.preprocessors import preprocess


@pytest.mark.asyncio
async def test_email_html_strip(sample_email_html):
    lf = get_langfuse()
    trace = lf.trace(name="eval-preprocess-email-strip") if lf else None
    result = preprocess("email_html", sample_email_html)
    # Checks that feed both the score and the assertions
    has_no_tags = "<" not in result.clean_text
    has_content = len(result.clean_text) > 50
    ratio = len(result.clean_text) / len(sample_email_html)  # compression ratio
    score = 1.0 if (has_no_tags and has_content and ratio < 0.5) else 0.0
    if trace:
        lf.score(trace_id=trace.id, name="preprocess.email_strip", value=score,
                 comment=f"ratio={ratio:.2f}, len={len(result.clean_text)}")
        lf.flush()
    assert has_no_tags
    assert has_content
```
**Success criteria:** all 10 tests pass, mean score ≥ 0.9
---
## Step 2: Refactor `agent_runner.py` (new per-file flow) ✅ DONE
**Files to modify:**
- `app/core/agent_runner.py`
**What changes:**
- Remove `_classify_file()` (the separate LLM step 1 of the old flow)
- Remove `_BATCH_FILE_CLASSIFIER_PROMPT`
- Add the preprocessor import
- New flow in `run_local_agent()`:
```python
for file_path in file_paths:
    # 1. Read the raw file
    raw_content = await execute_on_client(action="read_file_content", ...)
    # 2. Detect + Preprocess (Python, zero LLM calls)
    content_type = detect_content_type(file_path, raw_content)
    preprocessed = preprocess(content_type, raw_content)
    # 3. Fetch the prompt from Langfuse (hot-swappable from the UI), with a local fallback
    template, prompt_obj = get_prompt_or_fallback(
        "unified_processing", _UNIFIED_PROCESSING_PROMPT
    )
    extraction_rules = _get_extraction_rules(config.agent_config, content_type)
    system_prompt = template.format(
        extraction_rules=extraction_rules,
        global_rules="\n".join(config.agent_config.get("global_rules", [])),
        projects_list=_format_projects(projects),
        data_types=", ".join(config.data_types),
        filename=os.path.basename(file_path),
        metadata_section=_format_metadata(preprocessed.metadata),
        no_match_behavior=_get_no_match_behavior(config.agent_config),
    )
    # 4. Single LLM call with tools (classify + extract + create)
    # The generation is linked to prompt_obj, so scores are visible per version
    user_message = _build_user_message(file_path, preprocessed)
    result = await _run_agent_with_tools(
        system_prompt=system_prompt,
        user_message=user_message,
        tools=processing_tools,
        max_steps=_MAX_PROCESSING_STEPS,
        langfuse_prompt=prompt_obj,  # links to the prompt version
    )
```
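The flow above relies on `_get_extraction_rules` to pick the rules for the detected content type. One plausible shape for that helper is sketched below; the function body and the default rule text are assumptions, since the plan does not specify what happens when no content type matches.

```python
from typing import Any, Dict, Optional

# Hypothetical default, used when the config has no entry for the content type.
DEFAULT_EXTRACTION_RULES = "Extract only tasks, notes and dates that are explicitly stated."


def get_extraction_rules(agent_config: Optional[Dict[str, Any]], content_type: str) -> str:
    """Return the extraction_prompt of the matching content type, or a default."""
    for entry in (agent_config or {}).get("content_types", []):
        if entry.get("id") == content_type:
            return entry.get("extraction_prompt") or DEFAULT_EXTRACTION_RULES
    return DEFAULT_EXTRACTION_RULES
```

Accepting `None` matters here because `agent_config` is a nullable column, so agents created before the migration may not have one.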
**`unified_processing` prompt (local fallback, editable from the Langfuse UI):**
```
You are a data extraction assistant for a freelance project management tool.
## Your process (follow this exact order)
### 1. Identify the project
File: {filename}
{metadata_section}
Existing projects:
{projects_list}
Match this file to an existing project using the filename and content.
If no project matches, {no_match_behavior}.
### 2. Check existing records
Once you identify the project, use list_tasks/list_notes/list_timelines
to see what already exists. NEVER create duplicates.
### 3. Extract and create/update
{extraction_rules}
### Rules
- Set isAiSuggested=1 on every new record
- Set projectId on every record
- Update existing records when a match is found by title/topic
{global_rules}
```
**Fix for `items_created`:** count the `create_*` tool calls in the results.
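A minimal sketch of that count, assuming each recorded tool call is a dict with a `name` key (the same shape the evals in this plan use); `count_items_created` is an illustrative name.

```python
from typing import Any, Dict, Iterable


def count_items_created(tool_calls: Iterable[Dict[str, Any]]) -> int:
    """Count only create_* calls; updates and list_* lookups are ignored."""
    return sum(1 for call in tool_calls if call.get("name", "").startswith("create_"))
```

This counts attempted creations. If failed tool calls should be excluded, the result of each call would also need to be inspected.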
### Test set: Step 2
**File:** `tests/test_agent_runner_v2.py`
| # | Test case | Input | Expected | Score name |
|---|-----------|-------|----------|------------|
| 2.1 | Happy path: email → task | Preprocessed email with an action request | `create_task` tool called | `runner.email_to_task` |
| 2.2 | Happy path: email → note | Informational email | `create_note` tool called | `runner.email_to_note` |
| 2.3 | Happy path: email → timeline | Email with an event date | `create_timeline` tool called | `runner.email_to_timeline` |
| 2.4 | Project matching: filename | File `ProjectX_report.html` | project ProjectX selected | `runner.project_filename` |
| 2.5 | Project matching: content | File mentioning the project in the body | correct project | `runner.project_content` |
| 2.6 | No project match → global rule | File with no project match | behavior taken from `global_rules` | `runner.no_project` |
| 2.7 | Deduplication | Existing task + similar email | `update_task`, not `create_task` | `runner.dedup` |
| 2.8 | `items_created` count | 2 creates + 1 update | `items_created == 2` | `runner.items_count` |
| 2.9 | Device offline | No device | `status=error` | `runner.offline` |
| 2.10 | Empty file | Empty content | skipped without errors | `runner.empty_file` |
**Eval with Langfuse (prompt hot-swap + per-version score):**
```python
@pytest.mark.asyncio
async def test_email_to_task_e2e(mock_ws_executor):
    lf = get_langfuse()
    # The prompt comes from Langfuse: change it in the UI and re-run the test
    template, prompt_obj = get_prompt_or_fallback(
        "unified_processing", _UNIFIED_PROCESSING_PROMPT
    )
    trace = lf.trace(
        name="eval-runner-email-to-task",
        metadata={"step": "2", "prompt_version": getattr(prompt_obj, "version", "fallback")},
    ) if lf else None
    config = _make_config(agent_config={
        "content_types": [{
            "id": "email_html",
            "extraction_prompt": "Direct action → task. Informational → note."
        }],
        "global_rules": [],
        "data_types": ["tasks", "notes"]
    })
    # Mock preprocessed email with action request
    mock_file_content = "Subject: Fix the bug\nFrom: boss@co.com\n\nPlease fix the login bug by Friday."
    tool_calls_made = []
    # ... setup mock that captures tool calls ...
    await run_local_agent(user_id, config, run_log, device_mgr)
    created_tasks = [c for c in tool_calls_made if c["name"] == "create_task"]
    score = 1.0 if len(created_tasks) == 1 else 0.0
    title_match = 1.0 if any("bug" in c["args"].get("title", "").lower() for c in created_tasks) else 0.0
    if trace:
        # Score attached to the trace; Langfuse links it to the prompt version automatically
        lf.score(trace_id=trace.id, name="runner.email_to_task", value=score,
                 comment=f"tasks_created={len(created_tasks)}")
        lf.score(trace_id=trace.id, name="runner.email_to_task.title", value=title_match)
        lf.flush()
    assert score == 1.0
    assert title_match == 1.0
```
**Success criteria:** all 10 tests pass, mean score ≥ 0.8
---
## Step 3: Model and schema (`prompt_template` → `agent_config`) ✅ DONE (see above)
**Files to modify:**
- `app/models.py`: `LocalAgentConfig.prompt_template: Text` → `agent_config: JSON`
- `app/schemas.py`: Pydantic schema for `AgentConfig`
- `alembic/versions/`: new migration
- `app/api/routes/agents.py`: update `trigger_agent_run` to read `agent_config`
**Pydantic schema:**
```python
from pydantic import BaseModel


class ContentTypeConfig(BaseModel):
    id: str
    label: str = ""
    detection_hint: str = ""
    preprocessing: str = "generic"  # handler name: "email_html", "generic", ...
    extraction_prompt: str


class AgentConfig(BaseModel):
    content_types: list[ContentTypeConfig] = []
    global_rules: list[str] = []
    data_types: list[str] = []
```
### Test set: Step 3
**File:** `tests/test_agent_config_schema.py`
| # | Test case | Input | Expected | Score name |
|---|-----------|-------|----------|------------|
| 3.1 | Valid schema | Complete JSON | parsing OK | `schema.valid` |
| 3.2 | Minimal schema | Only `data_types` | defaults applied | `schema.minimal` |
| 3.3 | Unknown content type | `preprocessing: "pdf"` | accepted (future handler) | `schema.unknown_type` |
| 3.4 | Migration up/down | Alembic migrate | no errors | `schema.migration` |
| 3.5 | Trigger with `agent_config` | POST /agents/trigger | config parsed | `schema.trigger` |
**Success criteria:** all 5 tests pass
---
## Step 4: Journey setup (structured output) ✅ DONE
**Files to modify:**
- `app/api/routes/agent_setup.py`: `_JOURNEY_SYSTEM_PROMPT` rewritten
- `app/api/routes/agent_setup.py`: parses the output as JSON instead of marker-delimited prose
**What changes:**
- The journey produces an `AgentConfig` JSON, not a prose `prompt_template`
- The system prompt comes from Langfuse (`journey_system_v2`) with a local fallback, so it is
**editable from the UI without touching code** when iterating on journey quality
- The system prompt instructs the LLM to:
   1. Explore the directory
   2. Identify the content types present
   3. Ask the user for the extraction rules of each type
   4. Produce a structured JSON that conforms to the `AgentConfig` schema
- The `PROMPT_TEMPLATE_START/END` markers become `AGENT_CONFIG_START/END`
- Parsing extracts the JSON and validates it with Pydantic
- Every LLM call in the journey is linked to `prompt_obj` → scores per version
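The marker-based extraction can be sketched as below. To stay dependency-free, a minimal structural check stands in for the Pydantic validation (`AgentConfig.model_validate` in the real code); `extract_agent_config` is a hypothetical name and the exact marker layout in the LLM reply is an assumption.

```python
import json
import re
from typing import Any, Dict, Optional

_CONFIG_BLOCK = re.compile(r"AGENT_CONFIG_START\s*(.*?)\s*AGENT_CONFIG_END", re.DOTALL)


def extract_agent_config(reply_text: str) -> Optional[Dict[str, Any]]:
    """Return the config dict found between the markers, or None if absent or invalid."""
    match = _CONFIG_BLOCK.search(reply_text)
    if not match:
        return None  # the journey has not produced a config yet
    try:
        data = json.loads(match.group(1))
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict):
        return None
    # Stand-in for Pydantic validation: the three top-level fields must be lists.
    for key in ("content_types", "global_rules", "data_types"):
        if not isinstance(data.get(key, []), list):
            return None
    return data
```

Returning `None` instead of raising lets the journey loop keep the conversation going, or send a nudge, when the model has not emitted a usable config yet.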
### Test set: Step 4
**File:** `tests/test_journey_v2.py`
| # | Test case | Input | Expected | Score name |
|---|-----------|-------|----------|------------|
| 4.1 | Journey start: explores the directory | Directory with HTML emails | relevant first question | `journey.start` |
| 4.2 | Journey: produces valid JSON | 3-5 conversation turns | valid `AgentConfig` | `journey.valid_json` |
| 4.3 | Journey: detects HTML email | Directory with `.html` emails | content type `email_html` present | `journey.detect_email` |
| 4.4 | Journey: custom user rules | "create notes only, no tasks" | `extraction_prompt` reflects the rule | `journey.custom_rules` |
| 4.5 | Journey: global rules | "no project = no entity" | present in `global_rules` | `journey.global_rules` |
| 4.6 | Journey: nudge after max turns | Turn limit reached | JSON produced anyway | `journey.nudge` |
**Eval with Langfuse (LLM-as-judge example):**
```python
@pytest.mark.asyncio
async def test_journey_produces_valid_config(mock_ws_executor):
    lf = get_langfuse()
    trace = lf.trace(name="eval-journey-valid-config") if lf else None
    # Simulate a full journey: start + 3 messages
    reply = await handle_journey_start(user_id, {
        "agent_type": "local",
        "directory": "/test/emails",
        "data_types": ["tasks", "notes"],
    })
    # Simulate the user's replies
    for msg in ["They are email exports from Outlook", "Extract tasks from action items", "Yes, that looks correct"]:
        reply = await handle_journey_message(user_id, {
            "session_id": reply["session_id"],
            "message": msg,
        })
        if reply.get("done"):
            break
    config_json = reply.get("agent_config")
    is_valid = False
    try:
        parsed = AgentConfig.model_validate_json(config_json)
        is_valid = len(parsed.content_types) > 0
    except Exception:
        pass
    if trace:
        lf.score(trace_id=trace.id, name="journey.valid_json", value=1.0 if is_valid else 0.0,
                 comment=f"config={config_json[:200] if config_json else 'None'}")
        lf.flush()
    assert is_valid
```
**Success criteria:** all 6 tests pass, LLM score ≥ 0.8
---
## Step 5: Frontend (Electron store + scheduler + UI) ✅ DONE
**Files to modify:**
- `src/main/store.ts`: field `promptTemplate` → `agentConfig`
- `src/main/agents/agent-scheduler.ts`: passes `agentConfig` to the trigger
- `src/renderer/components/settings/JourneyDialog.tsx`: parses JSON from the reply
- `src/renderer/components/settings/LocalAgentConfigPanel.tsx`: displays the config
- `src/renderer/components/settings/types.ts`: updated `LocalAgentConfig` type
- `src/shared/api-types.ts`: updated frame type (if the WS protocol is affected)
**What changes:**
- The store saves `agentConfig: AgentConfig` (an object) instead of `promptTemplate: string`
- The scheduler sends `agent_config` in the trigger body (not `custom_agent_prompt`)
- The JourneyDialog receives JSON and renders it in human-readable form
- The config panel shows the configured content types and rules
### Test set: Step 5
| # | Test case | Check | Score name |
|---|-----------|----------|------------|
| 5.1 | Store: saves/reads `agentConfig` | JSON round-trip | `fe.store` |
| 5.2 | Scheduler: passes the config to the trigger | correct POST body | `fe.scheduler` |
| 5.3 | Journey: parses the JSON reply | `agentConfig` populated | `fe.journey_parse` |
**Note:** frontend tests are manual or Playwright-based. Scores are sent only for backend tests.
**Success criteria:** the full round-trip works
---
## Step 6: End-to-end tests with real files
**Files to create:**
- `tests/test_local_agent_e2e.py`
- `tests/fixtures/emails/`: 5-10 sample HTML emails (anonymized)
**E2E scenarios:**
| # | Scenario | Input | Expected | Score name |
|---|----------|-------|----------|------------|
| 6.1 | Email with action → task | "Please review the PR by Friday" | task created with `dueDate` | `e2e.action_email` |
| 6.2 | Informational email → note | "FYI: new policy effective May 1" | note + timeline created | `e2e.info_email` |
| 6.3 | Nested email thread | 4 levels of replies | only the latest message processed | `e2e.thread` |
| 6.4 | Newsletter → skip | Marketing newsletter | no entity created | `e2e.newsletter_skip` |
| 6.5 | Project from filename | `ProjectX_update.html` | assigned to ProjectX | `e2e.project_filename` |
| 6.6 | Project from content | Email mentions "Project Alpha" | assigned to Project Alpha | `e2e.project_content` |
| 6.7 | No project + rule | No match + "no project = no entity" | no entity created | `e2e.no_project_rule` |
| 6.8 | Deduplication update | Task exists + similar email | update, not create | `e2e.dedup` |
| 6.9 | Multiple entities from 1 email | Email with task + meeting date | task + timeline created | `e2e.multi_entity` |
| 6.10 | Batch of 5 mixed files | 3 emails + 1 newsletter + 1 info | 3 processed, 1 skipped, 1 note | `e2e.batch_mixed` |
**Eval with Langfuse (example with multiple scores):**
```python
@pytest.mark.asyncio
async def test_e2e_action_email(real_email_fixtures):
    lf = get_langfuse()
    trace = lf.trace(name="eval-e2e-action-email", metadata={"step": "6"}) if lf else None
    # Full setup: config → preprocess → LLM → tool calls
    tool_calls = await run_full_pipeline(
        file_path="fixtures/emails/action_request.html",
        agent_config=STANDARD_EMAIL_CONFIG,
        existing_projects=[{"id": "p1", "name": "Project Alpha"}],
    )
    # Multiple scores, one per aspect
    scores = {
        "task_created": 1.0 if any(c["name"] == "create_task" for c in tool_calls) else 0.0,
        "correct_project": 1.0 if any(c["args"].get("project_id") == "p1" for c in tool_calls) else 0.0,
        # `or 0` guards against an explicit None due date
        "has_due_date": 1.0 if any((c["args"].get("due_date") or 0) > 0 for c in tool_calls) else 0.0,
        "is_ai_suggested": 1.0 if any(c["args"].get("is_ai_suggested") == 1 for c in tool_calls) else 0.0,
    }
    if trace:
        for name, value in scores.items():
            lf.score(trace_id=trace.id, name=f"e2e.action_email.{name}", value=value)
        lf.flush()
    assert all(v == 1.0 for v in scores.values())
```
**Success criteria:** ≥ 8/10 tests pass, mean score ≥ 0.8
---
## Implementation order
```
Step 1 (preprocessor)    ← no dependencies, start here
Step 3 (model/schema)    ← parallel to Step 1
Step 2 (agent_runner)    ← depends on Step 1 + Step 3
Step 4 (journey setup)   ← depends on Step 3 (AgentConfig schema)
Step 5 (frontend)        ← depends on Step 3 + Step 4
Step 6 (E2E)             ← depends on everything
```
**Steps 1 and 3 can be developed in parallel.**
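The ordering above is a small dependency graph, so the parallelizable batches can be derived mechanically. The sketch below uses the standard-library `graphlib` (Python 3.9+); the node names are made up for illustration.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Each step maps to the set of steps it depends on (taken from the list above).
STEP_DEPENDENCIES: Dict[str, Set[str]] = {
    "step1_preprocessor": set(),
    "step3_model_schema": set(),
    "step2_agent_runner": {"step1_preprocessor", "step3_model_schema"},
    "step4_journey_setup": {"step3_model_schema"},
    "step5_frontend": {"step3_model_schema", "step4_journey_setup"},
    "step6_e2e": {
        "step1_preprocessor", "step2_agent_runner", "step3_model_schema",
        "step4_journey_setup", "step5_frontend",
    },
}


def parallel_batches(dependencies: Dict[str, Set[str]]) -> List[List[str]]:
    """Group steps into batches; every step in a batch can start at the same time."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises CycleError if the graph has a cycle
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches
```

Under these dependencies Steps 2 and 4 also end up in the same batch, since both only wait on the first one.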
---
## Langfuse score summary
| Step | Score prefix | # tests | Minimum threshold |
|------|-------------|--------|---------------|
| 1 | `preprocess.*` | 10 | ≥ 0.9 |
| 2 | `runner.*` | 10 | ≥ 0.8 |
| 3 | `schema.*` | 5 | 1.0 (deterministic) |
| 4 | `journey.*` | 6 | ≥ 0.8 |
| 5 | `fe.*` | 3 | 1.0 (deterministic) |
| 6 | `e2e.*` | 10 | ≥ 0.8 |
Total: **44 tests**, of which ~26 use LLM scoring on Langfuse.
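The thresholds in the table can be checked locally once scores are collected. A minimal sketch, assuming scores arrive as a mapping from score name to value; `check_thresholds` is a hypothetical helper and it only covers the mean-score criterion, not the "≥ 8/10 tests pass" rule of Step 6.

```python
from statistics import mean
from typing import Dict

# Minimum mean score per score prefix, copied from the summary table.
THRESHOLDS: Dict[str, float] = {
    "preprocess": 0.9,
    "runner": 0.8,
    "schema": 1.0,
    "journey": 0.8,
    "fe": 1.0,
    "e2e": 0.8,
}


def check_thresholds(scores: Dict[str, float]) -> Dict[str, bool]:
    """Map each score prefix present in `scores` to whether its mean meets the threshold."""
    by_prefix: Dict[str, list] = {}
    for name, value in scores.items():
        prefix = name.split(".", 1)[0]
        by_prefix.setdefault(prefix, []).append(value)
    return {
        prefix: mean(values) >= THRESHOLDS[prefix]
        for prefix, values in by_prefix.items()
        if prefix in THRESHOLDS
    }
```

Prefixes with no recorded scores are simply absent from the result, so a missing step is not silently reported as passing.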