Processors
This section contains auto-generated documentation for all processor classes in RoutIR. Processors handle request batching, caching, and content processing.
Base Classes
- class routir.processors.abstract.Processor(cache_size=1024, cache_ttl=600, cache_key=None, redis_url=None, redis_kwargs={})
Bases: FactoryEnabled
Base class for request processors with optional caching.
A Processor sits between the HTTP layer and an engine. It handles cache lookup/store and delegates to _submit() for the actual work.
When to subclass Processor vs BatchProcessor
Subclass Processor when requests should be handled one at a time, e.g. content lookup by document ID where batching adds no value. Override _submit() to implement the logic.
Subclass BatchProcessor when the underlying engine (e.g. a GPU model) benefits from processing multiple requests together. Override _process_batch() instead.
Both classes share the same caching interface; the cache is checked before _submit() / _process_batch() is called.
- cache_key
Function (item: dict) -> hashable used to derive the cache key from a request dict. The default key includes service, query, limit, and subset.
- Type:
callable
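The order of operations described above (derive a key, check the cache, call _submit() only on a miss) can be illustrated with a toy, self-contained model. ToyProcessor, its public submit() method, and LookupProcessor are hypothetical names; RoutIR's actual entry point, cache type, and signatures may differ, and a plain dict stands in for the LRU/Redis cache.

```python
import asyncio


class ToyProcessor:
    """Hypothetical stand-in for Processor: cache lookup first, then _submit()."""

    def __init__(self, cache_key=None):
        # Default key mirrors the documented fields: service, query, limit, subset.
        self.cache_key = cache_key or (
            lambda item: (item.get("service"), item.get("query"), item.get("limit"), item.get("subset"))
        )
        self._cache = {}  # plain dict standing in for the LRU/Redis cache

    async def submit(self, item: dict):
        key = self.cache_key(item)
        if key in self._cache:  # cache hit: _submit() is never called
            return self._cache[key]
        result = await self._submit(item)  # cache miss: delegate to the subclass
        self._cache[key] = result
        return result

    async def _submit(self, item: dict):
        raise NotImplementedError


class LookupProcessor(ToyProcessor):
    """One-at-a-time lookup by ID, the case where batching adds no value."""

    def __init__(self, docs: dict):
        super().__init__(cache_key=lambda item: item["id"])
        self.docs = docs
        self.calls = 0  # counts real lookups so cache hits are visible

    async def _submit(self, item: dict):
        self.calls += 1
        return {"text": self.docs[item["id"]]}


async def main():
    proc = LookupProcessor({"d1": "hello"})
    first = await proc.submit({"id": "d1"})
    second = await proc.submit({"id": "d1"})  # served from the cache
    return first, second, proc.calls


print(asyncio.run(main()))
```

The second request never reaches _submit(), so the subclass only has to implement the uncached work.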
- __init__(cache_size=1024, cache_ttl=600, cache_key=None, redis_url=None, redis_kwargs={})
Initialize the processor with optional caching.
- Parameters:
cache_size (int) – Maximum number of cached entries. -1 or 0 disables caching entirely. When redis_url is set, this controls the Redis key-count budget (approximate).
cache_ttl (int) – Cache entry time-to-live in seconds (default 600).
cache_key (callable, optional) – (item: dict) -> hashable function to derive a cache key from a request dict. The default key is (service, query, limit, subset). Override when additional request fields affect the result (e.g. pass cache_key_fields via a closure, as load_config() does per service).
redis_url (str, optional) – Redis connection URL. When provided, Redis is used instead of the in-memory LRU cache.
redis_kwargs (dict) – Additional keyword arguments forwarded to the Redis client.
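A closure is a natural way to extend the default (service, query, limit, subset) key with extra request fields. The helper make_cache_key and the request field rerank_depth below are hypothetical illustrations, not the helper load_config() uses; the returned function is the kind of callable that would be passed as cache_key=.

```python
def make_cache_key(extra_fields=()):
    """Return an (item: dict) -> hashable function: default fields plus extras."""
    fields = ("service", "query", "limit", "subset") + tuple(extra_fields)

    def cache_key(item: dict):
        # Missing fields map to None; field values are assumed to be hashable.
        return tuple(item.get(name) for name in fields)

    return cache_key


req_a = {"service": "s", "query": "q", "limit": 10, "rerank_depth": 100}
req_b = {"service": "s", "query": "q", "limit": 10, "rerank_depth": 50}

default_key = make_cache_key()
extended_key = make_cache_key(extra_fields=["rerank_depth"])
print(default_key(req_a) == default_key(req_b))    # True: the two requests collide
print(extended_key(req_a) == extended_key(req_b))  # False: rerank_depth separates them
```

Keeping the default fields first makes the extended key a strict refinement: requests that already had distinct keys still do.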
- class routir.processors.abstract.BatchProcessor(batch_size=32, max_wait_time=0.1, cache_size=1024, cache_ttl=600, cache_key=None, **kwargs)
Bases: Processor
Processor that accumulates requests into batches before engine inference.
Requests are queued in an asyncio.Queue by _submit(). A background worker collects items until either batch_size is reached or max_wait_time seconds elapse, then calls _process_batch() with the whole batch. This amortises GPU/model overhead across concurrent requests, improving throughput at the cost of a small latency increase.
Subclasses must override _process_batch(); all other machinery is provided here.
The worker is started lazily on the first request (or explicitly via start()), and runs for the lifetime of the process.
- __init__(batch_size=32, max_wait_time=0.1, cache_size=1024, cache_ttl=600, cache_key=None, **kwargs)
Initialize the batch processor.
- Parameters:
batch_size (int) – Maximum number of requests accumulated into one batch before the engine is called (default 32).
max_wait_time (float) – Maximum seconds to wait for the batch to fill before processing a partial batch (default 0.1 s). Tune this to balance latency against GPU utilisation: lower values reduce wait time, while higher values pack more requests per batch.
cache_size (int) – LRU cache size; -1 disables caching.
cache_ttl (int) – Cache TTL in seconds.
cache_key (callable, optional) – Custom cache-key function; see Processor for details.
**kwargs – Forwarded to Processor.__init__.
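The accumulation loop described for BatchProcessor can be modelled with plain asyncio. This is an illustrative sketch, not RoutIR's implementation: BatchWorker, submit(), and the process_batch callback are hypothetical names. A batch is flushed when it reaches batch_size or when max_wait_time has elapsed since its first item arrived, and each caller awaits a Future that the worker resolves with that caller's result.

```python
import asyncio


class BatchWorker:
    """Hypothetical batching loop: flush at batch_size items or after max_wait_time."""

    def __init__(self, process_batch, batch_size=32, max_wait_time=0.1):
        self.process_batch = process_batch  # async fn: list[item] -> list[result]
        self.batch_size = batch_size
        self.max_wait_time = max_wait_time
        self.queue: asyncio.Queue = asyncio.Queue()
        self._task = None
        self.batch_sizes = []  # recorded only so the demo can show the batching

    async def submit(self, item):
        if self._task is None:  # start the worker lazily on the first request
            self._task = asyncio.create_task(self._run())
        fut = asyncio.get_running_loop().create_future()
        await self.queue.put((item, fut))
        return await fut

    async def _run(self):
        loop = asyncio.get_running_loop()
        while True:
            batch = [await self.queue.get()]  # block until at least one request
            deadline = loop.time() + self.max_wait_time
            while len(batch) < self.batch_size:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
                try:
                    batch.append(await asyncio.wait_for(self.queue.get(), remaining))
                except asyncio.TimeoutError:
                    break  # partial batch: stop waiting and process what we have
            self.batch_sizes.append(len(batch))
            items = [item for item, _ in batch]
            try:
                results = await self.process_batch(items)
                for (_, fut), res in zip(batch, results):
                    fut.set_result(res)
            except Exception as exc:  # propagate an engine error to every caller
                for _, fut in batch:
                    fut.set_exception(exc)


async def demo():
    async def double_all(items):
        return [x * 2 for x in items]

    worker = BatchWorker(double_all, batch_size=4, max_wait_time=0.05)
    results = await asyncio.gather(*(worker.submit(i) for i in range(10)))
    worker._task.cancel()
    return results, worker.batch_sizes


print(asyncio.run(demo()))
```

With ten concurrent requests and batch_size=4, the worker makes three engine calls (4, 4, then a partial batch of 2 flushed by the timeout) instead of ten.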
Query Processors
- class routir.processors.query_processors.AsyncQueryProcessor(engine, cache_size=1024, cache_ttl=600, cache_key=None, **kwargs)
Bases: Processor
Processor that serves every request independently through async calls. Use it when all the engine does is issue async calls.
- __init__(engine, cache_size=1024, cache_ttl=600, cache_key=None, **kwargs)
Initialize the processor with optional caching.
- Parameters:
cache_size (int) – Maximum number of cached entries. -1 or 0 disables caching entirely. When redis_url is set, this controls the Redis key-count budget (approximate).
cache_ttl (int) – Cache entry time-to-live in seconds (default 600).
cache_key (callable, optional) – (item: dict) -> hashable function to derive a cache key from a request dict. The default key is (service, query, limit, subset). Override when additional request fields affect the result (e.g. pass cache_key_fields via a closure, as load_config() does per service).
redis_url (str, optional) – Redis connection URL. When provided, Redis is used instead of the in-memory LRU cache.
redis_kwargs (dict) – Additional keyword arguments forwarded to the Redis client.
- class routir.processors.query_processors.BatchQueryProcessor(engine, **kwargs)
Bases: BatchProcessor
- __init__(engine, **kwargs)
Initialize the batch processor.
- Parameters:
batch_size (int) – Maximum number of requests accumulated into one batch before the engine is called (default 32).
max_wait_time (float) – Maximum seconds to wait for the batch to fill before processing a partial batch (default 0.1 s). Tune this to balance latency against GPU utilisation: lower values reduce wait time, while higher values pack more requests per batch.
cache_size (int) – LRU cache size; -1 disables caching.
cache_ttl (int) – Cache TTL in seconds.
cache_key (callable, optional) – Custom cache-key function; see Processor for details.
**kwargs – Forwarded to Processor.__init__.
- class routir.processors.query_processors.BatchDecomposeQueryProcessor(engine, **kwargs)
Bases: BatchProcessor
- __init__(engine, **kwargs)
Initialize the batch processor.
- Parameters:
batch_size (int) – Maximum number of requests accumulated into one batch before the engine is called (default 32).
max_wait_time (float) – Maximum seconds to wait for the batch to fill before processing a partial batch (default 0.1 s). Tune this to balance latency against GPU utilisation: lower values reduce wait time, while higher values pack more requests per batch.
cache_size (int) – LRU cache size; -1 disables caching.
cache_ttl (int) – Cache TTL in seconds.
cache_key (callable, optional) – Custom cache-key function; see Processor for details.
**kwargs – Forwarded to Processor.__init__.
Content Processors
View-dispatch entry point for local document collections.
Houses ContentProcessor (view-dispatching, backed by one
ViewBackend per declared view) and IRDSProcessor
(ir_datasets backed) - the two ways a RoutIR server resolves
(collection, view, doc_id) -> payload for collections that live on the
local machine. Remote collections are handled by routir.collections.relay.
- Dependency direction:
This module imports Processor from routir.processors.abstract. The reverse must never happen: routir.processors provides the online request/cache plumbing and must not depend on collection storage.
- class routir.collections.processor.ContentProcessor(collection_config, cache_size=256, cache_ttl=600)
Bases: Processor
Processor for retrieving document content by ID, dispatched per view.
Builds one ViewBackend per entry in CollectionConfig.views at startup; per-request dispatch picks the right backend by view name (defaulting to default_view when the request omits view).
- config
Collection configuration.
- default_view
View used when a request omits view.
- backends
Mapping of view name to ViewBackend instance.
- lang_mapping
Optional mapping of document IDs to language codes.
- __init__(collection_config, cache_size=256, cache_ttl=600)
Initialize content processor.
- Parameters:
collection_config (CollectionConfig) – Collection configuration with at least one view (either declared directly via views or synthesized from the deprecated doc_path / content_field / … fields).
cache_size – Maximum cache entries. Defaults to a small in-memory LRU so per-id lookups (especially bytes views like keyframes, which incur real disk/tar I/O) don't pay the I/O cost on every repeat fetch. Set to 0 to disable.
cache_ttl – Cache TTL in seconds.
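Per-request view dispatch, as described for ContentProcessor, reduces to a dictionary lookup with a fallback to default_view. In this self-contained sketch, resolve_backend is a hypothetical helper, plain callables stand in for ViewBackend instances, and the view name "text" is an assumed example ("keyframes" is the bytes view mentioned above).

```python
def resolve_backend(request: dict, backends: dict, default_view: str):
    """Pick the backend for request["view"], falling back to default_view."""
    view = request.get("view") or default_view
    try:
        return view, backends[view]
    except KeyError:
        raise KeyError(f"unknown view {view!r}; available: {sorted(backends)}") from None


# Hypothetical backends: callables standing in for ViewBackend instances.
backends = {
    "text": lambda doc_id: {"text": f"body of {doc_id}"},
    "keyframes": lambda doc_id: {"bytes": b"\x89PNG..."},
}

view, backend = resolve_backend({"id": "d1"}, backends, default_view="text")
print(view, backend("d1"))  # text {'text': 'body of d1'}
```

Because every backend is built once at startup, the per-request cost is a single dict lookup.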
- class routir.collections.processor.IRDSProcessor(collection_config, cache_size=256, cache_ttl=600)
Bases: Processor
Processor for retrieving document content by ID from an ir_datasets (IRDS) collection.
Inherits from ContentProcessor and uses an IRDS-specific line reader.
- __init__(collection_config, cache_size=256, cache_ttl=600)
Initialize content processor.
- Parameters:
collection_config (CollectionConfig) – Collection configuration with doc_path, id_field, etc.
cache_size – Maximum cache entries (default 256).
cache_ttl – Cache TTL in seconds.
Remote-collection wrapper that proxies content lookups to another RoutIR endpoint.
Used by routir.config.load.auto_add_relay_services() to register a local
proxy processor for every content service advertised by a server in
server_imports.
This module does not participate in ViewSpec dispatch the way
routir.collections.processor does — today the relay forwards whole
(collection, id) -> text requests as-is. View-aware proxying lands in PR4.
Local caching is important here: a single rerank stage can fetch the top-N
documents (often 100 or more), so without caching every query pays the full
network round-trip per doc. The per-run doc_content_cache on
SearchPipeline only dedupes within a single request.
- class routir.collections.relay.RelayContentProcessor(collection, endpoint, grpc_endpoint=None, api_key=None, transport='auto', timeout=600, retries=10, tls=None, cache_size=-1, cache_ttl=600, redis_url=None, redis_kwargs=None)
Bases: Processor
Forward content lookups to a remote RoutIR endpoint via AsyncClient.
The processor mirrors ContentProcessor on the wire: the cache key is the document id, the request shape is {"id": str}, and the response shape includes text (and optionally title, language) on success, or error on failure.
- Parameters:
collection (str) – Remote collection name to query.
endpoint (str) – REST URL of the remote RoutIR server.
grpc_endpoint (str, optional) – gRPC target for the data plane.
api_key (str, optional) – Bearer token forwarded by the client.
transport (str) – One of "auto" (default), "grpc", "rest".
timeout (float) – Per-request timeout in seconds (default 600).
retries (int) – Client-level retry budget (default 10).
tls (bool, optional) – Explicit TLS flag for the gRPC channel.
cache_size (int) – Local LRU cache capacity; <= 0 disables.
cache_ttl (int) – Local cache TTL in seconds.
redis_url (str, optional) – Use Redis instead of LRU when set.
redis_kwargs (dict) – Extra kwargs forwarded to the Redis client.
- __init__(collection, endpoint, grpc_endpoint=None, api_key=None, transport='auto', timeout=600, retries=10, tls=None, cache_size=-1, cache_ttl=600, redis_url=None, redis_kwargs=None)
Initialize the processor with optional caching.
- Parameters:
cache_size (int) – Maximum number of cached entries. -1 or 0 disables caching entirely. When redis_url is set, this controls the Redis key-count budget (approximate).
cache_ttl (int) – Cache entry time-to-live in seconds (default 600).
cache_key (callable, optional) – (item: dict) -> hashable function to derive a cache key from a request dict. The default key is (service, query, limit, subset). Override when additional request fields affect the result (e.g. pass cache_key_fields via a closure, as load_config() does per service).
redis_url (str, optional) – Redis connection URL. When provided, Redis is used instead of the in-memory LRU cache.
redis_kwargs (dict) – Additional keyword arguments forwarded to the Redis client.
Score Processors
- class routir.processors.score_processors.AsyncPairwiseScoreProcessor(engine, cache_size=1024, cache_ttl=600, cache_key=None, **kwargs)
Bases: Processor
Processor that serves every request independently through async calls. Use it when all the engine does is issue async calls.
- __init__(engine, cache_size=1024, cache_ttl=600, cache_key=None, **kwargs)
Initialize the processor with optional caching.
- Parameters:
cache_size (int) – Maximum number of cached entries. -1 or 0 disables caching entirely. When redis_url is set, this controls the Redis key-count budget (approximate).
cache_ttl (int) – Cache entry time-to-live in seconds (default 600).
cache_key (callable, optional) – (item: dict) -> hashable function to derive a cache key from a request dict. The default key is (service, query, limit, subset). Override when additional request fields affect the result (e.g. pass cache_key_fields via a closure, as load_config() does per service).
redis_url (str, optional) – Redis connection URL. When provided, Redis is used instead of the in-memory LRU cache.
redis_kwargs (dict) – Additional keyword arguments forwarded to the Redis client.
- class routir.processors.score_processors.BatchPairwiseScoreProcessor(engine, **kwargs)
Bases: BatchProcessor
- __init__(engine, **kwargs)
Initialize the batch processor.
- Parameters:
batch_size (int) – Maximum number of requests accumulated into one batch before the engine is called (default 32).
max_wait_time (float) – Maximum seconds to wait for the batch to fill before processing a partial batch (default 0.1 s). Tune this to balance latency against GPU utilisation: lower values reduce wait time, while higher values pack more requests per batch.
cache_size (int) – LRU cache size; -1 disables caching.
cache_ttl (int) – Cache TTL in seconds.
cache_key (callable, optional) – Custom cache-key function; see Processor for details.
**kwargs – Forwarded to Processor.__init__.
Cache
- class routir.processors.cache.Cache(capacity=-1, ttl=-1)
Bases: ABC
Abstract base class for cache implementations.
- class routir.processors.cache.LRUCache(capacity=-1, ttl=-1)
Bases: Cache
An async LRU cache implementation.
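An async LRU cache with a TTL can be built from an OrderedDict and an asyncio.Lock. This is a sketch in the spirit of LRUCache, not its source: the class name ToyAsyncLRU, the get/set method names, and the use of time.monotonic() are assumptions. It follows the documented convention that -1 means unlimited capacity and no expiry (this sketch treats any non-positive value the same way).

```python
import asyncio
import time
from collections import OrderedDict


class ToyAsyncLRU:
    """Hypothetical async LRU cache; non-positive capacity/ttl mean unlimited / no expiry."""

    def __init__(self, capacity=-1, ttl=-1):
        self.capacity = capacity
        self.ttl = ttl
        self._data = OrderedDict()  # key -> (expires_at or None, value)
        self._lock = asyncio.Lock()

    async def get(self, key, default=None):
        async with self._lock:
            entry = self._data.get(key)
            if entry is None:
                return default
            expires_at, value = entry
            if expires_at is not None and time.monotonic() >= expires_at:
                del self._data[key]  # lazily drop an expired entry on access
                return default
            self._data.move_to_end(key)  # mark as most recently used
            return value

    async def set(self, key, value):
        async with self._lock:
            expires_at = time.monotonic() + self.ttl if self.ttl > 0 else None
            self._data[key] = (expires_at, value)
            self._data.move_to_end(key)
            if self.capacity > 0:
                while len(self._data) > self.capacity:
                    self._data.popitem(last=False)  # evict the least recently used entry


async def demo():
    cache = ToyAsyncLRU(capacity=2)
    await cache.set("a", 1)
    await cache.set("b", 2)
    await cache.get("a")     # touch "a" so "b" becomes least recently used
    await cache.set("c", 3)  # evicts "b"
    return [await cache.get(k) for k in ("a", "b", "c")]


print(asyncio.run(demo()))  # [1, None, 3]
```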
- class routir.processors.cache.RedisCache(capacity=-1, ttl=-1, redis_url='redis://localhost:6379', key_prefix='routircache:', **redis_kwargs)
Bases: Cache
A Redis-based cache implementation with async support.
- __init__(capacity=-1, ttl=-1, redis_url='redis://localhost:6379', key_prefix='routircache:', **redis_kwargs)
Initialize Redis cache.
- Parameters:
capacity (int) – Maximum number of items to store (-1 for unlimited, not enforced by Redis)
ttl (float) – Time-to-live in seconds for cache entries (-1 for no expiry)
redis_url (str) – Redis connection URL
key_prefix (str) – Prefix for all cache keys to avoid collisions
**redis_kwargs – Additional arguments to pass to Redis client
Random Access Readers
Random-access reader machinery used by text-jsonl views (and similar).
Houses RandomAccessReader (the abstract O(1) lookup interface) plus the
two concrete backends shipped today: OffsetFile for plain JSONL
collections (with a .offsetmap sidecar) and MSMARCOSegOffset for
MSMARCO v2.1 sharded gzip corpora where IDs encode shard and byte offset
directly.
These primitives know nothing about Processor or the request/response
shapes – they are pure storage adapters consumed by content view backends.
- class routir.collections.indexing.offset_file.RandomAccessReader(path)
Bases: object
Abstract base for O(1) document lookup by string ID.
Subclasses wrap different on-disk formats (plain JSONL, sharded gzip) and expose a uniform reader[doc_id] interface that returns the raw JSON line for a document.
- path
Root path of the document store (file or directory).
- Type:
Path
- class routir.collections.indexing.offset_file.OffsetFile(path, key=None, cache_dir=None, id_field='id')
Bases: RandomAccessReader
Fast random-access reader for JSONL document files using a byte-offset map.
On first access, scans the JSONL file and builds a {doc_id: byte_offset} mapping that is pickled to a .offsetmap sidecar file beside the corpus. Subsequent startups load the sidecar directly (O(1) per document lookup).
Documents are retrieved by seeking to the stored byte offset and reading one line, so the entire corpus never has to be loaded into memory.
The sidecar location is resolved via the shared fallback chain in sidecar: per-view cache_dir first, then adjacent to the JSONL, then ${XDG_CACHE_HOME}/routir/offsetmap/.
Example:
    import json
    from routir.collections.indexing.offset_file import OffsetFile

    reader = OffsetFile("/data/corpus.jsonl", id_field="docid")
    line = reader["msmarco_v2_doc_00_123456"]  # raw JSON string
    doc = json.loads(line)
- __init__(path, key=None, cache_dir=None, id_field='id')
Initialize OffsetFile and build or load the byte-offset map.
- Parameters:
path (Path) – Path to the JSONL corpus file.
key (callable, optional) – Legacy (line: str) -> doc_id function. Prefer id_field instead. When both are provided, key takes precedence.
cache_dir (str, optional) – Directory for the .offsetmap sidecar file (PR7 – interpreted as a directory, not a single file). If unset, RoutIR writes adjacent to the corpus; on PermissionError it falls back to ${XDG_CACHE_HOME:-~/.cache}/routir/offsetmap/. Inside the directory, the sidecar name is <basename>.<hash16>.offsetmap, where the hash is the first 16 hex chars of sha256(realpath(corpus)), preventing collisions across multiple corpora that share one cache dir.
id_field (str) – JSON field name used as the document ID when key is not provided (default "id").
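The sidecar naming rule for cache_dir can be reproduced with hashlib. sidecar_path is a hypothetical helper; the documented parts are the <basename>.<hash16>.offsetmap pattern and hashing the corpus realpath with SHA-256. Two details are assumptions here: that <basename> is the corpus file name including its extension, and that the realpath string is UTF-8 encoded before hashing.

```python
import hashlib
import os


def sidecar_path(corpus: str, cache_dir: str) -> str:
    """Hypothetical: <cache_dir>/<basename>.<hash16>.offsetmap for a corpus file."""
    real = os.path.realpath(corpus)
    # First 16 hex chars of sha256(realpath(corpus)); UTF-8 encoding is an assumption.
    hash16 = hashlib.sha256(real.encode("utf-8")).hexdigest()[:16]
    return os.path.join(cache_dir, f"{os.path.basename(corpus)}.{hash16}.offsetmap")


a = sidecar_path("/data/a/corpus.jsonl", "/tmp/offsetmaps")
b = sidecar_path("/data/b/corpus.jsonl", "/tmp/offsetmaps")
print(a != b)  # True: same basename, different directories, distinct sidecars
```

The hash is what lets two corpora with the same file name share one cache directory without overwriting each other's offset maps.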
- create_offsetmap(fn, candidates, key)
Scan fn line-by-line and write a {doc_id: byte_offset} pickle.
The pickle is written to the first writable entry in candidates (see atomic_write_sidecar()). This is called once at first startup and may take several minutes for large corpora. Raises if more than 10 consecutive lines fail to parse.
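The offset-map technique itself is compact: record the byte position of every line while scanning in binary mode, then seek() straight to a document. This sketch illustrates the idea under stated assumptions and is not OffsetFile's code: build_offset_map and read_doc are hypothetical names, the map stays in memory instead of being pickled to a sidecar, and the "more than 10 consecutive bad lines" rule mirrors the behaviour documented above.

```python
import json
import os
import tempfile


def build_offset_map(path: str, id_field: str = "id", max_bad_run: int = 10) -> dict:
    """Scan a JSONL file once and map doc_id -> byte offset of its line."""
    offsets, bad_run = {}, 0
    with open(path, "rb") as f:  # binary mode so tell() returns true byte offsets
        while True:
            pos = f.tell()
            line = f.readline()
            if not line:
                break
            try:
                offsets[json.loads(line)[id_field]] = pos
                bad_run = 0
            except (ValueError, KeyError, TypeError):
                bad_run += 1
                if bad_run > max_bad_run:
                    raise ValueError(f"more than {max_bad_run} consecutive bad lines near byte {pos}")
    return offsets


def read_doc(path: str, offsets: dict, doc_id: str) -> str:
    """O(1) lookup: seek to the stored offset and read exactly one line."""
    with open(path, "rb") as f:
        f.seek(offsets[doc_id])
        return f.readline().decode("utf-8")


with tempfile.TemporaryDirectory() as tmpdir:
    corpus = os.path.join(tmpdir, "corpus.jsonl")
    with open(corpus, "w", encoding="utf-8") as f:
        f.write('{"id": "d1", "text": "first"}\n{"id": "d2", "text": "second"}\n')
    offsets = build_offset_map(corpus)
    print(json.loads(read_doc(corpus, offsets, "d2"))["text"])  # second
```

Only the offset dictionary lives in memory; document bodies stay on disk until they are requested.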
- class routir.collections.indexing.offset_file.MSMARCOSegOffset(path, num_workers=32, filename_pattern='msmarco_v2.1_doc_segmented_{shard}.json.gz', id_parser=None, force_load_all=False)
Bases: RandomAccessReader
Random-access reader for MSMARCO v2.1 sharded gzip document files.
The MSMARCO v2.1 segmented document corpus is stored as multiple gzip shards. Document IDs encode both the shard number and the byte offset within that shard (e.g. msmarco_v2.1_doc_segmented_00_123456_0_789), enabling direct O(1) seeks without an external offset map.
Uses rapidgzip for parallel decompression when available, falling back to the standard gzip module. Open file handles per shard are cached in cached_fps for the lifetime of the reader.
Example:
    import json
    from routir.collections.indexing.offset_file import MSMARCOSegOffset

    reader = MSMARCOSegOffset("/data/msmarco-v2.1-doc-segmented/")
    line = reader["msmarco_v2.1_doc_segmented_00_123_0_456"]
    doc = json.loads(line)
- __init__(path, num_workers=32, filename_pattern='msmarco_v2.1_doc_segmented_{shard}.json.gz', id_parser=None, force_load_all=False)
Initialize the MSMARCO segmented document reader.
- Parameters:
path (Path) – Directory containing the sharded .json.gz files.
num_workers (int) – Parallelism passed to rapidgzip for decompression (default 32). Ignored when falling back to native gzip.
filename_pattern (str) – Glob/format pattern for shard filenames. {shard} is replaced with the shard identifier extracted from the document ID.
id_parser (callable, optional) – (doc_id: str) -> (shard, offset) function. Defaults to _parse_idx(), which handles the standard MSMARCO v2.1 ID format msmarco_v2.1_doc_segmented_<shard>_<…>_<…>_<offset>. Override for custom ID formats.
force_load_all (bool) – When True, all documents across all shards are loaded into memory at startup, trading startup time and memory for maximum throughput. The default, False, uses on-demand seek-based access.
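A custom id_parser only has to return a (shard, offset) pair, which is then substituted into filename_pattern. The ID format mycorpus_<shard>_<offset>, the function parse_my_id, and the pattern below are hypothetical. Passing them as MSMARCOSegOffset(path, filename_pattern=pattern, id_parser=parse_my_id) follows the documented parameters, but the exact types the reader expects for shard and offset are an assumption here (a zero-padded string and an int).

```python
def parse_my_id(doc_id: str) -> tuple:
    """Hypothetical ID format 'mycorpus_<shard>_<offset>' -> (shard, offset)."""
    prefix, shard, offset = doc_id.rsplit("_", 2)
    if prefix != "mycorpus":
        raise ValueError(f"unexpected doc id: {doc_id!r}")
    # Keep shard as a string so zero padding ("07") survives into the filename.
    return shard, int(offset)


pattern = "mycorpus_{shard}.json.gz"
shard, offset = parse_my_id("mycorpus_07_123456")
print(pattern.format(shard=shard), offset)  # mycorpus_07.json.gz 123456
```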
Registry
- exception routir.processors.registry.ServiceNotFound
Bases: KeyError
Raised when a (name, service_type) combo is not registered.
It subclasses KeyError so existing code that catches KeyError still works, but the distinct type lets transport layers map cleanly to HTTP 400 / gRPC NOT_FOUND.
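Because ServiceNotFound subclasses KeyError, a transport layer can catch it specifically while older except KeyError handlers keep working. In this self-contained sketch, the local ServiceNotFound class, the REGISTRY dict with its "bm25" entry, lookup(), and handle() are hypothetical stand-ins; real code would import the exception from routir.processors.registry. The 400 status follows the mapping stated above.

```python
class ServiceNotFound(KeyError):
    """Local stand-in for routir.processors.registry.ServiceNotFound."""


REGISTRY = {("bm25", "search"): "processor-object"}  # hypothetical contents


def lookup(name: str, service_type: str):
    try:
        return REGISTRY[(name, service_type)]
    except KeyError:
        raise ServiceNotFound((name, service_type)) from None


def handle(name: str, service_type: str):
    """Return an (HTTP status, body) pair for a request."""
    try:
        return 200, lookup(name, service_type)
    except ServiceNotFound as exc:  # specific type -> specific status code
        return 400, {"error": f"service not registered: {exc.args[0]}"}


print(handle("bm25", "search"))  # (200, 'processor-object')
print(handle("bm25", "fuse")[0])  # 400

try:
    lookup("missing", "score")
except KeyError:  # legacy handlers still catch it
    print("caught as KeyError")
```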
- class routir.processors.registry.DummyProcessor(engine, method)
Bases: Processor
A minimal, no-cache processor that wraps a single engine method.
Used internally by auto_register() to expose an engine method (search, fuse, etc.) as a Processor without any batching or caching overhead. Requests are dispatched directly to the engine method on every call.
Not intended for production workloads where batching matters; prefer the config-based loading path via load_config(), which creates BatchProcessor instances.
- __init__(engine, method)
Initialize the processor with optional caching.
- Parameters:
cache_size (int) – Maximum number of cached entries. -1 or 0 disables caching entirely. When redis_url is set, this controls the Redis key-count budget (approximate).
cache_ttl (int) – Cache entry time-to-live in seconds (default 600).
cache_key (callable, optional) – (item: dict) -> hashable function to derive a cache key from a request dict. The default key is (service, query, limit, subset). Override when additional request fields affect the result (e.g. pass cache_key_fields via a closure, as load_config() does per service).
redis_url (str, optional) – Redis connection URL. When provided, Redis is used instead of the in-memory LRU cache.
redis_kwargs (dict) – Additional keyword arguments forwarded to the Redis client.
- routir.processors.registry.auto_register(methods, view_kind=None, **default_init_kwargs)
Class decorator that instantiates an engine and registers it as a processor.
This is a lightweight alternative to the JSON config-based loading path, useful for built-in or singleton engines (e.g. stateless fusion rules like RRF) that do not need per-service batching or caching.
Warning
auto_register creates a single engine instance with default_init_kwargs at decoration time using DummyProcessor (no batching, no caching). For production engines with GPU models, use the config-based path instead so that BatchProcessor and LRU/Redis caching are applied.
The engine class must implement the {method}_batch method for every method listed. The decorator checks the corresponding can_{method} property (e.g. can_fuse for "fuse") and raises TypeError if it returns False.
The service is registered under the engine's class name in ProcessorRegistry.
- Parameters:
methods (str or list[str]) – Service type(s) to register. Valid values: "search", "score", "fuse", "decompose_query".
view_kind (str, optional) – Modality of the engine's score_batch / search_batch inputs ("text" or "bytes"). When omitted, falls back to the engine class's accepts_view_kind attribute (which defaults to "text"). Only score/search slots store this metadata; fuse and decompose_query always register as "text".
**default_init_kwargs – Keyword arguments forwarded to engine_cls() at decoration time.
- Returns:
The unmodified engine class (decorator is transparent).
Example
    @auto_register("fuse")
    class RRFFusion(Engine):
        async def fuse_batch(self, queries, batch_scores, **kwargs):
            ...

    # RRFFusion is now accessible as ProcessorRegistry["RRFFusion"]["fuse"]
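The decorator contract described above (instantiate once with default_init_kwargs, check can_{method}, register under the class name, return the class unchanged) can be modelled in a few lines. Everything below is a hypothetical toy: toy_auto_register, the plain-dict REGISTRY, and the k=60 constructor argument are illustrations, not RoutIR's code. The real decorator wraps the instance in a DummyProcessor rather than storing the bare engine, and whether it checks can_{method} before or after instantiation is not stated, so the ordering here is an assumption.

```python
REGISTRY: dict = {}  # hypothetical: class name -> {service type: engine instance}


def toy_auto_register(methods, **default_init_kwargs):
    """Toy model of the auto_register contract, not RoutIR's implementation."""
    if isinstance(methods, str):
        methods = [methods]

    def decorator(engine_cls):
        engine = engine_cls(**default_init_kwargs)  # one instance, built at decoration time
        missing = [m for m in methods if not getattr(engine, f"can_{m}", False)]
        if missing:  # refuse before registering anything
            raise TypeError(f"{engine_cls.__name__} cannot serve {missing}")
        for method in methods:
            REGISTRY.setdefault(engine_cls.__name__, {})[method] = engine
        return engine_cls  # transparent: the class itself is returned unchanged

    return decorator


@toy_auto_register("fuse", k=60)
class RRFFusion:
    def __init__(self, k=60):
        self.k = k

    @property
    def can_fuse(self):
        return True

    async def fuse_batch(self, queries, batch_scores, **kwargs):
        ...


engine = REGISTRY["RRFFusion"]["fuse"]
print(type(engine).__name__, engine.k)  # RRFFusion 60
```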