# Source code for routir.processors.registry

import time
from typing import Any, Dict, List, Union

from ..models.abstract import Engine
from .abstract import Processor


# Response key under which DummyProcessor stores an engine method's result,
# indexed by service type. Every service type listed here is "callable".
_output_keys = {
    "search": "scores",
    "score": "scores",
    "decompose_query": "queries",
    "fuse": "scores",
}

# Callable service types are exactly those with an output key above, so the
# set is derived from the table rather than maintained as a second literal.
_callable_service_types = set(_output_keys)
# Service types the registry accepts but which have no output key above.
_uncallable_service_types = {"content"}


class ServiceNotFound(KeyError):
    """Raised when a (name, service_type) combo is not registered.

    Subclasses :class:`KeyError` so existing code that catches ``KeyError``
    still works, but the distinct type lets transport layers map cleanly to
    HTTP 400 / gRPC NOT_FOUND.
    """

    def __str__(self) -> str:
        # KeyError wraps its message in repr() quotes by default; override so
        # str(e) yields the plain message that REST/gRPC surface to callers.
        # The first argument is coerced with str() because __str__ must return
        # a str even when the exception was raised with a non-string argument
        # (e.g. a (name, service_type) tuple).
        return str(self.args[0]) if self.args else ""
class _ProcessorRegistry:
    """
    Table of processors keyed by service name and service type.

    Attributes:
        all_services: Two-level mapping ``service_name -> service_type -> processor``.
        slot_meta: Two-level mapping ``service_name -> service_type -> metadata``
            sharing its keys with :attr:`all_services`. Each metadata dict holds
            the extra keyword arguments that were given to :meth:`register`
            (e.g. ``view_kind`` on score/search slots, ``views`` /
            ``default_view`` on content slots); it is an empty dict when none
            were supplied.
    """

    def __init__(self):
        """Start with no registered services."""
        # TODO: make collection name part of the namespace
        self.all_services: Dict[str, Dict[str, Processor]] = {}
        # Kept in lock-step with ``all_services`` by ``register``.
        self.slot_meta: Dict[str, Dict[str, Dict[str, Any]]] = {}

    @property
    def valid_service_types(self):
        """Every service type that :meth:`register` accepts (callable and uncallable)."""
        return _callable_service_types | _uncallable_service_types

    def __getitem__(self, name: str):
        """Return the ``service_type -> processor`` mapping registered under ``name``."""
        return self.all_services[name]

    def register(self, name: str, service_type: str, processor: Processor, **meta: Any):
        """
        Add ``processor`` to the registry under ``(name, service_type)``.

        Args:
            name: Service name.
            service_type: Service type ('search', 'score', 'content', etc.).
            processor: Processor instance to store.
            **meta: Optional per-slot metadata, stored verbatim. Recognised keys
                include ``view_kind`` (str, for score/search slots) and
                ``views`` / ``default_view`` (for content slots). The registry
                does not interpret the keys; read them back with
                :meth:`get_meta` or :meth:`get_slot_meta`.

        Raises:
            ValueError: If ``service_type`` is not a valid type, or the
                ``(name, service_type)`` slot is already taken.
        """
        allowed = self.valid_service_types
        if service_type not in allowed:
            raise ValueError(f"Invalid service type '{service_type}'. Valid types: {allowed}")
        if self.has_service(name, service_type):
            raise ValueError(f"Service type '{service_type}' of name '{name}' already exists.")

        if name not in self.all_services:
            # First slot for this name: create both parallel inner dicts together.
            self.all_services[name] = {}
            self.slot_meta[name] = {}

        self.all_services[name][service_type] = processor
        # ``meta`` is the **kwargs dict; copy it so the stored dict is our own.
        self.slot_meta[name][service_type] = dict(meta)

    def get_meta(self, name: str, service_type: str) -> Dict[str, Any]:
        """Return the stored metadata dict for a slot, or an empty dict if there is none."""
        per_name = self.slot_meta.get(name, {})
        return per_name.get(service_type, {})

    def get_slot_meta(self, name: str, service_type: str, key: str, default: Any = None) -> Any:
        """Return one metadata value for a slot, or ``default`` when it is absent."""
        return self.get_meta(name, service_type).get(key, default)

    def has_service(self, name: str, service_type: str):
        """Return True if a processor is registered for ``(name, service_type)``."""
        return service_type in self.all_services.get(name, {})

    def get(self, name: str, service_type: str):
        """Return the processor for ``(name, service_type)``, or None if it is not registered."""
        if not self.has_service(name, service_type):
            return None
        return self.all_services[name][service_type]

    async def submit(self, name: str, service_type: str, data: dict) -> dict:
        """
        Forward ``data`` to the processor registered under ``(name, service_type)``.

        Returns:
            Whatever the processor's ``submit`` coroutine returns.

        Raises:
            ServiceNotFound: If no processor is registered for the pair.
        """
        if self.has_service(name, service_type):
            return await self.get(name, service_type).submit(data)
        raise ServiceNotFound(
            f"Service '{name}' not found or does not support {service_type}"
        )

    def get_all_services(self):
        """
        Group registered service names by service type.

        Returns:
            Dict mapping every valid service type to the list of service names
            registered for it (an empty list when there are none).
        """
        grouped = {service_type: [] for service_type in self.valid_service_types}
        for service_name, slots in self.all_services.items():
            for service_type in slots:
                grouped[service_type].append(service_name)
        return grouped
class DummyProcessor(Processor):
    """Uncached :class:`Processor` adapter around one bound engine method.

    :func:`auto_register` uses this to expose an engine method (``search``,
    ``fuse``, etc.) as a processor with no batching and no caching: every
    request is forwarded straight to the engine method. Not intended for
    production workloads where batching matters; prefer the config-based
    loading path via :func:`~routir.config.load.load_config`, which creates
    :class:`~routir.processors.abstract.BatchProcessor` instances.
    """

    def __init__(self, engine: Engine, method: str):
        """Bind ``engine.<method>`` and pick the response key for its results.

        Args:
            engine: Engine instance that owns the method.
            method: Name of the engine attribute to call; also selects the
                response key via ``_output_keys``.

        Raises:
            ValueError: If ``engine`` has no attribute named ``method``.
            KeyError: If ``method`` has no entry in ``_output_keys``.
        """
        # cache_size=0: this adapter is deliberately cache-free.
        super().__init__(cache_size=0)
        engine_name = engine.__class__.__name__
        if not hasattr(engine, method):
            raise ValueError(f"Engine {engine_name} does not have method '{method}'")
        self.service = getattr(engine, method)
        self.result_key = _output_keys[method]

    async def submit(self, item: Dict[str, Any]) -> Dict[str, Any]:
        """Await the wrapped engine method with ``item`` unpacked as keyword arguments.

        Returns:
            ``{result_key: <engine result>, "processed": True,
            "timestamp": <time.time() taken after the call completes>}``.
        """
        outcome = await self.service(**item)
        return {
            self.result_key: outcome,
            "processed": True,
            "timestamp": time.time(),
        }
# Module-level singleton registry. :func:`auto_register` registers into it, and
# importers of this module share the same instance.
ProcessorRegistry = _ProcessorRegistry()
def auto_register(methods: Union[str, List[str]], view_kind: Union[str, None] = None, **default_init_kwargs):
    """
    Class decorator that instantiates an engine and registers it as a processor.

    This is a lightweight alternative to the JSON config-based loading path,
    useful for **built-in** or **singleton** engines (e.g. stateless fusion
    rules like RRF) that do not need per-service batching or caching.

    .. warning::
        ``auto_register`` creates a single engine instance with
        ``default_init_kwargs`` at decoration time and wraps it in
        :class:`DummyProcessor` (no batching, no caching). That one instance
        backs every method listed in ``methods``. For production engines with
        GPU models, use the config-based path instead so that
        :class:`~routir.processors.abstract.BatchProcessor` and LRU/Redis
        caching are applied.

    The engine class must implement the ``{method}_batch`` method for every
    *method* listed. The decorator checks the corresponding ``can_{method}``
    attribute (e.g. ``can_fuse`` for ``"fuse"``) and raises ``TypeError`` if it
    is falsy. The service is registered under the engine's *class name* in
    :data:`ProcessorRegistry`.

    Args:
        methods (str or list[str]): Service type(s) to register. Valid values:
            ``"search"``, ``"score"``, ``"fuse"``, ``"decompose_query"``. A
            single string or any iterable of strings is accepted.
        view_kind (str, optional): Modality of the engine's ``score_batch`` /
            ``search_batch`` inputs (``"text"`` or ``"bytes"``). When omitted,
            falls back to the engine class's ``accepts_view_kind`` attribute
            (``"text"`` if the class has no such attribute). Only ``score`` /
            ``search`` slots use this value; ``fuse`` and ``decompose_query``
            always register as ``"text"``.
        **default_init_kwargs: Keyword arguments forwarded to ``engine_cls()``
            at decoration time.

    Returns:
        A decorator that returns the unmodified engine class (it is transparent).

    Raises (when the decorator is applied):
        TypeError: If the class is not an :class:`Engine` subclass, or a
            requested method's ``can_{method}`` attribute is falsy.
        AttributeError: If the class has no ``can_{method}`` attribute at all
            (e.g. a misspelled method name).
        ValueError: Propagated from :meth:`ProcessorRegistry.register`, e.g.
            when the ``(class name, method)`` slot is already registered.

    Example:
        .. code-block:: python

            @auto_register("fuse")
            class RRFFusion(Engine):
                async def fuse_batch(self, queries, batch_scores, **kwargs):
                    ...

            # RRFFusion is now accessible as ProcessorRegistry["RRFFusion"]["fuse"]
    """
    # Materialize once: the decorator iterates ``methods`` twice (validation,
    # then registration), which would silently skip registration for a
    # one-shot iterable such as a generator.
    methods = [methods] if isinstance(methods, str) else list(methods)

    def engine_dec(engine_cls: type[Engine]):
        """Validate ``engine_cls``, build one instance, and register it for each method."""
        if not issubclass(engine_cls, Engine):
            raise TypeError(f"{engine_cls.__name__} must be a subclass of Engine")

        # Validate every requested method before constructing the engine, so
        # a bad decorator argument fails without paying for construction.
        for method in methods:
            # NOTE(review): ``can_{method}`` is read from the class object. If
            # Engine defines it as a plain instance @property, this lookup yields
            # the property object (always truthy) - confirm against Engine.
            if not getattr(engine_cls, f"can_{method}"):
                raise TypeError(
                    f"Engine {engine_cls.__name__} does not implement '{method}' "
                    f"(can_{method} is False - did you override {method}_batch?)"
                )

        if not methods:
            # Nothing to register: do not construct an engine nobody will use.
            return engine_cls

        resolved_view_kind = view_kind or getattr(engine_cls, "accepts_view_kind", "text")

        # Construct the engine exactly once and share it across every requested
        # method, so multi-method registration does not build the engine once
        # per method.
        engine = engine_cls(**default_init_kwargs)

        for method in methods:
            # Only score/search slots care about modality; fuse / decompose_query
            # always work on text scores or strings.
            slot_view_kind = resolved_view_kind if method in {"score", "search"} else "text"
            ProcessorRegistry.register(
                engine_cls.__name__,
                method,
                processor=DummyProcessor(engine, method),
                view_kind=slot_view_kind,
            )
        return engine_cls

    return engine_dec