12 Commits

Author SHA1 Message Date
Roberto Musso
e668e3fd20 update setting page 2026-04-15 11:43:56 +02:00
Roberto Musso
7ccdad431f feat(i18n): inject user language into AI agent system prompts
- Add _language_instruction() to deep_agent.py, reads language from core memory
- Append language directive to all 4 run_* functions (task/project/checkpoint/note)
- Minor fixes: alembic env, route imports, test cleanup
2026-04-12 00:35:23 +02:00
Roberto Musso
4073863dc6 feat: add onboarding wizard backend - migration, schema, memory routes 2026-04-11 23:38:53 +02:00
Roberto Musso
a85f8fde29 feat(langfuse): propagate user_id and session_id to all traces
- Add hash_user_id() to SHA-256 hash user IDs before sending to Langfuse
- Add langfuse_context() helper wrapping propagate_attributes()
- deep_agent: extract session_id from _debug context, wrap all agent
  runs and classifier with langfuse_context(user_id, session_id)
- agent_runner: add session_id param, pass run_id as session for batch
- agent_setup: wrap journey LLM calls with langfuse_context
- Remove redundant metadata dicts (now handled by propagate_attributes)
2026-04-10 22:44:05 +02:00
Roberto Musso
90500a3462 fix: return 409 when unverified OAuth email conflicts with existing account
Before: branch 3 of oauth_callback attempted to INSERT a user with a
duplicate email → DB constraint violation → 500.

After: if email_verified=False and the email already exists, raise 409
with a message directing the user to sign in with their password.

Also adds test_callback_unverified_email_conflict_returns_409.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-10 13:46:15 +02:00
Roberto Musso
c1a8ac7669 test: add TestOAuth suite for Google OAuth routes
6 tests covering the authorize and callback endpoints:
- authorize returns URL + state, 503 when unconfigured
- callback: state mismatch → 401, new user creation, existing OAuth
  link re-login (same user sub), email-match auto-linking to password user

Provider methods (exchange_code, get_userinfo) are mocked via AsyncMock
so tests run without hitting Google APIs.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-10 13:42:11 +02:00
Roberto Musso
c510cbaae5 feat: add OAuth web-callback route and update OAUTH_REDIRECT_URI default
GET /auth/oauth/{provider}/web-callback receives the Google redirect and
bounces immediately to adiuvai://oauth/callback deep link. Google Cloud
Console only accepts http/https redirect URIs — adiuvai:// is not valid.
Default OAUTH_REDIRECT_URI now points to localhost:8000 for dev; override
with the API domain env var in production.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-10 13:03:05 +02:00
Roberto Musso
ce139bbac3 feat: add OAuth DB schema — oauth_accounts table, nullable password_hash, avatar_url on User
Step 1 of Google login integration: Alembic migration for oauth_accounts +
avatar_url on users, OAuthAccount model with User relationship, UserProfile
schema extended with avatar_url, get_current_user updated to include avatar_url.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-10 09:20:52 +02:00
Roberto Musso
3cf067faea feat: enhance agent configuration and model management with per-agent overrides 2026-04-10 08:45:14 +02:00
Roberto Musso
7253f6fe72 testing journey agent creation 2026-04-09 00:40:16 +02:00
Roberto Musso
41db3a7089 update env variables 2026-04-08 23:52:52 +02:00
Roberto Musso
cc94194fd1 update app name 2026-04-08 23:27:34 +02:00
42 changed files with 1594 additions and 771 deletions

View File

@@ -2,7 +2,7 @@
ENV=dev ENV=dev
# ── Database ────────────────────────────────────────────────────────────────── # ── Database ──────────────────────────────────────────────────────────────────
DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/adiuva DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/adiuvai
# ── Auth ────────────────────────────────────────────────────────────────────── # ── Auth ──────────────────────────────────────────────────────────────────────
JWT_SECRET=replace-with-a-long-random-secret JWT_SECRET=replace-with-a-long-random-secret
@@ -13,11 +13,45 @@ JWT_REFRESH_TOKEN_EXPIRE_DAYS=30
# ── LLM ─────────────────────────────────────────────────────────────────────── # ── LLM ───────────────────────────────────────────────────────────────────────
# LiteLLM model identifiers — change to swap providers without code changes. # LiteLLM model identifiers — change to swap providers without code changes.
# Examples: gpt-4o, anthropic/claude-sonnet-4-20250514, gemini/gemini-pro, ollama/llama3 # Examples: gpt-4o, anthropic/claude-sonnet-4-20250514, gemini/gemini-pro, ollama/llama3
#
# API keys — only the key(s) matching your chosen provider(s) are required.
# The correct key is picked automatically from the model prefix (e.g.
# "anthropic/..." → ANTHROPIC_API_KEY, "gemini/..." → GOOGLE_API_KEY).
OPENAI_API_KEY= OPENAI_API_KEY=
ANTHROPIC_API_KEY= ANTHROPIC_API_KEY=
GOOGLE_API_KEY= GOOGLE_API_KEY=
LLM_MODEL=gpt-4o CEREBRAS_API_KEY=
LLM_ROUTER_MODEL=gpt-4o-mini
# Default model used by any agent that does not have a specific override below.
LLM_MODEL=gpt-5-mini
LLM_EMBED_MODEL=text-embedding-3-small
# GitHub Copilot — leave empty to use the LiteLLM default token directory.
# In Docker, point this to a named-volume path so tokens survive restarts.
# GITHUB_COPILOT_TOKEN_DIR=
# ── Per-agent model overrides ─────────────────────────────────────────────────
# Leave a value empty to fall back to LLM_MODEL.
# Each agent resolves its API key from the model prefix automatically.
#
# Intent classifier — routes user messages to the right domain agent.
# A small/fast model (e.g. gpt-4o-mini) is usually sufficient here.
LLM_MODEL_CLASSIFIER=
# Home-agent — handles chat from the home screen (all tools available).
LLM_MODEL_HOME_AGENT=
# Floating-agent — handles contextual chat triggered from a task/project/note.
LLM_MODEL_FLOATING_AGENT=
# Unified-processor — processes local directory files (local agent runner).
LLM_MODEL_UNIFIED_PROCESSOR=
# Cloud-processor — fetches and processes data from cloud connectors.
LLM_MODEL_CLOUD_PROCESSOR=
# Setup-agent — guided journey to build an AgentConfig via WebSocket chat.
LLM_MODEL_SETUP_AGENT=
# ── Stripe (leave empty to stub billing) ────────────────────────────────────── # ── Stripe (leave empty to stub billing) ──────────────────────────────────────
STRIPE_SECRET_KEY= STRIPE_SECRET_KEY=
@@ -27,9 +61,9 @@ STRIPE_WEBHOOK_SECRET=
# ── Langfuse (leave empty to disable observability) ─────────────────────────── # ── Langfuse (leave empty to disable observability) ───────────────────────────
LANGFUSE_SECRET_KEY= LANGFUSE_SECRET_KEY=
LANGFUSE_PUBLIC_KEY= LANGFUSE_PUBLIC_KEY=
# LANGFUSE_HOST=https://cloud.langfuse.com # EU (default) # LANGFUSE_BASE_URL=https://cloud.langfuse.com # EU (default)
# LANGFUSE_HOST=https://us.cloud.langfuse.com # US # LANGFUSE_BASE_URL=https://us.cloud.langfuse.com # US
# LANGFUSE_HOST=http://localhost:3000 # Self-hosted # LANGFUSE_BASE_URL=http://localhost:3000 # Self-hosted
# ── CORS ────────────────────────────────────────────────────────────────────── # ── CORS ──────────────────────────────────────────────────────────────────────
# Comma-separated list parsed by Settings (override default if needed) # Comma-separated list parsed by Settings (override default if needed)

View File

@@ -48,23 +48,23 @@ jobs:
key: ${{ secrets.SSH_KEY }} key: ${{ secrets.SSH_KEY }}
script: | script: |
set -e set -e
DEPLOY_DIR="/opt/adiuva-api" DEPLOY_DIR="/opt/adiuvai-api"
REPO_URL="http://10.0.0.119:3000/${{ gitea.repository }}.git" REPO_URL="http://10.0.0.119:3000/${{ gitea.repository }}.git"
TAG="${{ gitea.ref_name }}" TAG="${{ gitea.ref_name }}"
# ── Pull latest code ── # ── Pull latest code ──
cd /tmp && rm -rf adiuva-api-deploy cd /tmp && rm -rf adiuvai-api-deploy
git clone --depth 1 --branch "${TAG}" "${REPO_URL}" adiuva-api-deploy git clone --depth 1 --branch "${TAG}" "${REPO_URL}" adiuvai-api-deploy
# ── Sync source (preserve .env) ── # ── Sync source (preserve .env) ──
cp -rf /tmp/adiuva-api-deploy/app/ \ cp -rf /tmp/adiuvai-api-deploy/app/ \
/tmp/adiuva-api-deploy/alembic/ \ /tmp/adiuvai-api-deploy/alembic/ \
/tmp/adiuva-api-deploy/alembic.ini \ /tmp/adiuvai-api-deploy/alembic.ini \
/tmp/adiuva-api-deploy/Dockerfile \ /tmp/adiuvai-api-deploy/Dockerfile \
/tmp/adiuva-api-deploy/docker-compose.yml \ /tmp/adiuvai-api-deploy/docker-compose.yml \
/tmp/adiuva-api-deploy/requirements.txt \ /tmp/adiuvai-api-deploy/requirements.txt \
"$DEPLOY_DIR/" "$DEPLOY_DIR/"
rm -rf /tmp/adiuva-api-deploy rm -rf /tmp/adiuvai-api-deploy
# ── Verify .env ── # ── Verify .env ──
if [ ! -f "$DEPLOY_DIR/.env" ]; then if [ ! -f "$DEPLOY_DIR/.env" ]; then

View File

@@ -58,7 +58,7 @@ jobs:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- name: Build image - name: Build image
run: docker build -t adiuva-api:ci . run: docker build -t adiuvai-api:ci .
- name: Verify gunicorn installed - name: Verify gunicorn installed
run: docker run --rm adiuva-api:ci gunicorn --version run: docker run --rm adiuvai-api:ci gunicorn --version

591
README.md
View File

@@ -1,591 +0,0 @@
# Adiuva Cloud API
**AI-powered project management backend with LLM orchestration and subscription billing.**
Built with FastAPI · Python 3.12 · PostgreSQL · LangChain · Stripe
---
## Table of Contents
- [Overview](#overview)
- [Architecture](#architecture)
- [Key Features](#key-features)
- [Tech Stack](#tech-stack)
- [Getting Started](#getting-started)
- [Docker Deployment](#docker-deployment)
- [Environment Variables](#environment-variables)
- [API Reference](#api-reference)
- [Data Model](#data-model)
- [AI Agent System](#ai-agent-system)
- [Orchestration & Execution Plans](#orchestration--execution-plans)
- [Middleware](#middleware)
- [Billing & Tiers](#billing--tiers)
- [Testing](#testing)
- [Project Structure](#project-structure)
- [License](#license)
---
## Overview
Adiuva Cloud API is the FastAPI backend that powers the **Adiuva Electron desktop app**. It provides LLM-powered chat orchestration, text embedding generation, and Stripe-based subscription billing across four tiers.
### Design Principles
1. **Never expose prompts** — system prompts stay server-side; responses are sanitized to strip any leaked prompt fragments.
2. **Stateless request handling** — all context comes from the client and JWT; no server-side session state.
3. **Tier gates enforced server-side** — the server always reads the current tier from the database, never trusting client-reported values.
---
## Architecture
```
┌──────────────┐ ┌────────────────────────────────────────────────────────┐
│ Electron │ │ FastAPI (Uvicorn / Gunicorn) │
│ Desktop App │────▶│ │
│ (Client) │◀────│ Middleware: RateLimit → Sanitizer → CORS → Router │
└──────────────┘ │ │
│ ┌──────────────────┐ ┌────────────────────────────┐ │
│ │ Auth Routes │ │ Chat Routes │ │
│ │ Billing Routes │ │ ↓ │ │
│ │ Agent Routes │ │ Orchestrator (GPT-4o-mini)│ │
│ │ Device WS │ │ ↓ classify intent │ │
│ └──────────────────┘ │ Agent Registry │ │
│ │ ↓ │ │
│ │ TaskAgent | ProjectAgent │ │
│ │ NoteAgent | CheckptAgent │ │
│ │ (GPT-4o + LangChain) │ │
│ └────────────────────────────┘ │
└────────────────────────────────────────────────────────┘
┌────────▼───┐
│ PostgreSQL │
│ (Auth, │
│ Billing, │
│ Agents) │
└────────────┘
┌────────▼───┐
│ Stripe │
│ (Billing) │
└────────────┘
```
---
## Key Features
1. **LLM-powered orchestration** — GPT-4o-mini classifies user intent and routes to the appropriate domain agent.
2. **4 specialized AI agents** — Tasks (8 tools), Projects (6 tools), Timelines (4 tools), Notes (5 tools), all powered by GPT-4o via LangChain.
3. **Execution plans & playbooks** — Server-side prompt template registry; clients receive only opaque template IDs, never raw prompts.
4. **Text embeddings** — Generates text-embedding-3-small vectors for local client-side note search.
5. **Stripe billing** — Four-tier subscription model (Free / Pro / Power / Team) with checkout sessions and full webhook lifecycle handling.
6. **JWT authentication** — Access + refresh tokens with bcrypt password hashing, SHA-256 token hashing, and automatic rotation.
7. **Prompt IP protection** — Sanitizer middleware strips system prompts, reasoning markers, tool schemas, and agent routing metadata from all chat responses.
8. **Tier-based rate limiting** — Sliding-window per-user limiter scaling from 20 to 200 requests/min by subscription tier.
9. **WebSocket streaming** — Real-time chat with 30-second heartbeat keep-alive and chunked text delivery.
10. **Alembic migrations** — Versioned schema management.
11. **Comprehensive test suite** — In-memory SQLite, per-tier test fixtures, and full API coverage without external dependencies.
---
## Tech Stack
| Package | Version | Purpose |
|---|---|---|
| `fastapi` | ≥ 0.115.0 | Web framework |
| `uvicorn[standard]` | ≥ 0.34.0 | ASGI development server |
| `gunicorn` | ≥ 22.0.0 | Production process manager |
| `langchain` | ≥ 0.3.0 | LLM orchestration framework |
| `langchain-openai` | ≥ 0.3.0 | OpenAI LLM provider integration |
| `litellm` | ≥ 1.50.0 | Universal LLM gateway (100+ providers) |
| `pydantic` | ≥ 2.10.0 | Data validation and serialization |
| `pydantic-settings` | ≥ 2.7.0 | Environment-based configuration |
| `python-jose[cryptography]` | ≥ 3.3.0 | JWT encoding and decoding |
| `stripe` | ≥ 11.0.0 | Billing and payment integration |
| `slowapi` | ≥ 0.1.9 | Rate limiting utilities |
| `sqlalchemy` | ≥ 2.0.0 | Async ORM and query builder |
| `asyncpg` | ≥ 0.30.0 | PostgreSQL async driver |
| `alembic` | ≥ 1.14.0 | Database migration management |
| `bcrypt` | ≥ 4.2.0 | Password hashing |
| `python-dotenv` | ≥ 1.0.0 | `.env` file loading |
| `httpx` | ≥ 0.28.0 | Async HTTP client (used in tests) |
| `websockets` | ≥ 14.0 | WebSocket protocol support |
| `psycopg2-binary` | ≥ 2.9.0 | Synchronous PostgreSQL driver (Alembic) |
| `pytest` | ≥ 8.0.0 | Test framework |
| `pytest-asyncio` | ≥ 0.24.0 | Async test support |
| `aiosqlite` | ≥ 0.20.0 | In-memory SQLite for tests |
| `ruff` | ≥ 0.8.0 | Linter and formatter |
---
## Getting Started
### Prerequisites
- Python 3.12+
- PostgreSQL 16+
- An OpenAI API key (for LLM features)
- Stripe API keys (optional — billing stubs gracefully when unconfigured)
### Installation
```bash
# Clone the repository
git clone <repo-url> && cd adiuva-api
# Create a virtual environment
python -m venv .venv && source .venv/bin/activate
# Install dependencies
pip install -r requirements.txt
# Configure environment
cp .env.example .env
# Edit .env with your DATABASE_URL, OPENAI_API_KEY, etc.
```
### Database Setup
```bash
# Start PostgreSQL (or use the Docker Compose database)
docker compose up db -d
# Run migrations
alembic upgrade head
```
### Run the Development Server
```bash
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
```
Interactive API docs are available at [http://localhost:8000/docs](http://localhost:8000/docs) in development mode (`ENV=dev`). The `/docs` endpoint is disabled in production.
---
## Docker Deployment
### Quick Start
```bash
docker compose up --build
```
This starts two services:
- **app** — FastAPI server on port `8000`
- **db** — PostgreSQL 16 (Alpine) on port `5432` with a persistent volume and health checks
### Dockerfile Details
The Dockerfile uses a multi-stage build:
1. **Builder stage** — Installs Python dependencies into a virtual environment.
2. **Runtime stage** — Copies only the venv, app source, and Alembic migrations. Runs as a non-root user (`appuser`).
3. **Production server** — Gunicorn with 4 Uvicorn workers, 120-second timeout, listening on port 8000.
```bash
# Production command (run by the container)
gunicorn app.main:app -k uvicorn.workers.UvicornWorker -w 4 --timeout 120 -b 0.0.0.0:8000
```
---
## Homelab / Self-Hosted Deployment
You can run the entire stack locally on a homelab with **no cloud dependencies except the LLM provider**.
### 1. Start all services
```bash
docker compose up -d
```
This starts PostgreSQL alongside the app.
### 2. Configure your `.env`
```bash
# Database (uses the compose PostgreSQL)
DATABASE_URL=postgresql+asyncpg://postgres:postgres@db:5432/adiuva
# Billing — leave empty to stub (no Stripe needed)
STRIPE_SECRET_KEY=
STRIPE_WEBHOOK_SECRET=
# LLM — the only external service
OPENAI_API_KEY=sk-...
LLM_MODEL=gpt-4o
LLM_ROUTER_MODEL=gpt-4o-mini
# Auth
JWT_SECRET=your-secret-here
ENV=dev
```
### 3. Run migrations
```bash
docker compose exec app alembic upgrade head
```
### What runs where
| Service | Runs on | Port | Notes |
|---|---|---|---|
| FastAPI app | Docker | 8000 | API server |
| PostgreSQL | Docker | 5432 | Auth, billing, agents |
| Stripe | — | — | Stubbed when keys are empty |
| OpenAI / LLM | Cloud | — | Only external dependency |
> **Want fully offline AI too?** Set `LLM_MODEL=ollama/llama3` and `LLM_ROUTER_MODEL=ollama/llama3`, then add an Ollama container or point at a local Ollama instance. See the [LLM provider switching](#switching-llm-providers) section.
---
## Environment Variables
All variables are loaded from a `.env` file via Pydantic Settings. Source: `app/config/settings.py`
| Variable | Type | Default | Description |
|---|---|---|---|
| `DATABASE_URL` | `str` | `postgresql+asyncpg://postgres:postgres@localhost:5432/adiuva` | Async SQLAlchemy connection string |
| `JWT_SECRET` | `str` | `change-me-in-production` | HMAC secret for JWT signing |
| `JWT_ALGORITHM` | `str` | `HS256` | JWT signing algorithm |
| `JWT_ACCESS_TOKEN_EXPIRE_MINUTES` | `int` | `30` | Access token time-to-live |
| `JWT_REFRESH_TOKEN_EXPIRE_DAYS` | `int` | `30` | Refresh token time-to-live |
| `STRIPE_SECRET_KEY` | `str` | `""` | Stripe API key (empty = stub mode) |
| `STRIPE_WEBHOOK_SECRET` | `str` | `\"\"` | Stripe webhook signature secret |\n| `OPENAI_API_KEY` | `str` | `\"\"` | OpenAI key for LLM agent calls |
| `LLM_MODEL` | `str` | `gpt-4o` | LiteLLM model identifier for agents (e.g. `anthropic/claude-3.5-sonnet`, `gemini/gemini-pro`, `ollama/llama3`) |
| `LLM_ROUTER_MODEL` | `str` | `gpt-4o-mini` | Lighter model used for intent classification / routing |
| `CORS_ORIGINS` | `list[str]` | `["app://.", "http://localhost:3000", "http://localhost:5173"]` | Allowed CORS origins |
| `ENV` | `Literal` | `dev` | `dev` or `prod` — controls `/docs` visibility and SQL echo |
---
## API Reference
All routes are prefixed with `/api/v1`. **27 endpoints** total (25 REST + 1 WebSocket + 1 health check).
### Health
| Method | Path | Auth | Description |
|---|---|---|---|
| `GET` | `/api/v1/health` | No | Returns `{"status": "ok", "version": "0.1.0"}` |
### Auth
| Method | Path | Auth | Description |
|---|---|---|---|
| `POST` | `/api/v1/auth/register` | No | Create account with bcrypt-hashed password, returns `AuthTokens` |
| `POST` | `/api/v1/auth/login` | No | Validate credentials, returns `AuthTokens` |
| `POST` | `/api/v1/auth/refresh` | No | Rotate refresh token, returns new `AuthTokens` |
| `GET` | `/api/v1/auth/me` | JWT | Returns `UserProfile` for the authenticated user |
### Chat
| Method | Path | Auth | Description |
|---|---|---|---|
| `POST` | `/api/v1/chat` | JWT | Route message through the orchestrator; returns `ChatResponse` or `ExecutionPlan` depending on execution mode |
| `POST` | `/api/v1/chat/embed` | JWT | Generate a 1536-dim text embedding vector (`text-embedding-3-small`). Used by Electron for local note search. |
| `WS` | `/api/v1/chat/stream` | JWT (query param `?token=`) | Streaming chat — first frame is a `ChatRequest`, server yields text chunks, final frame is `{"done": true, "response": "...", "actions": [...]}`. 30-second heartbeat ping. |
### Plans
| Method | Path | Auth | Description |
|---|---|---|---|
| `GET` | `/api/v1/plans/playbook` | JWT | List all cached execution plan playbooks |
| `GET` | `/api/v1/plans/playbook/{plan_id}` | JWT | Retrieve a specific playbook by ID |
### Billing
| Method | Path | Auth | Description |
|---|---|---|---|
| `POST` | `/api/v1/billing/checkout` | JWT | Create a Stripe checkout session, returns `{"checkout_url": "..."}` |
| `POST` | `/api/v1/billing/webhook` | Stripe signature | Handle Stripe events: `checkout.session.completed`, `customer.subscription.updated`, `customer.subscription.deleted`, `invoice.payment_failed` |
| `GET` | `/api/v1/billing/subscription` | JWT | Get current subscription information |
| `DELETE` | `/api/v1/billing/subscription` | JWT | Cancel subscription and revert to free tier |
---
## Data Model
3 tables managed by Alembic migrations. Source: `app/models.py`
### Tables
| Table | Primary Key | Key Columns | Purpose |
|---|---|---|---|
| `users` | `id` (UUID) | `email` (unique), `password_hash`, `tier`, `stripe_customer_id`, timestamps | User accounts |
| `refresh_tokens` | `id` (UUID) | `user_id` (FK), `token_hash` (SHA-256, unique), `expires_at` | Hashed refresh tokens for rotation |
| `subscriptions` | `id` (UUID) | `user_id` (FK, unique), `stripe_subscription_id`, `tier`, `status`, `current_period_end` | Stripe subscription records |
### Enum Types
| Enum | Values |
|---|---|
| `billing_tier` | `free`, `pro`, `power`, `team` |
### Migrations
| Version | Description |
|---|---|
| `001_initial_schema` | Creates core auth and billing tables with indexes and foreign key constraints |
---
## AI Agent System
The agent system uses a registry pattern with LangChain tool-calling agents powered by GPT-4o. Source: `app/agents/`, `app/core/agent_registry.py`
### Architecture
- **`BaseAgent`** — Abstract base with `user_id` and `shared_memory`.
- **`ChatAgent(BaseAgent)`** — Abstract `handle(query, context)` and `get_tools()` methods, plus a shared `_tool_loop(llm, messages, tools, max_iter=5)` for iterative tool calling.
- **`AgentRegistry`** — Singleton registry with `@register` decorator, `get(name)`, `list_agents()`, and `call_agent(name, query, context)`.
### Registered Agents
| Agent | Registry Name | Tools | Description |
|---|---|---|---|
| **TaskAgent** | `task_agent` | 8 | Full task and comment CRUD. Status: `todo` / `in_progress` / `done`. Priority: `high` / `medium` / `low`. Tools: `list_tasks`, `create_task`, `update_task`, `delete_task`, `list_tasks_due_today`, `list_task_comments`, `add_task_comment`, `delete_task_comment` |
| **ProjectAgent** | `project_agent` | 6 | Project lifecycle management. Status: `active` / `archived`. Prefers archiving over deletion. Tools: `list_projects`, `list_all_projects`, `get_project`, `create_project`, `update_project`, `delete_project` |
| **TimelineAgent** | `timeline_agent` | 4 | Project milestones. Requires `project_id` for creation. Supports AI-suggestion and approval workflows. Tools: `list_timelines`, `create_timeline`, `update_timeline`, `delete_timeline` |
| **NoteAgent** | `note_agent` | 5 | Markdown note management. Optionally linked to projects. Tools: `list_notes`, `get_note`, `create_note`, `update_note`, `delete_note` |
All agents use the model configured by `LLM_MODEL` (default: GPT-4o) with `temperature=0` via LiteLLM. Tools return JSON action descriptors that the Electron client interprets and applies locally.
### Switching LLM Providers
The backend uses **LiteLLM** as a universal LLM gateway. All agents and the orchestrator instantiate models through a centralized factory in `app/core/llm.py`. To switch providers, change environment variables — no code changes required:
```bash
# OpenAI (default)
LLM_MODEL=gpt-4o
LLM_ROUTER_MODEL=gpt-4o-mini
# Anthropic
LLM_MODEL=anthropic/claude-3.5-sonnet
LLM_ROUTER_MODEL=anthropic/claude-3-haiku
# Google Gemini
LLM_MODEL=gemini/gemini-pro
LLM_ROUTER_MODEL=gemini/gemini-flash
# Local Ollama
LLM_MODEL=ollama/llama3
LLM_ROUTER_MODEL=ollama/llama3
# AWS Bedrock
LLM_MODEL=bedrock/anthropic.claude-v2
LLM_ROUTER_MODEL=bedrock/anthropic.claude-instant-v1
```
See the [LiteLLM provider docs](https://docs.litellm.ai/docs/providers) for the full list of 100+ supported providers and model naming conventions.
---
## Orchestration & Execution Plans
Source: `app/core/orchestrator.py`, `app/core/execution_plan.py`
### Orchestrator
1. **`classify_intent(message, context, registry)`** — Uses the router model (`LLM_ROUTER_MODEL`, default: GPT-4o-mini) to determine which agent should handle a message. Falls back to `task_agent` when classification is ambiguous.
2. **`route_single(agent_name, message, context)`** — Routes to a single agent and returns a `ChatResponse`.
3. **`route_pipeline(agent_names, message, context)`** — Executes agents sequentially; each receives `previous_results` from earlier agents. A final LLM synthesis step merges all results.
4. **`orchestrate(request)`** — Main entry point. In `direct` mode, returns a `ChatResponse`. In `plan` mode, returns an `ExecutionPlan`.
5. **`orchestrate_stream(request)`** — Streaming variant that yields 50-character text chunks with a final JSON frame.
### Execution Plans
- **`PromptTemplateRegistry`** — Maps template IDs to server-side prompt text. Clients only ever see opaque IDs, never raw prompts.
- **`ExecutionPlanBuilder`** — Fluent builder API: `add_step()`, `add_llm_step(template_id, vars)`, `add_data_step(action, data_from_step)`. Validates step references on `build()`.
- **`PlanCache`** — LRU cache (maxsize 1000) for storing plans as reusable playbooks.
### Built-in Templates (6)
`tpl_task_agent_default`, `tpl_timeline_agent_default`, `tpl_project_agent_default`, `tpl_note_agent_default`, `tpl_task_extract_from_project`, `tpl_note_weekly_summary`
### Built-in Playbooks (2)
| Playbook | Description |
|---|---|
| `create_tasks_from_project` | LLM extracts actionable tasks from project context, then creates task records |
| `generate_weekly_note` | LLM generates a weekly summary, then creates a note record |
---
## Middleware
Middleware executes in this order on each request: **TierRateLimit → Sanitizer → CORS → Router**
### JWT Authentication
Source: `app/api/middleware/auth.py`
- FastAPI dependency `get_current_user` validates the `Bearer` JWT and extracts `user_id` and `email`.
- **Live tier lookup** — The current tier is fetched from the `subscriptions` table on every request (not cached in the JWT), so upgrades and downgrades take immediate effect.
- Falls back to `free` when no subscription row exists.
- Raises `401 Unauthorized` on invalid or expired tokens.
- **Exempt paths:** `/api/v1/auth/register`, `/api/v1/auth/login`, `/api/v1/billing/webhook`
### Tier-Based Rate Limiter
Source: `app/api/middleware/rate_limit.py`
- `TierRateLimitMiddleware` — Sliding-window in-process rate limiter (no Redis dependency).
- Per-user 60-second window sized by subscription tier:
| Tier | Requests / Minute |
|---|---|
| Free | 20 |
| Pro | 60 |
| Power | 120 |
| Team | 200 |
- Returns `429 Too Many Requests` with a `Retry-After` header when the limit is exceeded.
- **Exempt paths:** register, login, webhook, health
### Response Sanitizer
Source: `app/api/middleware/sanitizer.py`
- Runs only on `/api/v1/chat` endpoints.
- Scans JSON response bodies and replaces leaked prompt IP fragments with `[REDACTED]`.
- Detects: system prompt openers, agent routing metadata, LangChain tool schemas, internal reasoning markers (`<thinking>`, `[INST]`), and known prompt fingerprints.
- Logs sanitization events as `WARNING`.
---
## Billing & Tiers
Source: `app/billing/stripe_service.py`, `app/billing/tier_manager.py`
### Feature Matrix
| Feature | Free | Pro | Power | Team |
|---|---|---|---|---|
| AI Agents | 3 | Unlimited | Unlimited | Unlimited |
| Batch Active | 2 | 10 | Unlimited | Unlimited |
| LLM Providers | 1 | Unlimited | Unlimited | Unlimited |
| Batch Builder | — | — | ✓ | ✓ |
| SSO | — | — | — | ✓ |
| Rate Limit | 20 req/min | 60 req/min | 120 req/min | 200 req/min |
### Stripe Integration
- **Checkout** — `create_checkout_session(user_id, tier)` creates a Stripe Checkout session. Returns a stub URL when Stripe is not configured.
- **Webhooks** — Handles `checkout.session.completed`, `customer.subscription.updated`, `customer.subscription.deleted`, and `invoice.payment_failed`.
- **Subscription management** — `get_subscription()` returns the current subscription record; `cancel_subscription()` cancels via the Stripe API and reverts the user to the free tier.
- **Price IDs:** `price_pro_monthly`, `price_power_monthly`, `price_team_monthly`
### Tier Manager
- `get_tier(user_id)` — Returns the user's current billing tier.
- `check_feature(tier, feature)` — Boolean feature gate check.
- `require_feature(tier, feature)` — Raises HTTP 403 if the feature is not available.
---
## Testing
### Running Tests
```bash
# Run all tests
pytest
# Run a specific test file
pytest tests/test_auth.py
# Run with verbose output
pytest -v
```
### Test Infrastructure
- **Database:** Async SQLite in-memory via `aiosqlite` + `StaticPool` — fast, no PostgreSQL needed.
- **Auth helpers:** `make_jwt(tier)` and `auth_header(tier)` generate per-tier test tokens.
- **Seed data:** Auto-creates one `User` + `Subscription` per tier (free/pro/power/team) before each test.
- **FK enforcement:** SQLite `PRAGMA foreign_keys=ON`.
- **No external dependencies** — all tests run fully offline.
### Test Coverage
| File | Coverage |
|---|---|
| `test_auth.py` | Register, login, token access, refresh, expiration |
| `test_middleware.py` | Rate limiting by tier, sanitizer prompt leak detection |
---
## Project Structure
```
adiuva-api/
├── alembic.ini # Alembic configuration
├── docker-compose.yml # Docker Compose (app + PostgreSQL)
├── Dockerfile # Multi-stage production build
├── requirements.txt # Python dependencies
├── alembic/ # Database migrations
│ ├── env.py # Alembic environment config
│ ├── script.py.mako # Migration template
│ └── versions/
│ └── 001_initial_schema.py # Tables, indexes, FKs
├── app/ # Application source
│ ├── main.py # FastAPI app factory, middleware, routes
│ ├── db.py # Async SQLAlchemy engine & session
│ ├── models.py # SQLAlchemy ORM models
│ ├── schemas.py # Pydantic request/response schemas
│ │
│ ├── config/
│ │ └── settings.py # Pydantic Settings (env vars)
│ │
│ ├── agents/ # LLM-powered domain agents
│ │ ├── task_agent.py # Task & comment CRUD (8 tools)
│ │ ├── project_agent.py # Project lifecycle (6 tools)
│ │ ├── timeline_agent.py # Milestones (4 tools)
│ │ └── note_agent.py # Markdown notes (5 tools)
│ │
│ ├── core/ # Orchestration engine
│ │ ├── agent_registry.py # BaseAgent, ChatAgent, AgentRegistry
│ │ ├── llm.py # LiteLLM factory (get_llm, get_router_llm)
│ │ └── deep_agent.py # Deep agent orchestration
│ │
│ ├── api/ # HTTP layer
│ │ ├── deps.py # Shared FastAPI dependencies
│ │ ├── middleware/
│ │ │ ├── rate_limit.py # Sliding-window tier rate limiter
│ │ │ └── sanitizer.py # Prompt IP leak protection
│ │ └── routes/
│ │ ├── auth.py # Register, login, refresh, me
│ │ ├── chat.py # Chat + embed endpoint
│ │ ├── billing.py # Stripe checkout, webhooks, subscription
│ │ ├── agents.py # Agent catalog, config, runs
│ │ └── device_ws.py # Persistent device WebSocket
│ │
│ └── billing/
│ ├── stripe_service.py # Stripe API wrapper
│ └── tier_manager.py # Feature matrix, rate limits
└── tests/ # Test suite
├── conftest.py # Fixtures: DB, auth, seeds
├── test_auth.py
├── test_orchestrator.py
├── test_agents.py
├── test_agent_registry.py
├── test_execution_plan.py
└── test_middleware.py
```
---
## License
*To be determined.*

View File

@@ -16,7 +16,7 @@ import re
from logging.config import fileConfig from logging.config import fileConfig
from alembic import context from alembic import context
from sqlalchemy import engine_from_config, pool from sqlalchemy import pool
from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.ext.asyncio import create_async_engine
# Alembic Config object (gives access to alembic.ini values). # Alembic Config object (gives access to alembic.ini values).

View File

@@ -14,7 +14,7 @@ from alembic import op
from sqlalchemy.dialects import postgresql from sqlalchemy.dialects import postgresql
revision: str = "003" revision: str = "003"
down_revision: Union[str, None] = "002" down_revision: Union[str, None] = "001"
branch_labels: Union[str, Sequence[str], None] = None branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None

View File

@@ -1,4 +1,8 @@
"""add agent_config to local_agent_configs """Restore agent config tables and add agent_config column.
9a1f2d0b6c7e dropped local_agent_configs and cloud_agent_configs, but both
ORM models are still active. This migration recreates them with agent_config
added to local_agent_configs.
Revision ID: a3b9c0d1e2f3 Revision ID: a3b9c0d1e2f3
Revises: 9a1f2d0b6c7e Revises: 9a1f2d0b6c7e
@@ -9,8 +13,9 @@ from __future__ import annotations
from typing import Sequence, Union from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
@@ -21,11 +26,82 @@ depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None: def upgrade() -> None:
op.add_column( # Recreate enum types (idempotent — they may already exist from migration 003)
"local_agent_configs", op.execute("""
sa.Column("agent_config", sa.JSON(), nullable=True), DO $$ BEGIN
) CREATE TYPE agent_type AS ENUM ('local', 'cloud');
EXCEPTION WHEN duplicate_object THEN NULL;
END $$;
""")
op.execute("""
DO $$ BEGIN
CREATE TYPE agent_run_status AS ENUM ('running', 'success', 'error', 'partial');
EXCEPTION WHEN duplicate_object THEN NULL;
END $$;
""")
op.execute("""
DO $$ BEGIN
CREATE TYPE cloud_provider AS ENUM ('gmail', 'teams', 'outlook');
EXCEPTION WHEN duplicate_object THEN NULL;
END $$;
""")
bind = op.get_bind()
inspector = sa.inspect(bind)
existing = set(inspector.get_table_names())
# ── local_agent_configs (with agent_config column) ────────────────────
if "local_agent_configs" not in existing:
op.create_table(
"local_agent_configs",
sa.Column("id", postgresql.UUID(as_uuid=False), nullable=False),
sa.Column("user_id", postgresql.UUID(as_uuid=False), nullable=False),
sa.Column("device_id", sa.String(255), nullable=False),
sa.Column("name", sa.String(255), nullable=False),
sa.Column("directory_paths", sa.JSON, nullable=False, server_default="[]"),
sa.Column("data_types", sa.JSON, nullable=False, server_default="[]"),
sa.Column("prompt_template", sa.Text, nullable=False, server_default=""),
sa.Column("agent_config", sa.JSON, nullable=True),
sa.Column("file_extensions", sa.JSON, nullable=False, server_default="[]"),
sa.Column("schedule_cron", sa.String(100), nullable=False, server_default="0 */6 * * *"),
sa.Column("enabled", sa.Boolean, nullable=False, server_default=sa.true()),
sa.Column("last_run_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")),
sa.PrimaryKeyConstraint("id"),
sa.ForeignKeyConstraint(["user_id"], ["users.id"], ondelete="CASCADE"),
)
op.create_index("ix_local_agent_configs_user_id", "local_agent_configs", ["user_id"])
# ── cloud_agent_configs ───────────────────────────────────────────────
if "cloud_agent_configs" not in existing:
op.create_table(
"cloud_agent_configs",
sa.Column("id", postgresql.UUID(as_uuid=False), nullable=False),
sa.Column("user_id", postgresql.UUID(as_uuid=False), nullable=False),
sa.Column(
"provider",
postgresql.ENUM("gmail", "teams", "outlook", name="cloud_provider", create_type=False),
nullable=False,
),
sa.Column("name", sa.String(255), nullable=False),
sa.Column("data_types", sa.JSON, nullable=False, server_default="[]"),
sa.Column("prompt_template", sa.Text, nullable=False, server_default=""),
sa.Column("oauth_token_encrypted", sa.Text, nullable=True),
sa.Column("filter_config", sa.JSON, nullable=True),
sa.Column("schedule_cron", sa.String(100), nullable=False, server_default="0 */6 * * *"),
sa.Column("enabled", sa.Boolean, nullable=False, server_default=sa.true()),
sa.Column("last_run_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")),
sa.PrimaryKeyConstraint("id"),
sa.ForeignKeyConstraint(["user_id"], ["users.id"], ondelete="CASCADE"),
)
op.create_index("ix_cloud_agent_configs_user_id", "cloud_agent_configs", ["user_id"])
def downgrade() -> None: def downgrade() -> None:
op.drop_column("local_agent_configs", "agent_config") op.drop_index("ix_cloud_agent_configs_user_id", table_name="cloud_agent_configs")
op.drop_table("cloud_agent_configs")
op.drop_index("ix_local_agent_configs_user_id", table_name="local_agent_configs")
op.drop_table("local_agent_configs")

View File

@@ -0,0 +1,56 @@
"""Add oauth_accounts table, nullable password_hash, avatar_url to users.
Revision ID: b4c0d1e2f3a4
Revises: a3b9c0d1e2f3
Create Date: 2026-04-10 00:00:00.000000
"""
from __future__ import annotations
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = "b4c0d1e2f3a4"
down_revision: Union[str, None] = "a3b9c0d1e2f3"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ── users: make password_hash nullable (social users have no password) ──
op.alter_column("users", "password_hash", existing_type=sa.String(255), nullable=True)
# ── users: add avatar_url ─────────────────────────────────────────────
op.add_column("users", sa.Column("avatar_url", sa.String(2048), nullable=True))
# ── oauth_accounts ────────────────────────────────────────────────────
op.create_table(
"oauth_accounts",
sa.Column("id", postgresql.UUID(as_uuid=False), nullable=False),
sa.Column("user_id", postgresql.UUID(as_uuid=False), nullable=False),
sa.Column("provider", sa.String(50), nullable=False),
sa.Column("provider_user_id", sa.String(255), nullable=False),
sa.Column("provider_email", sa.String(255), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.text("now()"),
),
sa.PrimaryKeyConstraint("id"),
sa.ForeignKeyConstraint(["user_id"], ["users.id"], ondelete="CASCADE"),
sa.UniqueConstraint("provider", "provider_user_id", name="uq_oauth_provider_user"),
)
op.create_index("ix_oauth_accounts_user_id", "oauth_accounts", ["user_id"])
def downgrade() -> None:
op.drop_index("ix_oauth_accounts_user_id", table_name="oauth_accounts")
op.drop_table("oauth_accounts")
op.drop_column("users", "avatar_url")
op.alter_column("users", "password_hash", existing_type=sa.String(255), nullable=False)

View File

@@ -0,0 +1,31 @@
"""Add onboarding_completed_at column to users table.
Revision ID: c5d1e2f3a4b5
Revises: b4c0d1e2f3a4
Create Date: 2026-04-11 00:00:00.000000
"""
from __future__ import annotations
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "c5d1e2f3a4b5"
down_revision: Union[str, None] = "b4c0d1e2f3a4"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.add_column(
"users",
sa.Column("onboarding_completed_at", sa.DateTime(timezone=True), nullable=True),
)
def downgrade() -> None:
op.drop_column("users", "onboarding_completed_at")

View File

@@ -0,0 +1,34 @@
"""avatar_url_varchar_to_text
Revision ID: e04100e88ace
Revises: c5d1e2f3a4b5
Create Date: 2026-04-13 09:13:06.733674
"""
from __future__ import annotations
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'e04100e88ace'
down_revision: Union[str, None] = 'c5d1e2f3a4b5'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.alter_column('users', 'avatar_url',
existing_type=sa.VARCHAR(length=2048),
type_=sa.Text(),
existing_nullable=True)
def downgrade() -> None:
op.alter_column('users', 'avatar_url',
existing_type=sa.Text(),
type_=sa.VARCHAR(length=2048),
existing_nullable=True)

View File

@@ -7,12 +7,31 @@ handles actual disk I/O and responds with ``tool_result`` frames.
from __future__ import annotations from __future__ import annotations
import os
import re
from pathlib import Path
from typing import Any from typing import Any
from langchain_core.tools import tool from langchain_core.tools import tool
from app.core.ws_context import execute_on_client from app.core.ws_context import execute_on_client
# Max characters returned by read_file_content in journey (exploration) tools.
# The journey only needs to understand file structure, not full content.
_JOURNEY_READ_MAX_CHARS: int = 4000
def _resolve_path(path: str, base: str) -> str:
"""Resolve *path* against *base* when *path* is relative.
The LLM often passes ``"."`` meaning "the configured directory".
Without this, Electron resolves ``"."`` relative to its own CWD instead
of the user's chosen directory.
"""
if os.path.isabs(path):
return path
return str(Path(base) / path)
@tool @tool
async def list_directory(path: str) -> str: async def list_directory(path: str) -> str:
@@ -83,3 +102,93 @@ FILESYSTEM_TOOLS: list[Any] = [
read_file_content, read_file_content,
get_file_metadata, get_file_metadata,
] ]
def make_directory_tools(base_directory: str) -> list[Any]:
"""Return filesystem tools that resolve relative paths against *base_directory*.
Use this instead of ``FILESYSTEM_TOOLS`` whenever you know the user's target
directory upfront (e.g., journey setup sessions). Relative paths like ``"."``
from the LLM are resolved to the correct absolute path before being sent to
the Electron client, preventing it from falling back to its own CWD.
"""
def _compact_for_journey(raw: str) -> str:
"""Strip HTML noise and truncate for journey exploration.
The journey LLM only needs to understand file structure (headers,
first paragraphs). Full CSS/style blocks are pure noise that eat
up context window budget.
"""
text = re.sub(r"<style[^>]*>.*?</style>", "", raw, flags=re.DOTALL | re.IGNORECASE)
text = re.sub(r"<script[^>]*>.*?</script>", "", text, flags=re.DOTALL | re.IGNORECASE)
text = re.sub(r"<!--.*?-->", "", text, flags=re.DOTALL)
if len(text) > _JOURNEY_READ_MAX_CHARS:
text = text[:_JOURNEY_READ_MAX_CHARS] + "\n[…truncated for exploration]"
return text
@tool
async def list_directory(path: str) -> str: # noqa: F811
"""List files and folders in a local directory on the user's device.
Returns a formatted listing of entries with name, type (file/directory),
and full path.
"""
resolved = _resolve_path(path, base_directory)
result = await execute_on_client(
action="list_directory",
data={"path": resolved},
)
entries: list[dict[str, Any]] = result.get("entries", [])
if not entries:
return f"Directory '{resolved}' is empty or does not exist."
lines: list[str] = []
for entry in entries:
entry_type = entry.get("type", "unknown")
entry_name = entry.get("name", "")
entry_path = entry.get("path", "")
lines.append(f"- [{entry_type}] {entry_name} ({entry_path})")
return f"Directory listing for '{resolved}' ({len(entries)} entries):\n" + "\n".join(lines)
@tool
async def read_file_content(path: str) -> str: # noqa: F811
"""Read the text content of a local file on the user's device.
Returns the file content as a string. Large files may be truncated
by the Electron client.
"""
resolved = _resolve_path(path, base_directory)
result = await execute_on_client(
action="read_file_content",
data={"path": resolved},
)
content: str = result.get("content", "")
if not content:
return f"File '{resolved}' is empty or could not be read."
return _compact_for_journey(content)
@tool
async def get_file_metadata(path: str) -> str: # noqa: F811
"""Get metadata for a local file: size, creation date, modification date, extension.
Returns a formatted summary of the file's metadata.
"""
resolved = _resolve_path(path, base_directory)
result = await execute_on_client(
action="get_file_metadata",
data={"path": resolved},
)
size = result.get("size", "unknown")
created = result.get("createdAt", "unknown")
modified = result.get("modifiedAt", "unknown")
extension = result.get("extension", "unknown")
name = result.get("name", resolved)
return (
f"File: {name}\n"
f" Extension: {extension}\n"
f" Size: {size} bytes\n"
f" Created: {created}\n"
f" Modified: {modified}"
)
return [list_directory, read_file_content, get_file_metadata]

View File

@@ -18,21 +18,6 @@ _UUID_RE = re.compile(
def _is_uuid(value: str) -> bool: def _is_uuid(value: str) -> bool:
return bool(_UUID_RE.match(value)) return bool(_UUID_RE.match(value))
NOTE_SYSTEM_PROMPT = (
"You are a note-taking assistant. You help users create, retrieve, update,\n"
"and delete Markdown notes in their workspace.\n\n"
"Rules:\n"
" - content is always Markdown; preserve formatting when updating\n"
" - project_id is optional; link a note to a project when mentioned\n"
" - When updating, call get_note first if you need to read existing content\n"
" before appending or replacing sections\n"
" - list_notes without project_id returns all notes; scope with project_id\n"
" when the user is working within a specific project\n"
" - project_id must be a UUID; if you only know a project name, do not pass it as project_id\n"
" - Do not fabricate note content — reflect what the user provides or what\n"
" is already in the note (retrieved via get_note)."
)
@tool @tool
async def list_notes(project_id: str = "") -> str: async def list_notes(project_id: str = "") -> str:

View File

@@ -8,22 +8,6 @@ from langchain_core.tools import tool
from app.core.ws_context import execute_on_client from app.core.ws_context import execute_on_client
PROJECT_SYSTEM_PROMPT = (
"You are a project management assistant. You help users create, find,\n"
"update, and archive projects in their workspace.\n\n"
"Rules:\n"
" - status must be one of: active, archived\n"
" - client_id is optional; link to a client only when explicitly mentioned\n"
" - ai_summary is populated only when the user asks for a project summary;\n"
" derive it from context data — do not fabricate content\n"
" - Use list_projects for scoped queries; list_all_projects only when the\n"
" user wants a complete cross-client view including archived projects\n"
" - get_project requires a project UUID; resolve the ID first by calling\n"
" list_projects if you only have a project name\n"
" - Prefer archiving (update_project status=archived) over deletion;\n"
" only call delete_project when the user explicitly confirms deletion."
)
@tool @tool
async def list_projects( async def list_projects(

View File

@@ -18,23 +18,6 @@ _UUID_RE = re.compile(
def _is_uuid(value: str) -> bool: def _is_uuid(value: str) -> bool:
return bool(_UUID_RE.match(value)) return bool(_UUID_RE.match(value))
TASK_SYSTEM_PROMPT = (
"You are a task management assistant for a project workspace.\n"
"You create, update, list, and track tasks and their comments.\n\n"
"Rules:\n"
" - status must be one of: todo, in_progress, done\n"
" - priority must be one of: high, medium, low\n"
" - due_date is a Unix timestamp in milliseconds; convert human dates\n"
" - assignees is a JSON-encoded array of strings (e.g. '[\"Alice\",\"Bob\"]')\n"
" - project_id is optional; link to a project when the user mentions one\n"
" - is_ai_suggested: 1 only when proactively proposing a task the user\n"
" did not explicitly request; 0 otherwise\n"
" - is_ai_suggested: 1 only when proactively proposing a task the user did not explicitly request; 0 otherwise\n"
" - Use list_tasks_due_today for 'what's due today' queries\n"
" - For update_task, use -1 for integer fields you do not want to change\n"
" - Always confirm the action in plain, user-friendly language."
)
# ── Task tools ──────────────────────────────────────────────────────── # ── Task tools ────────────────────────────────────────────────────────

View File

@@ -17,20 +17,6 @@ _UUID_RE = re.compile(
def _is_uuid(value: str) -> bool: def _is_uuid(value: str) -> bool:
return bool(_UUID_RE.match(value)) return bool(_UUID_RE.match(value))
TIMELINE_SYSTEM_PROMPT = (
"You are a project timeline assistant. Timelines are milestone dates that\n"
"track progress on a project — they are not calendar events.\n\n"
"Rules:\n"
" - project_id is REQUIRED for every create; confirm with the user if unknown\n"
" - For listing, project_id must be a UUID; never pass plain names as project_id\n"
" - date is a Unix timestamp in milliseconds; convert human-readable dates\n"
" - is_ai_suggested: 1 when proactively proposing a timeline, 0 otherwise\n"
" - is_ai_suggested: 1 when proactively proposing a timeline, 0 otherwise\n"
" - For update_timeline, use -1 for integer fields you do not want to change\n"
" - Listing without a project_id returns all timelines across projects\n"
" - Always echo the title and formatted date in your confirmation."
)
@tool @tool
async def list_timelines(project_id: str = "") -> str: async def list_timelines(project_id: str = "") -> str:

View File

@@ -65,16 +65,39 @@ async def get_current_user(
default_tier = "power" if settings.ENV == "dev" else "free" default_tier = "power" if settings.ENV == "dev" else "free"
tier: str = result.scalar_one_or_none() or default_tier tier: str = result.scalar_one_or_none() or default_tier
# Fetch name/surname from user row. # Fetch name/surname/avatar_url/onboarding_completed_at/password_hash from user row.
user_result = await db.execute( user_result = await db.execute(
select(User.name, User.surname).where(User.id == user_id) select(
User.name, User.surname, User.avatar_url, User.onboarding_completed_at,
User.password_hash,
).where(User.id == user_id)
) )
user_row = user_result.one_or_none() user_row = user_result.one_or_none()
# Convert onboarding_completed_at to epoch ms (int) or None.
onboarding_ms: int | None = None
if user_row and user_row.onboarding_completed_at is not None:
onboarding_ms = int(user_row.onboarding_completed_at.timestamp() * 1000)
# Load decrypted core memory.
from app.core.memory_middleware import MemoryMiddleware # noqa: PLC0415
memory_dict: dict[str, str] = {}
try:
mw = MemoryMiddleware(db)
blocks = await mw.list_core_blocks(user_id)
memory_dict = {b["label"]: b["value"] for b in blocks}
except Exception:
pass # Non-critical — return empty memory on failure
return UserProfile( return UserProfile(
id=user_id, id=user_id,
email=email, email=email,
name=user_row.name if user_row else None, name=user_row.name if user_row else None,
surname=user_row.surname if user_row else None, surname=user_row.surname if user_row else None,
avatar_url=user_row.avatar_url if user_row else None,
has_password=bool(user_row.password_hash) if user_row else False,
tier=tier, tier=tier,
onboarding_completed_at=onboarding_ms,
memory=memory_dict,
) # type: ignore[arg-type] ) # type: ignore[arg-type]

View File

@@ -31,10 +31,9 @@ from typing import Any
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
from app.agents.filesystem_agent import FILESYSTEM_TOOLS from app.agents.filesystem_agent import make_directory_tools
from app.config.settings import settings from app.core.langfuse_client import compile_prompt, extract_usage, get_langfuse, get_prompt_or_fallback, langfuse_context
from app.core.langfuse_client import compile_prompt, extract_usage, get_langfuse, get_prompt_or_fallback from app.core.llm import get_agent_llm, model_for_agent
from app.core.llm import get_llm
from app.schemas import AgentConfig from app.schemas import AgentConfig
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -257,15 +256,17 @@ async def _call_llm_with_tools(
else: else:
messages.append(AIMessage(content=turn["content"])) messages.append(AIMessage(content=turn["content"]))
llm = get_llm(model=None, temperature=0.4) llm = get_agent_llm("setup", temperature=0.4)
llm_with_tools = llm.bind_tools(tools) llm_with_tools = llm.bind_tools(tools)
tool_map = {tool_def.name: tool_def for tool_def in tools} tool_map = {tool_def.name: tool_def for tool_def in tools}
_lf_ctx = langfuse_context(user_id=user_id or None, session_id=session_id or None)
_lf_ctx.__enter__()
_span_ctx = ( _span_ctx = (
lf.start_as_current_observation( lf.start_as_current_observation(
as_type="span", as_type="span",
name="journey-setup", name="journey-setup",
metadata={"user_id": user_id or None, "session_id": session_id or None},
input=history[-1]["content"] if history else "", input=history[-1]["content"] if history else "",
) )
if lf else None if lf else None
@@ -273,12 +274,12 @@ async def _call_llm_with_tools(
_span = _span_ctx.__enter__() if _span_ctx else None _span = _span_ctx.__enter__() if _span_ctx else None
try: try:
for _ in range(_MAX_TOOL_STEPS): for step in range(_MAX_TOOL_STEPS):
_gen_ctx = ( _gen_ctx = (
lf.start_as_current_observation( lf.start_as_current_observation(
as_type="generation", as_type="generation",
name="journey-setup-llm", name="journey-setup-llm",
model=settings.LLM_MODEL, model=model_for_agent("setup"),
prompt=langfuse_prompt, prompt=langfuse_prompt,
input=messages, input=messages,
) )
@@ -287,15 +288,27 @@ async def _call_llm_with_tools(
_gen = _gen_ctx.__enter__() if _gen_ctx else None _gen = _gen_ctx.__enter__() if _gen_ctx else None
response: AIMessage = await llm_with_tools.ainvoke(messages) response: AIMessage = await llm_with_tools.ainvoke(messages)
if _gen_ctx: if _gen_ctx:
_gen.update(output=_as_text(response.content), usage=extract_usage(response)) _gen.update(output=_as_text(response.content), usage_details=extract_usage(response))
_gen_ctx.__exit__(None, None, None) _gen_ctx.__exit__(None, None, None)
resp_text = _as_text(response.content)
# Guard against empty responses (e.g. model returned finish_reason
# 'error' which LiteLLM maps to 'stop' with empty content).
if not response.tool_calls and not resp_text.strip():
logger.warning(
"agent_setup: journey LLM returned empty response at step %d — retrying",
step,
)
# Drop the empty AIMessage so we don't pollute history, and retry.
continue
messages.append(response) messages.append(response)
if not response.tool_calls: if not response.tool_calls:
if _span: if _span:
_span.update(output=_as_text(response.content)) _span.update(output=resp_text)
return _as_text(response.content) return resp_text
for call in response.tool_calls: for call in response.tool_calls:
call_name = str(call.get("name", "")) call_name = str(call.get("name", ""))
@@ -324,10 +337,14 @@ async def _call_llm_with_tools(
final_text = _as_text(final.content) final_text = _as_text(final.content)
if _span: if _span:
_span.update(output=final_text) _span.update(output=final_text)
return final_text return final_text or (
"Sorry, I had trouble processing the files. "
"Could you try again? If the issue persists, the files might be too large for me to analyse."
)
finally: finally:
if _span_ctx: if _span_ctx:
_span_ctx.__exit__(None, None, None) _span_ctx.__exit__(None, None, None)
_lf_ctx.__exit__(None, None, None)
if lf: if lf:
lf.flush() lf.flush()
@@ -372,7 +389,7 @@ async def handle_journey_start(
ai_reply = await _call_llm_with_tools( ai_reply = await _call_llm_with_tools(
system_prompt=system_prompt, system_prompt=system_prompt,
history=seed_history, history=seed_history,
tools=list(FILESYSTEM_TOOLS), tools=make_directory_tools(directory),
user_id=user_id, user_id=user_id,
session_id=session_id, session_id=session_id,
langfuse_prompt=langfuse_prompt, langfuse_prompt=langfuse_prompt,
@@ -436,10 +453,11 @@ async def handle_journey_message(
session.history.append({"role": "user", "content": message}) session.history.append({"role": "user", "content": message})
# Call the LLM with tools. # Call the LLM with tools.
session_tools = make_directory_tools(session.directory)
ai_reply = await _call_llm_with_tools( ai_reply = await _call_llm_with_tools(
system_prompt=session.system_prompt, system_prompt=session.system_prompt,
history=session.history, history=session.history,
tools=list(FILESYSTEM_TOOLS), tools=session_tools,
user_id=session.user_id, user_id=session.user_id,
session_id=session_id, session_id=session_id,
langfuse_prompt=session.langfuse_prompt, langfuse_prompt=session.langfuse_prompt,
@@ -464,7 +482,7 @@ async def handle_journey_message(
nudge_reply = await _call_llm_with_tools( nudge_reply = await _call_llm_with_tools(
system_prompt=session.system_prompt, system_prompt=session.system_prompt,
history=session.history, history=session.history,
tools=list(FILESYSTEM_TOOLS), tools=session_tools,
user_id=session.user_id, user_id=session.user_id,
session_id=session_id, session_id=session_id,
langfuse_prompt=session.langfuse_prompt, langfuse_prompt=session.langfuse_prompt,

View File

@@ -12,8 +12,11 @@ in backend agent-config tables.
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import logging
import uuid import uuid
from datetime import datetime, timedelta, timezone from datetime import datetime, timezone
logger = logging.getLogger(__name__)
from fastapi import APIRouter, Depends, HTTPException, status from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import func, select from sqlalchemy import func, select
@@ -177,6 +180,11 @@ async def trigger_agent_run(
_enforce_agent_limit(current_user.tier, body.active_agents) _enforce_agent_limit(current_user.tier, body.active_agents)
await _enforce_run_frequency(current_user.tier, current_user.id, db) await _enforce_run_frequency(current_user.tier, current_user.id, db)
last_run_dt = (
datetime.fromtimestamp(body.last_run_at / 1000, tz=timezone.utc)
if body.last_run_at
else None
)
config = LocalAgentConfig( config = LocalAgentConfig(
id=str(uuid.uuid4()), id=str(uuid.uuid4()),
user_id=current_user.id, user_id=current_user.id,
@@ -184,10 +192,12 @@ async def trigger_agent_run(
name="Local Directory Monitor", name="Local Directory Monitor",
directory_paths=[body.directory], directory_paths=[body.directory],
data_types=_to_data_types(body.what_to_extract), data_types=_to_data_types(body.what_to_extract),
prompt_template=body.custom_agent_prompt, prompt_template=body.custom_agent_prompt or "",
agent_config=body.agent_config,
file_extensions=[], file_extensions=[],
schedule_cron=body.batch_interval, schedule_cron=body.batch_interval,
enabled=True, enabled=True,
last_run_at=last_run_dt,
) )
# Use the FE's stable agent_id if provided, fall back to the ephemeral config id. # Use the FE's stable agent_id if provided, fall back to the ephemeral config id.

View File

@@ -1,34 +1,68 @@
"""Auth routes: register, login, refresh, me. """Auth routes: register, login, refresh, me, OAuth social login, onboarding.
Users and refresh tokens are persisted in PostgreSQL (users + refresh_tokens Users and refresh tokens are persisted in PostgreSQL (users + refresh_tokens
tables). Passwords are hashed with bcrypt; refresh tokens are stored as tables). Passwords are hashed with bcrypt; refresh tokens are stored as
SHA-256 hashes so plaintext never reaches the DB. SHA-256 hashes so plaintext never reaches the DB.
OAuth (Google):
GET /auth/oauth/{provider}/authorize — returns consent-screen URL + state
POST /auth/oauth/{provider}/callback — exchanges code, issues JWT tokens
""" """
from __future__ import annotations from __future__ import annotations
import hashlib import hashlib
import json
import time import time
import urllib.parse
import uuid import uuid
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import Literal
import bcrypt import bcrypt
from cryptography.fernet import Fernet from cryptography.fernet import Fernet
from fastapi import APIRouter, Depends, HTTPException, status from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import RedirectResponse
from jose import jwt from jose import jwt
from pydantic import BaseModel from pydantic import BaseModel, Field
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_current_user from app.api.deps import get_current_user
from app.auth.oauth_providers import GoogleOAuthProvider, generate_pkce_pair
from app.config.settings import settings from app.config.settings import settings
from app.core.llm import get_llm
from app.core.memory_middleware import MemoryMiddleware
from app.db import get_session from app.db import get_session
from app.models import RefreshToken, User from app.models import OAuthAccount, RefreshToken, User
from app.schemas import AuthTokens, UserProfile from app.schemas import AuthTokens, UserProfile
router = APIRouter(prefix="/auth", tags=["auth"]) router = APIRouter(prefix="/auth", tags=["auth"])
# ── OAuth provider registry ───────────────────────────────────────────
def _get_google_provider() -> GoogleOAuthProvider:
if not settings.GOOGLE_AUTH_CLIENT_ID or not settings.GOOGLE_AUTH_CLIENT_SECRET:
raise HTTPException(
status.HTTP_503_SERVICE_UNAVAILABLE,
"Google login is not configured on this server",
)
return GoogleOAuthProvider(
client_id=settings.GOOGLE_AUTH_CLIENT_ID,
client_secret=settings.GOOGLE_AUTH_CLIENT_SECRET,
redirect_uri=settings.OAUTH_REDIRECT_URI,
)
_PROVIDERS = {"google": _get_google_provider}
# In-memory state store: state → (code_verifier, expires_at_epoch_s)
# Production note: replace with Redis for multi-process deployments.
_pending_states: dict[str, tuple[str, float]] = {}
_STATE_TTL_SECONDS = 600 # 10 minutes
# ── Internal helpers ───────────────────────────────────────────────── # ── Internal helpers ─────────────────────────────────────────────────
@@ -231,5 +265,531 @@ async def update_profile(
email=user.email, email=user.email,
name=user.name, name=user.name,
surname=user.surname, surname=user.surname,
avatar_url=user.avatar_url,
tier=current_user.tier, tier=current_user.tier,
) )
# ── OAuth helpers ─────────────────────────────────────────────────────
async def _issue_refresh_token(user: User, db: AsyncSession) -> tuple[str, AuthTokens]:
"""Create a refresh token row and return (plain_token, AuthTokens)."""
plain_token = str(uuid.uuid4())
expires_at = datetime.now(timezone.utc) + timedelta(
days=settings.JWT_REFRESH_TOKEN_EXPIRE_DAYS
)
rt = RefreshToken(
user_id=user.id,
token_hash=_hash_token(plain_token),
expires_at=expires_at,
)
db.add(rt)
access_token, expires_at_ms = _make_access_token(user.id, user.email, user.tier)
return plain_token, AuthTokens(
access_token=access_token,
refresh_token=plain_token,
expires_at=expires_at_ms,
)
# ── OAuth request/response schemas ───────────────────────────────────
class _OAuthAuthorizeResponse(BaseModel):
url: str
state: str
class _OAuthCallbackRequest(BaseModel):
code: str
state: str
# ── OAuth routes ──────────────────────────────────────────────────────
@router.get(
"/oauth/{provider}/web-callback",
summary="Web-facing OAuth redirect — bounces to the adiuvai:// deep link",
include_in_schema=False,
)
async def oauth_web_callback(
provider: Literal["google"],
code: str,
state: str,
) -> RedirectResponse:
"""Google redirects here after user consent.
This endpoint immediately redirects to the Electron deep-link URI so the
desktop app receives the authorization code. It is intentionally simple —
no state validation here (the Electron app + backend callback do that).
Registered in Google Cloud Console as:
http://localhost:8000/api/v1/auth/oauth/google/web-callback (dev)
https://api.adiuvai.com/api/v1/auth/oauth/google/web-callback (prod)
"""
params = urllib.parse.urlencode({"code": code, "state": state, "provider": provider})
deep_link = f"adiuvai://oauth/callback?{params}"
return RedirectResponse(url=deep_link, status_code=302)
@router.get(
"/oauth/{provider}/authorize",
response_model=_OAuthAuthorizeResponse,
summary="Start OAuth flow — returns the provider consent-screen URL",
)
async def oauth_authorize(
provider: Literal["google"],
) -> _OAuthAuthorizeResponse:
"""Generate a PKCE state + code_challenge and return the authorization URL.
The client opens this URL in the system browser. After the user grants
consent, the provider redirects to the deep-link URI (adiuvai://oauth/callback)
with ``code`` and ``state`` query params. The client then calls
``POST /auth/oauth/{provider}/callback`` with those values.
"""
provider_factory = _PROVIDERS.get(provider)
if provider_factory is None:
raise HTTPException(status.HTTP_400_BAD_REQUEST, f"Unknown provider: {provider}")
oauth_provider = provider_factory()
state = str(uuid.uuid4())
code_verifier, code_challenge = generate_pkce_pair()
# Purge expired states to prevent unbounded growth.
now = time.time()
expired = [s for s, (_, exp) in _pending_states.items() if exp < now]
for s in expired:
del _pending_states[s]
_pending_states[state] = (code_verifier, now + _STATE_TTL_SECONDS)
url = oauth_provider.get_authorization_url(state=state, code_challenge=code_challenge)
return _OAuthAuthorizeResponse(url=url, state=state)
@router.post(
"/oauth/{provider}/callback",
response_model=AuthTokens,
summary="Complete OAuth flow — exchange code and issue JWT tokens",
)
async def oauth_callback(
provider: Literal["google"],
body: _OAuthCallbackRequest,
db: AsyncSession = Depends(get_session),
) -> AuthTokens:
"""Validate state, exchange the authorization code, and sign in (or register) the user.
Resolution order:
1. ``oauth_accounts`` row match → existing user, log in.
2. Email match + ``email_verified=True`` → link OAuth account to existing user.
3. No match → create new user (password_hash=None, avatar from provider).
"""
provider_factory = _PROVIDERS.get(provider)
if provider_factory is None:
raise HTTPException(status.HTTP_400_BAD_REQUEST, f"Unknown provider: {provider}")
# Validate state (CSRF protection).
now = time.time()
entry = _pending_states.pop(body.state, None)
if entry is None or entry[1] < now:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or expired OAuth state")
code_verifier, _ = entry
oauth_provider = provider_factory()
# Exchange code for tokens.
try:
token_data = await oauth_provider.exchange_code(
code=body.code,
code_verifier=code_verifier,
redirect_uri=settings.OAUTH_REDIRECT_URI,
)
except Exception:
raise HTTPException(
status.HTTP_400_BAD_REQUEST, "Failed to exchange authorization code"
)
access_token_google = token_data.get("access_token")
if not access_token_google:
raise HTTPException(status.HTTP_400_BAD_REQUEST, "No access token in provider response")
# Fetch user identity.
try:
userinfo = await oauth_provider.get_userinfo(access_token_google)
except Exception:
raise HTTPException(status.HTTP_400_BAD_REQUEST, "Failed to fetch user info from provider")
# ── Resolution order ──────────────────────────────────────────────
# 1. Existing OAuth link?
oauth_result = await db.execute(
select(OAuthAccount).where(
OAuthAccount.provider == provider,
OAuthAccount.provider_user_id == userinfo.provider_user_id,
)
)
oauth_account = oauth_result.scalar_one_or_none()
if oauth_account is not None:
user_result = await db.execute(select(User).where(User.id == oauth_account.user_id))
user = user_result.scalar_one()
# Backfill avatar if the user doesn't have one yet.
if user.avatar_url is None and userinfo.avatar_url:
user.avatar_url = userinfo.avatar_url
await db.commit()
plain_token, tokens = await _issue_refresh_token(user, db)
await db.commit()
return tokens
# 2. Email match with a verified Google email → link accounts.
if userinfo.email_verified:
email_result = await db.execute(select(User).where(User.email == userinfo.email))
existing_user = email_result.scalar_one_or_none()
if existing_user is not None:
new_link = OAuthAccount(
user_id=existing_user.id,
provider=provider,
provider_user_id=userinfo.provider_user_id,
provider_email=userinfo.email,
)
db.add(new_link)
if existing_user.avatar_url is None and userinfo.avatar_url:
existing_user.avatar_url = userinfo.avatar_url
plain_token, tokens = await _issue_refresh_token(existing_user, db)
await db.commit()
return tokens
# Guard: if the email is already taken but we couldn't auto-link (e.g.
# email_verified=False), refuse with 409 instead of hitting a DB constraint.
if not userinfo.email_verified:
conflict = await db.execute(select(User).where(User.email == userinfo.email))
if conflict.scalar_one_or_none() is not None:
raise HTTPException(
status.HTTP_409_CONFLICT,
"An account with this email already exists. "
"Please sign in with your password.",
)
# 3. New user — social-only account (no password).
new_user = User(
id=str(uuid.uuid4()),
email=userinfo.email,
name=userinfo.name,
password_hash=None,
avatar_url=userinfo.avatar_url,
tier="free",
encryption_key=Fernet.generate_key().decode(),
)
db.add(new_user)
await db.flush() # populate new_user.id
new_oauth = OAuthAccount(
user_id=new_user.id,
provider=provider,
provider_user_id=userinfo.provider_user_id,
provider_email=userinfo.email,
)
db.add(new_oauth)
plain_token, tokens = await _issue_refresh_token(new_user, db)
await db.commit()
return tokens
# ── Onboarding helpers ────────────────────────────────────────────────
async def _build_profile(user_id: str, email: str, db: AsyncSession) -> UserProfile:
"""Re-fetch and return a full UserProfile (reuses get_current_user logic)."""
# We can't call the FastAPI dependency directly, but we can replicate
# the core logic inline. Instead, we just re-query the same way.
from app.models import Subscription # noqa: PLC0415
result = await db.execute(
select(Subscription.tier).where(Subscription.user_id == user_id)
)
default_tier = "power" if settings.ENV == "dev" else "free"
tier: str = result.scalar_one_or_none() or default_tier
user_result = await db.execute(
select(
User.name, User.surname, User.avatar_url, User.onboarding_completed_at,
User.password_hash,
).where(User.id == user_id)
)
user_row = user_result.one_or_none()
onboarding_ms: int | None = None
if user_row and user_row.onboarding_completed_at is not None:
onboarding_ms = int(user_row.onboarding_completed_at.timestamp() * 1000)
memory_dict: dict[str, str] = {}
try:
mw = MemoryMiddleware(db)
blocks = await mw.list_core_blocks(user_id)
memory_dict = {b["label"]: b["value"] for b in blocks}
except Exception:
pass
return UserProfile(
id=user_id,
email=email,
name=user_row.name if user_row else None,
surname=user_row.surname if user_row else None,
avatar_url=user_row.avatar_url if user_row else None,
has_password=bool(user_row.password_hash) if user_row else False,
tier=tier,
onboarding_completed_at=onboarding_ms,
memory=memory_dict,
)
# ── Onboarding routes ────────────────────────────────────────────────
class _UpdateMemoryRequest(BaseModel):
memory: dict[str, str] = Field(default_factory=dict)
mark_onboarded: bool = False
@router.put("/me/memory", response_model=UserProfile)
async def update_memory(
body: _UpdateMemoryRequest,
current_user: UserProfile = Depends(get_current_user),
db: AsyncSession = Depends(get_session),
) -> UserProfile:
"""Update core memory key/value pairs and optionally mark onboarding complete."""
mw = MemoryMiddleware(db)
for key, value in body.memory.items():
await mw.update_core(current_user.id, key, value)
if body.mark_onboarded:
result = await db.execute(select(User).where(User.id == current_user.id))
user = result.scalar_one()
user.onboarding_completed_at = datetime.now(timezone.utc)
await db.commit()
return await _build_profile(current_user.id, current_user.email, db)
@router.post("/me/onboarding/reset")
async def reset_onboarding(
current_user: UserProfile = Depends(get_current_user),
db: AsyncSession = Depends(get_session),
):
"""Reset onboarding so the wizard runs again on next login."""
result = await db.execute(select(User).where(User.id == current_user.id))
user = result.scalar_one()
user.onboarding_completed_at = None
await db.commit()
return {"status": "reset"}
class _NormalizeRequest(BaseModel):
inputs: dict[str, str]
class _NormalizeResponse(BaseModel):
normalized: dict[str, str]
@router.post("/onboarding/normalize", response_model=_NormalizeResponse)
async def normalize_onboarding(
body: _NormalizeRequest,
current_user: UserProfile = Depends(get_current_user),
) -> _NormalizeResponse:
"""One-shot LLM normalization for free-text onboarding answers."""
if not body.inputs:
return _NormalizeResponse(normalized={})
try:
llm = get_llm(model="gpt-4o-mini", temperature=0)
prompt = (
"You normalize user onboarding answers into clean, ≤3-word canonical labels.\n"
"Return a JSON object with the same keys and normalized values.\n"
"Examples: 'i build websites''Web Developer', 'tech-ish stuff''Technology'\n"
f"Input: {json.dumps(body.inputs)}"
)
response = await llm.ainvoke(
[
{"role": "system", "content": "You normalize user inputs. Return JSON only."},
{"role": "user", "content": prompt},
],
)
normalized = json.loads(response.content)
return _NormalizeResponse(normalized=normalized)
except Exception:
# LLM failure must never block onboarding — return inputs unchanged
return _NormalizeResponse(normalized=body.inputs)
# ── Password management ───────────────────────────────────────────────
class _ChangePasswordRequest(BaseModel):
current_password: str = Field(min_length=1)
new_password: str = Field(min_length=8)
@router.put("/me/password", status_code=status.HTTP_200_OK)
async def change_password(
body: _ChangePasswordRequest,
current_user: UserProfile = Depends(get_current_user),
db: AsyncSession = Depends(get_session),
) -> dict[str, bool]:
"""Change the authenticated user's password.
Requires the current password for verification.
Returns 400 for social-only users (no password set).
"""
result = await db.execute(select(User).where(User.id == current_user.id))
user = result.scalar_one()
if user.password_hash is None:
raise HTTPException(
status.HTTP_400_BAD_REQUEST,
"This account uses social login and has no password to change",
)
if not _verify_password(body.current_password, user.password_hash):
raise HTTPException(status.HTTP_400_BAD_REQUEST, "Current password is incorrect")
user.password_hash = _hash_password(body.new_password)
await db.commit()
return {"ok": True}
# ── OAuth account management ─────────────────────────────────────────
@router.get("/me/oauth-accounts", response_model=list[dict])
async def list_oauth_accounts(
current_user: UserProfile = Depends(get_current_user),
db: AsyncSession = Depends(get_session),
) -> list[dict]:
"""List all OAuth providers linked to the authenticated user."""
result = await db.execute(
select(OAuthAccount).where(OAuthAccount.user_id == current_user.id)
)
accounts = result.scalars().all()
return [
{
"provider": a.provider,
"provider_email": a.provider_email,
"created_at": int(a.created_at.timestamp() * 1000),
}
for a in accounts
]
@router.delete("/me/oauth-accounts/{provider}", status_code=status.HTTP_200_OK)
async def unlink_oauth_account(
provider: str,
current_user: UserProfile = Depends(get_current_user),
db: AsyncSession = Depends(get_session),
) -> dict[str, bool]:
"""Unlink an OAuth provider from the authenticated user.
Refuses if the user has no password and this is their only login method.
"""
result = await db.execute(select(User).where(User.id == current_user.id))
user = result.scalar_one()
oauth_result = await db.execute(
select(OAuthAccount).where(
OAuthAccount.user_id == current_user.id,
OAuthAccount.provider == provider,
)
)
account = oauth_result.scalar_one_or_none()
if account is None:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"No linked {provider} account found")
# Safety: don't let users lock themselves out.
all_oauth = await db.execute(
select(OAuthAccount).where(OAuthAccount.user_id == current_user.id)
)
oauth_count = len(all_oauth.scalars().all())
if user.password_hash is None and oauth_count <= 1:
raise HTTPException(
status.HTTP_400_BAD_REQUEST,
"Cannot unlink the only login method. Set a password first.",
)
await db.delete(account)
await db.commit()
return {"ok": True}
# ── Avatar update ─────────────────────────────────────────────────────
class _UpdateAvatarRequest(BaseModel):
avatar_url: str = Field(min_length=1)
@router.put("/me/avatar", response_model=UserProfile)
async def update_avatar(
body: _UpdateAvatarRequest,
current_user: UserProfile = Depends(get_current_user),
db: AsyncSession = Depends(get_session),
) -> UserProfile:
"""Update the authenticated user's avatar URL.
Accepts {"avatar_url": "https://..."} — the client uploads the image
to its own storage and passes the resulting URL here.
"""
if not body.avatar_url.startswith(("https://", "http://", "data:image/")):
raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid avatar URL")
result = await db.execute(select(User).where(User.id == current_user.id))
user = result.scalar_one()
user.avatar_url = body.avatar_url
await db.commit()
return await _build_profile(current_user.id, current_user.email, db)
# ── Account deletion ─────────────────────────────────────────────────
@router.delete("/me", status_code=status.HTTP_200_OK)
async def delete_account(
current_user: UserProfile = Depends(get_current_user),
db: AsyncSession = Depends(get_session),
) -> dict[str, bool]:
"""Permanently delete the authenticated user's account.
Cascades: refresh tokens, OAuth accounts, subscription, and all memory
rows are deleted via SQLAlchemy relationship cascades. Stripe subscription
is cancelled if active.
"""
# Cancel Stripe subscription if present.
try:
from app.billing.stripe_service import stripe_service # noqa: PLC0415
await stripe_service.cancel_subscription(current_user.id, db)
except HTTPException:
pass # No subscription — that's fine
# Delete all memory rows (core, associative, episodic, proactive).
try:
from app.models import ( # noqa: PLC0415
MemoryAssociative, MemoryCore, MemoryEpisodic, MemoryProactive,
)
for model in (MemoryCore, MemoryAssociative, MemoryEpisodic, MemoryProactive):
await db.execute(
model.__table__.delete().where(model.user_id == current_user.id)
)
except Exception:
pass # Non-critical — cascade on User will handle most
# Delete the user row — cascades handle refresh_tokens, oauth_accounts, subscription.
result = await db.execute(select(User).where(User.id == current_user.id))
user = result.scalar_one()
await db.delete(user)
await db.commit()
return {"ok": True}

View File

@@ -83,3 +83,16 @@ async def cancel_subscription(
"""Cancel the active subscription.""" """Cancel the active subscription."""
await stripe_service.cancel_subscription(current_user.id, db) await stripe_service.cancel_subscription(current_user.id, db)
return {"ok": True} return {"ok": True}
@router.get("/invoices", response_model=list[dict])
async def list_invoices(
current_user: UserProfile = Depends(get_current_user),
db: AsyncSession = Depends(get_session),
) -> list[dict[str, Any]]:
"""Return billing history (invoices) from Stripe.
Returns an empty list when Stripe is not configured.
"""
invoices = await stripe_service.list_invoices(current_user.id, db)
return invoices

1
app/auth/__init__.py Normal file
View File

@@ -0,0 +1 @@
"OAuth provider abstractions and utilities."

135
app/auth/oauth_providers.py Normal file
View File

@@ -0,0 +1,135 @@
"""OAuth 2.0 + PKCE provider abstractions.
Each provider implements a three-step flow designed for a desktop (public) client:
1. get_authorization_url(state, code_challenge) → str
Build the provider's consent-screen URL. State and code_challenge are
generated server-side; the client opens this URL in the system browser.
2. exchange_code(code, code_verifier, redirect_uri) → dict
Exchange the short-lived authorization code for an access token.
The code_verifier proves ownership of the PKCE challenge.
3. get_userinfo(access_token) → OAuthUserInfo
Fetch the canonical user identity from the provider.
Currently supported providers:
- GoogleOAuthProvider (scope: openid email profile)
Adding a new provider:
- Implement the three methods above.
- Register in _PROVIDERS inside routes/auth.py.
"""
from __future__ import annotations
import base64
import hashlib
import os
import urllib.parse
from dataclasses import dataclass
import httpx
# ── Data transfer objects ─────────────────────────────────────────────
@dataclass
class OAuthUserInfo:
"""Normalized user identity returned by any provider."""
provider_user_id: str
email: str
email_verified: bool
avatar_url: str | None
name: str | None
# ── PKCE helpers ──────────────────────────────────────────────────────
def generate_pkce_pair() -> tuple[str, str]:
"""Generate a (code_verifier, code_challenge) pair for PKCE S256.
The code_verifier is a random 32-byte URL-safe base64 string.
The code_challenge is SHA-256(code_verifier) base64url-encoded (no padding).
"""
code_verifier = base64.urlsafe_b64encode(os.urandom(32)).rstrip(b"=").decode()
digest = hashlib.sha256(code_verifier.encode()).digest()
code_challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode()
return code_verifier, code_challenge
# ── Google provider ───────────────────────────────────────────────────
class GoogleOAuthProvider:
"""Google OAuth 2.0 provider (openid email profile scope).
Uses Google's standard authorization endpoint with PKCE S256.
Does NOT use google-auth-oauthlib to keep the flow generic and async.
"""
name = "google"
_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
_TOKEN_URL = "https://oauth2.googleapis.com/token"
_USERINFO_URL = "https://www.googleapis.com/oauth2/v3/userinfo"
def __init__(self, client_id: str, client_secret: str, redirect_uri: str) -> None:
self.client_id = client_id
self.client_secret = client_secret
self.redirect_uri = redirect_uri
def get_authorization_url(self, state: str, code_challenge: str) -> str:
"""Build the Google consent-screen URL."""
params = {
"client_id": self.client_id,
"redirect_uri": self.redirect_uri,
"response_type": "code",
"scope": "openid email profile",
"state": state,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"access_type": "offline",
"prompt": "select_account",
}
return f"{self._AUTH_URL}?{urllib.parse.urlencode(params)}"
async def exchange_code(
self, code: str, code_verifier: str, redirect_uri: str
) -> dict:
"""Exchange authorization code for an access token."""
async with httpx.AsyncClient() as client:
response = await client.post(
self._TOKEN_URL,
data={
"client_id": self.client_id,
"client_secret": self.client_secret,
"code": code,
"code_verifier": code_verifier,
"grant_type": "authorization_code",
"redirect_uri": redirect_uri,
},
)
response.raise_for_status()
return response.json()
async def get_userinfo(self, access_token: str) -> OAuthUserInfo:
"""Fetch the authenticated user's identity from Google."""
async with httpx.AsyncClient() as client:
response = await client.get(
self._USERINFO_URL,
headers={"Authorization": f"Bearer {access_token}"},
)
response.raise_for_status()
data = response.json()
return OAuthUserInfo(
provider_user_id=data["sub"],
email=data["email"],
email_verified=data.get("email_verified", False),
avatar_url=data.get("picture"),
name=data.get("name"),
)

View File

@@ -43,8 +43,8 @@ class StripeService:
self, self,
user_id: str, user_id: str,
tier: str, tier: str,
success_url: str = "https://app.adiuva.app/billing/success?session_id={CHECKOUT_SESSION_ID}", success_url: str = "https://app.adiuvai.app/billing/success?session_id={CHECKOUT_SESSION_ID}",
cancel_url: str = "https://app.adiuva.app/billing/cancel", cancel_url: str = "https://app.adiuvai.app/billing/cancel",
) -> str: ) -> str:
"""Create a Stripe checkout session and return the URL. """Create a Stripe checkout session and return the URL.
@@ -200,6 +200,45 @@ class StripeService:
sub.status = "canceled" sub.status = "canceled"
await db.commit() await db.commit()
async def list_invoices(
self, user_id: str, db: AsyncSession, limit: int = 24
) -> list[dict[str, Any]]:
"""Return recent invoices for the user from Stripe.
Returns an empty list when Stripe is not configured or the user has
no ``stripe_customer_id``.
"""
if not self._configured():
return []
from app.models import User # noqa: PLC0415
result = await db.execute(
select(User.stripe_customer_id).where(User.id == user_id)
)
customer_id = result.scalar_one_or_none()
if not customer_id:
return []
try:
s = self._client()
invoices = s.Invoice.list(customer=customer_id, limit=limit)
return [
{
"id": inv.id,
"amount_due": inv.amount_due,
"amount_paid": inv.amount_paid,
"currency": inv.currency,
"status": inv.status,
"created": inv.created * 1000, # epoch ms
"invoice_url": inv.hosted_invoice_url,
"invoice_pdf": inv.invoice_pdf,
}
for inv in invoices.auto_paging_iter()
]
except Exception:
return []
# ── Private DB helpers ─────────────────────────────────────────────── # ── Private DB helpers ───────────────────────────────────────────────
async def _upsert_subscription( async def _upsert_subscription(

View File

@@ -3,7 +3,7 @@ from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings): class Settings(BaseSettings):
DATABASE_URL: str = "postgresql+asyncpg://postgres:postgres@localhost:5432/adiuva" DATABASE_URL: str = "postgresql+asyncpg://postgres:postgres@localhost:5432/adiuvai"
JWT_SECRET: str = "change-me-in-production" JWT_SECRET: str = "change-me-in-production"
JWT_ALGORITHM: str = "HS256" JWT_ALGORITHM: str = "HS256"
JWT_ACCESS_TOKEN_EXPIRE_MINUTES: int = 30 JWT_ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
@@ -18,9 +18,16 @@ class Settings(BaseSettings):
CEREBRAS_API_KEY: str = "" CEREBRAS_API_KEY: str = ""
LLM_MODEL: str = "gpt-4o" LLM_MODEL: str = "gpt-4o"
LLM_ROUTER_MODEL: str = "gpt-4o-mini"
LLM_EMBED_MODEL: str = "text-embedding-3-small" LLM_EMBED_MODEL: str = "text-embedding-3-small"
# Per-agent model overrides. Leave empty to fall back to LLM_MODEL.
LLM_MODEL_CLASSIFIER: str = "" # _infer_floating_domain (intent routing)
LLM_MODEL_HOME_AGENT: str = "" # home-agent (run_single_agent / stream)
LLM_MODEL_FLOATING_AGENT: str = "" # floating-agent (contextual chat)
LLM_MODEL_UNIFIED_PROCESSOR: str = "" # unified-processor (agent_runner)
LLM_MODEL_CLOUD_PROCESSOR: str = "" # cloud-processor (agent_runner)
LLM_MODEL_SETUP_AGENT: str = "" # agent-setup journey
# GitHub Copilot OAuth token storage directory. # GitHub Copilot OAuth token storage directory.
# Leave empty to use the LiteLLM default (~/.config/litellm/github_copilot). # Leave empty to use the LiteLLM default (~/.config/litellm/github_copilot).
# In Docker, set this to a path backed by a named volume so tokens survive restarts. # In Docker, set this to a path backed by a named volume so tokens survive restarts.
@@ -34,16 +41,33 @@ class Settings(BaseSettings):
# MS_TENANT_ID: set to 'common' to allow multi-tenant (personal + work accounts). # MS_TENANT_ID: set to 'common' to allow multi-tenant (personal + work accounts).
MS_TENANT_ID: str = "common" MS_TENANT_ID: str = "common"
# Google Login OAuth credentials — scope: openid email profile.
# Separate from GMAIL_CLIENT_ID/SECRET (which uses gmail.readonly scope).
GOOGLE_AUTH_CLIENT_ID: str = ""
GOOGLE_AUTH_CLIENT_SECRET: str = ""
# The redirect URI registered in Google Cloud Console.
# Google redirects here after consent; this backend route then bounces to
# the adiuvai:// deep link so the Electron app receives the code.
# Dev: http://localhost:8000/api/v1/auth/oauth/google/web-callback
# Prod: https://api.adiuvai.com/api/v1/auth/oauth/google/web-callback
OAUTH_REDIRECT_URI: str = "http://localhost:8000/api/v1/auth/oauth/google/web-callback"
# Fernet key (URL-safe base64, 32-byte key) for at-rest encryption of OAuth # Fernet key (URL-safe base64, 32-byte key) for at-rest encryption of OAuth
# tokens stored in cloud_agent_configs.oauth_token_encrypted. # tokens stored in cloud_agent_configs.oauth_token_encrypted.
# Generate with: from cryptography.fernet import Fernet; Fernet.generate_key() # Generate with: from cryptography.fernet import Fernet; Fernet.generate_key()
OAUTH_ENCRYPTION_KEY: str = "" OAUTH_ENCRYPTION_KEY: str = ""
CORS_ORIGINS: list[str] = ["app://.", "http://localhost:3000", "http://localhost:5173"] CORS_ORIGINS: list[str] = [
"app://.",
"http://localhost:3000",
"http://localhost:5173",
"http://localhost:4173", # Vite preview (web SPA)
"https://app.adiuvai.com", # Production web portal
]
LANGFUSE_SECRET_KEY: str = "" LANGFUSE_SECRET_KEY: str = ""
LANGFUSE_PUBLIC_KEY: str = "" LANGFUSE_PUBLIC_KEY: str = ""
LANGFUSE_HOST: str = "https://cloud.langfuse.com" LANGFUSE_BASE_URL: str = "https://cloud.langfuse.com"
ENV: Literal["dev", "prod"] = "dev" ENV: Literal["dev", "prod"] = "dev"

View File

@@ -30,7 +30,6 @@ import asyncio
import json import json
import logging import logging
import os import os
import uuid
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import Any from typing import Any
@@ -43,10 +42,9 @@ from app.agents.note_agent import NOTE_TOOLS
from app.agents.project_agent import PROJECT_TOOLS from app.agents.project_agent import PROJECT_TOOLS
from app.agents.task_agent import TASK_TOOLS from app.agents.task_agent import TASK_TOOLS
from app.agents.timeline_agent import TIMELINE_TOOLS from app.agents.timeline_agent import TIMELINE_TOOLS
from app.config.settings import settings
from app.core.device_manager import DeviceConnectionManager from app.core.device_manager import DeviceConnectionManager
from app.core.langfuse_client import compile_prompt, extract_usage, get_langfuse, get_prompt_or_fallback from app.core.langfuse_client import compile_prompt, extract_usage, get_langfuse, get_prompt_or_fallback, langfuse_context
from app.core.llm import get_llm from app.core.llm import get_agent_llm, model_for_agent
from app.core.preprocessors import detect_content_type, preprocess from app.core.preprocessors import detect_content_type, preprocess
from app.core.ws_context import clear_client_executor, execute_on_client, set_client_executor from app.core.ws_context import clear_client_executor, execute_on_client, set_client_executor
from app.db import async_session from app.db import async_session
@@ -74,13 +72,13 @@ _MAX_PROCESSING_STEPS: int = 12
_MAX_SCAN_DEPTH: int = 5 _MAX_SCAN_DEPTH: int = 5
# ── Data-type to tool mapping ───────────────────────────────────────────── # ── Data-type to tool mapping ─────────────────────────────────────────────
# NOTE: "projects" is intentionally excluded — project creation/assignment is
# handled in code by the runner, never delegated to the Step 2 LLM.
_DATA_TYPE_TOOLS: dict[str, list[Any]] = { _DATA_TYPE_TOOLS: dict[str, list[Any]] = {
"tasks": TASK_TOOLS, "tasks": TASK_TOOLS,
"notes": NOTE_TOOLS, "notes": NOTE_TOOLS,
"timelines": TIMELINE_TOOLS, "timelines": TIMELINE_TOOLS,
"timelineEvents": TIMELINE_TOOLS,
"projects": PROJECT_TOOLS,
} }
# ── V2: Unified processing prompt (hot-swappable via Langfuse "unified_processing") ── # ── V2: Unified processing prompt (hot-swappable via Langfuse "unified_processing") ──
@@ -228,6 +226,7 @@ async def _run_agent_with_tools(
tools: list[Any], tools: list[Any],
max_steps: int, max_steps: int,
user_id: str = "", user_id: str = "",
session_id: str = "",
langfuse_prompt: Any = None, langfuse_prompt: Any = None,
agent_name: str = "batch-agent", agent_name: str = "batch-agent",
_tool_calls_out: list[str] | None = None, _tool_calls_out: list[str] | None = None,
@@ -238,7 +237,7 @@ async def _run_agent_with_tools(
run is appended to it (used by the caller to count ``create_*`` calls). run is appended to it (used by the caller to count ``create_*`` calls).
""" """
lf = get_langfuse() lf = get_langfuse()
llm = get_llm() llm = get_agent_llm(agent_name)
llm_with_tools = llm.bind_tools(tools) llm_with_tools = llm.bind_tools(tools)
messages: list[Any] = [ messages: list[Any] = [
SystemMessage(content=system_prompt), SystemMessage(content=system_prompt),
@@ -247,6 +246,9 @@ async def _run_agent_with_tools(
tool_map = {tool_def.name: tool_def for tool_def in tools} tool_map = {tool_def.name: tool_def for tool_def in tools}
_lf_ctx = langfuse_context(user_id=user_id or None, session_id=session_id or None)
_lf_ctx.__enter__()
_span_ctx = ( _span_ctx = (
lf.start_as_current_observation( lf.start_as_current_observation(
as_type="span", as_type="span",
@@ -264,7 +266,7 @@ async def _run_agent_with_tools(
lf.start_as_current_observation( lf.start_as_current_observation(
as_type="generation", as_type="generation",
name=f"{agent_name}-llm", name=f"{agent_name}-llm",
model=settings.LLM_MODEL, model=model_for_agent(agent_name),
prompt=langfuse_prompt, prompt=langfuse_prompt,
input=messages, input=messages,
) )
@@ -273,7 +275,7 @@ async def _run_agent_with_tools(
_gen = _gen_ctx.__enter__() if _gen_ctx else None _gen = _gen_ctx.__enter__() if _gen_ctx else None
response: AIMessage = await llm_with_tools.ainvoke(messages) response: AIMessage = await llm_with_tools.ainvoke(messages)
if _gen_ctx: if _gen_ctx:
_gen.update(output=_as_text(response.content), usage=extract_usage(response)) _gen.update(output=_as_text(response.content), usage_details=extract_usage(response))
_gen_ctx.__exit__(None, None, None) _gen_ctx.__exit__(None, None, None)
messages.append(response) messages.append(response)
@@ -318,6 +320,7 @@ async def _run_agent_with_tools(
finally: finally:
if _span_ctx: if _span_ctx:
_span_ctx.__exit__(None, None, None) _span_ctx.__exit__(None, None, None)
_lf_ctx.__exit__(None, None, None)
if lf: if lf:
lf.flush() lf.flush()
@@ -386,7 +389,8 @@ async def _scan_directories(
for file_path in all_files: for file_path in all_files:
try: try:
meta = await execute_on_client(action="get_file_metadata", data={"path": file_path}) meta = await execute_on_client(action="get_file_metadata", data={"path": file_path})
modified_at = meta.get("modifiedAt") # FE sends snake_case keys on the wire (toSnakeCase transform)
modified_at = meta.get("modified_at") or meta.get("modifiedAt")
if modified_at is None: if modified_at is None:
filtered.append(file_path) filtered.append(file_path)
continue continue
@@ -607,7 +611,6 @@ async def run_local_agent(
try: try:
# ── Code: scan directories ─────────────────────────────────── # ── Code: scan directories ───────────────────────────────────
logger.info("agent_runner: run=%s scanning directories user=%s", run_id, user_id)
file_paths = await _scan_directories( file_paths = await _scan_directories(
paths=config.directory_paths, paths=config.directory_paths,
extensions=config.file_extensions or [], extensions=config.file_extensions or [],
@@ -686,6 +689,7 @@ async def run_local_agent(
tools=processing_tools, tools=processing_tools,
max_steps=_MAX_PROCESSING_STEPS, max_steps=_MAX_PROCESSING_STEPS,
user_id=user_id, user_id=user_id,
session_id=run_id,
langfuse_prompt=prompt_obj, langfuse_prompt=prompt_obj,
agent_name="unified-processor", agent_name="unified-processor",
_tool_calls_out=file_tool_calls, _tool_calls_out=file_tool_calls,
@@ -696,6 +700,12 @@ async def run_local_agent(
) )
items_created += file_created items_created += file_created
# Refresh project list when a project was created so
# subsequent files see it in the prompt context.
if "create_project" in file_tool_calls:
projects = await _fetch_projects()
projects_block = _format_projects(projects)
logger.info( logger.info(
"agent_runner: run=%s file=%r created=%d result=%s", "agent_runner: run=%s file=%r created=%d result=%s",
run_id, file_path, file_created, result_text[:200], run_id, file_path, file_created, result_text[:200],
@@ -911,6 +921,7 @@ async def run_cloud_agent(
tools=processing_tools, tools=processing_tools,
max_steps=_MAX_PROCESSING_STEPS, max_steps=_MAX_PROCESSING_STEPS,
user_id=user_id, user_id=user_id,
session_id=run_id,
langfuse_prompt=cloud_prompt_obj, langfuse_prompt=cloud_prompt_obj,
agent_name="cloud-processor", agent_name="cloud-processor",
) )

View File

@@ -16,9 +16,8 @@ from app.agents.note_agent import NOTE_TOOLS
from app.agents.project_agent import PROJECT_TOOLS from app.agents.project_agent import PROJECT_TOOLS
from app.agents.task_agent import TASK_TOOLS from app.agents.task_agent import TASK_TOOLS
from app.agents.timeline_agent import TIMELINE_TOOLS from app.agents.timeline_agent import TIMELINE_TOOLS
from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback from app.core.langfuse_client import extract_usage, get_langfuse, get_prompt_or_fallback, langfuse_context
from app.core.llm import get_llm from app.core.llm import get_agent_llm, model_for_agent
from app.config.settings import settings
from app.core.memory_middleware import MemoryMiddleware from app.core.memory_middleware import MemoryMiddleware
from app.core.ws_context import clear_tool_result_collector, execute_on_client, set_tool_result_collector from app.core.ws_context import clear_tool_result_collector, execute_on_client, set_tool_result_collector
from app.db import async_session from app.db import async_session
@@ -28,6 +27,34 @@ logger = logging.getLogger(__name__)
FloatingDomainType = Literal["task", "timeline", "project", "node"] FloatingDomainType = Literal["task", "timeline", "project", "node"]
FloatingDomainSection = Literal["task", "timeline", "note"] FloatingDomainSection = Literal["task", "timeline", "note"]
# Mapping of core-memory language values to natural-language names for prompts.
_LANGUAGE_NAMES: dict[str, str] = {
"en": "English", "it": "Italian", "es": "Spanish",
"fr": "French", "de": "German",
"english": "English", "italian": "Italian", "italiano": "Italian",
"spanish": "Spanish", "español": "Spanish",
"french": "French", "français": "French",
"german": "German", "deutsch": "German",
}
def _language_instruction(context: dict[str, Any]) -> str:
"""Return a system-prompt suffix that tells the LLM to respond in the user's language.
Returns an empty string when the language is English or unknown — saves tokens.
"""
core = context.get("core_memory") or {}
raw = (core.get("language") or "").strip().lower()
if not raw:
return ""
lang = _LANGUAGE_NAMES.get(raw, raw.title()) # best-effort capitalisation
if lang.lower() == "english":
return ""
return (
f"\n\nIMPORTANT: Always respond in {lang}. "
f"All your output text must be written in {lang}."
)
_HOME_SYSTEM_PROMPT = ( _HOME_SYSTEM_PROMPT = (
"You are the home assistant with direct access to all tools: tasks, projects, notes, timelines, and memory tools. " "You are the home assistant with direct access to all tools: tasks, projects, notes, timelines, and memory tools. "
"Always use tools for factual data retrieval before answering. " "Always use tools for factual data retrieval before answering. "
@@ -149,6 +176,15 @@ def _trace_id_from_context(context: dict[str, Any]) -> str | None:
return None return None
def _session_id_from_context(context: dict[str, Any]) -> str | None:
debug = context.get("_debug")
if isinstance(debug, dict):
session_id = debug.get("session_id")
if isinstance(session_id, str) and session_id:
return session_id
return None
def _context_for_model(context: dict[str, Any]) -> dict[str, Any]: def _context_for_model(context: dict[str, Any]) -> dict[str, Any]:
sanitized = dict(context) sanitized = dict(context)
sanitized.pop("_debug", None) sanitized.pop("_debug", None)
@@ -537,7 +573,7 @@ async def _infer_floating_domain(message: str, context: dict[str, Any]) -> dict[
} }
try: try:
llm = get_llm() llm = get_agent_llm("classifier")
classifier_messages = [ classifier_messages = [
SystemMessage(content=_FLOATING_DOMAIN_CLASSIFIER_PROMPT), SystemMessage(content=_FLOATING_DOMAIN_CLASSIFIER_PROMPT),
HumanMessage( HumanMessage(
@@ -551,18 +587,25 @@ async def _infer_floating_domain(message: str, context: dict[str, Any]) -> dict[
_, classifier_prompt_obj = get_prompt_or_fallback( _, classifier_prompt_obj = get_prompt_or_fallback(
"floating_domain_classifier", _FLOATING_DOMAIN_CLASSIFIER_PROMPT "floating_domain_classifier", _FLOATING_DOMAIN_CLASSIFIER_PROMPT
) )
if lf:
with lf.start_as_current_observation( # Extract user/session from context for Langfuse attribution
as_type="generation", _debug = context.get("_debug") if isinstance(context, dict) else None
name="floating-classifier", _lf_user = (_debug or {}).get("user_id") if isinstance(_debug, dict) else None
model=settings.LLM_MODEL, _lf_session = (_debug or {}).get("session_id") if isinstance(_debug, dict) else None
prompt=classifier_prompt_obj,
input=classifier_messages, with langfuse_context(user_id=_lf_user, session_id=_lf_session):
) as gen: if lf:
with lf.start_as_current_observation(
as_type="generation",
name="floating-classifier",
model=model_for_agent("classifier"),
prompt=classifier_prompt_obj,
input=classifier_messages,
) as gen:
response = await llm.ainvoke(classifier_messages)
gen.update(output=_as_text(response.content), usage_details=extract_usage(response))
else:
response = await llm.ainvoke(classifier_messages) response = await llm.ainvoke(classifier_messages)
gen.update(output=_as_text(response.content), usage=extract_usage(response))
else:
response = await llm.ainvoke(classifier_messages)
parsed = _parse_json_object(_as_text(response.content)) parsed = _parse_json_object(_as_text(response.content))
if parsed is not None: if parsed is not None:
domain = _normalize_domain_payload(parsed, project_id) domain = _normalize_domain_payload(parsed, project_id)
@@ -591,8 +634,9 @@ async def _run_single_agent(
agent_name: str = "agent", agent_name: str = "agent",
) -> str: ) -> str:
trace_id = _trace_id_from_context(context) trace_id = _trace_id_from_context(context)
session_id = _session_id_from_context(context)
lf = get_langfuse() lf = get_langfuse()
llm = get_llm() llm = get_agent_llm(agent_name)
tools = _all_tools_for_user(user_id, trace_id) tools = _all_tools_for_user(user_id, trace_id)
model_context = _context_for_model(context) model_context = _context_for_model(context)
logger.info("deep_agent: run_single_agent_start trace=%s user=%s", trace_id or "-", user_id) logger.info("deep_agent: run_single_agent_start trace=%s user=%s", trace_id or "-", user_id)
@@ -611,6 +655,9 @@ async def _run_single_agent(
collected: list[dict[str, Any]] = [] collected: list[dict[str, Any]] = []
set_tool_result_collector(collected) set_tool_result_collector(collected)
_lf_ctx = langfuse_context(user_id=user_id, session_id=session_id)
_lf_ctx.__enter__()
_span_ctx = ( _span_ctx = (
lf.start_as_current_observation( lf.start_as_current_observation(
as_type="span", as_type="span",
@@ -628,7 +675,7 @@ async def _run_single_agent(
lf.start_as_current_observation( lf.start_as_current_observation(
as_type="generation", as_type="generation",
name=f"{agent_name}-llm", name=f"{agent_name}-llm",
model=settings.LLM_MODEL, model=model_for_agent(agent_name),
prompt=langfuse_prompt, prompt=langfuse_prompt,
input=messages, input=messages,
) )
@@ -637,7 +684,7 @@ async def _run_single_agent(
_gen = _gen_ctx.__enter__() if _gen_ctx else None _gen = _gen_ctx.__enter__() if _gen_ctx else None
response: AIMessage = await llm_with_tools.ainvoke(messages) response: AIMessage = await llm_with_tools.ainvoke(messages)
if _gen_ctx: if _gen_ctx:
_gen.update(output=_as_text(response.content), usage=extract_usage(response)) _gen.update(output=_as_text(response.content), usage_details=extract_usage(response))
_gen_ctx.__exit__(None, None, None) _gen_ctx.__exit__(None, None, None)
messages.append(response) messages.append(response)
@@ -699,6 +746,7 @@ async def _run_single_agent(
clear_tool_result_collector() clear_tool_result_collector()
if _span_ctx: if _span_ctx:
_span_ctx.__exit__(None, None, None) _span_ctx.__exit__(None, None, None)
_lf_ctx.__exit__(None, None, None)
if lf: if lf:
lf.flush() lf.flush()
@@ -714,8 +762,9 @@ async def _run_single_agent_stream(
agent_name: str = "agent", agent_name: str = "agent",
) -> AsyncGenerator[tuple[str, Any], None]: ) -> AsyncGenerator[tuple[str, Any], None]:
trace_id = _trace_id_from_context(context) trace_id = _trace_id_from_context(context)
session_id = _session_id_from_context(context)
lf = get_langfuse() lf = get_langfuse()
llm = get_llm() llm = get_agent_llm(agent_name)
tools = _all_tools_for_user(user_id, trace_id) tools = _all_tools_for_user(user_id, trace_id)
model_context = _context_for_model(context) model_context = _context_for_model(context)
logger.info("deep_agent: run_single_agent_stream_start trace=%s user=%s", trace_id or "-", user_id) logger.info("deep_agent: run_single_agent_stream_start trace=%s user=%s", trace_id or "-", user_id)
@@ -735,6 +784,9 @@ async def _run_single_agent_stream(
collected: list[dict[str, Any]] = [] collected: list[dict[str, Any]] = []
set_tool_result_collector(collected) set_tool_result_collector(collected)
_lf_ctx = langfuse_context(user_id=user_id, session_id=session_id)
_lf_ctx.__enter__()
_span_ctx = ( _span_ctx = (
lf.start_as_current_observation( lf.start_as_current_observation(
as_type="span", as_type="span",
@@ -753,7 +805,7 @@ async def _run_single_agent_stream(
lf.start_as_current_observation( lf.start_as_current_observation(
as_type="generation", as_type="generation",
name=f"{agent_name}-llm", name=f"{agent_name}-llm",
model=settings.LLM_MODEL, model=model_for_agent(agent_name),
prompt=langfuse_prompt, prompt=langfuse_prompt,
input=messages, input=messages,
) )
@@ -762,7 +814,7 @@ async def _run_single_agent_stream(
_gen = _gen_ctx.__enter__() if _gen_ctx else None _gen = _gen_ctx.__enter__() if _gen_ctx else None
response: AIMessage = await llm_with_tools.ainvoke(messages) response: AIMessage = await llm_with_tools.ainvoke(messages)
if _gen_ctx: if _gen_ctx:
_gen.update(output=_as_text(response.content), usage=extract_usage(response)) _gen.update(output=_as_text(response.content), usage_details=extract_usage(response))
_gen_ctx.__exit__(None, None, None) _gen_ctx.__exit__(None, None, None)
messages.append(response) messages.append(response)
@@ -842,6 +894,7 @@ async def _run_single_agent_stream(
clear_tool_result_collector() clear_tool_result_collector()
if _span_ctx: if _span_ctx:
_span_ctx.__exit__(None, None, None) _span_ctx.__exit__(None, None, None)
_lf_ctx.__exit__(None, None, None)
if lf: if lf:
lf.flush() lf.flush()
@@ -851,6 +904,7 @@ async def run_home(user_id: str, message: str, context: dict[str, Any]) -> str:
system_prompt, langfuse_prompt = get_prompt_or_fallback( system_prompt, langfuse_prompt = get_prompt_or_fallback(
"home_system", _HOME_SYSTEM_PROMPT "home_system", _HOME_SYSTEM_PROMPT
) )
system_prompt += _language_instruction(context)
response = await _run_single_agent( response = await _run_single_agent(
user_id=user_id, user_id=user_id,
system_prompt=system_prompt, system_prompt=system_prompt,
@@ -868,6 +922,7 @@ async def run_floating(user_id: str, message: str, context: dict[str, Any]) -> t
system_prompt, langfuse_prompt = get_prompt_or_fallback( system_prompt, langfuse_prompt = get_prompt_or_fallback(
"floating_system", _FLOATING_SYSTEM_PROMPT "floating_system", _FLOATING_SYSTEM_PROMPT
) )
system_prompt += _language_instruction(context)
response = await _run_single_agent( response = await _run_single_agent(
user_id=user_id, user_id=user_id,
system_prompt=system_prompt, system_prompt=system_prompt,
@@ -891,6 +946,7 @@ async def run_home_stream(
system_prompt, langfuse_prompt = get_prompt_or_fallback( system_prompt, langfuse_prompt = get_prompt_or_fallback(
"home_system", _HOME_SYSTEM_PROMPT "home_system", _HOME_SYSTEM_PROMPT
) )
system_prompt += _language_instruction(context)
text_chunks: list[str] = [] text_chunks: list[str] = []
async for event in _run_single_agent_stream( async for event in _run_single_agent_stream(
user_id=user_id, user_id=user_id,
@@ -923,6 +979,7 @@ async def run_floating_stream(
system_prompt, langfuse_prompt = get_prompt_or_fallback( system_prompt, langfuse_prompt = get_prompt_or_fallback(
"floating_system", _FLOATING_SYSTEM_PROMPT "floating_system", _FLOATING_SYSTEM_PROMPT
) )
system_prompt += _language_instruction(context)
sanitizer = _FloatingStreamSanitizer() sanitizer = _FloatingStreamSanitizer()
emitted_sanitized = False emitted_sanitized = False
raw_chunks: list[str] = [] raw_chunks: list[str] = []

View File

@@ -39,8 +39,10 @@ Linking a prompt to a generation::
from __future__ import annotations from __future__ import annotations
import hashlib
import logging import logging
from typing import Any from contextlib import contextmanager
from typing import Any, Generator
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -67,9 +69,9 @@ def get_langfuse() -> Any | None:
_client = Langfuse( _client = Langfuse(
secret_key=settings.LANGFUSE_SECRET_KEY, secret_key=settings.LANGFUSE_SECRET_KEY,
public_key=settings.LANGFUSE_PUBLIC_KEY, public_key=settings.LANGFUSE_PUBLIC_KEY,
host=settings.LANGFUSE_HOST, host=settings.LANGFUSE_BASE_URL,
) )
logger.info("langfuse: client initialized host=%s", settings.LANGFUSE_HOST) logger.info("langfuse: client initialized host=%s", settings.LANGFUSE_BASE_URL)
except Exception as exc: except Exception as exc:
logger.warning("langfuse: failed to initialize: %s", exc) logger.warning("langfuse: failed to initialize: %s", exc)
_client = None _client = None
@@ -145,3 +147,44 @@ def extract_usage(response: Any) -> dict[str, int]:
"output": int(meta.get("output_tokens", 0)), "output": int(meta.get("output_tokens", 0)),
"total": int(meta.get("total_tokens", 0)), "total": int(meta.get("total_tokens", 0)),
} }
def hash_user_id(user_id: str) -> str:
"""Return a SHA-256 hash of *user_id* for use as Langfuse ``user_id``.
This avoids sending raw database UUIDs to external observability services
while still providing a stable, deterministic identifier for per-user
metrics in the Langfuse dashboard.
"""
return hashlib.sha256(user_id.encode()).hexdigest()
@contextmanager
def langfuse_context(
user_id: str | None = None,
session_id: str | None = None,
) -> Generator[None, None, None]:
"""Propagate ``user_id`` (hashed) and ``session_id`` to all Langfuse observations.
No-op when Langfuse is not configured or parameters are empty.
"""
lf = get_langfuse()
if lf is None or (not user_id and not session_id):
yield
return
try:
from langfuse import propagate_attributes
except ImportError:
logger.debug("langfuse: propagate_attributes not available — skipping context")
yield
return
attrs: dict[str, str] = {}
if user_id:
attrs["user_id"] = hash_user_id(user_id)
if session_id:
attrs["session_id"] = session_id
with propagate_attributes(**attrs):
yield

View File

@@ -1,6 +1,6 @@
"""LLM factory — centralised model instantiation via LiteLLM. """LLM factory — centralised model instantiation via LiteLLM.
Every agent and the orchestrator call ``get_llm()`` or ``get_router_llm()`` Every agent and the orchestrator call ``get_llm()``
instead of directly constructing a provider-specific class. The model string instead of directly constructing a provider-specific class. The model string
follows the `LiteLLM model naming convention follows the `LiteLLM model naming convention
<https://docs.litellm.ai/docs/providers>`_: <https://docs.litellm.ai/docs/providers>`_:
@@ -11,7 +11,7 @@ follows the `LiteLLM model naming convention
* Ollama: ``ollama/llama3`` * Ollama: ``ollama/llama3``
* Bedrock: ``bedrock/anthropic.claude-v2`` * Bedrock: ``bedrock/anthropic.claude-v2``
Switch providers by changing **LLM_MODEL** / **LLM_ROUTER_MODEL** in ``.env`` Switch providers by changing **LLM_MODEL** in ``.env``
— no code changes required. — no code changes required.
""" """
@@ -19,6 +19,7 @@ from __future__ import annotations
import os import os
import warnings import warnings
from collections.abc import Callable
from openai import AsyncOpenAI from openai import AsyncOpenAI
import litellm import litellm
@@ -95,12 +96,33 @@ def get_llm(
) )
def get_router_llm( _AGENT_MODEL_SETTINGS: dict[str, Callable[[], str]] = {
"classifier": lambda: settings.LLM_MODEL_CLASSIFIER or settings.LLM_MODEL,
"home-agent": lambda: settings.LLM_MODEL_HOME_AGENT or settings.LLM_MODEL,
"floating-agent": lambda: settings.LLM_MODEL_FLOATING_AGENT or settings.LLM_MODEL,
"unified-processor": lambda: settings.LLM_MODEL_UNIFIED_PROCESSOR or settings.LLM_MODEL,
"cloud-processor": lambda: settings.LLM_MODEL_CLOUD_PROCESSOR or settings.LLM_MODEL,
"setup": lambda: settings.LLM_MODEL_SETUP_AGENT or settings.LLM_MODEL,
}
def model_for_agent(agent_name: str) -> str:
"""Return the resolved model string for *agent_name* (for Langfuse tracking)."""
return _AGENT_MODEL_SETTINGS.get(agent_name, lambda: settings.LLM_MODEL)()
def get_agent_llm(
agent_name: str,
*, *,
temperature: float = 0, temperature: float = 0,
) -> ChatOpenAI | ChatLiteLLM: ) -> ChatOpenAI | ChatLiteLLM:
"""Return the lighter model used for intent classification / routing.""" """Return an LLM configured for *agent_name*, respecting per-agent overrides.
return get_llm(model=settings.LLM_ROUTER_MODEL, temperature=temperature)
Falls back to ``settings.LLM_MODEL`` for unknown agent names or when the
per-agent override is left empty in ``.env``.
"""
model = model_for_agent(agent_name)
return get_llm(model=model, temperature=temperature)
async def embed(text: str) -> list[float]: async def embed(text: str) -> list[float]:

View File

@@ -25,7 +25,7 @@ from __future__ import annotations
import logging import logging
import re import re
from datetime import datetime, timedelta, timezone from datetime import datetime, timezone
from typing import Any from typing import Any
import httpx import httpx

View File

@@ -30,7 +30,7 @@ async def lifespan(app: FastAPI):
def create_app() -> FastAPI: def create_app() -> FastAPI:
app = FastAPI( app = FastAPI(
title="Adiuva Cloud API", title="AdiuvAI Cloud API",
version="0.1.0", version="0.1.0",
docs_url="/docs" if settings.ENV == "dev" else None, docs_url="/docs" if settings.ENV == "dev" else None,
redoc_url=None, redoc_url=None,

View File

@@ -69,7 +69,8 @@ class User(Base):
email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False, index=True) email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False, index=True)
name: Mapped[str | None] = mapped_column(String(100), nullable=True) name: Mapped[str | None] = mapped_column(String(100), nullable=True)
surname: Mapped[str | None] = mapped_column(String(100), nullable=True) surname: Mapped[str | None] = mapped_column(String(100), nullable=True)
password_hash: Mapped[str] = mapped_column(String(255), nullable=False) password_hash: Mapped[str | None] = mapped_column(String(255), nullable=True)
avatar_url: Mapped[str | None] = mapped_column(Text, nullable=True)
tier: Mapped[str] = mapped_column(TierEnum, nullable=False, default="free") tier: Mapped[str] = mapped_column(TierEnum, nullable=False, default="free")
stripe_customer_id: Mapped[str | None] = mapped_column(String(255), nullable=True) stripe_customer_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
# Per-user Fernet key (base64-urlsafe, 44 chars). Generated on registration. # Per-user Fernet key (base64-urlsafe, 44 chars). Generated on registration.
@@ -78,6 +79,9 @@ class User(Base):
created_at: Mapped[datetime] = mapped_column( created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now() DateTime(timezone=True), nullable=False, server_default=func.now()
) )
onboarding_completed_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True, default=None
)
updated_at: Mapped[datetime] = mapped_column( updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now() DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now()
) )
@@ -88,6 +92,9 @@ class User(Base):
subscription: Mapped[Subscription | None] = relationship( subscription: Mapped[Subscription | None] = relationship(
back_populates="user", uselist=False, cascade="all, delete-orphan" back_populates="user", uselist=False, cascade="all, delete-orphan"
) )
oauth_accounts: Mapped[list[OAuthAccount]] = relationship(
back_populates="user", cascade="all, delete-orphan"
)
class RefreshToken(Base): class RefreshToken(Base):
@@ -108,6 +115,25 @@ class RefreshToken(Base):
user: Mapped[User] = relationship(back_populates="refresh_tokens") user: Mapped[User] = relationship(back_populates="refresh_tokens")
class OAuthAccount(Base):
__tablename__ = "oauth_accounts"
id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), primary_key=True, default=_uuid
)
user_id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True
)
provider: Mapped[str] = mapped_column(String(50), nullable=False)
provider_user_id: Mapped[str] = mapped_column(String(255), nullable=False)
provider_email: Mapped[str | None] = mapped_column(String(255), nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
user: Mapped[User] = relationship(back_populates="oauth_accounts")
class Subscription(Base): class Subscription(Base):
__tablename__ = "subscriptions" __tablename__ = "subscriptions"

View File

@@ -30,6 +30,16 @@ class UserProfile(BaseModel):
name: str | None = None name: str | None = None
surname: str | None = None surname: str | None = None
tier: BillingTier tier: BillingTier
avatar_url: str | None = None
has_password: bool = True
onboarding_completed_at: int | None = None # epoch ms, null = not onboarded
memory: dict[str, str] = Field(default_factory=dict) # decrypted core memory k/v
class OAuthAccountInfo(BaseModel):
provider: str
provider_email: str | None = None
created_at: int # epoch ms
# ── Chat ───────────────────────────────────────────────────────────── # ── Chat ─────────────────────────────────────────────────────────────
@@ -236,10 +246,11 @@ class AgentTriggerRequest(BaseModel):
device_id: str = Field(default="") device_id: str = Field(default="")
agent_id: str | None = None # FE stable agent ID (electron-store UUID) agent_id: str | None = None # FE stable agent ID (electron-store UUID)
what_to_extract: list[str] = Field(min_length=1) what_to_extract: list[str] = Field(min_length=1)
actions_by_type: dict[str, list[str]] | None = None
batch_interval: str = Field(min_length=1) batch_interval: str = Field(min_length=1)
custom_agent_prompt: str = Field(min_length=1) custom_agent_prompt: str | None = None
agent_config: dict | None = None
active_agents: int = Field(ge=0, default=0) active_agents: int = Field(ge=0, default=0)
last_run_at: int | None = None # epoch ms from FE — enables incremental scanning
# ── Agent Run Log ───────────────────────────────────────────────────── # ── Agent Run Log ─────────────────────────────────────────────────────

View File

@@ -7,7 +7,7 @@ services:
- path: .env - path: .env
required: false required: false
environment: environment:
DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/adiuva DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/adiuvai
GITHUB_COPILOT_TOKEN_DIR: /root/.config/litellm/github_copilot GITHUB_COPILOT_TOKEN_DIR: /root/.config/litellm/github_copilot
volumes: volumes:
- copilot_tokens:/root/.config/litellm/github_copilot - copilot_tokens:/root/.config/litellm/github_copilot
@@ -21,7 +21,7 @@ services:
environment: environment:
POSTGRES_USER: postgres POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres POSTGRES_PASSWORD: postgres
POSTGRES_DB: adiuva POSTGRES_DB: adiuvai
volumes: volumes:
- postgres_data:/var/lib/postgresql/data - postgres_data:/var/lib/postgresql/data
healthcheck: healthcheck:

View File

@@ -28,7 +28,6 @@ from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch from unittest.mock import AsyncMock, MagicMock, patch
import pytest import pytest
import pytest_asyncio
from app.core.agent_runner import ( from app.core.agent_runner import (
_extract_items_from_content, _extract_items_from_content,
@@ -597,7 +596,7 @@ async def test_run_cloud_agent_provider_fetch_error():
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_run_cloud_agent_refreshed_token_persisted(): async def test_run_cloud_agent_refreshed_token_persisted():
"""When the provider refreshes its token, the new ciphertext is written to DB.""" """When the provider refreshes its token, the new ciphertext is written to DB."""
from app.integrations import EmailMessage, encrypt_token from app.integrations import encrypt_token
from cryptography.fernet import Fernet as _Fernet from cryptography.fernet import Fernet as _Fernet
fernet_key = _Fernet.generate_key().decode() fernet_key = _Fernet.generate_key().decode()
@@ -791,7 +790,6 @@ async def test_trigger_run_local_agent_creates_run_log(client, db_session):
json={ json={
"directory": "/home/user/docs", "directory": "/home/user/docs",
"what_to_extract": ["task", "note"], "what_to_extract": ["task", "note"],
"actions_by_type": {"task": ["add", "update"], "note": ["add"]},
"batch_interval": "0 */6 * * *", "batch_interval": "0 */6 * * *",
"custom_agent_prompt": "Extract tasks and notes.", "custom_agent_prompt": "Extract tasks and notes.",
"active_agents": 0, "active_agents": 0,

View File

@@ -40,7 +40,6 @@ from app.core.agent_runner import (
_format_projects, _format_projects,
_get_extraction_rules, _get_extraction_rules,
_get_no_match_behavior, _get_no_match_behavior,
_is_overdue,
run_local_agent, run_local_agent,
) )
from app.core.device_manager import DeviceConnectionManager from app.core.device_manager import DeviceConnectionManager

View File

@@ -21,7 +21,6 @@ import time
import uuid import uuid
from unittest.mock import AsyncMock, patch from unittest.mock import AsyncMock, patch
import pytest
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession

View File

@@ -1,4 +1,4 @@
"""Tests for auth routes: register, login, refresh, me. """Tests for auth routes: register, login, refresh, me, OAuth social login.
Exercises the full auth lifecycle through the FastAPI TestClient against the Exercises the full auth lifecycle through the FastAPI TestClient against the
in-memory SQLite test database seeded by ``conftest.py``. in-memory SQLite test database seeded by ``conftest.py``.
@@ -7,9 +7,11 @@ in-memory SQLite test database seeded by ``conftest.py``.
from __future__ import annotations from __future__ import annotations
import time import time
from unittest.mock import AsyncMock, patch
from jose import jwt from jose import jwt
from app.auth.oauth_providers import GoogleOAuthProvider, OAuthUserInfo
from app.config.settings import settings from app.config.settings import settings
from tests.conftest import auth_header, TEST_USER_IDS from tests.conftest import auth_header, TEST_USER_IDS
@@ -204,3 +206,153 @@ class TestMe:
token = jwt.encode(payload, "wrong-secret", algorithm="HS256") token = jwt.encode(payload, "wrong-secret", algorithm="HS256")
resp = client.get("/api/v1/auth/me", headers={"Authorization": f"Bearer {token}"}) resp = client.get("/api/v1/auth/me", headers={"Authorization": f"Bearer {token}"})
assert resp.status_code == 401 assert resp.status_code == 401
# ── TestOAuth ─────────────────────────────────────────────────────────
class TestOAuth:
"""GET /auth/oauth/google/authorize and POST /auth/oauth/google/callback."""
FAKE_PROVIDER_USER_ID = "google-sub-12345"
FAKE_EMAIL = "oauth@example.com"
FAKE_AVATAR = "https://lh3.googleusercontent.com/photo.jpg"
def _patch_google(self, monkeypatch) -> None:
monkeypatch.setattr(settings, "GOOGLE_AUTH_CLIENT_ID", "fake-client-id")
monkeypatch.setattr(settings, "GOOGLE_AUTH_CLIENT_SECRET", "fake-client-secret")
def _userinfo(
self,
email: str | None = None,
email_verified: bool = True,
) -> OAuthUserInfo:
return OAuthUserInfo(
provider_user_id=self.FAKE_PROVIDER_USER_ID,
email=email or self.FAKE_EMAIL,
email_verified=email_verified,
avatar_url=self.FAKE_AVATAR,
name="OAuth User",
)
def _authorize(self, client) -> str:
"""Call /authorize and return the fresh state token."""
resp = client.get("/api/v1/auth/oauth/google/authorize")
assert resp.status_code == 200
return resp.json()["state"]
def _callback(self, client, state: str, userinfo: OAuthUserInfo):
"""POST /callback with mocked provider exchange_code + get_userinfo."""
with (
patch.object(
GoogleOAuthProvider,
"exchange_code",
new=AsyncMock(return_value={"access_token": "google-access-tok"}),
),
patch.object(
GoogleOAuthProvider,
"get_userinfo",
new=AsyncMock(return_value=userinfo),
),
):
return client.post(
"/api/v1/auth/oauth/google/callback",
json={"code": "auth-code", "state": state},
)
def _decode_sub(self, access_token: str) -> str:
return jwt.decode(
access_token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM]
)["sub"]
# -- authorize --
def test_authorize_returns_url_and_state(self, client, monkeypatch) -> None:
self._patch_google(monkeypatch)
resp = client.get("/api/v1/auth/oauth/google/authorize")
assert resp.status_code == 200
data = resp.json()
assert "url" in data and "state" in data
assert "accounts.google.com" in data["url"]
assert len(data["state"]) > 0
def test_authorize_unconfigured_returns_503(self, client, monkeypatch) -> None:
monkeypatch.setattr(settings, "GOOGLE_AUTH_CLIENT_ID", "")
monkeypatch.setattr(settings, "GOOGLE_AUTH_CLIENT_SECRET", "")
resp = client.get("/api/v1/auth/oauth/google/authorize")
assert resp.status_code == 503
# -- callback --
def test_callback_state_mismatch_returns_401(self, client, monkeypatch) -> None:
self._patch_google(monkeypatch)
resp = client.post(
"/api/v1/auth/oauth/google/callback",
json={"code": "code", "state": "not-a-real-state"},
)
assert resp.status_code == 401
def test_callback_creates_new_user(self, client, monkeypatch) -> None:
"""First-time Google login creates a new user and returns valid tokens."""
self._patch_google(monkeypatch)
state = self._authorize(client)
resp = self._callback(client, state, self._userinfo())
assert resp.status_code == 200
data = resp.json()
assert "access_token" in data and "refresh_token" in data
payload = jwt.decode(
data["access_token"], settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM]
)
assert payload["email"] == self.FAKE_EMAIL
def test_callback_existing_oauth_link_logs_in(self, client, monkeypatch) -> None:
"""Second Google login with the same account re-uses the existing user."""
self._patch_google(monkeypatch)
userinfo = self._userinfo()
# First login — creates user + oauth_accounts row
resp1 = self._callback(client, self._authorize(client), userinfo)
assert resp1.status_code == 200
sub1 = self._decode_sub(resp1.json()["access_token"])
# Second login — finds existing oauth_accounts row → same user
resp2 = self._callback(client, self._authorize(client), userinfo)
assert resp2.status_code == 200
sub2 = self._decode_sub(resp2.json()["access_token"])
assert sub1 == sub2
def test_callback_email_match_links_account(self, client, monkeypatch) -> None:
"""Verified Google email matching an existing password user links the accounts."""
email = "link-target@example.com"
reg_resp = client.post(
"/api/v1/auth/register",
json={"email": email, "password": "TestPass123!"},
)
assert reg_resp.status_code == 201
orig_sub = self._decode_sub(reg_resp.json()["access_token"])
self._patch_google(monkeypatch)
state = self._authorize(client)
resp = self._callback(client, state, self._userinfo(email=email, email_verified=True))
assert resp.status_code == 200
oauth_sub = self._decode_sub(resp.json()["access_token"])
# OAuth login must resolve to the same user as the original registration
assert orig_sub == oauth_sub
def test_callback_unverified_email_conflict_returns_409(self, client, monkeypatch) -> None:
"""Unverified Google email matching an existing account returns 409, not 500."""
email = "conflict@example.com"
reg_resp = client.post(
"/api/v1/auth/register",
json={"email": email, "password": "TestPass123!"},
)
assert reg_resp.status_code == 201
self._patch_google(monkeypatch)
state = self._authorize(client)
resp = self._callback(client, state, self._userinfo(email=email, email_verified=False))
assert resp.status_code == 409

View File

@@ -18,13 +18,12 @@ from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch from unittest.mock import AsyncMock, MagicMock, patch
import pytest import pytest
import pytest_asyncio
from app.core.device_manager import DeviceConnection, DeviceConnectionManager from app.core.device_manager import DeviceConnectionManager
from app.db import get_session from app.db import get_session
from app.main import app from app.main import app
from app.models import AgentRunLog from app.models import AgentRunLog
from tests.conftest import TEST_USER_IDS, auth_header, make_jwt from tests.conftest import TEST_USER_IDS, make_jwt
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Helpers # Helpers

View File

@@ -40,11 +40,9 @@ Coverage:
from __future__ import annotations from __future__ import annotations
import asyncio
import json import json
import uuid
from datetime import datetime, timezone from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, Mock, PropertyMock, patch from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch
import pytest import pytest

View File

@@ -19,7 +19,7 @@ import pytest_asyncio
from cryptography.fernet import Fernet from cryptography.fernet import Fernet
from sqlalchemy import select from sqlalchemy import select
from app.core.memory_middleware import MemoryMiddleware, _PROACTIVE_CONFIDENCE_THRESHOLD from app.core.memory_middleware import MemoryMiddleware
from app.db import get_session from app.db import get_session
from app.main import app from app.main import app
from app.models import ( from app.models import (

View File

@@ -7,10 +7,9 @@ column is stored as JSON in tests (SQLite-compatible).
from __future__ import annotations from __future__ import annotations
import uuid import uuid
from datetime import datetime, timezone from datetime import datetime
import pytest import pytest
import pytest_asyncio
from cryptography.fernet import Fernet from cryptography.fernet import Fernet
from sqlalchemy import select from sqlalchemy import select

View File

@@ -12,7 +12,6 @@ from __future__ import annotations
import re import re
from pathlib import Path from pathlib import Path
import pytest
import yaml import yaml
from app.core.preprocessors import detect_content_type, preprocess from app.core.preprocessors import detect_content_type, preprocess