Source code for routir.processors.content_processors

import json
from typing import Any, Dict

from ..config import CollectionConfig
from ..utils import load_singleton
from ..utils.file_io import MSMARCOSegOffset, OffsetFile, RandomAccessReader
from .abstract import Processor


class ContentProcessor(Processor):
    """
    Processor for retrieving document content by ID.

    Provides fast random access to documents in JSONL files using offset maps.

    Attributes:
        config: Collection configuration
        line_reader: Random access reader for document file
        content_field: Field(s) containing document text
        lang_mapping: Optional mapping of document IDs to language codes
    """

    def __init__(self, collection_config: CollectionConfig, cache_size=0, cache_ttl=600):
        """
        Initialize content processor.

        Args:
            collection_config: Collection configuration with doc_path, id_field, etc.
            cache_size: Maximum cache entries
            cache_ttl: Cache TTL in seconds

        Raises:
            ValueError: If ``collection_config.doc_path`` is not set, or
                ``collection_config.offset_source`` is not a supported value.
        """
        # always use `id` from the request as the key, this is different from id_field in config
        super().__init__(cache_size, cache_ttl, lambda x: x["id"])

        # Explicit check rather than `assert`, so validation still runs under `python -O`.
        if collection_config.doc_path is None:
            raise ValueError("doc_path must be specified in collection_config")

        self.config = collection_config
        self.line_reader = self._load_reader(collection_config.force_load_all_documents)
        self.content_field = collection_config.content_field

        # Optional per-document language codes, surfaced as "language" in results.
        mapping_source = collection_config.id_to_lang_mapping
        self.lang_mapping = load_singleton(mapping_source) if mapping_source is not None else None

    def _load_reader(self, force_load_all: bool = False) -> RandomAccessReader:
        """
        Build the random-access reader selected by ``config.offset_source``.

        Args:
            force_load_all: Forwarded to ``MSMARCOSegOffset``; not used for "offsetfile".

        Returns:
            A reader supporting ``reader[doc_id]`` (raw line) and ``doc_id in reader``.

        Raises:
            ValueError: If ``offset_source`` is neither "offsetfile" nor "msmarco_seg".
        """
        source = self.config.offset_source
        if source == "offsetfile":
            return OffsetFile(
                self.config.doc_path,
                key=lambda line: json.loads(line)[self.config.id_field],
                offset_fn=self.config.cache_path,
                id_field=self.config.id_field,  # Added for parallel offset map building
            )
        if source == "msmarco_seg":
            return MSMARCOSegOffset(self.config.doc_path, force_load_all=force_load_all)
        # Previously this fell through and returned None, which only surfaced later as a
        # confusing TypeError on the first lookup; fail fast with a clear message instead.
        raise ValueError(
            f"Unsupported offset_source {source!r}; expected 'offsetfile' or 'msmarco_seg'."
        )

    def __getitem__(self, idx: str):
        """
        Return document ``idx`` formatted for a response.

        The result always has "text": the configured content fields joined by
        newlines. It also has "title" when the stored document has one, and
        "language" when a language mapping is configured ("" for unmapped IDs).
        """
        doc = json.loads(self.line_reader[idx])
        # Accept a single field name as well as a sequence of names; a bare string
        # would otherwise be iterated character by character.
        fields = [self.content_field] if isinstance(self.content_field, str) else self.content_field
        results = {"text": "\n".join(doc[c] for c in fields)}
        if "title" in doc:
            results["title"] = doc["title"]
        if self.lang_mapping is not None:
            results["language"] = self.lang_mapping.get(idx, "")
        return results

    def __contains__(self, idx: str):
        """Return whether the underlying reader has a document with ID ``idx``."""
        return idx in self.line_reader

    async def _submit(self, item: Dict[str, Any]) -> Dict[str, str]:
        """
        Handle one request: look up ``item["id"]`` and return its content.

        Returns an ``{"error": ...}`` dict instead of raising when the ID is unknown.
        """
        idx = self.cache_key(item)
        if idx not in self:
            return {"error": f"ID {idx} is not found."}
        return self[idx]
class IRDSProcessor(Processor):
    """
    Processor for retrieving document content by ID from an ir_datasets collection.

    This subclasses ``Processor`` directly (not ``ContentProcessor``). Documents
    are read through ``ir_datasets`` instead of from a JSONL file with an offset map.

    Attributes:
        config: Collection configuration
        content_field: Field(s) of the ir_datasets document record containing text
        ds: The ``docs`` accessor of the loaded ir_datasets dataset
    """

    def __init__(self, collection_config: CollectionConfig, cache_size=0, cache_ttl=600):
        """
        Initialize the ir_datasets-backed processor.

        Args:
            collection_config: Collection configuration; ``name`` is passed to
                ``ir_datasets.load`` and ``content_field`` selects the text fields.
            cache_size: Maximum cache entries
            cache_ttl: Cache TTL in seconds
        """
        # always use `id` from the request as the key, this is different from id_field in config
        super().__init__(cache_size, cache_ttl, lambda x: x["id"])
        # Imported here so ir_datasets is only needed when this processor is instantiated.
        import ir_datasets as irds

        self.config = collection_config
        self.content_field = collection_config.content_field
        self.ds = irds.load(collection_config.name).docs

    def _record_to_result(self, record) -> Dict[str, str]:
        """Convert an ir_datasets document record (a namedtuple) into the response dict."""
        doc: Dict[str, str] = record._asdict()
        # Accept a single field name as well as a sequence of names; a bare string
        # would otherwise be iterated character by character.
        fields = [self.content_field] if isinstance(self.content_field, str) else self.content_field
        results = {"text": "\n".join(doc[c] for c in fields)}
        if "title" in doc:
            results["title"] = doc["title"]
        return results

    def __getitem__(self, idx: str):
        """
        Return document ``idx`` as ``{"text": ..., ["title": ...]}``.

        ``ds.lookup`` raises ``KeyError`` for unknown IDs (``__contains__`` relies on this).
        """
        return self._record_to_result(self.ds.lookup(idx))

    def __contains__(self, idx: str):
        """Return whether the dataset has a document with ID ``idx``."""
        try:
            self.ds.lookup(idx)
        except KeyError:
            return False
        return True

    async def _submit(self, item: Dict[str, Any]) -> Dict[str, str]:
        """
        Handle one request: look up ``item["id"]`` and return its content.

        Returns an ``{"error": ...}`` dict instead of raising when the ID is unknown.
        Only one ``ds.lookup`` is performed per request. The previous
        ``in``-then-index pattern looked the document up twice.
        """
        idx = self.cache_key(item)
        try:
            record = self.ds.lookup(idx)
        except KeyError:
            return {"error": f"ID {idx} is not found."}
        return self._record_to_result(record)