31 std::vector<std::string> &bankEntryNames, H5::H5File &h5file,
const bool is_time_filtered,
32 std::vector<int> &workspaceIndices, std::vector<API::MatrixWorkspace_sptr> &wksps,
33 const std::map<detid_t, double> &calibration,
const std::map<detid_t, double> &scale_at_sample,
34 const std::set<detid_t> &masked,
const size_t events_per_chunk,
const size_t grainsize_event,
35 std::vector<std::pair<int, PulseROI>> target_to_pulse_indices, std::shared_ptr<API::Progress> &progress)
36 : m_h5file(h5file), m_bankEntries(bankEntryNames), m_loader(is_time_filtered, {}, target_to_pulse_indices),
37 m_workspaceIndices(workspaceIndices), m_wksps(wksps), m_calibration(calibration),
38 m_scale_at_sample(scale_at_sample), m_masked(masked), m_events_per_chunk(events_per_chunk),
39 m_grainsize_event(grainsize_event), m_progress(progress) {}
42 auto entry =
m_h5file.openGroup(
"entry");
43 for (
size_t wksp_index = range.begin(); wksp_index < range.end(); ++wksp_index) {
46 if (bankName.empty()) {
50 g_log.
debug() << bankName <<
" start" << std::endl;
53 auto event_group = entry.openGroup(bankName);
57 const int64_t total_events =
static_cast<size_t>(tof_SDS.getSpace().getSelectNpoints());
58 if (total_events == 0) {
67 std::vector<API::ISpectrum *> spectra;
68 std::vector<std::vector<uint32_t>> y_temps;
69 for (
const auto &wksp :
m_wksps) {
70 spectra.push_back(&wksp->getSpectrum(wksp_index));
71 y_temps.emplace_back(spectra.back()->dataY().size());
75 std::unique_ptr<BankCalibration> calibration =
nullptr;
86 auto event_detid = std::make_unique<std::vector<uint32_t>>();
87 auto event_time_of_flight = std::make_unique<std::vector<float>>();
90 while (!eventSplitRanges.empty()) {
94 std::vector<size_t> offsets;
95 std::vector<size_t> slabsizes;
96 std::vector<std::pair<int, EventROI>> relative_target_ranges;
98 size_t total_events_to_read = 0;
102 auto [target, eventRange] = eventSplitRanges.top();
103 eventSplitRanges.pop();
105 size_t range_size = eventRange.second - eventRange.first;
109 if (range_size > remaining_chunk) {
111 relative_target_ranges.emplace_back(target,
112 EventROI(total_events_to_read, total_events_to_read + remaining_chunk));
113 offsets.push_back(eventRange.first);
114 slabsizes.push_back(remaining_chunk);
115 total_events_to_read += remaining_chunk;
117 eventSplitRanges.emplace(target,
EventROI(eventRange.first + remaining_chunk, eventRange.second));
120 relative_target_ranges.emplace_back(target,
121 EventROI(total_events_to_read, total_events_to_read + range_size));
122 offsets.push_back(eventRange.first);
123 slabsizes.push_back(range_size);
124 total_events_to_read += range_size;
130 std::ostringstream oss;
131 oss <<
"Processing " << bankName <<
" with " << total_events_to_read <<
" events in the ranges: ";
132 for (
size_t i = 0; i < offsets.size(); ++i) {
133 oss <<
"[" << offsets[i] <<
", " << (offsets[i] + slabsizes[i]) <<
"), ";
139 tbb::parallel_invoke(
146 if ((!calibration) || (calibration->idmin() >
static_cast<detid_t>(
minval)) ||
160 tbb::blocked_range<size_t>(0,
m_workspaceIndices.size()), [&](
const tbb::blocked_range<size_t> &r) {
161 for (size_t idx = r.begin(); idx != r.end(); ++idx) {
162 int i = m_workspaceIndices[idx];
165 std::vector<size_t> indices;
166 for (const auto &pair : relative_target_ranges) {
167 if (pair.first == static_cast<int>(i)) {
168 auto [start, end] = pair.second;
169 for (size_t k = start; k < end; ++k) {
170 indices.push_back(k);
175 auto event_id_view_for_target =
176 indices | std::views::transform([&event_detid](const auto &k) { return (*event_detid)[k]; });
177 auto event_tof_view_for_target = indices | std::views::transform([&event_time_of_flight](const auto &k) {
178 return (*event_time_of_flight)[k];
181 ProcessEventsTask task(&event_id_view_for_target, &event_tof_view_for_target, calibration.get(),
182 &spectra[idx]->readX());
184 const tbb::blocked_range<size_t> range_info(0, indices.size(), m_grainsize_event);
185 tbb::parallel_reduce(range_info, task);
187 std::transform(y_temps[idx].begin(), y_temps[idx].end(), task.y_temp.begin(), y_temps[idx].begin(),
188 std::plus<uint32_t>());
194 tbb::parallel_for(
size_t(0),
m_wksps.size(), [&](
size_t i) {
195 auto &y_values = spectra[i]->dataY();
196 std::copy(y_temps[i].cbegin(), y_temps[i].cend(), y_values.begin());
199 g_log.
debug() << bankName <<
" stop " << timer << std::endl;
ProcessBankSplitTask(std::vector< std::string > &bankEntryNames, H5::H5File &h5file, const bool is_time_filtered, std::vector< int > &workspaceIndices, std::vector< API::MatrixWorkspace_sptr > &wksps, const std::map< detid_t, double > &calibration, const std::map< detid_t, double > &scale_at_sample, const std::set< detid_t > &masked, const size_t events_per_chunk, const size_t grainsize_event, std::vector< std::pair< int, PulseROI > > target_to_pulse_indices, std::shared_ptr< API::Progress > &progress)