Pipeline

This section contains auto-generated documentation for pipeline components.

Pipeline Module

class routir.pipeline.pipeline.SearchPipeline(pipeline, collection, runtime_kwargs=None, verify=True)[source]

Bases: object

Executes a RoutIR pipeline defined by the pipeline DSL.

A SearchPipeline is constructed from a parsed PipelineComponent tree (or directly from a DSL string via from_string()) and runs all stages recursively, passing results between them.

Pipeline DSL syntax (see routir.pipeline.parser for full reference):

# Simple retrieval
"bm25%20"

# Sequential reranking: retrieve 100, rerank to top 20
"bm25%100 >> my-reranker%20"

# Parallel retrieval + fusion
"{dense, sparse}RRF%100"

# Query expansion + parallel + fusion
"expander{dense, sparse}RRF%100"

# Named alias for per-stage runtime_kwargs
"dense[d]%100 >> reranker%20"

runtime_kwargs

Pass per-stage parameters at query time by giving a stage an alias with [alias] and providing a dict keyed by that alias:

pipeline = SearchPipeline.from_string(
    "dense[d]%100 >> reranker%20",
    collection="my-docs",
    runtime_kwargs={"d": {"instruction": "Retrieve relevant passages"}},
)
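
The alias lookup described above can be sketched in plain Python. The build_payload helper and the payload fields ("query", "limit") are hypothetical names chosen for illustration, not RoutIR's actual request format; only the alias-keyed lookup and the dict merge follow the description.

```python
def build_payload(query, alias, limit=None, runtime_kwargs=None):
    """Hypothetical helper: merge per-alias kwargs into a request payload."""
    payload = {"query": query}  # assumed field name
    if limit is not None:
        payload["limit"] = limit  # assumed field name
    # Look up extra parameters by alias; a stage without an entry gets nothing extra.
    payload.update((runtime_kwargs or {}).get(alias, {}))
    return payload


runtime_kwargs = {"d": {"instruction": "Retrieve relevant passages"}}
dense_payload = build_payload("what is bm25?", "d", 100, runtime_kwargs)
rerank_payload = build_payload("what is bm25?", "reranker", 20, runtime_kwargs)
```

Only the stage aliased d receives the instruction; the reranker payload is left unchanged because runtime_kwargs has no "reranker" key.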

Verification

At construction time the pipeline checks that every referenced service is registered in ProcessorRegistry. Reranking stages additionally require a content service for the collection (to fetch document text for rescoring).
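
A minimal sketch of the two checks, assuming the registry can be modelled as a set of service names plus a set of collections that have a content service. The verify_services function and both sets are hypothetical stand-ins, not the ProcessorRegistry API.

```python
def verify_services(calls, registered, content_collections, collection):
    """calls is a list of (service_name, role) pairs; all names here are hypothetical."""
    # Check 1: every referenced service must be registered.
    missing = sorted({name for name, _ in calls if name not in registered})
    if missing:
        raise RuntimeError(f"services not registered: {missing}")
    # Check 2: reranking needs document text, so the collection needs a content service.
    needs_content = any(role == "rerank" for _, role in calls)
    if needs_content and collection not in content_collections:
        raise RuntimeError(f"no content service for collection {collection!r}")


calls = [("bm25", "search"), ("my-reranker", "rerank")]
# Passes silently: both services and the content service are present.
verify_services(calls, {"bm25", "my-reranker"}, {"my-docs"}, "my-docs")
```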

pipeline

Parsed pipeline AST root node.

Type:

PipelineComponent

collection

Collection name used to fetch document text during reranking stages.

Type:

str

runtime_kwargs

Per-alias extra parameters injected into service call payloads at execution time. Keys are pipeline aliases (defaulting to the service name); values are dicts merged into the request.

Type:

dict

doc_content_cache

Per-run cache mapping document IDs to their text content, populated lazily during reranking and shared across all stages within a single run() call.

Type:

dict

__init__(pipeline, collection, runtime_kwargs=None, verify=True)[source]

Initialize the search pipeline.

Parameters:
  • pipeline (PipelineComponent) – Parsed pipeline AST, typically produced by parse().

  • collection (str) – Document collection name. Must be registered as a "content" service in ProcessorRegistry when any reranking stage is present.

  • runtime_kwargs (dict, optional) –

    Per-alias extra parameters injected into individual service call payloads at execution time. Keys are pipeline aliases; values are dicts. Example:

    {"dense": {"instruction": "Retrieve relevant passages"}}
    

  • verify (bool) – If True (default), raise RuntimeError at construction time if any referenced service is missing from the registry, or if a reranking stage is present but no content service is configured for the collection.

verify()[source]

Verify that all required services exist in the registry.

classmethod from_string(pipeline_string, collection, runtime_kwargs=None, verify=True)[source]

Create a pipeline from a DSL string.

This is the primary constructor for most use cases.

Parameters:
  • pipeline_string (str) –

    Pipeline specification. Examples:

    "bm25%20"                          # simple retrieval, top 20
    "bm25%100 >> reranker%20"          # sequential reranking
    "{dense, sparse}RRF%100"           # parallel + fusion
    "expander{dense, sparse}RRF%100"   # query expansion
    "dense[d]%100 >> reranker%20"      # with alias for runtime_kwargs
    

  • collection (str) – Document collection name for content lookup during reranking.

  • runtime_kwargs (dict, optional) –

    Per-alias extra parameters injected into service payloads at query time. Keys are pipeline aliases (the name inside [...]; defaults to service name). Example:

    {"d": {"instruction": "Given a query, retrieve passages…"}}
    

  • verify (bool) – Verify that all referenced services exist in the registry at construction time (default: True).

Returns:

Ready-to-run pipeline instance.

Return type:

SearchPipeline

async get_doc_content(doc_id)[source]

Retrieve and cache document content.

Parameters:

doc_id (str) – Document identifier

Returns:

Document text content
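
The retrieve-and-cache behaviour can be illustrated with a small stand-in class. ContentCache and fake_fetch are hypothetical; the real method presumably queries the collection's content service, which is not shown in this document.

```python
import asyncio


class ContentCache:
    """Hypothetical stand-in for the caching behaviour of get_doc_content()."""

    def __init__(self, fetch):
        self._fetch = fetch  # async callable: doc_id -> text (assumed interface)
        self.doc_content_cache = {}

    async def get_doc_content(self, doc_id):
        # Fetch only on the first request; later requests hit the cache.
        if doc_id not in self.doc_content_cache:
            self.doc_content_cache[doc_id] = await self._fetch(doc_id)
        return self.doc_content_cache[doc_id]


fetch_log = []


async def fake_fetch(doc_id):
    fetch_log.append(doc_id)  # record each real fetch
    return f"text of {doc_id}"


async def demo():
    cache = ContentCache(fake_fetch)
    first = await cache.get_doc_content("doc1")
    second = await cache.get_doc_content("doc1")
    return first, second


first, second = asyncio.run(demo())
```

The second lookup returns the cached text without calling fake_fetch again, so fetch_log records a single fetch.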

async run(query, last_output=None, current_node=None, scratch=None)[source]

Recursively execute the pipeline for a single query.

For most callers, only query needs to be provided — the remaining parameters are used internally during recursive traversal of the AST.

The method dispatches on the type of current_node:

  • CallSequence — iterates over stages, passing last_output forward at each step.

  • ParallelCallSequences — optionally runs the expander first, then fans out to all branches concurrently, and finally calls the merger with all ranked lists.

  • SystemCall — submits one request to the appropriate processor and applies the %limit if set.
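
The three dispatch cases above can be sketched as a small recursive function. This is a simplified illustration under stated assumptions: Call, Seq and Par are hypothetical stand-ins for SystemCall, CallSequence and ParallelCallSequences, the services are fake in-process coroutines, the merger is a plain score sum, and the expander, scratch cache and content lookup are omitted.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class Call:  # stand-in for SystemCall
    name: str
    limit: Optional[int] = None


@dataclass
class Seq:  # stand-in for CallSequence
    stages: list


@dataclass
class Par:  # stand-in for ParallelCallSequences (no expander in this sketch)
    sequences: list
    merger: Call


async def bm25(query, last_output):
    return {"scores": {"d1": 3.0, "d2": 2.0, "d3": 1.0}}


async def dense(query, last_output):
    return {"scores": {"d2": 0.5, "d4": 0.25}}


async def sum_merge(query, ranked_lists):
    fused = {}
    for result in ranked_lists:
        for doc_id, score in result["scores"].items():
            fused[doc_id] = fused.get(doc_id, 0.0) + score
    return {"scores": fused}


SERVICES = {"bm25": bm25, "dense": dense, "sum": sum_merge}  # fake registry


async def run(query, node, last_output=None):
    if isinstance(node, Seq):
        # Sequential: thread each stage's output into the next stage.
        for stage in node.stages:
            last_output = await run(query, stage, last_output)
        return last_output
    if isinstance(node, Par):
        # Parallel: run all branches concurrently, then hand the lists to the merger.
        branches = await asyncio.gather(*(run(query, s) for s in node.sequences))
        return await run(query, node.merger, list(branches))
    # Single call: invoke the service, then keep the top `limit` results if set.
    result = await SERVICES[node.name](query, last_output)
    if node.limit is not None:
        top = sorted(result["scores"].items(), key=lambda kv: kv[1], reverse=True)
        result = {"scores": dict(top[: node.limit])}
    return result


pipeline = Par([Seq([Call("bm25", 2)]), Seq([Call("dense")])], Call("sum", 3))
result = asyncio.run(run("query", pipeline))
```

Here bm25 is cut to its top two documents before fusion, so d3 never reaches the merger.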

Parameters:
  • query (str) – The user query string.

  • last_output (dict, optional) – Output dict from the previous pipeline stage, threaded automatically during recursion. Callers should leave this as None. After a search stage it contains {"scores": {docid: float, …}}; after an expander stage it contains {"queries": [str, …]}.

  • current_node (PipelineComponent, optional) – The AST node to execute. Defaults to the root of the pipeline. Set automatically during recursion; callers should leave this as None.

  • scratch (dict, optional) – Internal cache of intermediate stage results, keyed by (alias, role) tuples. Populated during execution so the same stage is not re-run if referenced multiple times. Callers should leave this as None.

Returns:

Result from the final pipeline stage. Typically contains:

  • "scores" – {docid: float} ranked results.

  • "cached" — whether the result was served from cache.

  • "timestamp" — Unix timestamp of processing.

Return type:

dict

Parser

Pipeline DSL parser for RoutIR search pipelines.

The pipeline DSL composes retrieval and reranking services into multi-stage, parallel, or query-expansion pipelines using a concise string syntax.

Syntax reference:

# Single service — retrieve top 20
"bm25%20"

# Sequential — retrieve 100, rerank to top 20
"bm25%100 >> cross-encoder%20"

# Parallel retrieval + fusion — dense and sparse run concurrently, fused by RRF
"{dense, sparse}RRF%100"

# Query expansion — expander generates sub-queries, each runs the parallel branches
"expander{dense, sparse}RRF%100"

# Alias — name a stage so runtime_kwargs can target it by alias
"dense[d]%100 >> cross-encoder%20"

Grammar operators:

  • service%N — call service, keep top N results.

  • service[alias] — give this call the name alias for runtime_kwargs targeting.

  • A >> B — sequential: run A, pass output to B (B auto-assigned role "rerank").

  • {A, B}Merger — parallel: run A and B concurrently, fuse with Merger.

  • Expander{A, B}Merger — query expansion: Expander produces sub-queries, each dispatched to A and B, all results fused by Merger.
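
To make the operators concrete, here is a toy parser for the sequential subset only (service, [alias], %N and >>). It is a regex-based sketch, not the Lark grammar the module uses; parse_sequence and the dict layout are hypothetical, and the brace syntax for parallel and expansion pipelines is deliberately unsupported.

```python
import re

# name, optional [alias], optional %N (an assumed approximation of the real grammar)
STAGE = re.compile(r"^(?P<name>[\w-]+)(?:\[(?P<alias>[\w-]+)\])?(?:%(?P<limit>\d+))?$")


def parse_sequence(dsl):
    """Parse 'a[x]%N >> b%M' into a list of stage dicts (hypothetical layout)."""
    stages = []
    for index, part in enumerate(dsl.split(">>")):
        match = STAGE.match(part.strip())
        if match is None:
            raise ValueError(f"cannot parse stage: {part!r}")
        stages.append({
            "name": match["name"],
            "alias": match["alias"] or match["name"],  # alias defaults to the name
            "limit": int(match["limit"]) if match["limit"] else None,
            "role": "search" if index == 0 else "rerank",  # later stages rerank
        })
    return stages


stages = parse_sequence("dense[d]%100 >> cross-encoder%20")
```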

The module-level parser singleton parses DSL strings into PipelineComponent AST nodes consumed by SearchPipeline.

class routir.pipeline.parser.SystemCall(name, alias=None, limit=None, role='search')[source]

Bases: object

An AST node representing a single service call in the pipeline.

name

Service name as registered in ProcessorRegistry.

Type:

str

alias

Label used to target this stage with runtime_kwargs. Defaults to name when not specified with the [alias] syntax.

Type:

str

limit

Maximum results to return (the %N suffix). None means no explicit limit; the service uses its own default.

Type:

int or None

role

Role assigned during pipeline construction. One of:

  • "search" — first-stage retrieval; receives the raw query.

  • "rerank" — later stage; receives previous results and document text fetched from the collection.

  • "expander" — generates sub-queries for parallel execution.

  • "merger" — fuses multiple ranked lists into one final ranking.

Type:

str

name: str
alias: str | None = None
limit: int | None = None
role: str | None = 'search'
property all_calls
as_role(role)[source]
__init__(name, alias=None, limit=None, role='search')

class routir.pipeline.parser.CallSequence(stages)[source]

Bases: object

An AST node for a sequential chain of pipeline stages (A >> B >> C).

During construction, all stages after the first are automatically assigned role "rerank": they receive the previous stage’s scored results together with document text fetched from the collection and are expected to rescore them.
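
What a "rerank" stage receives can be pictured as the previous scores joined with document text. The sketch below is an assumption about shape only: build_rerank_input and the "docid", "text" and "prior_score" fields are hypothetical, and the contents dict stands in for text fetched from the collection.

```python
def build_rerank_input(last_output, contents):
    """Pair each previously scored doc ID with its text, best first (hypothetical shape)."""
    ranked = sorted(last_output["scores"].items(), key=lambda kv: kv[1], reverse=True)
    return [
        {"docid": doc_id, "text": contents[doc_id], "prior_score": score}
        for doc_id, score in ranked
    ]


last_output = {"scores": {"d1": 0.2, "d2": 0.9}}  # output of the search stage
contents = {"d1": "first text", "d2": "second text"}  # stand-in for the content service
candidates = build_rerank_input(last_output, contents)
```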

stages

Ordered list of SystemCall or ParallelCallSequences nodes executed left-to-right. stages[0] has role "search"; all later stages have role "rerank".

Type:

list

stages: List[SystemCall | ParallelCallSequences]
property all_calls
as_role(role)[source]
__init__(stages)

class routir.pipeline.parser.ParallelCallSequences(sequences, merger, expander=None)[source]

Bases: object

An AST node for parallel retrieval with a merger ({A, B}Merger).

Runs multiple retrieval branches concurrently and fuses their results. Optionally preceded by a query-expansion service (E{A, B}Merger).

sequences

Independent retrieval pipelines run in parallel; each is a CallSequence.

Type:

list[CallSequence]

merger

Service that fuses the parallel results. Its role is automatically set to "merger"; it must implement fuse_batch().

Type:

SystemCall
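
The RRF merger named in the DSL examples presumably refers to reciprocal rank fusion. A minimal sketch of that technique follows, assuming the common formulation score(d) = sum of 1 / (k + rank) with k = 60. The rrf_fuse function is hypothetical and does not reflect the actual fuse_batch() signature, which this document does not show.

```python
def rrf_fuse(ranked_lists, k=60):
    """Reciprocal rank fusion over {docid: score} dicts (k=60 is an assumed default)."""
    fused = {}
    for scores in ranked_lists:
        # Only the rank order matters; raw scores from different systems are not compared.
        ranking = sorted(scores, key=scores.get, reverse=True)
        for rank, doc_id in enumerate(ranking, start=1):
            fused[doc_id] = fused.get(doc_id, 0.0) + 1.0 / (k + rank)
    return fused


dense = {"d1": 0.9, "d2": 0.4}
sparse = {"d2": 7.0, "d3": 5.0}
fused = rrf_fuse([dense, sparse])
ranking = sorted(fused, key=fused.get, reverse=True)
```

Because d2 appears in both lists, it outranks d1 even though d1 is first in the dense list.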

expander

Optional query-expansion service. When present, its role is "expander" and it must implement decompose_query_batch(). The expanded sub-queries are each dispatched to every sequence, and all results are merged by merger. When None, the original query is sent directly to all sequences.

Type:

SystemCall or None
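
The fan-out with and without an expander can be sketched as follows. All functions here are hypothetical stand-ins: expand imitates an expander returning {"queries": [...]}, and fan_out stops before the merge step, returning the raw ranked lists that the merger would receive.

```python
import asyncio


async def expand(query):
    # Fake expander: two sub-queries per input query.
    return {"queries": [f"{query} definition", f"{query} examples"]}


async def dense(query):
    return {"scores": {f"dense:{query}": 1.0}}


async def sparse(query):
    return {"scores": {f"sparse:{query}": 1.0}}


async def fan_out(query, branches, expander=None):
    # Without an expander, the original query goes straight to every branch.
    queries = (await expander(query))["queries"] if expander else [query]
    # Every sub-query is dispatched to every branch, all concurrently.
    tasks = [branch(q) for q in queries for branch in branches]
    return await asyncio.gather(*tasks)


expanded = asyncio.run(fan_out("rrf", [dense, sparse], expander=expand))
direct = asyncio.run(fan_out("rrf", [dense, sparse]))
```

With two sub-queries and two branches the merger would receive four ranked lists; without the expander it would receive two.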

sequences: List[CallSequence]
merger: SystemCall
expander: SystemCall | None = None
property all_calls
as_role(role)[source]
__init__(sequences, merger, expander=None)

routir.pipeline.parser.PipelineComponent

Type alias for any top-level pipeline AST node.

alias of SystemCall | CallSequence | ParallelCallSequences

class routir.pipeline.parser.PipelineTransformer(visit_tokens=True)[source]

Bases: Transformer

Lark Transformer that converts a parse tree into RoutIR AST nodes.

Each method corresponds to a grammar rule and returns the appropriate SystemCall, CallSequence, or ParallelCallSequences instance. Used internally by the parser singleton; not normally called directly.

seq(stages)[source]
system_call(tokens)[source]
alias(tokens)[source]
parallel_seq(tokens)[source]
stage(tokens)[source]
seq_list(tokens)[source]