Source code for FunTuple.common_util
###############################################################################
# (c) Copyright 2024 CERN for the benefit of the LHCb Collaboration #
# #
# This software is distributed under the terms of the GNU General Public #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". #
# #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization #
# or submit itself to any jurisdiction. #
###############################################################################
from .FunctorCollection import FunctorCollection
import re
from DaVinciTools import SimplifiedDecayParser # type: ignore[import]
import logging
[docs]
def prepend_fieldname_to_arrayindex(functorcoll, field_name):
    """
    Rename, in place, every array-style key of 'functorcoll' so that the text inside
    the square brackets is prefixed with '<field_name>_',
    e.g. "PVX[nPV]" -> "PVX[<field_name>_nPV]".

    Keys without any square bracket are left untouched. The keys are read from
    'functorcoll.functor_dict'; the renaming itself goes through item access,
    item assignment and 'pop' of 'functorcoll'.

    Raises:
        ValueError: if a key contains "[" without "]" or "]" without "[". The error is
            raised while scanning the keys, i.e. before any key has been renamed.
    """

    def _prefix_index(match):
        # every "[" found inside the matched "[...]" span receives the prefix
        return match.group(0).replace("[", f"[{field_name}_")

    # First pass: validate the keys and work out the renames without touching the
    # collection, so that it is never modified while its keys are being iterated over.
    renames = []
    for key in functorcoll.functor_dict.keys():
        has_opening = "[" in key
        has_closing = "]" in key
        if has_opening and has_closing:
            # "PVX[nPV]" -> "PVX[<field_name>_nPV]"
            renames.append((key, re.sub(r"\[[^]]*\]", _prefix_index, key)))
        elif has_opening:
            # opening bracket without a closing one
            raise ValueError(
                f'Name for the array variable {key} contains only opening square brackets i.e. "[". Please specify a name for the array index within square brackets e.g. "PVX[nPV]"'
            ) from None
        elif has_closing:
            # closing bracket without an opening one
            raise ValueError(
                f'Name for the array variable {key} contains only closing square brackets i.e. "]". Please specify a name for the array index within square brackets e.g. "PVX[nPV]"'
            ) from None

    # Second pass: store the value under the new key first, then drop the old entry.
    for old_key, new_key in renames:
        functor = functorcoll[old_key]
        functorcoll[new_key] = functor
        functorcoll.pop(old_key)
[docs]
def check_type(var, attribute_name, required_type):
    """
    Internal helper asserting that 'var' is an instance of 'required_type'.

    Returns True when the check passes, which lets callers use it as the filter of a
    comprehension. Otherwise a TypeError naming 'attribute_name', the required type
    and the actual type of 'var' is raised.

    TODO: This function is not needed if we use 'typing' module, first check what python3 version is assumed for testing.
    """
    if isinstance(var, required_type):
        return True

    raise TypeError(
        f"The '{attribute_name}' should be of type {required_type}. Instead it is of type {type(var)}. Please check!"
    ) from None
[docs]
def conduct_checks_and_transform(name, tuple_name, fields, variables, loki_preamble):
    """
    Internal function that takes as input various arguments needed by algorithms
    `FunTupleBase_Particles` and `FunTupleBase_MCParticles` and:
        - Conducts various type checks on the arguments.
          (TODO: need to replace with 'typing' module depending on what python3 version is assuming for testing)
        - Transforms the inputs into an output tuple then directly used to set the attributes of the FunTupleBase classes
          `FunTupleBase_Particles` or `FunTupleBase_MCParticles`.

    Args:
        name (str): Name of the algorithm, used as prefix of the warning messages.
        tuple_name (str): Name of the tuple (only type-checked here).
        fields (dict): Maps field names (str) to decay descriptors (str).
        variables (dict): Maps field names (str), or the special key "ALL", to a FunctorCollection.
        loki_preamble (list): List of strings (only type-checked here).

    Returns:
        tuple: (field_names, decay_descriptors, loki_functors, loki_functor_field_names,
        thor_functors, thor_functor_field_names). Fields without any variables
        (neither their own nor via "ALL") are dropped from the first two lists;
        the four functor lists hold one inner list per remaining field.

    Raises:
        TypeError: if an argument, key or value is not of the expected type.
        RuntimeError: if nothing is defined to be tupled, or if 'variables' refers to
            field names that are not keys of 'fields'.
        ValueError: if 'fields' uses the special name 'ALL' (in any letter case) as key.
        SyntaxError: if a decay descriptor contains more than one caret symbol ("^").
    """
    # 'log' is used for the warnings below, but no module-level logger is visible in
    # this file; bind it here so the warning paths cannot fail with a NameError.
    log = logging.getLogger(__name__)

    # Type checks for the attributes (replace the type checking with typing module introduced in python 3.5 onwards, check what is the default python3 version used for tests)
    check_type(name, "name", str)
    check_type(tuple_name, "tuple_name", str)
    check_type(fields, "fields", dict)
    check_type(variables, "variables", dict)
    check_type(loki_preamble, "loki_preamble", list)

    # A dummy tuple with no fields is not allowed except if fields for all particles
    # are going to be defined via the content of variables["ALL"]
    if len(fields) == 0 and "ALL" not in variables:
        raise RuntimeError("Nothing is defined to be tupled. Please check!")

    # checks pertaining to the keys of the fields and make list of fields names
    # (check_type either returns True or raises, so the filter never drops an entry)
    field_names_prefix = [
        k
        for i, k in enumerate(fields.keys())
        if check_type(k, f'"field" key "{i}"', str)
    ]
    if "all" in [bn.lower() for bn in field_names_prefix]:
        raise ValueError(
            "The decay_descriptors contains a special string ('ALL') as key. Such a key should not be used here but rather when setting the 'variables' attribute."
        ) from None

    # checks pertaining to the keys of the variables
    variables_keys = [
        k
        for i, k in enumerate(variables.keys())
        if check_type(k, "variable key_" + str(i), str)
    ]

    # get the FunctorCollections for the special field name "ALL" (used to add to other field names)
    # and the list of variables keys without ALL (used later for checks)
    variables_all = None
    variables_keys_wo_ALL = []
    for vk in variables_keys:
        if vk == "ALL":
            variables_all = variables[vk]
            if not isinstance(variables_all, FunctorCollection):
                raise TypeError(
                    f"The value of variables['{vk}'], for field name '{vk}', is not of type FunctorCollection but of type '{type(variables_all)}'. Please check!"
                ) from None
        else:
            variables_keys_wo_ALL.append(vk)

    # raise an error if the variable attribute contains field names that are not defined in the actual field names of fields attribute
    vbn_nofields = [
        vbn for vbn in variables_keys_wo_ALL if vbn not in field_names_prefix
    ]
    if vbn_nofields:
        raise RuntimeError(
            f"Defined variables for field name(s) '{vbn_nofields}' but they do no match the specified field names '{field_names_prefix}'. Please check!"
        )

    # checks pertaining to the values of the fields and make list of decay descriptors
    decay_descriptors = [
        fields[k]
        for k in field_names_prefix
        if check_type(fields[k], "fields['" + str(k) + "']", str)
    ]

    # check that the decay descriptors do not contain more than one caret symbol (i.e. "^")
    for indx, dd in enumerate(decay_descriptors):
        if dd.count("^") > 1:
            raise SyntaxError(
                f"The decay descriptor '{dd}' for the field name '{field_names_prefix[indx]}' contains more than one caret symbol ('^'). Please check!"
            )

    # check for []CC
    all_list_of_cc_in_self_conjugate = []
    for decay_descriptor in decay_descriptors:
        # Drop the hat here
        descriptor = decay_descriptor.replace("^", "")
        # Add to result; a descriptor that cannot be parsed only triggers a warning
        try:
            all_list_of_cc_in_self_conjugate += SimplifiedDecayParser(
                descriptor
            ).list_cc_in_self_conjugate()
        except (RuntimeError, ValueError, AttributeError) as e:
            log.warning(
                f"{name}: SimplifiedDecayParser can not parse the decay descriptor {descriptor}, error message: {e}."
            )

    # Throw a warning
    if all_list_of_cc_in_self_conjugate:
        unique_list_of_cc_in_self_conjugate = list(
            set(all_list_of_cc_in_self_conjugate)
        )
        log.warning(
            f"{name}: Particles {unique_list_of_cc_in_self_conjugate} are marked with `[]CC` and self-conjugated. Are you sure this is really what you meant?"
        )

    # checks pertaining to loki_preamble that all entries must be strings
    for i, entry in enumerate(loki_preamble):
        check_type(entry, "loki_entry_" + str(i), str)

    # make list of loki and thor functors. LoKi functor codes are strings
    loki_functors, loki_functor_field_names = [], []
    thor_functors, thor_functor_field_names = [], []

    # make a list of fields key for which there are no variables specified (used to remove the fields and decay descriptor later)
    remove_indices = []
    for indx, k in enumerate(field_names_prefix):
        funct_coll_k = None
        if k in variables:
            funct_coll_k = variables[k]
            if not isinstance(funct_coll_k, FunctorCollection):
                raise TypeError(
                    f"The variables for the field name '{k}' is not of type FunctorCollection. Please check!"
                ) from None

            if variables_all:
                # if ALL special key is specified and it has array variables then append the
                # field name to the array index i.e. "PVX[nPV]" -> "PVX[<field_name>_nPV]"
                new_var_all = FunctorCollection(variables_all.functor_dict)
                prepend_fieldname_to_arrayindex(new_var_all, k)
                # Merge the field specific functors and ALL functors into one.
                # A warning is printed if common items in both. In such a case, the common entry from funct_coll_k is kept (should this throw an error instead?)
                # NOTE(review): if FunctorCollection implements '+=' in place, this also
                # modifies the caller's variables[k] object -- confirm this is intended.
                funct_coll_k += new_var_all
        else:
            if variables_all:
                # if ALL special key is specified and it has array variables then append the
                # field name to the array index i.e. "PVX[nPV]" -> "PVX[<field_name>_nPV]"
                funct_coll_k = FunctorCollection(variables_all.functor_dict)
                prepend_fieldname_to_arrayindex(funct_coll_k, k)
            else:
                # collect indices for field names for which there are no variables specified
                remove_indices.append(indx)
                continue

        # get the field specific loki and thor functors and append to the list
        bn_loki = funct_coll_k.get_loki_functors()
        bn_thor = funct_coll_k.get_thor_functors()
        loki_functor_field_names.append(list(bn_loki.keys()))
        loki_functors.append(list(bn_loki.values()))
        thor_functor_field_names.append(list(bn_thor.keys()))
        thor_functors.append(list(bn_thor.values()))

    # If field names are defined and no variables are added remove the field names and corresponding decay descriptors.
    # Deleting from the highest index downwards keeps the remaining indices valid.
    for index in sorted(remove_indices, reverse=True):
        del field_names_prefix[index]
        del decay_descriptors[index]

    # pack the output
    output_args = (
        field_names_prefix,
        decay_descriptors,
        loki_functors,
        loki_functor_field_names,
        thor_functors,
        thor_functor_field_names,
    )
    return output_args