Mantid
Loading...
Searching...
No Matches
ProcessBankSplitTask.cpp
Go to the documentation of this file.
1// Mantid Repository : https://github.com/mantidproject/mantid
2//
3// Copyright © 2025 ISIS Rutherford Appleton Laboratory UKRI,
4// NScD Oak Ridge National Laboratory, European Spallation Source,
5// Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
6// SPDX - License - Identifier: GPL - 3.0 +
7
10#include "MantidKernel/Logger.h"
12#include "MantidKernel/Timer.h"
13#include "MantidKernel/Unit.h"
14#include "MantidNexus/H5Util.h"
15#include "tbb/parallel_for.h"
16#include "tbb/parallel_reduce.h"
17
18#include <ranges>
19
21
22namespace {
23
24const std::string MICROSEC("microseconds");
25
26// Logger for this class
27auto g_log = Kernel::Logger("ProcessBankSplitTask");
28
29} // namespace
31 std::vector<std::string> &bankEntryNames, H5::H5File &h5file, const bool is_time_filtered,
32 std::vector<int> &workspaceIndices, std::vector<API::MatrixWorkspace_sptr> &wksps,
33 const std::map<detid_t, double> &calibration, const std::map<detid_t, double> &scale_at_sample,
34 const std::set<detid_t> &masked, const size_t events_per_chunk, const size_t grainsize_event,
35 std::vector<std::pair<int, PulseROI>> target_to_pulse_indices, std::shared_ptr<API::Progress> &progress)
36 : m_h5file(h5file), m_bankEntries(bankEntryNames), m_loader(is_time_filtered, {}, target_to_pulse_indices),
37 m_workspaceIndices(workspaceIndices), m_wksps(wksps), m_calibration(calibration),
38 m_scale_at_sample(scale_at_sample), m_masked(masked), m_events_per_chunk(events_per_chunk),
39 m_grainsize_event(grainsize_event), m_progress(progress) {}
40
41void ProcessBankSplitTask::operator()(const tbb::blocked_range<size_t> &range) const {
42 auto entry = m_h5file.openGroup("entry"); // type=NXentry
43 for (size_t wksp_index = range.begin(); wksp_index < range.end(); ++wksp_index) {
44 const auto &bankName = m_bankEntries[wksp_index];
45 // empty bank names indicate spectra to skip; control should never get here, but just in case
46 if (bankName.empty()) {
47 continue;
48 }
49 Kernel::Timer timer;
50 g_log.debug() << bankName << " start" << std::endl;
51
52 // open the bank
53 auto event_group = entry.openGroup(bankName); // type=NXevent_data
54
55 // skip empty dataset
56 auto tof_SDS = event_group.openDataSet(NxsFieldNames::TIME_OF_FLIGHT);
57 const int64_t total_events = static_cast<size_t>(tof_SDS.getSpace().getSelectNpoints());
58 if (total_events == 0) {
59 m_progress->report();
60 continue;
61 }
62
63 auto eventSplitRanges = m_loader.getEventIndexSplitRanges(event_group, total_events);
64
65 // Get all spectra for this bank.
66 // Create temporary y arrays for each workspace.
67 std::vector<API::ISpectrum *> spectra;
68 std::vector<std::vector<uint32_t>> y_temps;
69 for (const auto &wksp : m_wksps) {
70 spectra.push_back(&wksp->getSpectrum(wksp_index));
71 y_temps.emplace_back(spectra.back()->dataY().size());
72 }
73
74 // create object so bank calibration can be re-used
75 std::unique_ptr<BankCalibration> calibration = nullptr;
76
77 // get handle to the data
78 auto detID_SDS = event_group.openDataSet(NxsFieldNames::DETID);
79 // auto tof_SDS = event_group.openDataSet(NxsFieldNames::TIME_OF_FLIGHT);
80 // and the units
81 std::string tof_unit;
82 Nexus::H5Util::readStringAttribute(tof_SDS, "units", tof_unit);
83 const double time_conversion = Kernel::Units::timeConversionValue(tof_unit, MICROSEC);
84
85 // declare arrays once so memory can be reused
86 auto event_detid = std::make_unique<std::vector<uint32_t>>(); // uint32 for ORNL nexus file
87 auto event_time_of_flight = std::make_unique<std::vector<float>>(); // float for ORNL nexus files
88
89 // read parts of the bank at a time until all events are processed
90 while (!eventSplitRanges.empty()) {
91 // Create offsets and slab sizes for the next chunk of events.
92 // This will read at most m_events_per_chunk events from the file
93 // and will split the ranges if necessary for the next iteration.
94 std::vector<size_t> offsets;
95 std::vector<size_t> slabsizes;
96 std::vector<std::pair<int, EventROI>> relative_target_ranges;
97
98 size_t total_events_to_read = 0;
99 // Process the event ranges until we reach the desired number of events to read or run out of ranges
100 while (!eventSplitRanges.empty() && total_events_to_read < m_events_per_chunk) {
101 // Get the next event range from the stack
102 auto [target, eventRange] = eventSplitRanges.top();
103 eventSplitRanges.pop();
104
105 size_t range_size = eventRange.second - eventRange.first;
106 size_t remaining_chunk = m_events_per_chunk - total_events_to_read;
107
108 // If the range size is larger than the remaining chunk, we need to split it
109 if (range_size > remaining_chunk) {
110 // Split the range: process only part of it now, push the rest back for later
111 relative_target_ranges.emplace_back(target,
112 EventROI(total_events_to_read, total_events_to_read + remaining_chunk));
113 offsets.push_back(eventRange.first);
114 slabsizes.push_back(remaining_chunk);
115 total_events_to_read += remaining_chunk;
116 // Push the remainder of the range back to the front for next iteration
117 eventSplitRanges.emplace(target, EventROI(eventRange.first + remaining_chunk, eventRange.second));
118 break;
119 } else {
120 relative_target_ranges.emplace_back(target,
121 EventROI(total_events_to_read, total_events_to_read + range_size));
122 offsets.push_back(eventRange.first);
123 slabsizes.push_back(range_size);
124 total_events_to_read += range_size;
125 // Continue to next range
126 }
127 }
128
129 // log the event ranges being processed
130 std::ostringstream oss;
131 oss << "Processing " << bankName << " with " << total_events_to_read << " events in the ranges: ";
132 for (size_t i = 0; i < offsets.size(); ++i) {
133 oss << "[" << offsets[i] << ", " << (offsets[i] + slabsizes[i]) << "), ";
134 }
135 oss << "\n";
136 g_log.debug() << oss.str();
137
138 // load detid and tof at the same time
139 tbb::parallel_invoke(
140 [&] { // load detid
141 // event_detid->clear();
142 m_loader.loadData(detID_SDS, event_detid, offsets, slabsizes);
143 // immediately find min/max to allow for other things to read disk
145 // only recreate calibration if it doesn't already have the useful information
146 if ((!calibration) || (calibration->idmin() > static_cast<detid_t>(minval)) ||
147 (calibration->idmax() < static_cast<detid_t>(maxval))) {
148 calibration =
149 std::make_unique<BankCalibration>(static_cast<detid_t>(minval), static_cast<detid_t>(maxval),
150 time_conversion, m_calibration, m_scale_at_sample, m_masked);
151 }
152 },
153 [&] { // load time-of-flight
154 // event_time_of_flight->clear();
155 m_loader.loadData(tof_SDS, event_time_of_flight, offsets, slabsizes);
156 });
157
158 // loop over targets
159 tbb::parallel_for(
160 tbb::blocked_range<size_t>(0, m_workspaceIndices.size()), [&](const tbb::blocked_range<size_t> &r) {
161 for (size_t idx = r.begin(); idx != r.end(); ++idx) {
162 int i = m_workspaceIndices[idx];
163
164 // Precompute indices for this target
165 std::vector<size_t> indices;
166 for (const auto &pair : relative_target_ranges) {
167 if (pair.first == static_cast<int>(i)) {
168 auto [start, end] = pair.second;
169 for (size_t k = start; k < end; ++k) {
170 indices.push_back(k);
171 }
172 }
173 }
174
175 auto event_id_view_for_target =
176 indices | std::views::transform([&event_detid](const auto &k) { return (*event_detid)[k]; });
177 auto event_tof_view_for_target = indices | std::views::transform([&event_time_of_flight](const auto &k) {
178 return (*event_time_of_flight)[k];
179 });
180
181 ProcessEventsTask task(&event_id_view_for_target, &event_tof_view_for_target, calibration.get(),
182 &spectra[idx]->readX());
183
184 const tbb::blocked_range<size_t> range_info(0, indices.size(), m_grainsize_event);
185 tbb::parallel_reduce(range_info, task);
186
187 std::transform(y_temps[idx].begin(), y_temps[idx].end(), task.y_temp.begin(), y_temps[idx].begin(),
188 std::plus<uint32_t>());
189 }
190 });
191 }
192
193 // copy the data out into the correct spectrum
194 tbb::parallel_for(size_t(0), m_wksps.size(), [&](size_t i) {
195 auto &y_values = spectra[i]->dataY();
196 std::copy(y_temps[i].cbegin(), y_temps[i].cend(), y_values.begin());
197 });
198
199 g_log.debug() << bankName << " stop " << timer << std::endl;
200 m_progress->report();
201 }
202}
203}; // namespace Mantid::DataHandling::AlignAndFocusPowderSlim
T maxval
T minval
std::stack< std::pair< int, EventROI > > getEventIndexSplitRanges(H5::Group &event_group, const uint64_t number_events)
virtual void loadData(H5::DataSet &SDS, std::unique_ptr< std::vector< uint32_t > > &data, const std::vector< size_t > &offsets, const std::vector< size_t > &slabsizes)
const size_t m_grainsize_event
number of events to histogram in a single thread
ProcessBankSplitTask(std::vector< std::string > &bankEntryNames, H5::H5File &h5file, const bool is_time_filtered, std::vector< int > &workspaceIndices, std::vector< API::MatrixWorkspace_sptr > &wksps, const std::map< detid_t, double > &calibration, const std::map< detid_t, double > &scale_at_sample, const std::set< detid_t > &masked, const size_t events_per_chunk, const size_t grainsize_event, std::vector< std::pair< int, PulseROI > > target_to_pulse_indices, std::shared_ptr< API::Progress > &progress)
const size_t m_events_per_chunk
number of events to read from disk at one time
void operator()(const tbb::blocked_range< size_t > &range) const
std::map< detid_t, double > m_scale_at_sample
multiplicative 0<value<1 to move neutron TOF at sample
void debug(const std::string &msg)
Logs at debug level.
Definition Logger.cpp:145
A simple class that provides a wall-clock (not processor time) timer.
Definition Timer.h:27
Kernel::Logger g_log("ExperimentInfo")
static logger object
const std::string TIME_OF_FLIGHT("event_time_offset")
std::pair< uint64_t, uint64_t > EventROI
Definition NexusLoader.h:26
MANTID_KERNEL_DLL double timeConversionValue(const std::string &input_unit, const std::string &output_unit)
Definition Unit.cpp:1430
std::pair< T, T > parallel_minmax(std::vector< T > const *const vec, size_t const grainsize=1000)
parallel_minmax
MANTID_NEXUS_DLL void readStringAttribute(const H5::H5Object &object, const std::string &attributeName, std::string &output)
Definition H5Util.cpp:342
int32_t detid_t
Typedef for a detector ID.