Skip to content

Commit

Permalink
Merge branch 'main' into flowable-release-7.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
filiphr committed Aug 15, 2023
2 parents 3a97c0a + be5dc01 commit 44db5a2
Show file tree
Hide file tree
Showing 254 changed files with 713 additions and 16,181 deletions.
3 changes: 0 additions & 3 deletions distro/build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -308,9 +308,6 @@
<fileset dir="../modules/flowable-jmx/target">
<include name="flowable-jmx-*.jar" />
</fileset>
<fileset dir="../modules/flowable-jms-spring-executor/target">
<include name="flowable-jms-spring-executor-*.jar" />
</fileset>
</copy>

</target>
Expand Down
1 change: 0 additions & 1 deletion distro/src/notice.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ flowable-form-api<version><.jar | -source.jar | -javadoc.jar>
flowable-form-model<version><.jar | -source.jar | -javadoc.jar>
flowable-http<version><.jar | -source.jar | -javadoc.jar>
flowable-image-generator<version><.jar | -source.jar | -javadoc.jar>
flowable-jms-spring-executor<version><.jar | -source.jar | -javadoc.jar>
flowable-jmx<version><.jar | -source.jar | -javadoc.jar>
flowable-ldap<version><.jar | -source.jar | -javadoc.jar>
flowable-osgi<version><.jar | -source.jar | -javadoc.jar>
Expand Down
2 changes: 1 addition & 1 deletion docs/docusaurus/docs/bpmn/ch03-Configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ The managed implementations fall back to their default counterparts if the threa

## Job executor activation

The AsyncExecutor is a component that manages a thread pool to fire timers and other asynchronous tasks. Other implementations are possible (for example using a message queue, see the advanced section of the user guide).
The AsyncExecutor is a component that manages a thread pool to fire timers and other asynchronous tasks.

By default, the AsyncExecutor is not activated and not started. With the following configuration the async executor can be started together with the Flowable Engine.

Expand Down
2 changes: 1 addition & 1 deletion docs/docusaurus/docs/bpmn/ch04-API.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ There are two ways of querying data from the engine: the query API and native qu
Sometimes you need more powerful queries, for example, queries using an OR operator or restrictions you cannot express using the Query API. For these cases, we have native queries, which allow you to write your own SQL queries. The return type is defined by the Query object you use and the data is mapped into the correct objects (Task, ProcessInstance, Execution, …​). Since the query will be fired at the database you have to use table and column names as they are defined in the database; this requires some knowledge about the internal data structure and it is recommended to use native queries with care. The table names can be retrieved through the API to keep the dependency as small as possible.

List<Task> tasks = taskService.createNativeTaskQuery()
.sql("SELECT count(*) FROM " + managementService.getTableName(Task.class) +
.sql("SELECT * FROM " + managementService.getTableName(Task.class) +
" T WHERE T.NAME_ = #{taskName}")
.parameter("taskName", "gonzoTask")
.list();
Expand Down
2 changes: 0 additions & 2 deletions docs/docusaurus/docs/bpmn/ch11-History.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,6 @@ When the history data is not to be persisted in the default history tables, but

- Or, configure the customHistoryJobHandlers list with all instances will be added to the historyJobHandlers map at boot time.

Alternatively, it is also possible to use a Message Queue and configure the engine in such a way that a message will be sent when a new history job is available. This way, the historical data can be processed on different servers to where the engines are run. It’s also possible to configure the engine and Message Queue using JTA (when using JMS) and not store the historical data in a job, but send it all data to a Message Queue that participates in a global transaction.

See [the Flowable Async History Examples](https://github.com/flowable/flowable-examples/tree/master/async-history) for various examples on how to configure the Async History, including the default way, using a JMS queue, using JTA or using a Message Queue and a Spring Boot application that acts as a message listener.

## History for audit purposes
Expand Down
120 changes: 0 additions & 120 deletions docs/docusaurus/docs/bpmn/ch18-Advanced.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,126 +130,6 @@ The following properties are available on the process engine configuration throu
</tbody>
</table>

### Message Queue based Async Executor

When reading the [async executor design section](bpmn/ch18-Advanced.md#async-executor-design), it becomes clear that the architecture is inspired by message queues. The async executor is designed in such a way that a message queue can easily be used to take over the job of the thread pool and the handling of async jobs.

Benchmarks have shown that using a message queue is superior, throughput-wise, to the thread pool-backed async executor. However, it does come with an extra architectural component, which of course makes setup, maintenance and monitoring more complex. For many users, the performance of the thread pool-backed async executor is more than sufficient. It is nice to know however, that there is an alternative if the required performance grows.

Currently, the only option that is supported out-of-the-box is JMS with Spring. The reason for supporting Spring before anything else is because Spring has some very nice features that ease a lot of the pain when it comes to threading and dealing with multiple message consumers. However, the integration is so simple, that it can easily be ported to any message queue implementation or protocol (Stomp, AMPQ, and so on). Feedback is appreciated for what should be the next implementation.

When a new async job is created by the engine, a message is put on a message queue (in a transaction committed transaction listener, so we’re sure the job entry is in the database) containing the job identifier. A message consumer then takes this job identifier to fetch the job, and execute the job. The async executor will not create a thread pool anymore. It will insert and query for timers from a separate thread. When a timer fires, it is moved to the async job table, which now means a message is sent to the message queue too. The 'reset expired' thread will also unlock jobs as usual, as message queues can fail too. Instead of 'unlocking' a job, a message will now be resent. The async executor will not poll for async jobs anymore.

The implementation consists of two classes:

- An implementation of the *org.flowable.engine.impl.asyncexecutor.JobManager* interface that puts a message on a message queue instead of passing it to the thread pool.

- A *jakarta.jms.MessageListener* implementation that consumes a message from the message queue, using the job identifier in the message to fetch and execute the job.

First of all, add the *flowable-jms-spring-executor* dependency to your project:

<dependency>
<groupId>org.flowable</groupId>
<artifactId>flowable-jms-spring-executor</artifactId>
<version>${flowable.version}</version>
</dependency>

To enable the message queue based async executor, in the process engine configuration, the following needs to be done:

- *asyncExecutorActivate* must be set to *true*, as usual

- *asyncExecutorMessageQueueMode* needs to be set to *true*

- The *org.flowable.spring.executor.jms.MessageBasedJobManager* must be injected as *JobManager*

Below is a complete example of a Java based configuration, using *ActiveMQ* as the message queue broker.

Some things to note:

- The *MessageBasedJobManager* expects a *JMSTemplate* to be injected that is configured with a correct *connectionFactory*.

- We’re using the *MessageListenerContainer* concept from Spring, as this simplifies threading and multiple consumers a lot.

<!-- -->

@Configuration
public class SpringJmsConfig {

@Bean
public DataSource dataSource() {
// Omitted
}

@Bean(name = "transactionManager")
public PlatformTransactionManager transactionManager(DataSource dataSource) {
DataSourceTransactionManager transactionManager = new DataSourceTransactionManager();
transactionManager.setDataSource(dataSource);
return transactionManager;
}

@Bean
public SpringProcessEngineConfiguration processEngineConfiguration(DataSource dataSource, PlatformTransactionManager transactionManager,
JobManager jobManager) {
SpringProcessEngineConfiguration configuration = new SpringProcessEngineConfiguration();
configuration.setDataSource(dataSource);
configuration.setTransactionManager(transactionManager);
configuration.setDatabaseSchemaUpdate(SpringProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE);
configuration.setAsyncExecutorMessageQueueMode(true);
configuration.setAsyncExecutorActivate(true);
configuration.setJobManager(jobManager);
return configuration;
}

@Bean
public ProcessEngine processEngine(ProcessEngineConfiguration processEngineConfiguration) {
return processEngineConfiguration.buildProcessEngine();
}

@Bean
public MessageBasedJobManager jobManager(JmsTemplate jmsTemplate) {
MessageBasedJobManager jobManager = new MessageBasedJobManager();
jobManager.setJmsTemplate(jmsTemplate);
return jobManager;
}

@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
activeMQConnectionFactory.setUseAsyncSend(true);
activeMQConnectionFactory.setAlwaysSessionAsync(true);
return new CachingConnectionFactory(activeMQConnectionFactory);
}

@Bean
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setDefaultDestination(new ActiveMQQueue("flowable-jobs"));
jmsTemplate.setConnectionFactory(connectionFactory);
return jmsTemplate;
}

@Bean
public MessageListenerContainer messageListenerContainer(JobMessageListener jobMessageListener) {
DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
messageListenerContainer.setConnectionFactory(connectionFactory());
messageListenerContainer.setDestinationName("flowable-jobs");
messageListenerContainer.setMessageListener(jobMessageListener);
messageListenerContainer.setConcurrentConsumers(2);
messageListenerContainer.start();
return messageListenerContainer;
}

@Bean
public JobMessageListener jobMessageListener(ProcessEngineConfiguration processEngineConfiguration) {
JobMessageListener jobMessageListener = new JobMessageListener();
jobMessageListener.setProcessEngineConfiguration(processEngineConfiguration);
return jobMessageListener;
}

}

In the code above, the *JobMessageListener* and *MessageBasedJobManager* are the only classes from the *flowable-jms-spring-executor* module. All the other code is from Spring. As such, when wanting to port this to other queues/protocols, these classes must be ported.

## Hooking into process parsing

A BPMN 2.0 XML needs to be parsed to the Flowable internal model to be executed on the Flowable engine. This parsing happens during a deployment of the process or when a process is not found in memory, and the XML is fetched from the database.
Expand Down
2 changes: 1 addition & 1 deletion docs/userguide/src/zh_CN/bpmn/ch04-API.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ List<Task> tasks = taskService.createTaskQuery()
[source,java,linenums]
----
List<Task> tasks = taskService.createNativeTaskQuery()
.sql("SELECT count(*) FROM " + managementService.getTableName(Task.class) +
.sql("SELECT * FROM " + managementService.getTableName(Task.class) +
" T WHERE T.NAME_ = #{taskName}")
.parameter("taskName", "gonzoTask")
.list();
Expand Down
5 changes: 0 additions & 5 deletions modules/flowable-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -342,11 +342,6 @@
<artifactId>flowable-image-generator</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.flowable</groupId>
<artifactId>flowable-jms-spring-executor</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.flowable</groupId>
<artifactId>flowable-jmx</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,41 +36,6 @@ public IndentingXMLStreamWriter(XMLStreamWriter writer) {
super(writer);
}

/**
* Return the current indent step.
*
* <p>
* Return the current indent step: each start tag will be indented by this number of spaces times the number of ancestors that the element has.
* </p>
*
* @return The number of spaces in each indentation step, or 0 or less for no indentation.
* @see #setIndentStep(int)
*
* @deprecated Only return the length of the indent string.
*/
@Deprecated
public int getIndentStep() {
return indentStep.length();
}

/**
* Set the current indent step.
*
* @param indentStep
* The new indent step (0 or less for no indentation).
* @see #getIndentStep()
*
* @deprecated Should use the version that takes string.
*/
@Deprecated
public void setIndentStep(int indentStep) {
StringBuilder s = new StringBuilder();
for (; indentStep > 0; indentStep--) {
s.append(' ');
}
setIndentStep(s.toString());
}

public void setIndentStep(String s) {
this.indentStep = s;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,18 +172,6 @@ public interface CmmnRepositoryService {
*/
List<DmnDecision> getDecisionsForCaseDefinition(String caseDefinitionId);

/**
* Retrieves the {@link DmnDecision}s associated with the given case definition.
*
* @param caseDefinitionId
* id of the case definition, cannot be null.
*
* @deprecated replaced by getDecisionsForCaseDefinition(String caseDefinition)
*
*/
@Deprecated
List<DmnDecision> getDecisionTablesForCaseDefinition(String caseDefinitionId);

/**
* Retrieves the {@link FormDefinition}s associated with the given case definition.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,8 @@

public interface DecisionTableVariableManager {

@Deprecated
void setVariablesOnPlanItemInstance(List<Map<String, Object>> executionResult, String decisionKey, PlanItemInstance planItemInstance, ObjectMapper objectMapper);
void setVariablesOnPlanItemInstance(List<Map<String, Object>> decisionResult, String externalRef, PlanItemInstance planItemInstance, ObjectMapper objectMapper, boolean multipleResults);

@Deprecated
void setDecisionServiceVariablesOnExecution(Map<String, List<Map<String, Object>>> executionResult, String decisionKey, PlanItemInstance planItemInstance, ObjectMapper objectMapper);

default void setVariablesOnPlanItemInstance(List<Map<String, Object>> decisionResult, String externalRef, PlanItemInstance planItemInstance, ObjectMapper objectMapper, boolean multipleResults) {
setVariablesOnPlanItemInstance(decisionResult, externalRef, planItemInstance, objectMapper);
}

default void setDecisionServiceVariablesOnPlanItemInstance(Map<String, List<Map<String, Object>>> executionResult, String decisionKey, PlanItemInstance planItemInstance, ObjectMapper objectMapper, boolean multipleResults) {
setDecisionServiceVariablesOnExecution(executionResult, decisionKey, planItemInstance, objectMapper);
}
void setDecisionServiceVariablesOnPlanItemInstance(Map<String, List<Map<String, Object>>> executionResult, String decisionKey, PlanItemInstance planItemInstance, ObjectMapper objectMapper, boolean multipleResults);

}
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,6 @@ public interface HistoricCaseInstanceQuery extends Query<HistoricCaseInstanceQue
*/
HistoricCaseInstanceQuery includeCaseVariables();

/**
* Limit historic case instance variables
* @deprecated no longer needed, this is a noop
*/
@Deprecated
HistoricCaseInstanceQuery limitCaseVariables(Integer historicCaseVariablesLimit);

/**
* Only select historic case instances that are defined by a case definition with the given deployment identifier.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,6 @@ public interface CaseInstanceQuery extends Query<CaseInstanceQuery, CaseInstance
*/
CaseInstanceQuery endOr();

/**
* Limit case instance variables
* @deprecated no longer needed, this is a noop
*/
@Deprecated
CaseInstanceQuery limitCaseInstanceVariables(Integer caseInstanceVariablesLimit);

/**
* Localize case name to specified locale.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,41 +36,6 @@ public IndentingXMLStreamWriter(XMLStreamWriter writer) {
super(writer);
}

/**
* Return the current indent step.
*
* <p>
* Return the current indent step: each start tag will be indented by this number of spaces times the number of ancestors that the element has.
* </p>
*
* @return The number of spaces in each indentation step, or 0 or less for no indentation.
* @see #setIndentStep(int)
*
* @deprecated Only return the length of the indent string.
*/
@Deprecated
public int getIndentStep() {
return indentStep.length();
}

/**
* Set the current indent step.
*
* @param indentStep
* The new indent step (0 or less for no indentation).
* @see #getIndentStep()
*
* @deprecated Should use the version that takes string.
*/
@Deprecated
public void setIndentStep(int indentStep) {
StringBuilder s = new StringBuilder();
for (; indentStep > 0; indentStep--) {
s.append(' ');
}
setIndentStep(s.toString());
}

public void setIndentStep(String s) {
this.indentStep = s;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,6 @@ protected void copyProcessEngineProperties(ProcessEngineConfigurationImpl proces
// The job handlers will be added in the CmmnEngineConfiguration itself
cmmnEngineConfiguration.setAsyncHistoryEnabled(true);
cmmnEngineConfiguration.setAsyncHistoryExecutor(asyncHistoryExecutor);
cmmnEngineConfiguration.setAsyncHistoryJsonGroupingEnabled(processEngineConfiguration.isAsyncHistoryJsonGroupingEnabled());
cmmnEngineConfiguration.setAsyncHistoryJsonGroupingThreshold(processEngineConfiguration.getAsyncHistoryJsonGroupingThreshold());
cmmnEngineConfiguration.setAsyncHistoryJsonGzipCompressionEnabled(processEngineConfiguration.isAsyncHistoryJsonGzipCompressionEnabled());

cmmnEngineConfiguration.setAsyncHistoryTaskExecutor(processEngineConfiguration.getAsyncHistoryTaskExecutor());

Expand Down
Loading

0 comments on commit 44db5a2

Please sign in to comment.