Mantid
Loading...
Searching...
No Matches
ThreadPool.cpp
Go to the documentation of this file.
1// Mantid Repository : https://github.com/mantidproject/mantid
2//
3// Copyright © 2018 ISIS Rutherford Appleton Laboratory UKRI,
4// NScD Oak Ridge National Laboratory, European Spallation Source,
5// Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
6// SPDX - License - Identifier: GPL - 3.0 +
7//----------------------------------------------------------------------
8// Includes
9//----------------------------------------------------------------------
11
15#include "MantidKernel/Task.h"
17
18#include <Poco/Thread.h>
19
20#include <algorithm>
21#include <sstream>
22#include <stdexcept>
23// needed on windows and any place missing openmp
24#if defined(_WIN32) || !defined(_OPENMP)
25#include <Poco/Environment.h>
26#endif
27
28namespace Mantid::Kernel {
29
30//--------------------------------------------------------------------------------
43ThreadPool::ThreadPool(ThreadScheduler *scheduler, size_t numThreads, ProgressBase *prog)
44 : m_scheduler(std::unique_ptr<ThreadScheduler>(scheduler)), m_started(false),
45 m_prog(std::unique_ptr<ProgressBase>(prog)) {
46 if (!m_scheduler)
47 throw std::invalid_argument("NULL ThreadScheduler passed to ThreadPool constructor.");
48
49 if (numThreads == 0) {
50 // Uses Poco to find how many cores there are.
52 } else
53 m_numThreads = numThreads;
54 // std::cout << m_numThreads << " m_numThreads \n";
55}
56
57//--------------------------------------------------------------------------------
60ThreadPool::~ThreadPool() = default;
61
62//--------------------------------------------------------------------------------
68// windows hangs with openmp for some reason
69#if defined(_WIN32) || !defined(_OPENMP)
70 int physicalCores = Poco::Environment::processorCount();
71#else
72 int physicalCores = PARALLEL_GET_MAX_THREADS;
73#endif
74
75 auto maxCores = Kernel::ConfigService::Instance().getValue<int>("MultiThreaded.MaxCores");
76
77 if (!maxCores.is_initialized())
78 return std::min(maxCores.get_value_or(0), physicalCores);
79 else
80 return physicalCores;
81}
82
83//--------------------------------------------------------------------------------
94void ThreadPool::start(double waitSec) {
95 if (m_started)
96 throw std::runtime_error("Threads have already started.");
97 // Now, launch that many threads and let them wait for new tasks.
98 m_threads.clear();
99 m_runnables.clear();
100 for (size_t i = 0; i < m_numThreads; i++) {
101 // Make a descriptive name
102 std::ostringstream name;
103 name << "Thread" << i;
104 // Create the thread
105 auto thread = std::make_unique<Poco::Thread>(name.str());
106 // Make the runnable object and run it
107 auto runnable = std::make_unique<ThreadPoolRunnable>(i, m_scheduler.get(), m_prog.get(), waitSec);
108 thread->start(*runnable);
109 m_threads.emplace_back(std::move(thread));
110 m_runnables.emplace_back(std::move(runnable));
111 }
112 // Yep, all the threads are running.
113 m_started = true;
114}
115
116//--------------------------------------------------------------------------------
123void ThreadPool::schedule(const std::shared_ptr<Task> &task, bool start) {
124 if (task) {
125 m_scheduler->push(task);
126 // Start all the threads if they were not already.
127 if (start && !m_started)
128 this->start();
129 }
130}
131
132//--------------------------------------------------------------------------------
140bool compareTasks(Task *lhs, Task *rhs) { return (lhs->cost() > rhs->cost()); }
141
142//--------------------------------------------------------------------------------
151 // Are the threads REALLY started, or did they exit due to lack of tasks?
152 if (m_started) {
153 m_started = false;
154 // If any of the threads are running, then YES, it is really started.
155 for (auto &thread : m_threads)
156 m_started = m_started || thread->isRunning();
157 }
158
159 // Start all the threads if they were not already.
160 if (!m_started)
161 this->start();
162
163 // Clear any wait times so that the threads stop waiting for new tasks.
164 for (auto &runnable : m_runnables)
165 runnable->clearWait();
166
167 // Sequentially join all the threads.
168 for (auto &thread : m_threads) {
169 thread->join();
170 }
171
172 // Clear the vectors (the threads are deleted now).
173 m_threads.clear();
174
175 // Get rid of the runnables too
176 m_runnables.clear();
177
178 // This will make threads restart
179 m_started = false;
180
181 // Did one of the threads abort or throw an exception?
182 if (m_scheduler->getAborted()) {
183 // Re-raise the error
184 throw m_scheduler->getAbortException();
185 }
186}
187
188} // namespace Mantid::Kernel
const std::vector< double > & rhs
#define PARALLEL_GET_MAX_THREADS
static T & Instance()
Return a reference to the Singleton instance, creating it if it does not already exist Creation is do...
A Task is a unit of work to be scheduled and run by a ThreadPool.
Definition: Task.h:29
virtual double cost()
What is the computational cost of this task?
Definition: Task.h:53
void schedule(const std::shared_ptr< Task > &task, bool start=false)
Schedule a task for later execution.
Definition: ThreadPool.cpp:123
std::vector< std::unique_ptr< Poco::Thread > > m_threads
Vector with all the threads that are started.
Definition: ThreadPool.h:59
ThreadPool(ThreadScheduler *scheduler=new ThreadSchedulerFIFO(), size_t numThreads=0, ProgressBase *prog=nullptr)
Constructor.
Definition: ThreadPool.cpp:43
size_t m_numThreads
Number of cores used.
Definition: ThreadPool.h:53
bool m_started
Have the threads started?
Definition: ThreadPool.h:65
void start(double waitSec=0.0)
Start the threads and begin looking for tasks.
Definition: ThreadPool.cpp:94
std::vector< std::unique_ptr< ThreadPoolRunnable > > m_runnables
Vector of the POCO-required classes to actually run in a thread.
Definition: ThreadPool.h:62
std::unique_ptr< ProgressBase > m_prog
Progress reporter.
Definition: ThreadPool.h:68
void joinAll()
Wait for all threads that have started to finish.
Definition: ThreadPool.cpp:150
static size_t getNumPhysicalCores()
Return the number of physical cores available on the system.
Definition: ThreadPool.cpp:67
std::unique_ptr< ThreadScheduler > m_scheduler
The ThreadScheduler instance taking care of task scheduling.
Definition: ThreadPool.h:56
The ThreadScheduler object defines how tasks are allocated to threads and in what order.
bool compareTasks(Task *lhs, Task *rhs)
Method to perform sorting of task lists.
Definition: ThreadPool.cpp:140
STL namespace.