Mantid
Loading...
Searching...
No Matches
DefaultEventLoader.cpp
Go to the documentation of this file.
1// Mantid Repository : https://github.com/mantidproject/mantid
2//
3// Copyright © 2018 ISIS Rutherford Appleton Laboratory UKRI,
4// NScD Oak Ridge National Laboratory, European Spallation Source,
5// Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
6// SPDX - License - Identifier: GPL - 3.0 +
13
14using namespace Mantid::Kernel;
15
16namespace Mantid::DataHandling {
17
19 bool event_id_is_spec, std::vector<std::string> bankNames,
20 const std::vector<int> &periodLog, const std::string &classType,
21 std::vector<std::size_t> bankNumEvents, const bool oldNeXusFileNames, const bool precount,
22 const int chunk, const int totalChunks) {
23 DefaultEventLoader loader(alg, ws, haveWeights, event_id_is_spec, bankNames.size(), precount, chunk, totalChunks);
24
25 auto bankRange = loader.setupChunking(bankNames, bankNumEvents);
26
27 // Make the thread pool
28 auto scheduler = new ThreadSchedulerMutexes;
29 ThreadPool pool(scheduler);
30 auto diskIOMutex = std::make_shared<std::mutex>();
31
32 // set up progress bar for the rest of the (multi-threaded) process
33 size_t numProg = bankNames.size() * (1 + 3); // 1 = disktask, 3 = proc task
34 if (loader.splitProcessing)
35 numProg += bankNames.size() * 3; // 3 = second proc task
36 auto prog = std::make_unique<API::Progress>(loader.alg, 0.3, 1.0, numProg);
37
38 for (size_t i = bankRange.first; i < bankRange.second; i++) {
39 if (bankNumEvents[i] > 0)
40 pool.schedule(std::make_shared<LoadBankFromDiskTask>(loader, bankNames[i], classType, bankNumEvents[i],
41 oldNeXusFileNames, prog.get(), diskIOMutex, *scheduler,
42 periodLog));
43 }
44 // Start and end all threads
45 pool.joinAll();
46 diskIOMutex.reset();
47}
48
50 bool event_id_is_spec, const size_t numBanks, const bool precount,
51 const int chunk, const int totalChunks)
52 : m_haveWeights(haveWeights), event_id_is_spec(event_id_is_spec), precount(precount), chunk(chunk),
53 totalChunks(totalChunks), firstChunkForBank(1), eventsPerChunk(0), alg(alg), m_ws(ws) {
54 // This map will be used to find the workspace index
57 else
59
60 // Cache a map for speed.
61 if (!haveWeights) {
63 // Convert to weighted events
64 for (size_t i = 0; i < m_ws.getNumberHistograms(); i++) {
65 for (size_t period = 0; period < m_ws.nPeriods(); ++period) {
67 }
68 }
70 } else {
72 }
73 } else {
74 // Convert to weighted events
75 for (size_t i = 0; i < m_ws.getNumberHistograms(); i++) {
76 for (size_t period = 0; period < m_ws.nPeriods(); ++period) {
78 }
79 }
81 }
82
83 // split banks up if the number of cores is more than twice the number of
84 // banks
85 splitProcessing = bool(numBanks * 2 < ThreadPool::getNumPhysicalCores());
86}
87
88std::pair<size_t, size_t> DefaultEventLoader::setupChunking(std::vector<std::string> &bankNames,
89 std::vector<std::size_t> &bankNumEvents) {
90 size_t bank0 = 0;
91 size_t bankn = bankNames.size();
92 if (chunk != EMPTY_INT()) // We are loading part - work out the bank number range
93 {
94 const size_t total_events = std::accumulate(bankNumEvents.cbegin(), bankNumEvents.cend(), static_cast<size_t>(0));
95 eventsPerChunk = total_events / totalChunks;
96 // Sort banks by size
97 size_t tmp;
98 std::string stmp;
99 for (size_t i = 0; i < bankn; i++)
100 for (size_t j = 0; j < bankn - 1; j++)
101 if (bankNumEvents[j] < bankNumEvents[j + 1]) {
102 tmp = bankNumEvents[j];
103 bankNumEvents[j] = bankNumEvents[j + 1];
104 bankNumEvents[j + 1] = tmp;
105 stmp = bankNames[j];
106 bankNames[j] = bankNames[j + 1];
107 bankNames[j + 1] = stmp;
108 }
109 const auto bigBanks = std::count_if(bankNumEvents.cbegin(), bankNumEvents.cend(),
110 [this](const size_t numEvents) { return numEvents > eventsPerChunk; });
111 // Each chunk is part of bank or multiple whole banks
112 // 0.5 for last chunk of a bank with multiple chunks
113 // 0.1 for multiple whole banks not completely filled
115 static_cast<size_t>((static_cast<double>(bigBanks) / static_cast<double>(totalChunks) * 0.5 + 0.05) *
116 static_cast<double>(eventsPerChunk));
117 double partialChunk = 0.;
119 for (int chunki = 1; chunki <= chunk; chunki++) {
120 if (partialChunk > 1.) {
121 partialChunk = 0.;
122 firstChunkForBank = chunki;
123 bank0 = bankn;
124 }
125 if (bankNumEvents[bank0] > 1) {
126 partialChunk += static_cast<double>(eventsPerChunk) / static_cast<double>(bankNumEvents[bank0]);
127 }
128 if (chunki < totalChunks)
129 bankn = bank0 + 1;
130 else
131 bankn = bankNames.size();
132 if (chunki == firstChunkForBank && partialChunk > 1.0)
133 bankn += static_cast<size_t>(partialChunk) - 1;
134 if (bankn > bankNames.size())
135 bankn = bankNames.size();
136 }
137 for (size_t i = bank0; i < bankn; i++) {
138 size_t start_event = (chunk - firstChunkForBank) * eventsPerChunk;
139 size_t stop_event = bankNumEvents[i];
140 // Don't change stop_event for the final chunk
141 if (start_event + eventsPerChunk < stop_event)
142 stop_event = start_event + eventsPerChunk;
143 bankNumEvents[i] = stop_event - start_event;
144 }
145 }
146 return {bank0, bankn};
147}
148
149} // namespace Mantid::DataHandling
gsl_vector * tmp
Helper class for LoadEventNexus that is specific to the current default loading code for NXevent_data...
std::vector< std::vector< std::vector< Mantid::DataObjects::WeightedEventNoTime > * > > weightedNoTimeEventVectors
Vector where index = event_id; value = ptr to std::vector<WeightedEventNoTime> in the event list.
std::vector< std::vector< std::vector< Mantid::Types::Event::TofEvent > * > > eventVectors
Vector where index = event_id; value = ptr to std::vector<TofEvent> in the event list.
static void load(LoadEventNexus *alg, EventWorkspaceCollection &ws, bool haveWeights, bool event_id_is_spec, std::vector< std::string > bankNames, const std::vector< int > &periodLog, const std::string &classType, std::vector< std::size_t > bankNumEvents, const bool oldNeXusFileNames, const bool precount, const int chunk, const int totalChunks)
bool precount
Do we pre-count the # of events in each pixel ID?
std::vector< size_t > pixelID_to_wi_vector
Vector where (index = pixel ID+pixelID_to_wi_offset), value = workspace index)
std::vector< std::vector< std::vector< Mantid::DataObjects::WeightedEvent > * > > weightedEventVectors
Vector where index = event_id; value = ptr to std::vector<WeightedEvent> in the event list.
detid_t pixelID_to_wi_offset
Offset in the pixelID_to_wi_vector to use.
bool splitProcessing
whether or not to launch multiple ProcessBankData jobs per bank
DefaultEventLoader(LoadEventNexus *alg, EventWorkspaceCollection &ws, bool haveWeights, bool event_id_is_spec, const size_t numBanks, const bool precount, const int chunk, const int totalChunks)
size_t eventsPerChunk
number of chunks per bank
bool event_id_is_spec
True if the event_id is spectrum no not pixel ID.
std::pair< size_t, size_t > setupChunking(std::vector< std::string > &bankNames, std::vector< std::size_t > &bankNumEvents)
int firstChunkForBank
for multiple chunks per bank
void makeMapToEventLists(std::vector< std::vector< T > > &vectors)
Map detector IDs to event lists.
EventWorkspaceCollection : Collection of EventWorspaces to give backward-forward compatibility around...
std::vector< size_t > getDetectorIDToWorkspaceIndexVector(Mantid::specnum_t &offset, bool dothrow) const
const DataObjects::EventList & getSpectrum(const size_t workspace_index, const size_t periodNumber) const
std::vector< size_t > getSpectrumToWorkspaceIndexVector(Mantid::specnum_t &offset) const
double compressTolerance
Tolerance for CompressEvents; use -1 to mean don't compress.
void switchTo(Mantid::API::EventType newType) override
Switch the EventList to use the given EventType (TOF, WEIGHTED, or WEIGHTED_NOTIME)
A Thread Pool implementation that keeps a certain number of threads running (normally,...
Definition ThreadPool.h:36
void schedule(const std::shared_ptr< Task > &task, bool start=false)
Schedule a task for later execution.
void joinAll()
Wait for all threads that have started to finish.
static size_t getNumPhysicalCores()
Return the number of physical cores available on the system.
ThreadSchedulerMutexes : Version of a ThreadSchedulerLargestCost that also makes sure to not try to s...
std::size_t numEvents(Nexus::File &file, bool &hasTotalCounts, bool &oldNeXusFileNames, const std::string &prefix, const Nexus::NexusDescriptor &descriptor)
Get the number of events in the currently opened group.
constexpr int EMPTY_INT() noexcept
Returns what we consider an "empty" integer within a property.
Definition EmptyValues.h:24