Models

This section contains auto-generated documentation for all model classes in RoutIR.

Base Classes

class routir.models.abstract.Engine(name=None, config=None, **kwargs)[source]

Bases: FactoryEnabled

Abstract base class for all retrieval and reranking engines.

Subclass this to integrate any retrieval model — dense retrieval, sparse search, cross-encoder reranker, or fusion — with RoutIR’s asynchronous batching and caching infrastructure.

Only implement the methods that apply to your engine. Capabilities are auto-detected by the can_search, can_score, can_decompose_query, and can_fuse properties; unimplemented methods raise NotImplementedError.
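A minimal sketch of how such capability auto-detection can work, assuming it checks whether a subclass overrides the corresponding batch method (the actual mechanism in RoutIR may differ):

```python
# Hypothetical illustration: a capability is "implemented" when the
# subclass overrides the corresponding batch method on the base class.
class Base:
    async def search_batch(self, queries, **kwargs):
        raise NotImplementedError

    @property
    def can_search(self) -> bool:
        # True only if the subclass replaced Base.search_batch
        return type(self).search_batch is not Base.search_batch

class DenseEngine(Base):
    async def search_batch(self, queries, **kwargs):
        return [{} for _ in queries]
```

Here `DenseEngine().can_search` is `True` while `Base().can_search` is `False`, without either class declaring its capabilities explicitly.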

Two-layer extension pattern

All RoutIR extensions follow this structure:

  1. A model wrapper class that handles tokenisation and synchronous inference.

  2. An Engine subclass that adapts the wrapper to RoutIR’s async batch interface.

Example

from routir.models.abstract import Engine

class MyRerankerEngine(Engine):
    def __init__(self, name=None, config=None, **kwargs):
        super().__init__(name, config, **kwargs)
        self.model = MyModelWrapper(
            model_path=config.get("model", "org/model"),
        )

    async def score_batch(self, queries, passages, candidate_length=None, **kwargs):
        # See score_batch docstring for the required pattern.
        ...

Engines are loaded by class name via the FactoryEnabled registry — the class name in the config "engine" field must match exactly.

name

Engine identifier used in API responses and logs.

Type:

str

config

Merged configuration (from the config arg plus kwargs).

Type:

dict

index_path

Path parsed from config["index_path"], or None if not present.

Type:

Path or None

__init__(name=None, config=None, **kwargs)[source]

Initialize the engine.

Parameters:
  • name (str, optional) – Human-readable engine name. Falls back to config["name"] if omitted.

  • config (dict or str or Path, optional) –

    Engine configuration. Accepted forms:

    • dict — used directly.

    • str or Path pointing to a JSON file — loaded and parsed.

    • None — treated as an empty dict.

    Any extra **kwargs are merged in and take precedence over the config dict values.

    Keys consumed by this base class:

    • "name" — fallback engine name when name arg is None.

    • "index_path" — path to a search index; exposed as self.index_path (a Path object).

    All other keys (e.g. "model", "ngpus") are preserved in self.config for subclass use.

  • **kwargs – Merged into config; kwargs override config-file values.
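The merge rules above can be sketched as follows (an illustrative helper, not the actual implementation):

```python
import json
from pathlib import Path

def merge_config(config=None, **kwargs):
    """Sketch of Engine's config handling: dict / JSON path / None,
    with **kwargs taking precedence over config values."""
    if config is None:
        config = {}
    elif isinstance(config, (str, Path)):
        config = json.loads(Path(config).read_text())
    merged = dict(config)   # config-file or dict values first
    merged.update(kwargs)   # kwargs override them
    return merged
```

For example, `merge_config({"model": "m", "ngpus": 1}, ngpus=2)` yields `{"model": "m", "ngpus": 2}`.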

async search_batch(queries, limit=20, **kwargs)[source]

Retrieve ranked documents for a batch of queries.

Override this method to implement a search engine (dense, sparse, or hybrid). Return a dict per query mapping document IDs (strings) to relevance scores; higher scores mean higher relevance.

Example

async def search_batch(self, queries, limit=20, **kwargs):
    if isinstance(limit, int):
        limit = [limit] * len(queries)
    return [
        {hit.docid: hit.score
         for hit in self.searcher.search(q, k=k)}
        for q, k in zip(queries, limit)
    ]
Parameters:
  • queries (List[str]) – Batch of query strings, e.g. ["What is AI?", "BERT paper"].

  • limit (int or List[int]) – Maximum results per query. A single int applies the same limit to every query; a List[int] sets per-query limits.

  • **kwargs – Forwarded from the caller (e.g. pipeline runtime kwargs).

Returns:

One result dict per query, mapping document IDs to scores:

[
    {"doc1": 12.3, "doc2": 9.8},  # results for queries[0]
    {"doc3": 7.1},                 # results for queries[1]
]

Return type:

List[Dict[str, float]]

async search(query, limit=20, **kwargs)[source]

Perform a search for a single query.

async score_batch(queries, passages, candidate_length=None, **kwargs)[source]

Score query–passage pairs for a batch of queries.

Override this method to implement a reranker or bi-encoder scorer.

Critical: passages are passed as a flattened list across all queries. candidate_length[i] tells you how many consecutive entries in passages belong to queries[i].

Example

# Input layout
queries          = ["query1",            "query2"          ]
candidate_length = [2,                   3                 ]
passages         = ["p1", "p2",          "p3", "p4", "p5"  ]
#                   ^---- query1 ----^   ^------ query2 ---^

# Expected return
# [[score_p1, score_p2], [score_p3, score_p4, score_p5]]

Standard implementation skeleton:

async def score_batch(self, queries, passages, candidate_length=None, **kwargs):
    if candidate_length is None:
        candidate_length = [len(passages)]

    # Expand each query to align with its passages
    expanded_queries = [
        queries[i]
        for i, l in enumerate(candidate_length)
        for _ in range(l)
    ]
    pairs = list(zip(expanded_queries, passages))

    # Score all pairs (sync model call is fine inside async)
    all_scores = self.model.score(pairs)

    # Re-group by query
    start = 0
    result = []
    for l in candidate_length:
        result.append(all_scores[start:start + l])
        start += l
    return result
Parameters:
  • queries (List[str]) – N query strings.

  • passages (List[str]) – Flattened list of all passages for all queries. Total length must equal sum(candidate_length).

  • candidate_length (List[int], optional) – Number of passages for each query. candidate_length[i] is the passage count for queries[i]. Defaults to [len(passages)] (all passages belong to the single query).

  • **kwargs – Extra scoring parameters forwarded from the caller.

Returns:

One score list per query, preserving passage order:

[[score_p1, score_p2], [score_p3, score_p4, score_p5]]

Return type:

List[List[float]]

async score(query, passages, **kwargs)[source]

Score a single query against multiple passages.

async decompose_query_batch(queries, limit=None, **kwargs)[source]

Decompose each query into sub-queries for query expansion.

Override this method to implement a query expansion engine. Used in pipeline expressions like expander{retriever}Fusion, where the expander generates sub-queries that are each sent to the retriever and then fused.

Example

async def decompose_query_batch(self, queries, limit=None, **kwargs):
    results = []
    for i, query in enumerate(queries):
        n = limit[i] if limit else 5
        sub_queries = self.llm.expand(query, n=n)
        results.append(sub_queries)
    return results
Parameters:
  • queries (List[str]) – Original query strings to expand.

  • limit (List[int], optional) – Maximum number of sub-queries per query. limit[i] applies to queries[i]. None means no limit (engine decides).

  • **kwargs – Additional parameters forwarded from the caller.

Returns:

One list of sub-queries per input query:

[["sub-query 1a", "sub-query 1b"], ["sub-query 2a"]]

Return type:

List[List[str]]

async decompose_query(query, **kwargs)[source]

Decompose a single query into sub-queries.

async fuse_batch(queries, batch_scores, **kwargs)[source]

Fuse multiple ranked lists into a single ranking per query.

Override this method to implement a fusion engine. Used in pipeline expressions like {dense, sparse}RRF%100, where each system in the braces contributes a ranked list and this method merges them.

Example

async def fuse_batch(self, queries, batch_scores, **kwargs):
    results = []
    for scores_list in batch_scores:
        fused = {}
        for rank_list in scores_list:
            for rank, (doc_id, _) in enumerate(
                sorted(rank_list.items(), key=lambda x: -x[1])
            ):
                fused[doc_id] = fused.get(doc_id, 0) + 1 / (60 + rank + 1)
        results.append(fused)
    return results
Parameters:
  • queries (List[str]) – Query strings, one per result set to fuse.

  • batch_scores (List[List[Dict[str, float]]]) –

    Outer list has one entry per query. Each entry is a list of ranked-result dicts to fuse:

    batch_scores[i] = [
        {"doc1": 0.9, "doc2": 0.7},  # results from system A
        {"doc2": 0.8, "doc3": 0.6},  # results from system B
    ]
    

  • **kwargs – Fusion parameters forwarded from the caller.

Returns:

One fused result dict per query:

[{"doc1": 0.032, "doc2": 0.030, "doc3": 0.016}]

Return type:

List[Dict[str, float]]

async fuse(query, scores, **kwargs)[source]

Fuse multiple result sets for a single query.

property can_search: bool

Check if this engine implements search functionality.

property can_score: bool

Check if this engine implements scoring functionality.

property can_decompose_query: bool

Check if this engine implements query decomposition.

property can_fuse: bool

Check if this engine implements result fusion.

class routir.models.abstract.Reranker(name=None, config=None, **kwargs)[source]

Bases: Engine

Convenience base class for rerankers that retrieve candidates from an upstream engine.

Use this instead of Engine directly when your reranker should:

  • Fetch candidate documents from an upstream retrieval service before rescoring.

  • Look up document text from a RoutIR /content endpoint.

If you handle candidate retrieval yourself (e.g. the pipeline supplies candidates), inherit from Engine and implement score_batch only.

Configuration keys (inside the "config" block of your service definition):

  • upstream_service (dict, optional) — Upstream engine to load for first-stage retrieval. Required fields:

    {
        "engine": "PyseriniBM25",
        "config": {"index_path": "/path/to/bm25-index"}
    }
    
  • text_service (dict, optional) — RoutIR endpoint for document-text lookup, used by search_batch to fetch passage text before scoring. Both sub-keys are required if this block is present:

    {
        "endpoint": "http://localhost:5000",
        "collection": "my-collection"
    }
    
  • rerank_topk_max (int, default 100) — Hard cap on how many candidates the upstream engine fetches, regardless of rerank_multiplier.

  • rerank_multiplier (float, default 5) — The upstream engine retrieves limit × rerank_multiplier candidates; the reranker then selects the best limit.
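How the two knobs interact, per the descriptions above: the upstream engine fetches limit × rerank_multiplier candidates, hard-capped at rerank_topk_max (illustrative sketch):

```python
def upstream_limit(limit, rerank_multiplier=5, rerank_topk_max=100):
    # Over-retrieve by the multiplier, but never exceed the hard cap.
    return min(int(limit * rerank_multiplier), rerank_topk_max)
```

With the defaults, a request for 10 results fetches 50 candidates, while a request for 30 results is capped at 100.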

Full config example:

{
    "name": "my-reranker",
    "engine": "MyRerankerEngine",
    "config": {
        "model": "org/reranker-model",
        "upstream_service": {
            "engine": "PyseriniBM25",
            "config": {"index_path": "/data/bm25-index"}
        },
        "text_service": {
            "endpoint": "http://localhost:5001",
            "collection": "my-docs"
        },
        "rerank_topk_max": 200,
        "rerank_multiplier": 10
    }
}

When upstream_service is configured, calling search_batch on this reranker automatically:

  1. Retrieves limit × rerank_multiplier candidates from the upstream engine.

  2. Fetches document text for each candidate via text_service.

  3. Calls your score_batch implementation with those candidates.

  4. Returns the top-limit results.

Subclasses only need to implement score_batch; search_batch is provided by this class.
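The four steps above can be sketched as one coroutine. Attribute and method names follow this page's documentation, but the body is an illustration of the flow, not Reranker's actual implementation:

```python
import asyncio

async def rerank_search_batch(reranker, queries, limit=20):
    # Step 1: over-retrieve candidates from the upstream engine
    k = min(int(limit * reranker.rerank_multiplier), reranker.rerank_topk_max)
    candidate_batches = await reranker.upstream.search_batch(queries, limit=k)

    results = []
    for query, candidates in zip(queries, candidate_batches):
        docids = list(candidates)
        # Step 2: fetch passage text via the text service
        texts = await reranker.get_text(docids)
        # Step 3: rescore with the subclass's score_batch
        scores = await reranker.score_batch(
            [query], [texts[d] for d in docids],
            candidate_length=[len(docids)],
        )
        # Step 4: keep the top-limit reranked results
        ranked = sorted(zip(docids, scores[0]), key=lambda x: -x[1])
        results.append(dict(ranked[:limit]))
    return results
```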

upstream

Loaded upstream engine instance, or None if upstream_service is not configured.

Type:

Engine or None

text_service

Parsed text-service config dict, or None.

Type:

dict or None

rerank_topk_max

Maximum candidates passed to the reranker.

Type:

int

rerank_multiplier

Upstream over-retrieval factor.

Type:

float

__init__(name=None, config=None, **kwargs)[source]

Initialize the reranker.

Reads upstream_service, text_service, rerank_topk_max, and rerank_multiplier from config. See the class docstring for the expected structure of each key.

Parameters:
  • name (str, optional) – Reranker name.

  • config (dict or str or Path, optional) – Configuration dict or path. See Engine for accepted forms.

  • **kwargs – Merged into config; kwargs override config values.

async get_text(docids)[source]

Retrieve document text for the given document IDs.

Parameters:

docids (str | List[str]) – Single document ID or list of document IDs

Returns:

Dict mapping document IDs to their text content

Raises:

RuntimeError – If no text service is configured

async search_batch(queries, limit=20, **kwargs)[source]

Retrieve and rerank candidates using the configured upstream engine.

This implementation is provided by Reranker and calls score_batch internally. Subclasses do not need to override this method; implement score_batch instead.

Raises:

RuntimeError – If upstream_service was not configured.

class routir.models.abstract.Aggregation(passage_mapping)[source]

Bases: object

Maps passages to documents and aggregates passage scores to document scores.

Used for passage-level retrieval where results need to be aggregated to the document level using MaxP (maximum passage score).

mapping

Dict mapping passage IDs to document IDs

__init__(passage_mapping)[source]

Initialize with passage-to-document mapping.

Parameters:

passage_mapping (Dict[str, str]) – Dict mapping passage IDs to document IDs

__contains__(pid)[source]

Check if passage ID exists in mapping.

__getitem__(pid)[source]

Get document ID for a passage ID.

maxp(passage_scores)[source]

Aggregate passage scores to document scores using MaxP.

Takes the maximum score among all passages belonging to each document.

Parameters:

passage_scores (Dict[str, float]) – Dict mapping passage IDs to scores

Returns:

Dict mapping document IDs to their maximum passage scores

Return type:

Dict[str, float]
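The MaxP rule can be written as a standalone function (a sketch equivalent to the method described above, taking the passage-to-document mapping explicitly):

```python
def maxp(passage_scores, mapping):
    """A document's score is the maximum over its passages.
    mapping: passage ID -> document ID."""
    doc_scores = {}
    for pid, score in passage_scores.items():
        did = mapping[pid]
        if did not in doc_scores or score > doc_scores[did]:
            doc_scores[did] = score
    return doc_scores
```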

property n_docs

Get number of unique documents.

__len__()[source]

Get number of passages.

PLAID-X (ColBERT)

routir.models.plaidx.colbert_all_pair_scores(Q, D, Dm=None)[source]
class routir.models.plaidx.PLAIDX(name='PLAID-X', config=None, **kwargs)[source]

Bases: Engine

PLAID-X engine for late-interaction dense retrieval using ColBERT.

Provides fast and effective neural retrieval with token-level interactions.

colbert_config

ColBERT configuration

searcher

ColBERT searcher instance

passage_mapper

Optional mapping for passage-to-document aggregation

subset_mapper

Optional mapping of document IDs to subsets

inference_batch_size

Batch size for inference operations

__init__(name='PLAID-X', config=None, **kwargs)[source]

Initialize PLAID-X engine.

Parameters:
  • name (str) – Engine name

  • config (str | Path | Dict[str, Any]) – Configuration with index_path, checkpoint, use_gpu, etc.

  • **kwargs – Additional configuration

filter_subset(scores, only_subset=None)[source]
async search_batch(queries, limit=20, subsets=None, maxp=True)[source]

Retrieve ranked documents for a batch of queries using the PLAID-X (ColBERT) searcher.

Parameters:
  • queries (List[str]) – Batch of query strings.

  • limit (int or List[int]) – Maximum results per query. A single int applies the same limit to every query; a List[int] sets per-query limits.

  • subsets (optional) – Restrict results to the given document subsets (see subset_mapper and filter_subset).

  • maxp (bool) – If True and passage_mapper is configured, aggregate passage scores to document scores using MaxP.

Returns:

One result dict per query, mapping document IDs to scores. See Engine.search_batch for the result format.

Return type:

List[Dict[str, float]]

async score_batch(queries, passages, candidate_length)[source]

Score query–passage pairs for a batch of queries using ColBERT late-interaction scoring. Passages are passed as a flattened list across all queries; see Engine.score_batch for the candidate_length layout and return format.

Return type:

List[List[float]]

LSR (SPLADE)

class routir.models.lsr.LSR(name='LSR', config=None, **kwargs)[source]

Bases: Engine

Learned Sparse Retrieval engine using SPLADE model as the query encoder.

Performs sparse retrieval with learned term weights using Anserini indexes.

anserini

Anserini index interface

model

SPLADE model for query encoding

subset_mapper

Optional mapping of document IDs to subsets

__init__(name='LSR', config=None, **kwargs)[source]

Initialize LSR engine.

Parameters:
  • name (str) – Engine name

  • config (str | Path | Dict[str, Any]) – Configuration with index_path, model_name, max_length, etc.

  • **kwargs – Additional configuration

filter_subset(scores, only_subset=None)[source]
async search_batch(queries, limit=20, subsets=None, maxp=True)[source]

Retrieve ranked documents for a batch of queries by encoding each query with the SPLADE model and searching the Anserini index.

Parameters:
  • queries (List[str]) – Batch of query strings.

  • limit (int or List[int]) – Maximum results per query. A single int applies the same limit to every query; a List[int] sets per-query limits.

  • subsets (optional) – Restrict results to the given document subsets (see subset_mapper and filter_subset).

  • maxp (bool) – If True, aggregate passage scores to document scores using MaxP.

Returns:

One result dict per query, mapping document IDs to scores. See Engine.search_batch for the result format.

Return type:

List[Dict[str, float]]

mT5

class routir.models.mt5.MT5Reranker(name=None, config=None, **kwargs)[source]

Bases: Reranker

mT5-based reranker for passage scoring.

Uses sequence-to-sequence T5 models fine-tuned for relevance scoring.

prompt

Template for formatting query-document pairs

model

T5 model instance

tokenizer

T5 tokenizer

q_max_length

Maximum query length

d_max_length

Maximum document length

batch_size

Batch size for inference

token_false_id

Token ID for “false” prediction

token_true_id

Token ID for “true” prediction

prompt = 'Query: {query} Document: {document} Relevant:'
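The attributes above suggest the usual mT5 reranking recipe: format the query–document pair with the prompt, then take the softmax probability of the "true" token against the "false" token at the first decoded position. A sketch under that assumption (the logits here are placeholders, not real model outputs):

```python
import math

prompt = 'Query: {query} Document: {document} Relevant:'
text = prompt.format(query="what is AI?", document="AI is ...")

def relevance_score(logit_true, logit_false):
    """Two-way softmax over the true/false token logits
    (numerically stabilised by subtracting the max)."""
    m = max(logit_true, logit_false)
    e_true = math.exp(logit_true - m)
    e_false = math.exp(logit_false - m)
    return e_true / (e_true + e_false)
```

Equal logits give a score of 0.5; the score approaches 1.0 as the "true" logit dominates.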
__init__(name=None, config=None, **kwargs)[source]

Initialize MT5 reranker.

Parameters:
  • name – Reranker name

  • config – Configuration with model_name_or_path, max_lengths, etc.

  • **kwargs – Additional configuration

tokenize_pairs(batch)[source]
forward(input_ids, attention_mask=None)[source]
async score(queries, passages, candidate_length=None, with_progress=False, **kwargs)[source]

Score query–passage pairs, with optional progress reporting. See Engine.score_batch for the flattened passages / candidate_length layout.

Qwen3

Relay

Fusion

class routir.models.fusion.Fusion(name=None, config=None, **kwargs)[source]

Bases: Engine

Engine that fuses results from multiple upstream engines.

Retrieves results from multiple engines and combines them using a fusion method (RRF or score-based).

upstream

List of upstream engines

fusion_function

Function to use for fusion

fusion_args

Arguments for the fusion function

__init__(name=None, config=None, **kwargs)[source]

Initialize fusion engine.

Parameters:
  • name – Engine name

  • config – Must contain ‘upstream_service’ list

  • **kwargs – Additional configuration
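A hypothetical service definition for this engine. 'upstream_service' is the key the docs say is required; the engine names and index paths here are placeholders:

```python
fusion_config = {
    "name": "hybrid",
    "engine": "Fusion",
    "config": {
        # One entry per upstream engine whose results are fused
        "upstream_service": [
            {"engine": "PLAIDX", "config": {"index_path": "/data/plaid-index"}},
            {"engine": "LSR", "config": {"index_path": "/data/splade-index"}},
        ],
    },
}
```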

async search_batch(queries, limit=20, **kwargs)[source]

Retrieve results from every upstream engine for each query and fuse them into a single ranking with the configured fusion function.

Returns:

One fused result dict per query, mapping document IDs to scores. See Engine.search_batch for the result format.

Return type:

List[Dict[str, float]]

class routir.models.fusion.RRF(name=None, config=None, **kwargs)[source]

Bases: Engine

Reciprocal Rank Fusion engine for combining result lists.

async fuse_batch(queries, batch_rankings, **kwargs)[source]

Fuse multiple rankings using the Reciprocal Rank Fusion (RRF) algorithm.
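A standalone sketch of RRF: each list contributes 1 / (k + rank) per document, with rank starting at 1. k = 60 is the conventional default (an assumption here; this engine may use a different constant):

```python
def rrf_fuse(rank_lists, k=60):
    fused = {}
    for ranking in rank_lists:
        # Rank documents by descending score within each list
        ordered = sorted(ranking.items(), key=lambda x: -x[1])
        for rank, (doc_id, _) in enumerate(ordered, start=1):
            fused[doc_id] = fused.get(doc_id, 0.0) + 1.0 / (k + rank)
    return fused
```

A document ranked highly in several lists accumulates the largest fused score, even if no single list ranked it first.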

class routir.models.fusion.ScoreFusion(name=None, config=None, **kwargs)[source]

Bases: Engine

Score-based fusion engine for combining result lists.

async fuse_batch(queries, batch_rankings, **kwargs)[source]

Fuse multiple rankings by summing scores.
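Score-based fusion can be sketched as a per-document sum across result lists. The optional weights are an assumption; the engine may simply sum raw scores:

```python
def score_fuse(rank_lists, weights=None):
    weights = weights or [1.0] * len(rank_lists)
    fused = {}
    for w, ranking in zip(weights, rank_lists):
        for doc_id, score in ranking.items():
            fused[doc_id] = fused.get(doc_id, 0.0) + w * score
    return fused
```

Unlike RRF, this is sensitive to score scales, so upstream scores typically need to be comparable (or normalised) before summing.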

LLM Rerankers