Source code for FunTuple.common_util

###############################################################################
# (c) Copyright 2024 CERN for the benefit of the LHCb Collaboration      #
#                                                                             #
# This software is distributed under the terms of the GNU General Public      #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".   #
#                                                                             #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization  #
# or submit itself to any jurisdiction.                                       #
###############################################################################

from .FunctorCollection import FunctorCollection
import re
from DaVinciTools import SimplifiedDecayParser  # type: ignore[import]
import logging

# Module-level logger, named after this module (``__name__``); used below to
# emit warnings about decay descriptors.
log = logging.getLogger(__name__)
def prepend_fieldname_to_arrayindex(functorcoll, field_name):
    """
    Prefix the array-index name of every array variable with ``field_name``.

    Every key of ``functorcoll`` that contains a bracketed index, e.g.
    ``"PVX[nPV]"``, is renamed in place to ``"PVX[<field_name>_nPV]"``; the
    value stored under the key is kept. Keys without square brackets are
    left untouched.

    Args:
        functorcoll: collection to modify in place. It must expose a
            ``functor_dict`` mapping (iterated for its keys) and support
            ``functorcoll[key]``, ``functorcoll[key] = value`` and
            ``functorcoll.pop(key)``.
        field_name (str): name inserted (followed by ``_``) right after each
            opening square bracket.

    Returns:
        None. ``functorcoll`` is modified in place.

    Raises:
        ValueError: if a key contains only an opening bracket, only a closing
            bracket, or brackets that do not form an ``[...]`` pair (closing
            bracket before the opening one).
    """

    def _prepend_fieldname(k_old):
        # Rewrite every "[...]" group, inserting "<field_name>_" after "[".
        return re.sub(
            r"\[[^]]*\]",
            lambda x: x.group(0).replace("[", f"[{field_name}_"),
            k_old,
        )

    # First collect all renames, then apply them: the collection must not be
    # modified while its keys are being iterated.
    dict_oldkey_newkey = {}
    for key in functorcoll.functor_dict.keys():
        has_opening = "[" in key
        has_closing = "]" in key
        if has_opening and has_closing:
            # modify the old key i.e. "PVX[nPV]" -> "PVX[<field_name>_nPV]"
            key_new = _prepend_fieldname(key)
            if key_new == key:
                # Both brackets are present but never as an "[...]" pair
                # (e.g. "a]b[c"). Renaming a key onto itself and then popping
                # the old key would silently drop the variable, so complain.
                raise ValueError(
                    f'Name for the array variable {key} has the closing square bracket "]" before the opening one "[". Please specify a name for the array index within square brackets e.g. "PVX[nPV]"'
                ) from None
            dict_oldkey_newkey[key] = key_new
        elif has_opening:
            # complain if no closing square bracket is given
            raise ValueError(
                f'Name for the array variable {key} contains only opening square brackets i.e. "[". Please specify a name for the array index within square brackets e.g. "PVX[nPV]"'
            ) from None
        elif has_closing:
            # complain if no opening square bracket is given
            raise ValueError(
                f'Name for the array variable {key} contains only closing square brackets i.e. "]". Please specify a name for the array index within square brackets e.g. "PVX[nPV]"'
            ) from None

    # add each new key with the same value, then remove the old key
    for k_old, k_new in dict_oldkey_newkey.items():
        val = functorcoll[k_old]
        functorcoll[k_new] = val
        functorcoll.pop(k_old)
def check_type(var, attribute_name, required_type):
    """
    Internal function to check that 'var' is an instance of 'required_type'.

    TODO: This function is not needed if we use 'typing' module, first check
    what python3 version is assumed for testing.

    Args:
        var: object whose type is checked.
        attribute_name (str): name used for 'var' in the error message.
        required_type: type (or anything accepted as second argument of
            ``isinstance``) that 'var' must be an instance of.

    Returns:
        bool: always ``True`` when the check passes, so the call can be used
        as a filter condition inside comprehensions.

    Raises:
        TypeError: if 'var' is not an instance of 'required_type'.
    """
    if isinstance(var, required_type):
        return True

    raise TypeError(
        f"The '{attribute_name}' should be of type {required_type}. Instead it is of type {type(var)}. Please check!"
    ) from None
def conduct_checks_and_transform(name, tuple_name, fields, variables, loki_preamble):
    """
    Internal function that takes as input various arguments needed by algorithms
    `FunTupleBase_Particles` and `FunTupleBase_MCParticles` and:

    - Conducts various type checks on the arguments.
      (TODO: need to replace with 'typing' module depending on what python3
      version is assumed for testing)
    - Transforms the inputs into an output tuple then directly used to set the
      attributes of the FunTupleBase classes `FunTupleBase_Particles` or
      `FunTupleBase_MCParticles`.

    Args:
        name (str): name of the algorithm; used as prefix of the warnings.
        tuple_name (str): name of the tuple (only type-checked here).
        fields (dict): maps field name (str) to decay descriptor (str).
        variables (dict): maps field name (str) to a `FunctorCollection`.
            The special key ``"ALL"`` holds functors added to every field.
        loki_preamble (list): list of strings (only type-checked here).

    Returns:
        tuple: ``(field_names_prefix, decay_descriptors, loki_functors,
        loki_functor_field_names, thor_functors, thor_functor_field_names)``.
        Fields for which no variables are available are removed from
        ``field_names_prefix`` and ``decay_descriptors``; the four functor
        lists hold one inner list per remaining field.

    Raises:
        TypeError: if an argument, key or value has the wrong type.
        RuntimeError: if there is nothing to tuple, or if ``variables``
            refers to field names that are not keys of ``fields``.
        ValueError: if ``fields`` uses the special name "ALL" (any case) as key.
        SyntaxError: if a decay descriptor has more than one caret ("^").
    """
    # Type checks for the attributes (replace the type checking with typing module
    # introduced in python 3.5 onwards, check what is the default python3 version used for tests)
    check_type(name, "name", str)
    check_type(tuple_name, "tuple_name", str)
    check_type(fields, "fields", dict)
    check_type(variables, "variables", dict)
    check_type(loki_preamble, "loki_preamble", list)

    # A dummy tuple with no fields is not allowed except if fields for all particles
    # are going to be defined via the content of variables["ALL"]
    if len(fields) == 0 and "ALL" not in variables:
        raise RuntimeError("Nothing is defined to be tupled. Please check!")

    # checks pertaining to the keys of the fields and make list of fields names
    # (check_type either raises or returns True, so the filter keeps every key)
    field_names_prefix = [
        k
        for i, k in enumerate(fields.keys())
        if check_type(k, f'"field" key "{i}"', str)
    ]
    if "all" in [bn.lower() for bn in field_names_prefix]:
        raise ValueError(
            "The decay_descriptors contains a special string ('ALL') as key. Such a key should not be used here but rather when setting the 'variables' attribute."
        ) from None

    # checks pertaining to the keys of the variables
    variables_keys = [
        k
        for i, k in enumerate(variables.keys())
        if check_type(k, "variable key_" + str(i), str)
    ]

    # get the FunctorCollections for the special field name "ALL" (used to add to other field names)
    # and the list of variables keys without ALL (used later for checks)
    variables_all = None
    variables_keys_wo_ALL = []
    for vk in variables_keys:
        if vk == "ALL":
            variables_all = variables[vk]
            if not isinstance(variables_all, FunctorCollection):
                raise TypeError(
                    f"The value of variables['{vk}'], for field name '{vk}', is not of type FunctorCollection but of type '{type(variables_all)}'. Please check!"
                ) from None
        else:
            variables_keys_wo_ALL.append(vk)

    # raise an error if the variable attribute contains field names that are not
    # defined in the actual field names of fields attribute
    vbn_nofields = [
        vbn for vbn in variables_keys_wo_ALL if vbn not in field_names_prefix
    ]
    if vbn_nofields:
        raise RuntimeError(
            f"Defined variables for field name(s) '{vbn_nofields}' but they do no match the specified field names '{field_names_prefix}'. Please check!"
        )

    # checks pertaining to the values of the fields and make list of decay descriptors
    decay_descriptors = [
        fields[k]
        for k in field_names_prefix
        if check_type(fields[k], "fields['" + str(k) + "']", str)
    ]

    # check that the decay descriptors do not contain more than one caret symbol (i.e. "^")
    for indx, dd in enumerate(decay_descriptors):
        if dd.count("^") > 1:
            raise SyntaxError(
                f"The decay descriptor '{dd}' for the field name '{field_names_prefix[indx]}' contains more than one caret symbol ('^'). Please check!"
            )

    # check for []CC
    all_list_of_cc_in_self_conjugate = []
    for decay_descriptor in decay_descriptors:
        # Drop the hat here
        descriptor = decay_descriptor.replace("^", "")
        # Add to result; a descriptor the parser cannot handle only triggers a warning
        try:
            all_list_of_cc_in_self_conjugate += SimplifiedDecayParser(
                descriptor
            ).list_cc_in_self_conjugate()
        except (RuntimeError, ValueError, AttributeError) as e:
            log.warning(
                f"{name}: SimplifiedDecayParser can not parse the decay descriptor {descriptor}, error message: {e}."
            )

    # Throw a warning
    if all_list_of_cc_in_self_conjugate:
        unique_list_of_cc_in_self_conjugate = list(
            set(all_list_of_cc_in_self_conjugate)
        )
        log.warning(
            f"{name}: Particles {unique_list_of_cc_in_self_conjugate} are marked with `[]CC` and self-conjugated. Are you sure this is really what you meant?"
        )

    # checks pertaining to loki_preamble that all entries must be strings
    for i, k in enumerate(loki_preamble):
        check_type(k, "loki_entry_" + str(i), str)

    # make list of loki and thor functors. LoKi functor codes are strings
    loki_functors, loki_functor_field_names = [], []
    thor_functors, thor_functor_field_names = [], []
    # make a list of fields key for which there are no variables specified
    # (used to remove the fields and decay descriptor later)
    remove_indices = []
    for indx, k in enumerate(field_names_prefix):
        funct_coll_k = None
        if k in variables:
            funct_coll_k = variables[k]
            if not isinstance(funct_coll_k, FunctorCollection):
                raise TypeError(
                    f"The variables for the field name '{k}' is not of type FunctorCollection. Please check!"
                ) from None

            if variables_all:
                # if ALL special key is specified and it has array variables then append the
                # field name to the array index i.e. "PVX[nPV]" -> "PVX[<field_name>_nPV]"
                new_var_all = FunctorCollection(variables_all.functor_dict)
                prepend_fieldname_to_arrayindex(new_var_all, k)
                # Merge the field specific functors and ALL functors into one.
                # A warning is printed if common items in both. In such a case, the common
                # entry from funct_coll_k is kept (should this throw an error instead?)
                # NOTE(review): "+=" acts on the object stored in variables[k]; whether the
                # caller's collection is modified depends on FunctorCollection.__iadd__ - confirm.
                funct_coll_k += new_var_all
        else:
            if variables_all:
                # if ALL special key is specified and it has array variables then append the
                # field name to the array index i.e. "PVX[nPV]" -> "PVX[<field_name>_nPV]"
                funct_coll_k = FunctorCollection(variables_all.functor_dict)
                prepend_fieldname_to_arrayindex(funct_coll_k, k)
            else:
                # collect indices for field names for which there are no variables specified
                remove_indices.append(indx)
                continue

        # get the field specific loki and thor functors and append to the list
        bn_loki = funct_coll_k.get_loki_functors()
        bn_thor = funct_coll_k.get_thor_functors()
        loki_functor_field_names.append(list(bn_loki.keys()))
        loki_functors.append(list(bn_loki.values()))
        thor_functor_field_names.append(list(bn_thor.keys()))
        thor_functors.append(list(bn_thor.values()))

    # If field names are defined and no variables are added remove the field names and
    # corresponding decay descriptors (highest index first so earlier indices stay valid)
    for index in sorted(remove_indices, reverse=True):
        del field_names_prefix[index]
        del decay_descriptors[index]

    # pack the output
    output_args = (
        field_names_prefix,
        decay_descriptors,
        loki_functors,
        loki_functor_field_names,
        thor_functors,
        thor_functor_field_names,
    )
    return output_args