Mantid
Loading...
Searching...
No Matches
ThreadSchedulerMutexes.h
Go to the documentation of this file.
1// Mantid Repository : https://github.com/mantidproject/mantid
2//
3// Copyright © 2018 ISIS Rutherford Appleton Laboratory UKRI,
4// NScD Oak Ridge National Laboratory, European Spallation Source,
5// Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
6// SPDX - License - Identifier: GPL - 3.0 +
7#pragma once
8
10
11#include <algorithm>
12#include <memory>
13#include <mutex>
14#include <numeric>
15#include <set>
16
17namespace Mantid {
18namespace Kernel {
19
37public:
39
40 ~ThreadSchedulerMutexes() override { clear(); }
41
42 //-------------------------------------------------------------------------------
43 void push(std::shared_ptr<Task> newTask) override {
44 // Cache the total cost
45 std::lock_guard<std::mutex> lock(m_queueLock);
46 m_cost += newTask->cost();
47
48 std::shared_ptr<std::mutex> mut = newTask->getMutex();
49 m_supermap[mut].emplace(newTask->cost(), newTask);
50 }
51
52 //-------------------------------------------------------------------------------
53 std::shared_ptr<Task> pop(size_t threadnum) override {
54 UNUSED_ARG(threadnum);
55
56 std::shared_ptr<Task> temp = nullptr;
57
58 std::lock_guard<std::mutex> lock(m_queueLock);
59 // Check the size within the same locking block; otherwise the size may
60 // change before you get the next item.
61 if (!m_supermap.empty()) {
62 // We iterate in reverse as to take the NULL mutex last, even if no mutex
63 // is busy
64 for (auto &mutexedMap : m_supermap) {
65 // The key is the mutex associated with the inner map
66 std::shared_ptr<std::mutex> mapMutex = mutexedMap.first;
67 if ((!mapMutex) || (m_mutexes.empty()) || (m_mutexes.find(mapMutex) == m_mutexes.end())) {
68 // The mutex of this map is free!
69 InnerMap &map = mutexedMap.second;
70
71 if (!map.empty()) {
72 // Look for the largest cost item in it.
73 auto it2 = map.end();
74 --it2;
75 // Great, we found something.
76 temp = std::move(it2->second);
77 // Take it out of the map (popped)
78 map.erase(it2);
79 break;
80 }
81 }
82 }
83 if (temp == nullptr) {
84 // Nothing was found, meaning all mutexes are in use
85 // Try the first non-empty map
86 for (auto &mutexedMap : m_supermap) {
87 if (!mutexedMap.second.empty()) {
88 InnerMap &map = mutexedMap.second;
89 // Use the first one
90 temp = std::move(map.begin()->second);
91 // And erase that item (pop it)
92 map.erase(map.begin());
93 break;
94 }
95 }
96 }
97 // If temp is still NULL, then no tasks are left.
98 }
99
100 // --- Add the mutex (if any) to the list of "busy" ones ---
101 if (temp) {
102 std::shared_ptr<std::mutex> mut = temp->getMutex();
103 if (mut)
104 m_mutexes.insert(mut);
105 }
106
107 return temp;
108 }
109
110 //-----------------------------------------------------------------------------------
116 void finished(Task *task, size_t threadnum) override {
117 UNUSED_ARG(threadnum);
118 std::shared_ptr<std::mutex> mut = task->getMutex();
119 if (mut) {
120 std::lock_guard<std::mutex> lock(m_queueLock);
121 // We take this mutex off the list of used ones.
122 m_mutexes.erase(mut);
123 }
124 }
125
126 //-------------------------------------------------------------------------------
127 size_t size() override {
128 std::lock_guard<std::mutex> lock(m_queueLock);
129 // Add up the sizes of all contained maps.
130 return std::accumulate(
131 m_supermap.cbegin(), m_supermap.cend(), size_t{0},
132 [](size_t total, const std::pair<const std::shared_ptr<std::mutex> &, const InnerMap &> &mutexedMap) {
133 return total + mutexedMap.second.size();
134 });
135 }
136
137 //-------------------------------------------------------------------------------
139 bool empty() override {
140 std::lock_guard<std::mutex> lock(m_queueLock);
141 auto mapWithTasks =
142 std::find_if_not(m_supermap.cbegin(), m_supermap.cend(),
143 [](const std::pair<const std::shared_ptr<std::mutex>, const InnerMap &> &mutexedMap) {
144 return mutexedMap.second.empty();
145 });
146 return mapWithTasks == m_supermap.cend();
147 }
148
149 //-------------------------------------------------------------------------------
150 void clear() override {
151 std::lock_guard<std::mutex> lock(m_queueLock);
152
153 // Empty out the queue and delete the pointers!
154 for (auto &it : m_supermap) {
155 InnerMap &map = it.second;
156 map.clear();
157 }
158 m_supermap.clear();
159 m_cost = 0;
160 m_costExecuted = 0;
161 }
162
163protected:
165 using InnerMap = std::multimap<double, std::shared_ptr<Task>>;
167 using SuperMap = std::map<std::shared_ptr<std::mutex>, InnerMap>;
168
172
174 std::set<std::shared_ptr<std::mutex>> m_mutexes;
175};
176
177} // namespace Kernel
178} // namespace Mantid
#define DLLExport
Definitions of the DLLImport compiler directives for MSVC.
Definition: System.h:53
#define UNUSED_ARG(x)
Function arguments are sometimes unused in certain implmentations but are required for documentation ...
Definition: System.h:64
A Task is a unit of work to be scheduled and run by a ThreadPool.
Definition: Task.h:29
std::shared_ptr< std::mutex > getMutex()
Get the mutex object for this Task.
Definition: Task.h:72
ThreadSchedulerMutexes : Version of a ThreadSchedulerLargestCost that also makes sure to not try to s...
void clear() override
Empty out the queue.
SuperMap m_supermap
A super map; first key = a Mutex * Inside it: second key = the cost.
std::set< std::shared_ptr< std::mutex > > m_mutexes
Vector of currently used mutexes.
std::map< std::shared_ptr< std::mutex >, InnerMap > SuperMap
Map to maps, sorted by Mutex*.
std::shared_ptr< Task > pop(size_t threadnum) override
Retrieves the next Task to execute.
void finished(Task *task, size_t threadnum) override
Signal to the scheduler that a task is complete.
std::multimap< double, std::shared_ptr< Task > > InnerMap
Map to tasks, sorted by cost.
void push(std::shared_ptr< Task > newTask) override
Add a Task to the queue.
size_t size() override
Returns the size of the queue.
The ThreadScheduler object defines how tasks are allocated to threads and in what order.
Helper class which provides the Collimation Length for SANS instruments.