###############################################################################
# (c) Copyright 2020-2024 CERN for the benefit of the LHCb Collaboration #
# #
# This software is distributed under the terms of the GNU General Public #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". #
# #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization #
# or submit itself to any jurisdiction. #
###############################################################################
import logging
import itertools
from collections import OrderedDict
from typing import Any
from GaudiConf.LbExec import InputProcessTypes # type: ignore[import]
from PyConf.Algorithms import VoidFilter # type: ignore[import]
from PyConf.reading import get_particles, get_decreports, dstdata_filter # type: ignore[import]
import Functors as F # type: ignore[import]
from DaVinci.LbExec import Options
[docs]
log = logging.getLogger(__name__)
[docs]
def create_lines_filter(name: str, lines: list[str]) -> VoidFilter:
"""
Create an event pre-filter requiring the specified lines.
Note that the lines are 'split' according to their source (Hlt1, Hlt2, Spruce),
and for each thus specified source, at least one of the corresponding lines must
have passed.
Args:
name (str): filter's name.
lines (list): trigger lines requested.
Returns:
`VoidFilter` filter instance with name and code defined by the user.
"""
l = OrderedDict(
(k, [line for line in lines if line.startswith(k)])
for k in ("Hlt1", "Hlt2", "Spruce")
)
bad = set(lines) - set(itertools.chain(*l.values()))
if bad:
raise Exception(
f"line names which do not start with an expected source name: {bad}"
)
add_decision = lambda i: i if i.endswith("Decision") else i + "Decision"
cuts = [
F.DECREPORTS_FILTER(
Lines=list(map(add_decision, lines)), DecReports=get_decreports(source)
)
for source, lines in l.items()
if lines
]
return VoidFilter(name=name, Cut=F.require_all(*cuts))
[docs]
def apply_filters(
options: Options, algs_dict: dict[str, list[str]]
) -> dict[str, list[VoidFilter]]:
"""
Adding filter algorithms.
Args:
options (DaVinci.Options): lbexec provided options object
algs_dict (dict): dict of the user algorithms.
Returns:
Dict where at each node filters are prepended to the initial list of user algorithms.
"""
alg_filterd_dict: dict[str, list[VoidFilter]] = {}
for name, algs in algs_dict.items():
algs_list = []
# This filter will skip events with empty DstData but a positive line decision.
# It will only be applied to the input_process where DstData bank is produced (Hlt2, Turbo, Spruce).
if options.input_process in {"TurboPass", "Spruce", "Hlt2"}:
algs_list += [
dstdata_filter(
source=InputProcessTypes(options.input_process).sourceID()
)
]
algs_list += algs
alg_filterd_dict[name] = algs_list
return alg_filterd_dict
[docs]
def make_fsr_algs(options: Options) -> dict[str, list[Any]]:
"""
Make FSR related algorithms.
Args:
options (DaVinci.Options): lbexec provided options object
Returns:
Dist with list of FSR algorithm instances to be configured.
"""
from PyConf.Algorithms import EventAccounting, GenFSRMerge, RecordStream
from PyConf.application import create_or_reuse_rootIOAlg # type: ignore[import]
fsrAlgs: dict[str, list[Any]] = {}
if options.simulation:
algs: list[Any] = []
if options.merge_genfsr:
# we need to precreate RootIOAlg with options here as it will
# be reused (triggered by GenFSRMerge) without passing over the options
algs.append(create_or_reuse_rootIOAlg(options))
algs.append(GenFSRMerge(name="GenFSRMerge"))
if options.write_fsr and options.output_file:
if not options.merge_genfsr:
# I have no idea if this is safe to do so raise an exception
raise NotImplementedError(
"FSR output requested but no merging of FSRs performed."
)
algs.append(
RecordStream(
name="FSROutputStreamDstWriter",
OutputLevel=options.output_level,
ItemList=["/FileRecords#999"],
EvtDataSvc="FileRecordDataSvc",
Output=f"DATAFILE='{options.ntuple_file}' SVC='Gaudi::RootCnvSvc' OPT='REC'",
)
)
if algs:
fsrAlgs.update({"GenFSR": algs})
if options.lumi:
fsrAlgs.update({"Lumi": [EventAccounting(name="EventAccount")]})
# this should be modified to reflect LumiAlgsConf (configured separately?)
return fsrAlgs