                               std::shared_ptr<NexusLoader> loader, std::vector<int> &workspaceIndices,
                               std::vector<SpectraProcessingData> &processingDatas,
                               /* ... */ const size_t grainsize_event, std::shared_ptr<API::Progress> &progress)
    : ProcessBankTaskBase(bankEntryNames, loader, calibFactory), m_h5file(h5file),
      m_workspaceIndices(workspaceIndices), m_processingDatas(processingDatas),
      m_events_per_chunk(events_per_chunk), m_grainsize_event(grainsize_event), m_progress(progress) {}
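
  // Task body: walk the banks in the blocked_range assigned to this TBB worker.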
  auto entry = m_h5file.openGroup("entry");
  for (size_t bank_index = range.begin(); bank_index < range.end(); ++bank_index) {
    // ...
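    // Open this bank's event-data group inside /entry.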
    auto event_group = entry.openGroup(bankName);
    // ...
    const int64_t total_events = static_cast<int64_t>(tof_SDS.getSpace().getSelectNpoints());
    if (total_events == 0) {
      // ...
    }
    // ...
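
    // Reusable buffers for the detector IDs and times-of-flight read from the file, one chunk at a time.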
    auto event_detid = std::make_unique<std::vector<uint32_t>>();
    auto event_time_of_flight = std::make_unique<std::vector<float>>();
    // ...
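    // Process the bank's events chunk by chunk: each pass packs up to one chunk's worth of pending
    // (target, range) requests, reads them from the file, then bins them in parallel.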
    while (!eventSplitRanges.empty()) {
      // ...
      std::vector<size_t> offsets;
      std::vector<size_t> slabsizes;
      std::vector<std::pair<int, EventROI>> relative_target_ranges;

      size_t total_events_to_read = 0;
      // ...
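        // Claim the next pending (target, eventRange) request and translate it into an HDF5 read
        // (file offset + slab size) within this chunk.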
        auto [target, eventRange] = eventSplitRanges.top();
        eventSplitRanges.pop();

        size_t range_size = eventRange.second - eventRange.first;
        // ...
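        // If the range does not fit into what is left of this chunk, read only the part that fits
        // and push the remainder back onto eventSplitRanges for a later pass.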
        if (range_size > remaining_chunk) {
          relative_target_ranges.emplace_back(
              target, EventROI(total_events_to_read, total_events_to_read + remaining_chunk));
          offsets.push_back(eventRange.first);
          slabsizes.push_back(remaining_chunk);
          total_events_to_read += remaining_chunk;

          eventSplitRanges.emplace(target, EventROI(eventRange.first + remaining_chunk, eventRange.second));
        } else {
          relative_target_ranges.emplace_back(
              target, EventROI(total_events_to_read, total_events_to_read + range_size));
          offsets.push_back(eventRange.first);
          slabsizes.push_back(range_size);
          total_events_to_read += range_size;
        }
      // ...
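      // Guard against an empty read request before going to the file.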
      if (total_events_to_read == 0) {
        // ...
      }

      this->loadEvents(detID_SDS, tof_SDS, offsets, slabsizes, event_detid, event_time_of_flight);
      // ...
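      // Fan the freshly read chunk out to its target spectra: one parallel task per workspace index.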
      tbb::parallel_for(tbb::blocked_range<size_t>(0, m_workspaceIndices.size()),
                        [&](const tbb::blocked_range<size_t> &r) {
        for (size_t idx = r.begin(); idx != r.end(); ++idx) {
          int i = m_workspaceIndices[idx];
          // ...
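          // Collect the chunk-buffer positions of every event that belongs to this target index.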
          std::vector<size_t> indices;
          for (const auto &pair : relative_target_ranges) {
            if (pair.first == static_cast<int>(i)) {
              auto [start, end] = pair.second;
              for (size_t k = start; k < end; ++k) {
                indices.push_back(k);
              }
            }
          }

          auto event_id_view_for_target =
              indices | std::views::transform([&event_detid](const auto &k) { return (*event_detid)[k]; });
          auto event_tof_view_for_target = indices | std::views::transform([&event_time_of_flight](const auto &k) {
            return (*event_time_of_flight)[k];
          });
          // ...
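          // Bin this target's events once per output: every output_index has its own calibration and bin edges.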
          tbb::parallel_for(tbb::blocked_range<size_t>(0, m_processingDatas[i].counts.size()),
                            [&](const tbb::blocked_range<size_t> &output_range) {
            for (size_t output_index = output_range.begin(); output_index < output_range.end(); ++output_index) {
              ProcessEventsTask task(&event_id_view_for_target, &event_tof_view_for_target,
                                     &calibrations.at(output_index), m_processingDatas[i].binedges[output_index]);

              const tbb::blocked_range<size_t> range_info(0, indices.size(), m_grainsize_event);
              tbb::parallel_reduce(range_info, task);
              // ...
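              // Add the reduced histogram (task.y_temp) into the shared counts; the bins are atomic,
              // so relaxed fetch_add is safe under concurrent accumulation.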
              for (size_t j = 0; j < m_processingDatas[i].counts[output_index].size(); ++j) {
                m_processingDatas[i].counts[output_index][j].fetch_add(task.y_temp[j], std::memory_order_relaxed);