Pipeline
This section contains auto-generated documentation for pipeline components.
Pipeline Module
- class routir.pipeline.pipeline.SearchPipeline(pipeline, collection, runtime_kwargs=None, verify=True)
Bases: object
Executes a RoutIR pipeline defined by the pipeline DSL.
A SearchPipeline is constructed from a parsed PipelineComponent tree (or directly from a DSL string via from_string()) and runs all stages recursively, passing results between them.
Pipeline DSL syntax (see routir.pipeline.parser for the full reference):
# Simple retrieval
"bm25%20"
# Sequential reranking: retrieve 100, rerank to top 20
"bm25%100 >> my-reranker%20"
# Parallel retrieval + fusion
"{dense, sparse}RRF%100"
# Query expansion + parallel + fusion
"expander{dense, sparse}RRF%100"
# Named alias for per-stage runtime_kwargs
"dense[d]%100 >> reranker%20"
runtime_kwargs
Pass per-stage parameters at query time by giving a stage an alias with [alias] and providing a dict keyed by that alias:
pipeline = SearchPipeline.from_string(
    "dense[d]%100 >> reranker%20",
    collection="my-docs",
    runtime_kwargs={"d": {"instruction": "Retrieve relevant passages"}},
)
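The merge step can be pictured with a minimal sketch. It assumes that a stage's request is a dict with a "query" key and that the extras registered under the stage's alias are shallow-merged on top of it; `build_payload` is a hypothetical helper, not part of RoutIR, and the real payload layout may differ.

```python
from typing import Any, Dict, Optional


def build_payload(
    alias: str,
    query: str,
    runtime_kwargs: Optional[Dict[str, Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: merge the extras registered for `alias` into a request."""
    payload: Dict[str, Any] = {"query": query}  # assumed base payload shape
    # Only the entry whose key matches this stage's alias is merged;
    # stages without an entry receive the base payload unchanged.
    extras = (runtime_kwargs or {}).get(alias, {})
    payload.update(extras)
    return payload


kwargs = {"d": {"instruction": "Retrieve relevant passages"}}
print(build_payload("d", "what is RRF?", kwargs))
# {'query': 'what is RRF?', 'instruction': 'Retrieve relevant passages'}
print(build_payload("reranker", "what is RRF?", kwargs))
# {'query': 'what is RRF?'}
```

Because the reranker stage has no alias entry, it is untouched; only the stage aliased as d sees the instruction.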
Verification
When verify is True (the default), the pipeline checks at construction time that every referenced service is registered in ProcessorRegistry. Reranking stages additionally require a content service for the collection (to fetch document text for rescoring).
- pipeline
Parsed pipeline AST root node.
- Type: PipelineComponent
- runtime_kwargs
Per-alias extra parameters injected into service call payloads at execution time. Keys are pipeline aliases (defaulting to the service name); values are dicts merged into the request.
- Type: dict
- doc_content_cache
Per-run cache mapping document IDs to their text content, populated lazily during reranking and shared across all stages within a single run() call.
- Type: dict
- __init__(pipeline, collection, runtime_kwargs=None, verify=True)
Initialize search pipeline.
- Parameters:
pipeline (PipelineComponent) – Parsed pipeline AST, typically produced by parse().
collection (str) – Document collection name. Must be registered as a "content" service in ProcessorRegistry when any reranking stage is present.
runtime_kwargs (dict, optional) – Per-alias extra parameters injected into individual service call payloads at execution time. Keys are pipeline aliases (defaulting to the service name); values are dicts. Example:
{"dense": {"instruction": "Retrieve relevant passages"}}
verify (bool) – If True (default), raise RuntimeError at construction time if any referenced service is missing from the registry, or if a reranking stage is present but no content service is configured for the collection.
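A minimal sketch of the two checks that verify=True performs, as described above. The registry is modeled as a plain dict from service name to the set of roles it is registered for; `verify_pipeline` and that dict layout are hypothetical stand-ins for ProcessorRegistry, whose actual API is not shown here.

```python
from typing import Dict, List, Set, Tuple


def verify_pipeline(
    calls: List[Tuple[str, str]],
    registry: Dict[str, Set[str]],
    collection: str,
) -> None:
    """Hypothetical check mirroring verify=True.

    calls: (service name, role) pairs referenced by the pipeline.
    registry: service name -> set of roles it is registered for (assumed layout).
    """
    # Check 1: every referenced service must be registered for its role.
    missing = [name for name, role in calls if role not in registry.get(name, set())]
    if missing:
        raise RuntimeError(f"Services not registered: {missing}")
    # Check 2: any rerank stage needs a content service for the collection.
    needs_content = any(role == "rerank" for _, role in calls)
    if needs_content and "content" not in registry.get(collection, set()):
        raise RuntimeError(f"No content service for collection {collection!r}")


registry = {"bm25": {"search"}, "reranker": {"rerank"}, "my-docs": {"content"}}
verify_pipeline([("bm25", "search"), ("reranker", "rerank")], registry, "my-docs")  # passes
try:
    verify_pipeline([("bm25", "search"), ("reranker", "rerank")], registry, "other-docs")
except RuntimeError as err:
    print(err)  # No content service for collection 'other-docs'
```

Failing at construction time means a misconfigured pipeline is rejected before any query is served, rather than partway through a run.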
- classmethod from_string(pipeline_string, collection, runtime_kwargs=None, verify=True)
Create a pipeline from a DSL string.
This is the primary constructor for most use cases.
- Parameters:
pipeline_string (str) –
Pipeline specification. Examples:
"bm25%20"                          # simple retrieval, top 20
"bm25%100 >> reranker%20"          # sequential reranking
"{dense, sparse}RRF%100"           # parallel + fusion
"expander{dense, sparse}RRF%100"   # query expansion
"dense[d]%100 >> reranker%20"      # with alias for runtime_kwargs
collection (str) – Document collection name for content lookup during reranking.
runtime_kwargs (dict, optional) –
Per-alias extra parameters injected into service payloads at query time. Keys are pipeline aliases (the name inside [...]; defaults to the service name). Example: {"d": {"instruction": "Given a query, retrieve passages…"}}
verify (bool) – Verify at construction time that all referenced services exist in the registry and, when a reranking stage is present, that a content service is configured for the collection (default: True).
- Returns:
Ready-to-run pipeline instance.
- Return type: SearchPipeline
- async get_doc_content(doc_id)
Retrieve and cache document content.
- Parameters:
doc_id (str) – Document identifier
- Returns:
Document text content
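The lazy, per-run caching that get_doc_content() and doc_content_cache describe can be sketched as follows. `DocContentCache`, `fake_fetch`, and the fetch callback are hypothetical. The sketch caches the in-flight asyncio task rather than the finished string, so concurrent reranking branches that request the same document share a single lookup; whether RoutIR deduplicates concurrent requests this way is an assumption.

```python
import asyncio
from typing import Awaitable, Callable, Dict


class DocContentCache:
    """Hypothetical per-run cache: each document ID is fetched at most once."""

    def __init__(self, fetch: Callable[[str], Awaitable[str]]) -> None:
        self._fetch = fetch  # assumed async content lookup (hypothetical)
        self._tasks: Dict[str, "asyncio.Future[str]"] = {}
        self.fetch_count = 0  # number of real lookups, for illustration

    async def get_doc_content(self, doc_id: str) -> str:
        task = self._tasks.get(doc_id)
        if task is None:
            # Store the in-flight task so concurrent callers share one lookup.
            task = asyncio.ensure_future(self._fetch(doc_id))
            self._tasks[doc_id] = task
            self.fetch_count += 1
        return await task


async def fake_fetch(doc_id: str) -> str:
    await asyncio.sleep(0)  # simulate I/O
    return f"text of {doc_id}"


async def main():
    cache = DocContentCache(fake_fetch)
    # "d1" is requested twice concurrently but fetched only once.
    results = await asyncio.gather(*(cache.get_doc_content(d) for d in ["d1", "d2", "d1"]))
    return results, cache.fetch_count


texts, fetches = asyncio.run(main())
print(texts, fetches)  # ['text of d1', 'text of d2', 'text of d1'] 2
```

One consequence of caching tasks is that a failed lookup stays cached for the rest of the run; a production version might evict failed entries.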
- async run(query, last_output=None, current_node=None, scratch=None)
Recursively execute the pipeline for a single query.
For most callers, only query needs to be provided; the remaining parameters are used internally during recursive traversal of the AST.
The method dispatches on the type of current_node:
- CallSequence: iterates over stages, passing last_output forward at each step.
- ParallelCallSequences: optionally runs the expander first, then fans out to all branches concurrently, and finally calls the merger with all ranked lists.
- SystemCall: submits one request to the appropriate processor and applies the %N limit if set.
- Parameters:
query (str) – The user query string.
last_output (dict, optional) – Output dict from the previous pipeline stage, threaded automatically during recursion. Callers should leave this as None. After a search stage it contains {"scores": {docid: float, …}}; after an expander stage it contains {"queries": [str, …]}.
current_node (PipelineComponent, optional) – The AST node to execute. Defaults to the root of the pipeline. Set automatically during recursion; callers should leave this as None.
scratch (dict, optional) – Internal cache of intermediate stage results, keyed by (alias, role) tuples. Populated during execution so the same stage is not re-run if referenced multiple times. Callers should leave this as None.
- Returns:
Result from the final pipeline stage. Typically contains:
- "scores": {docid: float} ranked results.
- "cached": whether the result was served from cache.
- "timestamp": Unix timestamp of processing.
- Return type: dict
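To make the dispatch on current_node concrete, here is a self-contained toy interpreter over simplified node classes. The dataclasses below only mirror the shape of the parser's AST nodes and are not the real classes; `call_service` fakes every processor, roles are set by hand rather than by the parser, rerank stages receive no document text, and the merger simply sums scores. The recursion, the concurrent fan-out via asyncio.gather, and the %N truncation follow the description above.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List, Optional, Union


# Simplified, hypothetical stand-ins for the parser's AST node classes.
@dataclass
class SystemCall:
    name: str
    limit: Optional[int] = None
    role: str = "search"


@dataclass
class CallSequence:
    stages: List["Node"]


@dataclass
class ParallelCallSequences:
    sequences: List[CallSequence]
    merger: SystemCall
    expander: Optional[SystemCall] = None


Node = Union[SystemCall, CallSequence, ParallelCallSequences]


async def call_service(call: SystemCall, query: str, last_output: Optional[dict]) -> dict:
    """Toy stand-in for submitting one request to a processor."""
    if call.role == "expander":
        return {"queries": [query, query + " expanded"]}
    if call.role == "merger":
        # Toy fusion: sum scores across all ranked lists (not RRF).
        fused: Dict[str, float] = {}
        for ranked in last_output["ranked_lists"]:
            for doc_id, score in ranked.items():
                fused[doc_id] = fused.get(doc_id, 0.0) + score
        return {"scores": fused}
    if call.role == "rerank":
        # Toy rescoring: negate previous scores, which reverses the order.
        return {"scores": {d: -s for d, s in last_output["scores"].items()}}
    # "search": fake scores 1/(i+1) for five documents named after the service.
    return {"scores": {f"{call.name}-doc{i}": 1.0 / (i + 1) for i in range(5)}}


def apply_limit(result: dict, limit: Optional[int]) -> dict:
    """Keep the top `limit` scored documents (the %N suffix)."""
    if limit is None or "scores" not in result:
        return result
    top = sorted(result["scores"].items(), key=lambda kv: kv[1], reverse=True)[:limit]
    return {**result, "scores": dict(top)}


async def run(node: Node, query: str, last_output: Optional[dict] = None) -> dict:
    if isinstance(node, SystemCall):
        return apply_limit(await call_service(node, query, last_output), node.limit)
    if isinstance(node, CallSequence):
        # Thread each stage's output into the next stage.
        for stage in node.stages:
            last_output = await run(stage, query, last_output)
        return last_output
    # ParallelCallSequences: optional expansion, concurrent fan-out, then merge.
    queries = [query]
    if node.expander is not None:
        queries = (await run(node.expander, query))["queries"]
    branches = [run(seq, q) for q in queries for seq in node.sequences]
    ranked_lists = [r["scores"] for r in await asyncio.gather(*branches)]
    return await run(node.merger, query, {"ranked_lists": ranked_lists})


# Like "bm25%4 >> reranker%2", with roles assigned by hand.
sequential_result = asyncio.run(run(
    CallSequence([SystemCall("bm25", 4), SystemCall("reranker", 2, "rerank")]), "q"))
# Like "{dense, sparse}sum%3", with a toy summing merger.
parallel_result = asyncio.run(run(ParallelCallSequences(
    [CallSequence([SystemCall("dense")]), CallSequence([SystemCall("sparse")])],
    SystemCall("sum", 3, "merger")), "q"))
# Like "exp{dense}sum": two sub-queries, each sent to the dense branch.
expanded_result = asyncio.run(run(ParallelCallSequences(
    [CallSequence([SystemCall("dense")])],
    SystemCall("sum", role="merger"), SystemCall("exp", role="expander")), "q"))
print(list(sequential_result["scores"]))  # ['bm25-doc3', 'bm25-doc2']
```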
Parser
Pipeline DSL parser for RoutIR search pipelines.
The pipeline DSL composes retrieval and reranking services into multi-stage, parallel, or query-expansion pipelines using a concise string syntax.
Syntax reference:
# Single service — retrieve top 20
"bm25%20"
# Sequential — retrieve 100, rerank to top 20
"bm25%100 >> cross-encoder%20"
# Parallel retrieval + fusion — dense and sparse run concurrently, fused by RRF
"{dense, sparse}RRF%100"
# Query expansion — expander generates sub-queries, each runs the parallel branches
"expander{dense, sparse}RRF%100"
# Alias — name a stage so runtime_kwargs can target it by alias
"dense[d]%100 >> cross-encoder%20"
Grammar operators:
- service%N: call service, keep top N results.
- service[alias]: give this call the name alias for runtime_kwargs targeting.
- A >> B: sequential; run A, pass its output to B (B is auto-assigned role "rerank").
- {A, B}Merger: parallel; run A and B concurrently, fuse with Merger.
- Expander{A, B}Merger: query expansion; Expander produces sub-queries, each dispatched to A and B, and all results are fused by Merger.
The module-level parser singleton parses DSL strings into
PipelineComponent AST nodes consumed by SearchPipeline.
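The single-call and sequential operators can be illustrated with a small regex-based sketch. This is not the Lark grammar RoutIR uses: `CALL_RE`, `Call`, `parse_call`, and `parse_sequence` are hypothetical, the character set allowed in names and aliases is a guess, and braces ({A, B}Merger) are not handled. It does show the alias defaulting to the service name and the automatic "rerank" role for every stage after the first.

```python
import re
from dataclasses import dataclass
from typing import List, Optional

# Hypothetical pattern for one call: name, optional [alias], optional %N.
CALL_RE = re.compile(r"^(?P<name>[\w\-]+)(?:\[(?P<alias>[\w\-]+)\])?(?:%(?P<limit>\d+))?$")


@dataclass
class Call:
    name: str
    alias: str
    limit: Optional[int]
    role: str = "search"


def parse_call(token: str) -> Call:
    m = CALL_RE.match(token.strip())
    if m is None:
        raise ValueError(f"not a single service call: {token!r}")
    limit = int(m["limit"]) if m["limit"] else None
    # The alias defaults to the service name when no [alias] is given.
    return Call(m["name"], m["alias"] or m["name"], limit)


def parse_sequence(spec: str) -> List[Call]:
    calls = [parse_call(part) for part in spec.split(">>")]
    for call in calls[1:]:
        call.role = "rerank"  # every stage after the first reranks
    return calls


for call in parse_sequence("dense[d]%100 >> cross-encoder%20"):
    print(call)
# Call(name='dense', alias='d', limit=100, role='search')
# Call(name='cross-encoder', alias='cross-encoder', limit=20, role='rerank')
```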
- class routir.pipeline.parser.SystemCall(name, alias=None, limit=None, role='search')
Bases: object
An AST node representing a single service call in the pipeline.
- alias
Label used to target this stage with runtime_kwargs. Defaults to name when not specified with the [alias] syntax.
- Type: str
- limit
Maximum results to return (the %N suffix). None means no explicit limit; the service uses its own default.
- Type: int or None
- role
Role assigned during pipeline construction. One of:
- "search": first-stage retrieval; receives the raw query.
- "rerank": later stage; receives previous results and document text fetched from the collection.
- "expander": generates sub-queries for parallel execution.
- "merger": fuses multiple ranked lists into one final ranking.
- Type: str
- property all_calls
- __init__(name, alias=None, limit=None, role='search')
- class routir.pipeline.parser.CallSequence(stages)
Bases: object
An AST node for a sequential chain of pipeline stages (A >> B >> C).
During construction, all stages after the first are automatically assigned role "rerank": they receive the previous stage’s scored results together with document text fetched from the collection and are expected to rescore them.
- stages
Ordered list of SystemCall or ParallelCallSequences nodes executed left-to-right. stages[0] has role "search"; all later stages have role "rerank".
- Type: List[SystemCall | ParallelCallSequences]
- property all_calls
- __init__(stages)
- class routir.pipeline.parser.ParallelCallSequences(sequences, merger, expander=None)
Bases: object
An AST node for parallel retrieval with a merger ({A, B}Merger).
Runs multiple retrieval branches concurrently and fuses their results. Optionally preceded by a query-expansion service (E{A, B}Merger).
- sequences
Independent retrieval pipelines run in parallel; each is a CallSequence.
- Type: List[CallSequence]
- merger
Service that fuses the parallel results. Its role is automatically set to "merger"; it must implement fuse_batch().
- Type: SystemCall
- expander
Optional query-expansion service. When present, its role is "expander" and it must implement decompose_query_batch(). The expanded sub-queries are each dispatched to every sequence, and all results are merged by merger. When None (the default), the original query is sent directly to all sequences.
- Type: SystemCall or None
- property all_calls
- __init__(sequences, merger, expander=None)
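RRF appears in the DSL examples as a merger name, but this page does not define it. Assuming it denotes Reciprocal Rank Fusion, a merger's fusion step could look like the sketch below: each document scores the sum of 1 / (k + rank) over the ranked lists it appears in, with ranks starting at 1. The function name `rrf_fuse` and k = 60 (a commonly used constant) are assumptions, not RoutIR's fuse_batch() implementation.

```python
from collections import defaultdict
from typing import Dict, List


def rrf_fuse(ranked_lists: List[Dict[str, float]], k: int = 60) -> Dict[str, float]:
    """Hypothetical RRF: sum 1 / (k + rank) over lists, ranks starting at 1."""
    fused: Dict[str, float] = defaultdict(float)
    for scores in ranked_lists:
        # Only the order within each list matters, not the raw score values.
        ranking = sorted(scores, key=scores.get, reverse=True)
        for rank, doc_id in enumerate(ranking, start=1):
            fused[doc_id] += 1.0 / (k + rank)
    return dict(sorted(fused.items(), key=lambda kv: kv[1], reverse=True))


dense = {"d1": 0.9, "d2": 0.8, "d3": 0.1}  # e.g. cosine similarities
sparse = {"d3": 12.0, "d1": 7.5}           # e.g. BM25 scores
print(list(rrf_fuse([dense, sparse])))     # ['d1', 'd3', 'd2']

# With an expander, every sub-query is sent to every sequence, so the merger
# receives len(sub_queries) * len(sequences) ranked lists.
sub_queries = ["q", "q reformulated"]
sequences = ["dense", "sparse"]
print(len([(q, s) for q in sub_queries for s in sequences]))  # 4
```

Fusing ranks rather than raw scores sidesteps comparing incompatible scales, such as BM25 scores against dense similarities.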
- routir.pipeline.parser.PipelineComponent
Type alias for any top-level pipeline AST node.
alias of SystemCall | CallSequence | ParallelCallSequences
- class routir.pipeline.parser.PipelineTransformer(visit_tokens=True)
Bases: Transformer
Lark Transformer that converts a parse tree into RoutIR AST nodes.
Each method corresponds to a grammar rule and returns the appropriate SystemCall, CallSequence, or ParallelCallSequences instance. Used internally by the parser singleton; not normally called directly.