Mantid
Loading...
Searching...
No Matches
ProcessBankSplitTask.cpp
Go to the documentation of this file.
1// Mantid Repository : https://github.com/mantidproject/mantid
2//
3// Copyright © 2025 ISIS Rutherford Appleton Laboratory UKRI,
4// NScD Oak Ridge National Laboratory, European Spallation Source,
5// Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
6// SPDX - License - Identifier: GPL - 3.0 +
7
10#include "MantidKernel/Logger.h"
12#include "MantidKernel/Timer.h"
13#include "MantidNexus/H5Util.h"
14#include "tbb/parallel_for.h"
15#include "tbb/parallel_reduce.h"
16
17#include <ranges>
18
20
21namespace {
22
23// Logger for this class
24auto g_log = Kernel::Logger("ProcessBankSplitTask");
25
26} // namespace
27ProcessBankSplitTask::ProcessBankSplitTask(std::vector<std::string> &bankEntryNames, H5::H5File &h5file,
28 std::shared_ptr<NexusLoader> loader, std::vector<int> &workspaceIndices,
29 std::vector<SpectraProcessingData> &processingDatas,
30 const BankCalibrationFactory &calibFactory, const size_t events_per_chunk,
31 const size_t grainsize_event, std::shared_ptr<API::Progress> &progress)
32 : ProcessBankTaskBase(bankEntryNames, loader, calibFactory), m_h5file(h5file), m_workspaceIndices(workspaceIndices),
33 m_processingDatas(processingDatas), m_events_per_chunk(events_per_chunk), m_grainsize_event(grainsize_event),
34 m_progress(progress) {}
35void ProcessBankSplitTask::operator()(const tbb::blocked_range<size_t> &range) const {
36 auto entry = m_h5file.openGroup("entry"); // type=NXentry
37 for (size_t bank_index = range.begin(); bank_index < range.end(); ++bank_index) {
38 const auto &bankName = this->bankName(bank_index);
39 // empty bank names indicate spectra to skip; control should never get here, but just in case
40 if (bankName.empty()) {
41 continue;
42 }
43 Kernel::Timer timer;
44 g_log.debug() << bankName << " start" << std::endl;
45
46 // open the bank
47 auto event_group = entry.openGroup(bankName); // type=NXevent_data
48
49 // skip empty dataset
50 auto tof_SDS = event_group.openDataSet(NxsFieldNames::TIME_OF_FLIGHT);
51 const int64_t total_events = static_cast<size_t>(tof_SDS.getSpace().getSelectNpoints());
52 if (total_events == 0) {
53 m_progress->report();
54 continue;
55 }
56
57 auto eventSplitRanges = this->getEventIndexSplitRanges(event_group, total_events);
58
59 // get handle to the data
60 auto detID_SDS = event_group.openDataSet(NxsFieldNames::DETID);
61 // auto tof_SDS = event_group.openDataSet(NxsFieldNames::TIME_OF_FLIGHT);
62 // and the units
63 std::string tof_unit;
64 Nexus::H5Util::readStringAttribute(tof_SDS, "units", tof_unit);
65 // now the calibration for the output group can be created
66 // which detectors go into the current group - assumes ouput spectrum number is one more than workspace index
67 const auto calibrations = this->getCalibrations(tof_unit, bank_index);
68
69 // declare arrays once so memory can be reused
70 auto event_detid = std::make_unique<std::vector<uint32_t>>(); // uint32 for ORNL nexus file
71 auto event_time_of_flight = std::make_unique<std::vector<float>>(); // float for ORNL nexus files
72
73 // read parts of the bank at a time until all events are processed
74 while (!eventSplitRanges.empty()) {
75 // Create offsets and slab sizes for the next chunk of events.
76 // This will read at most m_events_per_chunk events from the file
77 // and will split the ranges if necessary for the next iteration.
78 std::vector<size_t> offsets;
79 std::vector<size_t> slabsizes;
80 std::vector<std::pair<int, EventROI>> relative_target_ranges;
81
82 size_t total_events_to_read = 0;
83 // Process the event ranges until we reach the desired number of events to read or run out of ranges
84 while (!eventSplitRanges.empty() && total_events_to_read < m_events_per_chunk) {
85 // Get the next event range from the stack
86 auto [target, eventRange] = eventSplitRanges.top();
87 eventSplitRanges.pop();
88
89 size_t range_size = eventRange.second - eventRange.first;
90 size_t remaining_chunk = m_events_per_chunk - total_events_to_read;
91
92 // If the range size is larger than the remaining chunk, we need to split it
93 if (range_size > remaining_chunk) {
94 // Split the range: process only part of it now, push the rest back for later
95 relative_target_ranges.emplace_back(target,
96 EventROI(total_events_to_read, total_events_to_read + remaining_chunk));
97 offsets.push_back(eventRange.first);
98 slabsizes.push_back(remaining_chunk);
99 total_events_to_read += remaining_chunk;
100 // Push the remainder of the range back to the front for next iteration
101 eventSplitRanges.emplace(target, EventROI(eventRange.first + remaining_chunk, eventRange.second));
102 break;
103 } else {
104 relative_target_ranges.emplace_back(target,
105 EventROI(total_events_to_read, total_events_to_read + range_size));
106 offsets.push_back(eventRange.first);
107 slabsizes.push_back(range_size);
108 total_events_to_read += range_size;
109 // Continue to next range
110 }
111 }
112
113 // log the event ranges being processed
114 g_log.debug(toLogString(bankName, total_events_to_read, offsets, slabsizes));
115
116 if (total_events_to_read == 0) {
117 continue; // nothing to do
118 }
119
120 // load detid and tof at the same time
121 this->loadEvents(detID_SDS, tof_SDS, offsets, slabsizes, event_detid, event_time_of_flight);
122
123 // loop over targets
124 tbb::parallel_for(
125 tbb::blocked_range<size_t>(0, m_workspaceIndices.size()), [&](const tbb::blocked_range<size_t> &r) {
126 for (size_t idx = r.begin(); idx != r.end(); ++idx) {
127 int i = m_workspaceIndices[idx];
128
129 // Precompute indices for this target
130 std::vector<size_t> indices;
131 for (const auto &pair : relative_target_ranges) {
132 if (pair.first == static_cast<int>(i)) {
133 auto [start, end] = pair.second;
134 for (size_t k = start; k < end; ++k) {
135 indices.push_back(k);
136 }
137 }
138 }
139
140 auto event_id_view_for_target =
141 indices | std::views::transform([&event_detid](const auto &k) { return (*event_detid)[k]; });
142 auto event_tof_view_for_target = indices | std::views::transform([&event_time_of_flight](const auto &k) {
143 return (*event_time_of_flight)[k];
144 });
145
146 // Loop over all output spectra / groups
147 tbb::parallel_for(tbb::blocked_range<size_t>(0, m_processingDatas[i].counts.size()),
148 [&](const tbb::blocked_range<size_t> &output_range) {
149 for (size_t output_index = output_range.begin(); output_index < output_range.end();
150 ++output_index) {
151 ProcessEventsTask task(&event_id_view_for_target, &event_tof_view_for_target,
152 &calibrations.at(output_index),
153 m_processingDatas[i].binedges[output_index]);
154
155 const tbb::blocked_range<size_t> range_info(0, indices.size(), m_grainsize_event);
156 tbb::parallel_reduce(range_info, task);
157
158 // Accumulate results into shared y_temp to combine local histograms
159 // Use atomic fetch_add to accumulate results into shared vectors
160 for (size_t j = 0; j < m_processingDatas[i].counts[output_index].size(); ++j) {
161 m_processingDatas[i].counts[output_index][j].fetch_add(task.y_temp[j],
162 std::memory_order_relaxed);
163 }
164 }
165 });
166 }
167 });
168 }
169 g_log.debug() << bankName << " stop " << timer << std::endl;
170 m_progress->report();
171 }
172}
173}; // namespace Mantid::DataHandling::AlignAndFocusPowderSlim
const size_t m_events_per_chunk
number of events to read from disk at one time
void operator()(const tbb::blocked_range< size_t > &range) const
ProcessBankSplitTask(std::vector< std::string > &bankEntryNames, H5::H5File &h5file, std::shared_ptr< NexusLoader > loader, std::vector< int > &workspaceIndices, std::vector< SpectraProcessingData > &processingDatas, const BankCalibrationFactory &calibFactory, const size_t events_per_chunk, const size_t grainsize_event, std::shared_ptr< API::Progress > &progress)
const std::string & bankName(const size_t wksp_index) const
std::stack< std::pair< int, EventROI > > getEventIndexSplitRanges(H5::Group &event_group, const uint64_t number_events) const
std::vector< BankCalibration > getCalibrations(const std::string &tof_unit, const size_t bank_index) const
void loadEvents(H5::DataSet &detID_SDS, H5::DataSet &tof_SDS, const std::vector< size_t > &offsets, const std::vector< size_t > &slabsizes, std::unique_ptr< std::vector< uint32_t > > &detId_vec, std::unique_ptr< std::vector< float > > &tof_vec) const
Load detid and tof at the same time.
void debug(const std::string &msg)
Logs at debug level.
Definition Logger.cpp:145
A simple class that provides a wall-clock (not processor time) timer.
Definition Timer.h:27
Kernel::Logger g_log("ExperimentInfo")
static logger object
const std::string TIME_OF_FLIGHT("event_time_offset")
std::string toLogString(const std::string &bankName, const size_t total_events_to_read, const std::vector< size_t > &offsets, const std::vector< size_t > &slabsizes)
std::pair< uint64_t, uint64_t > EventROI
Definition NexusLoader.h:26
MANTID_NEXUS_DLL void readStringAttribute(const H5::H5Object &object, const std::string &attributeName, std::string &output)
Definition H5Util.cpp:342