Processors

This section contains auto-generated documentation for all processor classes in RoutIR. Processors handle request batching, caching, and content processing.

Base Classes

class routir.processors.abstract.Processor(cache_size=1024, cache_ttl=600, cache_key=None, redis_url=None, redis_kwargs={})[source]

Bases: FactoryEnabled

Base class for request processors with optional caching.

A Processor sits between the HTTP layer and an engine. It handles cache lookup/store and delegates to _submit() for the actual work.

When to subclass Processor vs BatchProcessor

  • Subclass Processor when requests should be handled one at a time — e.g. content lookup by document ID where batching adds no value. Override _submit() to implement the logic.

  • Subclass BatchProcessor when the underlying engine (e.g. a GPU model) benefits from processing multiple requests together. Override _process_batch() instead.

Both classes share the same caching interface; the cache is checked before _submit() / _process_batch() is called.

cache

Active cache instance (LRU or Redis), or None when caching is disabled.

Type:

Cache or None

cache_key

Function (item: dict) -> hashable used to derive the cache key from a request dict. Default key includes service, query, limit, and subset.

Type:

callable

__init__(cache_size=1024, cache_ttl=600, cache_key=None, redis_url=None, redis_kwargs={})[source]

Initialize the processor with optional caching.

Parameters:
  • cache_size (int) – Maximum number of cached entries. -1 or 0 disables caching entirely. When redis_url is set, this controls the Redis key-count budget (approximate).

  • cache_ttl (int) – Cache entry time-to-live in seconds (default 600).

  • cache_key (callable, optional) – (item: dict) -> hashable function to derive a cache key from a request dict. The default key is (service, query, limit, subset). Override when additional request fields affect the result (e.g. pass cache_key_fields via a closure, as load_config() does per service).

  • redis_url (str, optional) – Redis connection URL. When provided, Redis is used instead of the in-memory LRU cache.

  • redis_kwargs (dict) – Additional keyword arguments forwarded to the Redis client.
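The cache_key parameter can be illustrated with a small closure. This is only a sketch of the shape such a function might take: make_cache_key and the extra field name "language" are hypothetical, and how load_config() actually builds its closure is not shown in this reference.

```python
from typing import Any, Callable, Dict, Hashable, Sequence

# Fields of the documented default key: (service, query, limit, subset).
DEFAULT_FIELDS = ("service", "query", "limit", "subset")


def make_cache_key(extra_fields: Sequence[str] = ()) -> Callable[[Dict[str, Any]], Hashable]:
    """Return an (item: dict) -> hashable function (hypothetical helper)."""
    fields = DEFAULT_FIELDS + tuple(extra_fields)

    def cache_key(item: Dict[str, Any]) -> Hashable:
        # Missing fields map to None so the key always has the same length.
        # Field values are assumed to be hashable themselves.
        return tuple(item.get(name) for name in fields)

    return cache_key


key_fn = make_cache_key(extra_fields=["language"])
request = {"service": "bm25", "query": "neural ranking", "limit": 10, "language": "en"}
print(key_fn(request))
```

A function built this way could then be passed as the cache_key argument, so that two requests differing only in the extra field no longer share a cache entry.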

async start()[source]

Initialize the processor (called before serving requests).

Heavy initialization tasks can be performed here.

async submit(item)[source]

Submit a request for processing with caching.

Parameters:

item (Any) – Request data

Returns:

Response dict with ‘cached’ field indicating cache hit/miss

Return type:

Dict[str, Any]
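The cache-then-delegate flow described for submit() can be sketched with a standalone toy class. ToyProcessor is not RoutIR code: the plain dict cache, the missing TTL handling, and the boolean type of the cached field are all assumptions made for illustration.

```python
import asyncio
from typing import Any, Dict


class ToyProcessor:
    """Stand-in for the documented cache-then-_submit flow (not RoutIR code)."""

    def __init__(self) -> None:
        self.cache: Dict[tuple, Dict[str, Any]] = {}
        self.calls = 0  # counts how often the real work runs

    def cache_key(self, item: Dict[str, Any]) -> tuple:
        return tuple(item.get(k) for k in ("service", "query", "limit", "subset"))

    async def _submit(self, item: Dict[str, Any]) -> Dict[str, Any]:
        # Placeholder for the actual engine call.
        self.calls += 1
        return {"query": item["query"], "scores": {"doc1": 1.0}}

    async def submit(self, item: Dict[str, Any]) -> Dict[str, Any]:
        key = self.cache_key(item)
        if key in self.cache:
            return {**self.cache[key], "cached": True}
        result = await self._submit(item)
        self.cache[key] = result
        return {**result, "cached": False}


async def demo():
    proc = ToyProcessor()
    item = {"service": "toy", "query": "hello", "limit": 5}
    first = await proc.submit(item)
    second = await proc.submit(item)
    return first["cached"], second["cached"], proc.calls


outcome = asyncio.run(demo())
print(outcome)
```

The second call is answered from the cache, so the placeholder _submit() runs only once.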

class routir.processors.abstract.BatchProcessor(batch_size=32, max_wait_time=0.1, cache_size=1024, cache_ttl=600, cache_key=None, **kwargs)[source]

Bases: Processor

Processor that accumulates requests into batches before engine inference.

Requests are queued in an asyncio.Queue by _submit(). A background worker collects items until either batch_size is reached or max_wait_time seconds elapse, then calls _process_batch() with the whole batch. This amortises GPU/model overhead across concurrent requests, improving throughput at the cost of a small latency increase.

Subclasses must override _process_batch(); all other machinery is provided here.

The worker is started lazily on the first request (or explicitly via start()), and runs for the lifetime of the process.
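The queue-and-worker mechanism above can be sketched in a few lines of asyncio. ToyBatcher is a simplified stand-in written for this page, not the BatchProcessor source: the per-request futures, the batch_sizes list, and the doubling "engine" are assumptions.

```python
import asyncio
from typing import Any, List


class ToyBatcher:
    """Illustrative batching loop; names and structure are assumptions."""

    def __init__(self, batch_size: int = 4, max_wait_time: float = 0.05) -> None:
        self.batch_size = batch_size
        self.max_wait_time = max_wait_time
        self.queue = asyncio.Queue()
        self.batch_sizes: List[int] = []  # recorded for inspection only
        self._worker = None

    async def _process_batch(self, items: List[Any]) -> List[Any]:
        # Placeholder for engine inference over the whole batch.
        return [x * 2 for x in items]

    async def _submit(self, item: Any) -> Any:
        if self._worker is None:  # start the worker lazily on first request
            self._worker = asyncio.create_task(self._run())
        fut = asyncio.get_running_loop().create_future()
        await self.queue.put((item, fut))
        return await fut

    async def _run(self) -> None:
        loop = asyncio.get_running_loop()
        while True:
            batch = [await self.queue.get()]  # block until one item arrives
            deadline = loop.time() + self.max_wait_time
            while len(batch) < self.batch_size:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
                try:
                    batch.append(await asyncio.wait_for(self.queue.get(), remaining))
                except asyncio.TimeoutError:
                    break  # wait time elapsed: process a partial batch
            results = await self._process_batch([item for item, _ in batch])
            self.batch_sizes.append(len(batch))
            for (_, fut), result in zip(batch, results):
                fut.set_result(result)


async def demo():
    batcher = ToyBatcher(batch_size=4, max_wait_time=0.05)
    results = await asyncio.gather(*(batcher._submit(i) for i in range(6)))
    batcher._worker.cancel()
    return results, batcher.batch_sizes


results, sizes = asyncio.run(demo())
print(results, sizes)
```

Six concurrent requests exceed batch_size=4, so the toy worker needs more than one batch; the final partial batch is released once max_wait_time elapses.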

__init__(batch_size=32, max_wait_time=0.1, cache_size=1024, cache_ttl=600, cache_key=None, **kwargs)[source]

Initialize the batch processor.

Parameters:
  • batch_size (int) – Maximum number of requests accumulated into one batch before the engine is called (default 32).

  • max_wait_time (float) – Maximum seconds to wait for the batch to fill before processing a partial batch (default 0.1 s). Tune this to balance latency vs. GPU utilisation — lower values reduce wait time, higher values pack more requests per batch.

  • cache_size (int) – LRU cache size; -1 disables caching.

  • cache_ttl (int) – Cache TTL in seconds.

  • cache_key (callable, optional) – Custom cache-key function; see Processor for details.

  • **kwargs – Forwarded to Processor.__init__.

async start()[source]

Initialize and start the batch processor.

Query Processors

class routir.processors.query_processors.AsyncQueryProcessor(engine, cache_size=1024, cache_ttl=600, cache_key=None, **kwargs)[source]

Bases: Processor

Processor that serves every request independently through async calls. Use it when the engine does nothing but issue async calls.

__init__(engine, cache_size=1024, cache_ttl=600, cache_key=None, **kwargs)[source]

Initialize the processor with optional caching.

Parameters:
  • cache_size (int) – Maximum number of cached entries. -1 or 0 disables caching entirely. When redis_url is set, this controls the Redis key-count budget (approximate).

  • cache_ttl (int) – Cache entry time-to-live in seconds (default 600).

  • cache_key (callable, optional) – (item: dict) -> hashable function to derive a cache key from a request dict. The default key is (service, query, limit, subset). Override when additional request fields affect the result (e.g. pass cache_key_fields via a closure, as load_config() does per service).

  • redis_url (str, optional) – Redis connection URL. When provided, Redis is used instead of the in-memory LRU cache.

  • redis_kwargs (dict) – Additional keyword arguments forwarded to the Redis client.

class routir.processors.query_processors.BatchQueryProcessor(engine, **kwargs)[source]

Bases: BatchProcessor

__init__(engine, **kwargs)[source]

Initialize the batch processor.

Parameters:
  • batch_size (int) – Maximum number of requests accumulated into one batch before the engine is called (default 32).

  • max_wait_time (float) – Maximum seconds to wait for the batch to fill before processing a partial batch (default 0.1 s). Tune this to balance latency vs. GPU utilisation — lower values reduce wait time, higher values pack more requests per batch.

  • cache_size (int) – LRU cache size; -1 disables caching.

  • cache_ttl (int) – Cache TTL in seconds.

  • cache_key (callable, optional) – Custom cache-key function; see Processor for details.

  • **kwargs – Forwarded to Processor.__init__.

class routir.processors.query_processors.BatchDecomposeQueryProcessor(engine, **kwargs)[source]

Bases: BatchProcessor

__init__(engine, **kwargs)[source]

Initialize the batch processor.

Parameters:
  • batch_size (int) – Maximum number of requests accumulated into one batch before the engine is called (default 32).

  • max_wait_time (float) – Maximum seconds to wait for the batch to fill before processing a partial batch (default 0.1 s). Tune this to balance latency vs. GPU utilisation — lower values reduce wait time, higher values pack more requests per batch.

  • cache_size (int) – LRU cache size; -1 disables caching.

  • cache_ttl (int) – Cache TTL in seconds.

  • cache_key (callable, optional) – Custom cache-key function; see Processor for details.

  • **kwargs – Forwarded to Processor.__init__.

Content Processors

View-dispatch entry point for local document collections.

Houses ContentProcessor (view-dispatching, backed by one ViewBackend per declared view) and IRDSProcessor (ir_datasets backed) - the two ways a RoutIR server resolves (collection, view, doc_id) -> payload for collections that live on the local machine. Remote collections are handled by routir.collections.relay.

Dependency direction:

This module imports Processor from routir.processors.abstract. The reverse must never happen - routir.processors provides the online request/cache plumbing and must not depend on collection storage.

class routir.collections.processor.ContentProcessor(collection_config, cache_size=256, cache_ttl=600)[source]

Bases: Processor

Processor for retrieving document content by ID, dispatched per view.

Builds one ViewBackend per entry in CollectionConfig.views at startup; per-request dispatch picks the right backend by view name (defaulting to default_view when the request omits view).
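Per-view dispatch with a default can be sketched as follows. ToyViewDispatcher and the callable Backend type are hypothetical stand-ins; the real ViewBackend interface is not documented in this section.

```python
from typing import Callable, Dict, Optional

# Hypothetical stand-in for ViewBackend: a callable mapping doc_id -> payload.
Backend = Callable[[str], object]


class ToyViewDispatcher:
    def __init__(self, backends: Dict[str, Backend], default_view: str) -> None:
        if default_view not in backends:
            raise ValueError(f"default view {default_view!r} has no backend")
        self.backends = backends
        self.default_view = default_view

    def lookup(self, doc_id: str, view: Optional[str] = None):
        # Fall back to the default view when the request omits one.
        name = view or self.default_view
        try:
            backend = self.backends[name]
        except KeyError:
            raise KeyError(f"unknown view {name!r}") from None
        return backend(doc_id)


texts = {"d1": "hello world"}
frames = {"d1": b"\x89PNG"}
dispatcher = ToyViewDispatcher(
    backends={"text": texts.__getitem__, "keyframes": frames.__getitem__},
    default_view="text",
)
print(dispatcher.lookup("d1"))
print(dispatcher.lookup("d1", view="keyframes"))
```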

config

Collection configuration.

default_view

View used when a request omits view.

backends

Mapping of view name to ViewBackend instance.

lang_mapping

Optional mapping of document IDs to language codes.

__init__(collection_config, cache_size=256, cache_ttl=600)[source]

Initialize content processor.

Parameters:
  • collection_config (CollectionConfig) – Collection configuration with at least one view (either declared directly via views or synthesized from the deprecated doc_path / content_field / … fields).

  • cache_size – Maximum cache entries. Defaults to a small in-memory LRU so per-id lookups (especially bytes views like keyframes, which incur real disk/tar I/O) don’t pay the I/O cost on every repeat fetch. Set to 0 to disable.

  • cache_ttl – Cache TTL in seconds.

class routir.collections.processor.IRDSProcessor(collection_config, cache_size=256, cache_ttl=600)[source]

Bases: Processor

Processor for retrieving document content by ID from IRDS format.

Follows the same request interface as ContentProcessor but reads documents through an IRDS-specific line reader.

__init__(collection_config, cache_size=256, cache_ttl=600)[source]

Initialize content processor.

Parameters:
  • collection_config (CollectionConfig) – Collection configuration with doc_path, id_field, etc.

  • cache_size – Maximum cache entries (default 256).

  • cache_ttl – Cache TTL in seconds.

Remote-collection wrapper that proxies content lookups to another RoutIR endpoint.

Used by routir.config.load.auto_add_relay_services() to register a local proxy processor for every content service advertised by a server in server_imports.

This module does not participate in ViewSpec dispatch the way routir.collections.processor does — today the relay forwards whole (collection, id) -> text requests as-is. View-aware proxying lands in PR4.

Local caching is important here: a single rerank stage can fetch the top-N documents (often 100 or more), so without caching every query pays the full network round-trip per doc. The per-run doc_content_cache on SearchPipeline only dedupes within a single request.
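The round-trip argument above can be made concrete with a small counting sketch. CountingRemote and fetch_docs are hypothetical, synchronous stand-ins (the real relay is asynchronous), and the overlap of 80 documents between two queries is an invented example figure.

```python
from typing import Dict, Iterable, Optional


class CountingRemote:
    """Hypothetical remote endpoint that counts network round-trips."""

    def __init__(self) -> None:
        self.round_trips = 0

    def fetch(self, doc_id: str) -> str:
        self.round_trips += 1
        return f"text of {doc_id}"


def fetch_docs(remote: CountingRemote, doc_ids: Iterable[str],
               cache: Optional[Dict[str, str]] = None) -> None:
    for doc_id in doc_ids:
        if cache is not None and doc_id in cache:
            continue  # served locally, no round-trip
        text = remote.fetch(doc_id)
        if cache is not None:
            cache[doc_id] = text


# Two queries whose top-100 lists share 80 documents.
query_a = [f"doc{i}" for i in range(0, 100)]
query_b = [f"doc{i}" for i in range(20, 120)]

uncached = CountingRemote()
fetch_docs(uncached, query_a)
fetch_docs(uncached, query_b)

cached = CountingRemote()
shared_cache: Dict[str, str] = {}
fetch_docs(cached, query_a, shared_cache)
fetch_docs(cached, query_b, shared_cache)

print(uncached.round_trips, cached.round_trips)  # 200 120
```

With a cache shared across requests, only the 20 documents new to the second query cost a round-trip.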

class routir.collections.relay.RelayContentProcessor(collection, endpoint, grpc_endpoint=None, api_key=None, transport='auto', timeout=600, retries=10, tls=None, cache_size=-1, cache_ttl=600, redis_url=None, redis_kwargs=None)[source]

Bases: Processor

Forward content lookups to a remote RoutIR endpoint via AsyncClient.

The processor mirrors ContentProcessor on the wire: cache key is the document id, request shape is {"id": str}, response shape includes text (and optionally title, language) on success, or error on failure.

Parameters:
  • collection (str) – Remote collection name to query.

  • endpoint (str) – REST URL of the remote RoutIR server.

  • grpc_endpoint (str, optional) – gRPC target for the data plane.

  • api_key (str, optional) – Bearer token forwarded by the client.

  • transport (str) – One of "auto" (default), "grpc", "rest".

  • timeout (float) – Per-request timeout in seconds (default 600).

  • retries (int) – Client-level retry budget (default 10).

  • tls (bool, optional) – Explicit TLS flag for the gRPC channel.

  • cache_size (int) – Local LRU cache capacity; <= 0 disables.

  • cache_ttl (int) – Local cache TTL in seconds.

  • redis_url (str, optional) – Use Redis instead of LRU when set.

  • redis_kwargs (dict, optional) – Extra kwargs forwarded to the Redis client.

__init__(collection, endpoint, grpc_endpoint=None, api_key=None, transport='auto', timeout=600, retries=10, tls=None, cache_size=-1, cache_ttl=600, redis_url=None, redis_kwargs=None)[source]

Initialize the processor with optional caching.

Parameters:
  • cache_size (int) – Maximum number of cached entries. -1 or 0 disables caching entirely. When redis_url is set, this controls the Redis key-count budget (approximate).

  • cache_ttl (int) – Cache entry time-to-live in seconds (default 600).

  • cache_key (callable, optional) – (item: dict) -> hashable function to derive a cache key from a request dict. The default key is (service, query, limit, subset). Override when additional request fields affect the result (e.g. pass cache_key_fields via a closure, as load_config() does per service).

  • redis_url (str, optional) – Redis connection URL. When provided, Redis is used instead of the in-memory LRU cache.

  • redis_kwargs (dict) – Additional keyword arguments forwarded to the Redis client.

Score Processors

class routir.processors.score_processors.AsyncPairwiseScoreProcessor(engine, cache_size=1024, cache_ttl=600, cache_key=None, **kwargs)[source]

Bases: Processor

Processor that serves every request independently through async calls. Use it when the engine does nothing but issue async calls.

__init__(engine, cache_size=1024, cache_ttl=600, cache_key=None, **kwargs)[source]

Initialize the processor with optional caching.

Parameters:
  • cache_size (int) – Maximum number of cached entries. -1 or 0 disables caching entirely. When redis_url is set, this controls the Redis key-count budget (approximate).

  • cache_ttl (int) – Cache entry time-to-live in seconds (default 600).

  • cache_key (callable, optional) – (item: dict) -> hashable function to derive a cache key from a request dict. The default key is (service, query, limit, subset). Override when additional request fields affect the result (e.g. pass cache_key_fields via a closure, as load_config() does per service).

  • redis_url (str, optional) – Redis connection URL. When provided, Redis is used instead of the in-memory LRU cache.

  • redis_kwargs (dict) – Additional keyword arguments forwarded to the Redis client.

class routir.processors.score_processors.BatchPairwiseScoreProcessor(engine, **kwargs)[source]

Bases: BatchProcessor

__init__(engine, **kwargs)[source]

Initialize the batch processor.

Parameters:
  • batch_size (int) – Maximum number of requests accumulated into one batch before the engine is called (default 32).

  • max_wait_time (float) – Maximum seconds to wait for the batch to fill before processing a partial batch (default 0.1 s). Tune this to balance latency vs. GPU utilisation — lower values reduce wait time, higher values pack more requests per batch.

  • cache_size (int) – LRU cache size; -1 disables caching.

  • cache_ttl (int) – Cache TTL in seconds.

  • cache_key (callable, optional) – Custom cache-key function; see Processor for details.

  • **kwargs – Forwarded to Processor.__init__.

Cache

class routir.processors.cache.Cache(capacity=-1, ttl=-1)[source]

Bases: ABC

Abstract base class for cache implementations.

__init__(capacity=-1, ttl=-1)[source]

Initialize cache.

Parameters:
  • capacity (int) – Maximum number of items to store (-1 for unlimited)

  • ttl (float) – Time-to-live in seconds for cache entries (-1 for no expiry)

abstractmethod async get(key)[source]

Get an item from the cache.

Parameters:

key (str) – The cache key

Returns:

The cached value or None if not found or expired

Return type:

Any | None

abstractmethod async put(key, value)[source]

Add an item to the cache.

Parameters:
  • key (str) – The cache key

  • value (Any) – The value to cache

abstractmethod async remove(key)[source]

Remove an item from the cache.

Parameters:

key (str) – The cache key

abstractmethod async clear()[source]

Clear all items from the cache.

abstractmethod async __len__()[source]

Return the number of items in the cache.

async connect()[source]

Connect to cache backend (if needed). Override in subclasses.

async close()[source]

Close cache connection (if needed). Override in subclasses.

async __aenter__()[source]

Async context manager entry.

async __aexit__(exc_type, exc_val, exc_tb)[source]

Async context manager exit.
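A sketch of how the connect/close hooks and the async context manager methods might fit together. That __aenter__ calls connect() and __aexit__ calls close() is an assumption about the intended pairing, and ToyBackendCache is a hypothetical class, not a RoutIR cache.

```python
import asyncio


class ToyBackendCache:
    """Hypothetical cache whose context-manager hooks call connect() and close()."""

    def __init__(self) -> None:
        self.connected = False

    async def connect(self) -> None:
        self.connected = True

    async def close(self) -> None:
        self.connected = False

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.close()  # runs even when the body raised


async def demo():
    cache = ToyBackendCache()
    async with cache as c:
        inside = c.connected
    return inside, cache.connected


state = asyncio.run(demo())
print(state)
```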

class routir.processors.cache.LRUCache(capacity=-1, ttl=-1)[source]

Bases: Cache

An async LRU cache implementation.

__init__(capacity=-1, ttl=-1)[source]

Initialize LRU cache.

Parameters:
  • capacity (int) – Maximum number of items to store (-1 for unlimited)

  • ttl (float) – Time-to-live in seconds for cache entries (-1 for no expiry)
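The capacity and ttl semantics can be sketched with an OrderedDict. ToyLRUCache is an illustrative reimplementation under the documented conventions (-1 for unlimited capacity, -1 for no expiry); it is not the LRUCache source and omits remove(), clear(), and locking.

```python
import asyncio
import time
from collections import OrderedDict
from typing import Any, Hashable, Optional


class ToyLRUCache:
    """Illustrative async LRU with TTL; not the RoutIR implementation."""

    def __init__(self, capacity: int = -1, ttl: float = -1) -> None:
        self.capacity = capacity
        self.ttl = ttl
        self._data = OrderedDict()  # key -> (value, stored_at)

    async def get(self, key: Hashable) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None:
            return None
        value, stored_at = entry
        if self.ttl >= 0 and time.monotonic() - stored_at > self.ttl:
            del self._data[key]  # expired
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return value

    async def put(self, key: Hashable, value: Any) -> None:
        self._data[key] = (value, time.monotonic())
        self._data.move_to_end(key)
        if self.capacity >= 0:
            while len(self._data) > self.capacity:
                self._data.popitem(last=False)  # evict least recently used


async def demo():
    cache = ToyLRUCache(capacity=2)
    await cache.put("a", 1)
    await cache.put("b", 2)
    await cache.get("a")     # "a" becomes most recently used
    await cache.put("c", 3)  # evicts "b"
    return await cache.get("a"), await cache.get("b"), await cache.get("c")


lru_state = asyncio.run(demo())
print(lru_state)
```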

async get(key)[source]

Get an item from the cache.

Parameters:

key (str) – The cache key

Returns:

The cached value or None if not found or expired

Return type:

Any | None

async put(key, value)[source]

Add an item to the cache.

Parameters:
  • key (str) – The cache key

  • value (Any) – The value to cache

async remove(key)[source]

Remove an item from the cache.

async clear()[source]

Clear the cache.

async __len__()[source]

Return the number of items in the cache.

class routir.processors.cache.RedisCache(capacity=-1, ttl=-1, redis_url='redis://localhost:6379', key_prefix='routircache:', **redis_kwargs)[source]

Bases: Cache

A Redis-based cache implementation with async support.

__init__(capacity=-1, ttl=-1, redis_url='redis://localhost:6379', key_prefix='routircache:', **redis_kwargs)[source]

Initialize Redis cache.

Parameters:
  • capacity (int) – Maximum number of items to store (-1 for unlimited, not enforced by Redis)

  • ttl (float) – Time-to-live in seconds for cache entries (-1 for no expiry)

  • redis_url (str) – Redis connection URL

  • key_prefix (str) – Prefix for all cache keys to avoid collisions

  • **redis_kwargs – Additional arguments to pass to Redis client

async connect()[source]

Establish connection to Redis.

async close()[source]

Close Redis connection.

async get(key)[source]

Get an item from the cache.

Parameters:

key (str) – The cache key

Returns:

The cached value or None if not found or expired

Return type:

Any | None

async put(key, value)[source]

Add an item to the cache.

Parameters:
  • key (str) – The cache key

  • value (Any) – The value to cache

async remove(key)[source]

Remove an item from the cache.

async clear()[source]

Clear all cache entries with the current prefix.

async __len__()[source]

Return the number of items in the cache with the current prefix.

Random Access Readers

Random-access reader machinery used by text-jsonl views (and similar).

Houses RandomAccessReader (the abstract O(1) lookup interface) plus the two concrete backends shipped today: OffsetFile for plain JSONL collections (with a .offsetmap sidecar) and MSMARCOSegOffset for MSMARCO v2.1 sharded gzip corpora where IDs encode shard and byte offset directly.

These primitives know nothing about Processor or the request/response shapes – they are pure storage adapters consumed by content view backends.

class routir.collections.indexing.offset_file.RandomAccessReader(path)[source]

Bases: object

Abstract base for O(1) document lookup by string ID.

Subclasses wrap different on-disk formats (plain JSONL, sharded gzip) and expose a uniform reader[doc_id] interface that returns the raw JSON line for a document.

path

Root path of the document store (file or directory).

Type:

Path

__init__(path)[source]

__getitem__(idx)[source]

Return the raw JSON line for document idx.

__contains__(idx)[source]

Return True if idx is present in this reader.

class routir.collections.indexing.offset_file.OffsetFile(path, key=None, cache_dir=None, id_field='id')[source]

Bases: RandomAccessReader

Fast random-access reader for JSONL document files using a byte-offset map.

On first access, scans the JSONL file and builds a {doc_id: byte_offset} mapping that is pickled to a .offsetmap sidecar file beside the corpus. Subsequent startups load the sidecar directly (O(1) per document lookup).

Documents are retrieved by seeking to the stored byte offset and reading one line – no need to load the entire corpus into memory.
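The byte-offset technique can be sketched independently of RoutIR. build_offsets and read_line are hypothetical helpers; the sketch skips the pickled .offsetmap sidecar and the parse-failure handling that OffsetFile is documented to have.

```python
import json
import os
import tempfile
from typing import Dict


def build_offsets(path: str, id_field: str = "id") -> Dict[str, int]:
    """Scan a JSONL file once and map each document ID to its byte offset."""
    offsets = {}
    with open(path, "rb") as f:
        while True:
            pos = f.tell()
            line = f.readline()
            if not line:
                break
            offsets[json.loads(line)[id_field]] = pos
    return offsets


def read_line(path: str, offsets: Dict[str, int], doc_id: str) -> str:
    """Seek to the stored offset and read exactly one line."""
    with open(path, "rb") as f:
        f.seek(offsets[doc_id])
        return f.readline().decode("utf-8")


first_line = '{"docid": "a", "text": "first"}\n'
with tempfile.TemporaryDirectory() as tmp:
    corpus = os.path.join(tmp, "corpus.jsonl")
    with open(corpus, "w", encoding="utf-8", newline="\n") as f:
        f.write(first_line)
        f.write('{"docid": "b", "text": "second"}\n')
    offsets = build_offsets(corpus, id_field="docid")
    doc_b = json.loads(read_line(corpus, offsets, "b"))

print(offsets, doc_b)
```

Opening the file in binary mode keeps the offsets in bytes, which matters for corpora containing multi-byte UTF-8 text.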

The sidecar location is resolved via the shared fallback chain in sidecar: per-view cache_dir first, then adjacent to the JSONL, then ${XDG_CACHE_HOME}/routir/offsetmap/.

Example:

import json
from routir.collections.indexing.offset_file import OffsetFile

reader = OffsetFile("/data/corpus.jsonl", id_field="docid")
line = reader["msmarco_v2_doc_00_123456"]  # raw JSON string
doc = json.loads(line)  # parsed document

__init__(path, key=None, cache_dir=None, id_field='id')[source]

Initialize OffsetFile and build or load the byte-offset map.

Parameters:
  • path (Path) – Path to the JSONL corpus file.

  • key (callable, optional) – Legacy (line: str) -> doc_id function. Prefer id_field instead. When both are provided, key takes precedence.

  • cache_dir (str, optional) – Directory for the .offsetmap sidecar file (PR7 – interpreted as a directory, not a single file). If unset, RoutIR writes adjacent to the corpus; on PermissionError it falls back to ${XDG_CACHE_HOME:-~/.cache}/routir/offsetmap/. Inside the directory, the sidecar name is <basename>.<hash16>.offsetmap where the hash is the first 16 hex chars of sha256(realpath(corpus)), preventing collisions across multiple corpora that share one cache dir.

  • id_field (str) – JSON field name used as the document ID when key is not provided (default "id").
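The sidecar naming rule described for cache_dir can be sketched as below. sidecar_name is a hypothetical helper; it assumes that <basename> means the corpus filename including its extension and that the real path is hashed as UTF-8 bytes, neither of which is stated in this reference.

```python
import hashlib
import os


def sidecar_name(corpus_path: str) -> str:
    """Build '<basename>.<hash16>.offsetmap' for a corpus path (illustrative)."""
    real = os.path.realpath(corpus_path)
    # First 16 hex characters of sha256 over the resolved path.
    digest = hashlib.sha256(real.encode("utf-8")).hexdigest()[:16]
    return f"{os.path.basename(corpus_path)}.{digest}.offsetmap"


name_a = sidecar_name("/data/a/corpus.jsonl")
name_b = sidecar_name("/data/b/corpus.jsonl")
print(name_a)
print(name_b)
```

Two corpora with the same filename in different directories get different sidecar names, which is the collision the hash is meant to prevent.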

create_offsetmap(fn, candidates, key)[source]

Scan fn line-by-line and write a {doc_id: byte_offset} pickle.

The pickle is written to the first writable entry in candidates (see atomic_write_sidecar()). This is called once at first startup and may take several minutes for large corpora. Raises if more than 10 consecutive lines fail to parse.

class routir.collections.indexing.offset_file.MSMARCOSegOffset(path, num_workers=32, filename_pattern='msmarco_v2.1_doc_segmented_{shard}.json.gz', id_parser=None, force_load_all=False)[source]

Bases: RandomAccessReader

Random-access reader for MSMARCO v2.1 sharded gzip document files.

The MSMARCO v2.1 segmented document corpus is stored as multiple gzip shards. Document IDs encode both the shard number and the byte offset within that shard (e.g. msmarco_v2.1_doc_segmented_00_123456_0_789), enabling direct O(1) seeks without an external offset map.

Uses rapidgzip for parallel decompression when available, falling back to the standard gzip module. Open file handles per shard are cached in cached_fps for the lifetime of the reader.

Example:

import json
from routir.collections.indexing.offset_file import MSMARCOSegOffset

reader = MSMARCOSegOffset("/data/msmarco-v2.1-doc-segmented/")
line = reader["msmarco_v2.1_doc_segmented_00_123_0_456"]  # raw JSON string
doc = json.loads(line)  # parsed document

__init__(path, num_workers=32, filename_pattern='msmarco_v2.1_doc_segmented_{shard}.json.gz', id_parser=None, force_load_all=False)[source]

Initialize the MSMARCO segmented document reader.

Parameters:
  • path (Path) – Directory containing the sharded .json.gz files.

  • num_workers (int) – Parallelism passed to rapidgzip for decompression (default 32). Ignored when falling back to native gzip.

  • filename_pattern (str) – Glob/format pattern for shard filenames. {shard} is replaced with the shard identifier extracted from the document ID.

  • id_parser (callable, optional) – (doc_id: str) -> (shard, offset) function. Defaults to _parse_idx(), which handles the standard MSMARCO v2.1 ID format msmarco_v2.1_doc_segmented_<shard>_<…>_<…>_<offset>. Override for custom ID formats.

  • force_load_all (bool) – When True, all documents across all shards are loaded into memory at startup. Trades startup time and memory for maximum throughput. Default False uses on-demand seek-based access.
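An id_parser for the documented ID layout might look like the sketch below. parse_doc_id and shard_filename are hypothetical helpers written from the format string given for id_parser; they are not the library's _parse_idx(), whose exact behaviour is not shown here.

```python
from typing import Tuple

PREFIX = "msmarco_v2.1_doc_segmented_"
DEFAULT_PATTERN = "msmarco_v2.1_doc_segmented_{shard}.json.gz"


def parse_doc_id(doc_id: str) -> Tuple[str, int]:
    """Split '<prefix><shard>_<x>_<y>_<offset>' into (shard, byte offset)."""
    if not doc_id.startswith(PREFIX):
        raise ValueError(f"unexpected document ID: {doc_id!r}")
    parts = doc_id[len(PREFIX):].split("_")
    if len(parts) != 4:
        raise ValueError(f"expected 4 fields after the prefix, got {len(parts)}")
    return parts[0], int(parts[-1])


def shard_filename(doc_id: str, pattern: str = DEFAULT_PATTERN) -> str:
    """Fill the {shard} placeholder of the filename pattern."""
    shard, _ = parse_doc_id(doc_id)
    return pattern.format(shard=shard)


parsed = parse_doc_id("msmarco_v2.1_doc_segmented_00_123456_0_789")
print(parsed)
print(shard_filename("msmarco_v2.1_doc_segmented_00_123456_0_789"))
```

The shard is kept as a string so that zero-padded identifiers such as "00" survive the substitution into the filename pattern.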

Registry

exception routir.processors.registry.ServiceNotFound[source]

Bases: KeyError

Raised when a (name, service_type) combo is not registered.

KeyError so existing code that catches KeyError still works, but the distinct type lets transport layers map cleanly to HTTP 400 / gRPC NOT_FOUND.
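The compatibility argument can be checked with a local stand-in. The ServiceNotFound class below only copies the documented base class, and the registry dict and lookup function are hypothetical.

```python
class ServiceNotFound(KeyError):
    """Local stand-in with the same base class as the documented exception."""


# Hypothetical registry keyed by (name, service_type).
registry = {("bm25", "search"): object()}


def lookup(name: str, service_type: str):
    try:
        return registry[(name, service_type)]
    except KeyError:
        raise ServiceNotFound(f"{name}/{service_type}") from None


def legacy_caller() -> str:
    # Older code that only knows about KeyError keeps working.
    try:
        lookup("missing", "score")
    except KeyError:
        return "caught as KeyError"
    return "found"


def transport_layer() -> str:
    # Newer code can single out the specific type.
    try:
        lookup("missing", "score")
    except ServiceNotFound:
        return "service not found"
    return "found"


print(legacy_caller(), "|", transport_layer())
```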

class routir.processors.registry.DummyProcessor(engine, method)[source]

Bases: Processor

A minimal, no-cache processor that wraps a single engine method.

Used internally by auto_register() to expose an engine method (search, fuse, etc.) as a Processor without any batching or caching overhead. Requests are dispatched directly to the engine method on every call.

Not intended for production workloads where batching matters; prefer the config-based loading path via load_config(), which creates BatchProcessor instances.

__init__(engine, method)[source]

Initialize the processor without caching.

Parameters:
  • engine – Engine instance whose method is exposed.

  • method – Engine method to wrap (e.g. search or fuse).

async submit(item)[source]

Submit a request for processing with caching.

Parameters:

item (Dict[str, Any]) – Request data

Returns:

Response dict with ‘cached’ field indicating cache hit/miss

Return type:

Dict[str, Any]

routir.processors.registry.auto_register(methods, view_kind=None, **default_init_kwargs)[source]

Class decorator that instantiates an engine and registers it as a processor.

This is a lightweight alternative to the JSON config-based loading path, useful for built-in or singleton engines (e.g. stateless fusion rules like RRF) that do not need per-service batching or caching.

Warning

auto_register creates a single engine instance with default_init_kwargs at decoration time using DummyProcessor (no batching, no caching). For production engines with GPU models, use the config-based path instead so that BatchProcessor and LRU/Redis caching are applied.

The engine class must implement the {method}_batch method for every method listed. The decorator checks the corresponding can_{method} property (e.g. can_fuse for "fuse") and raises TypeError if it returns False.

The service is registered under the engine’s class name in ProcessorRegistry.

Parameters:
  • methods (str or list[str]) – Service type(s) to register. Valid values: "search", "score", "fuse", "decompose_query".

  • view_kind (str, optional) – Modality of the engine’s score_batch / search_batch inputs ("text" or "bytes"). When omitted, falls back to the engine class’s accepts_view_kind attribute (which defaults to "text"). Only score / search slots store this metadata; fuse and decompose_query always register as "text".

  • **default_init_kwargs – Keyword arguments forwarded to engine_cls() at decoration time.

Returns:

The unmodified engine class (decorator is transparent).

Example

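from routir.processors.registry import auto_register

# Engine refers to the RoutIR engine base class; its import path is not shown in this section.
@auto_register("fuse")
class RRFFusion(Engine):
    async def fuse_batch(self, queries, batch_scores, **kwargs):
        ...

# RRFFusion is now accessible as ProcessorRegistry["RRFFusion"]["fuse"]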
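
The decoration-time behaviour described for auto_register (one instance, a can_{method} check, registration under the class name) can be sketched with a toy decorator. toy_auto_register and TOY_REGISTRY are hypothetical; the toy stores the bound {method}_batch callable directly, whereas the documentation says the real decorator wraps the engine in a DummyProcessor.

```python
from typing import Dict, List, Union

TOY_REGISTRY: Dict[str, Dict[str, object]] = {}


def toy_auto_register(methods: Union[str, List[str]], **default_init_kwargs):
    names = [methods] if isinstance(methods, str) else list(methods)

    def decorator(engine_cls):
        # A single instance is created when the class is decorated.
        engine = engine_cls(**default_init_kwargs)
        for name in names:
            if not getattr(engine, f"can_{name}", False):
                raise TypeError(f"{engine_cls.__name__} cannot serve {name!r}")
            handler = getattr(engine, f"{name}_batch")
            TOY_REGISTRY.setdefault(engine_cls.__name__, {})[name] = handler
        return engine_cls  # the class itself is returned unmodified

    return decorator


@toy_auto_register("fuse", k=60)
class ToyFusion:
    def __init__(self, k: int = 60) -> None:
        self.k = k

    @property
    def can_fuse(self) -> bool:
        return True

    def fuse_batch(self, rankings: List[List[str]]) -> Dict[str, float]:
        # Reciprocal-rank style sum with 1-based ranks.
        fused: Dict[str, float] = {}
        for ranking in rankings:
            for rank, doc_id in enumerate(ranking, start=1):
                fused[doc_id] = fused.get(doc_id, 0.0) + 1.0 / (self.k + rank)
        return fused


fused = TOY_REGISTRY["ToyFusion"]["fuse"]([["a", "b"], ["b", "a"]])
print(fused)
```

Requesting a method the engine does not advertise (for example "search" on ToyFusion) makes the toy decorator raise TypeError, mirroring the documented check.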