###############################################################################
# (c) Copyright 2021-2024 CERN for the benefit of the LHCb Collaboration #
# #
# This software is distributed under the terms of the GNU General Public #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". #
# #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization #
# or submit itself to any jurisdiction. #
###############################################################################
"""
DaVinci configured using PyConf components.
"""
import logging, json
from pathlib import Path
from collections import namedtuple, OrderedDict
from Configurables import ApplicationMgr # type: ignore[import]
from PyConf.application import ( # type: ignore[import]
configure,
configured_ann_svc,
metainfo_repos,
) # type: ignore[import]
from PyConf.control_flow import CompositeNode, NodeLogic # type: ignore[import]
from PyConf.components import setup_component # type: ignore[import]
from PyConf.utils import get_fsr_configurations # type: ignore[import]
from DaVinci.algorithms import make_fsr_algs, apply_filters
[docs]
log = logging.getLogger(__name__)
[docs]
class DVNode(namedtuple("DVNode", ["node", "extra_outputs"])): # noqa
"""Immutable object fully qualifying a DaVinci node.
Copied from the `HltLine` class without the prescaler argument.
Attributes:
node (CompositeNode): the control flow node of the line
"""
[docs]
__slots__ = () # do not add __dict__ (preserve immutability)
def __new__(cls, name, algs, extra_outputs=None):
"""Initialize a DaVinci node from name and a set of algorithms.
Creates a control flow `CompositeNode` with the given `algs`
combined with `LAZY_AND` logic.
Args:
name (str): name of the line
algs: iterable of algorithms
extra_outputs (iterable of 2-tuple): List of (name, DataHandle) pairs.
"""
node = CompositeNode(
name, tuple(algs), combine_logic=NodeLogic.LAZY_AND, force_order=True
)
if extra_outputs is None:
extra_outputs = []
return super().__new__(cls, node, frozenset(extra_outputs))
@property
[docs]
def name(self):
"""DVNode (CompositeNode instance) name."""
return self.node.name
[docs]
def davinci_control_flow(options, user_analysis_nodes=[], fsr_nodes=[]):
"""
DaVinci control flow is split in a few sections as described in DaVinci/issue#2 (then simplified)
.. code-block:: text
DaVinci (LAZY_AND)
├── LuminosityNode (NONLAZY_OR)
│ └── EventAccounting/EventAccount
└── UserAnalysisNode (NONLAZY_OR)
├── DVUser1Node (LAZY_AND)
│ ├── PVFilter
│ ├── ParticlesFilter
│ ├── Candidate1Combiner
│ ├── AlgorithmsForTuple1
│ ├── Tuple1
└── DVUser2Node (LAZY_AND)
├── PVFilter
├── ParticlesFilter
├── Candidate2Combiner
├── AlgorithmsForTuple2
└── Tuple2
The main parts are
* LuminosityNode
* UserAnalysisNode
who are in AND among each other and can accommodate nodes (defined with the class DVNode)
in OR among themselves.
To prepare the control flow there are a few options:
1. take a dictionary as input where all the nodes are listed in their categories
2. take a various lists as input each related to a specific category of nodes
This function is then used to fill the control flow
"""
options.finalize()
dv_top_children = []
ordered_nodes = OrderedDict()
ordered_nodes["FileSummaryRecords"] = fsr_nodes
ordered_nodes["UserAnalysis"] = user_analysis_nodes
for k, v in ordered_nodes.items():
if len(v):
cnode = CompositeNode(
k,
combine_logic=NodeLogic.NONLAZY_OR,
children=[dv_node.node for dv_node in v],
force_order=False,
)
dv_top_children += [cnode]
return CompositeNode(
"DaVinci",
combine_logic=NodeLogic.LAZY_AND,
children=dv_top_children,
force_order=True,
)
[docs]
def prepare_davinci_nodes(user_algs):
"""
This helper function takes as input a dictionary of user algorithms in the form
.. code-block:: python
{
'DVNode1' : [<list of algs>],
'DVNode2' : [<list of algs>],
}
and creates the node to send to `davinci_control_flow`.
"""
dv_nodes = []
for k, algs in user_algs.items():
if not type(algs) == list:
raise TypeError("Dict values should all be lists of algorithms!")
dv_nodes += [DVNode(k, algs)]
return dv_nodes
[docs]
def validate_algorithms(options, algs_dict):
"""
Validate algorithms passed to the DaVinci job.
Args:
options (DaVinci.Options): lbexec provided options object
algs_dict: dict containing the user algorithms to be validated.
"""
bad_extensions = [".xdigi", ".digi"]
# TO BE UPDATED AS SOON AS INFORMATION OF THE INPUT FILE EXTENSION IS AVAILABLE IN FSRs
if any(Path(ifile).suffix in bad_extensions for ifile in options.input_files):
for algs_list in list(algs_dict.values()):
if any("FunTupleBase_Particles" in alg.fullname for alg in algs_list):
raise ValueError(
f"FunTuple_Particles algorithm can be used only with DST/LDST files but a file with {bad_extensions} extension is found."
)
[docs]
def add_davinci_configurables(options, user_algorithms, public_tools):
"""
Run the job adding the specific Davinci configurables to the standard PyConf ones.
Algorithms developed by users are also included.
Args:
options (DaVinci.Options): lbexec provided options object
Returns:
ComponentConfig instance, a dict of configured Gaudi and DaVinci Configurable instances and user algorithms.
"""
if not public_tools:
public_tools = []
config = options._input_config
# Configuration for the creation of the lumiTree TTree
# with the total count of lumievents per run
if (not options.simulation) and options.lumi:
config_fsr = str(json.dumps(dict(get_fsr_configurations())))
ApplicationMgr().ExtSvc += [
config.add(
setup_component(
"LHCb__FSR__Sink",
AcceptRegex=r"^LumiCounter\.eventsByRun$|^HltANNSvc\.DecodingKeys$",
instance_name="FileSummaryRecord",
ConfigurationFSRs=config_fsr,
)
),
config.add(
setup_component(
"LHCb__FSR__LumiFSRtoTTree", instance_name="LumiFSRtoTTree"
)
),
]
if options.input_manifest_file:
if options.metainfo_additional_tags:
metainfo_repos.global_bind(
extra_central_tags=options.metainfo_additional_tags
)
ApplicationMgr().ExtSvc += [
configured_ann_svc(json_file=options.input_manifest_file),
]
dvMainFlow = apply_filters(options, user_algorithms)
validate_algorithms(options, dvMainFlow)
fsrAlgs = make_fsr_algs(options)
dvMainNode = davinci_control_flow(
options, prepare_davinci_nodes(dvMainFlow), prepare_davinci_nodes(fsrAlgs)
)
config.update(configure(options, dvMainNode, public_tools=public_tools))
return config
[docs]
def make_config(options, user_algorithms, *, public_tools=None):
if isinstance(user_algorithms, list):
user_algorithms = {"default": user_algorithms}
return add_davinci_configurables(options, user_algorithms, public_tools)