Processors

This section contains auto-generated documentation for all processor classes in Routir. Processors handle request batching, caching, and content processing.

Base Classes

class routir.processors.abstract.Processor(cache_size=1024, cache_ttl=600, cache_key=None, redis_url=None, redis_kwargs={})[source]

Bases: FactoryEnabled

Base class for request processors with caching support.

Handles request submission with optional caching via LRU or Redis.

cache

Cache instance (LRU or Redis)

cache_key

Function to generate cache keys from requests

__init__(cache_size=1024, cache_ttl=600, cache_key=None, redis_url=None, redis_kwargs={})[source]

Initialize processor with caching.

Parameters:
  • cache_size – Maximum cache entries (-1 to disable)

  • cache_ttl – Cache entry time-to-live in seconds

  • cache_key – Function to generate cache keys from requests

  • redis_url (str) – Optional Redis URL for distributed caching

  • redis_kwargs (Dict[str, Any]) – Additional Redis configuration

async start()[source]

Initialize the processor (called before serving requests).

Heavy initialization tasks can be performed here.

async submit(item)[source]

Submit a request for processing with caching.

Parameters:

item (Any) – Request data

Returns:

Response dict with ‘cached’ field indicating cache hit/miss

Return type:

Dict[str, Any]
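The cache-then-process flow described for submit() can be illustrated with a small standalone sketch. It does not use Routir's classes: CachedSubmitter, default_cache_key, and echo are hypothetical names, a plain dict stands in for the LRU or Redis cache, and the exact shape of the real response dict is an assumption beyond the documented 'cached' field.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict


def default_cache_key(item: Any) -> str:
    # Hypothetical key function: a stable JSON dump of the request.
    return json.dumps(item, sort_keys=True)


class CachedSubmitter:
    """Illustrative stand-in for a processor; not Routir's actual class."""

    def __init__(
        self,
        handler: Callable[[Any], Awaitable[Dict[str, Any]]],
        cache_key: Callable[[Any], str] = default_cache_key,
    ):
        self.handler = handler
        self.cache_key = cache_key
        # A plain dict replaces the LRU/Redis cache for brevity.
        self.cache: Dict[str, Dict[str, Any]] = {}

    async def submit(self, item: Any) -> Dict[str, Any]:
        key = self.cache_key(item)
        if key in self.cache:
            # Cache hit: return the stored response, flagged as cached.
            return {**self.cache[key], "cached": True}
        # Cache miss: do the work, store the result, flag it as fresh.
        result = await self.handler(item)
        self.cache[key] = result
        return {**result, "cached": False}


async def echo(item: Any) -> Dict[str, Any]:
    # Hypothetical handler standing in for the real processing step.
    return {"result": item}


async def demo():
    proc = CachedSubmitter(echo)
    first = await proc.submit({"query": "hello"})
    second = await proc.submit({"query": "hello"})
    return first, second
```

Running asyncio.run(demo()) submits the same request twice; the second response comes from the dict and carries "cached": True.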

class routir.processors.abstract.BatchProcessor(batch_size=32, max_wait_time=0.1, cache_size=1024, cache_ttl=600, cache_key=None, **kwargs)[source]

Bases: Processor

__init__(batch_size=32, max_wait_time=0.1, cache_size=1024, cache_ttl=600, cache_key=None, **kwargs)[source]

Simple dynamic batch processor for batching similar requests.

Parameters:
  • batch_size – Maximum number of items in a batch

  • max_wait_time – Maximum time to wait before processing a partial batch

async start()[source]

Initialize and start the batch processor.
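To make the roles of batch_size and max_wait_time concrete, here is a minimal sketch of dynamic batching with asyncio. It is not Routir's implementation: TinyBatcher and double_all are hypothetical names, and the sketch assumes max_wait_time is measured in seconds from the arrival of the first item in a batch.

```python
import asyncio


class TinyBatcher:
    """Illustrative dynamic batcher; not Routir's BatchProcessor."""

    def __init__(self, process_batch, batch_size=32, max_wait_time=0.1):
        self.process_batch = process_batch  # async callable: list -> list
        self.batch_size = batch_size
        self.max_wait_time = max_wait_time
        self.queue = None
        self.worker = None

    async def start(self):
        # Create the queue and the background worker inside the running loop.
        self.queue = asyncio.Queue()
        self.worker = asyncio.create_task(self._run())

    async def submit(self, item):
        # Each request gets a future that the worker resolves later.
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((item, future))
        return await future

    async def _run(self):
        loop = asyncio.get_running_loop()
        while True:
            # Block until the first item of the next batch arrives.
            batch = [await self.queue.get()]
            deadline = loop.time() + self.max_wait_time
            # Keep filling until the batch is full or the deadline passes.
            while len(batch) < self.batch_size:
                timeout = deadline - loop.time()
                if timeout <= 0:
                    break
                try:
                    batch.append(await asyncio.wait_for(self.queue.get(), timeout))
                except asyncio.TimeoutError:
                    break
            results = await self.process_batch([item for item, _ in batch])
            for (_, future), result in zip(batch, results):
                future.set_result(result)


async def demo():
    seen_batch_sizes = []

    async def double_all(items):
        # Hypothetical batch function: records batch size, doubles each item.
        seen_batch_sizes.append(len(items))
        return [2 * x for x in items]

    batcher = TinyBatcher(double_all, batch_size=4, max_wait_time=0.05)
    await batcher.start()
    results = await asyncio.gather(*(batcher.submit(i) for i in range(6)))
    batcher.worker.cancel()
    return results, seen_batch_sizes
```

With six concurrent submissions and batch_size=4, no batch exceeds four items, and the leftover partial batch is flushed once max_wait_time elapses. A larger max_wait_time trades latency for fuller batches.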

Query Processors

class routir.processors.query_processors.AsyncQueryProcessor(engine, cache_size=1024, cache_ttl=600, cache_key=None, **kwargs)[source]

Bases: Processor

Processor that serves each request independently through async calls. Use it when the engine does nothing but issue async calls.

__init__(engine, cache_size=1024, cache_ttl=600, cache_key=None, **kwargs)[source]

Initialize processor with caching.

Parameters:
  • cache_size – Maximum cache entries (-1 to disable)

  • cache_ttl – Cache entry time-to-live in seconds

  • cache_key – Function to generate cache keys from requests

  • redis_url – Optional Redis URL for distributed caching

  • redis_kwargs – Additional Redis configuration

class routir.processors.query_processors.BatchQueryProcessor(engine, **kwargs)[source]

Bases: BatchProcessor

__init__(engine, **kwargs)[source]

Simple dynamic batch processor for batching similar requests.

Parameters:
  • batch_size – Maximum number of items in a batch

  • max_wait_time – Maximum time to wait before processing a partial batch

Content Processors

class routir.processors.content_processors.ContentProcessor(collection_config, cache_size=0, cache_ttl=600)[source]

Bases: Processor

Processor for retrieving document content by ID.

Provides fast random access to documents in JSONL files using offset maps.

config

Collection configuration

line_reader

Random access reader for document file

content_field

Field(s) containing document text

lang_mapping

Optional mapping of document IDs to language codes

__init__(collection_config, cache_size=0, cache_ttl=600)[source]

Initialize content processor.

Parameters:
  • collection_config (ColllectionConfig) – Collection configuration with doc_path, id_field, etc.

  • cache_size – Maximum cache entries

  • cache_ttl – Cache TTL in seconds
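The offset-map idea behind ContentProcessor's random access can be sketched with the standard library alone. This is an illustration under assumptions, not Routir's reader: build_offset_map and read_doc are hypothetical helpers, and the id field name "id" is chosen for the example.

```python
import json
import os
import tempfile
from typing import Any, Dict


def build_offset_map(path: str, id_field: str = "id") -> Dict[str, int]:
    """Scan the file once, recording the byte offset where each document starts."""
    offsets: Dict[str, int] = {}
    with open(path, "rb") as f:
        while True:
            pos = f.tell()
            line = f.readline()
            if not line:
                break
            if line.strip():
                offsets[str(json.loads(line)[id_field])] = pos
    return offsets


def read_doc(path: str, offsets: Dict[str, int], doc_id: str) -> Dict[str, Any]:
    """Seek straight to the document's line instead of scanning the file."""
    with open(path, "rb") as f:
        f.seek(offsets[doc_id])
        return json.loads(f.readline())


def demo() -> Dict[str, Any]:
    docs = [{"id": "d1", "text": "first"}, {"id": "d2", "text": "second"}]
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "docs.jsonl")
        with open(path, "w", encoding="utf-8") as f:
            for doc in docs:
                f.write(json.dumps(doc) + "\n")
        offsets = build_offset_map(path)
        return read_doc(path, offsets, "d2")
```

The file is opened in binary mode so that the recorded positions are byte offsets, which keeps seek() correct even when documents contain multi-byte characters.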

Score Processors

class routir.processors.score_processors.AsyncPairwiseScoreProcessor(engine, cache_size=1024, cache_ttl=600, cache_key=None, **kwargs)[source]

Bases: Processor

Processor that serves each request independently through async calls. Use it when the engine does nothing but issue async calls.

__init__(engine, cache_size=1024, cache_ttl=600, cache_key=None, **kwargs)[source]

Initialize processor with caching.

Parameters:
  • cache_size – Maximum cache entries (-1 to disable)

  • cache_ttl – Cache entry time-to-live in seconds

  • cache_key – Function to generate cache keys from requests

  • redis_url – Optional Redis URL for distributed caching

  • redis_kwargs – Additional Redis configuration

class routir.processors.score_processors.BatchPairwiseScoreProcessor(engine, **kwargs)[source]

Bases: BatchProcessor

__init__(engine, **kwargs)[source]

Simple dynamic batch processor for batching similar requests.

Parameters:
  • batch_size – Maximum number of items in a batch

  • max_wait_time – Maximum time to wait before processing a partial batch

Cache

class routir.processors.cache.Cache(capacity=-1, ttl=-1)[source]

Bases: ABC

Abstract base class for cache implementations.

__init__(capacity=-1, ttl=-1)[source]

Initialize cache.

Parameters:
  • capacity (int) – Maximum number of items to store (-1 for unlimited)

  • ttl (float) – Time-to-live in seconds for cache entries (-1 for no expiry)

abstractmethod async get(key)[source]

Get an item from the cache.

Parameters:

key (str) – The cache key

Returns:

The cached value or None if not found or expired

Return type:

Any | None

abstractmethod async put(key, value)[source]

Add an item to the cache.

Parameters:
  • key (str) – The cache key

  • value (Any) – The value to cache

abstractmethod async remove(key)[source]

Remove an item from the cache.

Parameters:

key (str) – The cache key

abstractmethod async clear()[source]

Clear all items from the cache.

abstractmethod async __len__()[source]

Return the number of items in the cache.

async connect()[source]

Connect to cache backend (if needed). Override in subclasses.

async close()[source]

Close cache connection (if needed). Override in subclasses.

async __aenter__()[source]

Async context manager entry.

async __aexit__(exc_type, exc_val, exc_tb)[source]

Async context manager exit.
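A simplified sketch of how such an abstract interface is typically subclassed and used with async with follows. SketchCache and DictCache are hypothetical stand-ins rather than Routir's classes, only get and put are modelled, and the sketch assumes that the context manager calls connect() on entry and close() on exit.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional


class SketchCache(ABC):
    """Simplified stand-in for the abstract interface; not Routir's Cache."""

    @abstractmethod
    async def get(self, key: str) -> Optional[Any]: ...

    @abstractmethod
    async def put(self, key: str, value: Any) -> None: ...

    async def connect(self) -> None:
        # No-op by default; backends with connections override this.
        pass

    async def close(self) -> None:
        pass

    async def __aenter__(self):
        await self.connect()  # assumption: entering the context connects
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()  # assumption: leaving the context closes


class DictCache(SketchCache):
    """Trivial in-memory backend that records connect/close calls."""

    def __init__(self) -> None:
        self.data: Dict[str, Any] = {}
        self.events: List[str] = []

    async def connect(self) -> None:
        self.events.append("connect")

    async def close(self) -> None:
        self.events.append("close")

    async def get(self, key: str) -> Optional[Any]:
        return self.data.get(key)

    async def put(self, key: str, value: Any) -> None:
        self.data[key] = value


async def demo():
    cache = DictCache()
    async with cache as c:
        await c.put("q1", {"score": 0.9})
        hit = await c.get("q1")
        miss = await c.get("missing")
    return hit, miss, cache.events
```

Keeping connect() and close() as overridable no-ops lets in-memory backends ignore them while networked backends manage their connection in the same call pattern.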

class routir.processors.cache.LRUCache(capacity=-1, ttl=-1)[source]

Bases: Cache

An async LRU cache implementation.

__init__(capacity=-1, ttl=-1)[source]

Initialize LRU cache.

Parameters:
  • capacity (int) – Maximum number of items to store (-1 for unlimited)

  • ttl (float) – Time-to-live in seconds for cache entries (-1 for no expiry)

async get(key)[source]

Get an item from the cache.

Parameters:

key (str) – The cache key

Returns:

The cached value or None if not found or expired

Return type:

Any | None

async put(key, value)[source]

Add an item to the cache.

Parameters:
  • key (str) – The cache key

  • value (Any) – The value to cache

async remove(key)[source]

Remove an item from the cache.

async clear()[source]

Clear the cache.

async __len__()[source]

Return the number of items in the cache.
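How capacity and ttl interact in an LRU cache can be shown with a short sketch built on collections.OrderedDict. TinyLRU is a hypothetical class, not Routir's LRUCache; the injectable clock parameter is an addition made only so that expiry can be demonstrated without sleeping.

```python
import asyncio
import time
from collections import OrderedDict


class TinyLRU:
    """Illustrative async LRU cache with TTL; not Routir's LRUCache."""

    def __init__(self, capacity=-1, ttl=-1, clock=time.monotonic):
        self.capacity = capacity  # -1 means unlimited
        self.ttl = ttl            # -1 means entries never expire
        self.clock = clock        # injectable clock (assumption, for testing)
        self.items = OrderedDict()  # key -> (value, stored_at)

    async def get(self, key):
        entry = self.items.get(key)
        if entry is None:
            return None
        value, stored_at = entry
        if self.ttl >= 0 and self.clock() - stored_at > self.ttl:
            # Expired entries are dropped lazily on access.
            del self.items[key]
            return None
        self.items.move_to_end(key)  # mark as most recently used
        return value

    async def put(self, key, value):
        self.items[key] = (value, self.clock())
        self.items.move_to_end(key)
        if self.capacity >= 0:
            # Evict least recently used entries until within capacity.
            while len(self.items) > self.capacity:
                self.items.popitem(last=False)


async def demo():
    now = [0.0]
    cache = TinyLRU(capacity=2, ttl=10, clock=lambda: now[0])
    await cache.put("a", 1)
    await cache.put("b", 2)
    await cache.get("a")      # "a" becomes most recently used
    await cache.put("c", 3)   # over capacity: evicts "b"
    evicted = await cache.get("b")
    kept = await cache.get("a")
    now[0] = 11.0             # move the fake clock past the ttl
    expired = await cache.get("a")
    return evicted, kept, expired
```

In the demo, "b" is evicted because "a" was read more recently, and "a" later disappears once the fake clock passes the 10-second ttl.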

class routir.processors.cache.RedisCache(capacity=-1, ttl=-1, redis_url='redis://localhost:6379', key_prefix='routircache:', **redis_kwargs)[source]

Bases: Cache

A Redis-based cache implementation with async support.

__init__(capacity=-1, ttl=-1, redis_url='redis://localhost:6379', key_prefix='routircache:', **redis_kwargs)[source]

Initialize Redis cache.

Parameters:
  • capacity (int) – Maximum number of items to store (-1 for unlimited, not enforced by Redis)

  • ttl (float) – Time-to-live in seconds for cache entries (-1 for no expiry)

  • redis_url (str) – Redis connection URL

  • key_prefix (str) – Prefix for all cache keys to avoid collisions

  • **redis_kwargs – Additional arguments to pass to Redis client

async connect()[source]

Establish connection to Redis.

async close()[source]

Close Redis connection.

async get(key)[source]

Get an item from the cache.

Parameters:

key (str) – The cache key

Returns:

The cached value or None if not found or expired

Return type:

Any | None

async put(key, value)[source]

Add an item to the cache.

Parameters:
  • key (str) – The cache key

  • value (Any) – The value to cache

async remove(key)[source]

Remove an item from the cache.

async clear()[source]

Clear all cache entries with the current prefix.

async __len__()[source]

Return the number of items in the cache with the current prefix.
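The purpose of key_prefix, and why clear() and the item count are scoped to it, can be illustrated without a Redis server. In this sketch a shared dict stands in for the server; PrefixedStore is a hypothetical class, it is synchronous for brevity, and it does not reflect how RedisCache talks to Redis.

```python
from typing import Dict, Optional


class PrefixedStore:
    """Dict-backed stand-in for a shared key-value server; not RedisCache."""

    def __init__(self, backend: Dict[str, str], key_prefix: str = "routircache:"):
        self.backend = backend        # shared by every client of the "server"
        self.key_prefix = key_prefix

    def _full_key(self, key: str) -> str:
        return self.key_prefix + key

    def put(self, key: str, value: str) -> None:
        self.backend[self._full_key(key)] = value

    def get(self, key: str) -> Optional[str]:
        return self.backend.get(self._full_key(key))

    def clear(self) -> None:
        # Remove only the keys that belong to this prefix.
        for full_key in [k for k in self.backend if k.startswith(self.key_prefix)]:
            del self.backend[full_key]

    def size(self) -> int:
        return sum(1 for k in self.backend if k.startswith(self.key_prefix))


def demo():
    shared: Dict[str, str] = {}
    search = PrefixedStore(shared, key_prefix="search:")
    score = PrefixedStore(shared, key_prefix="score:")
    search.put("q1", "results")
    score.put("q1", "0.9")
    search.clear()
    return search.get("q1"), score.get("q1"), search.size(), score.size()
```

Two stores that share one backend can use the same logical key without colliding, and clearing one leaves the other's entries untouched.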

File Random Access Reader

Registry

class routir.processors.registry.DummyProcessor(engine, method)[source]

Bases: Processor

__init__(engine, method)[source]

Initialize processor with caching.

Parameters:
  • cache_size – Maximum cache entries (-1 to disable)

  • cache_ttl – Cache entry time-to-live in seconds

  • cache_key – Function to generate cache keys from requests

  • redis_url – Optional Redis URL for distributed caching

  • redis_kwargs – Additional Redis configuration

async submit(item)[source]

Submit a request for processing with caching.

Parameters:

item (Dict[str, Any]) – Request data

Returns:

Response dict with ‘cached’ field indicating cache hit/miss

Return type:

Dict[str, Any]

routir.processors.registry.auto_register(methods, **default_init_kwargs)[source]

Decorator to auto-register engines as processors.

Parameters:
  • methods (str | List[str]) – Service type(s) to register (‘search’, ‘score’, ‘fuse’, etc.)

  • **default_init_kwargs – Default initialization kwargs for the engine

Returns:

Decorator function

Example

@auto_register("fuse")
class MyFusionEngine(Engine):
    ...
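For readers unfamiliar with registering decorators, the general pattern can be sketched as below. This is not Routir's auto_register: REGISTRY, sketch_auto_register, and ToyEngine are hypothetical names, and what the real decorator stores or constructs at registration time is not specified here.

```python
from typing import Any, Dict, List, Tuple, Union

# Hypothetical registry: method name -> list of (engine class, default kwargs).
REGISTRY: Dict[str, List[Tuple[type, Dict[str, Any]]]] = {}


def sketch_auto_register(methods: Union[str, List[str]], **default_init_kwargs):
    """Return a class decorator that records the class under each method name."""
    names = [methods] if isinstance(methods, str) else list(methods)

    def decorator(cls):
        for name in names:
            REGISTRY.setdefault(name, []).append((cls, dict(default_init_kwargs)))
        return cls  # the class itself is returned unchanged

    return decorator


@sketch_auto_register(["search", "score"], batch_size=8)
class ToyEngine:
    """Placeholder engine used only to exercise the decorator."""
```

Because the decorator returns the class unchanged, registration is a side effect of importing the module that defines the engine.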