Processors

This section contains auto-generated documentation for all processor classes in Routir. Processors handle request batching, caching, and content processing.

Base Classes

class routir.processors.abstract.Processor(cache_size=1024, cache_ttl=600, cache_key=None, redis_url=None, redis_kwargs={})[source]

Bases: FactoryEnabled

Base class for request processors with optional caching.

A Processor sits between the HTTP layer and an engine. It handles cache lookup/store and delegates to _submit() for the actual work.

When to subclass Processor vs BatchProcessor

  • Subclass Processor when requests should be handled one at a time — e.g. content lookup by document ID where batching adds no value. Override _submit() to implement the logic.

  • Subclass BatchProcessor when the underlying engine (e.g. a GPU model) benefits from processing multiple requests together. Override _process_batch() instead.

Both classes share the same caching interface; the cache is checked before _submit() / _process_batch() is called.

cache

Active cache instance (LRU or Redis), or None when caching is disabled.

Type:

Cache or None

cache_key

Function (item: dict) -> hashable used to derive the cache key from a request dict. Default key includes service, query, limit, and subset.

Type:

callable

__init__(cache_size=1024, cache_ttl=600, cache_key=None, redis_url=None, redis_kwargs={})[source]

Initialize the processor with optional caching.

Parameters:
  • cache_size (int) – Maximum number of cached entries. -1 or 0 disables caching entirely. When redis_url is set, this controls the Redis key-count budget (approximate).

  • cache_ttl (int) – Cache entry time-to-live in seconds (default 600).

  • cache_key (callable, optional) – (item: dict) -> hashable function to derive a cache key from a request dict. The default key is (service, query, limit, subset). Override when additional request fields affect the result (e.g. pass cache_key_fields via a closure, as load_config() does per service).

  • redis_url (str, optional) – Redis connection URL. When provided, Redis is used instead of the in-memory LRU cache.

  • redis_kwargs (dict) – Additional keyword arguments forwarded to the Redis client.
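The closure-based override mentioned above might look like the following sketch. The helper name make_cache_key and the extra "lang" field are illustrative, not part of Routir's API:

```python
# Hypothetical sketch: build a cache_key function from a list of field
# names, so that extra request fields participate in the cache key.
def make_cache_key(fields):
    def cache_key(item: dict):
        # A tuple of (field, value) pairs is hashable as long as the
        # values themselves are hashable.
        return tuple((f, item.get(f)) for f in fields)
    return cache_key

key_fn = make_cache_key(["service", "query", "limit", "subset", "lang"])
k1 = key_fn({"service": "bm25", "query": "fever", "limit": 10})
k2 = key_fn({"service": "bm25", "query": "fever", "limit": 10, "lang": "en"})
# k1 != k2: the "lang" field now distinguishes otherwise-identical requests.
```

Including absent fields as (field, None) keeps keys stable regardless of which optional fields a client sends.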

async start()[source]

Initialize the processor (called before serving requests).

Heavy initialization tasks can be performed here.

async submit(item)[source]

Submit a request for processing with caching.

Parameters:

item (Any) – Request data

Returns:

Response dict with ‘cached’ field indicating cache hit/miss

Return type:

Dict[str, Any]

class routir.processors.abstract.BatchProcessor(batch_size=32, max_wait_time=0.1, cache_size=1024, cache_ttl=600, cache_key=None, **kwargs)[source]

Bases: Processor

Processor that accumulates requests into batches before engine inference.

Requests are queued in an asyncio.Queue by _submit(). A background worker collects items until either batch_size is reached or max_wait_time seconds elapse, then calls _process_batch() with the whole batch. This amortises GPU/model overhead across concurrent requests, improving throughput at the cost of a small latency increase.

Subclasses must override _process_batch(); all other machinery is provided here.

The worker is started lazily on the first request (or explicitly via start()), and runs for the lifetime of the process.
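The collect-until-full-or-timeout behaviour can be sketched with the standard library alone. This is a simplified stand-in for the worker loop, not Routir's actual code:

```python
import asyncio

async def collect_batch(queue, batch_size, max_wait_time):
    """Gather up to batch_size items, waiting at most max_wait_time
    seconds after the first item arrives (simplified sketch)."""
    batch = [await queue.get()]  # block until at least one item exists
    loop = asyncio.get_running_loop()
    deadline = loop.time() + max_wait_time
    while len(batch) < batch_size:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            batch.append(await asyncio.wait_for(queue.get(), remaining))
        except asyncio.TimeoutError:
            break  # timed out: process the partial batch
    return batch

async def demo():
    q = asyncio.Queue()
    for i in range(5):
        q.put_nowait(i)
    # Queue holds 5 items but batch_size caps the batch at 3.
    return await collect_batch(q, batch_size=3, max_wait_time=0.05)

batch = asyncio.run(demo())  # → [0, 1, 2]
```

The real worker then passes the collected batch to _process_batch() and fans the results back out to the individual awaiting requests.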

__init__(batch_size=32, max_wait_time=0.1, cache_size=1024, cache_ttl=600, cache_key=None, **kwargs)[source]

Initialize the batch processor.

Parameters:
  • batch_size (int) – Maximum number of requests accumulated into one batch before the engine is called (default 32).

  • max_wait_time (float) – Maximum seconds to wait for the batch to fill before processing a partial batch (default 0.1 s). Tune this to balance latency vs. GPU utilisation — lower values reduce wait time, higher values pack more requests per batch.

  • cache_size (int) – LRU cache size; -1 disables caching.

  • cache_ttl (int) – Cache TTL in seconds.

  • cache_key (callable, optional) – Custom cache-key function; see Processor for details.

  • **kwargs – Forwarded to Processor.__init__.

async start()[source]

Initialize and start the batch processor.

Query Processors

class routir.processors.query_processors.AsyncQueryProcessor(engine, cache_size=1024, cache_ttl=600, cache_key=None, **kwargs)[source]

Bases: Processor

Processor that serves every request independently through async calls. Use it when all the engine does is issue async calls.

__init__(engine, cache_size=1024, cache_ttl=600, cache_key=None, **kwargs)[source]

Initialize the processor with optional caching.

Parameters:
  • engine – Engine instance that performs the actual async calls.

  • cache_size (int) – Maximum number of cached entries. -1 or 0 disables caching entirely. When redis_url is set, this controls the Redis key-count budget (approximate).

  • cache_ttl (int) – Cache entry time-to-live in seconds (default 600).

  • cache_key (callable, optional) – (item: dict) -> hashable function to derive a cache key from a request dict. The default key is (service, query, limit, subset). Override when additional request fields affect the result (e.g. pass cache_key_fields via a closure, as load_config() does per service).

  • redis_url (str, optional) – Redis connection URL. When provided, Redis is used instead of the in-memory LRU cache.

  • redis_kwargs (dict) – Additional keyword arguments forwarded to the Redis client.

class routir.processors.query_processors.BatchQueryProcessor(engine, **kwargs)[source]

Bases: BatchProcessor

__init__(engine, **kwargs)[source]

Initialize the batch processor.

Parameters:
  • engine – Engine instance that processes each batch.

  • batch_size (int) – Maximum number of requests accumulated into one batch before the engine is called (default 32).

  • max_wait_time (float) – Maximum seconds to wait for the batch to fill before processing a partial batch (default 0.1 s). Tune this to balance latency vs. GPU utilisation — lower values reduce wait time, higher values pack more requests per batch.

  • cache_size (int) – LRU cache size; -1 disables caching.

  • cache_ttl (int) – Cache TTL in seconds.

  • cache_key (callable, optional) – Custom cache-key function; see Processor for details.

  • **kwargs – Forwarded to Processor.__init__.

class routir.processors.query_processors.BatchDecomposeQueryProcessor(engine, **kwargs)[source]

Bases: BatchProcessor

__init__(engine, **kwargs)[source]

Initialize the batch processor.

Parameters:
  • engine – Engine instance that processes each batch.

  • batch_size (int) – Maximum number of requests accumulated into one batch before the engine is called (default 32).

  • max_wait_time (float) – Maximum seconds to wait for the batch to fill before processing a partial batch (default 0.1 s). Tune this to balance latency vs. GPU utilisation — lower values reduce wait time, higher values pack more requests per batch.

  • cache_size (int) – LRU cache size; -1 disables caching.

  • cache_ttl (int) – Cache TTL in seconds.

  • cache_key (callable, optional) – Custom cache-key function; see Processor for details.

  • **kwargs – Forwarded to Processor.__init__.

Content Processors

class routir.processors.content_processors.ContentProcessor(collection_config, cache_size=0, cache_ttl=600)[source]

Bases: Processor

Processor for retrieving document content by ID.

Provides fast random access to documents in JSONL files using offset maps.
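The offset-map technique can be illustrated generically (a sketch, not Routir's reader): record each line's byte offset once, then seek() for O(1) lookup by document ID.

```python
import io
import json

def build_offset_map(fh, id_field="id"):
    """Map document ID -> byte offset of its JSONL line (sketch)."""
    offsets = {}
    while True:
        pos = fh.tell()
        line = fh.readline()
        if not line:
            break
        offsets[json.loads(line)[id_field]] = pos
    return offsets

def read_doc(fh, offsets, doc_id):
    fh.seek(offsets[doc_id])  # jump straight to the document's line
    return json.loads(fh.readline())

# In-memory stand-in for a JSONL collection file.
data = io.StringIO(
    '{"id": "d1", "text": "alpha"}\n'
    '{"id": "d2", "text": "beta"}\n'
)
offsets = build_offset_map(data)
doc = read_doc(data, offsets, "d2")  # → {"id": "d2", "text": "beta"}
```

The one-time scan costs O(n), after which any document is fetched with a single seek and readline instead of rereading the file.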

config

Collection configuration

line_reader

Random access reader for document file

content_field

Field(s) containing document text

lang_mapping

Optional mapping of document IDs to language codes

__init__(collection_config, cache_size=0, cache_ttl=600)[source]

Initialize content processor.

Parameters:
  • collection_config (CollectionConfig) – Collection configuration with doc_path, id_field, etc.

  • cache_size – Maximum cache entries

  • cache_ttl – Cache TTL in seconds

class routir.processors.content_processors.IRDSProcessor(collection_config, cache_size=0, cache_ttl=600)[source]

Bases: Processor

Processor for retrieving document content by ID from IRDS format.

Inherits from ContentProcessor and uses IRDS-specific line reader.

__init__(collection_config, cache_size=0, cache_ttl=600)[source]

Initialize content processor.

Parameters:
  • collection_config (CollectionConfig) – Collection configuration with doc_path, id_field, etc.

  • cache_size – Maximum cache entries

  • cache_ttl – Cache TTL in seconds

Score Processors

class routir.processors.score_processors.AsyncPairwiseScoreProcessor(engine, cache_size=1024, cache_ttl=600, cache_key=None, **kwargs)[source]

Bases: Processor

Processor that serves every request independently through async calls. Use it when all the engine does is issue async calls.

__init__(engine, cache_size=1024, cache_ttl=600, cache_key=None, **kwargs)[source]

Initialize the processor with optional caching.

Parameters:
  • engine – Engine instance that performs the actual async calls.

  • cache_size (int) – Maximum number of cached entries. -1 or 0 disables caching entirely. When redis_url is set, this controls the Redis key-count budget (approximate).

  • cache_ttl (int) – Cache entry time-to-live in seconds (default 600).

  • cache_key (callable, optional) – (item: dict) -> hashable function to derive a cache key from a request dict. The default key is (service, query, limit, subset). Override when additional request fields affect the result (e.g. pass cache_key_fields via a closure, as load_config() does per service).

  • redis_url (str, optional) – Redis connection URL. When provided, Redis is used instead of the in-memory LRU cache.

  • redis_kwargs (dict) – Additional keyword arguments forwarded to the Redis client.

class routir.processors.score_processors.BatchPairwiseScoreProcessor(engine, **kwargs)[source]

Bases: BatchProcessor

__init__(engine, **kwargs)[source]

Initialize the batch processor.

Parameters:
  • engine – Engine instance that processes each batch.

  • batch_size (int) – Maximum number of requests accumulated into one batch before the engine is called (default 32).

  • max_wait_time (float) – Maximum seconds to wait for the batch to fill before processing a partial batch (default 0.1 s). Tune this to balance latency vs. GPU utilisation — lower values reduce wait time, higher values pack more requests per batch.

  • cache_size (int) – LRU cache size; -1 disables caching.

  • cache_ttl (int) – Cache TTL in seconds.

  • cache_key (callable, optional) – Custom cache-key function; see Processor for details.

  • **kwargs – Forwarded to Processor.__init__.

Cache

class routir.processors.cache.Cache(capacity=-1, ttl=-1)[source]

Bases: ABC

Abstract base class for cache implementations.

__init__(capacity=-1, ttl=-1)[source]

Initialize cache.

Parameters:
  • capacity (int) – Maximum number of items to store (-1 for unlimited)

  • ttl (float) – Time-to-live in seconds for cache entries (-1 for no expiry)

abstractmethod async get(key)[source]

Get an item from the cache.

Parameters:

key (str) – The cache key

Returns:

The cached value or None if not found or expired

Return type:

Any | None

abstractmethod async put(key, value)[source]

Add an item to the cache.

Parameters:
  • key (str) – The cache key

  • value (Any) – The value to cache

abstractmethod async remove(key)[source]

Remove an item from the cache.

Parameters:

key (str) – The cache key

abstractmethod async clear()[source]

Clear all items from the cache.

abstractmethod async __len__()[source]

Return the number of items in the cache.

async connect()[source]

Connect to cache backend (if needed). Override in subclasses.

async close()[source]

Close cache connection (if needed). Override in subclasses.

async __aenter__()[source]

Async context manager entry.

async __aexit__(exc_type, exc_val, exc_tb)[source]

Async context manager exit.

class routir.processors.cache.LRUCache(capacity=-1, ttl=-1)[source]

Bases: Cache

An async LRU cache implementation.

__init__(capacity=-1, ttl=-1)[source]

Initialize LRU cache.

Parameters:
  • capacity (int) – Maximum number of items to store (-1 for unlimited)

  • ttl (float) – Time-to-live in seconds for cache entries (-1 for no expiry)

async get(key)[source]

Get an item from the cache.

Parameters:

key (str) – The cache key

Returns:

The cached value or None if not found or expired

Return type:

Any | None

async put(key, value)[source]

Add an item to the cache.

Parameters:
  • key (str) – The cache key

  • value (Any) – The value to cache

async remove(key)[source]

Remove an item from the cache.

async clear()[source]

Clear the cache.

async __len__()[source]

Return the number of items in the cache.
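The semantics above can be sketched with an OrderedDict. This is an illustrative miniature, not the actual LRUCache source; capacity -1 means unbounded and ttl -1 means no expiry:

```python
import asyncio
import time
from collections import OrderedDict

class TinyLRUCache:
    """Minimal async LRU cache with optional TTL (sketch)."""

    def __init__(self, capacity=-1, ttl=-1):
        self.capacity, self.ttl = capacity, ttl
        self._data = OrderedDict()  # key -> (value, stored_at)

    async def put(self, key, value):
        self._data[key] = (value, time.monotonic())
        self._data.move_to_end(key)
        if self.capacity > 0 and len(self._data) > self.capacity:
            self._data.popitem(last=False)  # evict least recently used

    async def get(self, key):
        if key not in self._data:
            return None
        value, stored_at = self._data[key]
        if self.ttl >= 0 and time.monotonic() - stored_at > self.ttl:
            del self._data[key]  # entry expired
            return None
        self._data.move_to_end(key)  # mark as recently used
        return value

async def demo():
    c = TinyLRUCache(capacity=2)
    await c.put("a", 1)
    await c.put("b", 2)
    await c.get("a")          # "a" becomes most recently used
    await c.put("c", 3)       # capacity exceeded: evicts "b"
    return await c.get("a"), await c.get("b"), await c.get("c")

result = asyncio.run(demo())  # → (1, None, 3)
```

A production implementation additionally needs a lock around get/put, since concurrent coroutines may interleave between the lookup and the reorder.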

class routir.processors.cache.RedisCache(capacity=-1, ttl=-1, redis_url='redis://localhost:6379', key_prefix='routircache:', **redis_kwargs)[source]

Bases: Cache

A Redis-based cache implementation with async support.

__init__(capacity=-1, ttl=-1, redis_url='redis://localhost:6379', key_prefix='routircache:', **redis_kwargs)[source]

Initialize Redis cache.

Parameters:
  • capacity (int) – Maximum number of items to store (-1 for unlimited, not enforced by Redis)

  • ttl (float) – Time-to-live in seconds for cache entries (-1 for no expiry)

  • redis_url (str) – Redis connection URL

  • key_prefix (str) – Prefix for all cache keys to avoid collisions

  • **redis_kwargs – Additional arguments to pass to Redis client

async connect()[source]

Establish connection to Redis.

async close()[source]

Close Redis connection.

async get(key)[source]

Get an item from the cache.

Parameters:

key (str) – The cache key

Returns:

The cached value or None if not found or expired

Return type:

Any | None

async put(key, value)[source]

Add an item to the cache.

Parameters:
  • key (str) – The cache key

  • value (Any) – The value to cache

async remove(key)[source]

Remove an item from the cache.

async clear()[source]

Clear all cache entries with the current prefix.

async __len__()[source]

Return the number of items in the cache with the current prefix.

File Random Access Reader

Registry

class routir.processors.registry.DummyProcessor(engine, method)[source]

Bases: Processor

A minimal, no-cache processor that wraps a single engine method.

Used internally by auto_register() to expose an engine method (search, fuse, etc.) as a Processor without any batching or caching overhead. Requests are dispatched directly to the engine method on every call.

Not intended for production workloads where batching matters; prefer the config-based loading path via load_config(), which creates BatchProcessor instances.

__init__(engine, method)[source]

Initialize the processor wrapping a single engine method.

Parameters:
  • engine – Engine instance whose method is exposed by this processor.

  • method (str) – Name of the engine method to dispatch requests to (e.g. "search", "fuse").

async submit(item)[source]

Submit a request for processing; dispatched directly to the engine method (no caching).

Parameters:

item (Dict[str, Any]) – Request data

Returns:

Response dict with ‘cached’ field indicating cache hit/miss

Return type:

Dict[str, Any]

routir.processors.registry.auto_register(methods, **default_init_kwargs)[source]

Class decorator that instantiates an engine and registers it as a processor.

This is a lightweight alternative to the JSON config-based loading path, useful for built-in or singleton engines (e.g. stateless fusion rules like RRF) that do not need per-service batching or caching.

Warning

auto_register creates a single engine instance with default_init_kwargs at decoration time using DummyProcessor (no batching, no caching). For production engines with GPU models, use the config-based path instead so that BatchProcessor and LRU/Redis caching are applied.

The engine class must implement the {method}_batch method for every method listed. The decorator checks the corresponding can_{method} property (e.g. can_fuse for "fuse") and raises TypeError if it returns False.

The service is registered under the engine’s class name in ProcessorRegistry.

Parameters:
  • methods (str or list[str]) – Service type(s) to register. Valid values: "search", "score", "fuse", "decompose_query".

  • **default_init_kwargs – Keyword arguments forwarded to engine_cls() at decoration time.

Returns:

The unmodified engine class (decorator is transparent).

Example

@auto_register("fuse")
class RRFFusion(Engine):
    async def fuse_batch(self, queries, batch_scores, **kwargs):
        ...

# RRFFusion is now accessible as ProcessorRegistry["RRFFusion"]["fuse"]