Mantid
Loading...
Searching...
No Matches
ThreadScheduler.h
Go to the documentation of this file.
1// Mantid Repository : https://github.com/mantidproject/mantid
2//
3// Copyright © 2011 ISIS Rutherford Appleton Laboratory UKRI,
4// NScD Oak Ridge National Laboratory, European Spallation Source,
5// Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
6// SPDX - License - Identifier: GPL - 3.0 +
7#pragma once
8
9#include "MantidKernel/DllConfig.h"
11#include "MantidKernel/Task.h"
12#include <deque>
13#include <map>
14#include <memory>
15#include <vector>
16
17namespace Mantid {
18
19namespace Kernel {
20
29//===========================================================================
30//===========================================================================
31//===========================================================================
32class MANTID_KERNEL_DLL ThreadScheduler {
33public:
36 ThreadScheduler() : m_cost(0), m_costExecuted(0), m_abortException(""), m_aborted(false) {}
37
39 virtual ~ThreadScheduler() = default;
40
41 //-----------------------------------------------------------------------------------
45 virtual void push(std::shared_ptr<Task> newTask) = 0;
46
47 //-----------------------------------------------------------------------------------
52 virtual std::shared_ptr<Task> pop(size_t threadnum) = 0;
53
54 //-----------------------------------------------------------------------------------
60 virtual void finished(Task *task, size_t threadnum) {
61 UNUSED_ARG(task);
62 UNUSED_ARG(threadnum);
63 }
64
65 //-----------------------------------------------------------------------------------
71 virtual void abort(std::runtime_error exception) {
72 // Save the exception for re-throwing
73 m_abortException = exception;
74 m_aborted = true;
75 // Clear (and delete) the queue
76 clear();
77 }
78
79 //-----------------------------------------------------------------------------------
81 virtual size_t size() = 0;
82
84 virtual bool empty() = 0;
85
87 virtual void clear() = 0;
88
89 //-------------------------------------------------------------------------------
91 double totalCost() { return m_cost; }
92
93 //-------------------------------------------------------------------------------
95 double totalCostExecuted() { return m_costExecuted; }
96
97 //-------------------------------------------------------------------------------
99 std::runtime_error getAbortException() { return m_abortException; }
100 //-------------------------------------------------------------------------------
102 bool getAborted() { return m_aborted; }
103
104protected:
106 double m_cost;
110 std::mutex m_queueLock;
112 std::runtime_error m_abortException;
115};
116
117//===========================================================================
118//===========================================================================
119//===========================================================================
126class MANTID_KERNEL_DLL ThreadSchedulerFIFO : public ThreadScheduler {
127public:
129
131 ~ThreadSchedulerFIFO() override { clear(); }
132
133 //-------------------------------------------------------------------------------
135 bool empty() override {
136 std::lock_guard<std::mutex> _lock(m_queueLock);
137 return m_queue.empty();
138 }
139
140 //-------------------------------------------------------------------------------
141 void push(std::shared_ptr<Task> newTask) override {
142 // Cache the total cost
143 m_queueLock.lock();
144 m_cost += newTask->cost();
145 m_queue.emplace_back(newTask);
146 m_queueLock.unlock();
147 }
148
149 //-------------------------------------------------------------------------------
150 std::shared_ptr<Task> pop(size_t threadnum) override {
151 UNUSED_ARG(threadnum);
152 std::shared_ptr<Task> temp = nullptr;
153 m_queueLock.lock();
154 // Check the size within the same locking block; otherwise the size may
155 // change before you get the next item.
156 if (!m_queue.empty()) {
157 // TODO: Would a try/catch block be smart here?
158 temp = m_queue.front();
159 m_queue.pop_front();
160 }
161 m_queueLock.unlock();
162 return temp;
163 }
164
165 //-------------------------------------------------------------------------------
166 size_t size() override {
167 m_queueLock.lock();
168 size_t temp = m_queue.size();
169 m_queueLock.unlock();
170 return temp;
171 }
172
173 //-------------------------------------------------------------------------------
174 void clear() override {
175 m_queueLock.lock();
176 // Empty out the queue
177 m_queue.clear();
178 m_cost = 0;
179 m_costExecuted = 0;
180 m_queueLock.unlock();
181 }
182
183protected:
185 std::deque<std::shared_ptr<Task>> m_queue;
186};
187
188//===========================================================================
189//===========================================================================
190//===========================================================================
197class MANTID_KERNEL_DLL ThreadSchedulerLIFO : public ThreadSchedulerFIFO {
198
199 //-------------------------------------------------------------------------------
200 std::shared_ptr<Task> pop(size_t threadnum) override {
201 UNUSED_ARG(threadnum);
202 std::shared_ptr<Task> temp = nullptr;
203 m_queueLock.lock();
204 // Check the size within the same locking block; otherwise the size may
205 // change before you get the next item.
206 if (!m_queue.empty()) {
207 // TODO: Would a try/catch block be smart here?
208 temp = m_queue.back();
209 m_queue.pop_back();
210 }
211 m_queueLock.unlock();
212 return temp;
213 }
214};
215
216//===========================================================================
217//===========================================================================
218//===========================================================================
228class MANTID_KERNEL_DLL ThreadSchedulerLargestCost : public ThreadScheduler {
229public:
231
233 ~ThreadSchedulerLargestCost() override { clear(); }
234
235 //-------------------------------------------------------------------------------
237 bool empty() override {
238 std::lock_guard<std::mutex> _lock(m_queueLock);
239 return m_map.empty();
240 }
241
242 //-------------------------------------------------------------------------------
243 void push(std::shared_ptr<Task> newTask) override {
244 // Cache the total cost
245 m_queueLock.lock();
246 m_cost += newTask->cost();
247 m_map.emplace(newTask->cost(), newTask);
248 m_queueLock.unlock();
249 }
250
251 //-------------------------------------------------------------------------------
252 std::shared_ptr<Task> pop(size_t threadnum) override {
253 UNUSED_ARG(threadnum);
254 std::shared_ptr<Task> temp = nullptr;
255 m_queueLock.lock();
256 // Check the size within the same locking block; otherwise the size may
257 // change before you get the next item.
258 if (!m_map.empty()) {
259 // Since the map is sorted by cost, we want the LAST item.
260 auto it = m_map.end();
261 it--;
262 temp = it->second;
263 m_map.erase(it);
264 }
265 m_queueLock.unlock();
266 return temp;
267 }
268
269 //-------------------------------------------------------------------------------
270 size_t size() override {
271 m_queueLock.lock();
272 size_t temp = m_map.size();
273 m_queueLock.unlock();
274 return temp;
275 }
276
277 //-------------------------------------------------------------------------------
278 void clear() override {
279 m_queueLock.lock();
280 // Empty out the queue and delete the pointers!
281 m_map.clear();
282 m_cost = 0;
283 m_costExecuted = 0;
284 m_queueLock.unlock();
285 }
286
287protected:
289 std::multimap<double, std::shared_ptr<Task>> m_map;
290};
291
292} // namespace Kernel
293} // namespace Mantid
#define UNUSED_ARG(x)
Function arguments are sometimes unused in certain implmentations but are required for documentation ...
Definition: System.h:64
A Task is a unit of work to be scheduled and run by a ThreadPool.
Definition: Task.h:29
A First-In-First-Out Thread Scheduler.
size_t size() override
Returns the size of the queue.
~ThreadSchedulerFIFO() override
Destructor.
void push(std::shared_ptr< Task > newTask) override
Add a Task to the queue.
std::shared_ptr< Task > pop(size_t threadnum) override
Retrieves the next Task to execute.
void clear() override
Empty out the queue.
std::deque< std::shared_ptr< Task > > m_queue
Queue of tasks.
A Last-In-First-Out Thread Scheduler.
std::shared_ptr< Task > pop(size_t threadnum) override
Retrieves the next Task to execute.
A Largest Cost Thread Scheduler.
void clear() override
Empty out the queue.
std::shared_ptr< Task > pop(size_t threadnum) override
Retrieves the next Task to execute.
size_t size() override
Returns the size of the queue.
std::multimap< double, std::shared_ptr< Task > > m_map
A multimap keeps tasks sorted by the key (cost)
void push(std::shared_ptr< Task > newTask) override
Add a Task to the queue.
The ThreadScheduler object defines how tasks are allocated to threads and in what order.
virtual void abort(std::runtime_error exception)
Signal to the scheduler that a task is complete.
bool getAborted()
Returns true if the execution was aborted.
bool m_aborted
The run was aborted due to an exception.
std::runtime_error m_abortException
The exception that aborted the run.
virtual ~ThreadScheduler()=default
Destructor.
double m_costExecuted
Accumulated cost of tasks that have been executed (popped)
double totalCostExecuted()
Returns the total cost of all Task's in the queue.
std::mutex m_queueLock
Mutex to prevent simultaneous access to the queue.
virtual size_t size()=0
Returns the size of the queue.
virtual void finished(Task *task, size_t threadnum)
Signal to the scheduler that a task is complete.
virtual std::shared_ptr< Task > pop(size_t threadnum)=0
Retrieves the next Task to execute.
double totalCost()
Returns the total cost of all Task's in the queue.
std::runtime_error getAbortException()
Returns the exception that was caught, if any.
virtual void push(std::shared_ptr< Task > newTask)=0
Add a Task to the queue.
double m_cost
Total cost of all tasks.
virtual void clear()=0
Empty out the queue.
virtual bool empty()=0
Returns true if the queue is empty.
Helper class which provides the Collimation Length for SANS instruments.