13#include <unordered_map>
14#include <unordered_set>
39 void setDependantTask(
const std::string &task,
const std::string &output_name =
"",
const std::string &alias =
"",
40 const size_t dependantTaskSet = 0) {
42 throw std::runtime_error(
"Dependant task set index " +
std::to_string(dependantTaskSet) +
43 " is out of range for task " +
m_name);
44 if (output_name.empty()) {
54 if (!missingTasks.empty()) {
55 throw std::runtime_error(
56 "Cannot execute task " +
m_name +
" as the following dependent tasks outputs are not available: " +
57 std::accumulate(std::next(missingTasks.begin()), missingTasks.end(), missingTasks.front(),
58 [](
const std::string &a,
const std::string &b) { return a +
", " + b; }));
64 m_parent->g_log.debug(
"Finished executing task: " +
m_name +
"\n");
81 void outputWorkspace(std::shared_ptr<MatrixWorkspace> ws,
const std::string &outputName) {
99 std::vector<std::unordered_map<std::string, std::vector<std::pair<std::string, std::string>>>>
m_dependantTasks;
115 const auto &taskName = item.first;
116 auto &outputs = item.second;
117 auto it = std::find_if(
m_parent->m_stagedAlgorithmTasks.cbegin(),
m_parent->m_stagedAlgorithmTasks.cend(),
118 [&taskName](std::shared_ptr<AlgorithmTask> task) { return task->name() == taskName; });
119 if (it ==
m_parent->m_stagedAlgorithmTasks.cend())
121 if (outputs.empty()) {
123 const auto &expectedOutputs = (*it)->getExpectedOutputs();
124 std::transform(expectedOutputs.begin(), expectedOutputs.end(), std::back_inserter(outputs),
125 [](
const auto &output) { return std::pair<std::string, std::string>{output, output}; });
132 if (m_fulfilledDependantTaskSets.size() == 1) {
133 m_activeDependantTaskSet = m_fulfilledDependantTaskSets.front();
135 }
else if (m_fulfilledDependantTaskSets.size() == 0) {
142 size_t closestTaskSet = 0;
143 int closestDistance = std::numeric_limits<int>::max();
144 for (
auto taskSet : m_fulfilledDependantTaskSets) {
145 for (
const auto &task : m_dependantTasks[taskSet]) {
146 const auto &taskName = task.first;
149 int distance = (int)myIndex - (
int)
index;
151 if ((distance < closestDistance) && distance > 0) {
152 closestDistance = distance;
153 closestTaskSet = taskSet;
157 m_activeDependantTaskSet = closestTaskSet;
162 if (!m_parent->m_algorithmTaskOutputs.contains(m_name))
163 throw std::runtime_error(
"No output from task " + m_name +
" found after task execution");
164 std::vector<std::string> missingOutput;
165 const auto &taskOutputs = m_parent->m_algorithmTaskOutputs[m_name];
166 std::copy_if(m_expectedOutputs.begin(), m_expectedOutputs.end(), std::back_inserter(missingOutput),
167 [&taskOutputs](
const auto &output) { return !taskOutputs.contains(output); });
168 if (!missingOutput.empty()) {
169 throw std::runtime_error(
170 "Expected outputs from task " + m_name +
" not found after task execution: " +
171 std::accumulate(std::next(missingOutput.begin()), missingOutput.end(), missingOutput.front(),
172 [](
const std::string &a,
const std::string &b) { return a +
", " + b; }));
182 std::vector<std::string> missingTasksAll;
184 for (
size_t i = 0; i < m_dependantTasks.size(); ++i) {
186 std::vector<std::string> missingTasks;
187 const auto &missingTask = populateDependantTasks(i);
188 if (!missingTask.empty()) {
190 " unfulfillable as required tasks not staged: " + missingTask);
192 for (
const auto &[taskName, outputs] : m_dependantTasks[i]) {
193 if (!m_parent->m_algorithmTaskOutputs.contains(taskName)) {
194 missingTasks.push_back(
"Task set: " +
std::to_string(i) +
" Task name: " + taskName +
": ALL OUTPUTS");
196 for (
const auto &output : outputs) {
197 if (!m_parent->m_algorithmTaskOutputs[taskName].contains(output.first)) {
198 missingTasks.push_back(
"Task set: " +
std::to_string(i) +
" Task name: " + taskName +
": " +
202 addDependantTaskOutput(output.second, m_parent->m_algorithmTaskOutputs[taskName][output.first], i);
210 if (missingTasks.empty()) {
211 m_fulfilledDependantTaskSets.push_back(i);
213 missingTasksAll.insert(missingTasksAll.cend(), missingTasks.cbegin(), missingTasks.cend());
216 return (m_fulfilledDependantTaskSets.empty() ? missingTasksAll : std::vector<std::string>{});
220 const size_t taskSetIndex) {
221 m_dependantOutputs[taskSetIndex][outputName] = ws;
230 if (!m_mutableInput) {
231 auto cloneAlg = createChildAlgorithm(
"CloneWorkspace");
232 cloneAlg->setProperty(
"InputWorkspace", inputWS);
235 inputWS = std::dynamic_pointer_cast<MatrixWorkspace>(clone);
237 tasks[0]->initAsFirstTask(inputWS);
238 m_stagedAlgorithmTasks = tasks;
242 m_taskExecutionOrder =
243 isDefault(
"TaskExecutionOrder") ? constructTaskExecutionOrder() : getProperty(
"TaskExecutionOrder");
244 std::vector<std::shared_ptr<AlgorithmTask>> tasksToStage(m_taskExecutionOrder.size());
245 validateTaskExecutionOrder();
246 for (
auto &task : m_AlgorithmTasks) {
247 task->setTaskExecutionOrder(&m_taskExecutionOrder);
248 auto it = std::find(m_taskExecutionOrder.begin(), m_taskExecutionOrder.end(), task->name());
249 if (it != m_taskExecutionOrder.end()) {
250 std::size_t
index = std::distance(m_taskExecutionOrder.begin(), it);
251 tasksToStage[
index] = task;
254 stageAlgorithmTasks(tasksToStage);
258 std::unordered_set<std::string> testSet;
259 testSet.reserve(m_taskExecutionOrder.size());
260 for (
const auto &task : m_taskExecutionOrder) {
261 auto it = std::find_if(m_AlgorithmTasks.cbegin(), m_AlgorithmTasks.cend(),
262 [&](
const auto &algorithmTask) { return algorithmTask->name() == task; });
263 if (it == m_AlgorithmTasks.cend())
264 throw std::runtime_error(
"Invalid task specified in TaskExecutionOrder: " + task);
265 auto [taskIt, inserted] = testSet.insert(task);
267 throw std::runtime_error(
"Duplicate task specified in TaskExecutionOrder, this is not yet supported: " + task);
271 void execTasks(
const std::string &diagWorkspacePrefix =
"") {
272 configureAlgorithmTasks();
276 for (
size_t i = 0; i < m_stagedAlgorithmTasks.size(); ++i) {
277 const auto &task = m_stagedAlgorithmTasks[i];
279 if (!diagWorkspacePrefix.empty()) {
280 const auto &taskOutput = m_algorithmTaskOutputs.at(task->name());
281 for (
const auto &output : taskOutput) {
282 const auto &outputName = output.first;
283 outputDebugWorkspace(output.second, diagWorkspacePrefix,
"_" + outputName, step);
288 if (i == m_stagedAlgorithmTasks.size() - 1)
289 setProperty(
"OutputWorkspace", m_algorithmTaskOutputs.at(task->name()).at(task->getSelectedOutput()));
299 for (
auto &task : m_stagedAlgorithmTasks) {
302 m_algorithmTaskOutputs.clear();
309 AnalysisDataService::Instance().addOrReplace(wsName +
"_" +
std::to_string(step) + wsSuffix, cloneWS);
312 template <
typename... TaskTypes>
314 static_assert((std::is_base_of_v<AlgorithmTask, TaskTypes> && ...),
"All TaskTypes must derive from AlgorithmTask");
315 m_AlgorithmTasks.clear();
316 m_AlgorithmTasks.reserve(
sizeof...(TaskTypes));
317 auto *parent =
static_cast<T *
>(
this);
318 (m_AlgorithmTasks.emplace_back(std::make_shared<TaskTypes>(parent)), ...);
319 declareProperty(
"TaskExecutionOrder", defaultTaskExecutionOrder,
"The tasks to execute, in execution order.");
325 std::vector<std::string> teo = getProperty(
"TaskExecutionOrder");
334 std::unordered_map<std::string, std::unordered_map<std::string, std::shared_ptr<MatrixWorkspace>>>
std::map< DeltaEMode::Type, std::string > index
Data processor algorithm to be used as a parent to workflow algorithms.
void initAsFirstTask(std::shared_ptr< MatrixWorkspace > inputWS)
std::vector< size_t > m_fulfilledDependantTaskSets
std::vector< std::unordered_map< std::string, std::vector< std::pair< std::string, std::string > > > > m_dependantTasks
std::string m_selectedOutput
const std::vector< std::string > * m_taskExecutionOrder
virtual void executeImpl()=0
const std::string & getSelectedOutput() const
size_t m_activeDependantTaskSet
void setTaskExecutionOrder(const std::vector< std::string > *taskExecutionOrder)
void setSelectedOutput(const std::string &output, const bool overwrite=false)
std::shared_ptr< MatrixWorkspace > getDependantWorkspace(const std::string &outputAlias)
AlgorithmTask(T *parent, const std::string &name)
std::vector< std::unordered_map< std::string, std::shared_ptr< MatrixWorkspace > > > m_dependantOutputs
size_t addDependantTaskSet()
void setDependantTask(const std::string &task, const std::string &output_name="", const std::string &alias="", const size_t dependantTaskSet=0)
void addDependantTaskOutput(const std::string &outputName, std::shared_ptr< MatrixWorkspace > ws, const size_t taskSetIndex)
void checkExpectedOutputs()
void setExpectedOutputs(const std::vector< std::string > &expectedOutputs)
std::vector< std::string > evaluateDependentTasks()
const std::vector< std::string > & getExpectedOutputs() const
const std::string & name() const
std::vector< std::string > m_expectedOutputs
void outputWorkspace(std::shared_ptr< MatrixWorkspace > ws, const std::string &outputName)
std::string populateDependantTasks(const size_t taskSetIndex)
std::unordered_map< std::string, std::unordered_map< std::string, std::shared_ptr< MatrixWorkspace > > > m_algorithmTaskOutputs
void execTasks(const std::string &diagWorkspacePrefix="")
std::vector< std::shared_ptr< AlgorithmTask > > m_stagedAlgorithmTasks
void initTaskBasedAlgorithm(const std::vector< std::string > &defaultTaskExecutionOrder={})
std::vector< std::shared_ptr< AlgorithmTask > > m_AlgorithmTasks
std::vector< std::string > m_taskExecutionOrder
void outputDebugWorkspace(const MatrixWorkspace_sptr &ws, const std::string &wsName, const std::string &wsSuffix, const int step)
void setMutableInput(const bool inputIsMutable)
virtual std::vector< std::string > constructTaskExecutionOrder()
void validateTaskExecutionOrder()
void configureAlgorithmTasks()
void stageAlgorithmTasks(std::vector< std::shared_ptr< AlgorithmTask > > tasks)
std::shared_ptr< Workspace > Workspace_sptr
shared pointer to Mantid::API::Workspace
std::shared_ptr< MatrixWorkspace > MatrixWorkspace_sptr
shared pointer to the matrix workspace base class
std::string to_string(const wide_integer< Bits, Signed > &n)