diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java index 8b78903a5a14..1cd6ad21b72e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.master.engine.AbstractLifecycleEvent; import org.apache.dolphinscheduler.server.master.engine.WorkflowCacheRepository; import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus; @@ -90,6 +91,30 @@ protected void triggerTasks(final IWorkflowExecutionRunnable workflowExecutionRu } } + protected void killActiveTask(final IWorkflowExecutionRunnable workflowExecutionRunnable) { + try { + LogUtils.setWorkflowInstanceIdMDC(workflowExecutionRunnable.getId()); + workflowExecutionRunnable + .getWorkflowExecutionGraph() + .getActiveTaskExecutionRunnable() + .forEach(ITaskExecutionRunnable::kill); + } finally { + LogUtils.removeWorkflowInstanceIdMDC(); + } + } + + protected void pauseActiveTask(final IWorkflowExecutionRunnable workflowExecutionRunnable) { + try { + LogUtils.setWorkflowInstanceIdMDC(workflowExecutionRunnable.getId()); + workflowExecutionRunnable + .getWorkflowExecutionGraph() + .getActiveTaskExecutionRunnable() + .forEach(ITaskExecutionRunnable::pause); + } finally { + LogUtils.removeWorkflowInstanceIdMDC(); + } + } + protected void onTaskFinish(final IWorkflowExecutionRunnable workflowExecutionRunnable, final ITaskExecutionRunnable taskExecutionRunnable) { final IWorkflowExecutionGraph workflowExecutionGraph = workflowExecutionRunnable.getWorkflowExecutionGraph(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyStopStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyStopStateAction.java index c35d712bd355..8b1f393ffe63 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyStopStateAction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyStopStateAction.java @@ -74,7 +74,7 @@ public void pausedEventAction(final IWorkflowExecutionRunnable workflowExecution public void stopEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable, final WorkflowStopLifecycleEvent workflowStopEvent) { throwExceptionIfStateIsNotMatch(workflowExecutionRunnable); - logWarningIfCannotDoAction(workflowExecutionRunnable, workflowStopEvent); + super.killActiveTask(workflowExecutionRunnable); } @Override diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java index 914d05019184..2ef810aba452 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java @@ -18,10 +18,8 @@ package org.apache.dolphinscheduler.server.master.engine.workflow.statemachine; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; -import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus; import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraph; -import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable; import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFailedLifecycleEvent; import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFinalizeLifecycleEvent; import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowPauseLifecycleEvent; @@ -64,15 +62,7 @@ public void pauseEventAction(final IWorkflowExecutionRunnable workflowExecutionR final WorkflowPauseLifecycleEvent workflowPauseEvent) { throwExceptionIfStateIsNotMatch(workflowExecutionRunnable); super.transformWorkflowInstanceState(workflowExecutionRunnable, WorkflowExecutionStatus.READY_PAUSE); - try { - LogUtils.setWorkflowInstanceIdMDC(workflowExecutionRunnable.getId()); - workflowExecutionRunnable - .getWorkflowExecutionGraph() - .getActiveTaskExecutionRunnable() - .forEach(ITaskExecutionRunnable::pause); - } finally { - LogUtils.removeWorkflowInstanceIdMDC(); - } + super.pauseActiveTask(workflowExecutionRunnable); } @Override @@ -87,16 +77,7 @@ public void stopEventAction(final IWorkflowExecutionRunnable workflowExecutionRu final WorkflowStopLifecycleEvent workflowStopEvent) { throwExceptionIfStateIsNotMatch(workflowExecutionRunnable); super.transformWorkflowInstanceState(workflowExecutionRunnable, WorkflowExecutionStatus.READY_STOP); - // do pause action - try { - LogUtils.setWorkflowInstanceIdMDC(workflowExecutionRunnable.getId()); - workflowExecutionRunnable - .getWorkflowExecutionGraph() - .getActiveTaskExecutionRunnable() - .forEach(ITaskExecutionRunnable::kill); - } finally { - LogUtils.removeWorkflowInstanceIdMDC(); - } + super.killActiveTask(workflowExecutionRunnable); } @Override