Skip to content

Commit

Permalink
clean sub-process executions on iteration completed
Browse files Browse the repository at this point in the history
  • Loading branch information
paed01 committed Dec 16, 2023
1 parent 94932ca commit 28b1da7
Show file tree
Hide file tree
Showing 21 changed files with 830 additions and 320 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
Changelog
=========

# 12.0.0

Memory issues running sequential multi-instance sub-process (MISP). All MISP executions are put in a list to be able to save state.

## Breaking

- remove MISP execution from execution reference list when iteration is completed, discarded, or errored

# 11.1.1

- fix boundary event not cancelling task if resumed before task was resumed
Expand Down
3 changes: 1 addition & 2 deletions dist/activity/Activity.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ const kFormatter = Symbol.for('formatter');
const kMessageHandlers = Symbol.for('messageHandlers');
const kStateMessage = Symbol.for('stateMessage');
const kActivated = Symbol.for('activated');
var _default = Activity;
exports.default = _default;
var _default = exports.default = Activity;
function Activity(Behaviour, activityDef, context) {
const {
id,
Expand Down
3 changes: 1 addition & 2 deletions dist/activity/ActivityExecution.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ const kExecuteQ = Symbol.for('executeQ');
const kExecuteMessage = Symbol.for('executeMessage');
const kMessageHandlers = Symbol.for('messageHandlers');
const kPostponed = Symbol.for('postponed');
var _default = ActivityExecution;
exports.default = _default;
var _default = exports.default = ActivityExecution;
function ActivityExecution(activity, context) {
this.activity = activity;
this.context = context;
Expand Down
3 changes: 1 addition & 2 deletions dist/definition/Definition.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ const kMessageHandlers = Symbol.for('messageHandlers');
const kStateMessage = Symbol.for('stateMessage');
const kStatus = Symbol.for('status');
const kStopped = Symbol.for('stopped');
var _default = Definition;
exports.default = _default;
var _default = exports.default = Definition;
function Definition(context, options) {
if (!(this instanceof Definition)) return new Definition(context, options);
if (!context) throw new Error('No context');
Expand Down
3 changes: 1 addition & 2 deletions dist/flows/SequenceFlow.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ var _EventBroker = require("../EventBroker.js");
var _Api = require("../Api.js");
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
const kCounters = Symbol.for('counters');
var _default = SequenceFlow;
exports.default = _default;
var _default = exports.default = SequenceFlow;
function SequenceFlow(flowDef, {
environment
}) {
Expand Down
3 changes: 1 addition & 2 deletions dist/getPropertyValue.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ const propertyPattern = /(\w+)\((.*?)(?:\))|(\.|\[|^)(.+?)(?:\]|\[|\.|$)/;
const stringConstantPattern = /^(['"])(.*)\1$/;
const numberConstantPattern = /^\W*-?\d+(.\d+)?\W*$/;
const negativeIndexPattern = /^-\d+$/;
var _default = getPropertyValue;
exports.default = _default;
var _default = exports.default = getPropertyValue;
function getPropertyValue(inputContext, propertyPath, fnScope) {
if (!inputContext) return;
let resultValue;
Expand Down
4 changes: 2 additions & 2 deletions dist/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,6 @@ var _Transaction = _interopRequireDefault(require("./tasks/Transaction.js"));
var _Timers = require("./Timers.js");
var ISODuration = _interopRequireWildcard(require("./iso-duration.js"));
exports.ISODuration = ISODuration;
function _getRequireWildcardCache(nodeInterop) { if (typeof WeakMap !== "function") return null; var cacheBabelInterop = new WeakMap(); var cacheNodeInterop = new WeakMap(); return (_getRequireWildcardCache = function (nodeInterop) { return nodeInterop ? cacheNodeInterop : cacheBabelInterop; })(nodeInterop); }
function _interopRequireWildcard(obj, nodeInterop) { if (!nodeInterop && obj && obj.__esModule) { return obj; } if (obj === null || typeof obj !== "object" && typeof obj !== "function") { return { default: obj }; } var cache = _getRequireWildcardCache(nodeInterop); if (cache && cache.has(obj)) { return cache.get(obj); } var newObj = {}; var hasPropertyDescriptor = Object.defineProperty && Object.getOwnPropertyDescriptor; for (var key in obj) { if (key !== "default" && Object.prototype.hasOwnProperty.call(obj, key)) { var desc = hasPropertyDescriptor ? Object.getOwnPropertyDescriptor(obj, key) : null; if (desc && (desc.get || desc.set)) { Object.defineProperty(newObj, key, desc); } else { newObj[key] = obj[key]; } } } newObj.default = obj; if (cache) { cache.set(obj, newObj); } return newObj; }
function _getRequireWildcardCache(e) { if ("function" != typeof WeakMap) return null; var r = new WeakMap(), t = new WeakMap(); return (_getRequireWildcardCache = function (e) { return e ? t : r; })(e); }
function _interopRequireWildcard(e, r) { if (!r && e && e.__esModule) return e; if (null === e || "object" != typeof e && "function" != typeof e) return { default: e }; var t = _getRequireWildcardCache(r); if (t && t.has(e)) return t.get(e); var n = { __proto__: null }, a = Object.defineProperty && Object.getOwnPropertyDescriptor; for (var u in e) if ("default" !== u && Object.prototype.hasOwnProperty.call(e, u)) { var i = a ? Object.getOwnPropertyDescriptor(e, u) : null; i && (i.get || i.set) ? Object.defineProperty(n, u, i) : n[u] = e[u]; } return n.default = e, t && t.set(e, n), n; }
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
3 changes: 1 addition & 2 deletions dist/process/Process.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ const kMessageHandlers = Symbol.for('messageHandlers');
const kStateMessage = Symbol.for('stateMessage');
const kStatus = Symbol.for('status');
const kStopped = Symbol.for('stopped');
var _default = Process;
exports.default = _default;
var _default = exports.default = Process;
function Process(processDef, context) {
const {
id,
Expand Down
3 changes: 1 addition & 2 deletions dist/process/ProcessExecution.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ var _Api = require("../Api.js");
var _messageHelper = require("../messageHelper.js");
var _shared = require("../shared.js");
var _Tracker = require("../Tracker.js");
var _default = ProcessExecution;
exports.default = _default;
var _default = exports.default = ProcessExecution;
const kActivated = Symbol.for('activated');
const kActivityQ = Symbol.for('activityQ');
const kCompleted = Symbol.for('completed');
Expand Down
27 changes: 20 additions & 7 deletions dist/tasks/LoopCharacteristics.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,21 @@ function ParallelLoopCharacteristics(activity, characteristics) {
this.characteristics = characteristics;
this.running = 0;
this.index = 0;
this.discarded = 0;
}
ParallelLoopCharacteristics.prototype.execute = function execute(executeMessage) {
const chr = this.characteristics;
if (!chr.cardinality) throw new _Errors.RunError(`<${this.id}> cardinality or collection is required in parallel loops`, executeMessage);
const isRedelivered = executeMessage.fields.redelivered;
if (isRedelivered) {
if (!isNaN(executeMessage.content.index)) this.index = executeMessage.content.index;
if (!isNaN(executeMessage.content.running)) this.running = executeMessage.content.running;
const {
index,
running,
discarded
} = executeMessage.content;
if (!isNaN(index)) this.index = index;
if (!isNaN(running)) this.running = running;
if (!isNaN(discarded)) this.discarded = discarded;
}
chr.subscribe(this._onCompleteMessage.bind(this));
if (isRedelivered) return;
Expand All @@ -148,34 +155,39 @@ ParallelLoopCharacteristics.prototype._startBatch = function startBatch() {
...chr.getContent(),
index: this.index,
running: this.running,
discarded: this.discarded,
output: chr.output,
preventComplete: true
});
for (const content of batch) {
broker.publish('execution', 'execute.start', content);
}
};
ParallelLoopCharacteristics.prototype._onCompleteMessage = function onCompleteMessage(_, message) {
ParallelLoopCharacteristics.prototype._onCompleteMessage = function onCompleteMessage(routingKey, message) {
const chr = this.characteristics;
const {
content
} = message;
if (content.output !== undefined) chr.output[content.index] = content.output;
if (routingKey === 'execute.discard') {
this.discarded++;
}
this.running--;
this.activity.broker.publish('execution', 'execute.iteration.completed', {
...content,
...chr.getContent(),
index: this.index,
running: this.running,
discarded: this.discarded,
output: chr.output,
state: 'iteration.completed',
preventComplete: true
});
if (this.running <= 0 && !chr.next(this.index)) {
return chr.complete(content);
return chr.complete(content, this.discarded === this.index);
}
if (chr.isCompletionConditionMet(message)) {
return chr.complete(content);
return chr.complete(content, this.discarded === this.index);
}
if (this.running <= 0) {
this.running = 0;
Expand Down Expand Up @@ -263,9 +275,9 @@ Characteristics.prototype.isCompletionConditionMet = function isCompletionCondit
loopOutput: this.output
}));
};
Characteristics.prototype.complete = function complete(content) {
Characteristics.prototype.complete = function complete(content, allDiscarded) {
this.stop();
return this.broker.publish('execution', 'execute.completed', {
return this.broker.publish('execution', 'execute.' + (allDiscarded ? 'discard' : 'completed'), {
...content,
...this.getContent(),
output: this.output
Expand All @@ -286,6 +298,7 @@ Characteristics.prototype.subscribe = function subscribe(onIterationCompleteMess
function onComplete(routingKey, message, ...args) {
if (!message.content.isMultiInstance) return;
switch (routingKey) {
case 'execute.discard':
case 'execute.cancel':
case 'execute.completed':
return onIterationCompleteMessage(routingKey, message, ...args);
Expand Down
94 changes: 33 additions & 61 deletions dist/tasks/SubProcess.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,52 +56,36 @@ function SubProcessBehaviour(activity, context) {
this.executionId = undefined;
this[kExecutions] = [];
this[kMessageHandlers] = {
onApiRootMessage: this._onApiRootMessage.bind(this),
onExecutionCompleted: this._onExecutionCompleted.bind(this)
};
}
Object.defineProperty(SubProcessBehaviour.prototype, 'execution', {
get() {
return this[kExecutions][0];
}
});
Object.defineProperty(SubProcessBehaviour.prototype, 'executions', {
get() {
return this[kExecutions].slice();
Object.defineProperties(SubProcessBehaviour.prototype, {
'execution': {
get() {
return this[kExecutions][0];
}
},
'executions': {
get() {
return this[kExecutions].slice();
}
}
});
SubProcessBehaviour.prototype.execute = function execute(executeMessage) {
const content = executeMessage.content;
let executionId = this.executionId;
if (content.isRootScope) {
executionId = this.executionId = content.executionId;
const {
isRootScope,
executionId
} = executeMessage.content;
if (isRootScope) {
this.executionId = executionId;
}
const loopCharacteristics = this.loopCharacteristics;
if (loopCharacteristics && content.isRootScope) {
this.broker.subscribeTmp('api', `activity.#.${executionId}`, this[kMessageHandlers].onApiRootMessage, {
noAck: true,
consumerTag: `_api-${executionId}`,
priority: 200
});
if (loopCharacteristics && isRootScope) {
return loopCharacteristics.execute(executeMessage);
}
const processExecution = this._upsertExecution(executeMessage);
return processExecution.execute(executeMessage);
};
SubProcessBehaviour.prototype.stop = function stop() {
for (const execution of this[kExecutions]) {
this.broker.cancel(`_sub-process-execution-${execution.executionId}`);
this.broker.cancel(`_sub-process-api-${execution.executionId}`);
execution.stop();
}
};
SubProcessBehaviour.prototype.discard = function discard() {
for (const execution of this[kExecutions]) {
this.broker.cancel(`_sub-process-execution-${execution.executionId}`);
this.broker.cancel(`_sub-process-api-${execution.executionId}`);
execution.discard();
}
};
SubProcessBehaviour.prototype.getState = function getState() {
if (this.loopCharacteristics) {
return {
Expand Down Expand Up @@ -145,35 +129,22 @@ SubProcessBehaviour.prototype.getPostponed = function getPostponed() {
return result;
}, []);
};
SubProcessBehaviour.prototype._onApiRootMessage = function onApiRootMessage(_, message) {
const messageType = message.properties.type;
switch (messageType) {
case 'stop':
this.broker.cancel(message.fields.consumerTag);
this.stop();
break;
case 'discard':
this.broker.cancel(message.fields.consumerTag);
this.discard();
break;
}
};
SubProcessBehaviour.prototype._upsertExecution = function upsertExecution(executeMessage) {
const content = executeMessage.content;
const executionId = content.executionId;
let execution = this._getExecutionById(executionId);
if (execution) {
if (executeMessage.fields.redelivered) this._addListeners(execution, executionId);
if (executeMessage.fields.redelivered) this._addListeners(executionId);
return execution;
}
const subEnvironment = this.environment.clone();
const subContext = this.context.clone(subEnvironment, this.activity);
execution = new _ProcessExecution.default(this.activity, subContext);
this[kExecutions].push(execution);
this._addListeners(execution, executionId);
this._addListeners(executionId);
return execution;
};
SubProcessBehaviour.prototype._addListeners = function addListeners(processExecution, executionId) {
SubProcessBehaviour.prototype._addListeners = function addListeners(executionId) {
this.broker.subscribeTmp('subprocess-execution', `execution.#.${executionId}`, this[kMessageHandlers].onExecutionCompleted, {
noAck: true,
consumerTag: `_sub-process-execution-${executionId}`
Expand All @@ -187,21 +158,14 @@ SubProcessBehaviour.prototype._onExecutionCompleted = function onExecutionComple
switch (messageType) {
case 'stopped':
{
broker.cancel(message.fields.consumerTag);
break;
return broker.cancel(message.fields.consumerTag);
}
case 'completed':
case 'cancel':
case 'discard':
{
broker.cancel(message.fields.consumerTag);
broker.publish('execution', 'execute.' + messageType, (0, _messageHelper.cloneContent)(content));
break;
}
case 'completed':
{
broker.cancel(message.fields.consumerTag);
broker.publish('execution', 'execute.completed', (0, _messageHelper.cloneContent)(content));
break;
return this._completeExecution('execute.' + messageType, content);
}
case 'error':
{
Expand All @@ -210,11 +174,19 @@ SubProcessBehaviour.prototype._onExecutionCompleted = function onExecutionComple
error
} = content;
this.activity.logger.error(`<${this.id}>`, error);
broker.publish('execution', 'execute.error', (0, _messageHelper.cloneContent)(content));
break;
return this._completeExecution('execute.error', content);
}
}
};
SubProcessBehaviour.prototype._completeExecution = function completeExecution(completeRoutingKey, content) {
if (this.loopCharacteristics) {
const executions = this[kExecutions];
const executionIdx = executions.findIndex(pe => pe.executionId === content.executionId);
if (executionIdx < 0) return;
executions.splice(executionIdx, 1);
}
this.broker.publish('execution', completeRoutingKey, (0, _messageHelper.cloneContent)(content));
};
SubProcessBehaviour.prototype.getApi = function getApi(apiMessage) {
const content = apiMessage.content;
if (content.id === this.id) return;
Expand Down
14 changes: 7 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bpmn-elements",
"version": "11.1.1",
"version": "12.0.0",
"description": "Executable workflow elements based on BPMN 2.0",
"type": "module",
"main": "dist/index.js",
Expand Down Expand Up @@ -46,23 +46,23 @@
"devDependencies": {
"@aircall/expression-parser": "^1.0.4",
"@babel/cli": "^7.22.9",
"@babel/core": "^7.22.9",
"@babel/preset-env": "^7.22.9",
"@babel/core": "^7.23.6",
"@babel/preset-env": "^7.23.6",
"@babel/register": "^7.22.5",
"@bonniernews/hot-bev": "^0.4.0",
"bpmn-moddle": "^8.0.1",
"c8": "^8.0.0",
"camunda-bpmn-moddle": "^7.0.1",
"chai": "^4.3.7",
"chronokinesis": "^5.0.2",
"chronokinesis": "^6.0.0",
"debug": "^4.3.4",
"eslint": "^8.44.0",
"eslint-plugin-import": "^2.27.5",
"eslint": "^8.56.0",
"eslint-plugin-import": "^2.29.1",
"got": "^12.6.1",
"mocha": "^10.1.0",
"mocha-cakes-2": "^3.3.0",
"moddle-context-serializer": "^4.0.0",
"nock": "^13.3.1"
"nock": "^13.4.0"
},
"dependencies": {
"smqp": "^8.0.0"
Expand Down
Loading

0 comments on commit 28b1da7

Please sign in to comment.