Skip to content

Commit

Permalink
Remove async history support
Browse files Browse the repository at this point in the history
  • Loading branch information
filiphr committed Aug 15, 2023
1 parent 5ba6010 commit be5dc01
Show file tree
Hide file tree
Showing 127 changed files with 90 additions and 12,558 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,6 @@ protected void copyProcessEngineProperties(ProcessEngineConfigurationImpl proces
// The job handlers will be added in the CmmnEngineConfiguration itself
cmmnEngineConfiguration.setAsyncHistoryEnabled(true);
cmmnEngineConfiguration.setAsyncHistoryExecutor(asyncHistoryExecutor);
cmmnEngineConfiguration.setAsyncHistoryJsonGroupingEnabled(processEngineConfiguration.isAsyncHistoryJsonGroupingEnabled());
cmmnEngineConfiguration.setAsyncHistoryJsonGroupingThreshold(processEngineConfiguration.getAsyncHistoryJsonGroupingThreshold());
cmmnEngineConfiguration.setAsyncHistoryJsonGzipCompressionEnabled(processEngineConfiguration.isAsyncHistoryJsonGzipCompressionEnabled());

cmmnEngineConfiguration.setAsyncHistoryTaskExecutor(processEngineConfiguration.getAsyncHistoryTaskExecutor());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,17 @@

import static org.assertj.core.api.Assertions.assertThat;

import org.flowable.cmmn.api.runtime.PlanItemInstanceState;
import org.flowable.cmmn.engine.CmmnEngine;
import org.flowable.cmmn.engine.CmmnEngines;
import org.flowable.cmmn.engine.impl.history.async.CmmnAsyncHistoryConstants;
import org.flowable.cmmn.engine.test.impl.CmmnJobTestHelper;
import org.flowable.common.engine.api.async.AsyncTaskExecutor;
import org.flowable.engine.ProcessEngine;
import org.flowable.engine.ProcessEngineConfiguration;
import org.flowable.engine.impl.history.async.HistoryJsonConstants;
import org.flowable.job.api.HistoryJob;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.job.service.HistoryJobService;
import org.flowable.job.service.JobServiceConfiguration;
import org.flowable.job.service.impl.asyncexecutor.AsyncExecutor;
import org.flowable.job.service.impl.persistence.entity.HistoryJobEntity;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -54,9 +53,6 @@ public void cleanup() {
.createDeploymentQuery()
.list()
.forEach(deployment -> cmmnEngine.getCmmnRepositoryService().deleteDeployment(deployment.getId(), true));
// Execute history jobs for the delete deployments
processEngine.getManagementService().createHistoryJobQuery().list()
.forEach(historyJob -> processEngine.getManagementService().executeHistoryJob(historyJob.getId()));

cmmnEngine.close();
processEngine.close();
Expand All @@ -79,66 +75,48 @@ public void testSharedAsyncHistoryExecutor() {
assertThat(processEngine.getProcessEngineConfiguration().getAsyncHistoryExecutor().getJobServiceConfiguration().getHistoryJobExecutionScope())
.isEqualTo(JobServiceConfiguration.JOB_EXECUTION_SCOPE_ALL);

// 2 job handlers / engine
assertThat(processEngineAsyncExecutor.getJobServiceConfiguration().getHistoryJobHandlers()).hasSize(4);

// Deploy and start test processes/cases
// Trigger one plan item instance to start the process
processEngine.getRepositoryService().createDeployment().addClasspathResource("org/flowable/cmmn/test/oneTaskProcess.bpmn20.xml").deploy();
cmmnEngine.getCmmnRepositoryService().createDeployment()
.addClasspathResource("org/flowable/cmmn/test/ProcessTaskTest.testOneTaskProcessNonBlocking.cmmn").deploy();
cmmnEngine.getCmmnRuntimeService().createCaseInstanceBuilder().caseDefinitionKey("myCase").start();
cmmnEngine.getCmmnRuntimeService().triggerPlanItemInstance(
cmmnEngine.getCmmnRuntimeService().createPlanItemInstanceQuery().planItemInstanceState(PlanItemInstanceState.ACTIVE).singleResult().getId());

// As async history is enabled, there should be no historical entries yet, but there should be history jobs
assertThat(cmmnEngine.getCmmnHistoryService().createHistoricCaseInstanceQuery().count()).isZero();
assertThat(processEngine.getHistoryService().createHistoricProcessInstanceQuery().count()).isZero();

// 3 history jobs expected:
// - one for the case instance start
// - one for the plan item instance trigger
// - one for the process instance start
assertThat(cmmnEngine.getCmmnManagementService().createHistoryJobQuery().count()).isEqualTo(3);
assertThat(processEngine.getManagementService().createHistoryJobQuery().count()).isEqualTo(3);

// Expected 2 jobs originating from the cmmn engine and 1 for the process engine
int cmmnHistoryJobs = 0;
int bpmnHistoryJobs = 0;
for (HistoryJob historyJob : cmmnEngine.getCmmnManagementService().createHistoryJobQuery().list()) {
if (historyJob.getJobHandlerType().equals(CmmnAsyncHistoryConstants.JOB_HANDLER_TYPE_DEFAULT_ASYNC_HISTORY)
|| historyJob.getJobHandlerType().equals(CmmnAsyncHistoryConstants.JOB_HANDLER_TYPE_DEFAULT_ASYNC_HISTORY_ZIPPED)) {
cmmnHistoryJobs++;
} else if (historyJob.getJobHandlerType().equals(HistoryJsonConstants.JOB_HANDLER_TYPE_DEFAULT_ASYNC_HISTORY)
|| historyJob.getJobHandlerType().equals(HistoryJsonConstants.JOB_HANDLER_TYPE_DEFAULT_ASYNC_HISTORY_ZIPPED)) {
bpmnHistoryJobs++;
}

// Execution scope should be all (see the CmmnEngineConfigurator)
assertThat(historyJob.getScopeType()).isEqualTo(JobServiceConfiguration.JOB_EXECUTION_SCOPE_ALL);
}
assertThat(bpmnHistoryJobs).isEqualTo(1);
assertThat(cmmnHistoryJobs).isEqualTo(2);
// 1 job handlers / engine
assertThat(processEngineAsyncExecutor.getJobServiceConfiguration().getHistoryJobHandlers())
.containsOnlyKeys("bpmn-test-history-job-handler", "cmmn-test-history-job-handler");

processEngine.getManagementService()
.executeCommand(commandContext -> {
HistoryJobService historyJobService = CommandContextUtil.getProcessEngineConfiguration(commandContext)
.getJobServiceConfiguration()
.getHistoryJobService();

HistoryJobEntity historyJob = historyJobService.createHistoryJob();
historyJob.setScopeType(JobServiceConfiguration.JOB_EXECUTION_SCOPE_ALL);
historyJob.setJobHandlerType("bpmn-test-history-job-handler");
historyJob.setRetries(3);
historyJob.setCreateTime(commandContext.getClock().getCurrentTime());
historyJobService.scheduleHistoryJob(historyJob);
return null;
});

cmmnEngine.getCmmnEngineConfiguration()
.getCommandExecutor()
.execute(commandContext -> {
HistoryJobService historyJobService = org.flowable.cmmn.engine.impl.util.CommandContextUtil.getCmmnEngineConfiguration(commandContext)
.getJobServiceConfiguration()
.getHistoryJobService();

HistoryJobEntity historyJob = historyJobService.createHistoryJob();
historyJob.setScopeType(JobServiceConfiguration.JOB_EXECUTION_SCOPE_ALL);
historyJob.setJobHandlerType("cmmn-test-history-job-handler");
historyJob.setRetries(3);
historyJob.setCreateTime(commandContext.getClock().getCurrentTime());
historyJobService.scheduleHistoryJob(historyJob);
return null;
});

assertThat(cmmnEngine.getCmmnManagementService().createHistoryJobQuery().count()).isEqualTo(2);
assertThat(processEngine.getManagementService().createHistoryJobQuery().count()).isEqualTo(2);

// Starting the async history executor should process all of these
CmmnJobTestHelper.waitForAsyncHistoryExecutorToProcessAllJobs(cmmnEngine.getCmmnEngineConfiguration(), 10000L, 200L, true);

assertThat(cmmnEngine.getCmmnHistoryService().createHistoricCaseInstanceQuery().count()).isEqualTo(1);
assertThat(processEngine.getHistoryService().createHistoricProcessInstanceQuery().count()).isEqualTo(1);
assertThat(cmmnEngine.getCmmnManagementService().createHistoryJobQuery().count()).isZero();
assertThat(processEngine.getManagementService().createHistoryJobQuery().count()).isZero();
}

@Test
public void testProcessAsyncHistoryNotChanged() {
// This test validates that the shared async history executor does not intervene when running a process regularly
processEngine.getRepositoryService().createDeployment().addClasspathResource("org/flowable/cmmn/test/oneTaskProcess.bpmn20.xml").deploy();
processEngine.getRuntimeService().startProcessInstanceByKey("oneTask");
assertThat(processEngine.getManagementService().createHistoryJobQuery().count()).isEqualTo(1);

HistoryJob historyJob = processEngine.getManagementService().createHistoryJobQuery().singleResult();
assertThat(historyJob.getJobHandlerType()).isEqualTo(HistoryJsonConstants.JOB_HANDLER_TYPE_DEFAULT_ASYNC_HISTORY);
processEngine.getManagementService().executeHistoryJob(historyJob.getId());
}

}
Original file line number Diff line number Diff line change
@@ -1,33 +1,40 @@
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.flowable.job.service.impl.history.async.transformer;

import java.util.List;
package org.flowable.cmmn.test;

import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.job.service.HistoryJobHandler;
import org.flowable.job.service.JobServiceConfiguration;
import org.flowable.job.service.impl.persistence.entity.HistoryJobEntity;

import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* @author Filip Hrisafov
*/
public class TestHistoryJobHandler implements HistoryJobHandler {

public interface HistoryJsonTransformer {

String FIELD_NAME_TYPE = "type";
String FIELD_NAME_DATA = "data";
protected final String type;

List<String> getTypes();
public TestHistoryJobHandler(String type) {
this.type = type;
}

boolean isApplicable(ObjectNode historicalData, CommandContext commandContext);
@Override
public String getType() {
return type;
}

void transformJson(HistoryJobEntity job, ObjectNode historicalData, CommandContext commandContext);
@Override
public void execute(HistoryJobEntity job, String configuration, CommandContext commandContext, JobServiceConfiguration jobServiceConfiguration) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,20 @@
</constructor-arg>
</bean>

<bean id="cmmnEngineConfiguration" class="org.flowable.cmmn.engine.CmmnEngineConfiguration">
<property name="customHistoryJobHandlers">
<list>
<bean class="org.flowable.cmmn.test.TestHistoryJobHandler">
<constructor-arg name="type" value="cmmn-test-history-job-handler" />
</bean>
</list>
</property>
</bean>

<bean id="cmmnEngineConfigurator" class="org.flowable.cmmn.engine.configurator.CmmnEngineConfigurator">
<property name="cmmnEngineConfiguration" ref="cmmnEngineConfiguration"/>
</bean>

<bean id="processEngineConfiguration" class="org.flowable.engine.impl.cfg.StandaloneProcessEngineConfiguration">
<property name="dataSource" ref="dataSource"/>

Expand All @@ -34,16 +48,22 @@

<property name="asyncHistoryEnabled" value="true" />
<property name="asyncHistoryExecutorActivate" value="false" />
<property name="asyncHistoryJsonGroupingEnabled" value="true" />
<property name="asyncHistoryJsonGroupingThreshold" value="1" />

<property name="defaultFailedJobWaitTime" value="1" />
<property name="asyncFailedJobWaitTime" value="1" />
<property name="asyncHistoryExecutorDefaultAsyncJobAcquireWaitTime" value="100" />

<property name="configurators">
<list>
<bean class="org.flowable.cmmn.engine.configurator.CmmnEngineConfigurator" />
<ref bean="cmmnEngineConfigurator" />
</list>
</property>

<property name="customHistoryJobHandlers">
<list>
<bean class="org.flowable.cmmn.test.TestHistoryJobHandler">
<constructor-arg name="type" value="bpmn-test-history-job-handler" />
</bean>
</list>
</property>
</bean>
Expand Down
Loading

0 comments on commit be5dc01

Please sign in to comment.