Source code for DaVinci.config

###############################################################################
# (c) Copyright 2021-2024 CERN for the benefit of the LHCb Collaboration      #
#                                                                             #
# This software is distributed under the terms of the GNU General Public      #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".   #
#                                                                             #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization  #
# or submit itself to any jurisdiction.                                       #
###############################################################################
"""
DaVinci configured using PyConf components.
"""

import logging, json
from pathlib import Path
from collections import namedtuple, OrderedDict
from Configurables import ApplicationMgr  # type: ignore[import]
from PyConf.application import (  # type: ignore[import]
    configure,
    configured_ann_svc,
    metainfo_repos,
)  # type: ignore[import]
from PyConf.control_flow import CompositeNode, NodeLogic  # type: ignore[import]
from PyConf.components import setup_component  # type: ignore[import]
from PyConf.utils import get_fsr_configurations  # type: ignore[import]
from DaVinci.algorithms import make_fsr_algs, apply_filters

[docs] log = logging.getLogger(__name__)
[docs] class DVNode(namedtuple("DVNode", ["node", "extra_outputs"])): # noqa """Immutable object fully qualifying a DaVinci node. Copied from the `HltLine` class without the prescaler argument. Attributes: node (CompositeNode): the control flow node of the line """
[docs] __slots__ = () # do not add __dict__ (preserve immutability)
def __new__(cls, name, algs, extra_outputs=None): """Initialize a DaVinci node from name and a set of algorithms. Creates a control flow `CompositeNode` with the given `algs` combined with `LAZY_AND` logic. Args: name (str): name of the line algs: iterable of algorithms extra_outputs (iterable of 2-tuple): List of (name, DataHandle) pairs. """ node = CompositeNode( name, tuple(algs), combine_logic=NodeLogic.LAZY_AND, force_order=True ) if extra_outputs is None: extra_outputs = [] return super().__new__(cls, node, frozenset(extra_outputs)) @property
[docs] def name(self): """DVNode (CompositeNode instance) name.""" return self.node.name
[docs] def davinci_control_flow(options, user_analysis_nodes=[], fsr_nodes=[]): """ DaVinci control flow is split in a few sections as described in DaVinci/issue#2 (then simplified) .. code-block:: text DaVinci (LAZY_AND) ├── LuminosityNode (NONLAZY_OR) │ └── EventAccounting/EventAccount └── UserAnalysisNode (NONLAZY_OR) ├── DVUser1Node (LAZY_AND) │ ├── PVFilter │ ├── ParticlesFilter │ ├── Candidate1Combiner │ ├── AlgorithmsForTuple1 │ ├── Tuple1 └── DVUser2Node (LAZY_AND) ├── PVFilter ├── ParticlesFilter ├── Candidate2Combiner ├── AlgorithmsForTuple2 └── Tuple2 The main parts are * LuminosityNode * UserAnalysisNode who are in AND among each other and can accommodate nodes (defined with the class DVNode) in OR among themselves. To prepare the control flow there are a few options: 1. take a dictionary as input where all the nodes are listed in their categories 2. take a various lists as input each related to a specific category of nodes This function is then used to fill the control flow """ options.finalize() dv_top_children = [] ordered_nodes = OrderedDict() ordered_nodes["FileSummaryRecords"] = fsr_nodes ordered_nodes["UserAnalysis"] = user_analysis_nodes for k, v in ordered_nodes.items(): if len(v): cnode = CompositeNode( k, combine_logic=NodeLogic.NONLAZY_OR, children=[dv_node.node for dv_node in v], force_order=False, ) dv_top_children += [cnode] return CompositeNode( "DaVinci", combine_logic=NodeLogic.LAZY_AND, children=dv_top_children, force_order=True, )
[docs] def prepare_davinci_nodes(user_algs): """ This helper function takes as input a dictionary of user algorithms in the form .. code-block:: python { 'DVNode1' : [<list of algs>], 'DVNode2' : [<list of algs>], } and creates the node to send to `davinci_control_flow`. """ dv_nodes = [] for k, algs in user_algs.items(): if not type(algs) == list: raise TypeError("Dict values should all be lists of algorithms!") dv_nodes += [DVNode(k, algs)] return dv_nodes
[docs] def validate_algorithms(options, algs_dict): """ Validate algorithms passed to the DaVinci job. Args: options (DaVinci.Options): lbexec provided options object algs_dict: dict containing the user algorithms to be validated. """ bad_extensions = [".xdigi", ".digi"] # TO BE UPDATED AS SOON AS INFORMATION OF THE INPUT FILE EXTENSION IS AVAILABLE IN FSRs if any(Path(ifile).suffix in bad_extensions for ifile in options.input_files): for algs_list in list(algs_dict.values()): if any("FunTupleBase_Particles" in alg.fullname for alg in algs_list): raise ValueError( f"FunTuple_Particles algorithm can be used only with DST/LDST files but a file with {bad_extensions} extension is found." )
[docs] def add_davinci_configurables(options, user_algorithms, public_tools): """ Run the job adding the specific Davinci configurables to the standard PyConf ones. Algorithms developed by users are also included. Args: options (DaVinci.Options): lbexec provided options object Returns: ComponentConfig instance, a dict of configured Gaudi and DaVinci Configurable instances and user algorithms. """ if not public_tools: public_tools = [] config = options._input_config # Configuration for the creation of the lumiTree TTree # with the total count of lumievents per run if (not options.simulation) and options.lumi: config_fsr = str(json.dumps(dict(get_fsr_configurations()))) ApplicationMgr().ExtSvc += [ config.add( setup_component( "LHCb__FSR__Sink", AcceptRegex=r"^LumiCounter\.eventsByRun$|^HltANNSvc\.DecodingKeys$", instance_name="FileSummaryRecord", ConfigurationFSRs=config_fsr, ) ), config.add( setup_component( "LHCb__FSR__LumiFSRtoTTree", instance_name="LumiFSRtoTTree" ) ), ] if options.input_manifest_file: if options.metainfo_additional_tags: metainfo_repos.global_bind( extra_central_tags=options.metainfo_additional_tags ) ApplicationMgr().ExtSvc += [ configured_ann_svc(json_file=options.input_manifest_file), ] dvMainFlow = apply_filters(options, user_algorithms) validate_algorithms(options, dvMainFlow) fsrAlgs = make_fsr_algs(options) dvMainNode = davinci_control_flow( options, prepare_davinci_nodes(dvMainFlow), prepare_davinci_nodes(fsrAlgs) ) config.update(configure(options, dvMainNode, public_tools=public_tools)) return config
[docs] def make_config(options, user_algorithms, *, public_tools=None): if isinstance(user_algorithms, list): user_algorithms = {"default": user_algorithms} return add_davinci_configurables(options, user_algorithms, public_tools)