# Source code for GaudiConf.LbExec.options

###############################################################################
# (c) Copyright 2022-2024 CERN for the benefit of the LHCb Collaboration      #
#                                                                             #
# This software is distributed under the terms of the GNU General Public      #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".   #
#                                                                             #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization  #
# or submit itself to any jurisdiction.                                       #
###############################################################################
import glob
import logging
import math
import re
from contextlib import contextmanager
from enum import Enum
from itertools import product
from typing import Annotated, Optional

from DDDB.CheckDD4Hep import UseDD4Hep
from Gaudi.Configuration import INFO
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator

from PyConf.packing import default_persistreco_version


class HltSourceID(str, Enum):
    """possible values of the Hlt source ID used to identify the origin of
    the data in the various rawbanks produced by Hlt and/or Sprucing
    """

    Hlt1 = ("Hlt1", ("/Event/HLT1/",))
    Hlt2 = ("Hlt2", ("/Event/HLT2/",))
    Spruce = ("Spruce", ("/Event/Spruce/", "/Event/Turbo/"))

    def __new__(cls, value: str, prefixes: tuple[str, ...]):
        obj = str.__new__(cls, value)
        obj._value_ = value
        obj._prefixes = prefixes
        return obj

    @classmethod
    def from_location(cls, location: str):
        # Deal with the fact that the source ID is e.g. "Spruce", "Hlt1" or "Hlt2",
        # while the TES prefixes are "/Event/Spruce", "/Event/HLT1", "/Event/HLT2", ...
        # and that for "Spruce" there are _two_ valid prefixes: "Spruce" and "Turbo".
        for src in cls:
            if location.startswith(src._prefixes):
                return src
        raise KeyError(f"Unable to derive source id from location: '{location}'")


class InputProcessTypes(str, Enum):
    """
    `TurboPass`, `Hlt2` and `Spruce` are the standard Run 3 input process types.
    The extra types are available to effectively flag special input types and
    to allow a filter to be applied only to processes in which a `DstData`
    bank is produced (the three standard types). See
    https://gitlab.cern.ch/lhcb/LHCb/-/merge_requests/3990 and related MRs for
    further details.
    """

    #  InputProcessType ->  SourceID, tes_root, tes_root_for_tistos
    Spruce = (
        "Spruce",
        HltSourceID.Spruce,
        "/Event/Spruce/HLT2",
        "/Event/Spruce/HLT2/TISTOS",
    )
    TurboSpruce = (
        "TurboSpruce",
        HltSourceID.Spruce,
        "/Event/Turbo/HLT2",
        "/Event/Turbo",
    )
    Hlt1 = ("Hlt1", HltSourceID.Hlt1, "/Event/HLT1", None)
    # This option should be removed when all data goes through the Sprucing
    Hlt2 = ("Hlt2", HltSourceID.Hlt2, "/Event/HLT2", None)
    TurboPass = ("TurboPass", HltSourceID.Hlt2, "/Event/HLT2", "/Event/HLT2")
    Gen = ("Gen", None, None, None)
    # This option should be removed once support for old MC files is fully removed.
    Brunel = ("Brunel", None, None, None)
    Boole = ("Boole", None, None, None)

    def __new__(
        cls,
        value: str,
        source: Optional[HltSourceID],
        prefix: Optional[str],
        tistos: Optional[str],
    ):
        obj = str.__new__(cls, value)
        obj._value_ = value
        obj._sourceID = source
        obj._prefix = prefix
        obj._tistos_prefix = tistos
        return obj

    @property
    def sourceID(self) -> Optional[HltSourceID]:
        return self._sourceID

    @property
    def tes_root(self) -> Optional[str]:
        return self._prefix

    @property
    def tes_root_for_tistos(self) -> Optional[str]:
        return self._tistos_prefix

    @classmethod
    def _missing_(cls, value: object):
        """Map alternative names to canonical members."""
        if value == "Turbo":
            return cls.TurboPass
        return super()._missing_(value)
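
# Example (illustrative): InputProcessTypes is a string enum, so members can
# be constructed from plain strings; "Turbo" is accepted as an alias for
# TurboPass via _missing_:
#
#   >>> InputProcessTypes("Turbo") is InputProcessTypes.TurboPass
#   True
#   >>> InputProcessTypes.Spruce.tes_root
#   '/Event/Spruce/HLT2'
#   >>> InputProcessTypes.Hlt1.sourceID
#   <HltSourceID.Hlt1: 'Hlt1'>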


class DataTypeEnum(Enum):
    Upgrade = "Upgrade"
    # Run 3
    DT2022 = "2022"
    DT2023 = "2023"
    DT2024 = "2024"
    DT2025 = "2025"


class FileFormats(str, Enum):
    NONE = "NONE"
    RAW = "RAW"
    ROOT = "ROOT"


class EventStores(str, Enum):
    EvtStoreSvc = "EvtStoreSvc"
    HiveWhiteBoard = "HiveWhiteBoard"


class CompressionAlgs(str, Enum):
    ZLIB = "ZLIB"
    LZMA = "LZMA"
    LZ4 = "LZ4"
    ZSTD = "ZSTD"


class CompressionSettings(BaseModel):
    model_config = ConfigDict(frozen=True, extra="forbid", use_enum_values=True)

    algorithm: CompressionAlgs = CompressionAlgs.ZSTD
    level: int = 4
    min_buffer_size: Optional[int] = None
    max_buffer_size: Optional[int] = None
    approx_events_per_basket: Optional[int] = None

    def as_gaudi_config(self):
        config = {"GlobalCompression": f"{self.algorithm}:{self.level}"}
        if self.min_buffer_size is not None:
            config["MinBufferSize"] = self.min_buffer_size
        if self.max_buffer_size is not None:
            config["MaxBufferSize"] = self.max_buffer_size
        if self.approx_events_per_basket is not None:
            config["ApproxEventsPerBasket"] = self.approx_events_per_basket
        return config


class Options(BaseModel):
    model_config = ConfigDict(use_enum_values=True, frozen=True, extra="forbid")

    """Conditions"""
    data_type: Optional[DataTypeEnum] = "Upgrade"
    simulation: bool
    dddb_tag: Optional[str] = None
    conddb_tag: Optional[str] = None
    geometry_version: Optional[str] = None
    # Backend to be used for the geometry, can be DD4Hep or DetDesc.
    # Use "NONE" to disable the geometry.
    geometry_backend: str = "DD4Hep" if UseDD4Hep else "DetDesc"
    conditions_version: Optional[str] = None

    """Input"""
    input_files: list[str] = []
    input_type: FileFormats = FileFormats.NONE
    input_raw_format: float = 0.5
    input_process: Optional[InputProcessTypes] = None
    input_manifest_file: Optional[str] = None
    input_stream: Optional[str] = ""
    input_run_number: Optional[int] = None
    persistreco_version: float = default_persistreco_version()
    xml_file_catalog: Optional[str] = None
    evt_max: int = -1
    first_evt: int = 0
    # Number of events to pre-fetch. The default value (20) is reasonable for
    # HLT2/Analysis; it needs to be increased for HLT1, typically to 20000.
    ioalg_buffer_nb_events: int = 20
    # Name of the IOAlg to be used when input_type is MDF. Choices currently
    # are IOAlgMemoryMap and IOAlgFileRead, the latter being the default.
    mdf_ioalg_name: str = "IOAlgFileRead"
    # Name of the IOAlg to be used when input_type is ROOT. Choices currently
    # are RootIOAlg and RootIOAlgExt, the former being the default.
    root_ioalg_name: str = "RootIOAlg"
    # Dictionary of properties to be passed to the RootIOAlg at construction time
    root_ioalg_opts: dict = {}
    # In case GaudiPython is used, set this to True in order to change
    # the behaviour of the scheduler accordingly
    gaudipython_mode: bool = False

    """Output"""
    output_file: Optional[str] = None
    output_type: FileFormats = FileFormats.ROOT
    output_manifest_file: Optional[str] = None
    append_decoding_keys_to_output_manifest: bool = True
    write_decoding_keys_to_git: Optional[bool] = None
    write_options_to_fsr: bool = False
    require_specific_decoding_keys: list[str] = []
    compression: Optional[CompressionSettings] = None
    histo_file: Optional[str] = None
    ntuple_file: Optional[str] = None
    ntuple_basketsize: Optional[int] = 32000  # ROOT default
    xml_summary_file: Optional[str] = None
    xml_summary_svc: Optional[str] = "XMLSummarySvc"
    # Write the configuration of lines in streams to JSON at runtime
    write_streams_attributes_to_json: bool = False
    # Output JSON file for the configuration of lines in streams
    output_streams_attributes_file: Optional[str] = "line_attribute_dict.json"
    # Input JSON file for the configuration of lines in streams
    input_streams_attributes_file: Optional[str] = "line_attribute_dict.json"

    """Processing"""
    n_threads: int = 1
    # Defaults to 1.2 * n_threads
    n_event_slots: Annotated[int, Field(validate_default=True)]
    # Event store implementation: HiveWhiteBoard (default) or EvtStoreSvc (faster)
    event_store: EventStores = EventStores.HiveWhiteBoard
    # Estimated size of the per-event memory pool; zero disables the pool
    memory_pool_size: int = 10 * 1024 * 1024
    # If False, the scheduler calls Algorithm::execute instead of
    # Algorithm::sysExecute, which breaks some non-functional algorithms
    scheduler_legacy_mode: bool = True

    """Logging"""
    print_freq: int = 10_000
    output_level: int = INFO
    msg_svc_format: str = "% F%35W%S %7W%R%T %0W%M"
    msg_svc_time_format: str = "%Y-%m-%d %H:%M:%S UTC"
    python_logging_level: int = logging.INFO

    """Debugging"""
    # Dump monitoring entities (counters, histograms, etc.)
    monitoring_file: Optional[str] = None
    control_flow_file: Optional[str] = None
    data_flow_file: Optional[str] = None
    phoenix_filename: Optional[str] = None
    preamble_algs: list = []
    # List of auditors to run. Possible common choices include "NameAuditor",
    # "MemoryAuditor" or "ChronoAuditor". For a full list see the Gaudi
    # documentation.
    auditors: list[str] = []
    event_timeout: Optional[int] = None
    # If set, make_odin is called even if simulation is True; otherwise
    # fake_odin is used in that case.
    # FIXME: should be replaced by an opt-in force_odin
    force_odin: bool = False

    if not UseDD4Hep:
        velo_motion_system_yaml: Optional[str] = None

    @model_validator(mode="before")
    @classmethod
    def n_event_slots_default(cls, data):
        if "n_event_slots" not in data:
            n_threads = data.get("n_threads", 1)
            data["n_event_slots"] = math.ceil(1.2 * n_threads) if n_threads > 1 else 1
        return data
[docs] @field_validator("input_files", mode="before") def glob_input_files(cls, input_files): if isinstance(input_files, str): resolved_input_files = [] for pattern in _expand_braces(input_files): if "*" not in pattern: resolved_input_files.append(pattern) continue if pattern.startswith("root://"): raise NotImplementedError("Cannot glob with XRootD URLs") matches = glob.glob(pattern, recursive=True) if not matches: raise ValueError(f"No input files found matching {pattern!r}") resolved_input_files += matches return resolved_input_files return input_files
[docs] @field_validator("compression", mode="before") def parse_compression_str(cls, compression): if isinstance(compression, str): alg, level = compression.split(":", 1) alg = CompressionAlgs(alg) return CompressionSettings(algorithm=alg, level=int(level)) return compression
[docs] @model_validator(mode="after") def validate_input(self): if self.input_type == FileFormats.NONE: if self.evt_max < 0: raise ValueError( f"When running with input_type={self.input_type}, 'evt_max' must be >=0" ) elif not self.input_files: raise ValueError( f"'input_files' is required when input_type={self.input_type}" ) return self

    @contextmanager
    def apply_binds(self):
        """Context manager to apply binds before the user function is called.

        To avoid having to pass properties on the options object down many
        layers of functions, applications can use this context manager to bind
        values before the user-provided function is called.
        """
        yield

    def finalize(self):
        # HACK: Required for compatibility with the old options object
        pass
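

# Example (illustrative sketch, not part of the module): constructing Options
# directly, as lbexec does from an options YAML file. `simulation` is the only
# field without a default; `n_event_slots` is filled in by
# `n_event_slots_default`, the compression string is parsed into a
# `CompressionSettings` instance by `parse_compression_str`, and the
# `input_files` string goes through brace expansion in `glob_input_files`.
# The file names below are hypothetical.
#
#   >>> opts = Options(
#   ...     simulation=False,
#   ...     input_files="/data/run_{0001..0003}.mdf",
#   ...     input_type="RAW",
#   ...     n_threads=4,
#   ...     compression="ZSTD:4",
#   ... )
#   >>> opts.n_event_slots  # ceil(1.2 * 4)
#   5
#   >>> opts.input_files
#   ['/data/run_0001.mdf', '/data/run_0002.mdf', '/data/run_0003.mdf']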


def _expand_braces(text):
    """Perform bash-like brace expansion.

    See: https://www.gnu.org/software/bash/manual/html_node/Brace-Expansion.html

    There are two notable deviations from the bash behaviour:

    * Duplicates are removed from the output
    * The order of the returned results can differ
    """
    seen = set()
    # HACK: Use a reserved unicode page to substitute patterns like {abc} that
    # don't contain a comma and should therefore have the curly braces
    # preserved in the output
    substitutions = {"\ue000": ""}
    for s in _expand_braces_impl(text, seen, substitutions):
        for k, v in reversed(substitutions.items()):
            s = s.replace(k, v)
        if s:
            yield s


def _expand_braces_impl(text, seen, substitutions):
    int_range_pattern = r"[\-\+]?[0-9]+(\.[0-9]+)?(\.\.[\-\+]?[0-9]+(\.[0-9]+)?){1,2}"
    char_range_pattern = r"([a-z]\.\.[a-z]|[A-Z]\.\.[A-Z])(\.\.[\-\+]?[0-9]+)?"
    patterns = [
        ",",
        r"([^{}]|{})*,([^{}]|{})+",
        r"([^{}]|{})+,([^{}]|{})*",
        int_range_pattern,
        char_range_pattern,
        r"([^{},]|{})+",
    ]
    spans = [m.span() for m in re.finditer(rf"{{({'|'.join(patterns)})}}", text)][::-1]
    if len(spans) == 0:
        if text not in seen:
            yield text
            seen.add(text)
        return

    alts = []
    for start, stop in spans:
        alt_full = text[start:stop]
        alt = alt_full[1:-1].split(",")
        is_int_range = re.fullmatch(rf"{{{int_range_pattern}}}", alt_full)
        is_char_range = re.fullmatch(rf"{{{char_range_pattern}}}", alt_full)
        if is_int_range or is_char_range:
            range_args = alt[0].split("..")
            leading_zeros = 0
            if any(
                len(x) > 1 and x.strip("-")[0] == "0" and x.strip("-") != "0"
                for x in range_args[:2]
            ):
                leading_zeros = max(map(len, range_args[:2]))
            start, stop = map(int if is_int_range else ord, range_args[:2])
            step = int(range_args[2]) if len(range_args) == 3 else 0
            step = 1 if step == 0 else abs(int(step))
            if stop < start:
                step = -step
            stop = stop + int(step / abs(step))
            alt = [
                f"{s:0{leading_zeros}d}" if is_int_range else chr(s)
                for s in range(start, stop, step)
            ]
        elif len(alt) == 1:
            substitution = chr(0xE000 + len(substitutions))
            substitutions[substitution] = alt_full
            alt = [substitution]
        alts.append(alt)

    for combo in product(*alts):
        replaced = list(text)
        for (start, stop), replacement in zip(spans, combo):
            # Add dummy characters to prevent brace expansion being applied
            # recursively, i.e. "{{0..1}2}" should become "{02}" "{12}",
            # not "02" "12"
            replaced[start:stop] = f"\ue000{replacement}\ue000"
        yield from _expand_braces_impl("".join(replaced), seen, substitutions)


class TestOptionsBase(BaseModel):
    """Specialized Options class, only to be inherited when building an
    Options class dedicated to tests.

    Essentially allows the TestFileDB to be used for inputs, setting a number
    of options (input files, geometry and conditions versions, ...) from the
    given entry in the TestFileDB.
    """

    testfiledb_key: str

    @model_validator(mode="before")
    @classmethod
    def validate_input(cls, data):
        from PRConfig import TestFileDB

        if not isinstance(data, dict):
            return data
        if "input_files" in data:
            raise ValueError(
                "Cannot set input_files directly, set testfiledb_key instead"
            )
        if "testfiledb_key" not in data:
            raise ValueError("testfiledb_key is missing")
        tfdb_entry = TestFileDB.test_file_db.get(data["testfiledb_key"])
        qualifiers = tfdb_entry.qualifiers
        data["input_files"] = tfdb_entry.filenames
        # For all other fields, allow overriding by the original YAML, so only
        # set the value from the TestFileDB if not already present
        if "input_type" not in data:
            file_format = qualifiers["Format"]
            data["input_type"] = "ROOT" if file_format != "MDF" else "RAW"
        if "data_type" not in data:
            data["data_type"] = qualifiers["DataType"]
        if "simulation" not in data:
            data["simulation"] = qualifiers["Simulation"]
        if "dddb_tag" not in data:
            data["dddb_tag"] = qualifiers["DDDB"]
        if "conddb_tag" not in data:
            data["conddb_tag"] = qualifiers["CondDB"]
        if "GeometryVersion" in qualifiers and "geometry_version" not in data:
            data["geometry_version"] = qualifiers["GeometryVersion"]
        if "ConditionsVersion" in qualifiers and "conditions_version" not in data:
            data["conditions_version"] = qualifiers["ConditionsVersion"]
        return data


class TestOptions(TestOptionsBase, Options):
    """Specialized Options class for LHCb tests"""

    pass
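

# Example (illustrative): the brace expansion applied to input_files strings,
# with the documented deviations from bash (deduplication, unspecified
# ordering):
#
#   >>> sorted(_expand_braces("f{A,B}{0..1}"))
#   ['fA0', 'fA1', 'fB0', 'fB1']
#
# And a sketch of TestOptions, where the TestFileDB entry (the key below is
# hypothetical) provides the input files and conditions, while explicitly
# given fields still take precedence over the TestFileDB qualifiers:
#
#   >>> opts = TestOptions(testfiledb_key="some_testfiledb_entry", evt_max=100)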