Mantid
Loading...
Searching...
No Matches
TaskBasedAlgorithm.h
Go to the documentation of this file.
1// Mantid Repository : https://github.com/mantidproject/mantid
2//
3// Copyright © 2026 ISIS Rutherford Appleton Laboratory UKRI,
4// NScD Oak Ridge National Laboratory, European Spallation Source,
5// Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
6// SPDX - License - Identifier: GPL - 3.0 +
7#pragma once
8#include <algorithm>
9#include <limits>
10#include <numeric>
11#include <stdexcept>
12#include <string>
13#include <unordered_map>
14#include <unordered_set>
15#include <utility>
16#include <vector>
17
18#include "AnalysisDataService.h"
20#include "MatrixWorkspace.h"
21
22namespace Mantid::API {
23
24template <class T> class TaskBasedAlgorithm : virtual public API::DataProcessorAlgorithm {
25public:
27
28protected:
30 public:
31 explicit AlgorithmTask(T *parent, const std::string &name)
33 addDependantTaskSet(); // Start with one dependant task set by default
34 }
36 m_dependantTasks.emplace_back();
37 m_dependantOutputs.emplace_back();
38 return m_dependantTasks.size() - 1;
39 }
40 void setDependantTask(const std::string &task, const std::string &output_name = "", const std::string &alias = "",
41 const size_t dependantTaskSet = 0) {
42 if (dependantTaskSet >= m_dependantTasks.size())
43 throw std::runtime_error("Dependant task set index " + std::to_string(dependantTaskSet) +
44 " is out of range for task " + m_name);
45 if (output_name.empty()) {
46 // If no output name is provided, assume the whole task output is required
47 // If no alias is provided, use the task name as the alias
48 m_dependantTasks[dependantTaskSet][task] = {};
49 } else {
50 m_dependantTasks[dependantTaskSet][task].push_back({output_name, alias});
51 }
52 }
53 void execute() {
54 auto missingTasks = evaluateDependentTasks();
55 if (!missingTasks.empty()) {
56 throw std::runtime_error(
57 "Cannot execute task " + m_name + " as the following dependent tasks outputs are not available: " +
58 std::accumulate(std::next(missingTasks.begin()), missingTasks.end(), missingTasks.front(),
59 [](const std::string &a, const std::string &b) { return a + ", " + b; }));
60 }
62 m_parent->g_log.debug("Executing task: " + m_name + "\n");
65 m_parent->g_log.debug("Finished executing task: " + m_name + "\n");
66 }
67 const std::vector<std::string> &getExpectedOutputs() const { return m_expectedOutputs; }
68 void setExpectedOutputs(const std::vector<std::string> &expectedOutputs) { m_expectedOutputs = expectedOutputs; }
69 const std::string &name() const { return m_name; }
70 void initAsFirstTask(std::shared_ptr<MatrixWorkspace> inputWS) {
71 addDependantTaskOutput("InputWorkspace", std::move(inputWS), 0);
72 m_firstTaskFlag = true;
73 }
74 void setTaskExecutionOrder(const std::vector<std::string> *taskExecutionOrder) {
75 m_taskExecutionOrder = taskExecutionOrder;
76 }
77 const std::string &getSelectedOutput() const { return m_selectedOutput; }
79
80 protected:
82 void outputWorkspace(std::shared_ptr<MatrixWorkspace> ws, const std::string &outputName) {
83 setSelectedOutput(outputName);
84 m_parent->m_algorithmTaskOutputs[m_name][outputName] = std::move(ws);
85 }
86
87 std::shared_ptr<MatrixWorkspace> getDependantWorkspace(const std::string &outputAlias) {
88 return m_dependantOutputs[m_activeDependantTaskSet].at(outputAlias);
89 }
90
91 void setSelectedOutput(const std::string &output, const bool overwrite = false) {
92 // If overwrite is false, only set the selected output if it has not already been set
93 if (!overwrite && !m_selectedOutput.empty())
94 return;
95 m_selectedOutput = output;
96 }
97
98 private:
// vector of dependant task sets: each maps a dependant task name to the outputs required from it, stored as (output name, alias) pairs
100 std::vector<std::unordered_map<std::string, std::vector<std::pair<std::string, std::string>>>> m_dependantTasks;
101 std::string m_name;
102 std::vector<std::string> m_expectedOutputs;
103 std::vector<std::unordered_map<std::string, std::shared_ptr<MatrixWorkspace>>> m_dependantOutputs;
106 std::vector<size_t> m_fulfilledDependantTaskSets;
107 const std::vector<std::string> *m_taskExecutionOrder = nullptr;
108 std::string m_selectedOutput;
109
110 virtual void executeImpl() = 0;
111
112 // if dependent outputs have been supplied, check they are fulfillable. If not, take all outputs from any specified
113 // tasks.
114 std::string populateDependantTasks(const size_t taskSetIndex) {
115 for (auto &item : m_dependantTasks[taskSetIndex]) {
116 const auto &taskName = item.first;
117 auto &outputs = item.second;
118 auto it = std::find_if(m_parent->m_stagedAlgorithmTasks.cbegin(), m_parent->m_stagedAlgorithmTasks.cend(),
119 [&taskName](std::shared_ptr<AlgorithmTask> task) { return task->name() == taskName; });
120 if (it == m_parent->m_stagedAlgorithmTasks.cend())
121 return taskName; // Task not found, this task set cannot be fulfilled
122 if (outputs.empty()) {
123 // If no specific outputs are listed, populate with the whole task output
124 const auto &expectedOutputs = (*it)->getExpectedOutputs();
125 std::transform(expectedOutputs.begin(), expectedOutputs.end(), std::back_inserter(outputs),
126 [](const auto &output) { return std::pair<std::string, std::string>{output, output}; });
127 }
128 }
129 return "";
130 }
131
133 if (m_fulfilledDependantTaskSets.size() == 1) {
134 m_activeDependantTaskSet = m_fulfilledDependantTaskSets.front();
135 return;
136 } else if (m_fulfilledDependantTaskSets.size() == 0) {
137 return;
138 }
139 // We have multiple fulfilled task sets - select based on the execution order of tasks.
140 // The task set containing the task executed in closest proximity to this task wins.
141 const auto myIt = std::find(m_taskExecutionOrder->cbegin(), m_taskExecutionOrder->cend(), m_name);
142 const auto myIndex = std::distance(m_taskExecutionOrder->cbegin(), myIt);
143 size_t closestTaskSet = 0;
144 int closestDistance = std::numeric_limits<int>::max();
145 for (auto taskSet : m_fulfilledDependantTaskSets) {
146 for (const auto &task : m_dependantTasks[taskSet]) {
147 const auto &taskName = task.first;
148 auto it = std::find(m_taskExecutionOrder->cbegin(), m_taskExecutionOrder->cend(), taskName);
149 std::size_t index = std::distance(m_taskExecutionOrder->cbegin(), it);
150 int distance = (int)myIndex - (int)index;
151 // Do not consider tasks that occur after the current task
152 if ((distance < closestDistance) && distance > 0) {
153 closestDistance = distance;
154 closestTaskSet = taskSet;
155 }
156 }
157 }
158 m_activeDependantTaskSet = closestTaskSet;
159 }
160
161 // after execution, check that expected outputs from this task are present in m_algorithmTaskOutputs
163 if (!m_parent->m_algorithmTaskOutputs.contains(m_name))
164 throw std::runtime_error("No output from task " + m_name + " found after task execution");
165 std::vector<std::string> missingOutput;
166 const auto &taskOutputs = m_parent->m_algorithmTaskOutputs[m_name];
167 std::copy_if(m_expectedOutputs.begin(), m_expectedOutputs.end(), std::back_inserter(missingOutput),
168 [&taskOutputs](const auto &output) { return !taskOutputs.contains(output); });
169 if (!missingOutput.empty()) {
170 throw std::runtime_error(
171 "Expected outputs from task " + m_name + " not found after task execution: " +
172 std::accumulate(std::next(missingOutput.begin()), missingOutput.end(), missingOutput.front(),
173 [](const std::string &a, const std::string &b) { return a + ", " + b; }));
174 }
175 }
176
177 // check if output from dependant tasks is available in m_algorithmTaskOutputs. This is set by dependant tasks
178 std::vector<std::string> evaluateDependentTasks() {
179 // If this is the first task, we expect the dependant outputs to be set as algorithm properties rather than
180 // outputs from other tasks
181 if (m_firstTaskFlag)
182 return {};
183 std::vector<std::string> missingTasksAll;
184 // Loop through each task set
185 for (size_t i = 0; i < m_dependantTasks.size(); ++i) {
186 // if task set is unfulfillable due to missing tasks
187 std::vector<std::string> missingTasks;
188 const auto &missingTask = populateDependantTasks(i);
189 if (!missingTask.empty()) {
190 missingTasks.push_back("Task set " + std::to_string(i) +
191 " unfulfillable as required tasks not staged: " + missingTask);
192 } else {
193 for (const auto &[taskName, outputs] : m_dependantTasks[i]) {
194 if (!m_parent->m_algorithmTaskOutputs.contains(taskName)) {
195 missingTasks.push_back("Task set: " + std::to_string(i) + " Task name: " + taskName + ": ALL OUTPUTS");
196 } else {
197 for (const auto &output : outputs) {
198 if (!m_parent->m_algorithmTaskOutputs[taskName].contains(output.first)) {
199 missingTasks.push_back("Task set: " + std::to_string(i) + " Task name: " + taskName + ": " +
200 output.first);
201 } else {
202 // populate dependent outputs for use in the task execution
203 addDependantTaskOutput(output.second, m_parent->m_algorithmTaskOutputs[taskName][output.first], i);
204 }
205 }
206 }
207 }
208 }
209 // If we have found a task set with all outputs available, add this to fulfilled sets
210 // otherwise, add missing tasks to the list of missing tasks for all sets
211 if (missingTasks.empty()) {
212 m_fulfilledDependantTaskSets.push_back(i);
213 } else {
214 missingTasksAll.insert(missingTasksAll.cend(), missingTasks.cbegin(), missingTasks.cend());
215 }
216 }
217 if (m_fulfilledDependantTaskSets.empty())
218 return missingTasksAll;
219 return {};
220 }
221
222 void addDependantTaskOutput(const std::string &outputName, std::shared_ptr<MatrixWorkspace> ws,
223 const size_t taskSetIndex) {
224 m_dependantOutputs[taskSetIndex][outputName] = std::move(ws);
225 }
226 };
227
228 void stageAlgorithmTasks(std::vector<std::shared_ptr<AlgorithmTask>> tasks) {
229 if (tasks.empty())
230 return;
231 // for first task in sequence, pass in input workspace (prevent mutating of input by default)
232 API::MatrixWorkspace_sptr inputWS = getProperty("InputWorkspace");
233 if (!m_mutableInput) {
234 auto cloneAlg = createChildAlgorithm("CloneWorkspace");
235 cloneAlg->setProperty("InputWorkspace", inputWS);
236 cloneAlg->execute();
237 Workspace_sptr clone = cloneAlg->getProperty("OutputWorkspace");
238 inputWS = std::dynamic_pointer_cast<MatrixWorkspace>(clone);
239 }
240 tasks[0]->initAsFirstTask(std::move(inputWS));
241 m_stagedAlgorithmTasks = std::move(tasks);
242 }
243
245 m_taskExecutionOrder =
246 isDefault("TaskExecutionOrder") ? constructTaskExecutionOrder() : getProperty("TaskExecutionOrder");
247 std::vector<std::shared_ptr<AlgorithmTask>> tasksToStage(m_taskExecutionOrder.size());
248 validateTaskExecutionOrder();
249 for (auto &task : m_AlgorithmTasks) {
250 task->setTaskExecutionOrder(&m_taskExecutionOrder);
251 auto it = std::find(m_taskExecutionOrder.begin(), m_taskExecutionOrder.end(), task->name());
252 if (it != m_taskExecutionOrder.end()) {
253 std::size_t index = std::distance(m_taskExecutionOrder.begin(), it);
254 tasksToStage[index] = task;
255 }
256 }
257 stageAlgorithmTasks(std::move(tasksToStage));
258 }
259
261 std::unordered_set<std::string> testSet;
262 testSet.reserve(m_taskExecutionOrder.size());
263 for (const auto &task : m_taskExecutionOrder) {
264 auto it = std::find_if(m_AlgorithmTasks.cbegin(), m_AlgorithmTasks.cend(),
265 [&](const auto &algorithmTask) { return algorithmTask->name() == task; });
266 if (it == m_AlgorithmTasks.cend())
267 throw std::runtime_error("Invalid task specified in TaskExecutionOrder: " + task);
268 auto [taskIt, inserted] = testSet.insert(task);
269 if (!inserted)
270 throw std::runtime_error("Duplicate task specified in TaskExecutionOrder, this is not yet supported: " + task);
271 }
272 }
273
274 void execTasks(const std::string &diagWorkspacePrefix = "") {
275 configureAlgorithmTasks();
276
277 try {
278 int step = 0;
279 for (size_t i = 0; i < m_stagedAlgorithmTasks.size(); ++i) {
280 const auto &task = m_stagedAlgorithmTasks[i];
281 task->execute();
282 if (!diagWorkspacePrefix.empty()) {
283 const auto &taskOutput = m_algorithmTaskOutputs.at(task->name());
284 for (const auto &output : taskOutput) {
285 const auto &outputName = output.first;
286 outputDebugWorkspace(output.second, diagWorkspacePrefix, "_" + outputName, step);
287 }
288 step++;
289 }
290 // Output the selected output of the last task
291 if (i == m_stagedAlgorithmTasks.size() - 1)
292 setProperty("OutputWorkspace", m_algorithmTaskOutputs.at(task->name()).at(task->getSelectedOutput()));
293 }
294 clearMembers();
295 } catch (...) { // ensure members are cleared even if error is throw during execution
296 clearMembers();
297 throw;
298 }
299 }
300
302 for (auto &task : m_stagedAlgorithmTasks) {
303 task->clear();
304 }
305 m_algorithmTaskOutputs.clear();
306 }
307
308 void outputDebugWorkspace(const MatrixWorkspace_sptr &ws, const std::string &wsName, const std::string &wsSuffix,
309 const int step) {
310 // Clone the workspace to preserve its state as this instance
311 MatrixWorkspace_sptr cloneWS = ws->clone();
312 AnalysisDataService::Instance().addOrReplace(wsName + "_" + std::to_string(step) + wsSuffix, cloneWS);
313 }
314
315 template <typename... TaskTypes>
316 void initTaskBasedAlgorithm(const std::vector<std::string> &defaultTaskExecutionOrder = {}) {
317 static_assert((std::is_base_of_v<AlgorithmTask, TaskTypes> && ...), "All TaskTypes must derive from AlgorithmTask");
318 m_AlgorithmTasks.clear();
319 m_AlgorithmTasks.reserve(sizeof...(TaskTypes));
320 auto *parent = static_cast<T *>(this);
321 (m_AlgorithmTasks.emplace_back(std::make_shared<TaskTypes>(parent)), ...);
322 declareProperty("TaskExecutionOrder", defaultTaskExecutionOrder, "The tasks to execute, in execution order.");
323 }
324
325 // Returns default TaskExecutionOrder property, if manipulation is necessary
326 // provide override
327 virtual std::vector<std::string> constructTaskExecutionOrder() {
328 std::vector<std::string> teo = getProperty("TaskExecutionOrder");
329 return teo;
330 };
331
332 // Allows the tasks to mutate the original input workspace
333 void setMutableInput(const bool inputIsMutable) { m_mutableInput = inputIsMutable; }
334
335 std::vector<std::shared_ptr<AlgorithmTask>> m_stagedAlgorithmTasks;
336 // map of task name: (map of output name: outputs)
337 std::unordered_map<std::string, std::unordered_map<std::string, std::shared_ptr<MatrixWorkspace>>>
339 std::vector<std::shared_ptr<AlgorithmTask>> m_AlgorithmTasks;
340 std::vector<std::string> m_taskExecutionOrder;
342};
343} // namespace Mantid::API
std::map< DeltaEMode::Type, std::string > index
Data processor algorithm to be used as a parent to workflow algorithms.
void initAsFirstTask(std::shared_ptr< MatrixWorkspace > inputWS)
std::vector< std::unordered_map< std::string, std::vector< std::pair< std::string, std::string > > > > m_dependantTasks
const std::vector< std::string > * m_taskExecutionOrder
void setTaskExecutionOrder(const std::vector< std::string > *taskExecutionOrder)
void setSelectedOutput(const std::string &output, const bool overwrite=false)
std::shared_ptr< MatrixWorkspace > getDependantWorkspace(const std::string &outputAlias)
AlgorithmTask(T *parent, const std::string &name)
std::vector< std::unordered_map< std::string, std::shared_ptr< MatrixWorkspace > > > m_dependantOutputs
void setDependantTask(const std::string &task, const std::string &output_name="", const std::string &alias="", const size_t dependantTaskSet=0)
void addDependantTaskOutput(const std::string &outputName, std::shared_ptr< MatrixWorkspace > ws, const size_t taskSetIndex)
void setExpectedOutputs(const std::vector< std::string > &expectedOutputs)
const std::vector< std::string > & getExpectedOutputs() const
void outputWorkspace(std::shared_ptr< MatrixWorkspace > ws, const std::string &outputName)
std::string populateDependantTasks(const size_t taskSetIndex)
std::unordered_map< std::string, std::unordered_map< std::string, std::shared_ptr< MatrixWorkspace > > > m_algorithmTaskOutputs
void execTasks(const std::string &diagWorkspacePrefix="")
std::vector< std::shared_ptr< AlgorithmTask > > m_stagedAlgorithmTasks
void initTaskBasedAlgorithm(const std::vector< std::string > &defaultTaskExecutionOrder={})
std::vector< std::shared_ptr< AlgorithmTask > > m_AlgorithmTasks
std::vector< std::string > m_taskExecutionOrder
void outputDebugWorkspace(const MatrixWorkspace_sptr &ws, const std::string &wsName, const std::string &wsSuffix, const int step)
void setMutableInput(const bool inputIsMutable)
virtual std::vector< std::string > constructTaskExecutionOrder()
void stageAlgorithmTasks(std::vector< std::shared_ptr< AlgorithmTask > > tasks)
std::shared_ptr< Workspace > Workspace_sptr
shared pointer to Mantid::API::Workspace
std::shared_ptr< MatrixWorkspace > MatrixWorkspace_sptr
shared pointer to the matrix workspace base class
std::string to_string(const wide_integer< Bits, Signed > &n)