From c86cb310e7e7cc9c21384a62d91ee98e2980faa7 Mon Sep 17 00:00:00 2001 From: matthieurobin Date: Thu, 25 Feb 2021 14:31:00 +0100 Subject: [PATCH] closes #8 --- CHANGELOG.md | 4 ++ lib/broker/index.js | 20 ++++----- lib/broker/queue.js | 6 +-- lib/client/hosts.js | 8 ++-- test/test.broker.queue.js | 91 ++++++++++++++++++++------------------- test/test.kittenMQ.js | 5 ++- 6 files changed, 70 insertions(+), 64 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c4a310..6d0ea2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Kitten mq +### v0.10.4 +*2021-02-xx* + - Fix issue #8: The prefetch feature has changed ids generation for messages. Ids were generated by queues. However, it is too late to generate an id in the broker. So, message's id is generated in client. + ### v0.10.3 *2021-01-29* - Verify channel argument of methods `client.consume` and `client.listen`. diff --git a/lib/broker/index.js b/lib/broker/index.js index 1912677..aea819c 100644 --- a/lib/broker/index.js +++ b/lib/broker/index.js @@ -152,7 +152,7 @@ function broker (configPath, isProd = false) { } if (_route === routes.QUEUE_SEND) { - return _onReveiveSendAction(client, packet.data.headers.handlerId, packet.data.data.channel, packet.data.data.data, _route); + return _onReveiveSendAction(client, packet.data.headers, packet.data.data.channel, packet.data.data.data, _route); } if (_route === routes.QUEUE_ACK || _route === routes.QUEUE_NACK) { @@ -238,14 +238,14 @@ function broker (configPath, isProd = false) { /** * handler when on receiving send action * @param {String} client clientId#nodeId - * @param {Int} handlerId id of the sender's callback + * @param {Object} headers { handlerId : id of the sender's callback, messageId } * @param {String} channel endpoint/version/param * @param {*} data packet's data * @param {String} route */ - function _onReveiveSendAction (client, handlerId, channel, data, route) { + function _onReveiveSendAction (client, headers, channel, data, route) { let _channelParts = channel.split('/'); - let _fullyQualifiedClient = client + '@' + handlerId; + let _fullyQualifiedClient = client + '@' + headers.handlerId; let _channel = {}; _channel.endpoint = _channelParts[0]; @@ -258,13 +258,13 @@ function broker (configPath, isProd = false) { // A client cannot broadcast a message if (_channel.endpoint === '*') { log(logger.LEVELS.DEBUG, logger.NAMESPACES.PUBLISHER, 'from=' + client + ';router=' + formatRoute(_channel) + ';failed to publish, cannot publish on "*"'); - return _sendToClients(null, [_fullyQualifiedClient], { handlerId, data : null, channel : _channel }, null, false, _getError(constants.ERRORS.BAD_ENPOINT_ALL)); + return _sendToClients(null, [_fullyQualifiedClient], { handlerId : headers.handlerId, data : null, channel : _channel }, null, false, _getError(constants.ERRORS.BAD_ENPOINT_ALL)); } // A client cannot broadcast a message to * versions if (_channel.version === '*') { log(logger.LEVELS.DEBUG, logger.NAMESPACES.PUBLISHER, 'from=' + client + ';router=' + formatRoute(_channel) + ';failed to publish, cannot publish on "endpoint/*"'); - return _sendToClients(null, [_fullyQualifiedClient], { handlerId, data : null, channel : _channel }, null, false, _getError(constants.ERRORS.BAD_ENPOINT_ALL)); + return _sendToClients(null, [_fullyQualifiedClient], { handlerId : headers.handlerId, data : null, channel : _channel }, null, false, _getError(constants.ERRORS.BAD_ENPOINT_ALL)); } let _keyQueue = _channel.endpoint + '/' + _channel.version; @@ -277,17 +277,17 @@ function broker (configPath, isProd = false) { let _client = client.split('#'); if (!_rules.isAllowed(_client[0], _client[1], _channel, false)) { log(logger.LEVELS.DEBUG, logger.NAMESPACES.PUBLISHER, 'from=' + client + ';router=' + formatRoute(_channel) + ';failed to publish, not allowed'); - return _sendToClients(null, [_fullyQualifiedClient], { handlerId, data : null, channel : _channel }, null, false, _getError(constants.ERRORS.NOT_ALLOWED)); + return _sendToClients(null, [_fullyQualifiedClient], { handlerId : headers.handlerId, data : null, channel : _channel }, null, false, _getError(constants.ERRORS.NOT_ALLOWED)); } - let _errors = _queue.addInQueue(_channel.id, _item, handlerId); + let _errors = _queue.addInQueue(_channel.id, _item, headers.messageId || headers.handlerId); if (_errors) { log(logger.LEVELS.DEBUG, logger.NAMESPACES.PUBLISHER, 'from=' + client + ';router=' + formatRoute(_channel) + ';failed to publish, validation failed'); - return _sendToClients(null, [_fullyQualifiedClient], { handlerId, data : null, channel : _channel }, null, false, _getError(constants.ERRORS.BAD_FORMAT, _errors)); + return _sendToClients(null, [_fullyQualifiedClient], { handlerId : headers.handlerId, data : null, channel : _channel }, null, false, _getError(constants.ERRORS.BAD_FORMAT, _errors)); } // Send confirmation to client - _sendToClients(null, [_fullyQualifiedClient], { handlerId, data : null, channel : _channel }, null, false ); + _sendToClients(null, [_fullyQualifiedClient], { handlerId : headers.handlerId, data : null, channel : _channel }, null, false ); log(logger.LEVELS.INFO, logger.NAMESPACES.PUBLISHER, 'from=' + client + ';router=' + formatRoute(_channel) + ';published'); } diff --git a/lib/broker/queue.js b/lib/broker/queue.js index 58c59bc..e1d9e59 100644 --- a/lib/broker/queue.js +++ b/lib/broker/queue.js @@ -349,10 +349,6 @@ function queue (id, handlerForQueue, config, configQueue = {}) { _queue.lastItem = _queue.currentItem; let headers = _queue.currentItem[2]; - if (!headers) { - headers = { messageId : utils.randU32Sync() } - } - _acks[headers.messageId] = { message : _queue.currentItem }; handlerForQueue( @@ -577,7 +573,7 @@ function queue (id, handlerForQueue, config, configQueue = {}) { return _resValidation; } - _queue.queue.push([id, item, null, messageId]); + _queue.queue.push([id, item, { messageId }, Date.now()]); _queue.nbMessagesReceived++; diff --git a/lib/client/hosts.js b/lib/client/hosts.js index ec5c624..0a3df28 100644 --- a/lib/client/hosts.js +++ b/lib/client/hosts.js @@ -1,6 +1,6 @@ -const Socket = require('kitten-socket'); -const utils = require('../utils'); -const jwt = require('kitten-jwt'); +const Socket = require('kitten-socket'); +const jwt = require('kitten-jwt'); +const { randU32Sync } = require('../utils'); function hosts (config) { let _hosts = { @@ -125,6 +125,8 @@ function hosts (config) { * @param {Int} messageId id gereated by broker */ _hosts.broadcast = function broadcast (clientId, privateKey, route, handlerId, data, messageId) { + messageId = messageId || randU32Sync(); + _iterator(socket => { write(socket, clientId, privateKey, route, handlerId, data, messageId); }); diff --git a/test/test.broker.queue.js b/test/test.broker.queue.js index 2530fde..da66902 100644 --- a/test/test.broker.queue.js +++ b/test/test.broker.queue.js @@ -2,6 +2,7 @@ const should = require('should'); const queueTree = require('../lib/broker/queue').queueTree; const queue = require('../lib/broker/queue').queue; const constants = require('../lib/broker/constants'); +const { randU32Sync } = require('../lib/utils'); describe('broker queue & tree', () => { @@ -453,7 +454,7 @@ describe('broker queue & tree', () => { it('should add an item in the waiting queue if no one is listening', () => { let queueObject = queue('endpoint/v1', () => {}, { maxItemsInQueue : 10 }); - queueObject.addInQueue(1, { data : { label : 'bla' }}, Date.now()); + queueObject.addInQueue(1, { data : { label : 'bla' }}, randU32Sync()); should(queueObject.queue).have.lengthOf(0); should(queueObject.queueSecondary._nbMessages).eql(1); @@ -465,7 +466,7 @@ describe('broker queue & tree', () => { let queueObject = queue('endpoint/v1', () => {}, { maxItemsInQueue : 5 }); for (let i = 0; i < 10; i++) { - queueObject.addInQueue(1, { data : { label : 'bla_' + i }}, Date.now()); + queueObject.addInQueue(1, { data : { label : 'bla_' + i }}, randU32Sync()); } @@ -485,7 +486,7 @@ describe('broker queue & tree', () => { let queueObject = queue('endpoint/v1', () => {}, {}, { ttl : .5 }); for (let i = 0; i < 10; i++) { - queueObject.addInQueue(1, { data : { label : 'bla_' + i }}, Date.now()); + queueObject.addInQueue(1, { data : { label : 'bla_' + i }}, randU32Sync()); } should(queueObject.queue).have.lengthOf(0); @@ -508,10 +509,10 @@ describe('broker queue & tree', () => { let queueObject = queue('endpoint/v1', () => {}, {}, { ttl : .5 }); for (let i = 0; i < 10; i++) { - queueObject.addInQueue(1, { data : { label : 'bla_' + i }}, Date.now()); + queueObject.addInQueue(1, { data : { label : 'bla_' + i }}, randU32Sync()); if (i % 2) { - queueObject.addInQueue(2, { data : { label : 'bla_' + i }}, Date.now()); + queueObject.addInQueue(2, { data : { label : 'bla_' + i }}, randU32Sync()); } } @@ -545,7 +546,7 @@ describe('broker queue & tree', () => { }; let queueObject = queue('endpoint/v1', handler, { maxItemsInQueue : 10 }); - queueObject.addInQueue(1, { data : { label : 'bla' }}, Date.now()); + queueObject.addInQueue(1, { data : { label : 'bla' }}, randU32Sync()); queueObject.addClient(1, 'client-1', '123456', constants.LISTENER_TYPES.CONSUME); }); @@ -561,16 +562,16 @@ describe('broker queue & tree', () => { }; let queueObject = queue('endpoint/v1', handler, { maxItemsInQueue : 10 }); - queueObject.addInQueue(1, { data : { label : 'bla_1' }}, Date.now()); - queueObject.addInQueue(1, { data : { label : 'bla_2' }}, Date.now()); + queueObject.addInQueue(1, { data : { label : 'bla_1' }}, randU32Sync()); + queueObject.addInQueue(1, { data : { label : 'bla_2' }}, randU32Sync()); queueObject.addClient(1, 'client-1', '123456', constants.LISTENER_TYPES.CONSUME); }); it('should not process items that belongs to another id', () => { let queueObject = queue('endpoint/v1', () => {}, { maxItemsInQueue : 10 }); - queueObject.addInQueue(1, { data : { label : 'bla_1' }}, Date.now()); - queueObject.addInQueue(1, { data : { label : 'bla_2' }}, Date.now()); + queueObject.addInQueue(1, { data : { label : 'bla_1' }}, randU32Sync()); + queueObject.addInQueue(1, { data : { label : 'bla_2' }}, randU32Sync()); queueObject.addClient(2, 'client-2', '987654', constants.LISTENER_TYPES.CONSUME); @@ -598,7 +599,7 @@ describe('broker queue & tree', () => { let queueObject = queue('endpoint/v1', handler, { maxItemsInQueue : 10 }); queueObject.addClient(1, 'client-1', '123456', constants.LISTENER_TYPES.CONSUME); - queueObject.addInQueue(1, { data : { label : 'bla_1' }}, Date.now()); + queueObject.addInQueue(1, { data : { label : 'bla_1' }}, randU32Sync()); should(queueObject.queue).have.lengthOf(0); should(queueObject.queueSecondary._nbMessages).eql(0); @@ -627,7 +628,7 @@ describe('broker queue & tree', () => { let queueObject = queue('endpoint/v1', handler, { maxItemsInQueue : 10, requeueLimit : 2, requeueInterval : 0.5 }); queueObject.addClient(1, 'client-1', '123456', constants.LISTENER_TYPES.CONSUME); - queueObject.addInQueue(1, { data : { label : 'bla_1' }}, Date.now()); + queueObject.addInQueue(1, { data : { label : 'bla_1' }}, randU32Sync()); should(queueObject.queue).have.lengthOf(0); should(queueObject.queueSecondary._nbMessages).eql(0); @@ -662,7 +663,7 @@ describe('broker queue & tree', () => { let queueObject = queue('endpoint/v1', handler, { maxItemsInQueue : 10, requeueLimit : 2, requeueInterval : 2 }); queueObject.addClient(1, 'client-1', '123456', constants.LISTENER_TYPES.CONSUME); - queueObject.addInQueue(1, { data : { label : 'bla_1' }}, Date.now()); + queueObject.addInQueue(1, { data : { label : 'bla_1' }}, randU32Sync()); should(queueObject.queue).have.lengthOf(0); should(queueObject.queueSecondary._nbMessages).eql(0); @@ -700,8 +701,8 @@ describe('broker queue & tree', () => { let queueObject = queue('endpoint/v1', handler, { maxItemsInQueue : 10, requeueLimit : 2, requeueInterval : 0.5 }); queueObject.addClient(1, 'client-1', '123456', constants.LISTENER_TYPES.CONSUME); - queueObject.addInQueue(1, { data : { label : 'bla_1' }}, Date.now()); - queueObject.addInQueue(1, { data : { label : 'bla_2' }}, Date.now()); + queueObject.addInQueue(1, { data : { label : 'bla_1' }}, randU32Sync()); + queueObject.addInQueue(1, { data : { label : 'bla_2' }}, randU32Sync()); should(queueObject.queue).have.lengthOf(1); should(queueObject.queueSecondary._nbMessages).eql(0); @@ -732,7 +733,7 @@ describe('broker queue & tree', () => { let queueObject = queue('endpoint/v1', handler, { maxItemsInQueue : 10, requeueLimit : 2, requeueInterval : 0.2 }); queueObject.addClient(1, 'client-1', '123456', constants.LISTENER_TYPES.CONSUME); - queueObject.addInQueue(1, { data : { label : 'bla_1' }}, Date.now()); + queueObject.addInQueue(1, { data : { label : 'bla_1' }}, randU32Sync()); should(queueObject.queue).have.lengthOf(1); should(queueObject.queueSecondary._nbMessages).eql(0); @@ -765,7 +766,7 @@ describe('broker queue & tree', () => { let queueObject = queue('endpoint/v1', handler, { maxItemsInQueue : 10, requeueLimit : 2, requeueInterval : 0.2 }); queueObject.addClient(1, 'client-1', '123456', constants.LISTENER_TYPES.CONSUME); - queueObject.addInQueue(1, { data : { label : 'bla_1' }}, Date.now()); + queueObject.addInQueue(1, { data : { label : 'bla_1' }}, randU32Sync()); should(queueObject.queue).have.lengthOf(0); should(queueObject.queueSecondary._nbMessages).eql(0); @@ -791,7 +792,7 @@ describe('broker queue & tree', () => { queueObject.addClient(1, 'client-2', '123456', constants.LISTENER_TYPES.LISTEN); queueObject.addClient(1, 'client-3', '123456', constants.LISTENER_TYPES.LISTEN); - queueObject.addInQueue(1, { data : { label : 'bla' }}, Date.now()); + queueObject.addInQueue(1, { data : { label : 'bla' }}, randU32Sync()); should(queueObject.queue).have.lengthOf(0); should(queueObject.queueSecondary._nbMessages).eql(0); @@ -817,11 +818,11 @@ describe('broker queue & tree', () => { let queueObject = queue('endpoint/v1', handler, { maxItemsInQueue : 10, requeueInterval : 5 }, { prefetch : 3 }); queueObject.addClient(1, 'client-1', '123456', constants.LISTENER_TYPES.CONSUME); - queueObject.addInQueue(1, { data : { label : 'bla_1' }}, Date.now()); - queueObject.addInQueue(1, { data : { label : 'bla_2' }}, Date.now()); - queueObject.addInQueue(1, { data : { label : 'bla_3' }}, Date.now()); - queueObject.addInQueue(1, { data : { label : 'bla_4' }}, Date.now()); - queueObject.addInQueue(1, { data : { label : 'bla_5' }}, Date.now()); + queueObject.addInQueue(1, { data : { label : 'bla_1' }}, randU32Sync() + 'a'); + queueObject.addInQueue(1, { data : { label : 'bla_2' }}, randU32Sync() + 'b'); + queueObject.addInQueue(1, { data : { label : 'bla_3' }}, randU32Sync() + 'c'); + queueObject.addInQueue(1, { data : { label : 'bla_4' }}, randU32Sync() + 'd'); + queueObject.addInQueue(1, { data : { label : 'bla_5' }}, randU32Sync() + 'e'); setTimeout(() => { should(iterator).eql(3); @@ -859,11 +860,11 @@ describe('broker queue & tree', () => { queueObject.addClient(1, 'client-1', '123456', constants.LISTENER_TYPES.CONSUME); queueObject.addClient(1, 'client-2', '123457', constants.LISTENER_TYPES.CONSUME); - queueObject.addInQueue(1, { data : { label : 'bla_1' }}, Date.now()); - queueObject.addInQueue(1, { data : { label : 'bla_2' }}, Date.now()); - queueObject.addInQueue(1, { data : { label : 'bla_3' }}, Date.now()); - queueObject.addInQueue(1, { data : { label : 'bla_4' }}, Date.now()); - queueObject.addInQueue(1, { data : { label : 'bla_5' }}, Date.now()); + queueObject.addInQueue(1, { data : { label : 'bla_1' }}, randU32Sync() + 'a'); + queueObject.addInQueue(1, { data : { label : 'bla_2' }}, randU32Sync() + 'b'); + queueObject.addInQueue(1, { data : { label : 'bla_3' }}, randU32Sync() + 'c'); + queueObject.addInQueue(1, { data : { label : 'bla_4' }}, randU32Sync() + 'd'); + queueObject.addInQueue(1, { data : { label : 'bla_5' }}, randU32Sync() + 'e'); setTimeout(() => { should(iterator).eql(3); @@ -901,8 +902,8 @@ describe('broker queue & tree', () => { let queueObject = queue('endpoint/v1', handler, { maxItemsInQueue : 10, requeueInterval : 0.1, requeueLimit : 2 }, { prefetch : 2 }); queueObject.addClient(1, 'client-1', '123456', constants.LISTENER_TYPES.CONSUME); - queueObject.addInQueue(1, { data : { label : 'bla_1' }}, Date.now()); - queueObject.addInQueue(1, { data : { label : 'bla_2' }}, Date.now()); + queueObject.addInQueue(1, { data : { label : 'bla_1' }}, randU32Sync()); + queueObject.addInQueue(1, { data : { label : 'bla_2' }}, randU32Sync()); setTimeout(() => { should(iterator).eql(3); @@ -940,8 +941,8 @@ describe('broker queue & tree', () => { let queueObject = queue('endpoint/v1', handler, { maxItemsInQueue : 10, requeueInterval : 0.1, requeueLimit : 2 }, { prefetch : 2 }); queueObject.addClient(1, 'client-1', '123456', constants.LISTENER_TYPES.CONSUME); - queueObject.addInQueue(1, { data : { label : 'bla_1' }}, Date.now()); - queueObject.addInQueue(1, { data : { label : 'bla_2' }}, Date.now()); + queueObject.addInQueue(1, { data : { label : 'bla_1' }}, randU32Sync()); + queueObject.addInQueue(1, { data : { label : 'bla_2' }}, randU32Sync()); setTimeout(() => { should(iterator).eql(3); @@ -982,8 +983,8 @@ describe('broker queue & tree', () => { let queueObject = queue('endpoint/v1', handler, { maxItemsInQueue : 10, requeueInterval : 0.1, requeueLimit : 2 }, { prefetch : 2 }); queueObject.addClient(1, 'client-1', '123456', constants.LISTENER_TYPES.CONSUME); - queueObject.addInQueue(1, { data : { label : 'bla_1' }}, Date.now()); - queueObject.addInQueue(1, { data : { label : 'bla_2' }}, Date.now()); + queueObject.addInQueue(1, { data : { label : 'bla_1' }}, randU32Sync()); + queueObject.addInQueue(1, { data : { label : 'bla_2' }}, randU32Sync()); setTimeout(() => { should(iterator).eql(3); @@ -1027,8 +1028,8 @@ describe('broker queue & tree', () => { let queueObject = queue('endpoint/v1', handler, { maxItemsInQueue : 10, requeueInterval : 0.2, requeueLimit : 2 }, { prefetch : 2 }); queueObject.addClient(1, 'client-1', '123456', constants.LISTENER_TYPES.CONSUME); - queueObject.addInQueue(1, { data : { label : 'bla_1' }}, Date.now()); - queueObject.addInQueue(1, { data : { label : 'bla_2' }}, Date.now()); + queueObject.addInQueue(1, { data : { label : 'bla_1' }}, randU32Sync()); + queueObject.addInQueue(1, { data : { label : 'bla_2' }}, randU32Sync()); setTimeout(() => { should(iterator).eql(3); @@ -1077,8 +1078,8 @@ describe('broker queue & tree', () => { let queueObject = queue('endpoint/v1', handler, { maxItemsInQueue : 10, requeueInterval : 0.1, requeueLimit : 2 }, { prefetch : 2 }); queueObject.addClient(1, 'client-1', '123456', constants.LISTENER_TYPES.CONSUME); - queueObject.addInQueue(1, { data : { label : 'bla_1' }}, Date.now()); - queueObject.addInQueue(1, { data : { label : 'bla_2' }}, Date.now()); + queueObject.addInQueue(1, { data : { label : 'bla_1' }}, randU32Sync()); + queueObject.addInQueue(1, { data : { label : 'bla_2' }}, randU32Sync()); should(queueObject.queue).have.lengthOf(0); should(queueObject.queueSecondary._nbMessages).eql(0); @@ -1122,8 +1123,8 @@ describe('broker queue & tree', () => { let queueObject = queue('endpoint/v1', handler, { maxItemsInQueue : 10, requeueInterval : 0.2, requeueLimit : 2 }, { prefetch : 2 }); queueObject.addClient(1, 'client-1', '123456', constants.LISTENER_TYPES.CONSUME); - queueObject.addInQueue(1, { data : { label : 'bla_1' }}, Date.now()); - queueObject.addInQueue(1, { data : { label : 'bla_2' }}, Date.now()); + queueObject.addInQueue(1, { data : { label : 'bla_1' }}, randU32Sync()); + queueObject.addInQueue(1, { data : { label : 'bla_2' }}, randU32Sync()); should(queueObject.queue).have.lengthOf(0); should(queueObject.queueSecondary._nbMessages).eql(0); @@ -1172,8 +1173,8 @@ describe('broker queue & tree', () => { let queueObject = queue('endpoint/v1', handler, { maxItemsInQueue : 10, requeueInterval : 0.2, requeueLimit : 2 }, { prefetch : 2 }); queueObject.addClient(1, 'client-1', '123456', constants.LISTENER_TYPES.CONSUME); - queueObject.addInQueue(1, { data : { label : 'bla_1' }}, Date.now()); - queueObject.addInQueue(1, { data : { label : 'bla_2' }}, Date.now()); + queueObject.addInQueue(1, { data : { label : 'bla_1' }}, randU32Sync()); + queueObject.addInQueue(1, { data : { label : 'bla_2' }}, randU32Sync()); should(queueObject.queue).have.lengthOf(0); should(queueObject.queueSecondary._nbMessages).eql(0); @@ -1183,7 +1184,7 @@ describe('broker queue & tree', () => { let queueObject = queue('endpoint/v1', () => {}, { maxItemsInQueue : 10 }, { id : ['int'], label : ['string'] }); queueObject.addClient(1, 'client-1', '123456', constants.LISTENER_TYPES.CONSUME); - let res = queueObject.addInQueue(1, {}, Date.now()); + let res = queueObject.addInQueue(1, {}, randU32Sync()); should(queueObject.queue).have.lengthOf(0); should(queueObject.queueSecondary._nbMessages).eql(0); @@ -1197,7 +1198,7 @@ describe('broker queue & tree', () => { let queueObject = queue('endpoint/v1', () => {}, { maxItemsInQueue : 10 }, { id : ['int'], label : ['string'] }); queueObject.addClient(1, 'client-1', '123456', constants.LISTENER_TYPES.CONSUME); - let res = queueObject.addInQueue(1, { data : null }, Date.now()); + let res = queueObject.addInQueue(1, { data : null }, randU32Sync()); should(queueObject.queue).have.lengthOf(0); should(queueObject.queueSecondary._nbMessages).eql(0); @@ -1211,7 +1212,7 @@ describe('broker queue & tree', () => { let queueObject = queue('endpoint/v1', () => {}, { maxItemsInQueue : 10 }, { map : { id : ['int'], label : ['string'] }}); queueObject.addClient(1, 'client-1', '123456', constants.LISTENER_TYPES.CONSUME); - let res = queueObject.addInQueue(1, { data : { label : 'bla' }}, Date.now()); + let res = queueObject.addInQueue(1, { data : { label : 'bla' }}, randU32Sync()); should(queueObject.queue).have.lengthOf(0); should(queueObject.queueSecondary._nbMessages).eql(0); diff --git a/test/test.kittenMQ.js b/test/test.kittenMQ.js index b9b6f0f..ed73923 100644 --- a/test/test.kittenMQ.js +++ b/test/test.kittenMQ.js @@ -240,10 +240,13 @@ describe('kitten-mq', () => { }); setTimeout(() => { - should(_broker1._queues['endpoint/1.0'].currentItem).eql(undefined); should(_broker1._queues['endpoint/1.0'].currentItem).eql(undefined); should(_broker1._queues['endpoint/1.0'].queueSecondary._nbMessages).eql(0); should(_broker1._queues['endpoint/1.0'].queueSecondary._nbMessages).eql(0); + + should(_broker2._queues['endpoint/1.0'].currentItem).eql(undefined); + should(_broker2._queues['endpoint/1.0'].queueSecondary._nbMessages).eql(0); + should(_broker2._queues['endpoint/1.0'].queueSecondary._nbMessages).eql(0); should(_nbCalls).eql(1); _client1.disconnect(() => {