diff --git a/modules/flowable-cmmn-engine/src/main/java/org/flowable/cmmn/engine/impl/callback/DefaultInternalCmmnJobManager.java b/modules/flowable-cmmn-engine/src/main/java/org/flowable/cmmn/engine/impl/callback/DefaultInternalCmmnJobManager.java index 85680dd40b9..2ad8eaf9c49 100644 --- a/modules/flowable-cmmn-engine/src/main/java/org/flowable/cmmn/engine/impl/callback/DefaultInternalCmmnJobManager.java +++ b/modules/flowable-cmmn-engine/src/main/java/org/flowable/cmmn/engine/impl/callback/DefaultInternalCmmnJobManager.java @@ -68,13 +68,14 @@ protected void handleJobDeleteInternal(Job job) { @Override protected void lockJobScopeInternal(Job job) { CaseInstanceEntityManager caseInstanceEntityManager = cmmnEngineConfiguration.getCaseInstanceEntityManager(); - String lockOwner; - Date lockExpirationTime; + String lockOwner = null; + Date lockExpirationTime = null; if (job instanceof JobInfoEntity) { lockOwner = ((JobInfoEntity) job).getLockOwner(); lockExpirationTime = ((JobInfoEntity) job).getLockExpirationTime(); - } else { + } + if (lockOwner == null || lockExpirationTime == null) { int lockMillis = cmmnEngineConfiguration.getAsyncExecutor().getAsyncJobLockTimeInMillis(); GregorianCalendar lockCal = new GregorianCalendar(); lockCal.setTime(cmmnEngineConfiguration.getClock().getCurrentTime()); diff --git a/modules/flowable-engine/src/main/java/org/flowable/engine/impl/cfg/DefaultInternalJobManager.java b/modules/flowable-engine/src/main/java/org/flowable/engine/impl/cfg/DefaultInternalJobManager.java index 0033e232670..8fdb45a32c1 100644 --- a/modules/flowable-engine/src/main/java/org/flowable/engine/impl/cfg/DefaultInternalJobManager.java +++ b/modules/flowable-engine/src/main/java/org/flowable/engine/impl/cfg/DefaultInternalJobManager.java @@ -148,13 +148,15 @@ protected void lockJobScopeInternal(Job job) { ExecutionEntityManager executionEntityManager = getExecutionEntityManager(); ExecutionEntity execution = executionEntityManager.findById(job.getExecutionId()); if (execution != null) { - String lockOwner; - Date lockExpirationTime; + String lockOwner = null; + Date lockExpirationTime = null; if (job instanceof JobInfoEntity) { lockOwner = ((JobInfoEntity) job).getLockOwner(); lockExpirationTime = ((JobInfoEntity) job).getLockExpirationTime(); - } else { + } + + if (lockOwner == null || lockExpirationTime == null) { int lockMillis = processEngineConfiguration.getAsyncExecutor().getAsyncJobLockTimeInMillis(); GregorianCalendar lockCal = new GregorianCalendar(); lockCal.setTime(processEngineConfiguration.getClock().getCurrentTime()); diff --git a/modules/flowable-engine/src/test/java/org/flowable/engine/test/bpmn/async/AsyncExclusiveJobsTest.java b/modules/flowable-engine/src/test/java/org/flowable/engine/test/bpmn/async/AsyncExclusiveJobsTest.java index 7d030dc3213..9ced07a67fa 100644 --- a/modules/flowable-engine/src/test/java/org/flowable/engine/test/bpmn/async/AsyncExclusiveJobsTest.java +++ b/modules/flowable-engine/src/test/java/org/flowable/engine/test/bpmn/async/AsyncExclusiveJobsTest.java @@ -18,7 +18,10 @@ import org.flowable.engine.history.HistoricActivityInstance; import org.flowable.engine.impl.test.HistoryTestHelper; import org.flowable.engine.impl.test.PluggableFlowableTestCase; +import org.flowable.engine.runtime.ProcessInstance; import org.flowable.engine.test.Deployment; +import org.flowable.job.service.impl.asyncexecutor.AsyncJobExecutorConfiguration; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledIfSystemProperty; @@ -57,4 +60,35 @@ public void testExclusiveJobs() { } + @Test + @Deployment + public void testParallelGatewayExclusiveJobs() { + ProcessInstance processInstance = runtimeService.createProcessInstanceBuilder() + .processDefinitionKey("parallelExclusiveServiceTasks") + .variable("counter", 0L) + .start(); + + assertThat(runtimeService.getVariable(processInstance.getId(), "counter")) + .isEqualTo(0L); + + AsyncJobExecutorConfiguration asyncExecutorConfiguration = processEngineConfiguration.getAsyncExecutorConfiguration(); + boolean originalGlobalAcquireLockEnabled = asyncExecutorConfiguration.isGlobalAcquireLockEnabled(); + CollectingAsyncRunnableExecutionExceptionHandler executionExceptionHandler = new CollectingAsyncRunnableExecutionExceptionHandler(); + try { + asyncExecutorConfiguration.setGlobalAcquireLockEnabled(true); + processEngineConfiguration.getJobServiceConfiguration() + .getAsyncRunnableExecutionExceptionHandlers() + .add(0, executionExceptionHandler); + waitForJobExecutorToProcessAllJobs(15000, 200); + } finally { + asyncExecutorConfiguration.setGlobalAcquireLockEnabled(originalGlobalAcquireLockEnabled); + processEngineConfiguration.getJobServiceConfiguration() + .getAsyncRunnableExecutionExceptionHandlers() + .remove(executionExceptionHandler); + } + + assertThat(executionExceptionHandler.getExceptions()).isEmpty(); + assertThat(runtimeService.getVariable(processInstance.getId(), "counter")) + .isEqualTo(3L); + } } diff --git a/modules/flowable-engine/src/test/java/org/flowable/engine/test/bpmn/async/CollectingAsyncRunnableExecutionExceptionHandler.java b/modules/flowable-engine/src/test/java/org/flowable/engine/test/bpmn/async/CollectingAsyncRunnableExecutionExceptionHandler.java new file mode 100644 index 00000000000..b4ce7e761d1 --- /dev/null +++ b/modules/flowable-engine/src/test/java/org/flowable/engine/test/bpmn/async/CollectingAsyncRunnableExecutionExceptionHandler.java @@ -0,0 +1,38 @@ +/* Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.flowable.engine.test.bpmn.async; + +import java.util.ArrayList; +import java.util.Collection; + +import org.flowable.job.api.JobInfo; +import org.flowable.job.service.JobServiceConfiguration; +import org.flowable.job.service.impl.asyncexecutor.AsyncRunnableExecutionExceptionHandler; + +/** + * @author Filip Hrisafov + */ +public class CollectingAsyncRunnableExecutionExceptionHandler implements AsyncRunnableExecutionExceptionHandler { + + protected final Collection exceptions = new ArrayList<>(); + + @Override + public boolean handleException(JobServiceConfiguration jobServiceConfiguration, JobInfo job, Throwable exception) { + exceptions.add(exception); + return false; + } + + public Collection getExceptions() { + return exceptions; + } +} diff --git a/modules/flowable-engine/src/test/java/org/flowable/engine/test/jobexecutor/BulkUpdateJobLockTest.java b/modules/flowable-engine/src/test/java/org/flowable/engine/test/jobexecutor/BulkUpdateJobLockTest.java index 2cfcb98a9d1..425c21ffc26 100644 --- a/modules/flowable-engine/src/test/java/org/flowable/engine/test/jobexecutor/BulkUpdateJobLockTest.java +++ b/modules/flowable-engine/src/test/java/org/flowable/engine/test/jobexecutor/BulkUpdateJobLockTest.java @@ -157,7 +157,7 @@ public void testHistoryJobBulkUpdate() { return null; }); - for (Job job : managementService.createJobQuery().list()) { + for (HistoryJob job : managementService.createHistoryJobQuery().list()) { assertThat(((HistoryJobEntity) job).getLockOwner()).isEqualTo("test"); assertThat(((HistoryJobEntity) job).getLockExpirationTime()).isNotNull(); } diff --git a/modules/flowable-engine/src/test/resources/org/flowable/engine/test/bpmn/async/AsyncExclusiveJobsTest.testParallelGatewayExclusiveJobs.bpmn20.xml b/modules/flowable-engine/src/test/resources/org/flowable/engine/test/bpmn/async/AsyncExclusiveJobsTest.testParallelGatewayExclusiveJobs.bpmn20.xml new file mode 100755 index 00000000000..f483f0133b4 --- /dev/null +++ b/modules/flowable-engine/src/test/resources/org/flowable/engine/test/bpmn/async/AsyncExclusiveJobsTest.testParallelGatewayExclusiveJobs.bpmn20.xml @@ -0,0 +1,186 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/asyncexecutor/DefaultJobManager.java b/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/asyncexecutor/DefaultJobManager.java index cac4dd00c2a..8f48ef89190 100644 --- a/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/asyncexecutor/DefaultJobManager.java +++ b/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/asyncexecutor/DefaultJobManager.java @@ -332,7 +332,12 @@ public void unacquire(JobInfo job) { if (job instanceof HistoryJob) { - HistoryJobEntity jobEntity = (HistoryJobEntity) job; + HistoryJobEntity jobEntity = jobServiceConfiguration.getHistoryJobEntityManager().findById(job.getId()); + if (jobEntity == null) { + LOGGER.debug("History Job {} does not exist anymore and will not be unacquired. It has most likely been deleted " + + "e.g. as part of another concurrent part of a process / case instance.", job.getId()); + return; + } HistoryJobEntity newJobEntity = jobServiceConfiguration.getHistoryJobEntityManager().create(); copyHistoryJobInfo(newJobEntity, jobEntity); @@ -348,7 +353,13 @@ public void unacquire(JobInfo job) { // will avoid that the job is immediately is picked up again (for example // when doing lots of exclusive jobs for the same process instance) - JobEntity jobEntity = (JobEntity) job; + JobEntity jobEntity = jobServiceConfiguration.getJobEntityManager().findById(job.getId()); + + if (jobEntity == null) { + LOGGER.debug("Async Job {} does not exist anymore and will not be unacquired. It has most likely been deleted " + + "e.g. as part of another concurrent part of a process / case instance.", job.getId()); + return; + } JobEntity newJobEntity = jobServiceConfiguration.getJobEntityManager().create(); copyJobInfo(newJobEntity, jobEntity); @@ -363,7 +374,13 @@ public void unacquire(JobInfo job) { // as the chance of failure will be high. } else if (job instanceof ExternalWorkerJobEntity) { - ExternalWorkerJobEntity jobEntity = (ExternalWorkerJobEntity) job; + ExternalWorkerJobEntity jobEntity = jobServiceConfiguration.getExternalWorkerJobEntityManager().findById(job.getId()); + + if (jobEntity == null) { + LOGGER.debug("External Worker Job {} does not exist anymore and will not be unacquired. It has most likely been deleted " + + "e.g. as part of another concurrent part of a process / case instance.", job.getId()); + return; + } ExternalWorkerJobEntity newJobEntity = jobServiceConfiguration.getExternalWorkerJobEntityManager().create(); copyJobInfo(newJobEntity, jobEntity);