# Source code for GaudiConf.LbExec.options

###############################################################################
# (c) Copyright 2022-2024 CERN for the benefit of the LHCb Collaboration      #
#                                                                             #
# This software is distributed under the terms of the GNU General Public      #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".   #
#                                                                             #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization  #
# or submit itself to any jurisdiction.                                       #
###############################################################################
import glob
import logging
import math
import re
from contextlib import contextmanager
from enum import Enum
from itertools import product
from typing import Annotated, Optional

from DDDB.CheckDD4Hep import UseDD4Hep
from Gaudi.Configuration import INFO
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator

from PyConf.packing import default_persistreco_version


class HltSourceID(str, Enum):
    """Origin of the data carried by a rawbank.

    The source ID identifies which application (Hlt1, Hlt2 or the
    Sprucing) produced the rawbanks found in the input data.
    """

    Hlt1 = "Hlt1"
    Hlt2 = "Hlt2"
    Spruce = "Spruce"


class InputProcessTypes(str, Enum):
    """
    `TurboPass`, `Hlt2` and `Spruce` are standard run-3 input process types.
    The extra types are available to effectively flag special input types
    and be able to apply a filter only to processes where a `DstData` bank is produced (the standard 3 types).
    See discussions at https://gitlab.cern.ch/lhcb/LHCb/-/merge_requests/3990 and related MR for further details.
    """

    Spruce = "Spruce"
    TurboPass = "TurboPass"
    # This option should be removed when all data goes through the Sprucing
    Hlt2 = "Hlt2"
    TurboSpruce = "TurboSpruce"
    Gen = "Gen"
    Brunel = "Brunel"  # This option should be removed once support for old MC files is fully removed.
    Boole = "Boole"
    Hlt1 = "Hlt1"

    def sourceID(self):
        """Return the HltSourceID for data produced by this input process."""
        source_by_process = {
            "Spruce": HltSourceID.Spruce,
            "TurboSpruce": HltSourceID.Spruce,
            "Hlt1": HltSourceID.Hlt1,
            "Hlt2": HltSourceID.Hlt2,
            "TurboPass": HltSourceID.Hlt2,
        }
        # In the remaining cases (Gen, Brunel, Boole) there is typically no
        # DstData rawbank, but packed objects in a DST and no decreports --
        # so this method should not even be called. But it is, and the result
        # is not actually used. Hence raising an exception isn't acceptable
        # (yet!), and we just return something which, if ever really used,
        # will become an error...
        return source_by_process.get(self.value, "InvalidNotToBeUsed")


class DataTypeEnum(Enum):
    """Data-taking period the job is configured for."""

    Upgrade = "Upgrade"  # Run 3
    DT2022 = "2022"
    DT2023 = "2023"
    DT2024 = "2024"
    DT2025 = "2025"
class FileFormats(str, Enum):
    """File formats supported for input/output data."""

    NONE = "NONE"
    RAW = "RAW"
    ROOT = "ROOT"
class EventStores(str, Enum):
    """Available event store implementations."""

    EvtStoreSvc = "EvtStoreSvc"
    HiveWhiteBoard = "HiveWhiteBoard"
class CompressionAlgs(str, Enum):
    """Compression algorithms supported for ROOT output files."""

    ZLIB = "ZLIB"
    LZMA = "LZMA"
    LZ4 = "LZ4"
    ZSTD = "ZSTD"


class CompressionSettings(BaseModel):
    """Output file compression configuration.

    Translated into Gaudi output-writer properties by
    :meth:`as_gaudi_config`.
    """

    model_config = ConfigDict(frozen=True, extra="forbid", use_enum_values=True)

    algorithm: CompressionAlgs = CompressionAlgs.ZSTD
    level: int = 4
    min_buffer_size: Optional[int] = None
    max_buffer_size: Optional[int] = None
    approx_events_per_basket: Optional[int] = None

    def as_gaudi_config(self):
        """Return these settings as a dict of Gaudi configurable properties.

        Only the optional buffer/basket properties that were explicitly set
        are included in the result.
        """
        config = {"GlobalCompression": f"{self.algorithm}:{self.level}"}
        if self.min_buffer_size is not None:
            config["MinBufferSize"] = self.min_buffer_size
        if self.max_buffer_size is not None:
            config["MaxBufferSize"] = self.max_buffer_size
        if self.approx_events_per_basket is not None:
            config["ApproxEventsPerBasket"] = self.approx_events_per_basket
        return config


class Options(BaseModel):
    """Job configuration options for lbexec-based applications."""

    model_config = ConfigDict(use_enum_values=True, frozen=True, extra="forbid")

    """Conditions"""
    data_type: Optional[DataTypeEnum] = "Upgrade"
    simulation: bool
    dddb_tag: Optional[str] = None
    conddb_tag: Optional[str] = None
    geometry_version: Optional[str] = None
    # Backend to be used for the geometry, can be DD4Hep or DetDesc.
    # Use "NONE" for disabling the geometry
    geometry_backend: str = "DD4Hep" if UseDD4Hep else "DetDesc"
    conditions_version: Optional[str] = None

    """Input"""
    input_files: list[str] = []
    input_type: FileFormats = FileFormats.NONE
    input_raw_format: float = 0.5
    input_process: Optional[InputProcessTypes] = None
    input_manifest_file: Optional[str] = None
    input_stream: Optional[str] = ""
    input_run_number: Optional[int] = None
    persistreco_version: float = default_persistreco_version()
    xml_file_catalog: Optional[str] = None
    evt_max: int = -1
    first_evt: int = 0
    # number of events to pre-fetch, the default value (20) is reasonable for
    # HLT2/Analysis. It needs to be increased for HLT1, typically to 20000
    ioalg_buffer_nb_events: int = 20
    # name of the ioalg to be used when input_type is MDF. Choices currently are
    # IOAlgMemoryMap and IOAlgFileRead, the latter being the default
    mdf_ioalg_name: str = "IOAlgFileRead"
    # name of the ioalg to be used when input_type is ROOT. Choices currently are
    # RootIOAlg and RootIOAlgExt, the former being the default
    root_ioalg_name: str = "RootIOAlg"
    # dictionary of properties to be passed to the RootIOAlg at construction time
    root_ioalg_opts: dict = {}
    # in case GaudiPython is used, set this to true in order to change
    # the behavior of the scheduler accordingly
    gaudipython_mode: bool = False

    """Output"""
    output_file: Optional[str] = None
    output_type: FileFormats = FileFormats.ROOT
    output_manifest_file: Optional[str] = None
    append_decoding_keys_to_output_manifest: bool = True
    write_decoding_keys_to_git: Optional[bool] = None
    write_options_to_fsr: bool = False
    require_specific_decoding_keys: list[str] = []
    compression: Optional[CompressionSettings] = None
    histo_file: Optional[str] = None
    ntuple_file: Optional[str] = None
    ntuple_basketsize: Optional[int] = 32000  # ROOT default
    xml_summary_file: Optional[str] = None
    xml_summary_svc: Optional[str] = "XMLSummarySvc"
    # write configuration of lines in streams to json at runtime
    write_streams_attributes_to_json: bool = False
    # Output json file for configuration of lines in streams
    output_streams_attributes_file: Optional[str] = "line_attribute_dict.json"
    # Input json file for configuration of lines in streams
    input_streams_attributes_file: Optional[str] = "line_attribute_dict.json"

    """Processing"""
    n_threads: int = 1
    # defaults to 1.2 * n_threads (see n_event_slots_default validator)
    n_event_slots: Annotated[int, Field(validate_default=True)]
    # Event store implementation: HiveWhiteBoard (default) or EvtStoreSvc (faster).
    event_store: EventStores = EventStores.HiveWhiteBoard
    # Estimated size of the per-event memory pool, zero disables the pool
    memory_pool_size: int = 10 * 1024 * 1024
    # If False, scheduler calls Algorithm::execute instead of
    # Algorithm::sysExecute which breaks some non-functional algorithms
    scheduler_legacy_mode: bool = True

    """Logging"""
    print_freq: int = 10_000
    output_level: int = INFO
    msg_svc_format: str = "% F%35W%S %7W%R%T %0W%M"
    msg_svc_time_format: str = "%Y-%m-%d %H:%M:%S UTC"
    python_logging_level: int = logging.INFO

    """Debugging"""
    # Dump monitoring entities (counters, histograms, etc.)
    monitoring_file: Optional[str] = None
    control_flow_file: Optional[str] = None
    data_flow_file: Optional[str] = None
    phoenix_filename: Optional[str] = None
    preamble_algs: list = []
    # Define list of auditors to run. Possible common choices include
    # "NameAuditor", "MemoryAuditor" or "ChronoAuditor".
    # For a full list see Gaudi documentation.
    auditors: list[str] = []
    event_timeout: Optional[int] = None
    # if set, we will call make_odin even if simulation is true.
    # Else we use fake_odin in such case
    force_odin: bool = False  # FIXME should be replaced by opt-in force_odin
    if not UseDD4Hep:
        velo_motion_system_yaml: Optional[str] = None

    @model_validator(mode="before")
    @classmethod
    def n_event_slots_default(cls, data):
        """Default n_event_slots to ceil(1.2 * n_threads) when not given."""
        if "n_event_slots" not in data:
            n_threads = data.get("n_threads", 1)
            data["n_event_slots"] = math.ceil(1.2 * n_threads) if n_threads > 1 else 1
        return data

    @field_validator("input_files", mode="before")
    def glob_input_files(cls, input_files):
        """Expand a single pattern string into a list of input files.

        Brace expansion is applied first, then glob patterns are resolved
        on the local filesystem. XRootD URLs cannot be globbed.
        """
        if isinstance(input_files, str):
            resolved_input_files = []
            for pattern in _expand_braces(input_files):
                if "*" not in pattern:
                    resolved_input_files.append(pattern)
                    continue
                if pattern.startswith("root://"):
                    raise NotImplementedError("Cannot glob with XRootD URLs")
                matches = glob.glob(pattern, recursive=True)
                if not matches:
                    raise ValueError(f"No input files found matching {pattern!r}")
                resolved_input_files += matches
            return resolved_input_files
        return input_files

    @field_validator("compression", mode="before")
    def parse_compression_str(cls, compression):
        """Accept a shorthand "ALG:LEVEL" string for the compression field."""
        if isinstance(compression, str):
            alg, level = compression.split(":", 1)
            alg = CompressionAlgs(alg)
            return CompressionSettings(algorithm=alg, level=int(level))
        return compression

    @model_validator(mode="after")
    def validate_input(self):
        """Cross-check input settings.

        Without input files (input_type NONE) an explicit non-negative
        evt_max is required; with a real input type, input_files must be set.
        """
        if self.input_type == FileFormats.NONE:
            if self.evt_max < 0:
                raise ValueError(
                    f"When running with input_type={self.input_type}, 'evt_max' must be >=0"
                )
        elif not self.input_files:
            raise ValueError(
                f"'input_files' is required when input_type={self.input_type}"
            )
        return self

    @contextmanager
    def apply_binds(self):
        """Context manager to apply binds before the user function is called

        To avoid having to pass properties on the options object down many
        layers of functions applications can use this context manager to bind
        values before the user provided function is called.
        """
        yield

    def finalize(self):
        # HACK: Required for compatibility with the old options object
        pass
def _expand_braces(text):
    """Perform bash-like brace expansion

    See: https://www.gnu.org/software/bash/manual/html_node/Brace-Expansion.html

    There are two notable deviations from the bash behaviour:
      * Duplicates are removed from the output
      * The order of the returned results can differ
    """
    seen = set()
    # HACK: Use a reserved unicode page to substitute patterns like {abc} that
    # don't contain a comma and should therefore have the curly braces preserved
    # in the output
    substitutions = {"\ue000": ""}
    for s in _expand_braces_impl(text, seen, substitutions):
        # Undo the placeholder substitutions, most recent first
        for k, v in reversed(substitutions.items()):
            s = s.replace(k, v)
        if s:
            yield s


def _expand_braces_impl(text, seen, substitutions):
    """Recursive worker for :func:`_expand_braces`.

    Yields each expansion of ``text`` exactly once (dedup via ``seen``);
    non-expandable `{...}` groups are temporarily replaced by private-use
    characters recorded in ``substitutions``.
    """
    int_range_pattern = r"[\-\+]?[0-9]+(\.[0-9]+)?(\.\.[\-\+]?[0-9]+(\.[0-9]+)?){1,2}"
    char_range_pattern = r"([a-z]\.\.[a-z]|[A-Z]\.\.[A-Z])(\.\.[\-\+]?[0-9]+)?"
    patterns = [
        ",",
        r"([^{}]|{})*,([^{}]|{})+",
        r"([^{}]|{})+,([^{}]|{})*",
        int_range_pattern,
        char_range_pattern,
        r"([^{},]|{})+",
    ]
    # All top-level {...} groups, processed right-to-left so earlier spans
    # keep their offsets while we substitute
    spans = [m.span() for m in re.finditer(rf"{{({'|'.join(patterns)})}}", text)][::-1]
    if len(spans) == 0:
        if text not in seen:
            yield text
        seen.add(text)
        return

    alts = []
    for start, stop in spans:
        alt_full = text[start:stop]
        alt = alt_full[1:-1].split(",")
        is_int_range = re.fullmatch(rf"{{{int_range_pattern}}}", alt_full)
        is_char_range = re.fullmatch(rf"{{{char_range_pattern}}}", alt_full)
        if is_int_range or is_char_range:
            range_args = alt[0].split("..")
            # Bash zero-pads the output if either endpoint has leading zeros
            leading_zeros = 0
            if any(
                len(x) > 1 and x.strip("-")[0] == "0" and x.strip("-") != "0"
                for x in range_args[:2]
            ):
                leading_zeros = max(map(len, range_args[:2]))
            start, stop = map(int if is_int_range else ord, range_args[:2])
            step = int(range_args[2]) if len(range_args) == 3 else 0
            step = 1 if step == 0 else abs(int(step))
            if stop < start:
                step = -step
            # Make the range inclusive of the end point
            stop = stop + int(step / abs(step))
            alt = [
                f"{s:0{leading_zeros}d}" if is_int_range else chr(s)
                for s in range(start, stop, step)
            ]
        elif len(alt) == 1:
            # Not expandable: stash the literal {...} behind a placeholder
            substitution = chr(0xE000 + len(substitutions))
            substitutions[substitution] = alt_full
            alt = [substitution]
        alts.append(alt)

    for combo in product(*alts):
        replaced = list(text)
        for (start, stop), replacement in zip(spans, combo):
            # Add dummy characters to prevent brace expansion being applied recursively
            # i.e. "{{0..1}2}" should become "{02}" "{12}" not "02" "12"
            replaced[start:stop] = f"\ue000{replacement}\ue000"
        yield from _expand_braces_impl("".join(replaced), seen, substitutions)


class TestOptionsBase(BaseModel):
    """Specialized Options class only to be inherited when building an Option
    class dedicated to tests

    Essentially allows to use the TestFileDB for inputs, setting a number of
    options (input files, geometry and condition version, ...) from the given
    entry in TestFileDB
    """

    testfiledb_key: str

    @model_validator(mode="before")
    @classmethod
    def validate_input(cls, data):
        """Fill input-related fields from the TestFileDB entry for testfiledb_key."""
        from PRConfig import TestFileDB

        if not isinstance(data, dict):
            return data
        if "input_files" in data:
            raise ValueError(
                "Cannot set input_files directly, set testfiledb_key instead"
            )
        if "testfiledb_key" not in data:
            raise ValueError("testfiledb_key is missing")
        tfdb_entry = TestFileDB.test_file_db.get(data["testfiledb_key"])
        qualifiers = tfdb_entry.qualifiers
        data["input_files"] = tfdb_entry.filenames
        # for all other fields, allow overriding by the original yaml, so only
        # set the value to the TestFileDB version if not already present
        if "input_type" not in data:
            file_format = qualifiers["Format"]
            data["input_type"] = "ROOT" if file_format != "MDF" else "RAW"
        if "data_type" not in data:
            data["data_type"] = qualifiers["DataType"]
        if "simulation" not in data:
            data["simulation"] = qualifiers["Simulation"]
        if "dddb_tag" not in data:
            data["dddb_tag"] = qualifiers["DDDB"]
        if "conddb_tag" not in data:
            data["conddb_tag"] = qualifiers["CondDB"]
        if "GeometryVersion" in qualifiers and "geometry_version" not in data:
            data["geometry_version"] = qualifiers["GeometryVersion"]
        if "ConditionsVersion" in qualifiers and "conditions_version" not in data:
            data["conditions_version"] = qualifiers["ConditionsVersion"]
        return data


class TestOptions(TestOptionsBase, Options):
    """Specialized Options class for LHCb Tests"""

    pass