feat(scouts): add connector registry
This commit is contained in:
32
app/scouts/connectors/registry.py
Normal file
32
app/scouts/connectors/registry.py
Normal file
@@ -0,0 +1,32 @@
|
||||
"""Connector registry — single source of truth for source_type -> connector."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
_CONNECTORS: dict[str, Any] = {}
|
||||
|
||||
|
||||
def register_connector(connector: Any) -> None:
|
||||
"""Register a SourceConnector instance under its ``source_type``.
|
||||
|
||||
Calling twice with the same ``source_type`` replaces the prior entry —
|
||||
useful for tests and hot-reload, but in production each connector
|
||||
should be registered exactly once at startup.
|
||||
"""
|
||||
if not getattr(connector, "source_type", None):
|
||||
raise ValueError("Connector must declare a non-empty source_type")
|
||||
_CONNECTORS[connector.source_type] = connector
|
||||
|
||||
|
||||
def get_connector(source_type: str) -> Any:
|
||||
"""Return the registered connector for ``source_type`` or raise KeyError."""
|
||||
try:
|
||||
return _CONNECTORS[source_type]
|
||||
except KeyError as exc:
|
||||
raise KeyError(f"No connector registered for source_type {source_type!r}") from exc
|
||||
|
||||
|
||||
def _reset_for_tests() -> None:
|
||||
"""Clear the registry — for use in pytest fixtures only."""
|
||||
_CONNECTORS.clear()
|
||||
48
tests/test_scout_connector_registry.py
Normal file
48
tests/test_scout_connector_registry.py
Normal file
@@ -0,0 +1,48 @@
|
||||
"""Tests for the connector registry."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from app.scouts.connectors.base import ItemRef
|
||||
from app.scouts.connectors.registry import (
|
||||
get_connector,
|
||||
register_connector,
|
||||
_reset_for_tests,
|
||||
)
|
||||
|
||||
|
||||
class _DummyConnector:
|
||||
source_type = "dummy"
|
||||
async def list_new(self, scout): return []
|
||||
async def fetch_metadata(self, scout, ref): raise NotImplementedError
|
||||
async def fetch_content(self, scout, ref): raise NotImplementedError
|
||||
async def archive(self, scout, ref): raise NotImplementedError
|
||||
async def setup_watch(self, scout): raise NotImplementedError
|
||||
async def renew_watch(self, scout): raise NotImplementedError
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _clean_registry():
|
||||
_reset_for_tests()
|
||||
yield
|
||||
_reset_for_tests()
|
||||
|
||||
|
||||
def test_register_and_get():
|
||||
c = _DummyConnector()
|
||||
register_connector(c)
|
||||
assert get_connector("dummy") is c
|
||||
|
||||
|
||||
def test_unknown_source_raises():
|
||||
with pytest.raises(KeyError):
|
||||
get_connector("nope")
|
||||
|
||||
|
||||
def test_double_register_replaces():
|
||||
a = _DummyConnector()
|
||||
b = _DummyConnector()
|
||||
register_connector(a)
|
||||
register_connector(b)
|
||||
assert get_connector("dummy") is b
|
||||
Reference in New Issue
Block a user