###############################################################################
# (c) Copyright 2022-2024 CERN for the benefit of the LHCb Collaboration #
# #
# This software is distributed under the terms of the GNU General Public #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". #
# #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization #
# or submit itself to any jurisdiction. #
###############################################################################
import glob
import logging
import math
import re
from contextlib import contextmanager
from enum import Enum
from itertools import product
from typing import Annotated, Optional
from DDDB.CheckDD4Hep import UseDD4Hep
from Gaudi.Configuration import INFO
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
from PyConf.packing import default_persistreco_version
class HltSourceID(str, Enum):
    """Possible values of the Hlt source ID used to identify the origin of
    the data in the various rawbanks produced by Hlt and/or Sprucing.
    """

    Hlt1 = "Hlt1"
    Hlt2 = "Hlt2"
    Spruce = "Spruce"
class InputProcessTypes(str, Enum):
    """
    `TurboPass`, `Hlt2` and `Spruce` are standard run-3 input process types.
    The extra types are available to effectively flag special input types
    and be able to apply a filter only to processes where a `DstData` bank is produced (the standard 3 types).
    See discussions at https://gitlab.cern.ch/lhcb/LHCb/-/merge_requests/3990 and related MR for further details.
    """

    Spruce = "Spruce"
    TurboPass = "TurboPass"
    Hlt2 = "Hlt2"  # This option should be removed when all data goes through the Sprucing
    TurboSpruce = "TurboSpruce"
    Gen = "Gen"
    Brunel = "Brunel"  # This option should be removed once support for old MC files is fully removed.
    Boole = "Boole"
    Hlt1 = "Hlt1"

    def sourceID(self):
        """Return the :class:`HltSourceID` of the application that produced
        this kind of input.

        For the remaining cases (Gen, Brunel, Boole) there is typically no
        DstData rawbank but packed objects in a DST and no decreports -- so
        this method should not even be called. But it is, and the result is
        not actually used. Hence raising an exception isn't acceptable (yet!),
        and we just return a sentinel string which, if ever really used, will
        become an error downstream.
        """
        # Mapping lookup instead of an if-chain comparing self.value against
        # each member; members hash/compare by identity so ``self`` is a key.
        source_ids = {
            InputProcessTypes.Spruce: HltSourceID.Spruce,
            InputProcessTypes.TurboSpruce: HltSourceID.Spruce,
            InputProcessTypes.Hlt1: HltSourceID.Hlt1,
            InputProcessTypes.Hlt2: HltSourceID.Hlt2,
            InputProcessTypes.TurboPass: HltSourceID.Hlt2,
        }
        return source_ids.get(self, "InvalidNotToBeUsed")
class DataTypeEnum(Enum):
    """Supported data types; values name the data-taking year (or the
    generic "Upgrade")."""

    Upgrade = "Upgrade"
    # Run 3 data-taking years
    DT2022 = "2022"
    DT2023 = "2023"
    DT2024 = "2024"
    DT2025 = "2025"
class EventStores(str, Enum):
    """Available event store (whiteboard) implementations, selectable via
    ``Options.event_store``."""

    EvtStoreSvc = "EvtStoreSvc"
    HiveWhiteBoard = "HiveWhiteBoard"
class CompressionAlgs(str, Enum):
    """Compression algorithms available for output files
    (see :class:`CompressionSettings`)."""

    ZLIB = "ZLIB"
    LZMA = "LZMA"
    LZ4 = "LZ4"
    ZSTD = "ZSTD"
class CompressionSettings(BaseModel):
    """Immutable description of the output-file compression configuration."""

    model_config = ConfigDict(frozen=True, extra="forbid", use_enum_values=True)

    # Compression algorithm and level applied to the whole file
    algorithm: CompressionAlgs = CompressionAlgs.ZSTD
    level: int = 4
    # Optional buffer/basket tuning; omitted from the config when None
    min_buffer_size: Optional[int] = None
    max_buffer_size: Optional[int] = None
    approx_events_per_basket: Optional[int] = None

    def as_gaudi_config(self):
        """Return these settings as a Gaudi property dictionary, leaving out
        any optional value that was not set."""
        settings = {"GlobalCompression": f"{self.algorithm}:{self.level}"}
        optional = {
            "MinBufferSize": self.min_buffer_size,
            "MaxBufferSize": self.max_buffer_size,
            "ApproxEventsPerBasket": self.approx_events_per_basket,
        }
        settings.update(
            {key: value for key, value in optional.items() if value is not None}
        )
        return settings
class Options(BaseModel):
    """Job configuration for LHCb applications.

    Fields are grouped into conditions, input, output, processing, logging
    and debugging settings.  Instances are immutable (``frozen=True``) and
    reject unknown fields (``extra="forbid"``).
    """

    model_config = ConfigDict(use_enum_values=True, frozen=True, extra="forbid")

    """Conditions"""
    data_type: Optional[DataTypeEnum] = "Upgrade"
    simulation: bool
    dddb_tag: Optional[str] = None
    conddb_tag: Optional[str] = None
    geometry_version: Optional[str] = None
    # Backend to be used for the geometry, can be DD4Hep or DetDesc. Use "NONE" for disabling the geometry
    geometry_backend: str = "DD4Hep" if UseDD4Hep else "DetDesc"
    conditions_version: Optional[str] = None

    """Input"""
    input_files: list[str] = []
    input_type: FileFormats = FileFormats.NONE
    input_raw_format: float = 0.5
    input_process: Optional[InputProcessTypes] = None
    input_manifest_file: Optional[str] = None
    input_stream: Optional[str] = ""
    input_run_number: Optional[int] = None
    persistreco_version: float = default_persistreco_version()
    xml_file_catalog: Optional[str] = None
    evt_max: int = -1
    first_evt: int = 0
    # number of events to pre-fetch, the default value (20) is reasonable for HLT2/Analysis. It needs to be increased for HLT1, typically to 20000
    ioalg_buffer_nb_events: int = 20
    # name of the ioalg to be used when input_type is MDF. Choices currently are IOAlgMemoryMap and IOAlgFileRead, the latter being the default
    mdf_ioalg_name: str = "IOAlgFileRead"
    # name of the ioalg to be used when input_type is ROOT. Choices currently are RootIOAlg and RootIOAlgExt, the former being the default
    root_ioalg_name: str = "RootIOAlg"
    # dictionary of properties to be passed to the RootIOAlg at construction time
    root_ioalg_opts: dict = {}
    # in case GaudiPython is used, set this to true in order to change
    # the behavior of the scheduler accordingly
    gaudipython_mode: bool = False

    """Output"""
    output_file: Optional[str] = None
    output_type: FileFormats = FileFormats.ROOT
    output_manifest_file: Optional[str] = None
    append_decoding_keys_to_output_manifest: bool = True
    write_decoding_keys_to_git: Optional[bool] = None
    write_options_to_fsr: bool = False
    require_specific_decoding_keys: list[str] = []
    compression: Optional[CompressionSettings] = None
    histo_file: Optional[str] = None
    ntuple_file: Optional[str] = None
    ntuple_basketsize: Optional[int] = 32000  # ROOT default
    xml_summary_file: Optional[str] = None
    xml_summary_svc: Optional[str] = "XMLSummarySvc"
    # write configuration of lines in streams to json at runtime
    write_streams_attributes_to_json: bool = False
    # Output json file for configuration of lines in streams
    output_streams_attributes_file: Optional[str] = "line_attribute_dict.json"
    # Input json file for configuration of lines in streams
    input_streams_attributes_file: Optional[str] = "line_attribute_dict.json"

    """Processing"""
    n_threads: int = 1
    # defaults to 1.2 * n_threads (see n_event_slots_default below)
    n_event_slots: Annotated[int, Field(validate_default=True)]
    # Event store implementation: HiveWhiteBoard (default) or EvtStoreSvc (faster).
    event_store: EventStores = EventStores.HiveWhiteBoard
    # Estimated size of the per-event memory pool, zero disables the pool
    memory_pool_size: int = 10 * 1024 * 1024
    # If False, scheduler calls Algorithm::execute instead of
    # Algorithm::sysExecute which breaks some non-functional algorithms
    scheduler_legacy_mode: bool = True

    """Logging"""
    print_freq: int = 10_000
    output_level: int = INFO
    msg_svc_format: str = "% F%35W%S %7W%R%T %0W%M"
    msg_svc_time_format: str = "%Y-%m-%d %H:%M:%S UTC"
    python_logging_level: int = logging.INFO

    """Debugging"""
    # Dump monitoring entities (counters, histograms, etc.)
    monitoring_file: Optional[str] = None
    control_flow_file: Optional[str] = None
    data_flow_file: Optional[str] = None
    phoenix_filename: Optional[str] = None
    preamble_algs: list = []
    # Define list of auditors to run. Possible common choices include
    # "NameAuditor", "MemoryAuditor" or "ChronoAuditor".
    # For a full list see Gaudi documentation.
    auditors: list[str] = []
    event_timeout: Optional[int] = None
    # if set, we will call make_odin even if simulation is true. Else we use fake_odin in such case
    force_odin: bool = False  # FIXME should be replaced by opt-in force_odin

    # The Velo motion system is only configurable for the DetDesc geometry
    if not UseDD4Hep:
        velo_motion_system_yaml: Optional[str] = None

    @model_validator(mode="before")
    @classmethod
    def n_event_slots_default(cls, data):
        """Default ``n_event_slots`` to ceil(1.2 * n_threads), or 1 when
        running single-threaded."""
        # Guard against non-dict input, consistently with
        # TestOptionsBase.validate_input; pydantic handles other shapes.
        if isinstance(data, dict) and "n_event_slots" not in data:
            n_threads = data.get("n_threads", 1)
            data["n_event_slots"] = math.ceil(1.2 * n_threads) if n_threads > 1 else 1
        return data

    @field_validator("compression", mode="before")
    @classmethod
    def parse_compression_str(cls, compression):
        """Accept a shorthand ``"<algorithm>:<level>"`` string (e.g.
        ``"ZSTD:4"``) for the ``compression`` field."""
        if isinstance(compression, str):
            alg, level = compression.split(":", 1)
            alg = CompressionAlgs(alg)
            return CompressionSettings(algorithm=alg, level=int(level))
        return compression

    @contextmanager
    def apply_binds(self):
        """Context manager to apply binds before the user function is called

        To avoid having to pass properties on the options object down many
        layers of functions applications can use this context manager to bind
        values before the user provided function is called.
        """
        yield

    def finalize(self):
        # HACK: Required for compatibility with the old options object
        pass
def _expand_braces(text):
"""Perform bash-like brace expansion
See: https://www.gnu.org/software/bash/manual/html_node/Brace-Expansion.html
There are two notable deviations from the bash behaviour:
* Duplicates are removed from the output
* The order of the returned results can differ
"""
seen = set()
# HACK: Use a reserved unicode page to substitute patterns like {abc} that
# don't contain a comma and should therefore have the curly braces preserved
# in the output
substitutions = {"\ue000": ""}
for s in _expand_braces_impl(text, seen, substitutions):
for k, v in reversed(substitutions.items()):
s = s.replace(k, v)
if s:
yield s
def _expand_braces_impl(text, seen, substitutions):
int_range_pattern = r"[\-\+]?[0-9]+(\.[0-9]+)?(\.\.[\-\+]?[0-9]+(\.[0-9]+)?){1,2}"
char_range_pattern = r"([a-z]\.\.[a-z]|[A-Z]\.\.[A-Z])(\.\.[\-\+]?[0-9]+)?"
patterns = [
",",
r"([^{}]|{})*,([^{}]|{})+",
r"([^{}]|{})+,([^{}]|{})*",
int_range_pattern,
char_range_pattern,
r"([^{},]|{})+",
]
spans = [m.span() for m in re.finditer(rf"{{({'|'.join(patterns)})}}", text)][::-1]
if len(spans) == 0:
if text not in seen:
yield text
seen.add(text)
return
alts = []
for start, stop in spans:
alt_full = text[start:stop]
alt = alt_full[1:-1].split(",")
is_int_range = re.fullmatch(rf"{{{int_range_pattern}}}", alt_full)
is_char_range = re.fullmatch(rf"{{{char_range_pattern}}}", alt_full)
if is_int_range or is_char_range:
range_args = alt[0].split("..")
leading_zeros = 0
if any(
len(x) > 1 and x.strip("-")[0] == "0" and x.strip("-") != "0"
for x in range_args[:2]
):
leading_zeros = max(map(len, range_args[:2]))
start, stop = map(int if is_int_range else ord, range_args[:2])
step = int(range_args[2]) if len(range_args) == 3 else 0
step = 1 if step == 0 else abs(int(step))
if stop < start:
step = -step
stop = stop + int(step / abs(step))
alt = [
f"{s:0{leading_zeros}d}" if is_int_range else chr(s)
for s in range(start, stop, step)
]
elif len(alt) == 1:
substitution = chr(0xE000 + len(substitutions))
substitutions[substitution] = alt_full
alt = [substitution]
alts.append(alt)
for combo in product(*alts):
replaced = list(text)
for (start, stop), replacement in zip(spans, combo):
# Add dummy charactors to prevent brace expansion being applied recursively
# i.e. "{{0..1}2}" should become "{02}" "{12}" not "02" "12"
replaced[start:stop] = f"\ue000{replacement}\ue000"
yield from _expand_braces_impl("".join(replaced), seen, substitutions)
class TestOptionsBase(BaseModel):
    """Specialized Options class only to be inherited when building an Option
    class dedicated to tests.

    Essentially allows to use the TestFileDB for inputs, setting a number
    of options (input files, geometry and condition version, ...) from the
    given entry in TestFileDB.
    """

    # Key of the entry in PRConfig's TestFileDB that provides the inputs
    testfiledb_key: str

    @model_validator(mode="before")
    @classmethod
    def validate_input(cls, data):
        """Fill input- and conditions-related fields from the TestFileDB
        entry named by ``testfiledb_key``.

        Values given explicitly take precedence over the TestFileDB
        qualifiers; ``input_files`` may only come from the TestFileDB.
        """
        from PRConfig import TestFileDB

        if not isinstance(data, dict):
            return data
        if "input_files" in data:
            raise ValueError(
                "Cannot set input_files directly, set testfiledb_key instead"
            )
        if "testfiledb_key" not in data:
            raise ValueError("testfiledb_key is missing")
        tfdb_entry = TestFileDB.test_file_db.get(data["testfiledb_key"])
        if tfdb_entry is None:
            # Fail with a clear message instead of an AttributeError below
            raise ValueError(
                f"Unknown TestFileDB key {data['testfiledb_key']!r}"
            )
        qualifiers = tfdb_entry.qualifiers
        data["input_files"] = tfdb_entry.filenames
        # for all other fields, allow overriding by the original yaml,
        # so only take the TestFileDB value if not already present
        if "input_type" not in data:
            file_format = qualifiers["Format"]
            data["input_type"] = "ROOT" if file_format != "MDF" else "RAW"
        if "data_type" not in data:
            data["data_type"] = qualifiers["DataType"]
        if "simulation" not in data:
            data["simulation"] = qualifiers["Simulation"]
        if "dddb_tag" not in data:
            data["dddb_tag"] = qualifiers["DDDB"]
        if "conddb_tag" not in data:
            data["conddb_tag"] = qualifiers["CondDB"]
        if "GeometryVersion" in qualifiers and "geometry_version" not in data:
            data["geometry_version"] = qualifiers["GeometryVersion"]
        if "ConditionsVersion" in qualifiers and "conditions_version" not in data:
            data["conditions_version"] = qualifiers["ConditionsVersion"]
        return data
class TestOptions(TestOptionsBase, Options):
    """Options flavour for LHCb tests: the standard :class:`Options` with
    inputs resolved from a TestFileDB entry (see :class:`TestOptionsBase`)."""