Pipeline

This section contains auto-generated documentation for pipeline components.

Pipeline Module

class routir.pipeline.pipeline.SearchPipeline(pipeline, collection, runtime_kwargs=None, verify=True)[source]

Bases: object

Executes search pipelines defined using a custom DSL.

Supports sequential and parallel execution of search, rerank, query expansion, and fusion operations.

pipeline

Parsed pipeline component tree

collection

Document collection to search

runtime_kwargs

Runtime parameters passed to pipeline components

doc_content_cache

Cache for retrieved document content

__init__(pipeline, collection, runtime_kwargs=None, verify=True)[source]

Initialize search pipeline.

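Parameters:

  • pipeline – Parsed pipeline component tree

  • collection – Document collection to search

  • runtime_kwargs – Runtime parameters passed to pipeline components

  • verify – Whether to verify services

verify()[source]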

Verify that all required services exist in the registry.
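As a rough illustration of this kind of check, the sketch below compares a list of required service names against a set of registered names. The helper names `find_missing_services` and `verify_services`, the use of a plain set as the registry, and the `ValueError` are all assumptions made for the example; the source does not show how routir stores its registry or what it raises.

```python
from typing import Iterable, List, Set


def find_missing_services(required: Iterable[str], registry: Set[str]) -> List[str]:
    """Return required names absent from the registry, in first-seen order."""
    seen = set()
    missing = []
    for name in required:
        if name not in registry and name not in seen:
            seen.add(name)
            missing.append(name)
    return missing


def verify_services(required: Iterable[str], registry: Set[str]) -> None:
    """Raise if any required service is not registered (hypothetical behavior)."""
    missing = find_missing_services(required, registry)
    if missing:
        raise ValueError(f"Unknown services: {', '.join(missing)}")
```

Checking up front, before any query runs, means a misspelled service name fails at construction time instead of partway through a search.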

classmethod from_string(pipeline_string, collection, runtime_kwargs=None, verify=True)[source]

Create pipeline from string specification.

Parameters:
  • pipeline_string (str) – Pipeline DSL string

  • collection (str) – Collection name

  • runtime_kwargs (Dict[str, Dict[str, Any]]) – Runtime parameters

  • verify (bool) – Whether to verify services

Returns:

SearchPipeline instance

Return type:

SearchPipeline

async get_doc_content(doc_id)[source]

Retrieve and cache document content.

Parameters:

doc_id (str) – Document identifier

Returns:

Document text content
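The retrieve-and-cache behavior described above can be sketched with a dictionary keyed by document ID. This is a minimal illustration, not routir's implementation: `DocContentCache`, `fake_fetch`, and the `fetch_count` counter are hypothetical stand-ins, and the real method presumably fetches content from the collection.

```python
import asyncio
from typing import Awaitable, Callable, Dict


class DocContentCache:
    """Caches document text so each doc_id is fetched at most once (hypothetical)."""

    def __init__(self, fetch: Callable[[str], Awaitable[str]]):
        self._fetch = fetch
        self._cache: Dict[str, str] = {}
        self.fetch_count = 0  # counts real fetches, for illustration only

    async def get(self, doc_id: str) -> str:
        if doc_id not in self._cache:
            self.fetch_count += 1
            self._cache[doc_id] = await self._fetch(doc_id)
        return self._cache[doc_id]


async def fake_fetch(doc_id: str) -> str:
    """Stand-in for a real content lookup."""
    await asyncio.sleep(0)
    return f"text of {doc_id}"


async def demo():
    cache = DocContentCache(fake_fetch)
    first = await cache.get("doc-1")
    second = await cache.get("doc-1")  # served from the cache
    return first, second, cache.fetch_count


result = asyncio.run(demo())
```

In this sketch, two concurrent requests for the same uncached ID could both trigger a fetch; storing the pending task in the cache instead of the finished text would avoid that.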

async run(query, last_output=None, current_node=None, scratch=None)[source]

Recursively execute the pipeline for a query.

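Parameters:

  • query – Query to execute the pipeline for

  • last_output – Defaults to None

  • current_node – Defaults to None

  • scratch – Defaults to None

Returns: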

Final search results

Return type:

List[Dict[str, float]]
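One way to picture the recursive execution is a walk over a small tree in which sequential stages feed their output forward and parallel branches run concurrently before being merged. The sketch below is an assumption-laden illustration: the node types `Call`, `Seq`, and `Par`, the toy services, the `sum_fusion` merger, and the `SERVICES` and `MERGERS` tables are all hypothetical, and results are simplified to a single mapping from document ID to score.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List, Optional, Union

Scores = Dict[str, float]


@dataclass
class Call:  # hypothetical leaf node: one service invocation
    name: str


@dataclass
class Seq:  # hypothetical sequential node
    stages: List[Union["Call", "Par"]]


@dataclass
class Par:  # hypothetical parallel node with a named merger
    branches: List[Seq]
    merger: str


async def search_a(query: str, last_output: Optional[Scores]) -> Scores:
    return {"d1": 2.0, "d2": 1.0}


async def search_b(query: str, last_output: Optional[Scores]) -> Scores:
    return {"d2": 3.0, "d3": 0.5}


async def top1(query: str, last_output: Optional[Scores]) -> Scores:
    best = max(last_output, key=last_output.get)
    return {best: last_output[best]}


def sum_fusion(outputs: List[Scores]) -> Scores:
    fused: Scores = {}
    for out in outputs:
        for doc_id, score in out.items():
            fused[doc_id] = fused.get(doc_id, 0.0) + score
    return fused


SERVICES = {"search_a": search_a, "search_b": search_b, "top1": top1}
MERGERS = {"sum": sum_fusion}


async def run(node, query: str, last_output: Optional[Scores] = None) -> Scores:
    if isinstance(node, Call):
        return await SERVICES[node.name](query, last_output)
    if isinstance(node, Seq):
        for stage in node.stages:  # each stage consumes the previous output
            last_output = await run(stage, query, last_output)
        return last_output
    if isinstance(node, Par):
        outputs = await asyncio.gather(
            *(run(branch, query, last_output) for branch in node.branches)
        )
        return MERGERS[node.merger](list(outputs))
    raise TypeError(f"Unexpected node type: {type(node).__name__}")


tree = Seq([
    Par([Seq([Call("search_a")]), Seq([Call("search_b")])], merger="sum"),
    Call("top1"),
])
final = asyncio.run(run(tree, "example query"))
```

Here the two searches run concurrently, their scores are summed per document, and the final stage keeps the highest-scoring document.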

Parser

class routir.pipeline.parser.SystemCall(name: str, alias: str | None = None, limit: int | None = None, role: str | None = 'search')[source]

Bases: object

name: str

alias: str | None = None

limit: int | None = None

role: str | None = 'search'

property all_calls

as_role(role)[source]

__init__(name, alias=None, limit=None, role='search')

class routir.pipeline.parser.CallSequence(stages: List[routir.pipeline.parser.SystemCall | ForwardRef('ParallelCallSequences')])[source]

Bases: object

stages: List[SystemCall | ParallelCallSequences]

property all_calls

as_role(role)[source]

__init__(stages)

class routir.pipeline.parser.ParallelCallSequences(sequences: List[routir.pipeline.parser.CallSequence], merger: routir.pipeline.parser.SystemCall, expander: routir.pipeline.parser.SystemCall | None = None)[source]

Bases: object

sequences: List[CallSequence]

merger: SystemCall

expander: SystemCall | None = None

property all_calls

as_role(role)[source]

__init__(sequences, merger, expander=None)

class routir.pipeline.parser.PipelineTransformer(visit_tokens=True)[source]

Bases: Transformer

seq(stages)[source]

system_call(tokens)[source]

alias(tokens)[source]

parallel_seq(tokens)[source]

stage(tokens)[source]

seq_list(tokens)[source]
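The three node classes listed above (SystemCall, CallSequence, ParallelCallSequences) share an `all_calls` property and an `as_role` method whose behavior is not documented here. The sketch below mirrors their fields with stand-in dataclasses and assumes that `all_calls` flattens the tree into its leaf calls and that `as_role` returns a copy with the role replaced. The `Sketch*` class names, the service names, and the role values other than `'search'` are hypothetical.

```python
from dataclasses import dataclass, replace
from typing import List, Optional, Union


@dataclass
class SketchCall:
    name: str
    alias: Optional[str] = None
    limit: Optional[int] = None
    role: Optional[str] = "search"

    @property
    def all_calls(self) -> List["SketchCall"]:
        return [self]  # assumed: a leaf lists only itself

    def as_role(self, role: str) -> "SketchCall":
        return replace(self, role=role)  # assumed: copy, not in-place mutation


@dataclass
class SketchSequence:
    stages: List[Union[SketchCall, "SketchParallel"]]

    @property
    def all_calls(self) -> List[SketchCall]:
        return [call for stage in self.stages for call in stage.all_calls]


@dataclass
class SketchParallel:
    sequences: List[SketchSequence]
    merger: SketchCall
    expander: Optional[SketchCall] = None

    @property
    def all_calls(self) -> List[SketchCall]:
        calls = [] if self.expander is None else [self.expander]
        for sequence in self.sequences:
            calls.extend(sequence.all_calls)
        calls.append(self.merger)
        return calls


tree = SketchSequence([
    SketchParallel(
        sequences=[
            SketchSequence([SketchCall("bm25")]),
            SketchSequence([SketchCall("dense", limit=100)]),
        ],
        merger=SketchCall("rrf", role="fusion"),
    ),
    SketchCall("reranker").as_role("rerank"),
])
names = [call.name for call in tree.all_calls]
roles = [call.role for call in tree.all_calls]
```

A flattened view like this is what a service check would need: every name in `names` can be looked up in a registry without walking the tree again.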