feat(api): folder quota helpers with atomic token usage
Implements check_folder_quota and add_token_usage in app/billing/quota.py with dialect-aware upsert (pg_insert on PostgreSQL, read-then-write on SQLite). Adds test_user_free/test_user_power fixtures and db alias to conftest.py. 6 new tests pass. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -17,6 +17,8 @@ from jose import jwt
|
||||
from sqlalchemy import StaticPool, event
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||
|
||||
from sqlalchemy import select
|
||||
|
||||
from app.config.settings import settings
|
||||
from app.db import Base, get_session
|
||||
from app.main import app
|
||||
@@ -134,6 +136,32 @@ def auth_header(tier: str = "power", user_id: str | None = None) -> dict[str, st
|
||||
return {"Authorization": f"Bearer {make_jwt(tier, user_id)}"}
|
||||
|
||||
|
||||
# ── Convenience aliases and per-tier user fixtures ────────────────────
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def db(db_session: AsyncSession) -> AsyncSession:
|
||||
"""Alias for db_session — used by folder quota tests."""
|
||||
return db_session
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def test_user_free(db_session: AsyncSession):
|
||||
"""Return the seeded free-tier User row."""
|
||||
result = await db_session.execute(
|
||||
select(User).where(User.id == TEST_USER_IDS["free"])
|
||||
)
|
||||
return result.scalar_one()
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def test_user_power(db_session: AsyncSession):
|
||||
"""Return the seeded power-tier User row."""
|
||||
result = await db_session.execute(
|
||||
select(User).where(User.id == TEST_USER_IDS["power"])
|
||||
)
|
||||
return result.scalar_one()
|
||||
|
||||
|
||||
# ── CLI options ───────────────────────────────────────────────────────
|
||||
|
||||
def pytest_addoption(parser):
|
||||
|
||||
Reference in New Issue
Block a user