Skip to content

Commit

Permalink
use Set and Map where feasible
Browse files Browse the repository at this point in the history
  • Loading branch information
paed01 committed Jun 9, 2024
1 parent 2dc61be commit 730a194
Show file tree
Hide file tree
Showing 14 changed files with 249 additions and 222 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

# 15.0.0

- Use Set and Map where feasible to increase performance

# 14.1.0

- delegate Signal within a process
Expand Down
1 change: 0 additions & 1 deletion dist/MessageFormatter.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ function Formatter(element, formatQ) {
this.broker = broker;
this.logger = logger;
this.formatQ = formatQ;
this.pendingFormats = [];
this[kOnMessage] = this._onMessage.bind(this);
}
Formatter.prototype.format = function format(message, callback) {
Expand Down
34 changes: 15 additions & 19 deletions dist/Tracker.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ exports.ActivityTracker = ActivityTracker;
function ActivityTracker(parentId) {
this.id = parentId;
this.status = {
wait: [],
execute: [],
timer: []
wait: new Set(),
execute: new Set(),
timer: new Set()
};
}
Object.defineProperty(ActivityTracker.prototype, 'activityStatus', {
get() {
const status = this.status;
if (status.execute.length) return 'executing';
if (status.timer.length) return 'timer';
return status.wait.length ? 'wait' : 'idle';
if (status.execute.size) return 'executing';
if (status.timer.size) return 'timer';
return status.wait.size ? 'wait' : 'idle';
}
});
ActivityTracker.prototype.track = function track(routingKey, message) {
Expand Down Expand Up @@ -55,36 +55,32 @@ ActivityTracker.prototype._executing = function executing(id) {
wait,
execute
} = this.status;
if (execute.indexOf(id) === -1) execute.push(id);
let idx;
if ((idx = wait.indexOf(id)) !== -1) wait.splice(idx, 1);
wait.delete(id);
execute.add(id);
};
ActivityTracker.prototype._waiting = function waiting(id) {
const {
wait,
execute
} = this.status;
if (wait.indexOf(id) === -1) wait.push(id);
let idx;
if ((idx = execute.indexOf(id)) !== -1) execute.splice(idx, 1);
execute.delete(id);
wait.add(id);
};
ActivityTracker.prototype._timer = function timerFn(id) {
const {
timer,
execute
} = this.status;
if (timer.indexOf(id) === -1) timer.push(id);
let idx;
if ((idx = execute.indexOf(id)) !== -1) execute.splice(idx, 1);
execute.delete(id);
timer.add(id);
};
ActivityTracker.prototype._leave = function leave(id) {
const {
wait,
execute,
timer
} = this.status;
let idx;
if ((idx = wait.indexOf(id)) !== -1) wait.splice(idx, 1);
if ((idx = execute.indexOf(id)) !== -1) execute.splice(idx, 1);
if ((idx = timer.indexOf(id)) !== -1) timer.splice(idx, 1);
execute.delete(id);
timer.delete(id);
wait.delete(id);
};
117 changes: 63 additions & 54 deletions dist/definition/DefinitionExecution.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@ function DefinitionExecution(definition, context) {
const environment = this.environment = definition.environment;
this.context = context;
const processes = context.getProcesses();
const ids = [];
const executable = [];
const ids = new Set();
const executable = new Set();
for (const bp of processes) {
bp.environment.assignVariables(environment.variables);
bp.environment.assignSettings(environment.settings);
ids.push(bp.id);
if (bp.isExecutable) executable.push(bp);
ids.add(bp.id);
if (bp.isExecutable) executable.add(bp);
}
this[kProcesses] = {
processes,
running: [],
ids,
executable,
postponed: []
running: new Set(),
postponed: new Set()
};
broker.assertExchange('execution', 'topic', {
autoDelete: false,
Expand Down Expand Up @@ -80,12 +80,12 @@ Object.defineProperties(DefinitionExecution.prototype, {
},
processes: {
get() {
return this[kProcesses].running;
return [...this[kProcesses].running];
}
},
postponedCount: {
get() {
return this[kProcesses].postponed.length;
return this[kProcesses].postponed.size;
}
},
isRunning: {
Expand All @@ -97,7 +97,7 @@ Object.defineProperties(DefinitionExecution.prototype, {
get() {
let status = 'idle';
const running = this[kProcesses].running;
if (!running || !running.length) return status;
if (!running.size) return status;
for (const bp of running) {
const bpStatus = bp.activityStatus;
switch (bp.activityStatus) {
Expand Down Expand Up @@ -141,12 +141,14 @@ DefinitionExecution.prototype.execute = function execute(executeMessage) {
if (content.processId) {
const startWithProcess = this.getProcessById(content.processId);
if (startWithProcess) {
executable.splice(0);
executable.push(startWithProcess);
executable.clear();
executable.add(startWithProcess);
}
}
this._debug('execute definition');
running.push(...executable);
for (const bp of executable) {
running.add(bp);
}
this._activate(executable);
this._start();
return true;
Expand All @@ -159,7 +161,7 @@ DefinitionExecution.prototype.resume = function resume() {
postponed
} = this[kProcesses];
this._activate(running);
postponed.splice(0);
postponed.clear();
this[kProcessesQ].consume(this[kMessageHandlers].onProcessMessage, {
prefetch: 1000,
consumerTag: `_definition-activity-${this.executionId}`
Expand All @@ -175,20 +177,20 @@ DefinitionExecution.prototype.recover = function recover(state) {
this[kStatus] = state.status;
this._debug(`recover ${this[kStatus]} definition execution`);
const running = this[kProcesses].running;
running.splice(0);
const ids = [];
running.clear();
const ids = new Set();
for (const bpState of state.processes) {
const bpid = bpState.id;
let bp;
if (ids.indexOf(bpid) > -1) {
if (ids.has(bpid)) {
bp = this.context.getNewProcessById(bpid);
} else {
bp = this.getProcessById(bpid);
}
if (!bp) continue;
ids.push(bpid);
ids.add(bpid);
bp.recover(bpState);
running.push(bp);
running.add(bp);
}
return this;
};
Expand All @@ -200,7 +202,7 @@ DefinitionExecution.prototype.getProcesses = function getProcesses() {
running,
processes
} = this[kProcesses];
const result = running.slice();
const result = [...running];
for (const bp of processes) {
if (!result.find(runningBp => bp.id === runningBp.id)) result.push(bp);
}
Expand All @@ -213,23 +215,27 @@ DefinitionExecution.prototype.getProcessesById = function getProcessesById(proce
return this.getProcesses().filter(bp => bp.id === processId);
};
DefinitionExecution.prototype.getProcessByExecutionId = function getProcessByExecutionId(processExecutionId) {
const running = this[kProcesses].running;
return running.find(bp => bp.executionId === processExecutionId);
for (const bp of this[kProcesses].running) {
if (bp.executionId === processExecutionId) return bp;
}
};
DefinitionExecution.prototype.getRunningProcesses = function getRunningProcesses() {
const running = this[kProcesses].running;
return running.filter(bp => bp.executionId);
return [...this[kProcesses].running].filter(bp => bp.executionId);
};
DefinitionExecution.prototype.getExecutableProcesses = function getExecutableProcesses() {
return this[kProcesses].executable.slice();
return [...this[kProcesses].executable];
};
DefinitionExecution.prototype.getState = function getState() {
const processes = [];
for (const bp of this[kProcesses].running) {
processes.push(bp.getState());
}
return {
executionId: this.executionId,
stopped: this[kStopped],
completed: this[kCompleted],
status: this[kStatus],
processes: this[kProcesses].running.map(bp => bp.getState())
processes
};
};
DefinitionExecution.prototype.getApi = function getApi(apiMessage) {
Expand All @@ -244,39 +250,40 @@ DefinitionExecution.prototype.getApi = function getApi(apiMessage) {
const postponed = this[kProcesses].postponed;
const self = this;
api.getExecuting = function getExecuting() {
return postponed.reduce((result, msg) => {
const apis = [];
for (const msg of postponed) {
const bpApi = self._getProcessApi(msg);
if (bpApi) result.push(bpApi);
return result;
}, []);
if (bpApi) apis.push(bpApi);
}
return apis;
};
return api;
};
DefinitionExecution.prototype.getPostponed = function getPostponed(...args) {
const running = this[kProcesses].running;
return running.reduce((result, p) => {
result = result.concat(p.getPostponed(...args));
return result;
}, []);
let result = [];
for (const bp of this[kProcesses].running) {
result = result.concat(bp.getPostponed(...args));
}
return result;
};
DefinitionExecution.prototype._start = function start() {
const {
ids,
executable,
postponed
} = this[kProcesses];
if (!ids.length) {
if (!ids.size) {
return this._complete('completed');
}
if (!executable.length) {
if (!executable.size) {
return this._complete('error', {
error: new Error('No executable process')
});
}
this[kStatus] = 'start';
for (const bp of executable) bp.init();
for (const bp of executable) bp.run();
postponed.splice(0);
postponed.clear();
this[kProcessesQ].assertConsumer(this[kMessageHandlers].onProcessMessage, {
prefetch: 1000,
consumerTag: `_definition-activity-${this.executionId}`
Expand Down Expand Up @@ -331,7 +338,7 @@ DefinitionExecution.prototype._onChildEvent = function onChildEvent(routingKey,
const message = (0, _messageHelper.cloneMessage)(originalMessage);
const content = message.content;
const parent = content.parent = content.parent || {};
const isDirectChild = this[kProcesses].ids.indexOf(content.id) > -1;
const isDirectChild = this[kProcesses].ids.has(content.id);
if (isDirectChild) {
parent.executionId = this.executionId;
} else {
Expand Down Expand Up @@ -424,7 +431,7 @@ DefinitionExecution.prototype._onProcessMessage = function onProcessMessage(rout
type: 'error'
});
} else {
for (const bp of this[kProcesses].running.slice()) {
for (const bp of new Set(this[kProcesses].running)) {
if (bp.id !== childId) bp.stop();
}
Object.assign(this.environment.output, content.output);
Expand All @@ -439,12 +446,15 @@ DefinitionExecution.prototype._onProcessMessage = function onProcessMessage(rout
DefinitionExecution.prototype._stateChangeMessage = function stateChangeMessage(message, postponeMessage) {
let previousMsg;
const postponed = this[kProcesses].postponed;
const idx = postponed.findIndex(msg => msg.content.executionId === message.content.executionId);
if (idx > -1) {
previousMsg = postponed.splice(idx, 1)[0];
for (const msg of postponed) {
if (msg.content.executionId === message.content.executionId) {
previousMsg = msg;
postponed.delete(msg);
break;
}
}
if (previousMsg) previousMsg.ack();
if (postponeMessage) postponed.push(message);
if (postponeMessage) postponed.add(message);
};
DefinitionExecution.prototype._onProcessCompleted = function onProcessCompleted(message) {
this._stateChangeMessage(message, false);
Expand All @@ -467,9 +477,9 @@ DefinitionExecution.prototype._onProcessCompleted = function onProcessCompleted(
};
DefinitionExecution.prototype._onStopped = function onStopped(message) {
const running = this[kProcesses].running;
this._debug(`stop definition execution (stop process executions ${running.length})`);
this._debug(`stop definition execution (stop process executions ${running.size})`);
this[kProcessesQ].close();
for (const bp of running.slice()) bp.stop();
for (const bp of new Set(running)) bp.stop();
this._deactivate();
this[kStopped] = true;
return this.broker.publish('execution', `execution.stopped.${this.executionId}`, (0, _messageHelper.cloneContent)(this[kExecuteMessage].content, {
Expand All @@ -490,7 +500,7 @@ DefinitionExecution.prototype._onApiMessage = function onApiMessage(routingKey,
});
}
if (delegate) {
for (const bp of this[kProcesses].running.slice()) {
for (const bp of new Set(this[kProcesses].running)) {
bp.broker.publish('api', routingKey, (0, _messageHelper.cloneContent)(message.content), message.properties);
}
}
Expand All @@ -515,7 +525,7 @@ DefinitionExecution.prototype._startProcessesByMessage = function startProcesses
if (!bp.executionId) {
this._debug(`start <${bp.id}> by <${reference.referenceId}> (${reference.referenceType})`);
this._activateProcess(bp);
running.push(bp);
running.add(bp);
bp.init();
bp.run();
if (reference.referenceType === 'message') return;
Expand All @@ -524,7 +534,7 @@ DefinitionExecution.prototype._startProcessesByMessage = function startProcesses
this._debug(`start new <${bp.id}> by <${reference.referenceId}> (${reference.referenceType})`);
const targetProcess = this.context.getNewProcessById(bp.id);
this._activateProcess(targetProcess);
running.push(targetProcess);
running.add(targetProcess);
targetProcess.init();
targetProcess.run();
if (reference.referenceType === 'message') return;
Expand All @@ -551,7 +561,7 @@ DefinitionExecution.prototype._onMessageOutbound = function onMessageOutbound(ro
if (found) return;
targetProcess = targetProcess || this.context.getNewProcessById(target.processId);
this._activateProcess(targetProcess);
this[kProcesses].running.push(targetProcess);
this[kProcesses].running.add(targetProcess);
targetProcess.init();
targetProcess.run();
targetProcess.sendMessage(message);
Expand Down Expand Up @@ -583,7 +593,7 @@ DefinitionExecution.prototype._onCallActivity = function onCallActivity(routingK
if (!targetProcess) return;
this._debug(`call from <${fromParent.id}.${fromId}> to <${calledElement}>`);
this._activateProcess(targetProcess);
this[kProcesses].running.push(targetProcess);
this[kProcesses].running.add(targetProcess);
targetProcess.init(bpExecutionId);
targetProcess.run({
inbound: [(0, _messageHelper.cloneContent)(content)]
Expand Down Expand Up @@ -639,10 +649,9 @@ DefinitionExecution.prototype._onDelegateMessage = function onDelegateMessage(ro
});
};
DefinitionExecution.prototype._removeProcessByExecutionId = function removeProcessByExecutionId(processExecutionId) {
const running = this[kProcesses].running;
const idx = running.findIndex(p => p.executionId === processExecutionId);
if (idx === -1) return;
return running.splice(idx, 1)[0];
const bp = this.getProcessByExecutionId(processExecutionId);
if (bp) this[kProcesses].running.delete(bp);
return bp;
};
DefinitionExecution.prototype._complete = function complete(completionType, content, options) {
this._deactivate();
Expand Down
Loading

0 comments on commit 730a194

Please sign in to comment.