Mantid
Loading...
Searching...
No Matches
DefaultEventLoader.cpp
Go to the documentation of this file.
1// Mantid Repository : https://github.com/mantidproject/mantid
2//
3// Copyright © 2018 ISIS Rutherford Appleton Laboratory UKRI,
4// NScD Oak Ridge National Laboratory, European Spallation Source,
5// Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
6// SPDX - License - Identifier: GPL - 3.0 +
13
14using namespace Mantid::Kernel;
15
16namespace Mantid::DataHandling {
17
19 bool event_id_is_spec, std::vector<std::string> bankNames,
20 const std::vector<int> &periodLog, const std::string &classType,
21 std::vector<std::size_t> bankNumEvents, const bool oldNeXusFileNames, const bool precount,
22 const int chunk, const int totalChunks) {
23 DefaultEventLoader loader(alg, ws, haveWeights, event_id_is_spec, bankNames.size(), precount, chunk, totalChunks);
24
25 auto bankRange = loader.setupChunking(bankNames, bankNumEvents);
26
27 // Make the thread pool
28 auto scheduler = new ThreadSchedulerMutexes;
29 ThreadPool pool(scheduler);
30 auto diskIOMutex = std::make_shared<std::mutex>();
31
32 // set up progress bar for the rest of the (multi-threaded) process
33 size_t numProg = bankNames.size() * (1 + 3); // 1 = disktask, 3 = proc task
34 if (loader.splitProcessing)
35 numProg += bankNames.size() * 3; // 3 = second proc task
36 auto prog = std::make_unique<API::Progress>(loader.alg, 0.3, 1.0, numProg);
37
38 for (size_t i = bankRange.first; i < bankRange.second; i++) {
39 if (bankNumEvents[i] > 0)
40 pool.schedule(std::make_shared<LoadBankFromDiskTask>(loader, bankNames[i], classType, bankNumEvents[i],
41 oldNeXusFileNames, prog.get(), diskIOMutex, *scheduler,
42 periodLog));
43 }
44 // Start and end all threads
45 pool.joinAll();
46 diskIOMutex.reset();
47}
48
50 bool event_id_is_spec, const size_t numBanks, const bool precount,
51 const int chunk, const int totalChunks)
52 : m_haveWeights(haveWeights), event_id_is_spec(event_id_is_spec), precount(precount), chunk(chunk),
53 totalChunks(totalChunks), firstChunkForBank(1), eventsPerChunk(0), alg(alg), m_ws(ws) {
54 // This map will be used to find the workspace index
57 else
59
60 // Cache a map for speed.
61 if (!haveWeights) {
63 } else {
64 // Convert to weighted events
65 for (size_t i = 0; i < m_ws.getNumberHistograms(); i++) {
67 }
69 }
70
71 // split banks up if the number of cores is more than twice the number of
72 // banks
73 splitProcessing = bool(numBanks * 2 < ThreadPool::getNumPhysicalCores());
74}
75
// Work out which banks the current chunk should load.
//
// bankNames     - bank names; reordered in place when chunking is active.
// bankNumEvents - event count per bank, parallel to bankNames; reordered in
//                 place and, for the selected banks, overwritten with the
//                 number of events this chunk will read.
// Returns the half-open index range {bank0, bankn} into the (possibly
// reordered) vectors.
//
// When `chunk` equals EMPTY_INT() neither vector is touched and the full range
// {0, bankNames.size()} is returned.
//
// NOTE(review): listing lines 104 and 108 are absent from this copy of the
// file, so the statement that ends on lines 105-106 and whatever preceded the
// chunk loop are only partially visible here - confirm against the repository.
76std::pair<size_t, size_t> DefaultEventLoader::setupChunking(std::vector<std::string> &bankNames,
77 std::vector<std::size_t> &bankNumEvents) {
78 size_t bank0 = 0;
79 size_t bankn = bankNames.size();
80 if (chunk != EMPTY_INT()) // We are loading part - work out the bank number range
81 {
82 const size_t total_events = std::accumulate(bankNumEvents.cbegin(), bankNumEvents.cend(), static_cast<size_t>(0));
// NOTE(review): no guard against totalChunks == 0 is visible in this function.
83 eventsPerChunk = total_events / totalChunks;
84 // Sort banks by size
// Hand-written bubble sort into descending event-count order; bankNames is
// swapped in step with bankNumEvents so the two vectors stay parallel.
85 size_t tmp;
86 std::string stmp;
87 for (size_t i = 0; i < bankn; i++)
88 for (size_t j = 0; j < bankn - 1; j++)
89 if (bankNumEvents[j] < bankNumEvents[j + 1]) {
90 tmp = bankNumEvents[j];
91 bankNumEvents[j] = bankNumEvents[j + 1];
92 bankNumEvents[j + 1] = tmp;
93 stmp = bankNames[j];
94 bankNames[j] = bankNames[j + 1];
95 bankNames[j + 1] = stmp;
96 }
// Count the banks that hold more events than a single chunk.
97 int bigBanks = 0;
98 for (size_t i = 0; i < bankn; i++)
99 if (bankNumEvents[i] > eventsPerChunk)
100 bigBanks++;
101 // Each chunk is part of bank or multiple whole banks
102 // 0.5 for last chunk of a bank with multiple chunks
103 // 0.1 for multiple whole banks not completely filled
// NOTE(review): the comment above says 0.1 but the expression below adds 0.05.
105 static_cast<size_t>((static_cast<double>(bigBanks) / static_cast<double>(totalChunks) * 0.5 + 0.05) *
106 static_cast<double>(eventsPerChunk));
107 double partialChunk = 0.;
// Step through chunks 1..chunk, advancing [bank0, bankn) each time.
// partialChunk accumulates eventsPerChunk / bankNumEvents[bank0]; once it
// exceeds 1 the walk moves on to the banks after the previous range.
109 for (int chunki = 1; chunki <= chunk; chunki++) {
110 if (partialChunk > 1.) {
111 partialChunk = 0.;
112 firstChunkForBank = chunki;
113 bank0 = bankn;
114 }
// NOTE(review): bank0 may have just been set to bankn, which can equal
// bankNames.size(); the index below looks unchecked - verify.
115 if (bankNumEvents[bank0] > 1) {
116 partialChunk += static_cast<double>(eventsPerChunk) / static_cast<double>(bankNumEvents[bank0]);
117 }
// Every chunk before the last takes a single bank by default; the last takes
// all the remaining banks.
118 if (chunki < totalChunks)
119 bankn = bank0 + 1;
120 else
121 bankn = bankNames.size();
// On the first chunk of a new range, widen the range by the whole part of
// partialChunk minus one, then clamp to the number of banks.
122 if (chunki == firstChunkForBank && partialChunk > 1.0)
123 bankn += static_cast<size_t>(partialChunk) - 1;
124 if (bankn > bankNames.size())
125 bankn = bankNames.size();
126 }
// Replace each selected bank's event count with the size of the window
// [start_event, stop_event) that this chunk covers.
// NOTE(review): start_event is not compared with bankNumEvents[i] before the
// unsigned subtraction on line 133 - confirm it cannot exceed it.
127 for (size_t i = bank0; i < bankn; i++) {
128 size_t start_event = (chunk - firstChunkForBank) * eventsPerChunk;
129 size_t stop_event = bankNumEvents[i];
130 // Don't change stop_event for the final chunk
131 if (start_event + eventsPerChunk < stop_event)
132 stop_event = start_event + eventsPerChunk;
133 bankNumEvents[i] = stop_event - start_event;
134 }
135 }
136 return {bank0, bankn};
137}
138
139} // namespace Mantid::DataHandling
gsl_vector * tmp
Helper class for LoadEventNexus that is specific to the current default loading code for NXevent_data...
std::vector< std::vector< std::vector< Mantid::Types::Event::TofEvent > * > > eventVectors
Vector where index = event_id; value = ptr to std::vector<TofEvent> in the event list.
static void load(LoadEventNexus *alg, EventWorkspaceCollection &ws, bool haveWeights, bool event_id_is_spec, std::vector< std::string > bankNames, const std::vector< int > &periodLog, const std::string &classType, std::vector< std::size_t > bankNumEvents, const bool oldNeXusFileNames, const bool precount, const int chunk, const int totalChunks)
bool precount
Do we pre-count the # of events in each pixel ID?
std::vector< size_t > pixelID_to_wi_vector
Vector where (index = pixel ID + pixelID_to_wi_offset, value = workspace index)
std::vector< std::vector< std::vector< Mantid::DataObjects::WeightedEvent > * > > weightedEventVectors
Vector where index = event_id; value = ptr to std::vector<WeightedEvent> in the event list.
detid_t pixelID_to_wi_offset
Offset in the pixelID_to_wi_vector to use.
bool splitProcessing
whether or not to launch multiple ProcessBankData jobs per bank
DefaultEventLoader(LoadEventNexus *alg, EventWorkspaceCollection &ws, bool haveWeights, bool event_id_is_spec, const size_t numBanks, const bool precount, const int chunk, const int totalChunks)
size_t eventsPerChunk
number of events per chunk
bool event_id_is_spec
True if the event_id is a spectrum number, not a pixel ID.
std::pair< size_t, size_t > setupChunking(std::vector< std::string > &bankNames, std::vector< std::size_t > &bankNumEvents)
int firstChunkForBank
for multiple chunks per bank
void makeMapToEventLists(std::vector< std::vector< T > > &vectors)
Map detector IDs to event lists.
EventWorkspaceCollection : Collection of EventWorkspaces to give backward-forward compatibility around...
std::vector< size_t > getDetectorIDToWorkspaceIndexVector(Mantid::specnum_t &offset, bool dothrow) const
const DataObjects::EventList & getSpectrum(const size_t workspace_index, const size_t periodNumber) const
std::vector< size_t > getSpectrumToWorkspaceIndexVector(Mantid::specnum_t &offset) const
void switchTo(Mantid::API::EventType newType) override
Switch the EventList to use the given EventType (TOF, WEIGHTED, or WEIGHTED_NOTIME)
Definition: EventList.cpp:649
A Thread Pool implementation that keeps a certain number of threads running (normally,...
Definition: ThreadPool.h:36
void schedule(const std::shared_ptr< Task > &task, bool start=false)
Schedule a task for later execution.
Definition: ThreadPool.cpp:123
void joinAll()
Wait for all threads that have started to finish.
Definition: ThreadPool.cpp:150
static size_t getNumPhysicalCores()
Return the number of physical cores available on the system.
Definition: ThreadPool.cpp:67
ThreadSchedulerMutexes : Version of a ThreadSchedulerLargestCost that also makes sure to not try to s...
constexpr int EMPTY_INT() noexcept
Returns what we consider an "empty" integer within a property.
Definition: EmptyValues.h:25