Mantid
Loading...
Searching...
No Matches
ProcessBankSplitFullTimeTask.cpp
Go to the documentation of this file.
1// Mantid Repository : https://github.com/mantidproject/mantid
2//
3// Copyright © 2025 ISIS Rutherford Appleton Laboratory UKRI,
4// NScD Oak Ridge National Laboratory, European Spallation Source,
5// Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
6// SPDX - License - Identifier: GPL - 3.0 +
7
9#include "MantidAPI/Run.h"
11#include "MantidKernel/Logger.h"
14#include "MantidKernel/Timer.h"
15#include "MantidKernel/Unit.h"
16#include "MantidNexus/H5Util.h"
17#include "tbb/parallel_for.h"
18#include "tbb/parallel_reduce.h"
19
20#include <ranges>
21
23
24namespace {
25
26const std::string MICROSEC("microseconds");
27
28// Logger for this class
29auto g_log = Kernel::Logger("ProcessBankSplitFullTimeTask");
30
31} // namespace
33 std::vector<std::string> &bankEntryNames, H5::H5File &h5file, const bool is_time_filtered,
34 std::vector<int> &workspaceIndices, std::vector<API::MatrixWorkspace_sptr> &wksps,
35 const std::map<detid_t, double> &calibration, const std::map<detid_t, double> &scale_at_sample,
36 const std::set<detid_t> &masked, const size_t events_per_chunk, const size_t grainsize_event,
37 const std::vector<PulseROI> &pulse_indices, const std::map<Mantid::Types::Core::DateAndTime, int> &splitterMap,
38 std::shared_ptr<API::Progress> &progress)
39 : m_h5file(h5file), m_bankEntries(bankEntryNames),
40 m_loader(std::make_shared<NexusLoader>(is_time_filtered, pulse_indices)), m_workspaceIndices(workspaceIndices),
41 m_wksps(wksps), m_calibration(calibration), m_scale_at_sample(scale_at_sample), m_masked(masked),
42 m_events_per_chunk(events_per_chunk), m_splitterMap(splitterMap), m_grainsize_event(grainsize_event),
43 m_progress(progress) {}
44
46 std::vector<std::string> &bankEntryNames, H5::H5File &h5file, std::shared_ptr<NexusLoader> loader,
47 std::vector<int> &workspaceIndices, std::vector<API::MatrixWorkspace_sptr> &wksps,
48 const std::map<detid_t, double> &calibration, const std::map<detid_t, double> &scale_at_sample,
49 const std::set<detid_t> &masked, const size_t events_per_chunk, const size_t grainsize_event,
50 const std::map<Mantid::Types::Core::DateAndTime, int> &splitterMap, std::shared_ptr<API::Progress> &progress)
51 : m_h5file(h5file), m_bankEntries(bankEntryNames), m_loader(std::move(loader)),
52 m_workspaceIndices(workspaceIndices), m_wksps(wksps), m_calibration(calibration),
53 m_scale_at_sample(scale_at_sample), m_masked(masked), m_events_per_chunk(events_per_chunk),
54 m_splitterMap(splitterMap), m_grainsize_event(grainsize_event), m_progress(progress) {}
55
56void ProcessBankSplitFullTimeTask::operator()(const tbb::blocked_range<size_t> &range) const {
57 auto entry = m_h5file.openGroup("entry"); // type=NXentry
58 for (size_t wksp_index = range.begin(); wksp_index < range.end(); ++wksp_index) {
59 const auto &bankName = m_bankEntries[wksp_index];
60 // empty bank names indicate spectra to skip; control should never get here, but just in case
61 if (bankName.empty()) {
62 continue;
63 }
64 Kernel::Timer timer;
65 g_log.debug() << bankName << " start" << std::endl;
66
67 // open the bank
68 auto event_group = entry.openGroup(bankName); // type=NXevent_data
69
70 // skip empty dataset
71 auto tof_SDS = event_group.openDataSet(NxsFieldNames::TIME_OF_FLIGHT);
72 const int64_t total_events = static_cast<size_t>(tof_SDS.getSpace().getSelectNpoints());
73 if (total_events == 0) {
74 m_progress->report();
75 continue;
76 }
77
78 std::unique_ptr<std::vector<uint64_t>> event_index = std::make_unique<std::vector<uint64_t>>();
79 auto eventRanges = m_loader->getEventIndexRanges(event_group, total_events, &event_index);
80
81 // Get all spectra for this bank.
82 // Create temporary y arrays for each workspace.
83 std::vector<API::ISpectrum *> spectra;
84 std::vector<std::vector<uint32_t>> y_temps;
85 for (const auto &wksp : m_wksps) {
86 spectra.push_back(&wksp->getSpectrum(wksp_index));
87 y_temps.emplace_back(spectra.back()->dataY().size());
88 }
89
90 // create object so bank calibration can be re-used
91 std::unique_ptr<BankCalibration> calibration = nullptr;
92
93 // get handle to the data
94 auto detID_SDS = event_group.openDataSet(NxsFieldNames::DETID);
95 std::string tof_unit;
96 Nexus::H5Util::readStringAttribute(tof_SDS, "units", tof_unit);
97 const double time_conversion = Kernel::Units::timeConversionValue(tof_unit, MICROSEC);
98
99 const auto frequency_log =
100 dynamic_cast<const Kernel::TimeSeriesProperty<double> *>(m_wksps.at(0)->run().getProperty("frequency"));
101 if (!frequency_log) {
102 throw std::runtime_error("Frequency log not found");
103 }
104 const auto pulse_times =
105 std::make_unique<std::vector<Mantid::Types::Core::DateAndTime>>(frequency_log->timesAsVector());
106
107 // declare arrays once so memory can be reused
108 auto event_detid = std::make_unique<std::vector<uint32_t>>(); // uint32 for ORNL nexus file
109 auto event_time_of_flight = std::make_unique<std::vector<float>>(); // float for ORNL nexus files
110 auto pulse_times_idx = std::make_unique<std::vector<size_t>>(); // index into pulse_times for every event
111
112 // read parts of the bank at a time until all events are processed
113 while (!eventRanges.empty()) {
114 // Create offsets and slab sizes for the next chunk of events.
115 // This will read at most m_events_per_chunk events from the file
116 // and will split the ranges if necessary for the next iteration.
117 std::vector<size_t> offsets;
118 std::vector<size_t> slabsizes;
119
120 size_t total_events_to_read = 0;
121 // Process the event ranges until we reach the desired number of events to read or run out of ranges
122 while (!eventRanges.empty() && total_events_to_read < m_events_per_chunk) {
123 // Get the next event range from the stack
124 auto eventRange = eventRanges.top();
125 eventRanges.pop();
126
127 size_t range_size = eventRange.second - eventRange.first;
128 size_t remaining_chunk = m_events_per_chunk - total_events_to_read;
129
130 // If the range size is larger than the remaining chunk, we need to split it
131 if (range_size > remaining_chunk) {
132 // Split the range: process only part of it now, push the rest back for later
133 offsets.push_back(eventRange.first);
134 slabsizes.push_back(remaining_chunk);
135 total_events_to_read += remaining_chunk;
136 // Push the remainder of the range back to the front for next iteration
137 eventRanges.emplace(eventRange.first + remaining_chunk, eventRange.second);
138 break;
139 } else {
140 offsets.push_back(eventRange.first);
141 slabsizes.push_back(range_size);
142 total_events_to_read += range_size;
143 // Continue to next range
144 }
145 }
146
147 // log the event ranges being processed
148 std::ostringstream oss;
149 oss << "Processing " << bankName << " with " << total_events_to_read << " events in the ranges: ";
150 for (size_t i = 0; i < offsets.size(); ++i) {
151 oss << "[" << offsets[i] << ", " << (offsets[i] + slabsizes[i]) << "), ";
152 }
153 oss << "\n";
154 g_log.debug() << oss.str();
155
156 // load detid and tof at the same time
157 tbb::parallel_invoke(
158 [&] { // load detid
159 m_loader->loadData(detID_SDS, event_detid, offsets, slabsizes);
160 // immediately find min/max to allow for other things to read disk
162 // only recreate calibration if it doesn't already have the useful information
163 if ((!calibration) || (calibration->idmin() > static_cast<detid_t>(minval)) ||
164 (calibration->idmax() < static_cast<detid_t>(maxval))) {
165 calibration =
166 std::make_unique<BankCalibration>(static_cast<detid_t>(minval), static_cast<detid_t>(maxval),
167 time_conversion, m_calibration, m_scale_at_sample, m_masked);
168 }
169 },
170 [&] { // load time-of-flight
171 m_loader->loadData(tof_SDS, event_time_of_flight, offsets, slabsizes);
172 });
173
174 pulse_times_idx->resize(total_events_to_read);
175 // get the pulsetime of every event, event_index maps the first event of each pulse
176 size_t pos = 0;
177 auto event_index_it = event_index->cbegin();
178 for (size_t i = 0; i < offsets.size(); ++i) {
179 const size_t off = offsets[i];
180 const size_t slab = slabsizes[i];
181 for (size_t j = 0; j < slab; ++j) {
182 const uint64_t global_idx = static_cast<uint64_t>(off + j);
183 // find pulse: event_index holds the first event index of each pulse
184 while (event_index_it != event_index->cend() && *event_index_it <= global_idx) {
185 ++event_index_it;
186 }
187 size_t pulse_idx = 0;
188 if (event_index_it != event_index->cbegin()) {
189 pulse_idx = static_cast<size_t>(std::distance(event_index->cbegin(), event_index_it) - 1);
190 }
191 (*pulse_times_idx)[pos++] = pulse_idx;
192 }
193 }
194
195 // loop over targets
196 tbb::parallel_for(
197 tbb::blocked_range<size_t>(0, m_workspaceIndices.size()), [&](const tbb::blocked_range<size_t> &r) {
198 for (size_t idx = r.begin(); idx != r.end(); ++idx) {
199 int i = m_workspaceIndices[idx];
200
201 // Precompute indices for this target
202 std::vector<size_t> indices;
203 auto splitter_it = m_splitterMap.cbegin();
204 for (size_t k = 0; k < event_detid->size(); ++k) {
205 // Calculate the full time at sample: full_time = pulse_time + (tof * correctionFactor), where
206 // correctionFactor is either scale_at_sample[detid] or 1.0
207 const double correctionFactor =
208 calibration->value_scale_at_sample(static_cast<detid_t>((*event_detid)[k]));
209
210 const auto tof_in_nanoseconds =
211 static_cast<int64_t>(static_cast<double>((*event_time_of_flight)[k]) * correctionFactor);
212 const auto pulsetime = (*pulse_times)[(*pulse_times_idx)[k]];
213 const Mantid::Types::Core::DateAndTime full_time = pulsetime + tof_in_nanoseconds;
214 // Linear search for pulsetime in splitter map, assume pulsetime and splitter map are both sorted. This
215 // is the starting point for the full_time search so we need to subtract some time (66.6ms) to ensure we
216 // don't skip it when adding the tof.
217 // Advance splitter_it until it points to the first element greater than (pulsetime - offset).
218 // This gives us a reasonable starting point for the full_time search.
219 while (splitter_it != m_splitterMap.end() &&
220 splitter_it->first <= pulsetime - static_cast<int64_t>(PULSETIME_OFFSET)) {
221 ++splitter_it;
222 }
223
224 // Now, starting at splitter_it, find full_time (TOFs will be unsorted)
225 // Advance until we find the first element > full_time, then step back to get the greatest key <=
226 // full_time.
227 auto full_time_it = splitter_it;
228 while (full_time_it != m_splitterMap.end() && full_time_it->first <= full_time) {
229 ++full_time_it;
230 }
231
232 // If there is no element <= full_time then full_time_it will be begin(); otherwise step back to the
233 // element that is <= full_time.
234 if (full_time_it == m_splitterMap.begin()) {
235 // no splitter entry <= full_time; skip this event
236 } else {
237 --full_time_it;
238 if (full_time_it != m_splitterMap.end() && full_time_it->second == i) {
239 indices.push_back(k);
240 }
241 }
242 }
243
244 auto event_id_view_for_target =
245 indices | std::views::transform([&event_detid](const auto &k) { return (*event_detid)[k]; });
246 auto event_tof_view_for_target = indices | std::views::transform([&event_time_of_flight](const auto &k) {
247 return (*event_time_of_flight)[k];
248 });
249
250 ProcessEventsTask task(&event_id_view_for_target, &event_tof_view_for_target, calibration.get(),
251 &spectra[idx]->readX());
252
253 const tbb::blocked_range<size_t> range_info(0, indices.size(), m_grainsize_event);
254 tbb::parallel_reduce(range_info, task);
255
256 std::transform(y_temps[idx].begin(), y_temps[idx].end(), task.y_temp.begin(), y_temps[idx].begin(),
257 std::plus<uint32_t>());
258 }
259 });
260 }
261
262 // copy the data out into the correct spectrum
263 tbb::parallel_for(size_t(0), m_wksps.size(), [&](size_t i) {
264 auto &y_values = spectra[i]->dataY();
265 std::copy(y_temps[i].cbegin(), y_temps[i].cend(), y_values.begin());
266 });
267
268 g_log.debug() << bankName << " stop" << timer << std::endl;
269 m_progress->report();
270 }
271}
272}; // namespace Mantid::DataHandling::AlignAndFocusPowderSlim
T maxval
T minval
const size_t m_grainsize_event
number of events to histogram in a single thread
const size_t m_events_per_chunk
number of events to read from disk at one time
ProcessBankSplitFullTimeTask(std::vector< std::string > &bankEntryNames, H5::H5File &h5file, const bool is_time_filtered, std::vector< int > &workspaceIndices, std::vector< API::MatrixWorkspace_sptr > &wksps, const std::map< detid_t, double > &calibration, const std::map< detid_t, double > &scale_at_sample, const std::set< detid_t > &masked, const size_t events_per_chunk, const size_t grainsize_event, const std::vector< PulseROI > &pulse_indices, const std::map< Mantid::Types::Core::DateAndTime, int > &splitterMap, std::shared_ptr< API::Progress > &progress)
std::map< detid_t, double > m_scale_at_sample
multiplicative 0<value<1 to move neutron TOF at sample
void debug(const std::string &msg)
Logs at debug level.
Definition Logger.cpp:145
A specialised Property class for holding a series of time-value pairs.
A simple class that provides a wall-clock (not processor time) timer.
Definition Timer.h:27
Kernel::Logger g_log("ExperimentInfo")
static logger object
const std::string TIME_OF_FLIGHT("event_time_offset")
MANTID_KERNEL_DLL double timeConversionValue(const std::string &input_unit, const std::string &output_unit)
Definition Unit.cpp:1430
std::pair< T, T > parallel_minmax(std::vector< T > const *const vec, size_t const grainsize=1000)
parallel_minmax
MANTID_NEXUS_DLL void readStringAttribute(const H5::H5Object &object, const std::string &attributeName, std::string &output)
Definition H5Util.cpp:342
int32_t detid_t
Typedef for a detector ID.
STL namespace.