Skip to content

Commit

Permalink
Surface blocking flag to the public interface to support async actions.
Browse files Browse the repository at this point in the history
  • Loading branch information
jun-he committed Aug 7, 2024
1 parent f2672dd commit ad08175
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ public class StepInstanceActionHandler {
* the runtime DAG. Only allow existing a single non-terminal step attempt at any time, which must
* be the latest one.
*/
public RunResponse restart(RunRequest runRequest) {
public RunResponse restart(RunRequest runRequest, boolean blocking) {
if (!runRequest.isFreshRun()
&& runRequest.getCurrentPolicy() != RunPolicy.RESTART_FROM_SPECIFIC) {
updateRunRequestForRestartFromInlineRoot(runRequest);
}

RunResponse runResponse = actionHandler.restartRecursively(runRequest);
if (runResponse.getStatus() == RunResponse.Status.DELEGATED) {
return restartDirectly(runResponse, runRequest);
return restartDirectly(runResponse, runRequest, blocking);
}
return runResponse;
}
Expand Down Expand Up @@ -112,8 +112,9 @@ private void updateRunRequestForRestartFromInlineRoot(RunRequest runRequest) {
}

/** Directly restart a step without going to its ancestors. */
public RunResponse restartDirectly(RunResponse restartStepInfo, RunRequest runRequest) {
return actionDao.restartDirectly(restartStepInfo, runRequest, true);
public RunResponse restartDirectly(
RunResponse restartStepInfo, RunRequest runRequest, boolean blocking) {
return actionDao.restartDirectly(restartStepInfo, runRequest, blocking);
}

/** Bypasses the step dependencies. */
Expand Down Expand Up @@ -142,7 +143,12 @@ public StepInstanceActionResponse terminate(
}

public StepInstanceActionResponse skip(
String workflowId, long workflowInstanceId, String stepId, User user, RunRequest runRequest) {
String workflowId,
long workflowInstanceId,
String stepId,
User user,
RunRequest runRequest,
boolean blocking) {
WorkflowInstance instance =
instanceDao.getWorkflowInstance(
workflowId, workflowInstanceId, Constants.LATEST_INSTANCE_RUN, true);
Expand Down Expand Up @@ -178,7 +184,7 @@ public StepInstanceActionResponse skip(
return actionDao.terminate(instance, stepId, user, Actions.StepInstanceAction.SKIP);
}

RunResponse runResponse = restart(runRequest);
RunResponse runResponse = restart(runRequest, blocking);
return runResponse.toStepInstanceActionResponse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void testRestartNewRun() {
.restartConfig(
RestartConfig.builder().addRestartNode("sample-minimal-wf", 1, "job1").build())
.build();
RunResponse response = stepActionHandler.restart(runRequest);
RunResponse response = stepActionHandler.restart(runRequest, true);

ArgumentCaptor<RunRequest> requestCaptor = ArgumentCaptor.forClass(RunRequest.class);
Mockito.verify(actionHandler, Mockito.times(1)).restartRecursively(requestCaptor.capture());
Expand All @@ -104,7 +104,7 @@ public void testRestartNewAttempt() {
RunResponse runResponse =
RunResponse.builder().status(RunResponse.Status.STEP_ATTEMPT_CREATED).build();
when(actionDao.restartDirectly(any(), any(), anyBoolean())).thenReturn(runResponse);
RunResponse response = stepActionHandler.restart(runRequest);
RunResponse response = stepActionHandler.restart(runRequest, true);

ArgumentCaptor<RunRequest> requestCaptor = ArgumentCaptor.forClass(RunRequest.class);
Mockito.verify(actionHandler, Mockito.times(1)).restartRecursively(requestCaptor.capture());
Expand Down Expand Up @@ -158,7 +158,7 @@ public void testRestartFromInlineRootWithinForeach() {
.build())
.build();

RunResponse response = stepActionHandler.restart(runRequest);
RunResponse response = stepActionHandler.restart(runRequest, true);

ArgumentCaptor<RunRequest> requestCaptor = ArgumentCaptor.forClass(RunRequest.class);
Mockito.verify(actionHandler, Mockito.times(1)).restartRecursively(requestCaptor.capture());
Expand Down Expand Up @@ -204,7 +204,7 @@ public void testRestartFromInlineRootWithinNonForeach() {
.build())
.build();

RunResponse response = stepActionHandler.restart(runRequest);
RunResponse response = stepActionHandler.restart(runRequest, true);

ArgumentCaptor<RunRequest> requestCaptor = ArgumentCaptor.forClass(RunRequest.class);
Mockito.verify(actionHandler, Mockito.times(1)).restartRecursively(requestCaptor.capture());
Expand Down Expand Up @@ -238,7 +238,7 @@ public void testInvalidRestartFromInlineRoot() {
"Cannot restart from inline root for non-terminal root",
IllegalArgumentException.class,
"instance [null] is in non-terminal state [IN_PROGRESS]",
() -> stepActionHandler.restart(runRequest));
() -> stepActionHandler.restart(runRequest, true));

when(instance.getStatus()).thenReturn(WorkflowInstance.Status.FAILED);
WorkflowInstanceAggregatedInfo aggregatedInfo = mock(WorkflowInstanceAggregatedInfo.class);
Expand All @@ -250,14 +250,14 @@ public void testInvalidRestartFromInlineRoot() {
"Cannot restart from inline root for non-terminal step",
IllegalArgumentException.class,
"step null[job1] is in non-terminal state [RUNNING]",
() -> stepActionHandler.restart(runRequest));
() -> stepActionHandler.restart(runRequest, true));

when(aggregatedView.getStatus()).thenReturn(StepInstance.Status.FATALLY_FAILED);
AssertHelper.assertThrows(
"Cannot restart from inline root for invalid restart path",
IllegalArgumentException.class,
"restart-path size is not 1",
() -> stepActionHandler.restart(runRequest));
() -> stepActionHandler.restart(runRequest, true));
}

@Test
Expand Down Expand Up @@ -291,7 +291,7 @@ public void testSkipRunningStepInRunningInstance() {
when(aggregatedInfo.getStepAggregatedViews()).thenReturn(singletonMap("job1", aggregatedView));
when(instance.getStatus()).thenReturn(WorkflowInstance.Status.IN_PROGRESS);
when(aggregatedView.getStatus()).thenReturn(StepInstance.Status.RUNNING);
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null);
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null, true);
verify(actionDao, times(1)).terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP);
}

Expand Down Expand Up @@ -321,7 +321,7 @@ public void testSkipFailedStepInRunningInstance() {
.thenReturn(RunResponse.builder().status(RunResponse.Status.DELEGATED).build());
when(actionDao.restartDirectly(any(), eq(runRequest), eq(true)))
.thenReturn(RunResponse.builder().status(RunResponse.Status.STEP_ATTEMPT_CREATED).build());
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, runRequest);
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, runRequest, true);
verify(actionDao, times(0)).terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP);
verify(actionHandler, times(1)).restartRecursively(runRequest);
verify(actionDao, times(1)).restartDirectly(any(), eq(runRequest), eq(true));
Expand All @@ -338,7 +338,7 @@ public void testSkipShouldWakeupStepInRunningInstance() {
when(aggregatedInfo.getStepAggregatedViews()).thenReturn(singletonMap("job1", aggregatedView));
when(instance.getStatus()).thenReturn(WorkflowInstance.Status.IN_PROGRESS);
when(aggregatedView.getStatus()).thenReturn(StepInstance.Status.PLATFORM_FAILED);
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null);
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null, true);
verify(actionDao, times(1)).terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP);
}

Expand Down Expand Up @@ -367,7 +367,7 @@ public void testSkipStoppedStepInStoppedInstance() {
.build();
when(actionHandler.restartRecursively(runRequest))
.thenReturn(RunResponse.builder().status(RunResponse.Status.WORKFLOW_RUN_CREATED).build());
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, runRequest);
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, runRequest, true);
verify(actionDao, times(0)).terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP);
verify(actionHandler, times(1)).restartRecursively(runRequest);
verify(actionDao, times(0)).restartDirectly(any(), eq(runRequest), eq(true));
Expand All @@ -388,20 +388,20 @@ public void testInvalidSkip() {
"Cannot find status in aggregated step views",
NullPointerException.class,
"Invalid: cannot find the step view of workflow step ",
() -> stepActionHandler.skip("sample-minimal-wf", 1, "job2", user, null));
() -> stepActionHandler.skip("sample-minimal-wf", 1, "job2", user, null, true));

when(aggregatedView.getStatus()).thenReturn(StepInstance.Status.NOT_CREATED);
AssertHelper.assertThrows(
"Cannot skip not-created step",
MaestroBadRequestException.class,
"Cannot skip step [sample-minimal-wf][1][job1] before it is created. Please try it again.",
() -> stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null));
() -> stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null, true));

when(aggregatedView.getStatus()).thenReturn(StepInstance.Status.CREATED);
AssertHelper.assertThrows(
"Cannot skip not-created step",
MaestroBadRequestException.class,
"Cannot skip step [sample-minimal-wf][1][job1] because it is unsupported by the step action map",
() -> stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null));
() -> stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null, true));
}
}

0 comments on commit ad08175

Please sign in to comment.