Models
This section contains auto-generated documentation for all model classes in Routir.
Base Classes
- class routir.models.abstract.Engine(name=None, config=None, **kwargs)[source]
Bases: FactoryEnabled
Abstract base class for all retrieval and reranking engines.
Subclass this to integrate any retrieval model — dense retrieval, sparse search, cross-encoder reranker, or fusion — with RoutIR’s asynchronous batching and caching infrastructure.
Only implement the methods that apply to your engine. Capabilities are auto-detected by the can_search, can_score, can_decompose_query, and can_fuse properties; unimplemented methods raise NotImplementedError.
Two-layer extension pattern
All RoutIR extensions follow this structure:
A model wrapper class that handles tokenisation and synchronous inference.
An Engine subclass that adapts the wrapper to RoutIR’s async batch interface.
Example
from routir.models.abstract import Engine

class MyRerankerEngine(Engine):
    def __init__(self, name=None, config=None, **kwargs):
        super().__init__(name, config, **kwargs)
        self.model = MyModelWrapper(
            model_path=config.get("model", "org/model"),
        )

    async def score_batch(self, queries, passages, candidate_length=None, **kwargs):
        # See score_batch docstring for the required pattern.
        ...
Engines are loaded by class name via the FactoryEnabled registry — the class name in the config "engine" field must match exactly.
- index_path
Path parsed from config["index_path"], or None if not present.
- Type:
Path or None
- __init__(name=None, config=None, **kwargs)[source]
Initialize the engine.
- Parameters:
name (str, optional) – Human-readable engine name. Falls back to config["name"] if omitted.
config (dict or str or Path, optional) –
Engine configuration. Accepted forms:
- dict — used directly.
- str or Path pointing to a JSON file — loaded and parsed.
- None — treated as an empty dict.
Any extra **kwargs are merged in and take precedence over the config dict values.
Keys consumed by this base class:
- "name" — fallback engine name when the name arg is None.
- "index_path" — path to a search index; exposed as self.index_path (a Path object).
All other keys (e.g. "model", "ngpus") are preserved in self.config for subclass use.
**kwargs – Merged into config; kwargs override config-file values.
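The config resolution described above can be sketched as a standalone function. This is a simplified illustration of the documented behavior, not routir's actual code; load_config is a hypothetical name:

```python
import json
from pathlib import Path

def load_config(config=None, **kwargs):
    # Accept a dict, a str/Path to a JSON file, or None (empty dict).
    if config is None:
        config = {}
    elif isinstance(config, (str, Path)):
        config = json.loads(Path(config).read_text())
    merged = dict(config)
    merged.update(kwargs)  # kwargs take precedence over config values
    return merged

cfg = load_config({"model": "org/a", "ngpus": 1}, model="org/b")
# kwargs override the config dict: cfg == {"model": "org/b", "ngpus": 1}
```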
- async search_batch(queries, limit=20, **kwargs)[source]
Retrieve ranked documents for a batch of queries.
Override this method to implement a search engine (dense, sparse, or hybrid). Return a dict per query mapping document IDs (strings) to relevance scores; higher scores mean higher relevance.
Example

async def search_batch(self, queries, limit=20, **kwargs):
    if isinstance(limit, int):
        limit = [limit] * len(queries)
    return [
        {hit.docid: hit.score for hit in self.searcher.search(q, k=k)}
        for q, k in zip(queries, limit)
    ]
- Parameters:
queries (List[str]) – Query strings to search for.
limit (int or List[int], optional) – Number of results to return per query; an int applies to every query. Defaults to 20.
**kwargs – Extra search parameters forwarded from the caller.
- Returns:
One result dict per query, mapping document IDs to scores:
[
    {"doc1": 12.3, "doc2": 9.8},  # results for queries[0]
    {"doc3": 7.1},                # results for queries[1]
]
- Return type:
List[Dict[str, float]]
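The int-or-list handling of limit in the example above can be exercised end-to-end with a stub index. StubHit, StubSearcher, and StubEngine are hypothetical stand-ins, not routir classes:

```python
import asyncio
from dataclasses import dataclass

@dataclass
class StubHit:
    docid: str
    score: float

class StubSearcher:
    """Hypothetical stand-in returning k canned hits per query."""
    def search(self, query, k):
        return [StubHit(f"{query}-doc{i}", float(k - i)) for i in range(k)]

class StubEngine:
    def __init__(self):
        self.searcher = StubSearcher()

    async def search_batch(self, queries, limit=20, **kwargs):
        # Broadcast a scalar limit to one entry per query.
        if isinstance(limit, int):
            limit = [limit] * len(queries)
        return [
            {hit.docid: hit.score for hit in self.searcher.search(q, k=k)}
            for q, k in zip(queries, limit)
        ]

results = asyncio.run(StubEngine().search_batch(["a", "b"], limit=[1, 2]))
# results == [{"a-doc0": 1.0}, {"b-doc0": 2.0, "b-doc1": 1.0}]
```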
- async score_batch(queries, passages, candidate_length=None, **kwargs)[source]
Score query–passage pairs for a batch of queries.
Override this method to implement a reranker or bi-encoder scorer.
Critical: passages are passed as a flattened list across all queries. candidate_length[i] tells you how many consecutive entries in passages belong to queries[i].
Example
# Input layout
queries          = ["query1", "query2"]
candidate_length = [2, 3]
passages = ["p1", "p2",   "p3", "p4", "p5"]
#           ^- query1 -^  ^---- query2 ---^

# Expected return
# [[score_p1, score_p2], [score_p3, score_p4, score_p5]]
Standard implementation skeleton:
async def score_batch(self, queries, passages, candidate_length=None, **kwargs):
    if candidate_length is None:
        candidate_length = [len(passages)]

    # Expand each query to align with its passages
    expanded_queries = [
        queries[i]
        for i, l in enumerate(candidate_length)
        for _ in range(l)
    ]
    pairs = list(zip(expanded_queries, passages))

    # Score all pairs (sync model call is fine inside async)
    all_scores = self.model.score(pairs)

    # Re-group by query
    start = 0
    result = []
    for l in candidate_length:
        result.append(all_scores[start:start + l])
        start += l
    return result
- Parameters:
queries (List[str]) – N query strings.
passages (List[str]) – Flattened list of all passages for all queries. Total length must equal sum(candidate_length).
candidate_length (List[int], optional) – Number of passages for each query. candidate_length[i] is the passage count for queries[i]. Defaults to [len(passages)] (all passages belong to the single query).
**kwargs – Extra scoring parameters forwarded from the caller.
- Returns:
One score list per query, preserving passage order:
[[score_p1, score_p2], [score_p3, score_p4, score_p5]]
- Return type:
List[List[float]]
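The expand-and-regroup skeleton above can be checked with a stub scorer. StubModel is a hypothetical placeholder for a real cross-encoder; the grouping logic is the part under test:

```python
import asyncio

class StubModel:
    """Hypothetical scorer: score = passage length, ignoring the query."""
    def score(self, pairs):
        return [float(len(p)) for _, p in pairs]

async def score_batch(model, queries, passages, candidate_length=None):
    if candidate_length is None:
        candidate_length = [len(passages)]
    # Expand each query to align with its passages
    expanded = [queries[i] for i, l in enumerate(candidate_length) for _ in range(l)]
    all_scores = model.score(list(zip(expanded, passages)))
    # Re-group the flat score list back into one list per query
    result, start = [], 0
    for l in candidate_length:
        result.append(all_scores[start:start + l])
        start += l
    return result

scores = asyncio.run(
    score_batch(StubModel(), ["q1", "q2"], ["a", "bb", "ccc", "dddd", "eeeee"], [2, 3])
)
# scores == [[1.0, 2.0], [3.0, 4.0, 5.0]]
```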
- async decompose_query_batch(queries, limit=None, **kwargs)[source]
Decompose each query into sub-queries for query expansion.
Override this method to implement a query expansion engine. Used in pipeline expressions like expander{retriever}Fusion, where the expander generates sub-queries that are each sent to the retriever and then fused.
Example
async def decompose_query_batch(self, queries, limit=None, **kwargs):
    results = []
    for i, query in enumerate(queries):
        n = limit[i] if limit else 5
        sub_queries = self.llm.expand(query, n=n)
        results.append(sub_queries)
    return results
- Parameters:
queries (List[str]) – Query strings to decompose.
limit (List[int], optional) – Maximum number of sub-queries per query.
**kwargs – Extra expansion parameters forwarded from the caller.
- Returns:
One list of sub-queries per input query:
[["sub-query 1a", "sub-query 1b"], ["sub-query 2a"]]
- Return type:
List[List[str]]
- async fuse_batch(queries, batch_scores, **kwargs)[source]
Fuse multiple ranked lists into a single ranking per query.
Override this method to implement a fusion engine. Used in pipeline expressions like {dense, sparse}RRF%100, where each system in the braces contributes a ranked list and this method merges them.
Example
async def fuse_batch(self, queries, batch_scores, **kwargs):
    results = []
    for scores_list in batch_scores:
        fused = {}
        for rank_list in scores_list:
            for rank, (doc_id, _) in enumerate(
                sorted(rank_list.items(), key=lambda x: -x[1])
            ):
                fused[doc_id] = fused.get(doc_id, 0) + 1 / (60 + rank + 1)
        results.append(fused)
    return results
- Parameters:
queries (List[str]) – Query strings, one per result set to fuse.
batch_scores (List[List[Dict[str, float]]]) –
Outer list has one entry per query. Each entry is a list of ranked-result dicts to fuse:
batch_scores[i] = [
    {"doc1": 0.9, "doc2": 0.7},  # results from system A
    {"doc2": 0.8, "doc3": 0.6},  # results from system B
]
**kwargs – Fusion parameters forwarded from the caller.
- Returns:
One fused result dict per query:
[{"doc1": 0.032, "doc2": 0.030, "doc3": 0.016}]
- Return type:
List[Dict[str, float]]
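The RRF arithmetic in the fuse_batch example can be isolated and verified on its own. rrf_fuse is an illustrative helper, not part of routir:

```python
def rrf_fuse(ranked_lists, k=60):
    """Reciprocal rank fusion: each list contributes 1 / (k + rank + 1) per doc."""
    fused = {}
    for ranking in ranked_lists:
        # Rank documents by descending score within each input list.
        ordered = sorted(ranking, key=ranking.get, reverse=True)
        for rank, doc_id in enumerate(ordered):
            fused[doc_id] = fused.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
    return fused

fused = rrf_fuse([
    {"doc1": 0.9, "doc2": 0.7},  # system A
    {"doc2": 0.8, "doc3": 0.6},  # system B
])
# doc2 appears in both lists, so it outranks doc1 and doc3
```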
- class routir.models.abstract.Reranker(name=None, config=None, **kwargs)[source]
Bases: Engine
Convenience base class for rerankers that retrieve candidates from an upstream engine.
Use this instead of Engine directly when your reranker should:
- Fetch candidate documents from an upstream retrieval service before rescoring.
- Look up document text from a RoutIR /content endpoint.
If you handle candidate retrieval yourself (e.g. the pipeline supplies candidates), inherit from Engine and implement score_batch only.
Configuration keys (inside the "config" block of your service definition):
- upstream_service (dict, optional) — Upstream engine to load for first-stage retrieval. Required fields:
{
    "engine": "PyseriniBM25",
    "config": {"index_path": "/path/to/bm25-index"}
}
- text_service (dict, optional) — RoutIR endpoint for document-text lookup, used by search_batch to fetch passage text before scoring. Both sub-keys are required if this block is present:
{
    "endpoint": "http://localhost:5000",
    "collection": "my-collection"
}
- rerank_topk_max (int, default 100) — Hard cap on how many candidates the upstream engine fetches, regardless of rerank_multiplier.
- rerank_multiplier (float, default 5) — The upstream engine retrieves limit × rerank_multiplier candidates; the reranker then selects the best limit.
Full config example:
{
    "name": "my-reranker",
    "engine": "MyRerankerEngine",
    "config": {
        "model": "org/reranker-model",
        "upstream_service": {
            "engine": "PyseriniBM25",
            "config": {"index_path": "/data/bm25-index"}
        },
        "text_service": {
            "endpoint": "http://localhost:5001",
            "collection": "my-docs"
        },
        "rerank_topk_max": 200,
        "rerank_multiplier": 10
    }
}
When upstream_service is configured, calling search_batch on this reranker automatically:
- Retrieves limit × rerank_multiplier candidates from the upstream engine.
- Fetches document text for each candidate via text_service.
- Calls your score_batch implementation with those candidates.
- Returns the top-limit results.
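The interaction between rerank_multiplier and rerank_topk_max described above reduces to a small calculation. candidate_budget is an illustrative helper name, not a routir function:

```python
def candidate_budget(limit, rerank_multiplier=5, rerank_topk_max=100):
    # The upstream engine fetches limit × rerank_multiplier candidates,
    # hard-capped at rerank_topk_max.
    return min(int(limit * rerank_multiplier), rerank_topk_max)

candidate_budget(10)  # 50 candidates fetched; the best 10 are returned after rescoring
candidate_budget(30)  # 150 would exceed the default cap, so only 100 are fetched
```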
Subclasses only need to implement score_batch; search_batch is provided by this class.
- upstream
Loaded upstream engine instance, or None if upstream_service is not configured.
- Type:
Engine or None
- __init__(name=None, config=None, **kwargs)[source]
Initialize the reranker.
Reads upstream_service, text_service, rerank_topk_max, and rerank_multiplier from config. See the class docstring for the expected structure of each key.
- async get_text(docids)[source]
Retrieve document text for the given document IDs.
- Parameters:
docids (str | List[str]) – Single document ID or list of document IDs
- Returns:
Dict mapping document IDs to their text content
- Raises:
RuntimeError – If no text service is configured
- async search_batch(queries, limit=20, **kwargs)[source]
Retrieve and rerank candidates using the configured upstream engine.
This implementation is provided by Reranker and calls score_batch internally. Subclasses do not need to override this method; implement score_batch instead.
- Raises:
RuntimeError – If upstream_service was not configured.
- class routir.models.abstract.Aggregation(passage_mapping)[source]
Bases: object
Maps passages to documents and aggregates passage scores to document scores.
Used for passage-level retrieval where results need to be aggregated to the document level using MaxP (maximum passage score).
- mapping
Dict mapping passage IDs to document IDs
- maxp(passage_scores)[source]
Aggregate passage scores to document scores using MaxP.
Takes the maximum score among all passages belonging to each document.
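MaxP aggregation as described above can be sketched directly. This is a minimal illustration; the real class operates on its stored mapping attribute rather than taking it as an argument:

```python
def maxp(mapping, passage_scores):
    """mapping: passage ID -> document ID; passage_scores: passage ID -> score.
    Each document receives the maximum score among its passages (MaxP)."""
    doc_scores = {}
    for pid, score in passage_scores.items():
        did = mapping[pid]
        doc_scores[did] = max(doc_scores.get(did, float("-inf")), score)
    return doc_scores

mapping = {"d1#0": "d1", "d1#1": "d1", "d2#0": "d2"}
scores = maxp(mapping, {"d1#0": 0.4, "d1#1": 0.9, "d2#0": 0.7})
# scores == {"d1": 0.9, "d2": 0.7}
```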
- property n_docs
Get number of unique documents.
PLAID-X (ColBERT)
- class routir.models.plaidx.PLAIDX(name='PLAID-X', config=None, **kwargs)[source]
Bases: Engine
PLAID-X engine for late-interaction dense retrieval using ColBERT.
Provides fast and effective neural retrieval with token-level interactions.
- colbert_config
ColBERT configuration
- searcher
ColBERT searcher instance
- passage_mapper
Optional mapping for passage-to-document aggregation
- subset_mapper
Optional mapping of document IDs to subsets
- inference_batch_size
Batch size for inference operations
- async search_batch(queries, limit=20, subsets=None, maxp=True)[source]
Retrieve ranked documents for a batch of queries.
PLAID-X implementation of Engine.search_batch. Returns one result dict per query, mapping document IDs to relevance scores. If subsets is given, results are restricted via subset_mapper; when maxp is True, passage scores are aggregated to document scores using MaxP via passage_mapper.
- Parameters:
queries (List[str]) – Query strings to search for.
limit (int, optional) – Number of results per query. Defaults to 20.
subsets (optional) – Document subsets to restrict results to.
maxp (bool, optional) – Aggregate passage scores to document scores using MaxP. Defaults to True.
- Returns:
One result dict per query, mapping document IDs to scores.
- Return type:
List[Dict[str, float]]
- async score_batch(queries, passages, candidate_length)[source]
Score query–passage pairs for a batch of queries.
ColBERT implementation of the Engine.score_batch contract: passages is a flattened list across all queries, and candidate_length[i] gives the number of consecutive entries in passages that belong to queries[i].
- Parameters:
queries (List[str]) – N query strings.
passages (List[str]) – Flattened list of all passages for all queries. Total length must equal sum(candidate_length).
candidate_length (List[int]) – Number of passages for each query.
- Returns:
One score list per query, preserving passage order.
- Return type:
List[List[float]]
LSR (SPLADE)
- class routir.models.lsr.LSR(name='LSR', config=None, **kwargs)[source]
Bases: Engine
Learned Sparse Retrieval engine using a SPLADE model as the query encoder.
Performs sparse retrieval with learned term weights using Anserini indexes.
- anserini
Anserini index interface
- model
SPLADE model for query encoding
- subset_mapper
Optional mapping of document IDs to subsets
- async search_batch(queries, limit=20, subsets=None, maxp=True)[source]
Retrieve ranked documents for a batch of queries.
SPLADE implementation of Engine.search_batch: encodes each query into sparse term weights and retrieves from the Anserini index. Returns one result dict per query, mapping document IDs to relevance scores. If subsets is given, results are restricted via subset_mapper; when maxp is True, passage scores are aggregated to document scores using MaxP.
- Parameters:
queries (List[str]) – Query strings to search for.
limit (int, optional) – Number of results per query. Defaults to 20.
subsets (optional) – Document subsets to restrict results to.
maxp (bool, optional) – Aggregate passage scores to document scores using MaxP. Defaults to True.
- Returns:
One result dict per query, mapping document IDs to scores.
- Return type:
List[Dict[str, float]]
mT5
- class routir.models.mt5.MT5Reranker(name=None, config=None, **kwargs)[source]
Bases: Reranker
mT5-based reranker for passage scoring.
Uses sequence-to-sequence T5 models fine-tuned for relevance scoring.
- prompt
Template for formatting query-document pairs
- model
T5 model instance
- tokenizer
T5 tokenizer
- q_max_length
Maximum query length
- d_max_length
Maximum document length
- batch_size
Batch size for inference
- token_false_id
Token ID for “false” prediction
- token_true_id
Token ID for “true” prediction
- prompt = 'Query: {query} Document: {document} Relevant:'
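The prompt template works together with the token_true_id and token_false_id attributes above: monoT5-style rerankers typically score a passage as the softmax probability of the "true" token against the "false" token. A sketch of that arithmetic, assuming the standard monoT5 scheme (routir's exact formula is not shown on this page):

```python
import math

def true_probability(logit_true, logit_false):
    # Softmax over just the "true"/"false" token logits,
    # as in monoT5-style relevance scoring.
    m = max(logit_true, logit_false)
    e_true = math.exp(logit_true - m)
    e_false = math.exp(logit_false - m)
    return e_true / (e_true + e_false)

# Formatting a query-document pair with the class's prompt template:
text = 'Query: {query} Document: {document} Relevant:'.format(
    query="what is RRF?", document="Reciprocal rank fusion combines rankings."
)
score = true_probability(2.0, 0.0)  # ≈ 0.881: the model leans "relevant"
```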
Qwen3
Relay
Fusion
- class routir.models.fusion.Fusion(name=None, config=None, **kwargs)[source]
Bases: Engine
Engine that fuses results from multiple upstream engines.
Retrieves results from multiple engines and combines them using a fusion method (RRF or score-based).
- upstream
List of upstream engines
- fusion_function
Function to use for fusion
- fusion_args
Arguments for the fusion function
- __init__(name=None, config=None, **kwargs)[source]
Initialize fusion engine.
- Parameters:
name – Engine name.
config – Must contain an "upstream_service" list.
**kwargs – Additional configuration.
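A minimal service definition for this engine might look as follows. This is a sketch that assumes the same config shape as the Reranker example above; only the upstream_service list is confirmed by this page, and the two upstream entries are placeholders:

```json
{
    "name": "hybrid",
    "engine": "Fusion",
    "config": {
        "upstream_service": [
            {"engine": "PyseriniBM25", "config": {"index_path": "/data/bm25-index"}},
            {"engine": "PLAIDX", "config": {"index_path": "/data/plaidx-index"}}
        ]
    }
}
```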
- async search_batch(queries, limit=20, **kwargs)[source]
Retrieve ranked documents for a batch of queries.
Retrieves results from each engine in upstream, then combines them with fusion_function. Returns one result dict per query, mapping document IDs to fused scores.
- Parameters:
queries (List[str]) – Query strings to search for.
limit (int, optional) – Number of results per query. Defaults to 20.
**kwargs – Extra parameters forwarded to the upstream engines.
- Returns:
One result dict per query, mapping document IDs to scores.
- Return type:
List[Dict[str, float]]