Pipeline
This section contains auto-generated documentation for pipeline components.
Pipeline Module
- class routir.pipeline.pipeline.SearchPipeline(pipeline, collection=None, runtime_kwargs=None, verify=True, bytes_content_cache_max_bytes=None)[source]
Bases: object
Executes a RoutIR pipeline defined by the pipeline DSL.
A SearchPipeline is constructed from a parsed PipelineComponent tree (or directly from a DSL string via from_string()) and runs all stages recursively, passing results between them.
Pipeline DSL syntax (see routir.pipeline.parser for full reference):
# Simple retrieval
"bm25%20"
# Sequential reranking: retrieve 100, rerank to top 20
"bm25%100 >> my-reranker%20"
# Parallel retrieval + fusion
"{dense, sparse}RRF%100"
# Query expansion + parallel + fusion
"expander{dense, sparse}RRF%100"
# Named alias for per-stage runtime_kwargs
"dense[d]%100 >> reranker%20"
runtime_kwargs
Pass per-stage parameters at query time by giving a stage an alias with [alias] and providing a dict keyed by that alias:
pipeline = SearchPipeline.from_string(
    "dense[d]%100 >> reranker%20",
    collection="my-docs",
    runtime_kwargs={"d": {"instruction": "Retrieve relevant passages"}},
)
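The alias-keyed merge can be pictured with a small self-contained sketch. It does not call RoutIR: `build_payload` and the `"query"`/`"limit"` payload keys are hypothetical. Letting alias-specific values win on key collisions is also an assumption, since the documentation only states that the dicts are merged into the request.

```python
from typing import Any, Dict, Optional


def build_payload(
    query: str,
    alias: str,
    limit: Optional[int],
    runtime_kwargs: Optional[Dict[str, Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: build one stage's request and merge alias kwargs."""
    payload: Dict[str, Any] = {"query": query}
    if limit is not None:
        payload["limit"] = limit
    # Look up extra parameters by alias; stages without an entry get none.
    extra = (runtime_kwargs or {}).get(alias, {})
    # Assumption: alias-specific values override base payload keys.
    return {**payload, **extra}


runtime_kwargs = {"d": {"instruction": "Retrieve relevant passages"}}
dense_payload = build_payload("what is RRF?", "d", 100, runtime_kwargs)
rerank_payload = build_payload("what is RRF?", "reranker", 20, runtime_kwargs)
print(dense_payload)
print(rerank_payload)
```

Only the stage aliased `d` receives the `instruction` field; the reranker stage has no entry under its alias and is sent without extras.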
Verification
At construction time the pipeline checks that every referenced service is registered in ProcessorRegistry. Reranking stages additionally require a content service for the collection (to fetch document text for rescoring).
- pipeline
Parsed pipeline AST root node.
- Type:
- runtime_kwargs
Per-alias extra parameters injected into service call payloads at execution time. Keys are pipeline aliases (defaulting to the service name); values are dicts merged into the request.
- Type:
- doc_content_cache
Per-run cache mapping document IDs to their text content, populated lazily during reranking and shared across all stages within a single run() call.
- Type:
- __init__(pipeline, collection=None, runtime_kwargs=None, verify=True, bytes_content_cache_max_bytes=None)[source]
Initialize search pipeline.
- Parameters:
pipeline (PipelineComponent) – Parsed pipeline AST, typically produced by parse().
collection (str, optional) – Document collection name. Required only when the pipeline contains a reranking stage (which fetches document text). Must be registered as a "content" service in ProcessorRegistry.
runtime_kwargs (dict, optional) – Per-alias extra parameters injected into individual service call payloads at execution time. Keys are pipeline aliases; values are dicts. Example: {"dense": {"instruction": "Retrieve relevant passages"}}
verify (bool) – If True (default), validate the pipeline at construction time. Raises ValueError for bad input (e.g. the pipeline reranks but no collection was provided) and RuntimeError for operational issues (referenced service or content processor not registered).
bytes_content_cache_max_bytes (int, optional) – Per-pipeline cap on the bytes payloads cached in doc_content_cache. When set and an insertion would push the accumulated bytes-payload size over the cap, the cache evicts in insertion order (FIFO) until under the cap. Only bytes-view payloads count toward the cap; text entries are free. None (default) disables eviction.
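The eviction rule for bytes_content_cache_max_bytes can be sketched with a toy cache. This is not RoutIR's implementation: the class name is hypothetical, and two details are assumptions, namely that only bytes entries are ever evicted (text entries being free) and that a duplicate insertion keeps the existing entry.

```python
from collections import OrderedDict
from typing import List, Optional, Tuple, Union

Payload = Union[str, bytes]
Key = Tuple[str, str]  # (view, doc_id)


class FifoBytesCappedCache:
    """Toy cache: FIFO eviction driven only by the size of bytes payloads."""

    def __init__(self, max_bytes: Optional[int] = None) -> None:
        self.max_bytes = max_bytes
        self.bytes_used = 0
        self._entries: "OrderedDict[Key, Payload]" = OrderedDict()

    def put(self, key: Key, value: Payload) -> None:
        if key in self._entries:
            return  # assumption: the first stored copy is kept
        self._entries[key] = value
        if isinstance(value, bytes):
            self.bytes_used += len(value)
        self._evict()

    def _evict(self) -> None:
        if self.max_bytes is None:
            return  # None disables eviction
        # Walk keys oldest-first; drop bytes entries until under the cap.
        for key in list(self._entries):
            if self.bytes_used <= self.max_bytes:
                break
            value = self._entries[key]
            if isinstance(value, bytes):
                del self._entries[key]
                self.bytes_used -= len(value)

    def keys(self) -> List[Key]:
        return list(self._entries)


cache = FifoBytesCappedCache(max_bytes=10)
cache.put(("text", "d1"), "a long text passage that costs nothing")
cache.put(("image", "d1"), b"12345678")  # 8 bytes, under the cap
cache.put(("image", "d2"), b"123456")    # 14 bytes total, oldest bytes entry goes
print(cache.keys(), cache.bytes_used)
```

After the third insertion the oldest bytes payload is dropped while the text entry survives, because text never counts toward the cap in this sketch.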
- verify()[source]
Verify that all required services exist and views resolve.
PR4 reads view metadata from the registry slot rather than the content processor directly so the same code path validates both local ContentProcessor and remote RelayContentProcessor collections. Each rerank stage’s slot-level view_kind is also checked against the view’s declared kind so a bytes-modality view never gets routed to a text-only scorer (or vice versa).
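The split between ValueError (bad input) and RuntimeError (operational issues) documented for the verify parameter of __init__() can be illustrated with a toy check. The function below is hypothetical and works on plain `(service_name, role)` pairs and sets instead of the real registry; the order of the checks is an assumption, and view-kind validation is not modelled.

```python
from typing import List, Optional, Set, Tuple


def verify_sketch(
    stages: List[Tuple[str, str]],
    collection: Optional[str],
    services: Set[str],
    content_collections: Set[str],
) -> None:
    """Toy check over (service_name, role) pairs; raises on the first problem."""
    reranks = any(role == "rerank" for _, role in stages)
    # Bad input: a rerank stage needs a collection to fetch document text from.
    if reranks and collection is None:
        raise ValueError("pipeline reranks but no collection was provided")
    # Operational issue: a referenced service is not registered.
    for name, _ in stages:
        if name not in services:
            raise RuntimeError(f"service {name!r} is not registered")
    # Operational issue: no content processor registered for the collection.
    if reranks and collection not in content_collections:
        raise RuntimeError(f"no content service for collection {collection!r}")


def outcome(*args) -> str:
    """Return the raised exception's class name, or 'ok'."""
    try:
        verify_sketch(*args)
    except (ValueError, RuntimeError) as exc:
        return type(exc).__name__
    return "ok"


stages = [("bm25", "search"), ("reranker", "rerank")]
services = {"bm25", "reranker"}
print(outcome(stages, None, services, {"my-docs"}))
print(outcome(stages, "my-docs", {"bm25"}, {"my-docs"}))
print(outcome(stages, "my-docs", services, set()))
print(outcome(stages, "my-docs", services, {"my-docs"}))
```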
- classmethod from_string(pipeline_string, collection=None, runtime_kwargs=None, verify=True, bytes_content_cache_max_bytes=None)[source]
Create a pipeline from a DSL string.
This is the primary constructor for most use cases.
- Parameters:
pipeline_string (str) – Pipeline specification. Examples:
"bm25%20" # simple retrieval, top 20
"bm25%100 >> reranker%20" # sequential reranking
"{dense, sparse}RRF%100" # parallel + fusion
"expander{dense, sparse}RRF%100" # query expansion
"dense[d]%100 >> reranker%20" # with alias for runtime_kwargs
collection (str, optional) – Document collection name for content lookup during reranking. Required only when the pipeline contains a reranking stage.
runtime_kwargs (dict, optional) – Per-alias extra parameters injected into service payloads at query time. Keys are pipeline aliases (the name inside [...]; defaults to service name). Example: {"d": {"instruction": "Given a query, retrieve passages…"}}
verify (bool) – Verify that all referenced services exist in the registry at construction time (default: True).
- Returns:
Ready-to-run pipeline instance.
- Return type:
- async classmethod cached_run(pipeline_string, query, collection=None, runtime_kwargs=None, bytes_content_cache_max_bytes=None)[source]
Run a pipeline with optional process-wide result caching.
Single entry point used by both REST (/pipeline) and gRPC (RoutirServicer.Pipeline) handlers. Behaviour:
1. Parse the DSL and construct a SearchPipeline. Any DSL or verification error propagates to the caller unchanged.
2. If a pipeline cache is installed (see routir.pipeline.cache.set_pipeline_cache()), look up the result by canonical key. On hit, return it with cached=True overriding any stage-level flag.
3. On miss, execute the pipeline; cache the response only when it has no "error" key, mirroring Processor.
- Parameters:
pipeline_string (str) – Pipeline DSL string.
query (str) – User query.
collection (str | None) – Collection name for reranking stages.
runtime_kwargs (Dict[str, Dict[str, Any]]) – Optional per-alias extra parameters.
bytes_content_cache_max_bytes (int | None) – Per-run cap on the bytes payloads cached during pipeline execution; see __init__() for details. Typically sourced from bytes_content_cache_max_bytes.
- Returns:
Final pipeline result dict. Includes a top-level cached flag indicating whether the pipeline-level cache was hit (stage-level cache flags are not surfaced when the pipeline cache is in use).
- Return type:
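The lookup, hit, and miss behaviour of cached_run() can be sketched without RoutIR. Everything here is a stand-in: the cache is a plain dict, the key tuple is an assumed simplification of the canonical key (the real key format is not documented here), and `execute` is a hypothetical coroutine that plays the role of running the pipeline.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple

Result = Dict[str, Any]
CacheKey = Tuple[str, str, str]

# Stand-in for the installed pipeline cache; None would mean "no cache installed".
pipeline_cache: Optional[Dict[CacheKey, Result]] = {}
executions: List[str] = []


async def cached_run_sketch(
    pipeline_string: str,
    query: str,
    collection: Optional[str],
    execute: Callable[[str], Awaitable[Result]],
) -> Result:
    key = (pipeline_string, query, collection or "")  # assumed key shape
    if pipeline_cache is not None and key in pipeline_cache:
        # Hit: the pipeline-level flag overrides any stage-level flag.
        return {**pipeline_cache[key], "cached": True}
    result = await execute(query)
    # Miss: store only successful responses (no "error" key).
    if pipeline_cache is not None and "error" not in result:
        pipeline_cache[key] = result
    return {**result, "cached": False}


async def fake_execute(query: str) -> Result:
    executions.append(query)
    return {"scores": {"doc1": 1.0}}


async def failing_execute(query: str) -> Result:
    return {"error": "service unavailable"}


async def demo() -> Tuple[Result, Result, Result]:
    first = await cached_run_sketch("bm25%20", "q", None, fake_execute)
    second = await cached_run_sketch("bm25%20", "q", None, fake_execute)
    failed = await cached_run_sketch("bm25%20", "other", None, failing_execute)
    return first, second, failed


first, second, failed = asyncio.run(demo())
print(first["cached"], second["cached"], len(executions))
```

The second call is served from the dict without executing again, and the failing response is returned to the caller but never stored.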
- async get_doc_content(doc_id, view)[source]
Retrieve and cache document content for a specific view.
The cache is keyed by (view, doc_id) so different views of the same document are stored independently. Bytes payloads are subject to the optional per-pipeline cap configured via bytes_content_cache_max_bytes.
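A minimal sketch of the `(view, doc_id)` keying, assuming a hypothetical `_fetch` coroutine in place of the real content-service request. The class name and view names are illustrative only, and the bytes cap is left out.

```python
import asyncio
from typing import Dict, List, Tuple, Union

Payload = Union[str, bytes]


class DocContentCacheSketch:
    """Toy lazy cache keyed by (view, doc_id)."""

    def __init__(self) -> None:
        self.cache: Dict[Tuple[str, str], Payload] = {}
        self.fetches: List[Tuple[str, str]] = []

    async def _fetch(self, doc_id: str, view: str) -> Payload:
        # Hypothetical stand-in for a request to the collection's content service.
        self.fetches.append((view, doc_id))
        return f"{view} content of {doc_id}"

    async def get_doc_content(self, doc_id: str, view: str) -> Payload:
        key = (view, doc_id)
        if key not in self.cache:
            self.cache[key] = await self._fetch(doc_id, view)
        return self.cache[key]


async def demo() -> DocContentCacheSketch:
    store = DocContentCacheSketch()
    await store.get_doc_content("doc1", "text")
    await store.get_doc_content("doc1", "text")     # served from the cache
    await store.get_doc_content("doc1", "summary")  # different view, new fetch
    return store


store = asyncio.run(demo())
print(store.fetches)
```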
- async run(query, last_output=None, current_node=None, scratch=None)[source]
Recursively execute the pipeline for a single query.
For most callers, only query needs to be provided; the remaining parameters are used internally during recursive traversal of the AST.
The method dispatches on the type of current_node:
CallSequence – iterates over stages, passing last_output forward at each step.
ParallelCallSequences – optionally runs the expander first, then fans out to all branches concurrently, and finally calls the merger with all ranked lists.
SystemCall – submits one request to the appropriate processor and applies the %limit if set.
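The `%limit` step applied after a single service call can be sketched as a top-N cut over the scores dict. `apply_limit` is a hypothetical helper, and treating higher scores as better is an assumption.

```python
from typing import Dict, Optional


def apply_limit(scores: Dict[str, float], limit: Optional[int]) -> Dict[str, float]:
    """Keep the `limit` highest-scoring documents; None keeps everything."""
    if limit is None:
        return dict(scores)
    top = sorted(scores.items(), key=lambda item: item[1], reverse=True)[:limit]
    return dict(top)


stage_output = {"scores": {"a": 0.2, "b": 0.9, "c": 0.5}}
limited = apply_limit(stage_output["scores"], 2)
print(limited)
```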
- Parameters:
query (str) – The user query string.
last_output (dict, optional) – Output dict from the previous pipeline stage, threaded automatically during recursion. Callers should leave this as None. After a search stage it contains {"scores": {docid: float, …}}; after an expander stage it contains {"queries": [str, …]}.
current_node (PipelineComponent, optional) – The AST node to execute. Defaults to the root of the pipeline. Set automatically during recursion; callers should leave this as None.
scratch (dict, optional) – Internal cache of intermediate stage results, keyed by (alias, role, view) tuples. Populated during execution so the same stage is not re-run if referenced multiple times; including view keeps the same alias+role under two different views from collapsing. Callers should leave this as None.
- Returns:
Result from the final pipeline stage. Typically contains:
"scores" – {docid: float} ranked results.
"cached" – whether the result was served from cache.
"timestamp" – Unix timestamp of processing.
- Return type:
Parser
Pipeline DSL parser for RoutIR search pipelines.
The pipeline DSL composes retrieval and reranking services into multi-stage, parallel, or query-expansion pipelines using a concise string syntax.
Syntax reference:
# Single service — retrieve top 20
"bm25%20"
# Sequential — retrieve 100, rerank to top 20
"bm25%100 >> cross-encoder%20"
# Parallel retrieval + fusion — dense and sparse run concurrently, fused by RRF
"{dense, sparse}RRF%100"
# Query expansion — expander generates sub-queries, each runs the parallel branches
"expander{dense, sparse}RRF%100"
# Alias — name a stage so runtime_kwargs can target it by alias
"dense[d]%100 >> cross-encoder%20"
Grammar operators:
service%N – call service, keep top N results.
service[alias] – give this call the name alias for runtime_kwargs targeting.
A >> B – sequential: run A, pass output to B (B auto-assigned role "rerank").
{A, B}Merger – parallel: run A and B concurrently, fuse with Merger.
Expander{A, B}Merger – query expansion: Expander produces sub-queries, each dispatched to A and B, all results fused by Merger.
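To make the fusion step in "{dense, sparse}RRF%100" concrete, here is a sketch of textbook reciprocal rank fusion, where each document receives the sum of 1 / (k + rank) over the lists that contain it. This is the generic algorithm, not necessarily what the service registered as RRF in a given RoutIR deployment does. The function name and the constant k = 60 (a commonly used default) are assumptions.

```python
from collections import defaultdict
from typing import Dict, List, Optional


def rrf_fuse(
    ranked_lists: List[Dict[str, float]],
    k: int = 60,
    limit: Optional[int] = None,
) -> Dict[str, float]:
    """Fuse score dicts by reciprocal rank; higher input score means better rank."""
    fused: Dict[str, float] = defaultdict(float)
    for scores in ranked_lists:
        ranking = sorted(scores, key=lambda doc_id: scores[doc_id], reverse=True)
        for rank, doc_id in enumerate(ranking, start=1):
            fused[doc_id] += 1.0 / (k + rank)
    ordered = sorted(fused.items(), key=lambda item: item[1], reverse=True)
    return dict(ordered[:limit])  # limit=None keeps every document


dense = {"d1": 0.9, "d2": 0.8, "d3": 0.1}
sparse = {"d2": 12.0, "d4": 7.5, "d1": 3.0}
fused = rrf_fuse([dense, sparse], limit=3)
print(list(fused))
```

Because only ranks enter the formula, the incompatible score scales of the two branches do not matter; a document ranked near the top of both lists ends up first.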
The module-level parser singleton parses DSL strings into
PipelineComponent AST nodes consumed by SearchPipeline.
- class routir.pipeline.parser.SystemCall(name, alias=None, limit=None, role='search', view=None)[source]
Bases: object
An AST node representing a single service call in the pipeline.
- alias
Label used to target this stage with runtime_kwargs. Defaults to name when not specified with the [alias] syntax.
- Type:
- limit
Maximum results to return (the %N suffix). None means no explicit limit; the service uses its own default.
- Type:
int or None
- role
Role assigned during pipeline construction. One of:
"search" – first-stage retrieval; receives the raw query.
"rerank" – later stage; receives previous results and document text fetched from the collection.
"expander" – generates sub-queries for parallel execution.
"merger" – fuses multiple ranked lists into one final ranking.
- Type:
- view
View name selector (the @view suffix) used to pick which view of the collection’s content service to fetch when this stage reranks. None means fall back to the collection’s default_view at run time. Only meaningful for "rerank" stages; ignored for search/merger/expander roles.
- Type:
str or None
- property all_calls
- __init__(name, alias=None, limit=None, role='search', view=None)
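The attribute defaults above can be mirrored in a stand-in dataclass. The sketch below is not the real SystemCall or the Lark grammar: `SystemCallSketch` and `parse_call` are hypothetical, the regular expression only handles the `name[alias]%N` shape seen in the examples (no `@view`), and the permitted characters in names are an assumption.

```python
import re
from dataclasses import dataclass
from typing import Optional


@dataclass
class SystemCallSketch:
    """Stand-in mirroring the documented fields of SystemCall."""

    name: str
    alias: Optional[str] = None
    limit: Optional[int] = None
    role: str = "search"
    view: Optional[str] = None

    def __post_init__(self) -> None:
        # The alias defaults to the service name when [alias] is absent.
        if self.alias is None:
            self.alias = self.name


# Assumed token shape: name, optional [alias], optional %N.
_CALL = re.compile(
    r"^(?P<name>[A-Za-z0-9_-]+)(?:\[(?P<alias>[A-Za-z0-9_-]+)\])?(?:%(?P<limit>\d+))?$"
)


def parse_call(token: str) -> SystemCallSketch:
    match = _CALL.match(token.strip())
    if match is None:
        raise ValueError(f"not a single service call: {token!r}")
    limit = match.group("limit")
    return SystemCallSketch(
        name=match.group("name"),
        alias=match.group("alias"),
        limit=int(limit) if limit is not None else None,
    )


aliased = parse_call("dense[d]%100")
plain = parse_call("cross-encoder")
print(aliased)
print(plain)
```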
- class routir.pipeline.parser.CallSequence(stages)[source]
Bases: object
An AST node for a sequential chain of pipeline stages (A >> B >> C).
During construction, all stages after the first are automatically assigned role "rerank": they receive the previous stage’s scored results together with document text fetched from the collection and are expected to rescore them.
- stages
Ordered list of SystemCall or ParallelCallSequences nodes executed left-to-right. stages[0] has role "search"; all later stages have role "rerank".
- Type:
- stages: List[SystemCall | ParallelCallSequences]
- property all_calls
- __init__(stages)
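The data flow into a "rerank" stage can be sketched with a toy rescoring function: it takes the previous stage's scores, looks up each document's text, and produces new scores. The term-overlap scorer, the `get_text` callback, and the function name are all hypothetical; a real reranker service would do the scoring.

```python
from typing import Callable, Dict, Optional


def rerank_stage(
    query: str,
    previous_scores: Dict[str, float],
    get_text: Callable[[str], str],
    limit: Optional[int] = None,
) -> Dict[str, Dict[str, float]]:
    """Rescore the previous stage's candidates by query-term overlap (toy scorer)."""
    query_terms = set(query.lower().split())
    rescored: Dict[str, float] = {}
    for doc_id in previous_scores:
        text = get_text(doc_id)  # document text fetched from the collection
        rescored[doc_id] = float(len(query_terms & set(text.lower().split())))
    top = sorted(rescored.items(), key=lambda item: item[1], reverse=True)[:limit]
    return {"scores": dict(top)}


docs = {"d1": "cats and dogs", "d2": "neural models", "d3": "ranking cats"}
first_stage = {"scores": {"d1": 3.0, "d2": 2.0, "d3": 1.0}}
reranked = rerank_stage("ranking cats", first_stage["scores"], docs.__getitem__, limit=2)
print(reranked)
```

The candidate set comes from the first stage, but the final order depends only on the rescoring, so the document ranked last by retrieval can finish first.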
- class routir.pipeline.parser.ParallelCallSequences(sequences, merger, expander=None)[source]
Bases: object
An AST node for parallel retrieval with a merger ({A, B}Merger).
Runs multiple retrieval branches concurrently and fuses their results. Optionally preceded by a query-expansion service (E{A, B}Merger).
- sequences
Independent retrieval pipelines run in parallel; each is a CallSequence.
- Type:
- merger
Service that fuses the parallel results. Its role is automatically set to "merger"; it must implement fuse_batch().
- Type:
- expander
Optional query-expansion service. When present, its role is "expander" and it must implement decompose_query_batch(). The expanded sub-queries are each dispatched to every sequence, and all results are merged by merger. When None, the original query is sent directly to all sequences.
- Type:
SystemCall or None
- sequences: List[CallSequence]
- merger: SystemCall
- expander: SystemCall | None = None
- property all_calls
- __init__(sequences, merger, expander=None)
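The fan-out described for ParallelCallSequences can be sketched with asyncio.gather. The branch, expander, and merger functions below are toy stand-ins rather than RoutIR services, and `run_parallel` is a hypothetical name; the sketch only shows that every sub-query is sent to every branch and that all resulting lists reach the merger together.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Optional

Scores = Dict[str, float]
Branch = Callable[[str], Awaitable[Scores]]


async def toy_expander(query: str) -> List[str]:
    return [query, f"{query} definition"]


async def dense(query: str) -> Scores:
    return {f"dense:{query}": 1.0}


async def sparse(query: str) -> Scores:
    return {f"sparse:{query}": 0.5}


def sum_merger(ranked_lists: List[Scores]) -> Scores:
    """Toy merger: add up scores per document across all lists."""
    merged: Scores = {}
    for scores in ranked_lists:
        for doc_id, score in scores.items():
            merged[doc_id] = merged.get(doc_id, 0.0) + score
    return merged


async def run_parallel(
    query: str,
    branches: List[Branch],
    merger: Callable[[List[Scores]], Scores],
    expander: Optional[Callable[[str], Awaitable[List[str]]]] = None,
) -> Scores:
    # Without an expander the original query goes straight to every branch.
    queries = await expander(query) if expander is not None else [query]
    tasks = [branch(q) for q in queries for branch in branches]
    ranked_lists = await asyncio.gather(*tasks)  # branches run concurrently
    return merger(list(ranked_lists))


expanded = asyncio.run(run_parallel("rrf", [dense, sparse], sum_merger, toy_expander))
direct = asyncio.run(run_parallel("rrf", [dense, sparse], sum_merger))
print(sorted(expanded))
print(sorted(direct))
```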
- routir.pipeline.parser.PipelineComponent
Type alias for any top-level pipeline AST node.
alias of SystemCall | CallSequence | ParallelCallSequences
- class routir.pipeline.parser.PipelineTransformer(visit_tokens=True)[source]
Bases: Transformer
Lark Transformer that converts a parse tree into RoutIR AST nodes.
Each method corresponds to a grammar rule and returns the appropriate SystemCall, CallSequence, or ParallelCallSequences instance. Used internally by the parser singleton; not normally called directly.
- routir.pipeline.parser.expand_aliases(node, aliases)[source]
Substitute alias SystemCall nodes with their pre-parsed bodies.
Walks the AST and, for each SystemCall whose name is a key in aliases, replaces it with a deep copy of the corresponding alias body. The original call’s role is propagated into the substituted subtree via as_role(), and the original call’s limit (when set) is applied to the outermost (final-output) node via _apply_outer_limit().
Merger and expander positions inside a ParallelCallSequences are leaf calls and are not substituted; aliases only apply at pipeline-stage or parallel-branch positions.
- Parameters:
node (SystemCall | CallSequence | ParallelCallSequences) – AST root to expand.
aliases (Dict[str, SystemCall | CallSequence | ParallelCallSequences]) – Mapping of alias name to fully-expanded alias body AST. When empty, node is returned unchanged.
- Returns:
A new AST with all alias references substituted. The input is not mutated.
- Return type:
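The substitution performed by expand_aliases() can be sketched on a reduced AST. The `Call` and `Seq` dataclasses, `expand`, and `apply_outer_limit` are hypothetical stand-ins. Role propagation and parallel nodes are omitted, and whether the real function flattens a sequence nested inside another sequence is not covered by this sketch.

```python
import copy
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Union


@dataclass
class Call:
    name: str
    limit: Optional[int] = None


@dataclass
class Seq:
    stages: List["Node"] = field(default_factory=list)


Node = Union[Call, Seq]


def apply_outer_limit(node: Node, limit: int) -> None:
    """Set the limit on the node that produces the final output."""
    if isinstance(node, Seq):
        apply_outer_limit(node.stages[-1], limit)
    else:
        node.limit = limit


def expand(node: Node, aliases: Dict[str, Node]) -> Node:
    """Return a new tree with alias calls replaced by copies of their bodies."""
    if isinstance(node, Seq):
        return Seq([expand(stage, aliases) for stage in node.stages])
    if node.name in aliases:
        body = copy.deepcopy(aliases[node.name])  # never share the alias body
        if node.limit is not None:
            apply_outer_limit(body, node.limit)
        return body
    return copy.deepcopy(node)


aliases: Dict[str, Node] = {"hybrid": Seq([Call("bm25", 100), Call("reranker", 50)])}
root = Seq([Call("hybrid", 20), Call("final", 10)])
expanded = expand(root, aliases)
print(expanded)
```

The `%20` written on the alias call lands on the last stage of the substituted body, while the stored alias body and the input tree keep their original values.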