Mantid
Loading...
Searching...
No Matches
ProcessBankTask.cpp
Go to the documentation of this file.
1// Mantid Repository : https://github.com/mantidproject/mantid
2//
3// Copyright © 2025 ISIS Rutherford Appleton Laboratory UKRI,
4// NScD Oak Ridge National Laboratory, European Spallation Source,
5// Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
6// SPDX - License - Identifier: GPL - 3.0 +
7
10#include "MantidKernel/Logger.h"
12#include "MantidKernel/Timer.h"
13#include "MantidNexus/H5Util.h"
14#include "tbb/parallel_for.h"
15#include "tbb/parallel_reduce.h"
16
18
19namespace {
20
21// Logger for this class
22auto g_log = Kernel::Logger("ProcessBankTask");
23
24} // namespace
25ProcessBankTask::ProcessBankTask(std::vector<std::string> &bankEntryNames, H5::H5File &h5file,
26 std::shared_ptr<NexusLoader> loader, SpectraProcessingData &processingData,
27 const BankCalibrationFactory &calibFactory, const size_t events_per_chunk,
28 const size_t grainsize_event, std::shared_ptr<API::Progress> &progress)
29 : ProcessBankTaskBase(bankEntryNames, loader, calibFactory), m_h5file(h5file), m_processingData(processingData),
30 m_events_per_chunk(events_per_chunk), m_grainsize_event(grainsize_event), m_progress(progress) {}
31
32void ProcessBankTask::operator()(const tbb::blocked_range<size_t> &range) const {
33 auto entry = m_h5file.openGroup("entry"); // type=NXentry
34 for (size_t bank_index = range.begin(); bank_index < range.end(); ++bank_index) {
35 const auto &bankName = this->bankName(bank_index);
36
37 // empty bank names indicate spectra to skip; control should never get here, but just in case
38 if (bankName.empty())
39 continue;
40
41 Kernel::Timer timer;
42 g_log.debug() << bankName << " start" << std::endl;
43
44 // open the bank
45 auto event_group = entry.openGroup(bankName); // type=NXevent_data
46
47 // skip empty dataset
48 auto tof_SDS = event_group.openDataSet(NxsFieldNames::TIME_OF_FLIGHT);
49 const int64_t total_events = static_cast<size_t>(tof_SDS.getSpace().getSelectNpoints());
50 if (total_events == 0) {
51 m_progress->report();
52 g_log.debug() << bankName << " empty\n";
53 continue;
54 }
55 // and the units
56 std::string tof_unit;
57 Nexus::H5Util::readStringAttribute(tof_SDS, "units", tof_unit);
58 // now the calibration for the output group can be created
59 // which detectors go into the current group - assumes ouput spectrum number is one more than workspace index
60 const auto calibrations = this->getCalibrations(tof_unit, bank_index);
61 if (calibrations.empty()) {
62 m_progress->report();
63 g_log.debug() << "skipping " << bankName << " because calibration is empty\n";
64 continue;
65 }
66
67 auto eventRanges = this->getEventIndexRanges(event_group, total_events);
68
69 // get handle to the detector IDs
70 auto detID_SDS = event_group.openDataSet(NxsFieldNames::DETID);
71
72 // declare arrays once so memory can be reused
73 auto event_detid = std::make_unique<std::vector<uint32_t>>(); // uint32 for ORNL nexus file
74 auto event_time_of_flight = std::make_unique<std::vector<float>>(); // float for ORNL nexus files
75
76 // read parts of the bank at a time until all events are processed
77 while (!eventRanges.empty()) {
78 // Create offsets and slab sizes for the next chunk of events.
79 // This will read at most m_events_per_chunk events from the file
80 // and will split the ranges if necessary for the next iteration.
81 std::vector<size_t> offsets;
82 std::vector<size_t> slabsizes;
83
84 size_t total_events_to_read = 0;
85 // Process the event ranges until we reach the desired number of events to read or run out of ranges
86 while (!eventRanges.empty() && total_events_to_read < m_events_per_chunk) {
87 // Get the next event range from the stack
88 auto eventRange = eventRanges.top();
89 eventRanges.pop();
90
91 size_t range_size = eventRange.second - eventRange.first;
92 size_t remaining_chunk = m_events_per_chunk - total_events_to_read;
93
94 // If the range size is larger than the remaining chunk, we need to split it
95 if (range_size > remaining_chunk) {
96 // Split the range: process only part of it now, push the rest back for later
97 offsets.push_back(eventRange.first);
98 slabsizes.push_back(remaining_chunk);
99 total_events_to_read += remaining_chunk;
100 // Push the remainder of the range back to the front for next iteration
101 eventRanges.emplace(eventRange.first + remaining_chunk, eventRange.second);
102 break;
103 } else {
104 offsets.push_back(eventRange.first);
105 slabsizes.push_back(range_size);
106 total_events_to_read += range_size;
107 // Continue to next range
108 }
109 }
110
111 // log the event ranges being processed
112 g_log.debug(toLogString(bankName, total_events_to_read, offsets, slabsizes));
113
114 if (total_events_to_read == 0) {
115 continue; // nothing to do
116 }
117
118 // load detid and tof at the same time
119 this->loadEvents(detID_SDS, tof_SDS, offsets, slabsizes, event_detid, event_time_of_flight);
120
121 // Loop over all output spectra / groups
122 tbb::parallel_for(
123 tbb::blocked_range<size_t>(0, m_processingData.counts.size()),
124 [&](const tbb::blocked_range<size_t> &output_range) {
125 for (size_t output_index = output_range.begin(); output_index < output_range.end(); ++output_index) {
126 // Create a local task for this thread
127 ProcessEventsTask task(event_detid.get(), event_time_of_flight.get(), &calibrations.at(output_index),
128 m_processingData.binedges[output_index]);
129
130 const tbb::blocked_range<size_t> range_info(0, event_time_of_flight->size(), m_grainsize_event);
131 tbb::parallel_reduce(range_info, task);
132
133 // Accumulate results into shared y_temp to combine local histograms
134 // Use atomic fetch_add to accumulate results into shared vectors
135 for (size_t i = 0; i < m_processingData.counts[output_index].size(); ++i) {
136 m_processingData.counts[output_index][i].fetch_add(task.y_temp[i], std::memory_order_relaxed);
137 }
138 }
139 });
140 }
141
142 g_log.debug() << bankName << " stop " << timer << std::endl;
143 m_progress->report();
144 }
145}
146}; // namespace Mantid::DataHandling::AlignAndFocusPowderSlim
const std::string & bankName(const size_t wksp_index) const
std::stack< EventROI > getEventIndexRanges(H5::Group &event_group, const uint64_t number_events, std::unique_ptr< std::vector< uint64_t > > *event_index=nullptr) const
std::vector< BankCalibration > getCalibrations(const std::string &tof_unit, const size_t bank_index) const
void loadEvents(H5::DataSet &detID_SDS, H5::DataSet &tof_SDS, const std::vector< size_t > &offsets, const std::vector< size_t > &slabsizes, std::unique_ptr< std::vector< uint32_t > > &detId_vec, std::unique_ptr< std::vector< float > > &tof_vec) const
Load detid and tof at the same time.
const size_t m_events_per_chunk
number of events to read from disk at one time
ProcessBankTask(std::vector< std::string > &bankEntryNames, H5::H5File &h5file, std::shared_ptr< NexusLoader > loader, SpectraProcessingData &processingData, const BankCalibrationFactory &calibFactory, const size_t events_per_chunk, const size_t grainsize_event, std::shared_ptr< API::Progress > &progress)
void operator()(const tbb::blocked_range< size_t > &range) const
void debug(const std::string &msg)
Logs at debug level.
Definition Logger.cpp:145
A simple class that provides a wall-clock (not processor time) timer.
Definition Timer.h:27
Kernel::Logger g_log("ExperimentInfo")
static logger object
const std::string TIME_OF_FLIGHT("event_time_offset")
std::string toLogString(const std::string &bankName, const size_t total_events_to_read, const std::vector< size_t > &offsets, const std::vector< size_t > &slabsizes)
MANTID_NEXUS_DLL void readStringAttribute(const H5::H5Object &object, const std::string &attributeName, std::string &output)
Definition H5Util.cpp:342
std::vector< std::vector< std::atomic_uint32_t > > counts