Mantid
Loading...
Searching...
No Matches
LoadBankFromDiskTask.cpp
Go to the documentation of this file.
1// Mantid Repository : https://github.com/mantidproject/mantid
2//
3// Copyright © 2018 ISIS Rutherford Appleton Laboratory UKRI,
4// NScD Oak Ridge National Laboratory, European Spallation Source,
5// Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
6// SPDX - License - Identifier: GPL - 3.0 +
12#include "MantidKernel/Unit.h"
14
15// clang-format off
16#include <nexus/NeXusFile.hpp>
17#include <nexus/NeXusException.hpp>
18// clang-format on
19
20#include <algorithm>
21#include <utility>
22
23namespace Mantid::DataHandling {
24
37LoadBankFromDiskTask::LoadBankFromDiskTask(DefaultEventLoader &loader, std::string entry_name, std::string entry_type,
38 const std::size_t numEvents, const bool oldNeXusFileNames,
39 API::Progress *prog, std::shared_ptr<std::mutex> ioMutex,
40 Kernel::ThreadScheduler &scheduler, std::vector<int> framePeriodNumbers)
41 : m_loader(loader), entry_name(std::move(entry_name)), entry_type(std::move(entry_type)), prog(prog),
42 scheduler(scheduler), m_loadError(false), m_oldNexusFileNames(oldNeXusFileNames), m_have_weight(false),
43 m_framePeriodNumbers(std::move(framePeriodNumbers)) {
44 setMutex(ioMutex);
45 m_cost = static_cast<double>(numEvents);
46 m_min_id = std::numeric_limits<uint32_t>::max();
47 m_max_id = 0;
48}
49
53void LoadBankFromDiskTask::loadPulseTimes(::NeXus::File &file) {
54 try {
55 // First, get info about the event_time_zero field in this bank
56 file.openData("event_time_zero");
57 } catch (::NeXus::Exception &) {
58 // Field not found error is most likely.
59 // Use the "proton_charge" das logs.
61 return;
62 }
63 std::string thisStartTime;
64 size_t thispulseTimes = 0;
65 // If the offset is not present, use Unix epoch
66 if (!file.hasAttr("offset")) {
67 thisStartTime = "1970-01-01T00:00:00Z";
68 m_loader.alg->getLogger().warning() << "In loadPulseTimes: no ISO8601 offset attribute provided for "
69 "event_time_zero, using UNIX epoch instead\n";
70 } else {
71 file.getAttr("offset", thisStartTime);
72 }
73
74 if (!file.getInfo().dims.empty())
75 thispulseTimes = file.getInfo().dims[0];
76 file.closeData();
77
78 // Now, we look through existing ones to see if it is already loaded
79 // thisBankPulseTimes = NULL;
80 for (auto &bankPulseTime : m_loader.m_bankPulseTimes) {
81 if (bankPulseTime->equals(thispulseTimes, thisStartTime)) {
82 thisBankPulseTimes = bankPulseTime;
83 return;
84 }
85 }
86
87 // Not found? Need to load and add it
88 thisBankPulseTimes = std::make_shared<BankPulseTimes>(boost::ref(file), m_framePeriodNumbers);
90}
91
97std::vector<uint64_t> LoadBankFromDiskTask::loadEventIndex(::NeXus::File &file) {
98 // Get the event_index (a list of size of # of pulses giving the index in
99 // the event list for that pulse) as a uint64 vector.
100 // The Nexus standard does not specify if this is to be 32-bit or 64-bit
101 // integers, so we use the NeXusIOHelper to do the conversion on the fly.
102 auto event_index = Mantid::NeXus::NeXusIOHelper::readNexusVector<uint64_t>(file, "event_index");
103
104 // Look for the sign that the bank is empty
105 if (event_index.size() == 1) {
106 if (event_index[0] == 0) {
107 // One entry, only zero. This means NO events in this bank.
108 m_loadError = true;
109 m_loader.alg->getLogger().debug() << "Bank " << entry_name << " is empty.\n";
110 }
111 }
112 return event_index;
113}
114
123void LoadBankFromDiskTask::prepareEventId(::NeXus::File &file, int64_t &start_event, int64_t &stop_event,
124 const std::vector<uint64_t> &event_index) {
125 // Get the list of pixel ID's
127 file.openData("event_pixel_id");
128 else
129 file.openData("event_id");
130
131 // By default, use all available indices
132 start_event = event_index[0];
133 ::NeXus::Info id_info = file.getInfo();
134 // dims[0] can be negative in ISIS meaning 2^32 + dims[0]. Take that into
135 // account
136 int64_t dim0 = recalculateDataSize(id_info.dims[0]);
137 stop_event = dim0;
138
139 // We are loading part - work out the event number range
140 if (m_loader.chunk != EMPTY_INT()) {
141 start_event = static_cast<int64_t>(m_loader.chunk - m_loader.firstChunkForBank) *
142 static_cast<int64_t>(m_loader.eventsPerChunk);
143 // Don't change stop_event for the final chunk
144 if (start_event + static_cast<int64_t>(m_loader.eventsPerChunk) < stop_event)
145 stop_event = start_event + static_cast<int64_t>(m_loader.eventsPerChunk);
146 }
147
148 // Make sure it is within range
149 if (stop_event > dim0)
150 stop_event = dim0;
151
152 m_loader.alg->getLogger().debug() << entry_name << ": start_event " << start_event << " stop_event " << stop_event
153 << "\n";
154}
155
160std::unique_ptr<std::vector<uint32_t>> LoadBankFromDiskTask::loadEventId(::NeXus::File &file) {
161 // This is the data size
162 ::NeXus::Info id_info = file.getInfo();
163 int64_t dim0 = recalculateDataSize(id_info.dims[0]);
164
165 // Now we allocate the required arrays
166 auto event_id = std::make_unique<std::vector<uint32_t>>(m_loadSize[0]);
167
168 // Check that the required space is there in the file.
169 if (dim0 < m_loadSize[0] + m_loadStart[0]) {
170 m_loader.alg->getLogger().warning() << "Entry " << entry_name << "'s event_id field is too small (" << dim0
171 << ") to load the desired data size (" << m_loadSize[0] + m_loadStart[0]
172 << ").\n";
173 m_loadError = true;
174 }
175
176 if (m_loader.alg->getCancel())
177 m_loadError = true; // To allow cancelling the algorithm
178
179 if (!m_loadError) {
180 // Must be uint32
181 if (id_info.type == ::NeXus::UINT32)
182 file.getSlab(event_id->data(), m_loadStart, m_loadSize);
183 else {
185 << "Entry " << entry_name << "'s event_id field is not UINT32! It will be skipped.\n";
186 m_loadError = true;
187 }
188 file.closeData();
189
190 // determine the range of pixel ids
191 m_min_id = *(std::min_element(event_id->data(), event_id->data() + m_loadSize[0]));
192 m_max_id = *(std::max_element(event_id->data(), event_id->data() + m_loadSize[0]));
193
194 if (m_min_id > static_cast<uint32_t>(m_loader.eventid_max)) {
195 // All the detector IDs in the bank are higher than the highest 'known'
196 // (from the IDF)
197 // ID. Setting this will abort the loading of the bank.
198 m_loadError = true;
199 }
200 // fixup the minimum pixel id in the case that it's lower than the lowest
201 // 'known' id. We test this by checking that when we add the offset we
202 // would not get a negative index into the vector. Note that m_min_id is
203 // a uint so we have to be cautious about adding it to an int which may be
204 // negative.
205 if (static_cast<int32_t>(m_min_id) + m_loader.pixelID_to_wi_offset < 0) {
206 m_min_id = static_cast<uint32_t>(abs(m_loader.pixelID_to_wi_offset));
207 }
208 // fixup the maximum pixel id in the case that it's higher than the
209 // highest 'known' id
210 if (m_max_id > static_cast<uint32_t>(m_loader.eventid_max))
211 m_max_id = static_cast<uint32_t>(m_loader.eventid_max);
212 }
213 return event_id;
214}
215
220std::unique_ptr<std::vector<float>> LoadBankFromDiskTask::loadTof(::NeXus::File &file) {
221 // Allocate the array
222 auto event_time_of_flight = std::make_unique<std::vector<float>>(m_loadSize[0]);
223
224 // Get the list of event_time_of_flight's
225 std::string key, tof_unit;
227 key = "event_time_offset";
228 else
229 key = "event_time_of_flight";
230 file.openData(key);
231
232 // Check that the required space is there in the file.
233 ::NeXus::Info tof_info = file.getInfo();
234 int64_t tof_dim0 = recalculateDataSize(tof_info.dims[0]);
235 if (tof_dim0 < m_loadSize[0] + m_loadStart[0]) {
236 m_loader.alg->getLogger().warning() << "Entry " << entry_name
237 << "'s event_time_offset field is too small "
238 "to load the desired data.\n";
239 m_loadError = true;
240 }
241
242 // Mantid assumes event_time_offset to be float.
243 // Nexus only requires event_time_offset to be a NXNumber.
244 // We thus have to consider 32-bit or 64-bit options, and we
245 // explicitly allow downcasting using the additional AllowDowncasting
246 // template argument.
247 auto vec = Mantid::NeXus::NeXusIOHelper::readNexusSlab<float, Mantid::NeXus::NeXusIOHelper::AllowNarrowing>(
248 file, key, m_loadStart, m_loadSize);
249 file.getAttr("units", tof_unit);
250 file.closeData();
251 // Convert Tof to microseconds
252 Kernel::Units::timeConversionVector(vec, tof_unit, "microseconds");
253 std::copy(vec.begin(), vec.end(), event_time_of_flight->data());
254
255 return event_time_of_flight;
256}
257
263std::unique_ptr<std::vector<float>> LoadBankFromDiskTask::loadEventWeights(::NeXus::File &file) {
264 try {
265 // First, get info about the event_weight field in this bank
266 file.openData("event_weight");
267 } catch (::NeXus::Exception &) {
268 // Field not found error is most likely.
269 m_have_weight = false;
270 return std::unique_ptr<std::vector<float>>();
271 }
272 // OK, we've got them
273 m_have_weight = true;
274
275 // Allocate the array
276 auto event_weight = std::make_unique<std::vector<float>>(m_loadSize[0]);
277
278 ::NeXus::Info weight_info = file.getInfo();
279 int64_t weight_dim0 = recalculateDataSize(weight_info.dims[0]);
280 if (weight_dim0 < m_loadSize[0] + m_loadStart[0]) {
281 m_loader.alg->getLogger().warning() << "Entry " << entry_name
282 << "'s event_weight field is too small to load the desired data.\n";
283 m_loadError = true;
284 }
285
286 // Check that the type is what it is supposed to be
287 if (weight_info.type == ::NeXus::FLOAT32)
288 file.getSlab(event_weight->data(), m_loadStart, m_loadSize);
289 else {
290 m_loader.alg->getLogger().warning() << "Entry " << entry_name
291 << "'s event_weight field is not FLOAT32! It will be skipped.\n";
292 m_loadError = true;
293 }
294
295 if (!m_loadError) {
296 file.closeData();
297 }
298 return event_weight;
299}
300
302 // These give the limits in each file as to which events we actually load
303 // (when filtering by time).
304 m_loadStart.resize(1, 0);
305 m_loadSize.resize(1, 0);
306
307 m_loadError = false;
309
310 prog->report(entry_name + ": load from disk");
311
312 // arrays to load into
313 std::unique_ptr<std::vector<uint32_t>> event_id;
314 std::unique_ptr<std::vector<float>> event_time_of_flight;
315 std::unique_ptr<std::vector<float>> event_weight;
316 std::vector<uint64_t> event_index;
317
318 // Open the file
319 ::NeXus::File file(m_loader.alg->m_filename);
320 try {
321 // Navigate into the file
322 file.openGroup(m_loader.alg->m_top_entry_name, "NXentry");
323 // Open the bankN_event group
324 file.openGroup(entry_name, entry_type);
325
326 // Load the event_index field.
327 event_index = this->loadEventIndex(file);
328
329 if (!m_loadError) {
330 // Load and validate the pulse times
331 this->loadPulseTimes(file);
332
333 // The event_index should be the same length as the pulse times from DAS
334 // logs.
335 if (event_index.size() != thisBankPulseTimes->pulseTimes.size())
336 m_loader.alg->getLogger().warning() << "Bank " << entry_name
337 << " has a mismatch between the number of event_index entries "
338 "and the number of pulse times in event_time_zero.\n";
339
340 // Open and validate event_id field.
341 int64_t start_event = 0;
342 int64_t stop_event = 0;
343 this->prepareEventId(file, start_event, stop_event, event_index);
344
345 // These are the arguments to getSlab()
346 m_loadStart[0] = start_event;
347 m_loadSize[0] = stop_event - start_event;
348
349 if ((m_loadSize[0] > 0) && (m_loadStart[0] >= 0)) {
350 // Load pixel IDs
351 event_id = this->loadEventId(file);
352 if (m_loader.alg->getCancel()) {
353 m_loader.alg->getLogger().error() << "Loading bank " << entry_name << " is cancelled.\n";
354 m_loadError = true; // To allow cancelling the algorithm
355 }
356
357 // And TOF.
358 if (!m_loadError) {
359 event_time_of_flight = this->loadTof(file);
360 if (m_have_weight) {
361 event_weight = this->loadEventWeights(file);
362 }
363 }
364 } // Size is at least 1
365 else {
366 // Found a size that was 0 or less; stop processing
368 << "Loading bank " << entry_name << " is stopped due to either zero/negative loading size ("
369 << m_loadStart[0] << ") or negative load start index (" << m_loadStart[0] << ")\n";
370 m_loadError = true;
371 }
372
373 } // no error
374
375 } // try block
376 catch (std::exception &e) {
377 m_loader.alg->getLogger().error() << "Error while loading bank " << entry_name << ":\n";
378 m_loader.alg->getLogger().error() << e.what() << '\n';
379 m_loadError = true;
380 } catch (...) {
381 m_loader.alg->getLogger().error() << "Unspecified error while loading bank " << entry_name << '\n';
382 m_loadError = true;
383 }
384
385 // Close up the file even if errors occured.
386 file.closeGroup();
387 file.close();
388
389 // Abort if anything failed
390 if (m_loadError) {
391 return;
392 }
393
394 const auto bank_size = m_max_id - m_min_id;
395 const auto minSpectraToLoad = static_cast<uint32_t>(m_loader.alg->m_specMin);
396 const auto maxSpectraToLoad = static_cast<uint32_t>(m_loader.alg->m_specMax);
397 const auto emptyInt = static_cast<uint32_t>(EMPTY_INT());
398 // check that if a range of spectra were requested that these fit within
399 // this bank
400 if (minSpectraToLoad != emptyInt && m_min_id < minSpectraToLoad) {
401 if (minSpectraToLoad > m_max_id) { // the minimum spectra to load is more
402 // than the max of this bank
403 return;
404 }
405 // the min spectra to load is higher than the min for this bank
406 m_min_id = minSpectraToLoad;
407 }
408 if (maxSpectraToLoad != emptyInt && m_max_id > maxSpectraToLoad) {
409 if (maxSpectraToLoad < m_min_id) {
410 // the maximum spectra to load is less than the minimum of this bank
411 return;
412 }
413 // the max spectra to load is lower than the max for this bank
414 m_max_id = maxSpectraToLoad;
415 }
416 if (m_min_id > m_max_id) {
417 // the min is now larger than the max, this means the entire block of
418 // spectra to load is outside this bank
419 return;
420 }
421
422 // schedule the job to generate the event lists
423 auto mid_id = m_max_id;
424 if (m_loader.splitProcessing && m_max_id > (m_min_id + (bank_size / 4)))
425 // only split if told to and the section to load is at least 1/4 the size
426 // of the whole bank
427 mid_id = (m_max_id + m_min_id) / 2;
428
429 // No error? Launch a new task to process that data.
430 auto numEvents = static_cast<size_t>(m_loadSize[0]);
431 auto startAt = static_cast<size_t>(m_loadStart[0]);
432
433 // convert things to shared_arrays to share between tasks
434 std::shared_ptr<std::vector<uint32_t>> event_id_shrd(event_id.release());
435 std::shared_ptr<std::vector<float>> event_time_of_flight_shrd(event_time_of_flight.release());
436 std::shared_ptr<std::vector<float>> event_weight_shrd(event_weight.release());
437 auto event_index_shrd = std::make_shared<std::vector<uint64_t>>(std::move(event_index));
438
439 std::shared_ptr<Task> newTask1 = std::make_shared<ProcessBankData>(
440 m_loader, entry_name, prog, event_id_shrd, event_time_of_flight_shrd, numEvents, startAt, event_index_shrd,
441 thisBankPulseTimes, m_have_weight, event_weight_shrd, m_min_id, mid_id);
442 scheduler.push(newTask1);
443 if (m_loader.splitProcessing && (mid_id < m_max_id)) {
444 std::shared_ptr<Task> newTask2 = std::make_shared<ProcessBankData>(
445 m_loader, entry_name, prog, event_id_shrd, event_time_of_flight_shrd, numEvents, startAt, event_index_shrd,
446 thisBankPulseTimes, m_have_weight, event_weight_shrd, (mid_id + 1), m_max_id);
447 scheduler.push(newTask2);
448 }
449}
450
457int64_t LoadBankFromDiskTask::recalculateDataSize(const int64_t &size) {
458 if (size < 0) {
459 const int64_t shift = int64_t(1) << 32;
460 return shift + size;
461 }
462 return size;
463}
464
465} // namespace Mantid::DataHandling
bool getCancel() const
Returns the cancellation state.
Definition: Algorithm.cpp:1657
Kernel::Logger & getLogger() const
Returns a reference to the logger.
Definition: Algorithm.cpp:1660
Helper class for reporting progress from algorithms.
Definition: Progress.h:25
Helper class for LoadEventNexus that is specific to the current default loading code for NXevent_data...
int32_t eventid_max
Maximum (inclusive) event ID possible for this instrument.
std::vector< std::shared_ptr< BankPulseTimes > > m_bankPulseTimes
One entry of pulse times for each preprocessor.
detid_t pixelID_to_wi_offset
Offset in the pixelID_to_wi_vector to use.
bool splitProcessing
whether or not to launch multiple ProcessBankData jobs per bank
size_t eventsPerChunk
number of chunks per bank
int firstChunkForBank
for multiple chunks per bank
bool m_haveWeights
Flag for dealing with a simulated file.
void run() override
Main method that performs the work for the task.
int64_t recalculateDataSize(const int64_t &size)
Interpret the value describing the number of events.
uint32_t m_max_id
Maximum pixel ID in this data.
Kernel::ThreadScheduler & scheduler
ThreadScheduler running this task.
void loadPulseTimes(::NeXus::File &file)
Load the pulse times, if needed.
std::vector< int64_t > m_loadSize
How much to load in the file.
uint32_t m_min_id
Minimum pixel ID in this data.
void prepareEventId(::NeXus::File &file, int64_t &start_event, int64_t &stop_event, const std::vector< uint64_t > &event_index)
Open the event_id field and validate the contents.
API::Progress * prog
Progress reporting.
std::vector< int64_t > m_loadStart
Index to load start at in the file.
LoadBankFromDiskTask(DefaultEventLoader &loader, std::string entry_name, std::string entry_type, const std::size_t numEvents, const bool oldNeXusFileNames, API::Progress *prog, std::shared_ptr< std::mutex > ioMutex, Kernel::ThreadScheduler &scheduler, std::vector< int > framePeriodNumbers)
Constructor.
const std::vector< int > m_framePeriodNumbers
Frame period numbers.
std::shared_ptr< BankPulseTimes > thisBankPulseTimes
Object with the pulse times for this bank.
DefaultEventLoader & m_loader
Algorithm being run.
std::unique_ptr< std::vector< float > > loadTof(::NeXus::File &file)
Open and load the times-of-flight data.
std::vector< uint64_t > loadEventIndex(::NeXus::File &file)
Load the event_index field (a list of size of # of pulses giving the index in the event list for that...
bool m_loadError
Did we get an error in loading.
std::unique_ptr< std::vector< uint32_t > > loadEventId(::NeXus::File &file)
Load the event_id field, which has been opened.
std::unique_ptr< std::vector< float > > loadEventWeights(::NeXus::File &file)
Load weights of weighted events if they exist.
int32_t m_specMax
Maximum spectrum to load.
std::string m_filename
The name and path of the input file.
std::shared_ptr< BankPulseTimes > m_allBanksPulseTimes
Pulse times for ALL banks, taken from proton_charge log.
int32_t m_specMin
Minimum spectrum to load.
std::string m_top_entry_name
name of top level NXentry to use
void debug(const std::string &msg)
Logs at debug level.
Definition: Logger.cpp:114
void error(const std::string &msg)
Logs at error level.
Definition: Logger.cpp:77
void warning(const std::string &msg)
Logs at warning level.
Definition: Logger.cpp:86
void report()
Increments the loop counter by 1, then sends the progress notification on behalf of its algorithm.
Definition: ProgressBase.h:51
double m_cost
Cached computational cost for the thread.
Definition: Task.h:82
void setMutex(const std::shared_ptr< std::mutex > &mutex)
Set the mutex object for this Task.
Definition: Task.h:78
The ThreadScheduler object defines how tasks are allocated to threads and in what order.
virtual void push(std::shared_ptr< Task > newTask)=0
Add a Task to the queue.
std::size_t numEvents(::NeXus::File &file, bool &hasTotalCounts, bool &oldNeXusFileNames, const std::string &prefix, const NexusHDF5Descriptor &descriptor)
Get the number of events in the currently opened group.
void timeConversionVector(std::vector< T > &vec, const std::string &input_unit, const std::string &output_unit)
Definition: Unit.h:708
constexpr int EMPTY_INT() noexcept
Returns what we consider an "empty" integer within a property.
Definition: EmptyValues.h:25
STL namespace.