"""Vector storage routes.

Exposes upsert, search, and delete endpoints for cloud vector store entries
under the ``/storage`` prefix. Every handler resolves the caller through
``get_current_user`` and passes that user's id to the shared ``VectorStore``.
"""

from __future__ import annotations

from fastapi import APIRouter, Depends
from pydantic import BaseModel

from app.api.deps import get_current_user
from app.schemas import (
    UserProfile,
    VectorSearchRequest,
    VectorSearchResponse,
    VectorUpsertRequest,
)
from app.storage.encryption import reject_if_tampered
from app.storage.vector_store import VectorStore

router = APIRouter(prefix="/storage", tags=["vectors"])

# One store instance, created at import time and shared by all handlers below.
_vector_store = VectorStore()


# Request body accepted by the DELETE /vectors endpoint.
class _VectorDeleteRequest(BaseModel):
    # Identifiers of the vectors to remove.
    ids: list[str]


@router.post("/vectors/upsert", response_model=dict)
async def upsert_vectors(
    body: VectorUpsertRequest,
    current_user: UserProfile = Depends(get_current_user),
) -> dict[str, int]:
    """Check every vector's checksum, then store the batch for the current user.

    Returns a mapping whose ``"upserted"`` entry is the number of vectors sent.
    """
    vectors = body.vectors

    # All blobs are checked before anything is handed to the store.
    # NOTE(review): presumably reject_if_tampered raises on a checksum
    # mismatch, which would abort the request before any write -- confirm.
    for entry in vectors:
        reject_if_tampered(entry.blob, entry.checksum)

    owner_id = current_user.id
    await _vector_store.upsert(owner_id, vectors)
    return {"upserted": len(vectors)}


@router.post("/vectors/search", response_model=VectorSearchResponse)
async def search_vectors(
    body: VectorSearchRequest,
    current_user: UserProfile = Depends(get_current_user),
) -> VectorSearchResponse:
    """Run a search with the request's query blob over the current user's vectors.

    At most ``top_k`` is forwarded to the store as the requested result count.
    """
    owner_id = current_user.id
    matches = await _vector_store.search(owner_id, body.query_blob, body.top_k)
    return VectorSearchResponse(results=matches)


@router.delete("/vectors", response_model=dict)
async def delete_vectors(
    body: _VectorDeleteRequest,
    current_user: UserProfile = Depends(get_current_user),
) -> dict[str, bool]:
    """Remove the listed vector IDs on behalf of the current user.

    Always returns ``{"ok": True}`` once the store call completes.
    """
    owner_id = current_user.id
    target_ids = body.ids
    await _vector_store.delete(owner_id, target_ids)
    return {"ok": True}