ProcessBankSplitFullTimeTask.cpp
// Mantid Repository : https://github.com/mantidproject/mantid
//
// Copyright © 2025 ISIS Rutherford Appleton Laboratory UKRI,
//   NScD Oak Ridge National Laboratory, European Spallation Source,
//   Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
// SPDX - License - Identifier: GPL - 3.0 +

#include "MantidAPI/Run.h"
#include "MantidKernel/Logger.h"
#include "MantidKernel/Timer.h"
#include "MantidNexus/H5Util.h"
#include "tbb/parallel_for.h"
#include "tbb/parallel_reduce.h"

#include <ranges>

namespace Mantid::DataHandling::AlignAndFocusPowderSlim {

namespace {

// Logger for this class
auto g_log = Kernel::Logger("ProcessBankSplitFullTimeTask");

} // namespace

ProcessBankSplitFullTimeTask::ProcessBankSplitFullTimeTask(
    std::vector<std::string> &bankEntryNames, H5::H5File &h5file, std::shared_ptr<NexusLoader> loader,
    std::vector<int> &workspaceIndices, std::vector<SpectraProcessingData> &processingDatas,
    const BankCalibrationFactory &calibFactory, const size_t events_per_chunk, const size_t grainsize_event,
    const std::map<Mantid::Types::Core::DateAndTime, int> &splitterMap,
    std::shared_ptr<std::vector<Types::Core::DateAndTime>> pulse_times, std::shared_ptr<API::Progress> &progress)
    : ProcessBankTaskBase(bankEntryNames, loader, calibFactory), m_h5file(h5file), m_workspaceIndices(workspaceIndices),
      m_processingDatas(processingDatas), m_events_per_chunk(events_per_chunk), m_splitterMap(splitterMap),
      m_grainsize_event(grainsize_event), m_pulse_times(pulse_times), m_progress(progress) {}

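// Process the banks assigned to this TBB range: read each bank's events from the file in chunks,
// convert them to absolute (full) time, route them to their target workspace via the splitter map,
// and histogram them into the shared counts arrays.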
void ProcessBankSplitFullTimeTask::operator()(const tbb::blocked_range<size_t> &range) const {
  auto entry = m_h5file.openGroup("entry"); // type=NXentry
  for (size_t bank_index = range.begin(); bank_index < range.end(); ++bank_index) {
    const auto &bankName = this->bankName(bank_index);
    // empty bank names indicate spectra to skip; control should never get here, but just in case
    if (bankName.empty()) {
      continue;
    }
    Kernel::Timer timer;
    g_log.debug() << bankName << " start" << std::endl;

    // open the bank
    auto event_group = entry.openGroup(bankName); // type=NXevent_data

    // skip empty dataset
    auto tof_SDS = event_group.openDataSet(NxsFieldNames::TIME_OF_FLIGHT);
    const int64_t total_events = static_cast<int64_t>(tof_SDS.getSpace().getSelectNpoints());
    if (total_events == 0) {
      m_progress->report();
      continue;
    }

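    // event_index holds, for each pulse, the index of its first event; eventRanges is a stack of
    // [start, end) event-index ranges that still need to be read from this bank.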
    std::unique_ptr<std::vector<uint64_t>> event_index = std::make_unique<std::vector<uint64_t>>();
    auto eventRanges = this->getEventIndexRanges(event_group, total_events, &event_index);

    // get handle to the data
    auto detID_SDS = event_group.openDataSet(NxsFieldNames::DETID);
    std::string tof_unit;
    Nexus::H5Util::readStringAttribute(tof_SDS, "units", tof_unit);
    // now the calibration for the output group can be created
    // which detectors go into the current group - assumes output spectrum number is one more than workspace index
    const auto calibrations = this->getCalibrations(tof_unit, bank_index);

    // declare arrays once so memory can be reused
    auto event_detid = std::make_unique<std::vector<uint32_t>>();       // uint32 for ORNL nexus file
    auto event_time_of_flight = std::make_unique<std::vector<float>>(); // float for ORNL nexus files
    auto pulse_times_idx = std::make_unique<std::vector<size_t>>();     // index into pulse_times for every event

    // read parts of the bank at a time until all events are processed
    while (!eventRanges.empty()) {
      // Create offsets and slab sizes for the next chunk of events.
      // This will read at most m_events_per_chunk events from the file
      // and will split the ranges if necessary for the next iteration.
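      // Illustrative example (not from the data): with m_events_per_chunk = 1000 and the ranges
      // [0, 700) and [700, 1500) popped in that order, this pass reads offsets {0, 700} with slab
      // sizes {700, 300} and pushes [1000, 1500) back onto the stack for the next pass.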
      std::vector<size_t> offsets;
      std::vector<size_t> slabsizes;

      size_t total_events_to_read = 0;
      // Process the event ranges until we reach the desired number of events to read or run out of ranges
      while (!eventRanges.empty() && total_events_to_read < m_events_per_chunk) {
        // Get the next event range from the stack
        auto eventRange = eventRanges.top();
        eventRanges.pop();

        size_t range_size = eventRange.second - eventRange.first;
        size_t remaining_chunk = m_events_per_chunk - total_events_to_read;

        // If the range size is larger than the remaining chunk, we need to split it
        if (range_size > remaining_chunk) {
          // Split the range: process only part of it now, push the rest back for later
          offsets.push_back(eventRange.first);
          slabsizes.push_back(remaining_chunk);
          total_events_to_read += remaining_chunk;
          // Push the remainder of the range back onto the stack for the next iteration
          eventRanges.emplace(eventRange.first + remaining_chunk, eventRange.second);
          break;
        } else {
          offsets.push_back(eventRange.first);
          slabsizes.push_back(range_size);
          total_events_to_read += range_size;
          // Continue to next range
        }
      }

      // log the event ranges being processed
      g_log.debug(toLogString(bankName, total_events_to_read, offsets, slabsizes));

      if (total_events_to_read == 0) {
        continue; // nothing to do
      }

      // load detid and tof at the same time
      this->loadEvents(detID_SDS, tof_SDS, offsets, slabsizes, event_detid, event_time_of_flight);

      pulse_times_idx->resize(total_events_to_read);
      // get the pulse time of every event; event_index holds the index of the first event of each pulse
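      // i.e. for a global event index g, the pulse index is the number of entries in event_index
      // that are <= g, minus one (or 0 if no such entry exists).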
      size_t pos = 0;
      auto event_index_it = event_index->cbegin();
      for (size_t i = 0; i < offsets.size(); ++i) {
        const size_t off = offsets[i];
        const size_t slab = slabsizes[i];
        for (size_t j = 0; j < slab; ++j) {
          const uint64_t global_idx = static_cast<uint64_t>(off + j);
          // find pulse: event_index holds the first event index of each pulse
          while (event_index_it != event_index->cend() && *event_index_it <= global_idx) {
            ++event_index_it;
          }
          size_t pulse_idx = 0;
          if (event_index_it != event_index->cbegin()) {
            pulse_idx = static_cast<size_t>(std::distance(event_index->cbegin(), event_index_it) - 1);
          }
          (*pulse_times_idx)[pos++] = pulse_idx;
        }
      }

      // Loop over all output spectra / groups
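      // The outer parallel loop runs over output spectra (detector groups); the inner one runs over
      // the splitter targets (workspace indices), so each (target, spectrum) pair is histogrammed
      // independently.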
      tbb::parallel_for(
          tbb::blocked_range<size_t>(0, m_processingDatas[0].counts.size()),
          [&](const tbb::blocked_range<size_t> &output_range) {
            for (size_t output_index = output_range.begin(); output_index < output_range.end();
                 ++output_index) { // loop over output spectra
              const auto calibration = calibrations.at(output_index);
              tbb::parallel_for(
                  tbb::blocked_range<size_t>(0, m_workspaceIndices.size()), [&](const tbb::blocked_range<size_t> &r) {
                    for (size_t idx = r.begin(); idx != r.end(); ++idx) {
                      int i = m_workspaceIndices[idx];

                      // Precompute indices for this target
                      std::vector<size_t> indices;
                      auto splitter_it = m_splitterMap.cbegin();
                      for (size_t k = 0; k < event_detid->size(); ++k) {
                        // Calculate the full time at sample: full_time = pulse_time + (tof * correctionFactor), where
                        // correctionFactor is either scale_at_sample[detid] or 1.0
                        const double correctionFactor =
                            calibration.value_scale_at_sample(static_cast<detid_t>((*event_detid)[k]));

                        const auto tof_in_nanoseconds =
                            static_cast<int64_t>(static_cast<double>((*event_time_of_flight)[k]) * correctionFactor);
                        const auto pulsetime = (*m_pulse_times)[(*pulse_times_idx)[k]];
                        const Mantid::Types::Core::DateAndTime full_time = pulsetime + tof_in_nanoseconds;
                        // Linear search for pulsetime in splitter map, assume pulsetime and splitter map are both
                        // sorted. This is the starting point for the full_time search so we need to subtract some time
                        // (66.6ms) to ensure we don't skip it when adding the tof. Advance splitter_it until it points
                        // to the first element greater than (pulsetime - offset). This gives us a reasonable starting
                        // point for the full_time search.
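                        // splitter_it only ever moves forward: events are visited in increasing global index
                        // order, so their pulse times are non-decreasing (per the sorted assumption above).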
                        while (splitter_it != m_splitterMap.end() &&
                               splitter_it->first <= pulsetime - static_cast<int64_t>(PULSETIME_OFFSET)) {
                          ++splitter_it;
                        }

                        // Now, starting at splitter_it, find full_time (TOFs will be unsorted)
                        // Advance until we find the first element > full_time, then step back to get the greatest key
                        // <= full_time.
                        auto full_time_it = splitter_it;
                        while (full_time_it != m_splitterMap.end() && full_time_it->first <= full_time) {
                          ++full_time_it;
                        }

                        // If there is no element <= full_time then full_time_it will be begin(); otherwise step back to
                        // the element that is <= full_time.
                        if (full_time_it == m_splitterMap.begin()) {
                          // no splitter entry <= full_time; skip this event
                        } else {
                          --full_time_it;
                          if (full_time_it != m_splitterMap.end() && full_time_it->second == i) {
                            indices.push_back(k);
                          }
                        }
                      }

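                      // Build lazy views over just the selected events so ProcessEventsTask can
                      // histogram them without copying the detid/tof arrays.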
                      auto event_id_view_for_target =
                          indices | std::views::transform([&event_detid](const auto &k) { return (*event_detid)[k]; });
                      auto event_tof_view_for_target =
                          indices | std::views::transform(
                                        [&event_time_of_flight](const auto &k) { return (*event_time_of_flight)[k]; });

                      ProcessEventsTask task(&event_id_view_for_target, &event_tof_view_for_target, &calibration,
                                             m_processingDatas[i].binedges[output_index]);

                      const tbb::blocked_range<size_t> range_info(0, indices.size(), m_grainsize_event);
                      tbb::parallel_reduce(range_info, task);

                      // Accumulate the task's local histogram (y_temp) into the shared counts vector,
                      // using atomic fetch_add so concurrent updates remain safe
                      for (size_t j = 0; j < m_processingDatas[i].counts[output_index].size(); ++j) {
                        m_processingDatas[i].counts[output_index][j].fetch_add(task.y_temp[j],
                                                                               std::memory_order_relaxed);
                      }
                    }
                  });
            }
          });
    }
    g_log.debug() << bankName << " stop " << timer << std::endl;
    m_progress->report();
  }
}
} // namespace Mantid::DataHandling::AlignAndFocusPowderSlim