feat: microservices scaffold + Auth Service (Step 1)

- Add shared/ module: config, db, models, schemas, redis utilities
- Add Auth Service (services/auth/): register, login, refresh, me,
  ForwardAuth /verify endpoint for Traefik
- Add Traefik config: ACME/Cloudflare DNS-01, dynamic routing,
  ForwardAuth middleware, sticky sessions for WS Gateway
- Add service scaffolds: ws-gateway, chat, batch-agent, billing (READMEs)
- Add redis>=5.0.0 to requirements.txt
- Monolith app/ is untouched — strangler fig migration
This commit is contained in:
Roberto Musso
2026-03-22 00:29:51 +01:00
parent 552b8eb305
commit aa219a4d08
25 changed files with 1685 additions and 0 deletions

View File

@@ -32,4 +32,5 @@ google-auth-oauthlib>=1.2.0
google-auth-httplib2>=0.2.0
msal>=1.28.0
cryptography>=42.0.0
redis>=5.0.0
ruff>=0.8.0

36
services/auth/Dockerfile Normal file
View File

@@ -0,0 +1,36 @@
# ── builder ──────────────────────────────────────────────────────────────────
FROM python:3.12-slim AS builder
WORKDIR /build
# Install shared + service deps in one layer
COPY services/auth/requirements.txt ./requirements.txt
RUN pip install --upgrade pip && \
pip install --no-cache-dir --prefix=/install -r requirements.txt
# ── runtime ──────────────────────────────────────────────────────────────────
FROM python:3.12-slim AS runtime
RUN addgroup --system appgroup && adduser --system --ingroup appgroup appuser
WORKDIR /app
COPY --from=builder /install /usr/local
# Copy shared module (available to all services)
COPY shared/ shared/
# Copy service source
COPY services/auth/app/ app/
RUN chown -R appuser:appgroup /app
USER appuser
EXPOSE 8000
CMD ["gunicorn", "app.main:app", \
"-k", "uvicorn.workers.UvicornWorker", \
"--bind", "0.0.0.0:8000", \
"--workers", "2", \
"--timeout", "30"]

16
services/auth/README.md Normal file
View File

@@ -0,0 +1,16 @@
# Auth Service
Owns: user registration, login, JWT RS256 issuance, token refresh, `/me` endpoint.
## Tables owned
- `users`
- `refresh_tokens`
- `subscriptions` (read; Billing Service writes)
## Endpoints
- `POST /auth/register`
- `POST /auth/login`
- `POST /auth/refresh`
- `GET /auth/me`
- `PUT /auth/me`
- `GET /auth/verify` (ForwardAuth for Traefik)

View File

67
services/auth/app/deps.py Normal file
View File

@@ -0,0 +1,67 @@
"""Auth dependencies — JWT validation for the Auth Service.
This is the canonical get_current_user used by protected endpoints
within the Auth Service itself (/me, /me PUT).
"""
from __future__ import annotations
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from shared.config import settings
from shared.db import get_session
from shared.models import Subscription, User
from shared.schemas import UserProfile
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_session),
) -> UserProfile:
"""Validate a Bearer JWT and return the authenticated user.
The JWT is used for identity and expiry. Tier is fetched live from the
subscriptions table so upgrades/downgrades take effect immediately.
"""
credentials_exc = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(
token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM]
)
user_id: str | None = payload.get("sub")
email: str | None = payload.get("email")
if not user_id or not email:
raise credentials_exc
except JWTError:
raise credentials_exc
# Live tier lookup
result = await db.execute(
select(Subscription.tier).where(Subscription.user_id == user_id)
)
default_tier = "power" if settings.ENV == "dev" else "free"
tier: str = result.scalar_one_or_none() or default_tier
# Fetch name/surname
user_result = await db.execute(
select(User.name, User.surname).where(User.id == user_id)
)
user_row = user_result.one_or_none()
return UserProfile(
id=user_id,
email=email,
name=user_row.name if user_row else None,
surname=user_row.surname if user_row else None,
tier=tier,
) # type: ignore[arg-type]

53
services/auth/app/main.py Normal file
View File

@@ -0,0 +1,53 @@
"""Auth Service — JWT issuance, user management, ForwardAuth verification.
Standalone FastAPI service extracted from the adiuva-api monolith.
Owns: users, refresh_tokens, subscriptions (read).
"""
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from shared.config import settings
@asynccontextmanager
async def lifespan(app: FastAPI):
yield
from shared.db import engine
await engine.dispose()
def create_app() -> FastAPI:
app = FastAPI(
title="Adiuva Auth Service",
version="0.1.0",
docs_url="/docs" if settings.ENV == "dev" else None,
redoc_url=None,
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
from app.routes import router
from app.verify import router as verify_router
app.include_router(router, prefix="/api/v1")
app.include_router(verify_router, prefix="/api/v1")
@app.get("/api/v1/health", tags=["health"])
async def health() -> dict:
return {"status": "ok", "service": "auth", "version": app.version}
return app
app = create_app()

248
services/auth/app/routes.py Normal file
View File

@@ -0,0 +1,248 @@
"""Auth routes: register, login, refresh, me.
Extracted from app/api/routes/auth.py — uses shared.* imports instead of app.*.
"""
from __future__ import annotations
import hashlib
import time
import uuid
from datetime import datetime, timedelta, timezone
import bcrypt
from cryptography.fernet import Fernet
from fastapi import APIRouter, Depends, HTTPException, status
from jose import jwt
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from shared.config import settings
from shared.db import get_session
from shared.models import RefreshToken, Subscription, User
from shared.schemas import AuthTokens, UserProfile
from app.deps import get_current_user
router = APIRouter(prefix="/auth", tags=["auth"])
# ── Internal helpers ─────────────────────────────────────────────────
def _hash_password(password: str) -> str:
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
def _verify_password(password: str, hashed: str) -> bool:
return bcrypt.checkpw(password.encode(), hashed.encode())
def _hash_token(plain_token: str) -> str:
"""SHA-256 of the plain refresh token string."""
return hashlib.sha256(plain_token.encode()).hexdigest()
def _make_access_token(user_id: str, email: str, tier: str) -> tuple[str, int]:
"""Return (signed JWT, expires_at_ms)."""
now = int(time.time())
exp = now + settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES * 60
payload = {
"sub": user_id,
"email": email,
"tier": tier,
"exp": exp,
"iat": now,
}
token = jwt.encode(payload, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)
return token, exp * 1000 # ms for client
async def _get_live_tier(db: AsyncSession, user_id: str) -> str:
"""Fetch authoritative tier from subscriptions table."""
result = await db.execute(
select(Subscription.tier).where(Subscription.user_id == user_id)
)
default_tier = "power" if settings.ENV == "dev" else "free"
return result.scalar_one_or_none() or default_tier
# ── Request bodies ────────────────────────────────────────────────────
class _RegisterRequest(BaseModel):
email: str
password: str
name: str | None = None
surname: str | None = None
class _LoginRequest(BaseModel):
email: str
password: str
class _RefreshRequest(BaseModel):
refresh_token: str
class _UpdateProfileRequest(BaseModel):
name: str | None = None
surname: str | None = None
# ── Routes ────────────────────────────────────────────────────────────
@router.post("/register", response_model=AuthTokens, status_code=status.HTTP_201_CREATED)
async def register(
body: _RegisterRequest,
db: AsyncSession = Depends(get_session),
) -> AuthTokens:
"""Create a new account and return JWT tokens."""
existing = await db.execute(select(User).where(User.email == body.email))
if existing.scalar_one_or_none() is not None:
raise HTTPException(status.HTTP_409_CONFLICT, "Email already registered")
user = User(
id=str(uuid.uuid4()),
email=body.email,
name=body.name,
surname=body.surname,
password_hash=_hash_password(body.password),
tier="free",
encryption_key=Fernet.generate_key().decode(),
)
db.add(user)
await db.flush()
plain_token = str(uuid.uuid4())
expires_at = datetime.now(timezone.utc) + timedelta(
days=settings.JWT_REFRESH_TOKEN_EXPIRE_DAYS
)
rt = RefreshToken(
user_id=user.id,
token_hash=_hash_token(plain_token),
expires_at=expires_at,
)
db.add(rt)
await db.commit()
access_token, expires_at_ms = _make_access_token(user.id, user.email, user.tier)
return AuthTokens(
access_token=access_token,
refresh_token=plain_token,
expires_at=expires_at_ms,
)
@router.post("/login", response_model=AuthTokens)
async def login(
body: _LoginRequest,
db: AsyncSession = Depends(get_session),
) -> AuthTokens:
"""Validate credentials and return JWT tokens."""
result = await db.execute(select(User).where(User.email == body.email))
user = result.scalar_one_or_none()
if user is None or not _verify_password(body.password, user.password_hash):
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid credentials")
# Fetch live tier for the JWT claim
tier = await _get_live_tier(db, user.id)
plain_token = str(uuid.uuid4())
expires_at = datetime.now(timezone.utc) + timedelta(
days=settings.JWT_REFRESH_TOKEN_EXPIRE_DAYS
)
rt = RefreshToken(
user_id=user.id,
token_hash=_hash_token(plain_token),
expires_at=expires_at,
)
db.add(rt)
await db.commit()
access_token, expires_at_ms = _make_access_token(user.id, user.email, tier)
return AuthTokens(
access_token=access_token,
refresh_token=plain_token,
expires_at=expires_at_ms,
)
@router.post("/refresh", response_model=AuthTokens)
async def refresh(
body: _RefreshRequest,
db: AsyncSession = Depends(get_session),
) -> AuthTokens:
"""Rotate a refresh token and return a new token pair."""
token_hash = _hash_token(body.refresh_token)
result = await db.execute(
select(RefreshToken).where(RefreshToken.token_hash == token_hash)
)
rt = result.scalar_one_or_none()
now = datetime.now(timezone.utc)
if rt is None or rt.expires_at.replace(tzinfo=timezone.utc) < now:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or expired refresh token")
await db.delete(rt)
user_result = await db.execute(select(User).where(User.id == rt.user_id))
user = user_result.scalar_one_or_none()
if user is None:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "User not found")
# Fetch live tier for the new JWT
tier = await _get_live_tier(db, user.id)
plain_token = str(uuid.uuid4())
new_expires = now + timedelta(days=settings.JWT_REFRESH_TOKEN_EXPIRE_DAYS)
new_rt = RefreshToken(
user_id=user.id,
token_hash=_hash_token(plain_token),
expires_at=new_expires,
)
db.add(new_rt)
await db.commit()
access_token, expires_at_ms = _make_access_token(user.id, user.email, tier)
return AuthTokens(
access_token=access_token,
refresh_token=plain_token,
expires_at=expires_at_ms,
)
@router.get("/me", response_model=UserProfile)
async def me(current_user: UserProfile = Depends(get_current_user)) -> UserProfile:
"""Return the profile for the authenticated user."""
return current_user
@router.put("/me", response_model=UserProfile)
async def update_profile(
body: _UpdateProfileRequest,
current_user: UserProfile = Depends(get_current_user),
db: AsyncSession = Depends(get_session),
) -> UserProfile:
"""Update the authenticated user's name and surname."""
result = await db.execute(select(User).where(User.id == current_user.id))
user = result.scalar_one()
if body.name is not None:
user.name = body.name
if body.surname is not None:
user.surname = body.surname
await db.commit()
await db.refresh(user)
return UserProfile(
id=user.id,
email=user.email,
name=user.name,
surname=user.surname,
tier=current_user.tier,
)

View File

@@ -0,0 +1,64 @@
"""ForwardAuth verification endpoint for Traefik.
Traefik calls GET /api/v1/auth/verify on every request to a protected
service. This endpoint validates the JWT from the Authorization header
and returns identity headers that Traefik injects into downstream requests.
Downstream services NEVER validate JWTs themselves — they trust the
X-User-Id, X-User-Email, X-User-Tier headers injected by Traefik.
"""
from __future__ import annotations
from fastapi import APIRouter, Request, Response
from fastapi import status as http_status
from jose import JWTError, jwt
from sqlalchemy import select
from shared.config import settings
from shared.db import async_session
from shared.models import Subscription
router = APIRouter(tags=["auth"])
@router.get("/auth/verify")
async def verify(request: Request) -> Response:
"""Validate JWT and return identity headers for Traefik ForwardAuth.
Returns 200 with X-User-* headers on success, 401 on failure.
Traefik copies response headers to the downstream request.
"""
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
return Response(status_code=http_status.HTTP_401_UNAUTHORIZED)
token = auth_header[7:] # strip "Bearer "
try:
payload = jwt.decode(
token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM]
)
user_id: str | None = payload.get("sub")
email: str | None = payload.get("email")
if not user_id or not email:
return Response(status_code=http_status.HTTP_401_UNAUTHORIZED)
except JWTError:
return Response(status_code=http_status.HTTP_401_UNAUTHORIZED)
# Live tier lookup from subscriptions table
async with async_session() as db:
result = await db.execute(
select(Subscription.tier).where(Subscription.user_id == user_id)
)
default_tier = "power" if settings.ENV == "dev" else "free"
tier: str = result.scalar_one_or_none() or default_tier
return Response(
status_code=http_status.HTTP_200_OK,
headers={
"X-User-Id": user_id,
"X-User-Email": email,
"X-User-Tier": tier,
},
)

View File

@@ -0,0 +1,11 @@
fastapi>=0.115.0
uvicorn[standard]>=0.34.0
gunicorn>=22.0.0
pydantic>=2.10.0
pydantic-settings>=2.7.0
python-jose[cryptography]>=3.3.0
sqlalchemy>=2.0.0
asyncpg>=0.30.0
bcrypt>=4.2.0
cryptography>=42.0.0
python-dotenv>=1.0.0

View File

@@ -0,0 +1,20 @@
# Batch Agent Service
Owns: agent_runner, journey builder, filesystem_agent, integrations (Gmail, MS Graph).
## Tables owned
- `local_agent_configs`
- `cloud_agent_configs`
- `agent_run_logs`
## Endpoints
- `GET /agents/catalog`
- `POST /agents/can-create`
- `POST /agents/trigger`
- `GET /agents/{id}/history`
## Redis channels
- Subscribe: `batch:request:{user_id}`
- Publish: `ws:out:{user_id}` (journey replies + tool calls)
- BRPOP: `tool:result:{call_id}` (30s timeout)
- SET+EX: `journey:{user_id}` (session state, TTL 1800s)

View File

View File

@@ -0,0 +1,15 @@
# Billing Service
Owns: Stripe integration, tier management, subscription CRUD.
## Tables owned (write)
- `subscriptions`
## Endpoints
- `POST /billing/checkout`
- `POST /billing/webhook` (Stripe, no JWT auth)
- `GET /billing/subscription`
- `DELETE /billing/subscription`
## Redis channels
- Publish: `tier:changed:{user_id}` on tier change

View File

21
services/chat/README.md Normal file
View File

@@ -0,0 +1,21 @@
# Chat Service
Owns: deep_agent (home + floating chat), memory middleware, domain agents
(task, note, project, timeline), LLM orchestration.
## Tables owned
- `memory_core`
- `memory_associative`
- `memory_episodic`
- `memory_proactive`
## Tables read (cross-service)
- `users` (for encryption_key — memory decryption)
## Endpoints
- `POST /chat` (REST fallback)
## Redis channels
- Subscribe: `chat:request:{user_id}`
- Publish: `ws:out:{user_id}` (stream frames + tool calls)
- BRPOP: `tool:result:{call_id}` (30s timeout)

View File

View File

@@ -0,0 +1,17 @@
# WS Gateway
Stateless WebSocket proxy. Accepts Electron connections, authenticates JWT,
routes frames to Chat/Batch services via Redis pub/sub.
## No business logic
This service does NOT know what tasks, notes, or agents are.
It only routes JSON frames between Electron and downstream services.
## Scaling
Sticky sessions on `user_id` (Traefik consistent hashing).
## Redis channels used
- Subscribe: `ws:out:{user_id}` (frames to send to client)
- Publish: `chat:request:{user_id}`, `batch:request:{user_id}`
- LPUSH: `tool:result:{call_id}` (from client tool_result frames)
- HSET/HDEL: `ws:devices:{user_id}` (device registry)

View File

5
shared/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
"""Shared module — imported by all microservices.
Contains DB engine/session, ORM models, Pydantic schemas, config,
and Redis utilities. Changes here affect ALL services.
"""

72
shared/config.py Normal file
View File

@@ -0,0 +1,72 @@
"""Shared configuration — Pydantic Settings loaded from environment.
All services import ``settings`` from here. Each service only uses a subset
of the vars, but keeping one Settings class avoids fragmentation.
"""
from typing import Literal
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
# ── Database ─────────────────────────────────────────────────────
DATABASE_URL: str = "postgresql+asyncpg://postgres:postgres@localhost:5432/adiuva"
# ── JWT (Auth Service owns the secret; others only need it for
# local dev without Traefik ForwardAuth) ───────────────────────
JWT_SECRET: str = "change-me-in-production"
JWT_ALGORITHM: str = "HS256"
JWT_ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
JWT_REFRESH_TOKEN_EXPIRE_DAYS: int = 30
# ── Redis ────────────────────────────────────────────────────────
REDIS_URL: str = "redis://localhost:6379/0"
# ── Stripe ───────────────────────────────────────────────────────
STRIPE_SECRET_KEY: str = ""
STRIPE_WEBHOOK_SECRET: str = ""
# ── S3 ───────────────────────────────────────────────────────────
S3_BUCKET: str = ""
S3_REGION: str = "us-east-1"
S3_ENDPOINT_URL: str = ""
AWS_ACCESS_KEY_ID: str = ""
AWS_SECRET_ACCESS_KEY: str = ""
# ── Vector stores ────────────────────────────────────────────────
PINECONE_API_KEY: str = ""
PINECONE_INDEX: str = "adiuva"
QDRANT_URL: str = ""
QDRANT_API_KEY: str = ""
# ── LLM providers ────────────────────────────────────────────────
OPENAI_API_KEY: str = ""
ANTHROPIC_API_KEY: str = ""
GOOGLE_API_KEY: str = ""
CEREBRAS_API_KEY: str = ""
LLM_MODEL: str = "gpt-4o"
LLM_ROUTER_MODEL: str = "gpt-4o-mini"
LLM_EMBED_MODEL: str = "text-embedding-3-small"
GITHUB_COPILOT_TOKEN_DIR: str = ""
# ── OAuth (integrations) ─────────────────────────────────────────
GMAIL_CLIENT_ID: str = ""
GMAIL_CLIENT_SECRET: str = ""
MS_CLIENT_ID: str = ""
MS_CLIENT_SECRET: str = ""
MS_TENANT_ID: str = "common"
OAUTH_ENCRYPTION_KEY: str = ""
# ── CORS ─────────────────────────────────────────────────────────
CORS_ORIGINS: list[str] = ["app://.", "http://localhost:3000", "http://localhost:5173"]
# ── Environment ──────────────────────────────────────────────────
ENV: Literal["dev", "prod"] = "dev"
model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")
settings = Settings()

32
shared/db.py Normal file
View File

@@ -0,0 +1,32 @@
"""Database engine, session factory, and declarative base.
All services use the async SQLAlchemy API via ``get_session()``.
Alembic migrations use the synchronous psycopg2 URL (see alembic/env.py).
"""
from __future__ import annotations
from collections.abc import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase
from shared.config import settings
engine = create_async_engine(
settings.DATABASE_URL,
pool_pre_ping=True,
echo=False,
)
async_session = async_sessionmaker(engine, expire_on_commit=False)
class Base(DeclarativeBase):
"""Shared declarative base for all ORM models."""
async def get_session() -> AsyncGenerator[AsyncSession, None]:
"""FastAPI dependency that yields an async DB session per request."""
async with async_session() as session:
yield session

455
shared/models.py Normal file
View File

@@ -0,0 +1,455 @@
"""SQLAlchemy ORM models for all persistent tables.
Centralized here so that Alembic migrations and all services share
the same model definitions. Each service only queries the tables it owns.
Ownership:
Auth Service → users, refresh_tokens, subscriptions
Chat Service → memory_core, memory_associative, memory_episodic, memory_proactive
Batch Agent → local_agent_configs, cloud_agent_configs, agent_run_logs
Billing Service → subscriptions (shared write with Auth)
(excluded MVP) → storage_records, backup_metadata, plugins, plugin_*, revenue_events
"""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from sqlalchemy import (
BigInteger,
Boolean,
DateTime,
Enum,
Float,
ForeignKey,
Integer,
JSON,
String,
Text,
UniqueConstraint,
Uuid,
func,
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from shared.db import Base
# ── Helpers ──────────────────────────────────────────────────────────────
def _uuid() -> str:
return str(uuid.uuid4())
def _now() -> datetime:
return datetime.now(timezone.utc)
# ── Enum types ────────────────────────────────────────────────────────────
TierEnum = Enum("free", "pro", "power", "team", name="billing_tier")
PluginStatusEnum = Enum("pending_review", "approved", "rejected", name="plugin_status")
ReviewDecisionEnum = Enum("approved", "rejected", name="review_decision")
AgentTypeEnum = Enum("local", "cloud", name="agent_type")
AgentStatusEnum = Enum("running", "success", "error", "partial", name="agent_run_status")
CloudProviderEnum = Enum("gmail", "teams", "outlook", name="cloud_provider")
# ── Auth models ───────────────────────────────────────────────────────────
class User(Base):
__tablename__ = "users"
id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), primary_key=True, default=_uuid
)
email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False, index=True)
name: Mapped[str | None] = mapped_column(String(100), nullable=True)
surname: Mapped[str | None] = mapped_column(String(100), nullable=True)
password_hash: Mapped[str] = mapped_column(String(255), nullable=False)
tier: Mapped[str] = mapped_column(TierEnum, nullable=False, default="free")
stripe_customer_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
encryption_key: Mapped[str | None] = mapped_column(String(64), nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now()
)
refresh_tokens: Mapped[list[RefreshToken]] = relationship(
back_populates="user", cascade="all, delete-orphan"
)
subscription: Mapped[Subscription | None] = relationship(
back_populates="user", uselist=False, cascade="all, delete-orphan"
)
class RefreshToken(Base):
__tablename__ = "refresh_tokens"
id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), primary_key=True, default=_uuid
)
user_id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True
)
token_hash: Mapped[str] = mapped_column(String(64), unique=True, nullable=False, index=True)
expires_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
user: Mapped[User] = relationship(back_populates="refresh_tokens")
class Subscription(Base):
__tablename__ = "subscriptions"
id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), primary_key=True, default=_uuid
)
user_id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"),
nullable=False, unique=True, index=True
)
stripe_subscription_id: Mapped[str | None] = mapped_column(String(255), nullable=True, index=True)
tier: Mapped[str] = mapped_column(TierEnum, nullable=False, default="free")
status: Mapped[str] = mapped_column(String(50), nullable=False, default="free")
current_period_end: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
user: Mapped[User] = relationship(back_populates="subscription")
# ── Storage models (excluded from MVP, kept for Alembic) ──────────────
class StorageRecord(Base):
__tablename__ = "storage_records"
id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), primary_key=True, default=_uuid
)
user_id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True
)
table_name: Mapped[str] = mapped_column(String(100), nullable=False)
s3_key: Mapped[str] = mapped_column(String(500), nullable=False)
checksum: Mapped[str] = mapped_column(String(64), nullable=False)
size_bytes: Mapped[int] = mapped_column(Integer, nullable=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now()
)
class BackupMetadata(Base):
__tablename__ = "backup_metadata"
id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), primary_key=True, default=_uuid
)
user_id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True
)
s3_key: Mapped[str] = mapped_column(String(500), nullable=False)
version: Mapped[int] = mapped_column(Integer, nullable=False)
timestamp: Mapped[int] = mapped_column(BigInteger, nullable=False)
checksum: Mapped[str] = mapped_column(String(64), nullable=False)
size_bytes: Mapped[int] = mapped_column(Integer, nullable=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
# ── Plugin models (excluded from MVP, kept for Alembic) ───────────────
class Plugin(Base):
__tablename__ = "plugins"
id: Mapped[str] = mapped_column(String(255), primary_key=True)
name: Mapped[str] = mapped_column(String(255), nullable=False)
description: Mapped[str] = mapped_column(Text, nullable=False, default="")
version: Mapped[str] = mapped_column(String(50), nullable=False, default="1.0.0")
author_id: Mapped[str | None] = mapped_column(
Uuid(as_uuid=False), ForeignKey("users.id", ondelete="SET NULL"), nullable=True
)
author_name: Mapped[str] = mapped_column(String(255), nullable=False, default="")
category: Mapped[str] = mapped_column(String(100), nullable=False, default="")
price_cents: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
permissions: Mapped[str] = mapped_column(Text, nullable=False, default="[]")
status: Mapped[str] = mapped_column(PluginStatusEnum, nullable=False, default="pending_review")
s3_package_key: Mapped[str | None] = mapped_column(String(500), nullable=True)
install_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
avg_rating: Mapped[float] = mapped_column(Float, nullable=False, default=0.0)
rejection_reason: Mapped[str | None] = mapped_column(Text, nullable=True)
submitted_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
installations: Mapped[list[PluginInstallation]] = relationship(
back_populates="plugin", cascade="all, delete-orphan"
)
reviews: Mapped[list[PluginReview]] = relationship(
back_populates="plugin", cascade="all, delete-orphan"
)
revenue_events: Mapped[list[RevenueEvent]] = relationship(
back_populates="plugin", cascade="all, delete-orphan"
)
class PluginInstallation(Base):
__tablename__ = "plugin_installations"
__table_args__ = (UniqueConstraint("plugin_id", "user_id", name="uq_plugin_user"),)
id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), primary_key=True, default=_uuid
)
plugin_id: Mapped[str] = mapped_column(
String(255), ForeignKey("plugins.id", ondelete="CASCADE"), nullable=False, index=True
)
user_id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True
)
installed_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
plugin: Mapped[Plugin] = relationship(back_populates="installations")
class PluginReview(Base):
__tablename__ = "plugin_reviews"
id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), primary_key=True, default=_uuid
)
plugin_id: Mapped[str] = mapped_column(
String(255), ForeignKey("plugins.id", ondelete="CASCADE"), nullable=False, index=True
)
reviewer_id: Mapped[str | None] = mapped_column(
Uuid(as_uuid=False), ForeignKey("users.id", ondelete="SET NULL"), nullable=True
)
decision: Mapped[str] = mapped_column(ReviewDecisionEnum, nullable=False)
notes: Mapped[str | None] = mapped_column(Text, nullable=True)
reviewed_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
plugin: Mapped[Plugin] = relationship(back_populates="reviews")
class RevenueEvent(Base):
__tablename__ = "revenue_events"
id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), primary_key=True, default=_uuid
)
plugin_id: Mapped[str] = mapped_column(
String(255), ForeignKey("plugins.id", ondelete="CASCADE"), nullable=False, index=True
)
user_id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True
)
amount_cents: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
developer_share_cents: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
stripe_transfer_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
paid_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
plugin: Mapped[Plugin] = relationship(back_populates="revenue_events")
# ── Agent models ──────────────────────────────────────────────────────────
class LocalAgentConfig(Base):
__tablename__ = "local_agent_configs"
id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), primary_key=True, default=_uuid
)
user_id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True
)
device_id: Mapped[str] = mapped_column(String(255), nullable=False)
name: Mapped[str] = mapped_column(String(255), nullable=False)
directory_paths: Mapped[list] = mapped_column(JSON, nullable=False, default=list)
data_types: Mapped[list] = mapped_column(JSON, nullable=False, default=list)
prompt_template: Mapped[str] = mapped_column(Text, nullable=False, default="")
file_extensions: Mapped[list] = mapped_column(JSON, nullable=False, default=list)
schedule_cron: Mapped[str] = mapped_column(String(100), nullable=False, default="0 */6 * * *")
enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
last_run_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now()
)
run_logs: Mapped[list[AgentRunLog]] = relationship(
back_populates="local_agent",
primaryjoin="and_(AgentRunLog.agent_id == LocalAgentConfig.id, AgentRunLog.agent_type == 'local')",
foreign_keys="AgentRunLog.agent_id",
cascade="all, delete-orphan",
overlaps="run_logs,cloud_agent",
)
class CloudAgentConfig(Base):
__tablename__ = "cloud_agent_configs"
id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), primary_key=True, default=_uuid
)
user_id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True
)
provider: Mapped[str] = mapped_column(CloudProviderEnum, nullable=False)
name: Mapped[str] = mapped_column(String(255), nullable=False)
data_types: Mapped[list] = mapped_column(JSON, nullable=False, default=list)
prompt_template: Mapped[str] = mapped_column(Text, nullable=False, default="")
oauth_token_encrypted: Mapped[str | None] = mapped_column(Text, nullable=True)
filter_config: Mapped[dict | None] = mapped_column(JSON, nullable=True)
schedule_cron: Mapped[str] = mapped_column(String(100), nullable=False, default="0 */6 * * *")
enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
last_run_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now()
)
run_logs: Mapped[list[AgentRunLog]] = relationship(
back_populates="cloud_agent",
primaryjoin="and_(AgentRunLog.agent_id == CloudAgentConfig.id, AgentRunLog.agent_type == 'cloud')",
foreign_keys="AgentRunLog.agent_id",
cascade="all, delete-orphan",
overlaps="run_logs,local_agent",
)
class AgentRunLog(Base):
__tablename__ = "agent_run_logs"
id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), primary_key=True, default=_uuid
)
agent_id: Mapped[str] = mapped_column(String(255), nullable=False, index=True)
agent_type: Mapped[str] = mapped_column(AgentTypeEnum, nullable=False)
user_id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True
)
status: Mapped[str] = mapped_column(AgentStatusEnum, nullable=False, default="running")
items_processed: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
items_created: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
errors: Mapped[list | None] = mapped_column(JSON, nullable=True)
started_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
local_agent: Mapped[LocalAgentConfig | None] = relationship(
back_populates="run_logs",
primaryjoin="and_(AgentRunLog.agent_id == LocalAgentConfig.id, AgentRunLog.agent_type == 'local')",
foreign_keys="AgentRunLog.agent_id",
overlaps="run_logs,cloud_agent",
)
cloud_agent: Mapped[CloudAgentConfig | None] = relationship(
back_populates="run_logs",
primaryjoin="and_(AgentRunLog.agent_id == CloudAgentConfig.id, AgentRunLog.agent_type == 'cloud')",
foreign_keys="AgentRunLog.agent_id",
overlaps="run_logs,local_agent",
)
# ── Memory models ─────────────────────────────────────────────────────────
class MemoryCore(Base):
"""Per-user persistent key/value preferences, encrypted at rest."""
__tablename__ = "memory_core"
id: Mapped[str] = mapped_column(Uuid(as_uuid=False), primary_key=True, default=_uuid)
user_id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"),
nullable=False, index=True,
)
key: Mapped[str] = mapped_column(String(255), nullable=False)
value_encrypted: Mapped[str] = mapped_column(Text, nullable=False)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now()
)
class MemoryAssociative(Base):
"""Per-user semantic memory: encrypted content + pgvector embedding."""
__tablename__ = "memory_associative"
id: Mapped[str] = mapped_column(Uuid(as_uuid=False), primary_key=True, default=_uuid)
user_id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"),
nullable=False, index=True,
)
content_encrypted: Mapped[str] = mapped_column(Text, nullable=False)
embedding: Mapped[list | None] = mapped_column(JSON, nullable=True)
entity_type: Mapped[str | None] = mapped_column(String(100), nullable=True)
entity_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now()
)
class MemoryEpisodic(Base):
"""Per-user session summaries, encrypted at rest."""
__tablename__ = "memory_episodic"
id: Mapped[str] = mapped_column(Uuid(as_uuid=False), primary_key=True, default=_uuid)
user_id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"),
nullable=False, index=True,
)
summary_encrypted: Mapped[str] = mapped_column(Text, nullable=False)
session_id: Mapped[str] = mapped_column(String(255), nullable=False, index=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
class MemoryProactive(Base):
"""Per-user inferred behavioral patterns, encrypted at rest."""
__tablename__ = "memory_proactive"
id: Mapped[str] = mapped_column(Uuid(as_uuid=False), primary_key=True, default=_uuid)
user_id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"),
nullable=False, index=True,
)
pattern_encrypted: Mapped[str] = mapped_column(Text, nullable=False)
confidence: Mapped[float] = mapped_column(Float, nullable=False, default=0.5)
source: Mapped[str] = mapped_column(String(50), nullable=False, default="inferred")
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)

53
shared/redis.py Normal file
View File

@@ -0,0 +1,53 @@
"""Redis client and pub/sub utilities for inter-service communication.
All services that need Redis import from here.
"""
from __future__ import annotations
import redis.asyncio as aioredis
from shared.config import settings
redis_client: aioredis.Redis = aioredis.from_url(
settings.REDIS_URL,
decode_responses=True,
)
# ── Channel naming conventions ────────────────────────────────────────
# See /memories/repo/microservices-architecture.md for full list.
def ws_out_channel(user_id: str) -> str:
"""Frames to forward to Electron via WS Gateway."""
return f"ws:out:{user_id}"
def chat_request_channel(user_id: str) -> str:
"""Chat requests (home + floating) from WS Gateway → Chat Service."""
return f"chat:request:{user_id}"
def batch_request_channel(user_id: str) -> str:
"""Batch requests (journey + triggers) from WS Gateway → Batch Agent."""
return f"batch:request:{user_id}"
def tool_result_key(call_id: str) -> str:
"""Tool result list: LPUSH by WS Gateway, BRPOP by Chat/Batch."""
return f"tool:result:{call_id}"
def device_key(user_id: str) -> str:
"""Device registry hash."""
return f"ws:devices:{user_id}"
def tier_changed_channel(user_id: str) -> str:
"""Billing tier change notifications."""
return f"tier:changed:{user_id}"
def journey_session_key(user_id: str) -> str:
"""Journey builder session (String + TTL 1800s)."""
return f"journey:{user_id}"

317
shared/schemas.py Normal file
View File

@@ -0,0 +1,317 @@
"""Pydantic schemas — API request/response contracts.
Shared across all services. Mirrors the TypeScript types from
the Electron app (src/shared/api-types.ts).
"""
from __future__ import annotations
from enum import Enum
from typing import Any, Literal
from pydantic import BaseModel, Field
# ── Billing ──────────────────────────────────────────────────────────
BillingTier = Literal["free", "pro", "power", "team"]
# ── Auth ─────────────────────────────────────────────────────────────
class AuthTokens(BaseModel):
access_token: str
refresh_token: str
expires_at: int
class UserProfile(BaseModel):
id: str
email: str
name: str | None = None
surname: str | None = None
tier: BillingTier
# ── Chat ─────────────────────────────────────────────────────────────
class ChatContext(BaseModel):
user_profile: dict[str, Any] = Field(default_factory=dict)
relevant_documents: list[str] = Field(default_factory=list)
recent_tasks: list[dict[str, Any]] = Field(default_factory=list)
conversation_history: list[dict[str, Any]] = Field(default_factory=list)
class ChatRequest(BaseModel):
message: str
context: ChatContext = Field(default_factory=ChatContext)
class ChatResponse(BaseModel):
response: str
# ── Backup ───────────────────────────────────────────────────────────
class BackupMetadata(BaseModel):
version: int
timestamp: int
checksum: str
chunk_count: int
# ── Cloud Storage (E2E encrypted blobs) ──────────────────────────────
class StorageRecord(BaseModel):
id: str
user_id: str
table: str
blob: bytes
checksum: str
created_at: int
updated_at: int
class StorageRecordCreate(BaseModel):
table: str
blob: bytes
checksum: str
class StorageRecordUpdate(BaseModel):
blob: bytes
checksum: str
# ── Cloud Vector Store (E2E encrypted vectors) ────────────────────────
class VectorItem(BaseModel):
id: str
blob: bytes
checksum: str
class VectorUpsertRequest(BaseModel):
vectors: list[VectorItem]
class VectorSearchRequest(BaseModel):
query_blob: bytes
top_k: int = 10
class VectorSearchResult(BaseModel):
id: str
score: float
blob: bytes
class VectorSearchResponse(BaseModel):
results: list[VectorSearchResult]
# ── Plugin Marketplace ────────────────────────────────────────────────
class PluginManifest(BaseModel):
id: str
name: str
description: str
version: str
author: str
permissions: list[str]
category: str
price_cents: int = 0
class PluginListResponse(BaseModel):
plugins: list[PluginManifest]
total: int
page: int
class PluginInstallRequest(BaseModel):
plugin_id: str
# ── WebSocket Frame Protocol ──────────────────────────────────────────
class WsFrameType(str, Enum):
# ── v2 frame types (kept for backward compat) ──────────────────────
chat_request = "chat_request"
text_chunk = "text_chunk"
tool_call = "tool_call"
tool_result = "tool_result"
final = "final"
ping = "ping"
device_hello = "device_hello"
# ── v3 frame types ─────────────────────────────────────────────────
home_request = "home_request"
floating_request = "floating_request"
stream_start = "stream_start"
stream_text = "stream_text"
stream_end = "stream_end"
floating_domain = "floating_domain"
data_request = "data_request"
data_response = "data_response"
mutation = "mutation"
# ── v4 journey frame types ────────────────────────────────────────
journey_start = "journey_start"
journey_message = "journey_message"
journey_reply = "journey_reply"
class WsToolCall(BaseModel):
"""Server → Client: requests a CRUD/vector operation on the local DB."""
type: Literal[WsFrameType.tool_call] = WsFrameType.tool_call
id: str
action: str
table: str | None = None
data: dict[str, Any] | None = None
filters: dict[str, Any] | None = None
vector: list[float] | None = None
limit: int | None = None
class WsToolResult(BaseModel):
"""Client → Server: result of a CRUD/vector operation."""
type: Literal[WsFrameType.tool_result] = WsFrameType.tool_result
id: str
row: dict[str, Any] | None = None
rows: list[dict[str, Any]] | None = None
results: list[dict[str, Any]] | None = None
deleted: bool | None = None
ok: bool | None = None
error: str | None = None
class WsTextChunk(BaseModel):
"""Server → Client: incremental LLM response text."""
type: Literal[WsFrameType.text_chunk] = WsFrameType.text_chunk
text: str
class WsFinal(BaseModel):
"""Server → Client: signals end of response with the complete text."""
type: Literal[WsFrameType.final] = WsFrameType.final
response: str
# ── WebSocket Agent Frame Protocol ────────────────────────────────────
class WsDeviceHello(BaseModel):
"""Client → Server: device identification on WS connect."""
type: Literal[WsFrameType.device_hello] = WsFrameType.device_hello
device_id: str
agent_ids: list[str] = Field(default_factory=list)
# ── WebSocket v3 Frame Models ─────────────────────────────────────────
class WsFloatingScope(BaseModel):
"""Scope for a floating request."""
type: Literal["task", "project", "note", "timeline"]
id: str | None = None
class WsHomeRequest(BaseModel):
"""Client → Server: Home chat message."""
type: Literal[WsFrameType.home_request] = WsFrameType.home_request
message: str
conversation_history: list[dict[str, Any]] = Field(default_factory=list)
class WsFloatingRequest(BaseModel):
"""Client → Server: Floating chat message scoped to an entity."""
type: Literal[WsFrameType.floating_request] = WsFrameType.floating_request
message: str
scope: WsFloatingScope
class WsStreamStart(BaseModel):
"""Server → Client: signals start of a streaming response."""
type: Literal[WsFrameType.stream_start] = WsFrameType.stream_start
request_id: str
class WsStreamText(BaseModel):
"""Server → Client: streamed text token."""
type: Literal[WsFrameType.stream_text] = WsFrameType.stream_text
request_id: str
chunk: str
class WsStreamEnd(BaseModel):
"""Server → Client: signals end of a streaming response."""
type: Literal[WsFrameType.stream_end] = WsFrameType.stream_end
request_id: str
class WsDomain(BaseModel):
"""Structured floating domain payload for UI routing decisions."""
type: Literal["task", "timeline", "project", "node"]
id: str | None = None
section: Literal["task", "timeline", "note"] | None = None
class WsFloatingDomain(BaseModel):
"""Server → Client: domain determined for a floating request."""
type: Literal[WsFrameType.floating_domain] = WsFrameType.floating_domain
request_id: str
domain: WsDomain
# ── Agent Catalog ─────────────────────────────────────────────────────
class AgentCatalogItem(BaseModel):
type: str
name: str
description: str
class AgentCreationCheckRequest(BaseModel):
active_agents: int = Field(ge=0, default=0)
class AgentCreationCheckResponse(BaseModel):
allowed: bool
tier: BillingTier
active_agents: int
limit: int
class AgentTriggerRequest(BaseModel):
directory: str = Field(min_length=1)
device_id: str = Field(default="")
agent_id: str | None = None
what_to_extract: list[str] = Field(min_length=1)
actions_by_type: dict[str, list[str]] | None = None
batch_interval: str = Field(min_length=1)
custom_agent_prompt: str = Field(min_length=1)
active_agents: int = Field(ge=0, default=0)
# ── Agent Run Log ─────────────────────────────────────────────────────
class AgentRunLogResponse(BaseModel):
id: str
agent_id: str
agent_type: Literal["local", "cloud"]
status: Literal["running", "success", "error", "partial"]
items_processed: int
items_created: int
errors: list[str]
started_at: int
completed_at: int | None

143
traefik/dynamic/routers.yml Normal file
View File

@@ -0,0 +1,143 @@
# Dynamic routing configuration
http:
middlewares:
# ForwardAuth: validates JWT via Auth Service, injects identity headers
auth-forward:
forwardAuth:
address: "http://auth:8000/api/v1/auth/verify"
trustForwardHeader: true
authResponseHeaders:
- "X-User-Id"
- "X-User-Email"
- "X-User-Tier"
# Rate limiting (basic — per-client IP; upgrade to per-tier later)
rate-limit:
rateLimit:
average: 60
burst: 20
period: "1m"
# Strip /api/v1 prefix before forwarding to services
strip-api-prefix:
stripPrefix:
prefixes:
- "/api/v1"
routers:
# ── Auth (no ForwardAuth on public endpoints) ──────────────
auth-public:
rule: "PathPrefix(`/api/v1/auth/register`) || PathPrefix(`/api/v1/auth/login`) || PathPrefix(`/api/v1/auth/refresh`)"
entryPoints:
- websecure
middlewares:
- rate-limit
- strip-api-prefix
service: auth-svc
tls: {}
auth-protected:
rule: "PathPrefix(`/api/v1/auth`)"
entryPoints:
- websecure
middlewares:
- auth-forward
- rate-limit
- strip-api-prefix
service: auth-svc
tls: {}
# ── WebSocket Gateway (sticky sessions) ────────────────────
ws-gateway:
rule: "PathPrefix(`/api/v1/ws`)"
entryPoints:
- websecure
middlewares:
- rate-limit
service: ws-gateway-svc
tls: {}
# ── Chat Service ───────────────────────────────────────────
chat:
rule: "PathPrefix(`/api/v1/chat`)"
entryPoints:
- websecure
middlewares:
- auth-forward
- rate-limit
- strip-api-prefix
service: chat-svc
tls: {}
# ── Batch Agent Service ────────────────────────────────────
batch-agent:
rule: "PathPrefix(`/api/v1/agents`)"
entryPoints:
- websecure
middlewares:
- auth-forward
- rate-limit
- strip-api-prefix
service: batch-agent-svc
tls: {}
# ── Billing Service ────────────────────────────────────────
billing-webhook:
rule: "PathPrefix(`/api/v1/billing/webhook`)"
entryPoints:
- websecure
middlewares:
- rate-limit
- strip-api-prefix
service: billing-svc
tls: {}
priority: 10
billing:
rule: "PathPrefix(`/api/v1/billing`)"
entryPoints:
- websecure
middlewares:
- auth-forward
- rate-limit
- strip-api-prefix
service: billing-svc
tls: {}
# ── Health (no auth) ───────────────────────────────────────
health:
rule: "Path(`/api/v1/health`)"
entryPoints:
- websecure
service: auth-svc
tls: {}
services:
auth-svc:
loadBalancer:
servers:
- url: "http://auth:8000"
ws-gateway-svc:
loadBalancer:
sticky:
cookie:
name: "ws_affinity"
servers:
- url: "http://ws-gateway:8000"
chat-svc:
loadBalancer:
servers:
- url: "http://chat:8000"
batch-agent-svc:
loadBalancer:
servers:
- url: "http://batch-agent:8000"
billing-svc:
loadBalancer:
servers:
- url: "http://billing:8000"

39
traefik/traefik.yml Normal file
View File

@@ -0,0 +1,39 @@
# Traefik static configuration for microservices gateway
api:
dashboard: true
insecure: true # Dashboard on :8080 (internal only in prod)
entryPoints:
web:
address: ":80"
http:
redirections:
entryPoint:
to: websecure
scheme: https
websecure:
address: ":443"
http:
tls:
certResolver: cloudflare
providers:
docker:
exposedByDefault: false
file:
directory: /etc/traefik/dynamic
watch: true
# Automatic TLS via Let's Encrypt + Cloudflare DNS-01 challenge
certificatesResolvers:
cloudflare:
acme:
email: "${ACME_EMAIL}"
storage: /etc/traefik/acme/acme.json
dnsChallenge:
provider: cloudflare
delayBeforeCheck: 10
resolvers:
- "1.1.1.1:53"
- "8.8.8.8:53"