From 1dfd088e18679eb1859404c11d3ff30364476abe Mon Sep 17 00:00:00 2001 From: roberto Date: Thu, 5 Mar 2026 15:14:43 +0100 Subject: [PATCH] step 3.1 complete: agent config tables + schemas + migration --- AI_REFACTOR_PLAN.md | 258 +++++++++++++++++++++++++++ alembic/versions/003_agent_tables.py | 127 +++++++++++++ app/models.py | 109 +++++++++++ app/schemas.py | 140 +++++++++++++++ 4 files changed, 634 insertions(+) create mode 100644 alembic/versions/003_agent_tables.py diff --git a/AI_REFACTOR_PLAN.md b/AI_REFACTOR_PLAN.md index 8ad70b4..9517a11 100644 --- a/AI_REFACTOR_PLAN.md +++ b/AI_REFACTOR_PLAN.md @@ -240,4 +240,262 @@ Tools must use **camelCase** field names (Drizzle maps them to snake_case intern - **Step 2.1 is the point of no return** — after removing LangChain, there's no local AI fallback. - **Phase B (backend changes) must land before Phase 1.3–1.5** — Electron needs the bidirectional WS to talk to. - **Phase 3 and Phase 4 are independent** — can be parallelized after Phase 2. + +--- + +## Phase 3 — Agent System: Config, Orchestration & Cloud Connectors + +> **Objective:** Backend manages all agent configuration, scheduling, orchestration, and cloud data fetching. Two agent types: **Local Directory Agent** (backend triggers Electron to read files, then AI analyzes) and **Cloud Connector Agent** (backend fetches Gmail/Teams data directly, AI analyzes, pushes results to Electron via WS tool_call). All extracted items use existing WS tool infrastructure to insert into Electron's local DB with `is_ai_suggested=True`. +> +> **Electron Phase 3 plan:** `../adiuva/AI_REFACTOR_PLAN.md` Phase 3 section. 
+ +### Architecture + +``` +Local Agent: + Scheduler/manual trigger ──► check device online ──► WS agent_run → Electron + ──► Electron reads files ──► WS agent_data → Backend + ──► Backend AI (prompt_template + file content) ──► WS tool_call(insert) → Electron + ──► Electron persists with isAiSuggested=1 + +Cloud Agent: + Scheduler/manual trigger ──► Backend fetches Gmail/Teams (OAuth) ──► Backend AI analyzes + ──► check device online ──► WS tool_call(insert) → Electron ──► Electron persists +``` + +**New WS frame types:** + +| Direction | `type` | Payload | +|---|---|---| +| Server → Client | `agent_run` | `{ run_id, agent_id, config: { paths, file_extensions, prompt_template, data_types } }` | +| Client → Server | `agent_data` | `{ run_id, files: [{ path, name, content, metadata }] }` | +| Client → Server | `agent_complete` | `{ run_id, files_read, errors }` | +| Client → Server | `device_hello` | `{ device_id, agent_ids }` | + +### Step 3.1 — Agent config tables +- [x] Add to `app/models.py`: + - **`LocalAgentConfig`**: + - `id` UUID PK + - `user_id` FK → users + - `device_id` str — identifies which Electron install this config belongs to + - `name` str + - `directory_paths` JSON — list of absolute paths on the device + - `data_types` JSON — which tables to extract to: `["tasks", "notes", "checkpoints", "projects"]` + - `prompt_template` text — user-configured via Chatbot Journey + - `file_extensions` JSON — e.g. `[".eml", ".txt", ".pdf", ".md"]` + - `schedule_cron` str — e.g. 
`"0 */6 * * *"` (every 6h) + - `enabled` bool (default True) + - `last_run_at` datetime nullable + - `created_at`, `updated_at` timestamps + - **`CloudAgentConfig`**: + - `id` UUID PK + - `user_id` FK → users + - `provider` str — enum: `gmail`, `teams`, `outlook` + - `name` str + - `data_types` JSON — same format as local + - `prompt_template` text + - `oauth_token_encrypted` text — Fernet-encrypted OAuth2 credentials + - `schedule_cron` str + - `enabled` bool (default True) + - `last_run_at` datetime nullable + - `filter_config` JSON — provider-specific: `{ labels: [], date_range: {from, to}, senders: [] }` + - `created_at`, `updated_at` timestamps + - **`AgentRunLog`**: + - `id` UUID PK + - `agent_id` str — references LocalAgentConfig.id or CloudAgentConfig.id + - `agent_type` str — `local` or `cloud` + - `user_id` FK → users + - `status` str — `running`, `success`, `error`, `partial` + - `items_processed` int (default 0) + - `items_created` int (default 0) + - `errors` JSON — list of error strings + - `started_at` datetime + - `completed_at` datetime nullable +- [x] Add Pydantic schemas to `app/schemas.py`: + - `LocalAgentConfigCreate`, `LocalAgentConfigUpdate`, `LocalAgentConfigResponse` + - `CloudAgentConfigCreate`, `CloudAgentConfigUpdate`, `CloudAgentConfigResponse` + - `AgentRunLogResponse` + - `AgentCatalogItem` — `{ type, name, description, config_schema }` + - `WsAgentRun`, `WsAgentData`, `WsAgentComplete`, `WsDeviceHello` +- [x] Generate Alembic migration +- **Files:** `app/models.py`, `app/schemas.py`, `alembic/versions/` +- **Outcome:** Agent config and run tracking tables in PostgreSQL. 
+ +### Step 3.2 — Agent CRUD API routes +- [ ] Create `app/api/routes/agents.py`: + - `GET /api/v1/agents/catalog` — returns hardcoded agent type catalog: + - `local_directory`: "Watches local directories, extracts data from files using AI" + - `gmail`: "Scans Gmail inbox, extracts tasks/notes from emails" + - `teams`: "Monitors Teams messages, extracts action items" + - `outlook`: "Scans Outlook inbox, extracts tasks/notes" + - `GET /api/v1/agents/local` — list user's local agent configs + - `POST /api/v1/agents/local` — create local agent config + - Body: `{ name, device_id, directory_paths, data_types, prompt_template, file_extensions, schedule_cron }` + - Tier check: count enabled agents ≤ `batch_active` limit + - `PUT /api/v1/agents/local/{id}` — update config (ownership check) + - `DELETE /api/v1/agents/local/{id}` — delete config + associated run logs + - `GET /api/v1/agents/cloud` — list user's cloud agent configs + - `POST /api/v1/agents/cloud` — create cloud connector config + - Body: `{ provider, name, data_types, prompt_template, oauth_token_encrypted, schedule_cron, filter_config }` + - Tier check: same `batch_active` limit (local + cloud count together) + - `PUT /api/v1/agents/cloud/{id}` — update config + - `DELETE /api/v1/agents/cloud/{id}` — delete config + run logs + - `GET /api/v1/agents/runs` — query params: `agent_id`, `page`, `limit` → paginated run logs + - `POST /api/v1/agents/{id}/run` — manual trigger (dispatches to agent runner) + - All routes require JWT auth; ownership enforced on all mutations +- [ ] Register router in `app/main.py` +- **Files:** `app/api/routes/agents.py`, `app/main.py` +- **Outcome:** Full CRUD for agent configs with tier-gated creation limits. 
+ +### Step 3.3 — Device WS endpoint +- [ ] Create `app/api/routes/device_ws.py`: + - `WebSocket /api/v1/ws/device?token=` — persistent connection from Electron + - On connect: + - Authenticate JWT + - Receive `device_hello` frame → extract `device_id`, `agent_ids` + - Store connection in `DeviceConnectionManager` (in-memory dict: `user_id → { ws, device_id }`) + - Check for overdue agent runs → trigger them immediately + - Message loop: + - `agent_data` → route to active agent run handler + - `agent_complete` → finalize agent run + - `tool_result` → route to pending tool call (same pattern as chat WS) + - `pong` → heartbeat ack + - On disconnect: + - Remove from `DeviceConnectionManager` + - Mark any in-progress agent runs as `error` with "device disconnected" + - Heartbeat: send `ping` every 30s, disconnect if no `pong` within 10s +- [ ] Create `app/core/device_manager.py`: + - `DeviceConnectionManager` (singleton): + - `register(user_id, device_id, ws)` — stores active connection + - `unregister(user_id)` — removes connection + - `get_ws(user_id) -> WebSocket | None` — returns active WS if device is online + - `is_online(user_id, device_id=None) -> bool` — optionally checks specific device + - `send_frame(user_id, frame: dict)` — sends JSON frame to device +- **Files:** `app/api/routes/device_ws.py`, `app/core/device_manager.py`, `app/main.py` +- **Outcome:** Backend maintains persistent WS connections to Electron devices for agent triggers. + +### Step 3.4 — Agent run orchestrator +- [ ] Create `app/core/agent_runner.py`: + - `async run_local_agent(user_id, config: LocalAgentConfig, device_mgr: DeviceConnectionManager)`: + 1. Check device is online with matching `device_id` → abort if offline + 2. Create `AgentRunLog` with `status=running` + 3. Send `WsAgentRun` frame to Electron with config (paths, extensions, prompt) + 4. Await `WsAgentData` frames — collect file contents + 5. Await `WsAgentComplete` frame — Electron signals done reading + 6. 
For each file: call LLM with `prompt_template` + file content → extract structured items + 7. For each extracted item: send `WsToolCall(insert, table, data)` to Electron → await `WsToolResult` + - All inserts include `is_ai_suggested=True, is_approved=False` + 8. Update `AgentRunLog`: `status=success`, `items_processed`, `items_created` + - `async run_cloud_agent(user_id, config: CloudAgentConfig, device_mgr: DeviceConnectionManager)`: + 1. Check device is online → abort if offline (results must push to Electron) + 2. Create `AgentRunLog` with `status=running` + 3. Decrypt OAuth credentials from `config.oauth_token_encrypted` + 4. Fetch data from cloud provider (Step 3.6): + - Gmail: `google-api-python-client` + `filter_config` label/date filters + - Teams: `msgraph-sdk` + channel/date filters + - Outlook: `msgraph-sdk` + folder/date filters + 5. For each item: call LLM with `prompt_template` + email/message content → extract structured items + 6. For each extracted item: send `WsToolCall(insert)` to Electron → await `WsToolResult` + 7. Update `AgentRunLog` + - `async trigger_pending_runs(user_id, device_id, device_mgr)`: + - Called when Electron connects (after `device_hello`) + - Queries all enabled agent configs where `last_run_at + schedule_interval < now()` + - For local agents: only triggers if `config.device_id == device_id` + - For cloud agents: triggers regardless of device (any connected device can receive results) + - Executes runs sequentially (one at a time to avoid overwhelming the WS) + - Error handling: on any failure, update `AgentRunLog` with `status=error` + error details +- **Files:** `app/core/agent_runner.py` +- **Outcome:** Backend drives all agent execution — both local (via WS file request) and cloud (direct API calls). + +### Step 3.5 — Chatbot Journey endpoint +- [ ] Create `app/api/routes/agent_setup.py`: + - `POST /api/v1/agents/journey/start`: + - Body: `{ agent_type: "local"|"cloud", data_types: ["tasks", "notes", ...] 
}` + - Creates a journey session (in-memory or Redis-backed) + - Returns first AI message: contextual question based on agent type + - Local: "What kind of files are in the directories you want to monitor? (emails, documents, logs, etc.)" + - Cloud: "What kind of emails/messages should I look for? (client communications, invoices, meeting notes, etc.)" + - Response: `{ session_id, message, done: false }` + - `POST /api/v1/agents/journey/message`: + - Body: `{ session_id, message }` + - AI processes user's answer, asks follow-up questions (max 5 turns) + - System prompt: "You are configuring a data extraction agent for a freelancer. Ask about file format, what data to extract (tasks, notes, checkpoints), naming conventions, priority rules, and any special mapping. After 3-5 questions, generate a detailed prompt_template." + - When AI determines enough context: `{ session_id, message: "Here's your configuration...", done: true, prompt_template: "..." }` + - The `prompt_template` is a structured instruction for the extraction LLM (e.g. "Extract tasks from email. Subject becomes task title. If body contains 'urgent' or 'ASAP', set priority to 'high'. Extract due dates if mentioned.") +- **Files:** `app/api/routes/agent_setup.py`, `app/main.py` +- **Outcome:** Users configure AI prompts through guided conversation, not manual text editing. 
+ +### Step 3.6 — Cloud provider integrations +- [ ] Create `app/integrations/gmail.py`: + - `GmailClient`: + - `__init__(oauth_token)` — initializes Google API client + - `async fetch_messages(filter_config, since: datetime) -> list[EmailMessage]` + - `EmailMessage`: `{ id, subject, sender, body_text, date, labels }` + - Handles token refresh via Google OAuth2 refresh flow + - Respects `filter_config.labels`, `filter_config.date_range`, `filter_config.senders` +- [ ] Create `app/integrations/ms_graph.py`: + - `MSGraphClient`: + - `__init__(oauth_token)` — initializes MS Graph client + - `async fetch_emails(filter_config, since: datetime) -> list[EmailMessage]` (Outlook) + - `async fetch_messages(filter_config, since: datetime) -> list[ChatMessage]` (Teams) + - `ChatMessage`: `{ id, content, sender, channel, date }` + - Handles token refresh via MSAL +- [ ] Create `app/integrations/__init__.py` — factory: `get_provider(provider_name) -> GmailClient | MSGraphClient` +- **Dependencies:** `google-api-python-client`, `google-auth-oauthlib`, `msgraph-sdk`, `msal` +- **Files:** `app/integrations/gmail.py`, `app/integrations/ms_graph.py`, `app/integrations/__init__.py` +- **Outcome:** Backend can fetch emails/messages from Gmail, Outlook, and Teams. 
+ +### Step 3.7 — Agent scheduler +- [ ] Create `app/core/agent_scheduler.py`: + - Uses `APScheduler` (or simple asyncio loop) to check agent schedules + - Every 60s: query enabled agents that are due — never run (`last_run_at` is NULL) or whose next `schedule_cron` fire time after `last_run_at` is ≤ `now()` + - For each due agent: + - Check if user's device is online via `DeviceConnectionManager` + - If online: dispatch to `agent_runner` + - If offline: skip (will trigger on next `device_hello`) + - Locks: use PostgreSQL advisory locks to prevent duplicate runs in multi-instance deployments +- [ ] Integrate with FastAPI lifespan (start scheduler on app startup, shutdown gracefully) +- **Dependencies:** `apscheduler>=4.0` +- **Files:** `app/core/agent_scheduler.py`, `app/main.py` +- **Outcome:** Agents run automatically on their configured schedules. + +### Step 3.8 — OAuth flow endpoints +- [ ] Create `app/api/routes/oauth.py`: + - `GET /api/v1/oauth/{provider}/authorize` — returns OAuth authorization URL + - Gmail: Google OAuth2 with `gmail.readonly` scope + - Outlook/Teams: MS identity platform with `Mail.Read`, `ChannelMessage.Read.All` scopes + - `GET /api/v1/oauth/{provider}/callback` — handles OAuth redirect + - Exchanges auth code for access + refresh tokens + - Encrypts tokens with Fernet (server-side key from settings) + - Returns encrypted token blob for storage in `CloudAgentConfig.oauth_token_encrypted` + - `POST /api/v1/oauth/{provider}/refresh` — refresh expired OAuth token +- **Files:** `app/api/routes/oauth.py`, `app/main.py` +- **Outcome:** Users can connect Gmail/Teams/Outlook accounts securely. 
+ +--- + +### Phase 3 — Verification + +| # | Scenario | Expected | +|---|---|---| +| 1 | **Agent CRUD** | Create/read/update/delete local and cloud configs; tier limits enforced (free=2, pro=10) | +| 2 | **WS device connect** | Electron connects → `device_hello` → backend stores connection → triggers overdue runs | +| 3 | **Local agent run** | Backend sends `agent_run` → Electron reads files → `agent_data` → backend AI extracts → `tool_call(insert)` → Electron persists with `isAiSuggested=1` | +| 4 | **Cloud agent run** | Backend fetches Gmail → AI extracts tasks → `tool_call(insert)` → Electron persists | +| 5 | **Device binding** | Local agent config with `device_id=A` only triggers when device A is connected | +| 6 | **Chatbot Journey** | Start journey → 3-5 Q&A turns → produces valid `prompt_template` | +| 7 | **Schedule** | Agent with `schedule_cron="0 */6 * * *"` runs every 6h when device is online | +| 8 | **Offline resilience** | Device offline → runs skipped → device reconnects → overdue runs trigger immediately | +| 9 | **OAuth flow** | Gmail authorize → callback → token encrypted → stored in config → fetch emails works | + +### Phase 3 — New Dependencies + +| Package | Purpose | +|---|---| +| `google-api-python-client` | Gmail API access | +| `google-auth-oauthlib` | Gmail OAuth2 flow | +| `msgraph-sdk` | Outlook + Teams API access | +| `msal` | MS identity platform auth | +| `apscheduler>=4.0` | Agent scheduling | +| `cryptography` (Fernet) | OAuth token encryption at rest | - **One step at a time.** Mark `[x]` and commit with `step N.N complete: `. \ No newline at end of file diff --git a/alembic/versions/003_agent_tables.py b/alembic/versions/003_agent_tables.py new file mode 100644 index 0000000..1e503c8 --- /dev/null +++ b/alembic/versions/003_agent_tables.py @@ -0,0 +1,127 @@ +"""Add agent config and run log tables: local_agent_configs, cloud_agent_configs, agent_run_logs. 
+ +Revision ID: 003 +Revises: 002 +Create Date: 2026-03-05 +""" + +from __future__ import annotations + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +revision: str = "003" +down_revision: Union[str, None] = "002" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ── Enum types — idempotent creation ────────────────────────────────── + op.execute(""" + DO $$ BEGIN + CREATE TYPE agent_type AS ENUM ('local', 'cloud'); + EXCEPTION WHEN duplicate_object THEN NULL; + END $$; + """) + op.execute(""" + DO $$ BEGIN + CREATE TYPE agent_run_status AS ENUM ('running', 'success', 'error', 'partial'); + EXCEPTION WHEN duplicate_object THEN NULL; + END $$; + """) + op.execute(""" + DO $$ BEGIN + CREATE TYPE cloud_provider AS ENUM ('gmail', 'teams', 'outlook'); + EXCEPTION WHEN duplicate_object THEN NULL; + END $$; + """) + + # ── local_agent_configs ─────────────────────────────────────────────── + op.create_table( + "local_agent_configs", + sa.Column("id", postgresql.UUID(as_uuid=False), nullable=False), + sa.Column("user_id", postgresql.UUID(as_uuid=False), nullable=False), + sa.Column("device_id", sa.String(255), nullable=False), + sa.Column("name", sa.String(255), nullable=False), + sa.Column("directory_paths", sa.JSON, nullable=False, server_default="[]"), + sa.Column("data_types", sa.JSON, nullable=False, server_default="[]"), + sa.Column("prompt_template", sa.Text, nullable=False, server_default=""), + sa.Column("file_extensions", sa.JSON, nullable=False, server_default="[]"), + sa.Column("schedule_cron", sa.String(100), nullable=False, server_default="0 */6 * * *"), + sa.Column("enabled", sa.Boolean, nullable=False, server_default=sa.true()), + sa.Column("last_run_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, 
server_default=sa.text("now()")), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")), + sa.PrimaryKeyConstraint("id"), + sa.ForeignKeyConstraint(["user_id"], ["users.id"], ondelete="CASCADE"), + ) + op.create_index("ix_local_agent_configs_user_id", "local_agent_configs", ["user_id"]) + + # ── cloud_agent_configs ─────────────────────────────────────────────── + op.create_table( + "cloud_agent_configs", + sa.Column("id", postgresql.UUID(as_uuid=False), nullable=False), + sa.Column("user_id", postgresql.UUID(as_uuid=False), nullable=False), + sa.Column( + "provider", + postgresql.ENUM("gmail", "teams", "outlook", name="cloud_provider", create_type=False), + nullable=False, + ), + sa.Column("name", sa.String(255), nullable=False), + sa.Column("data_types", sa.JSON, nullable=False, server_default="[]"), + sa.Column("prompt_template", sa.Text, nullable=False, server_default=""), + sa.Column("oauth_token_encrypted", sa.Text, nullable=True), + sa.Column("filter_config", sa.JSON, nullable=True), + sa.Column("schedule_cron", sa.String(100), nullable=False, server_default="0 */6 * * *"), + sa.Column("enabled", sa.Boolean, nullable=False, server_default=sa.true()), + sa.Column("last_run_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")), + sa.PrimaryKeyConstraint("id"), + sa.ForeignKeyConstraint(["user_id"], ["users.id"], ondelete="CASCADE"), + ) + op.create_index("ix_cloud_agent_configs_user_id", "cloud_agent_configs", ["user_id"]) + + # ── agent_run_logs ───────────────────────────────────────────────────── + op.create_table( + "agent_run_logs", + sa.Column("id", postgresql.UUID(as_uuid=False), nullable=False), + # Plain string — not a FK because it references either local_agent_configs or + # cloud_agent_configs 
depending on agent_type. + sa.Column("agent_id", sa.String(255), nullable=False), + sa.Column( + "agent_type", + postgresql.ENUM("local", "cloud", name="agent_type", create_type=False), + nullable=False, + ), + sa.Column("user_id", postgresql.UUID(as_uuid=False), nullable=False), + sa.Column( + "status", + postgresql.ENUM("running", "success", "error", "partial", name="agent_run_status", create_type=False), + nullable=False, + server_default="running", + ), + sa.Column("items_processed", sa.Integer, nullable=False, server_default="0"), + sa.Column("items_created", sa.Integer, nullable=False, server_default="0"), + sa.Column("errors", sa.JSON, nullable=True), + sa.Column("started_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")), + sa.Column("completed_at", sa.DateTime(timezone=True), nullable=True), + sa.PrimaryKeyConstraint("id"), + sa.ForeignKeyConstraint(["user_id"], ["users.id"], ondelete="CASCADE"), + ) + op.create_index("ix_agent_run_logs_user_id", "agent_run_logs", ["user_id"]) + op.create_index("ix_agent_run_logs_agent_id", "agent_run_logs", ["agent_id"]) + + +def downgrade() -> None: + op.drop_table("agent_run_logs") + op.drop_table("cloud_agent_configs") + op.drop_table("local_agent_configs") + + op.execute("DROP TYPE IF EXISTS cloud_provider;") + op.execute("DROP TYPE IF EXISTS agent_run_status;") + op.execute("DROP TYPE IF EXISTS agent_type;") diff --git a/app/models.py b/app/models.py index b2747a4..ed59042 100644 --- a/app/models.py +++ b/app/models.py @@ -23,11 +23,13 @@ from datetime import datetime, timezone from sqlalchemy import ( BigInteger, + Boolean, DateTime, Enum, Float, ForeignKey, Integer, + JSON, String, Text, UniqueConstraint, @@ -54,6 +56,9 @@ def _now() -> datetime: TierEnum = Enum("free", "pro", "power", "team", name="billing_tier") PluginStatusEnum = Enum("pending_review", "approved", "rejected", name="plugin_status") ReviewDecisionEnum = Enum("approved", "rejected", name="review_decision") 
+AgentTypeEnum = Enum("local", "cloud", name="agent_type") +AgentStatusEnum = Enum("running", "success", "error", "partial", name="agent_run_status") +CloudProviderEnum = Enum("gmail", "teams", "outlook", name="cloud_provider") # ── Models ──────────────────────────────────────────────────────────────── @@ -266,3 +271,107 @@ class RevenueEvent(Base): ) plugin: Mapped[Plugin] = relationship(back_populates="revenue_events") + + +class LocalAgentConfig(Base): + __tablename__ = "local_agent_configs" + + id: Mapped[str] = mapped_column( + Uuid(as_uuid=False), primary_key=True, default=_uuid + ) + user_id: Mapped[str] = mapped_column( + Uuid(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True + ) + device_id: Mapped[str] = mapped_column(String(255), nullable=False) + name: Mapped[str] = mapped_column(String(255), nullable=False) + directory_paths: Mapped[list] = mapped_column(JSON, nullable=False, default=list) + data_types: Mapped[list] = mapped_column(JSON, nullable=False, default=list) + prompt_template: Mapped[str] = mapped_column(Text, nullable=False, default="") + file_extensions: Mapped[list] = mapped_column(JSON, nullable=False, default=list) + schedule_cron: Mapped[str] = mapped_column(String(100), nullable=False, default="0 */6 * * *") + enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True) + last_run_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now() + ) + updated_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now() + ) + + run_logs: Mapped[list[AgentRunLog]] = relationship( + back_populates="local_agent", + primaryjoin="and_(AgentRunLog.agent_id == LocalAgentConfig.id, AgentRunLog.agent_type == 'local')", + foreign_keys="AgentRunLog.agent_id", + cascade="all, delete-orphan", + 
overlaps="run_logs,cloud_agent", + ) + + +class CloudAgentConfig(Base): + __tablename__ = "cloud_agent_configs" + + id: Mapped[str] = mapped_column( + Uuid(as_uuid=False), primary_key=True, default=_uuid + ) + user_id: Mapped[str] = mapped_column( + Uuid(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True + ) + provider: Mapped[str] = mapped_column(CloudProviderEnum, nullable=False) + name: Mapped[str] = mapped_column(String(255), nullable=False) + data_types: Mapped[list] = mapped_column(JSON, nullable=False, default=list) + prompt_template: Mapped[str] = mapped_column(Text, nullable=False, default="") + oauth_token_encrypted: Mapped[str | None] = mapped_column(Text, nullable=True) + filter_config: Mapped[dict | None] = mapped_column(JSON, nullable=True) + schedule_cron: Mapped[str] = mapped_column(String(100), nullable=False, default="0 */6 * * *") + enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True) + last_run_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now() + ) + updated_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now() + ) + + run_logs: Mapped[list[AgentRunLog]] = relationship( + back_populates="cloud_agent", + primaryjoin="and_(AgentRunLog.agent_id == CloudAgentConfig.id, AgentRunLog.agent_type == 'cloud')", + foreign_keys="AgentRunLog.agent_id", + cascade="all, delete-orphan", + overlaps="run_logs,local_agent", + ) + + +class AgentRunLog(Base): + __tablename__ = "agent_run_logs" + + id: Mapped[str] = mapped_column( + Uuid(as_uuid=False), primary_key=True, default=_uuid + ) + # Plain string — not a FK because it references either local_agent_configs or cloud_agent_configs + # depending on agent_type. Query by (agent_id, agent_type) to locate the source config. 
+ agent_id: Mapped[str] = mapped_column(String(255), nullable=False, index=True) + agent_type: Mapped[str] = mapped_column(AgentTypeEnum, nullable=False) + user_id: Mapped[str] = mapped_column( + Uuid(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True + ) + status: Mapped[str] = mapped_column(AgentStatusEnum, nullable=False, default="running") + items_processed: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + items_created: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + errors: Mapped[list | None] = mapped_column(JSON, nullable=True) + started_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now() + ) + completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + + local_agent: Mapped[LocalAgentConfig | None] = relationship( + back_populates="run_logs", + primaryjoin="and_(AgentRunLog.agent_id == LocalAgentConfig.id, AgentRunLog.agent_type == 'local')", + foreign_keys="AgentRunLog.agent_id", + overlaps="run_logs,cloud_agent", + ) + cloud_agent: Mapped[CloudAgentConfig | None] = relationship( + back_populates="run_logs", + primaryjoin="and_(AgentRunLog.agent_id == CloudAgentConfig.id, AgentRunLog.agent_type == 'cloud')", + foreign_keys="AgentRunLog.agent_id", + overlaps="run_logs,local_agent", + ) diff --git a/app/schemas.py b/app/schemas.py index 843d88d..997955e 100644 --- a/app/schemas.py +++ b/app/schemas.py @@ -167,6 +167,10 @@ class WsFrameType(str, Enum): tool_result = "tool_result" final = "final" ping = "ping" + agent_run = "agent_run" + agent_data = "agent_data" + agent_complete = "agent_complete" + device_hello = "device_hello" class WsToolCall(BaseModel): @@ -207,3 +211,139 @@ class WsFinal(BaseModel): type: Literal[WsFrameType.final] = WsFrameType.final response: str + + +# ── WebSocket Agent Frame Protocol ──────────────────────────────────── + +class WsDeviceHello(BaseModel): + """Client → 
Server: device identification on WS connect.""" + + type: Literal[WsFrameType.device_hello] = WsFrameType.device_hello + device_id: str + agent_ids: list[str] = Field(default_factory=list) + + +class WsAgentRun(BaseModel): + """Server → Client: trigger an agent run on the connected device.""" + + type: Literal[WsFrameType.agent_run] = WsFrameType.agent_run + run_id: str + agent_id: str + config: dict[str, Any] + + +class WsAgentData(BaseModel): + """Client → Server: files read by the local agent.""" + + type: Literal[WsFrameType.agent_data] = WsFrameType.agent_data + run_id: str + files: list[dict[str, Any]] + + +class WsAgentComplete(BaseModel): + """Client → Server: Electron signals it has finished reading files.""" + + type: Literal[WsFrameType.agent_complete] = WsFrameType.agent_complete + run_id: str + files_read: int + errors: list[str] = Field(default_factory=list) + + +# ── Agent Catalog ───────────────────────────────────────────────────── + +class AgentCatalogItem(BaseModel): + type: str + name: str + description: str + config_schema: dict[str, Any] = Field(default_factory=dict) + + +# ── Local Agent Config ──────────────────────────────────────────────── + +class LocalAgentConfigCreate(BaseModel): + name: str + device_id: str + directory_paths: list[str] + data_types: list[str] + prompt_template: str + file_extensions: list[str] + schedule_cron: str + + +class LocalAgentConfigUpdate(BaseModel): + name: str | None = None + device_id: str | None = None + directory_paths: list[str] | None = None + data_types: list[str] | None = None + prompt_template: str | None = None + file_extensions: list[str] | None = None + schedule_cron: str | None = None + enabled: bool | None = None + + +class LocalAgentConfigResponse(BaseModel): + id: str + name: str + device_id: str + directory_paths: list[str] + data_types: list[str] + prompt_template: str + file_extensions: list[str] + schedule_cron: str + enabled: bool + last_run_at: int | None + created_at: int + updated_at: 
int + + +# ── Cloud Agent Config ──────────────────────────────────────────────── + +class CloudAgentConfigCreate(BaseModel): + provider: Literal["gmail", "teams", "outlook"] + name: str + data_types: list[str] + prompt_template: str + oauth_token_encrypted: str + schedule_cron: str + filter_config: dict[str, Any] | None = None + + +class CloudAgentConfigUpdate(BaseModel): + provider: Literal["gmail", "teams", "outlook"] | None = None + name: str | None = None + data_types: list[str] | None = None + prompt_template: str | None = None + oauth_token_encrypted: str | None = None + schedule_cron: str | None = None + filter_config: dict[str, Any] | None = None + enabled: bool | None = None + + +class CloudAgentConfigResponse(BaseModel): + """oauth_token_encrypted is intentionally excluded — never returned to clients.""" + + id: str + provider: Literal["gmail", "teams", "outlook"] + name: str + data_types: list[str] + prompt_template: str + schedule_cron: str + filter_config: dict[str, Any] | None + enabled: bool + last_run_at: int | None + created_at: int + updated_at: int + + +# ── Agent Run Log ───────────────────────────────────────────────────── + +class AgentRunLogResponse(BaseModel): + id: str + agent_id: str + agent_type: Literal["local", "cloud"] + status: Literal["running", "success", "error", "partial"] + items_processed: int + items_created: int + errors: list[str] | None = None  # nullable: AgentRunLog.errors is NULL when a run has recorded no errors + started_at: int + completed_at: int | None