Skip to content

Commit

Permalink
[Improvement-16907][Master] Response for stop/pasue event when workfl…
Browse files Browse the repository at this point in the history
…ow instance statue is ready_stop/ready_pause (#16908)
  • Loading branch information
ruanwenjun authored Dec 25, 2024
1 parent dddf2dc commit 2284835
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.engine.AbstractLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.WorkflowCacheRepository;
import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
Expand Down Expand Up @@ -90,6 +91,30 @@ protected void triggerTasks(final IWorkflowExecutionRunnable workflowExecutionRu
}
}

protected void killActiveTask(final IWorkflowExecutionRunnable workflowExecutionRunnable) {
try {
LogUtils.setWorkflowInstanceIdMDC(workflowExecutionRunnable.getId());
workflowExecutionRunnable
.getWorkflowExecutionGraph()
.getActiveTaskExecutionRunnable()
.forEach(ITaskExecutionRunnable::kill);
} finally {
LogUtils.removeWorkflowInstanceIdMDC();
}
}

protected void pauseActiveTask(final IWorkflowExecutionRunnable workflowExecutionRunnable) {
try {
LogUtils.setWorkflowInstanceIdMDC(workflowExecutionRunnable.getId());
workflowExecutionRunnable
.getWorkflowExecutionGraph()
.getActiveTaskExecutionRunnable()
.forEach(ITaskExecutionRunnable::pause);
} finally {
LogUtils.removeWorkflowInstanceIdMDC();
}
}

protected void onTaskFinish(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final ITaskExecutionRunnable taskExecutionRunnable) {
final IWorkflowExecutionGraph workflowExecutionGraph = workflowExecutionRunnable.getWorkflowExecutionGraph();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void pausedEventAction(final IWorkflowExecutionRunnable workflowExecution
public void stopEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final WorkflowStopLifecycleEvent workflowStopEvent) {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
logWarningIfCannotDoAction(workflowExecutionRunnable, workflowStopEvent);
super.killActiveTask(workflowExecutionRunnable);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
package org.apache.dolphinscheduler.server.master.engine.workflow.statemachine;

import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraph;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFailedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFinalizeLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowPauseLifecycleEvent;
Expand Down Expand Up @@ -64,15 +62,7 @@ public void pauseEventAction(final IWorkflowExecutionRunnable workflowExecutionR
final WorkflowPauseLifecycleEvent workflowPauseEvent) {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
super.transformWorkflowInstanceState(workflowExecutionRunnable, WorkflowExecutionStatus.READY_PAUSE);
try {
LogUtils.setWorkflowInstanceIdMDC(workflowExecutionRunnable.getId());
workflowExecutionRunnable
.getWorkflowExecutionGraph()
.getActiveTaskExecutionRunnable()
.forEach(ITaskExecutionRunnable::pause);
} finally {
LogUtils.removeWorkflowInstanceIdMDC();
}
super.pauseActiveTask(workflowExecutionRunnable);
}

@Override
Expand All @@ -87,16 +77,7 @@ public void stopEventAction(final IWorkflowExecutionRunnable workflowExecutionRu
final WorkflowStopLifecycleEvent workflowStopEvent) {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
super.transformWorkflowInstanceState(workflowExecutionRunnable, WorkflowExecutionStatus.READY_STOP);
// do pause action
try {
LogUtils.setWorkflowInstanceIdMDC(workflowExecutionRunnable.getId());
workflowExecutionRunnable
.getWorkflowExecutionGraph()
.getActiveTaskExecutionRunnable()
.forEach(ITaskExecutionRunnable::kill);
} finally {
LogUtils.removeWorkflowInstanceIdMDC();
}
super.killActiveTask(workflowExecutionRunnable);
}

@Override
Expand Down

0 comments on commit 2284835

Please sign in to comment.