Skip to content

Commit

Permalink
Backport fix batch migration transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
tijsrademakers committed Apr 16, 2024
1 parent b4dfe8c commit cd3a086
Show file tree
Hide file tree
Showing 10 changed files with 281 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import org.flowable.cmmn.engine.impl.migration.CaseInstanceMigrationDocumentImpl;
import org.flowable.cmmn.engine.impl.migration.CaseInstanceMigrationManager;
import org.flowable.cmmn.engine.impl.util.CommandContextUtil;
import org.flowable.common.engine.api.FlowableBatchPartMigrationException;
import org.flowable.common.engine.api.FlowableException;
import org.flowable.common.engine.impl.interceptor.Command;
import org.flowable.common.engine.impl.interceptor.CommandConfig;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.job.service.impl.persistence.entity.JobEntity;
import org.flowable.variable.api.delegate.VariableScope;
Expand Down Expand Up @@ -49,30 +52,46 @@ public void execute(JobEntity job, String configuration, VariableScope variableS
CaseInstanceMigrationDocument migrationDocument = CaseInstanceMigrationDocumentImpl.fromJson(
batch.getBatchDocumentJson(engineConfiguration.getEngineCfgKey()));

String exceptionMessage = null;
try {
migrationManager.migrateCaseInstance(batchPart.getScopeId(), migrationDocument, commandContext);
} catch (FlowableException e) {
exceptionMessage = e.getMessage();
String exceptionMessage = e.getMessage();

engineConfiguration.getCommandExecutor().execute(new Command<Void>() {
@Override
public Void execute(CommandContext commandContext) {
CommandConfig commandConfig = engineConfiguration.getCommandExecutor().getDefaultConfig().transactionRequiresNew();
return engineConfiguration.getCommandExecutor().execute(commandConfig, new Command<Void>() {
@Override
public Void execute(CommandContext commandContext2) {
String resultAsJsonString = prepareResultAsJsonString(exceptionMessage);
batchService.completeBatchPart(batchPartId, CaseInstanceBatchMigrationResult.RESULT_FAIL, resultAsJsonString);

return null;
}
});
}
});

FlowableBatchPartMigrationException wrappedException = new FlowableBatchPartMigrationException(e.getMessage(), e);
wrappedException.setIgnoreFailedJob(true);
throw wrappedException;
}

String resultAsJsonString = prepareResultAsJsonString(exceptionMessage);

if (exceptionMessage != null) {
batchService.completeBatchPart(batchPartId, CaseInstanceBatchMigrationResult.RESULT_FAIL, resultAsJsonString);
} else {
batchService.completeBatchPart(batchPartId, CaseInstanceBatchMigrationResult.RESULT_SUCCESS, resultAsJsonString);
}
String resultAsJsonString = prepareResultAsJsonString();
batchService.completeBatchPart(batchPartId, CaseInstanceBatchMigrationResult.RESULT_SUCCESS, resultAsJsonString);
}

protected String prepareResultAsJsonString() {
ObjectNode objectNode = getObjectMapper().createObjectNode();
objectNode.put(BATCH_RESULT_STATUS_LABEL, CaseInstanceBatchMigrationResult.RESULT_SUCCESS);
return objectNode.toString();
}

protected static String prepareResultAsJsonString(String exceptionMessage) {
protected String prepareResultAsJsonString(String exceptionMessage) {
ObjectNode objectNode = getObjectMapper().createObjectNode();
if (exceptionMessage == null) {
objectNode.put(BATCH_RESULT_STATUS_LABEL, CaseInstanceBatchMigrationResult.RESULT_SUCCESS);
} else {
objectNode.put(BATCH_RESULT_STATUS_LABEL, CaseInstanceBatchMigrationResult.RESULT_FAIL);
objectNode.put(BATCH_RESULT_MESSAGE_LABEL, exceptionMessage);
}
objectNode.put(BATCH_RESULT_STATUS_LABEL, CaseInstanceBatchMigrationResult.RESULT_FAIL);
objectNode.put(BATCH_RESULT_MESSAGE_LABEL, exceptionMessage);
return objectNode.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ public Batch batchMigrateCaseInstancesOfCaseDefinition(String caseDefinitionId,
job.setScopeType(ScopeTypes.CMMN);
job.setJobHandlerConfiguration(CaseInstanceMigrationJobHandler.getHandlerCfgForBatchPartId(batchPart.getId()));
jobService.createAsyncJob(job, false);
job.setRetries(0);
jobService.scheduleAsyncJob(job);
}

Expand All @@ -338,6 +339,7 @@ public Batch batchMigrateCaseInstancesOfCaseDefinition(String caseDefinitionId,
TimerJobEntity timerJob = timerJobService.createTimerJob();
timerJob.setJobType(JobEntity.JOB_TYPE_TIMER);
timerJob.setRevision(1);
timerJob.setRetries(0);
timerJob.setJobHandlerType(CaseInstanceMigrationStatusJobHandler.TYPE);
timerJob.setJobHandlerConfiguration(CaseInstanceMigrationJobHandler.getHandlerCfgForBatchId(batch.getId()));
timerJob.setScopeType(ScopeTypes.CMMN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.flowable.cmmn.api.runtime.CaseInstance;
import org.flowable.cmmn.api.runtime.PlanItemInstance;
import org.flowable.cmmn.api.runtime.PlanItemInstanceState;
import org.flowable.cmmn.engine.impl.job.CaseInstanceMigrationJobHandler;
import org.flowable.cmmn.engine.impl.job.CaseInstanceMigrationStatusJobHandler;
import org.flowable.cmmn.engine.impl.migration.CaseInstanceMigrationDocumentBuilderImpl;
import org.flowable.cmmn.engine.test.impl.CmmnHistoryTestHelper;
Expand Down Expand Up @@ -95,10 +96,16 @@ void testCaseInstanceBatchMigrationSuccess() {

for (CaseInstanceBatchMigrationPartResult part : migrationResult.getAllMigrationParts()) {
assertThat(part.getStatus()).isEqualTo(CaseInstanceBatchMigrationResult.STATUS_COMPLETED);
assertThat(part.getStatus()).isEqualTo(CaseInstanceBatchMigrationResult.STATUS_COMPLETED);
assertThat(part.getResult()).isEqualTo(CaseInstanceBatchMigrationResult.RESULT_SUCCESS);
assertThat(part.getResult()).isEqualTo(CaseInstanceBatchMigrationResult.RESULT_SUCCESS);
}

assertThat(cmmnManagementService.createJobQuery().scopeId(caseInstance1.getId()).list()).hasSize(0);
assertThat(cmmnManagementService.createTimerJobQuery().scopeId(caseInstance1.getId()).list()).hasSize(0);
assertThat(cmmnManagementService.createDeadLetterJobQuery().scopeId(caseInstance1.getId()).list()).hasSize(0);

assertThat(cmmnManagementService.createJobQuery().scopeId(caseInstance2.getId()).list()).hasSize(0);
assertThat(cmmnManagementService.createTimerJobQuery().scopeId(caseInstance2.getId()).list()).hasSize(0);
assertThat(cmmnManagementService.createDeadLetterJobQuery().scopeId(caseInstance2.getId()).list()).hasSize(0);

assertAfterMigrationState(caseInstance1, destinationDefinition, caseInstance1AfterMigration, 2);
assertAfterMigrationState(caseInstance2, destinationDefinition, caseInstance2AfterMigration, 2);
Expand Down Expand Up @@ -177,17 +184,105 @@ void testCaseInstanceBatchMigrationWithError() {

for (CaseInstanceBatchMigrationPartResult part : migrationResult.getAllMigrationParts()) {
assertThat(part.getStatus()).isEqualTo(CaseInstanceBatchMigrationResult.STATUS_COMPLETED);
assertThat(part.getStatus()).isEqualTo(CaseInstanceBatchMigrationResult.STATUS_COMPLETED);
assertThat(part.getResult()).isEqualTo(CaseInstanceBatchMigrationResult.RESULT_FAIL);
assertThat(part.getResult()).isEqualTo(CaseInstanceBatchMigrationResult.RESULT_FAIL);
}

assertThat(cmmnManagementService.createJobQuery().scopeId(caseInstance1.getId()).list()).hasSize(0);
assertThat(cmmnManagementService.createTimerJobQuery().scopeId(caseInstance1.getId()).list()).hasSize(0);
assertThat(cmmnManagementService.createDeadLetterJobQuery().scopeId(caseInstance1.getId()).list()).hasSize(0);

assertThat(cmmnManagementService.createJobQuery().scopeId(caseInstance2.getId()).list()).hasSize(0);
assertThat(cmmnManagementService.createTimerJobQuery().scopeId(caseInstance2.getId()).list()).hasSize(0);
assertThat(cmmnManagementService.createDeadLetterJobQuery().scopeId(caseInstance2.getId()).list()).hasSize(0);

assertAfterMigrationState(caseInstance1, caseDefinitionVersion1, caseInstance1AfterMigration, 1);
assertAfterMigrationState(caseInstance2, caseDefinitionVersion1, caseInstance2AfterMigration, 1);

cmmnManagementService.deleteBatch(batch.getId());
}

@Test
void testCaseInstanceBatchMigrationWithWrongMapping() {
// GIVEN
CaseDefinition caseDefinitionVersion1 = deployCaseDefinition("test1", "org/flowable/cmmn/test/migration/two-task.cmmn.xml");
CaseInstance caseInstance1 = cmmnRuntimeService.createCaseInstanceBuilder().caseDefinitionKey("testCase").start();
CaseInstance caseInstance2 = cmmnRuntimeService.createCaseInstanceBuilder().caseDefinitionKey("testCase").start();

CmmnDeployment deployment = cmmnRepositoryService.createDeployment()
.name("test1")
.addClasspathResource("org/flowable/cmmn/test/migration/two-task.cmmn.xml")
.deploy();

CaseDefinition caseDefinitionVersion2 = cmmnRepositoryService.createCaseDefinitionQuery()
.deploymentId(deployment.getId())
.singleResult();

CaseInstanceMigrationDocumentBuilder migrationDoc = new CaseInstanceMigrationDocumentBuilderImpl()
.setCaseDefinitionToMigrateTo(caseDefinitionVersion2.getId())
.addActivatePlanItemDefinitionMapping(PlanItemDefinitionMappingBuilder.createActivatePlanItemDefinitionMappingFor("nonExisting"));

Batch batch = cmmnMigrationService.createCaseInstanceMigrationBuilderFromCaseInstanceMigrationDocument(migrationDoc.build())
.batchMigrateCaseInstances(caseDefinitionVersion1.getId());

// assert created migration result and parts
assertThat(CmmnJobTestHelper.areJobsAvailable(cmmnManagementService)).isTrue();
CaseInstanceBatchMigrationResult migrationResultPriorProcessing = cmmnMigrationService.getResultsOfBatchCaseInstanceMigration(batch.getId());
assertThat(migrationResultPriorProcessing).isNotNull();
assertThat(migrationResultPriorProcessing.getBatchId()).isEqualTo(batch.getId());
assertThat(migrationResultPriorProcessing.getStatus()).isEqualTo(CaseInstanceBatchMigrationResult.STATUS_IN_PROGRESS);
assertThat(migrationResultPriorProcessing.getCompleteTime()).isNull();
assertThat(migrationResultPriorProcessing.getAllMigrationParts()).hasSize(2);
assertThat(migrationResultPriorProcessing.getWaitingMigrationParts()).hasSize(2);
assertThat(migrationResultPriorProcessing.getSuccessfulMigrationParts()).isEmpty();
assertThat(migrationResultPriorProcessing.getFailedMigrationParts()).isEmpty();

assertThat(cmmnManagementService.createJobQuery().handlerType(CaseInstanceMigrationJobHandler.TYPE).list()).hasSize(2);

// WHEN
// Start async executor to process the batches
CmmnJobTestHelper.waitForJobExecutorToProcessAllAsyncJobs(cmmnEngineConfiguration, 5000L, 500L, true);
assertThat(CmmnJobTestHelper.areJobsAvailable(cmmnManagementService)).isFalse();

CaseInstance caseInstance1AfterMigration = cmmnRuntimeService.createCaseInstanceQuery()
.caseInstanceId(caseInstance1.getId())
.singleResult();
CaseInstance caseInstance2AfterMigration = cmmnRuntimeService.createCaseInstanceQuery()
.caseInstanceId(caseInstance2.getId())
.singleResult();

executeMigrationJobStatusHandlerTimerJob();

// THEN
CaseInstanceBatchMigrationResult migrationResult = cmmnMigrationService.getResultsOfBatchCaseInstanceMigration(batch.getId());
assertThat(migrationResult.getBatchId()).isEqualTo(batch.getId());
assertThat(migrationResult.getStatus()).isEqualTo(CaseInstanceBatchMigrationResult.STATUS_COMPLETED);
assertThat(migrationResult.getCompleteTime()).isNotNull();
assertThat(migrationResult.getAllMigrationParts()).hasSize(2);
assertThat(migrationResult.getWaitingMigrationParts()).isEmpty();
assertThat(migrationResult.getSuccessfulMigrationParts()).hasSize(0);
assertThat(migrationResult.getFailedMigrationParts()).hasSize(2);

for (CaseInstanceBatchMigrationPartResult part : migrationResult.getAllMigrationParts()) {
assertThat(part.getStatus()).isEqualTo(CaseInstanceBatchMigrationResult.STATUS_COMPLETED);
assertThat(part.getResult()).isEqualTo(CaseInstanceBatchMigrationResult.RESULT_FAIL);
assertThat(part.getMigrationMessage()).contains("Cannot find plan item with definition id 'nonExisting'");
}

assertThat(cmmnManagementService.createJobQuery().handlerType(CaseInstanceMigrationJobHandler.TYPE).list()).hasSize(0);

assertThat(cmmnManagementService.createJobQuery().scopeId(caseInstance1.getId()).list()).hasSize(0);
assertThat(cmmnManagementService.createTimerJobQuery().scopeId(caseInstance1.getId()).list()).hasSize(0);
assertThat(cmmnManagementService.createDeadLetterJobQuery().scopeId(caseInstance1.getId()).list()).hasSize(0);

assertThat(cmmnManagementService.createJobQuery().scopeId(caseInstance2.getId()).list()).hasSize(0);
assertThat(cmmnManagementService.createTimerJobQuery().scopeId(caseInstance2.getId()).list()).hasSize(0);
assertThat(cmmnManagementService.createDeadLetterJobQuery().scopeId(caseInstance2.getId()).list()).hasSize(0);

assertAfterMigrationState(caseInstance1, caseDefinitionVersion1, caseInstance1AfterMigration, 1);
assertAfterMigrationState(caseInstance2, caseDefinitionVersion1, caseInstance2AfterMigration, 1);
cmmnManagementService.deleteBatch(batch.getId());
}

void assertAfterMigrationState(CaseInstance caseInstance, CaseDefinition destinationDefinition, CaseInstance caseInstanceAfterMigration,
int caseDefinitionVersion) {
assertThat(caseInstanceAfterMigration.getCaseDefinitionId()).isEqualTo(destinationDefinition.getId());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.flowable.common.engine.api;

public class FlowableBatchPartMigrationException extends FlowableException {

private static final long serialVersionUID = 1L;

protected boolean ignoreFailedJob;

public FlowableBatchPartMigrationException(String message, Throwable cause) {
super(message, cause);
}

public FlowableBatchPartMigrationException(String message) {
super(message);
}

public boolean isIgnoreFailedJob() {
return ignoreFailedJob;
}

public void setIgnoreFailedJob(boolean ignoreFailedJob) {
this.ignoreFailedJob = ignoreFailedJob;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
import org.flowable.batch.api.Batch;
import org.flowable.batch.api.BatchPart;
import org.flowable.batch.api.BatchService;
import org.flowable.common.engine.api.FlowableBatchPartMigrationException;
import org.flowable.common.engine.api.FlowableException;
import org.flowable.common.engine.impl.interceptor.Command;
import org.flowable.common.engine.impl.interceptor.CommandConfig;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.migration.ProcessInstanceMigrationDocumentImpl;
Expand Down Expand Up @@ -48,30 +51,46 @@ public void execute(JobEntity job, String configuration, VariableScope variableS
Batch batch = batchService.getBatch(batchPart.getBatchId());
ProcessInstanceMigrationDocument migrationDocument = ProcessInstanceMigrationDocumentImpl.fromJson(batch.getBatchDocumentJson(processEngineConfiguration.getEngineCfgKey()));

String exceptionMessage = null;
try {
processInstanceMigrationManager.migrateProcessInstance(batchPart.getScopeId(), migrationDocument, commandContext);
} catch (FlowableException e) {
exceptionMessage = e.getMessage();
String exceptionMessage = e.getMessage();

processEngineConfiguration.getCommandExecutor().execute(new Command<Void>() {
@Override
public Void execute(CommandContext commandContext) {
CommandConfig commandConfig = processEngineConfiguration.getCommandExecutor().getDefaultConfig().transactionRequiresNew();
return processEngineConfiguration.getCommandExecutor().execute(commandConfig, new Command<Void>() {
@Override
public Void execute(CommandContext commandContext2) {
String resultAsJsonString = prepareResultAsJsonString(exceptionMessage);
batchService.completeBatchPart(batchPartId, ProcessInstanceBatchMigrationResult.RESULT_FAIL, resultAsJsonString);

return null;
}
});
}
});

FlowableBatchPartMigrationException wrappedException = new FlowableBatchPartMigrationException(e.getMessage(), e);
wrappedException.setIgnoreFailedJob(true);
throw wrappedException;
}

String resultAsJsonString = prepareResultAsJsonString(exceptionMessage);

if (exceptionMessage != null) {
batchService.completeBatchPart(batchPartId, ProcessInstanceBatchMigrationResult.RESULT_FAIL, resultAsJsonString);
} else {
batchService.completeBatchPart(batchPartId, ProcessInstanceBatchMigrationResult.RESULT_SUCCESS, resultAsJsonString);
}
String resultAsJsonString = prepareResultAsJsonString();
batchService.completeBatchPart(batchPartId, ProcessInstanceBatchMigrationResult.RESULT_SUCCESS, resultAsJsonString);
}

protected String prepareResultAsJsonString() {
ObjectNode objectNode = getObjectMapper().createObjectNode();
objectNode.put(BATCH_RESULT_STATUS_LABEL, ProcessInstanceBatchMigrationResult.RESULT_SUCCESS);
return objectNode.toString();
}

protected static String prepareResultAsJsonString(String exceptionMessage) {
protected String prepareResultAsJsonString(String exceptionMessage) {
ObjectNode objectNode = getObjectMapper().createObjectNode();
if (exceptionMessage == null) {
objectNode.put(BATCH_RESULT_STATUS_LABEL, ProcessInstanceBatchMigrationResult.RESULT_SUCCESS);
} else {
objectNode.put(BATCH_RESULT_STATUS_LABEL, ProcessInstanceBatchMigrationResult.RESULT_FAIL);
objectNode.put(BATCH_RESULT_MESSAGE_LABEL, exceptionMessage);
}
objectNode.put(BATCH_RESULT_STATUS_LABEL, ProcessInstanceBatchMigrationResult.RESULT_FAIL);
objectNode.put(BATCH_RESULT_MESSAGE_LABEL, exceptionMessage);
return objectNode.toString();
}

Expand Down
Loading

0 comments on commit cd3a086

Please sign in to comment.