Skip to content

Commit

Permalink
Do not delete and create new jobs when unacquiring and make sure that…
Browse files Browse the repository at this point in the history
… cleaning up execution deletes jobs in bulk without revision check
  • Loading branch information
filiphr authored and tijsrademakers committed Nov 15, 2023
1 parent c0241ce commit 6bdee37
Show file tree
Hide file tree
Showing 8 changed files with 329 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.flowable.engine.test.bpmn.async;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType;
import org.flowable.common.engine.api.delegate.event.FlowableEntityEvent;
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.flowable.common.engine.api.delegate.event.FlowableEventListener;
import org.flowable.common.engine.impl.interceptor.Command;
import org.flowable.common.engine.impl.interceptor.CommandConfig;
import org.flowable.common.engine.impl.interceptor.CommandExecutor;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.interceptor.CommandInvoker;
import org.flowable.engine.impl.jobexecutor.AsyncContinuationJobHandler;
import org.flowable.engine.impl.jobexecutor.ParallelMultiInstanceActivityCompletionJobHandler;
import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.engine.test.Deployment;
import org.flowable.engine.test.impl.CustomConfigurationFlowableTestCase;
import org.flowable.job.api.FlowableUnrecoverableJobException;
import org.flowable.job.api.Job;
import org.flowable.job.service.impl.cmd.LockExclusiveJobCmd;
import org.flowable.job.service.impl.persistence.entity.JobEntity;
import org.junit.jupiter.api.Test;

/**
* @author Filip Hrisafov
*/
class ParallelMultiInstanceAsyncNonExclusiveTest extends CustomConfigurationFlowableTestCase {

protected CustomCommandInvoker customCommandInvoker;
protected CustomEventListener customEventListener;
protected CollectingAsyncRunnableExecutionExceptionHandler executionExceptionHandler;

public ParallelMultiInstanceAsyncNonExclusiveTest() {
super("parallelMultiInstanceAsyncNonExclusiveTest");
}

@Override
protected void configureConfiguration(ProcessEngineConfigurationImpl processEngineConfiguration) {
customCommandInvoker = new CustomCommandInvoker();
processEngineConfiguration.setCommandInvoker(customCommandInvoker);
processEngineConfiguration.getAsyncExecutorConfiguration().setGlobalAcquireLockEnabled(true);
executionExceptionHandler = new CollectingAsyncRunnableExecutionExceptionHandler();
processEngineConfiguration.setCustomAsyncRunnableExecutionExceptionHandlers(Collections.singletonList(executionExceptionHandler));
customEventListener = new CustomEventListener();
processEngineConfiguration.setEventListeners(Collections.singletonList(customEventListener));

}

@Test
@Deployment
public void parallelMultiInstanceNonExclusiveJobs() {
// This test is trying to cause an optimistic locking exception when using non-exclusive parallel multi instance jobs.
// This is mimicking the following scenario:
// 4 async jobs complete in the same time, and thus they create 4 parallel-multi-instance-complete exclusive jobs
// 3 of those jobs will fail to get the exclusive lock and unacquire their jobs and 1 will get the lock
// the one that will get the lock will continue to the next step of the process and perform the multi instance cleanup
// the cleanup of the multi instance should not fail.

ProcessInstance processInstance = runtimeService.createProcessInstanceBuilder()
.processDefinitionKey("parallelScriptTask")
.start();

List<Job> jobs = managementService.createJobQuery().list();
assertThat(jobs).hasSize(4);
assertThat(jobs)
.extracting(Job::getJobHandlerType)
.containsOnly(AsyncContinuationJobHandler.TYPE);
customCommandInvoker.lockExclusiveCounter = new AtomicLong(0L);

customCommandInvoker.executeLockReleaseLatch = new CountDownLatch(1);
customEventListener.parallelMultiInstanceCompleteLatch = customCommandInvoker.executeLockReleaseLatch;

customCommandInvoker.executeAsyncRunnableLatch = new CountDownLatch(4);
customEventListener.asyncContinuationLatch = new CountDownLatch(4);

customCommandInvoker.executeLockCountLatch = new CountDownLatch(3);
customEventListener.parallelMultiInstanceWaitCompleteLatch = customCommandInvoker.executeLockCountLatch;

waitForJobExecutorToProcessAllJobs(15_000, 200);

assertThat(executionExceptionHandler.getExceptions()).isEmpty();
assertThat(managementService.createJobQuery().processInstanceId(processInstance.getId()).list()).isEmpty();
assertThat(managementService.createDeadLetterJobQuery().processInstanceId(processInstance.getId()).list()).isEmpty();
}

protected static class CustomCommandInvoker extends CommandInvoker {

protected AtomicLong lockExclusiveCounter = new AtomicLong();
protected CountDownLatch executeLockCountLatch;
protected CountDownLatch executeLockReleaseLatch;
protected CountDownLatch executeAsyncRunnableLatch;

protected CustomCommandInvoker() {
super(((commandContext, runnable) -> runnable.run()), null);
}

@Override
public <T> T execute(CommandConfig config, Command<T> command, CommandExecutor commandExecutor) {
if (command instanceof LockExclusiveJobCmd) {
if (lockExclusiveCounter.incrementAndGet() > 1) {
// We let the first exclusive to run without waiting
// we then wait to complete this transaction until the execute lock exclusive is released
try {
executeLockCountLatch.countDown();
executeLockReleaseLatch.await(4, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}
return super.execute(config, command, commandExecutor);
}
}

protected static class CustomEventListener implements FlowableEventListener {

protected CountDownLatch asyncContinuationLatch;
protected CountDownLatch parallelMultiInstanceCompleteLatch;
protected CountDownLatch parallelMultiInstanceWaitCompleteLatch;

@Override
public void onEvent(FlowableEvent event) {
if (FlowableEngineEventType.JOB_EXECUTION_SUCCESS.equals(event.getType()) && event instanceof FlowableEntityEvent) {
JobEntity entity = (JobEntity) ((FlowableEntityEvent) event).getEntity();
String jobHandlerType = entity.getJobHandlerType();
if (AsyncContinuationJobHandler.TYPE.equals(jobHandlerType)) {
// We are going to wait for all the async jobs to complete in the same time
asyncContinuationLatch.countDown();
try {
if (!asyncContinuationLatch.await(4, TimeUnit.SECONDS)) {
throw new FlowableUnrecoverableJobException("asyncContinuationLatch did not reach 0");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
} else if (ParallelMultiInstanceActivityCompletionJobHandler.TYPE.equals(jobHandlerType)) {
// There will be one multi instance complete job, so we count it down to release the rest of the lock exclusive commands
parallelMultiInstanceCompleteLatch.countDown();

try {
// Wait for the rest of the lock exclusive commands to complete before resuming this transaction
if (!parallelMultiInstanceWaitCompleteLatch.await(4, TimeUnit.SECONDS)) {
throw new FlowableUnrecoverableJobException("parallelMultiInstanceWaitLatch did not reach 0");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}

}

}

@Override
public boolean isFailOnException() {
return true;
}

@Override
public boolean isFireOnTransactionLifecycleEvent() {
return false;
}

@Override
public String getOnTransaction() {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,11 @@ public void testResetExpiredJobTimeout() {
managementService.executeCommand(new ResetExpiredJobsCmd(jobIds, jobServiceConfiguration.getJobEntityManager(), jobServiceConfiguration));
assertThat(managementService.executeCommand(new FindExpiredJobsCmd(expiredJobsPagesSize, jobServiceConfiguration.getJobEntityManager(), jobServiceConfiguration))).isEmpty();

assertThat(managementService.createJobQuery().jobId(job.getId()).singleResult()).isNull();
assertThat(managementService.createJobQuery().singleResult()).isNotNull();
JobEntity jobAfterExpiry = (JobEntity) managementService.createJobQuery().singleResult();
assertThat(jobAfterExpiry).isNotNull();
assertThat(jobAfterExpiry.getId()).isEqualTo(job.getId());
assertThat(jobAfterExpiry.getLockExpirationTime()).isNull();
assertThat(jobAfterExpiry.getLockOwner()).isNull();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:flowable="http://flowable.org/bpmn"
xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC"
xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI" xmlns:design="http://flowable.org/design" typeLanguage="http://www.w3.org/2001/XMLSchema"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.omg.org/spec/BPMN/20100524/MODEL http://www.omg.org/spec/BPMN/2.0/20100501/BPMN20.xsd"
expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://flowable.org/test" design:palette="flowable-engage-process-palette">
<process id="parallelScriptTask" name="Parallel Script Task" isExecutable="true" flowable:candidateStarterGroups="flowableUser">
<extensionElements>
<design:stencilid><![CDATA[BPMNDiagram]]></design:stencilid>
<design:creationdate><![CDATA[2023-11-14T09:15:53.641Z]]></design:creationdate>
<design:modificationdate><![CDATA[2023-11-14T09:16:52.191Z]]></design:modificationdate>
</extensionElements>
<scriptTask id="bpmnTask_1" name="Script task" flowable:async="true" flowable:exclusive="false" scriptFormat="groovy"
flowable:autoStoreVariables="false">
<extensionElements>
<design:stencilid><![CDATA[ScriptTask]]></design:stencilid>
<design:stencilsuperid><![CDATA[Task]]></design:stencilsuperid>
</extensionElements>
<multiInstanceLoopCharacteristics isSequential="false">
<extensionElements></extensionElements>
<loopCardinality>4</loopCardinality>
</multiInstanceLoopCharacteristics>
<script><![CDATA[println "Executing Test"]]></script>
</scriptTask>
<userTask id="bpmnTask_5" name="User task" flowable:assignee="${initiator}" flowable:formFieldValidation="false">
<extensionElements>
<flowable:task-candidates-type><![CDATA[all]]></flowable:task-candidates-type>
<design:stencilid><![CDATA[FormTask]]></design:stencilid>
<design:stencilsuperid><![CDATA[Task]]></design:stencilsuperid>
</extensionElements>
</userTask>
<startEvent id="startnoneevent1" flowable:initiator="initiator" flowable:formFieldValidation="false">
<extensionElements>
<flowable:work-form-field-validation><![CDATA[false]]></flowable:work-form-field-validation>
<design:stencilid><![CDATA[StartNoneEvent]]></design:stencilid>
</extensionElements>
</startEvent>
<endEvent id="bpmnEndEvent_7">
<extensionElements>
<design:stencilid><![CDATA[EndNoneEvent]]></design:stencilid>
</extensionElements>
</endEvent>
<sequenceFlow id="bpmnSequenceFlow_6" sourceRef="bpmnTask_1" targetRef="bpmnTask_5">
<extensionElements>
<design:stencilid><![CDATA[SequenceFlow]]></design:stencilid>
</extensionElements>
</sequenceFlow>
<sequenceFlow id="bpmnSequenceFlow_8" sourceRef="bpmnTask_5" targetRef="bpmnEndEvent_7">
<extensionElements>
<design:stencilid><![CDATA[SequenceFlow]]></design:stencilid>
</extensionElements>
</sequenceFlow>
<sequenceFlow id="bpmnSequenceFlow_2" sourceRef="startnoneevent1" targetRef="bpmnTask_1">
<extensionElements>
<design:stencilid><![CDATA[SequenceFlow]]></design:stencilid>
</extensionElements>
</sequenceFlow>
</process>
<bpmndi:BPMNDiagram id="BPMNDiagram_parallelScriptTask">
<bpmndi:BPMNPlane bpmnElement="parallelScriptTask" id="BPMNPlane_parallelScriptTask">
<bpmndi:BPMNShape bpmnElement="bpmnTask_1" id="BPMNShape_bpmnTask_1">
<omgdc:Bounds height="80.0" width="100.0" x="395.0" y="236.0"></omgdc:Bounds>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape bpmnElement="bpmnTask_5" id="BPMNShape_bpmnTask_5">
<omgdc:Bounds height="80.0" width="100.0" x="545.0" y="236.0"></omgdc:Bounds>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape bpmnElement="startnoneevent1" id="BPMNShape_startnoneevent1">
<omgdc:Bounds height="30.0" width="30.0" x="315.0" y="261.0"></omgdc:Bounds>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape bpmnElement="bpmnEndEvent_7" id="BPMNShape_bpmnEndEvent_7">
<omgdc:Bounds height="28.0" width="28.0" x="695.0" y="262.0"></omgdc:Bounds>
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge bpmnElement="bpmnSequenceFlow_2" id="BPMNEdge_bpmnSequenceFlow_2" flowable:sourceDockerX="15.0" flowable:sourceDockerY="15.0"
flowable:targetDockerX="50.0" flowable:targetDockerY="40.0">
<omgdi:waypoint x="345.0" y="276.0"></omgdi:waypoint>
<omgdi:waypoint x="395.0" y="276.0"></omgdi:waypoint>
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge bpmnElement="bpmnSequenceFlow_8" id="BPMNEdge_bpmnSequenceFlow_8" flowable:sourceDockerX="50.0" flowable:sourceDockerY="40.0"
flowable:targetDockerX="14.0" flowable:targetDockerY="14.0">
<omgdi:waypoint x="645.0" y="276.0"></omgdi:waypoint>
<omgdi:waypoint x="695.0" y="276.0"></omgdi:waypoint>
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge bpmnElement="bpmnSequenceFlow_6" id="BPMNEdge_bpmnSequenceFlow_6" flowable:sourceDockerX="50.0" flowable:sourceDockerY="40.0"
flowable:targetDockerX="50.0" flowable:targetDockerY="40.0">
<omgdi:waypoint x="495.0" y="276.0"></omgdi:waypoint>
<omgdi:waypoint x="545.0" y="276.0"></omgdi:waypoint>
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</definitions>
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@

import org.flowable.common.engine.api.FlowableIllegalArgumentException;
import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType;
import org.flowable.common.engine.api.delegate.event.FlowableEventDispatcher;
import org.flowable.common.engine.impl.persistence.entity.ByteArrayRef;
import org.flowable.job.api.DeadLetterJobQuery;
import org.flowable.job.api.HistoryJobQuery;
import org.flowable.job.api.JobQuery;
import org.flowable.job.api.SuspendedJobQuery;
import org.flowable.job.api.TimerJobQuery;
import org.flowable.job.service.InternalJobManager;
import org.flowable.job.service.JobService;
import org.flowable.job.service.JobServiceConfiguration;
import org.flowable.job.service.event.impl.FlowableJobEventBuilder;
Expand Down Expand Up @@ -200,13 +203,30 @@ public void deleteJob(JobEntity job) {
public void deleteJobsByExecutionId(String executionId) {
JobEntityManager jobEntityManager = getJobEntityManager();
Collection<JobEntity> jobsForExecution = jobEntityManager.findJobsByExecutionId(executionId);
if (jobsForExecution.isEmpty()) {
return;
}

InternalJobManager internalJobManager = configuration.getInternalJobManager();
FlowableEventDispatcher eventDispatcher = getEventDispatcher();
boolean eventDispatcherEnabled = eventDispatcher != null && eventDispatcher.isEnabled();
for (JobEntity job : jobsForExecution) {
getJobEntityManager().delete(job);
if (getEventDispatcher() != null && getEventDispatcher().isEnabled()) {
getEventDispatcher().dispatchEvent(FlowableJobEventBuilder.createEntityEvent(
if (internalJobManager != null) {
internalJobManager.handleJobDelete(job);
}

deleteByteArrayRef(job.getExceptionByteArrayRef());
deleteByteArrayRef(job.getCustomValuesByteArrayRef());

if (eventDispatcherEnabled) {
eventDispatcher.dispatchEvent(FlowableJobEventBuilder.createEntityEvent(
FlowableEngineEventType.ENTITY_DELETED, job), configuration.getEngineName());
eventDispatcher.dispatchEvent(FlowableJobEventBuilder.createEntityEvent(
FlowableEngineEventType.JOB_CANCELED, job), configuration.getEngineName());
}
}

jobEntityManager.deleteJobsByExecutionId(executionId);
}

@Override
Expand Down Expand Up @@ -235,4 +255,10 @@ public void deleteDeadLetterJobsByExecutionId(String executionId) {
}
}

protected void deleteByteArrayRef(ByteArrayRef jobByteArrayRef) {
if (jobByteArrayRef != null) {
jobByteArrayRef.delete(configuration.getEngineName());
}
}

}
Loading

0 comments on commit 6bdee37

Please sign in to comment.