Skip to content
This repository has been archived by the owner on Dec 4, 2024. It is now read-only.

Commit

Permalink
closes #8
Browse files Browse the repository at this point in the history
  • Loading branch information
matthieurobin committed Feb 25, 2021
1 parent af813da commit c86cb31
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 64 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Kitten mq

### v0.10.4
*2021-02-xx*
- Fix issue #8: The prefetch feature has changed ids generation for messages. Ids were generated by queues. However, it is too late to generate an id in the broker. So, message's id is generated in client.

### v0.10.3
*2021-01-29*
- Verify channel argument of methods `client.consume` and `client.listen`.
Expand Down
20 changes: 10 additions & 10 deletions lib/broker/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ function broker (configPath, isProd = false) {
}

if (_route === routes.QUEUE_SEND) {
return _onReveiveSendAction(client, packet.data.headers.handlerId, packet.data.data.channel, packet.data.data.data, _route);
return _onReveiveSendAction(client, packet.data.headers, packet.data.data.channel, packet.data.data.data, _route);
}

if (_route === routes.QUEUE_ACK || _route === routes.QUEUE_NACK) {
Expand Down Expand Up @@ -238,14 +238,14 @@ function broker (configPath, isProd = false) {
/**
* handler when on receiving send action
* @param {String} client clientId#nodeId
* @param {Int} handlerId id of the sender's callback
* @param {Object} headers { handlerId : id of the sender's callback, messageId }
* @param {String} channel endpoint/version/param
* @param {*} data packet's data
* @param {String} route
*/
function _onReveiveSendAction (client, handlerId, channel, data, route) {
function _onReveiveSendAction (client, headers, channel, data, route) {
let _channelParts = channel.split('/');
let _fullyQualifiedClient = client + '@' + handlerId;
let _fullyQualifiedClient = client + '@' + headers.handlerId;

let _channel = {};
_channel.endpoint = _channelParts[0];
Expand All @@ -258,13 +258,13 @@ function broker (configPath, isProd = false) {
// A client cannot broadcast a message
if (_channel.endpoint === '*') {
log(logger.LEVELS.DEBUG, logger.NAMESPACES.PUBLISHER, 'from=' + client + ';router=' + formatRoute(_channel) + ';failed to publish, cannot publish on "*"');
return _sendToClients(null, [_fullyQualifiedClient], { handlerId, data : null, channel : _channel }, null, false, _getError(constants.ERRORS.BAD_ENPOINT_ALL));
return _sendToClients(null, [_fullyQualifiedClient], { handlerId : headers.handlerId, data : null, channel : _channel }, null, false, _getError(constants.ERRORS.BAD_ENPOINT_ALL));
}

// A client cannot broadcast a message to * versions
if (_channel.version === '*') {
log(logger.LEVELS.DEBUG, logger.NAMESPACES.PUBLISHER, 'from=' + client + ';router=' + formatRoute(_channel) + ';failed to publish, cannot publish on "endpoint/*"');
return _sendToClients(null, [_fullyQualifiedClient], { handlerId, data : null, channel : _channel }, null, false, _getError(constants.ERRORS.BAD_ENPOINT_ALL));
return _sendToClients(null, [_fullyQualifiedClient], { handlerId : headers.handlerId, data : null, channel : _channel }, null, false, _getError(constants.ERRORS.BAD_ENPOINT_ALL));
}

let _keyQueue = _channel.endpoint + '/' + _channel.version;
Expand All @@ -277,17 +277,17 @@ function broker (configPath, isProd = false) {
let _client = client.split('#');
if (!_rules.isAllowed(_client[0], _client[1], _channel, false)) {
log(logger.LEVELS.DEBUG, logger.NAMESPACES.PUBLISHER, 'from=' + client + ';router=' + formatRoute(_channel) + ';failed to publish, not allowed');
return _sendToClients(null, [_fullyQualifiedClient], { handlerId, data : null, channel : _channel }, null, false, _getError(constants.ERRORS.NOT_ALLOWED));
return _sendToClients(null, [_fullyQualifiedClient], { handlerId : headers.handlerId, data : null, channel : _channel }, null, false, _getError(constants.ERRORS.NOT_ALLOWED));
}

let _errors = _queue.addInQueue(_channel.id, _item, handlerId);
let _errors = _queue.addInQueue(_channel.id, _item, headers.messageId || headers.handlerId);
if (_errors) {
log(logger.LEVELS.DEBUG, logger.NAMESPACES.PUBLISHER, 'from=' + client + ';router=' + formatRoute(_channel) + ';failed to publish, validation failed');
return _sendToClients(null, [_fullyQualifiedClient], { handlerId, data : null, channel : _channel }, null, false, _getError(constants.ERRORS.BAD_FORMAT, _errors));
return _sendToClients(null, [_fullyQualifiedClient], { handlerId : headers.handlerId, data : null, channel : _channel }, null, false, _getError(constants.ERRORS.BAD_FORMAT, _errors));
}

// Send confirmation to client
_sendToClients(null, [_fullyQualifiedClient], { handlerId, data : null, channel : _channel }, null, false );
_sendToClients(null, [_fullyQualifiedClient], { handlerId : headers.handlerId, data : null, channel : _channel }, null, false );
log(logger.LEVELS.INFO, logger.NAMESPACES.PUBLISHER, 'from=' + client + ';router=' + formatRoute(_channel) + ';published');
}

Expand Down
6 changes: 1 addition & 5 deletions lib/broker/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -349,10 +349,6 @@ function queue (id, handlerForQueue, config, configQueue = {}) {
_queue.lastItem = _queue.currentItem;

let headers = _queue.currentItem[2];
if (!headers) {
headers = { messageId : utils.randU32Sync() }
}

_acks[headers.messageId] = { message : _queue.currentItem };

handlerForQueue(
Expand Down Expand Up @@ -577,7 +573,7 @@ function queue (id, handlerForQueue, config, configQueue = {}) {
return _resValidation;
}

_queue.queue.push([id, item, null, messageId]);
_queue.queue.push([id, item, { messageId }, Date.now()]);
_queue.nbMessagesReceived++;


Expand Down
8 changes: 5 additions & 3 deletions lib/client/hosts.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const Socket = require('kitten-socket');
const utils = require('../utils');
const jwt = require('kitten-jwt');
const Socket = require('kitten-socket');
const jwt = require('kitten-jwt');
const { randU32Sync } = require('../utils');

function hosts (config) {
let _hosts = {
Expand Down Expand Up @@ -125,6 +125,8 @@ function hosts (config) {
* @param {Int} messageId id gereated by broker
*/
_hosts.broadcast = function broadcast (clientId, privateKey, route, handlerId, data, messageId) {
messageId = messageId || randU32Sync();

_iterator(socket => {
write(socket, clientId, privateKey, route, handlerId, data, messageId);
});
Expand Down
Loading

0 comments on commit c86cb31

Please sign in to comment.