Mantid
Loading...
Searching...
No Matches
ProcessBankTask.cpp
Go to the documentation of this file.
1// Mantid Repository : https://github.com/mantidproject/mantid
2//
3// Copyright © 2025 ISIS Rutherford Appleton Laboratory UKRI,
4// NScD Oak Ridge National Laboratory, European Spallation Source,
5// Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
6// SPDX - License - Identifier: GPL - 3.0 +
7
10#include "MantidKernel/Logger.h"
12#include "MantidKernel/Timer.h"
13#include "MantidNexus/H5Util.h"
14#include "tbb/parallel_for.h"
15#include "tbb/parallel_reduce.h"
16
18
19namespace {
20
21// Logger for this class
22auto g_log = Kernel::Logger("ProcessBankTask");
23
24} // namespace
25ProcessBankTask::ProcessBankTask(std::vector<std::string> &bankEntryNames, H5::H5File &h5file,
26 std::shared_ptr<NexusLoader> loader, SpectraProcessingData &processingData,
27 const BankCalibrationFactory &calibFactory, const size_t events_per_chunk,
28 const size_t grainsize_event, std::shared_ptr<API::Progress> &progress)
29 : ProcessBankTaskBase(bankEntryNames, loader, calibFactory), m_h5file(h5file), m_processingData(processingData),
30 m_events_per_chunk(events_per_chunk), m_grainsize_event(grainsize_event), m_progress(progress) {}
31
32void ProcessBankTask::operator()(const tbb::blocked_range<size_t> &range) const {
33 auto entry = m_h5file.openGroup("entry"); // type=NXentry
34 for (size_t bank_index = range.begin(); bank_index < range.end(); ++bank_index) {
35 const auto &bankName = this->bankName(bank_index);
36
37 // empty bank names indicate spectra to skip; control should never get here, but just in case
38 if (bankName.empty())
39 continue;
40
41 Kernel::Timer timer;
42 g_log.debug() << bankName << " start" << std::endl;
43
44 // open the bank
45 auto event_group = entry.openGroup(bankName); // type=NXevent_data
46
47 // skip empty dataset
48 auto tof_SDS = event_group.openDataSet(NxsFieldNames::TIME_OF_FLIGHT);
49 const int64_t total_events = static_cast<size_t>(tof_SDS.getSpace().getSelectNpoints());
50 if (total_events == 0) {
51 m_progress->report();
52 continue;
53 }
54
55 auto eventRanges = this->getEventIndexRanges(event_group, total_events);
56
57 // get handle to the data
58 auto detID_SDS = event_group.openDataSet(NxsFieldNames::DETID);
59 // auto tof_SDS = event_group.openDataSet(NxsFieldNames::TIME_OF_FLIGHT);
60 // and the units
61 std::string tof_unit;
62 Nexus::H5Util::readStringAttribute(tof_SDS, "units", tof_unit);
63 // now the calibration for the output group can be created
64 // which detectors go into the current group - assumes ouput spectrum number is one more than workspace index
65 const auto calibrations = this->getCalibrations(tof_unit, bank_index);
66
67 // declare arrays once so memory can be reused
68 auto event_detid = std::make_unique<std::vector<uint32_t>>(); // uint32 for ORNL nexus file
69 auto event_time_of_flight = std::make_unique<std::vector<float>>(); // float for ORNL nexus files
70
71 // read parts of the bank at a time until all events are processed
72 while (!eventRanges.empty()) {
73 // Create offsets and slab sizes for the next chunk of events.
74 // This will read at most m_events_per_chunk events from the file
75 // and will split the ranges if necessary for the next iteration.
76 std::vector<size_t> offsets;
77 std::vector<size_t> slabsizes;
78
79 size_t total_events_to_read = 0;
80 // Process the event ranges until we reach the desired number of events to read or run out of ranges
81 while (!eventRanges.empty() && total_events_to_read < m_events_per_chunk) {
82 // Get the next event range from the stack
83 auto eventRange = eventRanges.top();
84 eventRanges.pop();
85
86 size_t range_size = eventRange.second - eventRange.first;
87 size_t remaining_chunk = m_events_per_chunk - total_events_to_read;
88
89 // If the range size is larger than the remaining chunk, we need to split it
90 if (range_size > remaining_chunk) {
91 // Split the range: process only part of it now, push the rest back for later
92 offsets.push_back(eventRange.first);
93 slabsizes.push_back(remaining_chunk);
94 total_events_to_read += remaining_chunk;
95 // Push the remainder of the range back to the front for next iteration
96 eventRanges.emplace(eventRange.first + remaining_chunk, eventRange.second);
97 break;
98 } else {
99 offsets.push_back(eventRange.first);
100 slabsizes.push_back(range_size);
101 total_events_to_read += range_size;
102 // Continue to next range
103 }
104 }
105
106 // log the event ranges being processed
107 g_log.debug(toLogString(bankName, total_events_to_read, offsets, slabsizes));
108
109 if (total_events_to_read == 0) {
110 continue; // nothing to do
111 }
112
113 // load detid and tof at the same time
114 this->loadEvents(detID_SDS, tof_SDS, offsets, slabsizes, event_detid, event_time_of_flight);
115
116 // Loop over all output spectra / groups
117 tbb::parallel_for(
118 tbb::blocked_range<size_t>(0, m_processingData.counts.size()),
119 [&](const tbb::blocked_range<size_t> &output_range) {
120 for (size_t output_index = output_range.begin(); output_index < output_range.end(); ++output_index) {
121 // Create a local task for this thread
122 ProcessEventsTask task(event_detid.get(), event_time_of_flight.get(), &calibrations.at(output_index),
123 m_processingData.binedges[output_index]);
124
125 const tbb::blocked_range<size_t> range_info(0, event_time_of_flight->size(), m_grainsize_event);
126 tbb::parallel_reduce(range_info, task);
127
128 // Accumulate results into shared y_temp to combine local histograms
129 // Use atomic fetch_add to accumulate results into shared vectors
130 for (size_t i = 0; i < m_processingData.counts[output_index].size(); ++i) {
131 m_processingData.counts[output_index][i].fetch_add(task.y_temp[i], std::memory_order_relaxed);
132 }
133 }
134 });
135 }
136
137 g_log.debug() << bankName << " stop " << timer << std::endl;
138 m_progress->report();
139 }
140}
141}; // namespace Mantid::DataHandling::AlignAndFocusPowderSlim
const std::string & bankName(const size_t wksp_index) const
std::stack< EventROI > getEventIndexRanges(H5::Group &event_group, const uint64_t number_events, std::unique_ptr< std::vector< uint64_t > > *event_index=nullptr) const
std::vector< BankCalibration > getCalibrations(const std::string &tof_unit, const size_t bank_index) const
void loadEvents(H5::DataSet &detID_SDS, H5::DataSet &tof_SDS, const std::vector< size_t > &offsets, const std::vector< size_t > &slabsizes, std::unique_ptr< std::vector< uint32_t > > &detId_vec, std::unique_ptr< std::vector< float > > &tof_vec) const
Load detid and tof at the same time.
const size_t m_events_per_chunk
number of events to read from disk at one time
ProcessBankTask(std::vector< std::string > &bankEntryNames, H5::H5File &h5file, std::shared_ptr< NexusLoader > loader, SpectraProcessingData &processingData, const BankCalibrationFactory &calibFactory, const size_t events_per_chunk, const size_t grainsize_event, std::shared_ptr< API::Progress > &progress)
void operator()(const tbb::blocked_range< size_t > &range) const
void debug(const std::string &msg)
Logs at debug level.
Definition Logger.cpp:145
A simple class that provides a wall-clock (not processor time) timer.
Definition Timer.h:27
Kernel::Logger g_log("ExperimentInfo")
static logger object
const std::string TIME_OF_FLIGHT("event_time_offset")
std::string toLogString(const std::string &bankName, const size_t total_events_to_read, const std::vector< size_t > &offsets, const std::vector< size_t > &slabsizes)
MANTID_NEXUS_DLL void readStringAttribute(const H5::H5Object &object, const std::string &attributeName, std::string &output)
Definition H5Util.cpp:342
std::vector< std::vector< std::atomic_uint32_t > > counts