Mantid
Loading...
Searching...
No Matches
TaskBasedAlgorithm.h
Go to the documentation of this file.
1// Mantid Repository : https://github.com/mantidproject/mantid
2//
3// Copyright © 2026 ISIS Rutherford Appleton Laboratory UKRI,
4// NScD Oak Ridge National Laboratory, European Spallation Source,
5// Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
6// SPDX - License - Identifier: GPL - 3.0 +
7#pragma once
8#include <algorithm>
9#include <limits>
10#include <numeric>
11#include <stdexcept>
12#include <string>
13#include <unordered_map>
14#include <unordered_set>
15#include <vector>
16
17#include "AnalysisDataService.h"
19#include "MatrixWorkspace.h"
20
21namespace Mantid::API {
22
23template <class T> class TaskBasedAlgorithm : virtual public API::DataProcessorAlgorithm {
24public:
26
27protected:
29 public:
30 explicit AlgorithmTask(T *parent, const std::string &name)
32 addDependantTaskSet(); // Start with one dependant task set by default
33 }
35 m_dependantTasks.emplace_back();
36 m_dependantOutputs.emplace_back();
37 return m_dependantTasks.size() - 1;
38 }
39 void setDependantTask(const std::string &task, const std::string &output_name = "", const std::string &alias = "",
40 const size_t dependantTaskSet = 0) {
41 if (dependantTaskSet >= m_dependantTasks.size())
42 throw std::runtime_error("Dependant task set index " + std::to_string(dependantTaskSet) +
43 " is out of range for task " + m_name);
44 if (output_name.empty()) {
45 // If no output name is provided, assume the whole task output is required
46 // If no alias is provided, use the task name as the alias
47 m_dependantTasks[dependantTaskSet][task] = {};
48 } else {
49 m_dependantTasks[dependantTaskSet][task].push_back({output_name, alias});
50 }
51 }
52 void execute() {
53 auto missingTasks = evaluateDependentTasks();
54 if (!missingTasks.empty()) {
55 throw std::runtime_error(
56 "Cannot execute task " + m_name + " as the following dependent tasks outputs are not available: " +
57 std::accumulate(std::next(missingTasks.begin()), missingTasks.end(), missingTasks.front(),
58 [](const std::string &a, const std::string &b) { return a + ", " + b; }));
59 }
61 m_parent->g_log.debug("Executing task: " + m_name + "\n");
64 m_parent->g_log.debug("Finished executing task: " + m_name + "\n");
65 }
66 const std::vector<std::string> &getExpectedOutputs() const { return m_expectedOutputs; }
67 void setExpectedOutputs(const std::vector<std::string> &expectedOutputs) { m_expectedOutputs = expectedOutputs; }
68 const std::string &name() const { return m_name; }
69 void initAsFirstTask(std::shared_ptr<MatrixWorkspace> inputWS) {
70 addDependantTaskOutput("InputWorkspace", inputWS, 0);
71 m_firstTaskFlag = true;
72 }
73 void setTaskExecutionOrder(const std::vector<std::string> *taskExecutionOrder) {
74 m_taskExecutionOrder = taskExecutionOrder;
75 }
76 const std::string &getSelectedOutput() const { return m_selectedOutput; }
78
79 protected:
81 void outputWorkspace(std::shared_ptr<MatrixWorkspace> ws, const std::string &outputName) {
82 setSelectedOutput(outputName);
83 m_parent->m_algorithmTaskOutputs[m_name][outputName] = ws;
84 }
85
86 std::shared_ptr<MatrixWorkspace> getDependantWorkspace(const std::string &outputAlias) {
87 return m_dependantOutputs[m_activeDependantTaskSet].at(outputAlias);
88 }
89
90 void setSelectedOutput(const std::string &output, const bool overwrite = false) {
91 // If overwrite is false, only set the selected output if it has not already been set
92 if (!overwrite && !m_selectedOutput.empty())
93 return;
94 m_selectedOutput = output;
95 }
96
97 private:
98 // vector of dependant task sets: map of dependant task name -> requested outputs as (output name, alias) pairs
99 std::vector<std::unordered_map<std::string, std::vector<std::pair<std::string, std::string>>>> m_dependantTasks;
100 std::string m_name;
101 std::vector<std::string> m_expectedOutputs;
102 std::vector<std::unordered_map<std::string, std::shared_ptr<MatrixWorkspace>>> m_dependantOutputs;
105 std::vector<size_t> m_fulfilledDependantTaskSets;
106 const std::vector<std::string> *m_taskExecutionOrder = nullptr;
107 std::string m_selectedOutput;
108
109 virtual void executeImpl() = 0;
110
111 // if dependent outputs have been supplied, check they are fulfillable. If not, take all outputs from any specified
112 // tasks.
113 std::string populateDependantTasks(const size_t taskSetIndex) {
114 for (auto &item : m_dependantTasks[taskSetIndex]) {
115 const auto &taskName = item.first;
116 auto &outputs = item.second;
117 auto it = std::find_if(m_parent->m_stagedAlgorithmTasks.cbegin(), m_parent->m_stagedAlgorithmTasks.cend(),
118 [&taskName](std::shared_ptr<AlgorithmTask> task) { return task->name() == taskName; });
119 if (it == m_parent->m_stagedAlgorithmTasks.cend())
120 return taskName; // Task not found, this task set cannot be fulfilled
121 if (outputs.empty()) {
122 // If no specific outputs are listed, populate with the whole task output
123 const auto &expectedOutputs = (*it)->getExpectedOutputs();
124 std::transform(expectedOutputs.begin(), expectedOutputs.end(), std::back_inserter(outputs),
125 [](const auto &output) { return std::pair<std::string, std::string>{output, output}; });
126 }
127 }
128 return "";
129 }
130
132 if (m_fulfilledDependantTaskSets.size() == 1) {
133 m_activeDependantTaskSet = m_fulfilledDependantTaskSets.front();
134 return;
135 } else if (m_fulfilledDependantTaskSets.size() == 0) {
136 return;
137 }
138 // We have multiple fulfilled task sets - select based on the execution order of tasks.
139 // The task set containing the task executed in closest proximity to this task wins.
140 const auto myIt = std::find(m_taskExecutionOrder->cbegin(), m_taskExecutionOrder->cend(), m_name);
141 const auto myIndex = std::distance(m_taskExecutionOrder->cbegin(), myIt);
142 size_t closestTaskSet = 0;
143 int closestDistance = std::numeric_limits<int>::max();
144 for (auto taskSet : m_fulfilledDependantTaskSets) {
145 for (const auto &task : m_dependantTasks[taskSet]) {
146 const auto &taskName = task.first;
147 auto it = std::find(m_taskExecutionOrder->cbegin(), m_taskExecutionOrder->cend(), taskName);
148 std::size_t index = std::distance(m_taskExecutionOrder->cbegin(), it);
149 int distance = (int)myIndex - (int)index;
150 // Do not consider tasks that occur after the current task
151 if ((distance < closestDistance) && distance > 0) {
152 closestDistance = distance;
153 closestTaskSet = taskSet;
154 }
155 }
156 }
157 m_activeDependantTaskSet = closestTaskSet;
158 }
159
160 // after execution, check that expected outputs from this task are present in m_algorithmTaskOutputs
162 if (!m_parent->m_algorithmTaskOutputs.contains(m_name))
163 throw std::runtime_error("No output from task " + m_name + " found after task execution");
164 std::vector<std::string> missingOutput;
165 const auto &taskOutputs = m_parent->m_algorithmTaskOutputs[m_name];
166 std::copy_if(m_expectedOutputs.begin(), m_expectedOutputs.end(), std::back_inserter(missingOutput),
167 [&taskOutputs](const auto &output) { return !taskOutputs.contains(output); });
168 if (!missingOutput.empty()) {
169 throw std::runtime_error(
170 "Expected outputs from task " + m_name + " not found after task execution: " +
171 std::accumulate(std::next(missingOutput.begin()), missingOutput.end(), missingOutput.front(),
172 [](const std::string &a, const std::string &b) { return a + ", " + b; }));
173 }
174 }
175
176 // check if output from dependant tasks is available in m_algorithmTaskOutputs. This is set by dependant tasks
177 std::vector<std::string> evaluateDependentTasks() {
178 // If this is the first task, we expect the dependant outputs to be set as algorithm properties rather than
179 // outputs from other tasks
180 if (m_firstTaskFlag)
181 return {};
182 std::vector<std::string> missingTasksAll;
183 // Loop through each task set
184 for (size_t i = 0; i < m_dependantTasks.size(); ++i) {
185 // if task set is unfulfillable due to missing tasks
186 std::vector<std::string> missingTasks;
187 const auto &missingTask = populateDependantTasks(i);
188 if (!missingTask.empty()) {
189 missingTasks.push_back("Task set " + std::to_string(i) +
190 " unfulfillable as required tasks not staged: " + missingTask);
191 } else {
192 for (const auto &[taskName, outputs] : m_dependantTasks[i]) {
193 if (!m_parent->m_algorithmTaskOutputs.contains(taskName)) {
194 missingTasks.push_back("Task set: " + std::to_string(i) + " Task name: " + taskName + ": ALL OUTPUTS");
195 } else {
196 for (const auto &output : outputs) {
197 if (!m_parent->m_algorithmTaskOutputs[taskName].contains(output.first)) {
198 missingTasks.push_back("Task set: " + std::to_string(i) + " Task name: " + taskName + ": " +
199 output.first);
200 } else {
201 // populate dependent outputs for use in the task execution
202 addDependantTaskOutput(output.second, m_parent->m_algorithmTaskOutputs[taskName][output.first], i);
203 }
204 }
205 }
206 }
207 }
208 // If we have found a task set with all outputs available, add this to fulfilled sets
209 // otherwise, add missing tasks to the list of missing tasks for all sets
210 if (missingTasks.empty()) {
211 m_fulfilledDependantTaskSets.push_back(i);
212 } else {
213 missingTasksAll.insert(missingTasksAll.cend(), missingTasks.cbegin(), missingTasks.cend());
214 }
215 }
216 return (m_fulfilledDependantTaskSets.empty() ? missingTasksAll : std::vector<std::string>{});
217 }
218
219 void addDependantTaskOutput(const std::string &outputName, std::shared_ptr<MatrixWorkspace> ws,
220 const size_t taskSetIndex) {
221 m_dependantOutputs[taskSetIndex][outputName] = ws;
222 }
223 };
224
225 void stageAlgorithmTasks(std::vector<std::shared_ptr<AlgorithmTask>> tasks) {
226 if (tasks.empty())
227 return;
228 // for first task in sequence, pass in input workspace (prevent mutating of input by default)
229 API::MatrixWorkspace_sptr inputWS = getProperty("InputWorkspace");
230 if (!m_mutableInput) {
231 auto cloneAlg = createChildAlgorithm("CloneWorkspace");
232 cloneAlg->setProperty("InputWorkspace", inputWS);
233 cloneAlg->execute();
234 Workspace_sptr clone = cloneAlg->getProperty("OutputWorkspace");
235 inputWS = std::dynamic_pointer_cast<MatrixWorkspace>(clone);
236 }
237 tasks[0]->initAsFirstTask(inputWS);
238 m_stagedAlgorithmTasks = tasks;
239 }
240
242 m_taskExecutionOrder =
243 isDefault("TaskExecutionOrder") ? constructTaskExecutionOrder() : getProperty("TaskExecutionOrder");
244 std::vector<std::shared_ptr<AlgorithmTask>> tasksToStage(m_taskExecutionOrder.size());
245 validateTaskExecutionOrder();
246 for (auto &task : m_AlgorithmTasks) {
247 task->setTaskExecutionOrder(&m_taskExecutionOrder);
248 auto it = std::find(m_taskExecutionOrder.begin(), m_taskExecutionOrder.end(), task->name());
249 if (it != m_taskExecutionOrder.end()) {
250 std::size_t index = std::distance(m_taskExecutionOrder.begin(), it);
251 tasksToStage[index] = task;
252 }
253 }
254 stageAlgorithmTasks(tasksToStage);
255 }
256
258 std::unordered_set<std::string> testSet;
259 testSet.reserve(m_taskExecutionOrder.size());
260 for (const auto &task : m_taskExecutionOrder) {
261 auto it = std::find_if(m_AlgorithmTasks.cbegin(), m_AlgorithmTasks.cend(),
262 [&](const auto &algorithmTask) { return algorithmTask->name() == task; });
263 if (it == m_AlgorithmTasks.cend())
264 throw std::runtime_error("Invalid task specified in TaskExecutionOrder: " + task);
265 auto [taskIt, inserted] = testSet.insert(task);
266 if (!inserted)
267 throw std::runtime_error("Duplicate task specified in TaskExecutionOrder, this is not yet supported: " + task);
268 }
269 }
270
271 void execTasks(const std::string &diagWorkspacePrefix = "") {
272 configureAlgorithmTasks();
273
274 try {
275 int step = 0;
276 for (size_t i = 0; i < m_stagedAlgorithmTasks.size(); ++i) {
277 const auto &task = m_stagedAlgorithmTasks[i];
278 task->execute();
279 if (!diagWorkspacePrefix.empty()) {
280 const auto &taskOutput = m_algorithmTaskOutputs.at(task->name());
281 for (const auto &output : taskOutput) {
282 const auto &outputName = output.first;
283 outputDebugWorkspace(output.second, diagWorkspacePrefix, "_" + outputName, step);
284 }
285 step++;
286 }
287 // Output the selected output of the last task
288 if (i == m_stagedAlgorithmTasks.size() - 1)
289 setProperty("OutputWorkspace", m_algorithmTaskOutputs.at(task->name()).at(task->getSelectedOutput()));
290 }
291 clearMembers();
292 } catch (...) { // ensure members are cleared even if error is throw during execution
293 clearMembers();
294 throw;
295 }
296 }
297
299 for (auto &task : m_stagedAlgorithmTasks) {
300 task->clear();
301 }
302 m_algorithmTaskOutputs.clear();
303 }
304
305 void outputDebugWorkspace(const MatrixWorkspace_sptr &ws, const std::string &wsName, const std::string &wsSuffix,
306 const int step) {
307 // Clone the workspace to preserve its state as this instance
308 MatrixWorkspace_sptr cloneWS = ws->clone();
309 AnalysisDataService::Instance().addOrReplace(wsName + "_" + std::to_string(step) + wsSuffix, cloneWS);
310 }
311
312 template <typename... TaskTypes>
313 void initTaskBasedAlgorithm(const std::vector<std::string> &defaultTaskExecutionOrder = {}) {
314 static_assert((std::is_base_of_v<AlgorithmTask, TaskTypes> && ...), "All TaskTypes must derive from AlgorithmTask");
315 m_AlgorithmTasks.clear();
316 m_AlgorithmTasks.reserve(sizeof...(TaskTypes));
317 auto *parent = static_cast<T *>(this);
318 (m_AlgorithmTasks.emplace_back(std::make_shared<TaskTypes>(parent)), ...);
319 declareProperty("TaskExecutionOrder", defaultTaskExecutionOrder, "The tasks to execute, in execution order.");
320 }
321
322 // Returns default TaskExecutionOrder property, if manipulation is necessary
323 // provide override
324 virtual std::vector<std::string> constructTaskExecutionOrder() {
325 std::vector<std::string> teo = getProperty("TaskExecutionOrder");
326 return teo;
327 };
328
329 // Allows the tasks to mutate the original input workspace
330 void setMutableInput(const bool inputIsMutable) { m_mutableInput = inputIsMutable; }
331
332 std::vector<std::shared_ptr<AlgorithmTask>> m_stagedAlgorithmTasks;
333 // map of task name: (map of output name: outputs)
334 std::unordered_map<std::string, std::unordered_map<std::string, std::shared_ptr<MatrixWorkspace>>>
336 std::vector<std::shared_ptr<AlgorithmTask>> m_AlgorithmTasks;
337 std::vector<std::string> m_taskExecutionOrder;
339};
340} // namespace Mantid::API
std::map< DeltaEMode::Type, std::string > index
Data processor algorithm to be used as a parent to workflow algorithms.
void initAsFirstTask(std::shared_ptr< MatrixWorkspace > inputWS)
std::vector< std::unordered_map< std::string, std::vector< std::pair< std::string, std::string > > > > m_dependantTasks
const std::vector< std::string > * m_taskExecutionOrder
void setTaskExecutionOrder(const std::vector< std::string > *taskExecutionOrder)
void setSelectedOutput(const std::string &output, const bool overwrite=false)
std::shared_ptr< MatrixWorkspace > getDependantWorkspace(const std::string &outputAlias)
AlgorithmTask(T *parent, const std::string &name)
std::vector< std::unordered_map< std::string, std::shared_ptr< MatrixWorkspace > > > m_dependantOutputs
void setDependantTask(const std::string &task, const std::string &output_name="", const std::string &alias="", const size_t dependantTaskSet=0)
void addDependantTaskOutput(const std::string &outputName, std::shared_ptr< MatrixWorkspace > ws, const size_t taskSetIndex)
void setExpectedOutputs(const std::vector< std::string > &expectedOutputs)
const std::vector< std::string > & getExpectedOutputs() const
void outputWorkspace(std::shared_ptr< MatrixWorkspace > ws, const std::string &outputName)
std::string populateDependantTasks(const size_t taskSetIndex)
std::unordered_map< std::string, std::unordered_map< std::string, std::shared_ptr< MatrixWorkspace > > > m_algorithmTaskOutputs
void execTasks(const std::string &diagWorkspacePrefix="")
std::vector< std::shared_ptr< AlgorithmTask > > m_stagedAlgorithmTasks
void initTaskBasedAlgorithm(const std::vector< std::string > &defaultTaskExecutionOrder={})
std::vector< std::shared_ptr< AlgorithmTask > > m_AlgorithmTasks
std::vector< std::string > m_taskExecutionOrder
void outputDebugWorkspace(const MatrixWorkspace_sptr &ws, const std::string &wsName, const std::string &wsSuffix, const int step)
void setMutableInput(const bool inputIsMutable)
virtual std::vector< std::string > constructTaskExecutionOrder()
void stageAlgorithmTasks(std::vector< std::shared_ptr< AlgorithmTask > > tasks)
std::shared_ptr< Workspace > Workspace_sptr
shared pointer to Mantid::API::Workspace
std::shared_ptr< MatrixWorkspace > MatrixWorkspace_sptr
shared pointer to the matrix workspace base class
std::string to_string(const wide_integer< Bits, Signed > &n)