From 0e32050ea2cd5e5b526b2bef469e84e361d0f2d4 Mon Sep 17 00:00:00 2001 From: Yasser Fadl Date: Wed, 30 Aug 2017 18:05:15 +0200 Subject: [PATCH] feat: individual presence awareness --- src/presence/presence-handler.js | 273 +++++++++++------- test-specs/features | 2 +- test-specs/steps/client/presence.js | 41 ++- .../presence-handlerDeprecatedSpec.js | 111 +++++++ .../unit/presence/presence-handlerSpec.js | 63 ++-- 5 files changed, 358 insertions(+), 132 deletions(-) create mode 100644 test-unit/unit/presence/presence-handlerDeprecatedSpec.js diff --git a/src/presence/presence-handler.js b/src/presence/presence-handler.js index bd0c6a41e..c0c83a2ab 100644 --- a/src/presence/presence-handler.js +++ b/src/presence/presence-handler.js @@ -4,6 +4,21 @@ const EventEmitter = require('component-emitter2') const C = require('../constants/constants') const ResubscribeNotifier = require('../utils/resubscribe-notifier') +function validateArguments (userId, callback, defaultAction) { + if (typeof userId === 'function' && callback === undefined) { + callback = userId // eslint-disable-line + userId = defaultAction // eslint-disable-line + } else { + userId = [userId] // eslint-disable-line + } + + if (callback !== undefined && typeof callback !== 'function') { + throw new Error('invalid argument callback') + } + + return { userId, callback } +} + /** * The main class for presence in deepstream * @@ -17,119 +32,181 @@ const ResubscribeNotifier = require('../utils/resubscribe-notifier') * @constructor * @public */ -const PresenceHandler = function (options, connection, client) { - this._options = options - this._connection = connection - this._client = client - this._emitter = new EventEmitter() - this._ackTimeoutRegistry = client._$getAckTimeoutRegistry() - this._resubscribeNotifier = new ResubscribeNotifier(this._client, this._resubscribe.bind(this)) -} +module.exports = class PresenceHandler { + constructor (options, connection, client) { + this._options = options + this._connection = connection + this._client = client + this._queryEmitter = new EventEmitter() + this._subscribeEmitter = new EventEmitter() + this._ackTimeoutRegistry = client._$getAckTimeoutRegistry() + this._resubscribeNotifier = new ResubscribeNotifier(this._client, this._resubscribe.bind(this)) + this._counter = 1 -/** - * Queries for clients logged into deepstream. - * - * @param {Function} callback Will be invoked with an array of clients - * - * @public - * @returns {void} - */ -PresenceHandler.prototype.getAll = function (callback) { - if (!this._emitter.hasListeners(C.ACTIONS.QUERY)) { - // At least one argument is required for a message to be permissionable - this._connection.sendMsg(C.TOPIC.PRESENCE, C.ACTIONS.QUERY, [C.ACTIONS.QUERY]) + this._flush = this._flush.bind(this) + this._flushTimeout = null + this._pendingSubscribes = {} + this._pendingUnsubscribes = {} + } + /** + * Queries for clients logged into deepstream. + * + * @param {Function} callback Will be invoked with an array of clients + * + * @public + * @returns {void} + */ + getAll (users, callback) { + if (typeof users === 'function') { + this._connection.sendMsg(C.TOPIC.PRESENCE, C.ACTIONS.QUERY, [C.ACTIONS.QUERY]) + this._queryEmitter.once(C.ACTIONS.QUERY, users) + } else { + const queryId = this._counter++ + this._connection.sendMsg(C.TOPIC.PRESENCE, C.ACTIONS.QUERY, [queryId, users]) + this._queryEmitter.once(queryId, callback) + } } - this._emitter.once(C.ACTIONS.QUERY, callback) -} -/** - * Subscribes to client logins or logouts in deepstream - * - * @param {Function} callback Will be invoked with the username of a client, - * and a boolean to indicate if it was a login or - * logout event - * @public - * @returns {void} - */ -PresenceHandler.prototype.subscribe = function (callback) { - if (callback !== undefined && typeof callback !== 'function') { - throw new Error('invalid argument callback') + /** + * Subscribes to client logins or logouts in deepstream + * + * @param {Function} callback Will be invoked with the username of a client, + * and a boolean to indicate if it was a login or + * logout event + * @public + * @returns {void} + */ + subscribe (userId, callback) { + const args = validateArguments(userId, callback, C.ACTIONS.SUBSCRIBE) + if (!this._subscribeEmitter.hasListeners(args.userId)) { + if (args.userId === C.ACTIONS.SUBSCRIBE) { + this._sendGlobalSubscription(C.ACTIONS.SUBSCRIBE) + this._subscribeEmitter.on(C.ACTIONS.SUBSCRIBE, args.callback) + } else { + delete this._pendingUnsubscribes[args.userId] + this._pendingSubscribes[args.userId] = true + if (!this._flushTimeout) { + this._flushTimeout = setTimeout(this._flush, 0) + } + this._subscribeEmitter.on(args.userId, args.callback) + } + } } - if (!this._emitter.hasListeners(C.TOPIC.PRESENCE)) { - this._ackTimeoutRegistry.add({ - topic: C.TOPIC.PRESENCE, - action: C.ACTIONS.SUBSCRIBE, - name: C.TOPIC.PRESENCE - }) - this._connection.sendMsg(C.TOPIC.PRESENCE, C.ACTIONS.SUBSCRIBE, [C.ACTIONS.SUBSCRIBE]) + /** + * Removes a callback for a specified presence event + * + * @param {Function} callback The callback to unregister via {PresenceHandler#unsubscribe} + * + * @public + * @returns {void} + */ + unsubscribe (userId, callback) { + const args = validateArguments(userId, callback, C.ACTIONS.UNSUBSCRIBE) + + if (args.userId === C.ACTIONS.UNSUBSCRIBE) { + this._subscribeEmitter.off(C.ACTIONS.SUBSCRIBE, args.callback) + } else { + this._subscribeEmitter.off(args.userId, args.callback) + } + + if (!this._subscribeEmitter.hasListeners(args.userId)) { + if (args.userId === C.ACTIONS.UNSUBSCRIBE) { + this._sendGlobalSubscription(C.ACTIONS.UNSUBSCRIBE) + } else { + delete this._pendingSubscribes[args.userId] + this._pendingUnsubscribes[args.userId] = true + if (!this._flushTimeout) { + this._flushTimeout = setTimeout(this._flush, 0) + } + } + } } - this._emitter.on(C.TOPIC.PRESENCE, callback) -} + /** + * Handles incoming messages from the server + * + * @param {Object} message parsed deepstream message + * + * @package private + * @returns {void} + */ + _$handle (message) { + if (message.action === C.ACTIONS.ERROR && message.data[0] === C.EVENT.MESSAGE_DENIED) { + this._ackTimeoutRegistry.remove(C.TOPIC.PRESENCE, message.data[1]) + message.processedError = true + this._client._$onError(C.TOPIC.PRESENCE, C.EVENT.MESSAGE_DENIED, message.data[1]) + } else if (message.action === C.ACTIONS.ACK) { + this._ackTimeoutRegistry.clear(message) + } else if (message.action === C.ACTIONS.PRESENCE_JOIN) { + this._subscribeEmitter.emit(C.ACTIONS.SUBSCRIBE, message.data[0], true) + this._subscribeEmitter.emit(message.data[0], true, message.data[0]) + } else if (message.action === C.ACTIONS.PRESENCE_LEAVE) { + this._subscribeEmitter.emit(C.ACTIONS.SUBSCRIBE, message.data[0], false) + this._subscribeEmitter.emit(message.data[0], false, message.data[0]) + } else if (message.action === C.ACTIONS.QUERY) { + try { + const data = JSON.parse(message.data[1]) + if (typeof data === 'object') { + this._queryEmitter.emit(message.data[0], data) + return + } + } catch (e) { + // not json, old event + } + this._queryEmitter.emit(C.ACTIONS.QUERY, message.data) + } else { + this._client._$onError(C.TOPIC.PRESENCE, C.EVENT.UNSOLICITED_MESSAGE, message.action) + } + } -/** - * Removes a callback for a specified presence event - * - * @param {Function} callback The callback to unregister via {PresenceHandler#unsubscribe} - * - * @public - * @returns {void} - */ -PresenceHandler.prototype.unsubscribe = function (callback) { - if (callback !== undefined && typeof callback !== 'function') { - throw new Error('invalid argument callback') + /** + * Resubscribes to presence subscription when connection is lost + * + * @package private + * @returns {void} + */ + _resubscribe () { + const callbacks = Object.keys(this._subscribeEmitter._callbacks || {}) + if (callbacks.indexOf(C.ACTIONS.SUBSCRIBE) > -1) { + callbacks.splice(callbacks.indexOf(C.ACTIONS.SUBSCRIBE), 1) + this._sendGlobalSubscription(C.ACTIONS.SUBSCRIBE) + } + if (callbacks.length > 0) { + this._sendSubscriptionBulk(C.ACTIONS.SUBSCRIBE, callbacks) + } } - this._emitter.off(C.TOPIC.PRESENCE, callback) + _flush () { + const pendingSubscribes = Object.keys(this._pendingSubscribes) + if (pendingSubscribes.length > 0) { + this._sendSubscriptionBulk(C.ACTIONS.SUBSCRIBE, pendingSubscribes) + this._pendingSubscribes = {} + } + const pendingUnsubscribes = Object.keys(this._pendingUnsubscribes) + if (pendingUnsubscribes.length > 0) { + this._sendSubscriptionBulk(C.ACTIONS.UNSUBSCRIBE, pendingUnsubscribes) + this._pendingUnsubscribes = {} + } + this._flushTimeout = null + } - if (!this._emitter.hasListeners(C.TOPIC.PRESENCE)) { + _sendSubscriptionBulk (action, names) { + const correlationId = this._counter++ this._ackTimeoutRegistry.add({ topic: C.TOPIC.PRESENCE, - action: C.ACTIONS.UNSUBSCRIBE, - name: C.TOPIC.PRESENCE + action, + name: correlationId }) - this._connection.sendMsg(C.TOPIC.PRESENCE, C.ACTIONS.UNSUBSCRIBE, [C.ACTIONS.UNSUBSCRIBE]) + this._connection.sendMsg(C.TOPIC.PRESENCE, action, [correlationId, names]) } -} -/** - * Handles incoming messages from the server - * - * @param {Object} message parsed deepstream message - * - * @package private - * @returns {void} - */ -PresenceHandler.prototype._$handle = function (message) { - if (message.action === C.ACTIONS.ERROR && message.data[0] === C.EVENT.MESSAGE_DENIED) { - this._ackTimeoutRegistry.remove(C.TOPIC.PRESENCE, message.data[1]) - message.processedError = true - this._client._$onError(C.TOPIC.PRESENCE, C.EVENT.MESSAGE_DENIED, message.data[1]) - } else if (message.action === C.ACTIONS.ACK) { - this._ackTimeoutRegistry.clear(message) - } else if (message.action === C.ACTIONS.PRESENCE_JOIN) { - this._emitter.emit(C.TOPIC.PRESENCE, message.data[0], true) - } else if (message.action === C.ACTIONS.PRESENCE_LEAVE) { - this._emitter.emit(C.TOPIC.PRESENCE, message.data[0], false) - } else if (message.action === C.ACTIONS.QUERY) { - this._emitter.emit(C.ACTIONS.QUERY, message.data) - } else { - this._client._$onError(C.TOPIC.PRESENCE, C.EVENT.UNSOLICITED_MESSAGE, message.action) - } -} - -/** - * Resubscribes to presence subscription when connection is lost - * - * @package private - * @returns {void} - */ -PresenceHandler.prototype._resubscribe = function () { - const callbacks = this._emitter._callbacks - if (callbacks && callbacks[C.TOPIC.PRESENCE]) { - this._connection.sendMsg(C.TOPIC.PRESENCE, C.ACTIONS.SUBSCRIBE, [C.ACTIONS.SUBSCRIBE]) + _sendGlobalSubscription (action) { + this._ackTimeoutRegistry.add({ + topic: C.TOPIC.PRESENCE, + action, + name: action + }) + this._connection.sendMsg(C.TOPIC.PRESENCE, action, [action]) } } - -module.exports = PresenceHandler diff --git a/test-specs/features b/test-specs/features index 5aab03364..0d956ffe8 160000 --- a/test-specs/features +++ b/test-specs/features @@ -1 +1 @@ -Subproject commit 5aab0336494ac9e2f8b79e99d9583d9760dc87fc +Subproject commit 0d956ffe83d58dd196b525d2cb03e99fca445f20 diff --git a/test-specs/steps/client/presence.js b/test-specs/steps/client/presence.js index e49a91809..1a10248cb 100644 --- a/test-specs/steps/client/presence.js +++ b/test-specs/steps/client/presence.js @@ -13,12 +13,23 @@ const subscribeCallback = sinon.spy() setTimeout(callback, config.messageWaitTime) }) + When(/^the client queries if "([^"]*)" are online$/, (users, callback) => { + global.dsClient.presence.getAll(users.split(','), queryCallback) + setTimeout(callback, config.messageWaitTime) + }) + Then(/^the client is notified that no clients are connected$/, () => { sinon.assert.calledWith(queryCallback, []) sinon.assert.calledOnce(queryCallback) queryCallback.reset() }) + Then(/^the client is notified with '(.*)'$/, (users) => { + sinon.assert.calledWith(queryCallback, JSON.parse(users)) + sinon.assert.calledOnce(queryCallback) + queryCallback.reset() + }) + Then(/^the client is notified that clients "([^"]*)" are connected$/, (clients) => { const connected_clients = clients.split(',') sinon.assert.calledWith(queryCallback, connected_clients) @@ -29,24 +40,46 @@ const subscribeCallback = sinon.spy() /** * Subscribes */ - When(/^the client subscribes to presence events$/, (callback) => { + When(/^the client subscribes to all presence events$/, (callback) => { global.dsClient.presence.subscribe(subscribeCallback) setTimeout(callback, config.messageWaitTime) }) - When(/^the client unsubscribes to presence events$/, (callback) => { + When(/^the client unsubscribes to all presence events$/, (callback) => { global.dsClient.presence.unsubscribe(subscribeCallback) setTimeout(callback, config.messageWaitTime) }) + When(/^the client subscribes to presence events for "([^"]*)"$/, (usernames, callback) => { + usernames.split(',').forEach(username => + global.dsClient.presence.subscribe(username, subscribeCallback) + ) + setTimeout(callback, config.messageWaitTime) + }) + + When(/^the client unsubscribes to presence events for "([^"]*)"$/, (usernames, callback) => { + usernames.split(',').forEach(username => + global.dsClient.presence.unsubscribe(username, subscribeCallback) + ) + setTimeout(callback, config.messageWaitTime) + }) + When(/^the client is notified that client "(\w*)" logged in$/, (username) => { - sinon.assert.calledWith(subscribeCallback, username, true) + try { + sinon.assert.calledWith(subscribeCallback, true, username) + } catch (e) { + sinon.assert.calledWith(subscribeCallback, username, true) + } sinon.assert.calledOnce(subscribeCallback) subscribeCallback.reset() }) When(/^the client is notified that client "(\w*)" logged out$/, (username) => { - sinon.assert.calledWith(subscribeCallback, username, false) + try { + sinon.assert.calledWith(subscribeCallback, false, username) + } catch (e) { + sinon.assert.calledWith(subscribeCallback, username, false) + } sinon.assert.calledOnce(subscribeCallback) subscribeCallback.reset() }) diff --git a/test-unit/unit/presence/presence-handlerDeprecatedSpec.js b/test-unit/unit/presence/presence-handlerDeprecatedSpec.js new file mode 100644 index 000000000..d8967c4a9 --- /dev/null +++ b/test-unit/unit/presence/presence-handlerDeprecatedSpec.js @@ -0,0 +1,111 @@ +'use strict' +/* global describe, it, expect, jasmine */ + +let PresenceHandler = require('../../../src/presence/presence-handler'), + connectionMock = new (require('../../mocks/message/connection-mock'))(), + mockClient = new (require('../../mocks/client-mock'))(), + msg = require('../../test-helper/test-helper').msg, + C = require('../../../src/constants/constants'), + options = {} + +describe('presence handler deprecated', () => { + let presenceHandler, + callback = jasmine.createSpy('presenceCallback') + + beforeEach(() => { + connectionMock.lastSendMessage = null + callback.calls.reset() + }) + + it('creates the presenceHandler', () => { + presenceHandler = new PresenceHandler(options, connectionMock, mockClient) + }) + + it('subscribes to presence', () => { + presenceHandler.subscribe(callback) + expect(connectionMock.lastSendMessage).toBe(msg('U|S|S+')) + }) + + it('emits an error if no ack message is received for presence subscription', (done) => { + expect(mockClient.lastError).toBe(null) + setTimeout(() => { + const errorParams = ['U', 'ACK_TIMEOUT', 'No ACK message received in time for S'] + expect(mockClient.lastError).toEqual(errorParams) + mockClient.lastError = null + done() + }, 20) + }) + + it('notified when client logs in', () => { + expect(callback).not.toHaveBeenCalled() + presenceHandler._$handle({ + topic: 'U', + action: 'PNJ', + data: ['Homer'] + }) + expect(callback).toHaveBeenCalledWith('Homer', true) + }) + + it('notified when client logs out', () => { + presenceHandler._$handle({ + topic: 'U', + action: 'PNL', + data: ['Marge'] + }) + expect(callback).toHaveBeenCalledWith('Marge', false) + }) + + it('queries for clients', () => { + presenceHandler.getAll(callback) + expect(connectionMock.lastSendMessage).toBe(msg('U|Q|Q+')) + }) + + it('receives data for query', () => { + presenceHandler._$handle({ + topic: 'U', + action: 'Q', + data: ['Marge', 'Homer', 'Bart'] + }) + expect(callback).toHaveBeenCalledWith(['Marge', 'Homer', 'Bart']) + }) + + it('unsubscribes to client logins', () => { + presenceHandler.unsubscribe(callback) + expect(connectionMock.lastSendMessage).toBe(msg('U|US|US+')) + }) + + it('emits an error if no ack message is received for presence unsubscribes', (done) => { + expect(mockClient.lastError).toBe(null) + setTimeout(() => { + const errorParams = ['U', 'ACK_TIMEOUT', 'No ACK message received in time for US'] + expect(mockClient.lastError).toEqual(errorParams) + mockClient.lastError = null + done() + }, 20) + }) + + it('not notified of future actions', () => { + expect(callback).not.toHaveBeenCalled() + presenceHandler._$handle({ + topic: 'U', + action: 'PNJ', + data: ['Homer'] + }) + expect(callback).not.toHaveBeenCalled() + + presenceHandler._$handle({ + topic: 'U', + action: 'PNL', + data: ['Homer'] + }) + expect(callback).not.toHaveBeenCalled() + + presenceHandler._$handle({ + topic: 'U', + action: 'Q', + data: ['Marge', 'Homer', 'Bart'] + }) + expect(callback).not.toHaveBeenCalled() + }) + +}) diff --git a/test-unit/unit/presence/presence-handlerSpec.js b/test-unit/unit/presence/presence-handlerSpec.js index 25aea7cc9..8463b7cc8 100644 --- a/test-unit/unit/presence/presence-handlerSpec.js +++ b/test-unit/unit/presence/presence-handlerSpec.js @@ -21,72 +21,77 @@ describe('presence handler', () => { presenceHandler = new PresenceHandler(options, connectionMock, mockClient) }) - it('subscribes to presence', () => { - presenceHandler.subscribe(callback) - expect(connectionMock.lastSendMessage).toBe(msg('U|S|S+')) + it('subscribes to presence with user a', (done) => { + presenceHandler.subscribe('userA', callback) + setTimeout(() => { + expect(connectionMock.lastSendMessage).toBe(msg('U|S|1|["userA"]+')) + done() + }, 1) }) - it('emits an error if no ack message is received for presence subscription', (done) => { + it('subscribes to presence with user b', (done) => { + presenceHandler.subscribe('userB', callback) + setTimeout(() => { + expect(connectionMock.lastSendMessage).toBe(msg('U|S|2|["userB"]+')) + done() + }, 1) + }) + + it('emits an error if no ack message is received for userB presence subscription', (done) => { expect(mockClient.lastError).toBe(null) setTimeout(() => { - const errorParams = ['U', 'ACK_TIMEOUT', 'No ACK message received in time for U'] + const errorParams = ['U', 'ACK_TIMEOUT', 'No ACK message received in time for 2'] expect(mockClient.lastError).toEqual(errorParams) mockClient.lastError = null done() }, 20) }) - xit('receives ack for subscribe', () => { - presenceHandler._$handle({ - topic: 'U', - action: 'A', - data: ['S'] - }) - expect(connectionMock.lastSendMessage).toBeNull() - }).pend('Throws unsolicitated error message since timeout has been cleared') - it('notified when client logs in', () => { expect(callback).not.toHaveBeenCalled() presenceHandler._$handle({ topic: 'U', action: 'PNJ', - data: ['Homer'] + data: ['userA'] }) - expect(callback).toHaveBeenCalledWith('Homer', true) + expect(callback).toHaveBeenCalledWith(true, 'userA') }) it('notified when client logs out', () => { presenceHandler._$handle({ topic: 'U', action: 'PNL', - data: ['Marge'] + data: ['userB'] }) - expect(callback).toHaveBeenCalledWith('Marge', false) + expect(callback).toHaveBeenCalledWith(false, 'userB') }) it('queries for clients', () => { - presenceHandler.getAll(callback) - expect(connectionMock.lastSendMessage).toBe(msg('U|Q|Q+')) + presenceHandler.getAll(['userA','userB'], callback) + expect(connectionMock.lastSendMessage).toBe(msg('U|Q|3|["userA","userB"]+')) }) it('receives data for query', () => { - presenceHandler._$handle({ + presenceHandler._$handle({ topic: 'U', action: 'Q', - data: ['Marge', 'Homer', 'Bart'] + data: [3, '{"userA": true, "userB": false }'] }) - expect(callback).toHaveBeenCalledWith(['Marge', 'Homer', 'Bart']) + expect(callback).toHaveBeenCalledWith({'userA': true, 'userB': false }) }) - it('unsubscribes to client logins', () => { - presenceHandler.unsubscribe(callback) - expect(connectionMock.lastSendMessage).toBe(msg('U|US|US+')) + it('unsubscribes to client logins', (done) => { + presenceHandler.unsubscribe('userA', callback) + setTimeout(() => { + expect(connectionMock.lastSendMessage).toBe(msg('U|US|4|["userA"]+')) + done() + }, 1) }) it('emits an error if no ack message is received for presence unsubscribes', (done) => { expect(mockClient.lastError).toBe(null) setTimeout(() => { - const errorParams = ['U', 'ACK_TIMEOUT', 'No ACK message received in time for U'] + const errorParams = ['U', 'ACK_TIMEOUT', 'No ACK message received in time for 4'] expect(mockClient.lastError).toEqual(errorParams) mockClient.lastError = null done() @@ -118,10 +123,10 @@ describe('presence handler', () => { }) expect(callback).not.toHaveBeenCalled() - presenceHandler._$handle({ + presenceHandler._$handle({ topic: 'U', action: 'Q', - data: ['Marge', 'Homer', 'Bart'] + data: [1, {'userA': true, 'userB': false }] }) expect(callback).not.toHaveBeenCalled() })