Skip to content

Commit

Permalink
refactor outbound sequence flow handling #42
Browse files Browse the repository at this point in the history
  • Loading branch information
paed01 committed Sep 28, 2024
1 parent 915c56a commit 82d4dea
Show file tree
Hide file tree
Showing 16 changed files with 1,102 additions and 406 deletions.
1 change: 1 addition & 0 deletions .prettierignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ node_modules/
coverage/
tmp/
test/resources/*.bpmn
*.bpmn
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

## 16.2.0

- refactor outbound sequence flow evaluation in an attempt to mitigate nasty discard loops when multiple outbound flows have the same target. What happens now is that only one (1) flow will be touched triggering the targeted activity. E.g: all outbound are discarded - only the last discarded flow is discarded; all but one flow is discarded - only taken flow is touched; all flows taken - only the last taken flow is taken. What about conditional flows? No worries, all conditional flows conditions are still evaluated

## 16.1.0

- support ISO8601 interval timers with unbounded number of repetitions, e.g `R/PT1M` or `R-1/PT1M`
Expand Down
198 changes: 48 additions & 150 deletions dist/activity/Activity.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ var _EventBroker = require("../EventBroker.js");
var _MessageFormatter = require("../MessageFormatter.js");
var _messageHelper = require("../messageHelper.js");
var _Errors = require("../error/Errors.js");
var _outboundEvaluator = require("./outbound-evaluator.js");
function _interopRequireDefault(e) { return e && e.__esModule ? e : { default: e }; }
const kActivityDef = Symbol.for('activityDefinition');
const kConsuming = Symbol.for('consuming');
Expand Down Expand Up @@ -84,15 +85,20 @@ function Activity(Behaviour, activityDef, context) {
inboundTriggers = inboundSequenceFlows.slice();
}
const outboundSequenceFlows = context.getOutboundSequenceFlows(id);
const isParallelJoin = activityDef.isParallelGateway && inboundSequenceFlows.length > 1;
const flows = this[kFlows] = {
inboundSequenceFlows,
inboundAssociations,
inboundJoinFlows: new Set(),
inboundTriggers,
outboundSequenceFlows,
outboundEvaluator: new OutboundEvaluator(this, outboundSequenceFlows)
outboundEvaluator: new _outboundEvaluator.OutboundEvaluator(this, outboundSequenceFlows),
...(isParallelJoin && {
inboundJoinFlows: new Set(),
inboundSourceIds: new Set(inboundSequenceFlows.map(({
sourceId
}) => sourceId))
})
};
const isParallelJoin = activityDef.isParallelGateway && flows.inboundSequenceFlows.length > 1;
this[kFlags] = {
isEnd: flows.outboundSequenceFlows.length === 0,
isStart: flows.inboundSequenceFlows.length === 0 && !attachedTo && !behaviour.triggeredByEvent && !isForCompensation,
Expand Down Expand Up @@ -508,21 +514,21 @@ Activity.prototype._onJoinInbound = function onJoinInbound(routingKey, message)
} = message;
const {
inboundJoinFlows,
inboundTriggers
inboundSourceIds
} = this[kFlows];
let alreadyTouched = false;
const touched = new Set();
let taken;
for (const msg of inboundJoinFlows) {
const flowId = msg.content.id;
touched.add(flowId);
if (flowId === content.id) {
const sourceId = msg.content.sourceId;
touched.add(sourceId);
if (sourceId === content.sourceId) {
alreadyTouched = true;
}
}
inboundJoinFlows.add(message);
if (alreadyTouched) return;
const remaining = inboundTriggers.length - touched.size - 1;
const remaining = inboundSourceIds.size - touched.size - 1;
if (remaining) {
return this.logger.debug(`<${this.id}> inbound ${message.content.action} from <${message.content.id}>, ${remaining} remaining`);
}
Expand Down Expand Up @@ -851,11 +857,11 @@ Activity.prototype._doOutbound = function doOutbound(fromMessage, isDiscarded, c
}
let outboundFlows;
if (isDiscarded) {
outboundFlows = outboundSequenceFlows.map(flow => formatFlowAction(flow, {
outboundFlows = outboundSequenceFlows.map(flow => (0, _outboundEvaluator.formatFlowAction)(flow, {
action: 'discard'
}));
} else if (fromContent.outbound && fromContent.outbound.length) {
outboundFlows = outboundSequenceFlows.map(flow => formatFlowAction(flow, fromContent.outbound.filter(f => f.id === flow.id).pop()));
outboundFlows = outboundSequenceFlows.map(flow => (0, _outboundEvaluator.formatFlowAction)(flow, fromContent.outbound.filter(f => f.id === flow.id).pop()));
}
if (outboundFlows) {
this._doRunOutbound(outboundFlows, fromContent, discardSequence);
Expand All @@ -868,25 +874,41 @@ Activity.prototype._doOutbound = function doOutbound(fromMessage, isDiscarded, c
});
};
Activity.prototype._doRunOutbound = function doRunOutbound(outboundList, content, discardSequence) {
for (const outboundFlow of outboundList) {
const {
id: flowId,
action,
result
} = outboundFlow;
this.broker.publish('run', 'run.outbound.' + action, (0, _messageHelper.cloneContent)(content, {
flow: {
...(result && typeof result === 'object' && result),
...outboundFlow,
sequenceId: (0, _shared.getUniqueId)(`${flowId}_${action}`),
...(discardSequence && {
discardSequence: discardSequence.slice()
})
if (outboundList.length === 1) {
this._publishRunOutbound(outboundList[0], content, discardSequence);
} else {
const targets = new Map();
for (const outboundFlow of outboundList) {
const prevTarget = targets.get(outboundFlow.targetId);
if (!prevTarget) {
targets.set(outboundFlow.targetId, outboundFlow);
} else if (outboundFlow.action === 'take' && outboundFlow.action !== prevTarget.action) {
targets.set(outboundFlow.targetId, outboundFlow);
}
}));
}
for (const outboundFlow of targets.values()) {
this._publishRunOutbound(outboundFlow, content, discardSequence);
}
}
return outboundList;
};
Activity.prototype._publishRunOutbound = function publishRunOutbound(outboundFlow, content, discardSequence) {
const {
id: flowId,
action,
result
} = outboundFlow;
this.broker.publish('run', 'run.outbound.' + action, (0, _messageHelper.cloneContent)(content, {
flow: {
...(result && typeof result === 'object' && result),
...outboundFlow,
sequenceId: (0, _shared.getUniqueId)(`${flowId}_${action}`),
...(discardSequence && {
discardSequence: discardSequence.slice()
})
}
}));
};
Activity.prototype._onResumeMessage = function onResumeMessage(message) {
message.ack();
const stateMessage = this[kStateMessage];
Expand Down Expand Up @@ -991,128 +1013,4 @@ Activity.prototype._deactivateRunConsumers = function _deactivateRunConsumers()
this._pauseRunQ();
broker.cancel('_activity-execution');
this[kConsuming] = false;
};
function OutboundEvaluator(activity, outboundFlows) {
this.activity = activity;
this.broker = activity.broker;
const flows = this.outboundFlows = outboundFlows.slice();
const defaultFlowIdx = flows.findIndex(({
isDefault
}) => isDefault);
if (defaultFlowIdx > -1) {
const [defaultFlow] = flows.splice(defaultFlowIdx, 1);
flows.push(defaultFlow);
}
this.defaultFlowIdx = outboundFlows.findIndex(({
isDefault
}) => isDefault);
this._onEvaluated = this.onEvaluated.bind(this);
this.evaluateArgs = {};
}
OutboundEvaluator.prototype.evaluate = function evaluate(fromMessage, discardRestAtTake, callback) {
const outboundFlows = this.outboundFlows;
const args = this.evaluateArgs = {
fromMessage,
evaluationId: fromMessage.content.executionId,
discardRestAtTake,
callback,
conditionMet: false,
result: {},
takenCount: 0
};
if (!outboundFlows.length) return this.completed();
const flows = args.flows = outboundFlows.slice();
this.broker.subscribeTmp('execution', 'evaluate.flow.#', this._onEvaluated, {
consumerTag: `_flow-evaluation-${args.evaluationId}`
});
return this.evaluateFlow(flows.shift());
};
OutboundEvaluator.prototype.onEvaluated = function onEvaluated(routingKey, message) {
const content = message.content;
const {
id: flowId,
action,
evaluationId
} = message.content;
const args = this.evaluateArgs;
if (action === 'take') {
args.takenCount++;
args.conditionMet = true;
}
args.result[flowId] = content;
if ('result' in content) {
this.activity.logger.debug(`<${evaluationId} (${this.activity.id})> flow <${flowId}> evaluated to: ${!!content.result}`);
}
let nextFlow = args.flows.shift();
if (!nextFlow) return this.completed();
if (args.discardRestAtTake && args.conditionMet) {
do {
args.result[nextFlow.id] = formatFlowAction(nextFlow, {
action: 'discard'
});
} while (nextFlow = args.flows.shift());
return this.completed();
}
if (args.conditionMet && nextFlow.isDefault) {
args.result[nextFlow.id] = formatFlowAction(nextFlow, {
action: 'discard'
});
return this.completed();
}
message.ack();
this.evaluateFlow(nextFlow);
};
OutboundEvaluator.prototype.evaluateFlow = function evaluateFlow(flow) {
const broker = this.broker;
const {
fromMessage,
evaluationId
} = this.evaluateArgs;
flow.evaluate((0, _messageHelper.cloneMessage)(fromMessage), (err, result) => {
if (err) return this.completed(err);
const action = result ? 'take' : 'discard';
return broker.publish('execution', 'evaluate.flow.' + action, formatFlowAction(flow, {
action,
result,
evaluationId
}), {
persistent: false
});
});
};
OutboundEvaluator.prototype.completed = function completed(err) {
const {
callback,
evaluationId,
fromMessage,
result,
takenCount
} = this.evaluateArgs;
this.broker.cancel(`_flow-evaluation-${evaluationId}`);
if (err) return callback(err);
if (!takenCount && this.outboundFlows.length) {
const nonTakenError = new _Errors.ActivityError(`<${this.activity.id}> no conditional flow taken`, fromMessage);
return callback(nonTakenError);
}
const message = fromMessage.content.message;
const evaluationResult = [];
for (const flow of Object.values(result)) {
evaluationResult.push({
...flow,
...(message !== undefined && {
message
})
});
}
return callback(null, evaluationResult);
};
function formatFlowAction(flow, options) {
return {
...options,
id: flow.id,
action: options.action,
...(flow.isDefault && {
isDefault: true
})
};
}
};
Loading

0 comments on commit 82d4dea

Please sign in to comment.