Skip to content

Commit

Permalink
resume subtasks
Browse files Browse the repository at this point in the history
  • Loading branch information
snagasawa committed Dec 21, 2023
1 parent 8147ce2 commit 25391a7
Showing 1 changed file with 22 additions and 0 deletions.
22 changes: 22 additions & 0 deletions digdag-core/src/main/java/io/digdag/core/workflow/TaskControl.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.HashSet;
import java.util.ArrayList;
import java.util.stream.Collectors;

import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.collect.*;
Expand Down Expand Up @@ -182,6 +183,27 @@ private static long addTasks(TaskControlStore store,
firstTask = false;
}

Map<String, Long> taskNameAndIds = tasks.stream()
.collect(Collectors.toMap(
WorkflowTask::getFullName,
task -> indexToId.get(tasks.indexOf(task))
));

resumingTasks
.stream()
.filter(resumingTask -> !taskNameAndIds.keySet().contains(resumingTask.getFullName())
&& resumingTask.getFullName().endsWith("^sub"))
.forEach(resumingTask -> {
String parentTaskName = resumingTask.getFullName().replaceAll("\\^sub$", "");

store.addResumedSubtask(attemptId,
taskNameAndIds.get(parentTaskName),
resumingTask.getTaskType(),
TaskStateCode.SUCCESS,
(isInitialTask ? TaskStateFlags.empty().withInitialTask() : TaskStateFlags.empty()),
resumingTask);
});

return rootTaskId;
}

Expand Down

0 comments on commit 25391a7

Please sign in to comment.