Mantid
Loading...
Searching...
No Matches
ProcessBankTask.cpp
Go to the documentation of this file.
1// Mantid Repository : https://github.com/mantidproject/mantid
2//
3// Copyright © 2025 ISIS Rutherford Appleton Laboratory UKRI,
4// NScD Oak Ridge National Laboratory, European Spallation Source,
5// Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
6// SPDX - License - Identifier: GPL - 3.0 +
7
10#include "MantidKernel/Logger.h"
12#include "MantidKernel/Timer.h"
13#include "MantidKernel/Unit.h"
14#include "MantidNexus/H5Util.h"
15#include "tbb/parallel_for.h"
16#include "tbb/parallel_reduce.h"
17
19
20namespace {
21
22const std::string MICROSEC("microseconds");
23
24// Logger for this class
25auto g_log = Kernel::Logger("ProcessBankTask");
26
27} // namespace
28ProcessBankTask::ProcessBankTask(std::vector<std::string> &bankEntryNames, H5::H5File &h5file,
29 const bool is_time_filtered, API::MatrixWorkspace_sptr &wksp,
30 const std::map<detid_t, double> &calibration,
31 const std::map<detid_t, double> &scale_at_sample, const std::set<detid_t> &masked,
32 const size_t events_per_chunk, const size_t grainsize_event,
33 std::vector<PulseROI> pulse_indices, std::shared_ptr<API::Progress> &progress)
34 : m_h5file(h5file), m_bankEntries(bankEntryNames), m_loader(is_time_filtered, pulse_indices), m_wksp(wksp),
35 m_calibration(calibration), m_scale_at_sample(scale_at_sample), m_masked(masked),
36 m_events_per_chunk(events_per_chunk), m_grainsize_event(grainsize_event), m_progress(progress) {}
37
38void ProcessBankTask::operator()(const tbb::blocked_range<size_t> &range) const {
39 auto entry = m_h5file.openGroup("entry"); // type=NXentry
40 for (size_t wksp_index = range.begin(); wksp_index < range.end(); ++wksp_index) {
41 const auto &bankName = m_bankEntries[wksp_index];
42 // empty bank names indicate spectra to skip; control should never get here, but just in case
43 if (bankName.empty()) {
44 continue;
45 }
46 Kernel::Timer timer;
47 g_log.debug() << bankName << " start" << std::endl;
48
49 // open the bank
50 auto event_group = entry.openGroup(bankName); // type=NXevent_data
51
52 // skip empty dataset
53 auto tof_SDS = event_group.openDataSet(NxsFieldNames::TIME_OF_FLIGHT);
54 const int64_t total_events = static_cast<size_t>(tof_SDS.getSpace().getSelectNpoints());
55 if (total_events == 0) {
56 m_progress->report();
57 continue;
58 }
59
60 auto eventRanges = m_loader.getEventIndexRanges(event_group, total_events);
61
62 // create a histogrammer to process the events
63 auto &spectrum = m_wksp->getSpectrum(wksp_index);
64
65 // std::atomic allows for multi-threaded accumulation and who cares about floats when you are just
66 // counting things
67 // std::vector<std::atomic_uint32_t> y_temp(spectrum.dataY().size())
68 std::vector<uint32_t> y_temp(spectrum.dataY().size());
69
70 // create object so bank calibration can be re-used
71 std::unique_ptr<BankCalibration> calibration = nullptr;
72
73 // get handle to the data
74 auto detID_SDS = event_group.openDataSet(NxsFieldNames::DETID);
75 // auto tof_SDS = event_group.openDataSet(NxsFieldNames::TIME_OF_FLIGHT);
76 // and the units
77 std::string tof_unit;
78 Nexus::H5Util::readStringAttribute(tof_SDS, "units", tof_unit);
79 const double time_conversion = Kernel::Units::timeConversionValue(tof_unit, MICROSEC);
80
81 // declare arrays once so memory can be reused
82 auto event_detid = std::make_unique<std::vector<uint32_t>>(); // uint32 for ORNL nexus file
83 auto event_time_of_flight = std::make_unique<std::vector<float>>(); // float for ORNL nexus files
84
85 // read parts of the bank at a time until all events are processed
86 while (!eventRanges.empty()) {
87 // Create offsets and slab sizes for the next chunk of events.
88 // This will read at most m_events_per_chunk events from the file
89 // and will split the ranges if necessary for the next iteration.
90 std::vector<size_t> offsets;
91 std::vector<size_t> slabsizes;
92
93 size_t total_events_to_read = 0;
94 // Process the event ranges until we reach the desired number of events to read or run out of ranges
95 while (!eventRanges.empty() && total_events_to_read < m_events_per_chunk) {
96 // Get the next event range from the stack
97 auto eventRange = eventRanges.top();
98 eventRanges.pop();
99
100 size_t range_size = eventRange.second - eventRange.first;
101 size_t remaining_chunk = m_events_per_chunk - total_events_to_read;
102
103 // If the range size is larger than the remaining chunk, we need to split it
104 if (range_size > remaining_chunk) {
105 // Split the range: process only part of it now, push the rest back for later
106 offsets.push_back(eventRange.first);
107 slabsizes.push_back(remaining_chunk);
108 total_events_to_read += remaining_chunk;
109 // Push the remainder of the range back to the front for next iteration
110 eventRanges.emplace(eventRange.first + remaining_chunk, eventRange.second);
111 break;
112 } else {
113 offsets.push_back(eventRange.first);
114 slabsizes.push_back(range_size);
115 total_events_to_read += range_size;
116 // Continue to next range
117 }
118 }
119
120 // log the event ranges being processed
121 std::ostringstream oss;
122 oss << "Processing " << bankName << " with " << total_events_to_read << " events in the ranges: ";
123 for (size_t i = 0; i < offsets.size(); ++i) {
124 oss << "[" << offsets[i] << ", " << (offsets[i] + slabsizes[i]) << "), ";
125 }
126 oss << "\n";
127 g_log.debug() << oss.str();
128
129 // load detid and tof at the same time
130 tbb::parallel_invoke(
131 [&] { // load detid
132 // event_detid->clear();
133 m_loader.loadData(detID_SDS, event_detid, offsets, slabsizes);
134 // immediately find min/max to allow for other things to read disk
136 // only recreate calibration if it doesn't already have the useful information
137 if ((!calibration) || (calibration->idmin() > static_cast<detid_t>(minval)) ||
138 (calibration->idmax() < static_cast<detid_t>(maxval))) {
139 calibration =
140 std::make_unique<BankCalibration>(static_cast<detid_t>(minval), static_cast<detid_t>(maxval),
141 time_conversion, m_calibration, m_scale_at_sample, m_masked);
142 }
143 },
144 [&] { // load time-of-flight
145 // event_time_of_flight->clear();
146 m_loader.loadData(tof_SDS, event_time_of_flight, offsets, slabsizes);
147 });
148
149 // Create a local task for this thread
150 ProcessEventsTask task(event_detid.get(), event_time_of_flight.get(), calibration.get(), &spectrum.readX());
151
152 // Non-blocking processing of the events
153 const tbb::blocked_range<size_t> range_info(0, event_time_of_flight->size(), m_grainsize_event);
154 tbb::parallel_reduce(range_info, task);
155
156 // Accumulate results into shared y_temp to combine local histograms
157 std::transform(y_temp.begin(), y_temp.end(), task.y_temp.begin(), y_temp.begin(), std::plus<uint32_t>());
158 }
159
160 // copy the data out into the correct spectrum
161 auto &y_values = spectrum.dataY();
162 std::copy(y_temp.cbegin(), y_temp.cend(), y_values.begin());
163
164 g_log.debug() << bankName << " stop " << timer << std::endl;
165 m_progress->report();
166 }
167}
168}; // namespace Mantid::DataHandling::AlignAndFocusPowderSlim
T maxval
T minval
virtual std::stack< EventROI > getEventIndexRanges(H5::Group &event_group, const uint64_t number_events, std::unique_ptr< std::vector< uint64_t > > *event_index=nullptr) const
virtual void loadData(H5::DataSet &SDS, std::unique_ptr< std::vector< uint32_t > > &data, const std::vector< size_t > &offsets, const std::vector< size_t > &slabsizes)
const size_t m_grainsize_event
number of events to histogram in a single thread
const size_t m_events_per_chunk
number of events to read from disk at one time
const std::map< detid_t, double > m_calibration
detid: 1/difc
std::map< detid_t, double > m_scale_at_sample
multiplicative 0<value<1 to move neutron TOF at sample
void operator()(const tbb::blocked_range< size_t > &range) const
ProcessBankTask(std::vector< std::string > &bankEntryNames, H5::H5File &h5file, const bool is_time_filtered, API::MatrixWorkspace_sptr &wksp, const std::map< detid_t, double > &calibration, const std::map< detid_t, double > &scale_at_sample, const std::set< detid_t > &masked, const size_t events_per_chunk, const size_t grainsize_event, std::vector< PulseROI > pulse_indices, std::shared_ptr< API::Progress > &progress)
void debug(const std::string &msg)
Logs at debug level.
Definition Logger.cpp:145
A simple class that provides a wall-clock (not processor time) timer.
Definition Timer.h:27
Kernel::Logger g_log("ExperimentInfo")
static logger object
std::shared_ptr< MatrixWorkspace > MatrixWorkspace_sptr
shared pointer to the matrix workspace base class
const std::string TIME_OF_FLIGHT("event_time_offset")
MANTID_KERNEL_DLL double timeConversionValue(const std::string &input_unit, const std::string &output_unit)
Definition Unit.cpp:1430
std::pair< T, T > parallel_minmax(std::vector< T > const *const vec, size_t const grainsize=1000)
parallel_minmax
MANTID_NEXUS_DLL void readStringAttribute(const H5::H5Object &object, const std::string &attributeName, std::string &output)
Definition H5Util.cpp:342
int32_t detid_t
Typedef for a detector ID.