Skip to content

Commit

Permalink
Merge pull request #400 from deepstreamIO/presence-2.0
Browse files Browse the repository at this point in the history
feat: individual presence awareness
  • Loading branch information
yasserf authored Aug 30, 2017
2 parents b79f666 + 0e32050 commit 0a2f8be
Show file tree
Hide file tree
Showing 5 changed files with 358 additions and 132 deletions.
273 changes: 175 additions & 98 deletions src/presence/presence-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,21 @@ const EventEmitter = require('component-emitter2')
const C = require('../constants/constants')
const ResubscribeNotifier = require('../utils/resubscribe-notifier')

function validateArguments (userId, callback, defaultAction) {
if (typeof userId === 'function' && callback === undefined) {
callback = userId // eslint-disable-line
userId = defaultAction // eslint-disable-line
} else {
userId = [userId] // eslint-disable-line
}

if (callback !== undefined && typeof callback !== 'function') {
throw new Error('invalid argument callback')
}

return { userId, callback }
}

/**
* The main class for presence in deepstream
*
Expand All @@ -17,119 +32,181 @@ const ResubscribeNotifier = require('../utils/resubscribe-notifier')
* @constructor
* @public
*/
const PresenceHandler = function (options, connection, client) {
this._options = options
this._connection = connection
this._client = client
this._emitter = new EventEmitter()
this._ackTimeoutRegistry = client._$getAckTimeoutRegistry()
this._resubscribeNotifier = new ResubscribeNotifier(this._client, this._resubscribe.bind(this))
}
module.exports = class PresenceHandler {
constructor (options, connection, client) {
this._options = options
this._connection = connection
this._client = client
this._queryEmitter = new EventEmitter()
this._subscribeEmitter = new EventEmitter()
this._ackTimeoutRegistry = client._$getAckTimeoutRegistry()
this._resubscribeNotifier = new ResubscribeNotifier(this._client, this._resubscribe.bind(this))
this._counter = 1

/**
* Queries for clients logged into deepstream.
*
* @param {Function} callback Will be invoked with an array of clients
*
* @public
* @returns {void}
*/
PresenceHandler.prototype.getAll = function (callback) {
if (!this._emitter.hasListeners(C.ACTIONS.QUERY)) {
// At least one argument is required for a message to be permissionable
this._connection.sendMsg(C.TOPIC.PRESENCE, C.ACTIONS.QUERY, [C.ACTIONS.QUERY])
this._flush = this._flush.bind(this)
this._flushTimeout = null
this._pendingSubscribes = {}
this._pendingUnsubscribes = {}
}
/**
* Queries for clients logged into deepstream.
*
* @param {Function} callback Will be invoked with an array of clients
*
* @public
* @returns {void}
*/
getAll (users, callback) {
if (typeof users === 'function') {
this._connection.sendMsg(C.TOPIC.PRESENCE, C.ACTIONS.QUERY, [C.ACTIONS.QUERY])
this._queryEmitter.once(C.ACTIONS.QUERY, users)
} else {
const queryId = this._counter++
this._connection.sendMsg(C.TOPIC.PRESENCE, C.ACTIONS.QUERY, [queryId, users])
this._queryEmitter.once(queryId, callback)
}
}
this._emitter.once(C.ACTIONS.QUERY, callback)
}

/**
* Subscribes to client logins or logouts in deepstream
*
* @param {Function} callback Will be invoked with the username of a client,
* and a boolean to indicate if it was a login or
* logout event
* @public
* @returns {void}
*/
PresenceHandler.prototype.subscribe = function (callback) {
if (callback !== undefined && typeof callback !== 'function') {
throw new Error('invalid argument callback')
/**
* Subscribes to client logins or logouts in deepstream
*
* @param {Function} callback Will be invoked with the username of a client,
* and a boolean to indicate if it was a login or
* logout event
* @public
* @returns {void}
*/
subscribe (userId, callback) {
const args = validateArguments(userId, callback, C.ACTIONS.SUBSCRIBE)
if (!this._subscribeEmitter.hasListeners(args.userId)) {
if (args.userId === C.ACTIONS.SUBSCRIBE) {
this._sendGlobalSubscription(C.ACTIONS.SUBSCRIBE)
this._subscribeEmitter.on(C.ACTIONS.SUBSCRIBE, args.callback)
} else {
delete this._pendingUnsubscribes[args.userId]
this._pendingSubscribes[args.userId] = true
if (!this._flushTimeout) {
this._flushTimeout = setTimeout(this._flush, 0)
}
this._subscribeEmitter.on(args.userId, args.callback)
}
}
}

if (!this._emitter.hasListeners(C.TOPIC.PRESENCE)) {
this._ackTimeoutRegistry.add({
topic: C.TOPIC.PRESENCE,
action: C.ACTIONS.SUBSCRIBE,
name: C.TOPIC.PRESENCE
})
this._connection.sendMsg(C.TOPIC.PRESENCE, C.ACTIONS.SUBSCRIBE, [C.ACTIONS.SUBSCRIBE])
/**
* Removes a callback for a specified presence event
*
* @param {Function} callback The callback to unregister via {PresenceHandler#unsubscribe}
*
* @public
* @returns {void}
*/
unsubscribe (userId, callback) {
const args = validateArguments(userId, callback, C.ACTIONS.UNSUBSCRIBE)

if (args.userId === C.ACTIONS.UNSUBSCRIBE) {
this._subscribeEmitter.off(C.ACTIONS.SUBSCRIBE, args.callback)
} else {
this._subscribeEmitter.off(args.userId, args.callback)
}

if (!this._subscribeEmitter.hasListeners(args.userId)) {
if (args.userId === C.ACTIONS.UNSUBSCRIBE) {
this._sendGlobalSubscription(C.ACTIONS.UNSUBSCRIBE)
} else {
delete this._pendingSubscribes[args.userId]
this._pendingUnsubscribes[args.userId] = true
if (!this._flushTimeout) {
this._flushTimeout = setTimeout(this._flush, 0)
}
}
}
}

this._emitter.on(C.TOPIC.PRESENCE, callback)
}
/**
* Handles incoming messages from the server
*
* @param {Object} message parsed deepstream message
*
* @package private
* @returns {void}
*/
_$handle (message) {
if (message.action === C.ACTIONS.ERROR && message.data[0] === C.EVENT.MESSAGE_DENIED) {
this._ackTimeoutRegistry.remove(C.TOPIC.PRESENCE, message.data[1])
message.processedError = true
this._client._$onError(C.TOPIC.PRESENCE, C.EVENT.MESSAGE_DENIED, message.data[1])
} else if (message.action === C.ACTIONS.ACK) {
this._ackTimeoutRegistry.clear(message)
} else if (message.action === C.ACTIONS.PRESENCE_JOIN) {
this._subscribeEmitter.emit(C.ACTIONS.SUBSCRIBE, message.data[0], true)
this._subscribeEmitter.emit(message.data[0], true, message.data[0])
} else if (message.action === C.ACTIONS.PRESENCE_LEAVE) {
this._subscribeEmitter.emit(C.ACTIONS.SUBSCRIBE, message.data[0], false)
this._subscribeEmitter.emit(message.data[0], false, message.data[0])
} else if (message.action === C.ACTIONS.QUERY) {
try {
const data = JSON.parse(message.data[1])
if (typeof data === 'object') {
this._queryEmitter.emit(message.data[0], data)
return
}
} catch (e) {
// not json, old event
}
this._queryEmitter.emit(C.ACTIONS.QUERY, message.data)
} else {
this._client._$onError(C.TOPIC.PRESENCE, C.EVENT.UNSOLICITED_MESSAGE, message.action)
}
}

/**
* Removes a callback for a specified presence event
*
* @param {Function} callback The callback to unregister via {PresenceHandler#unsubscribe}
*
* @public
* @returns {void}
*/
PresenceHandler.prototype.unsubscribe = function (callback) {
if (callback !== undefined && typeof callback !== 'function') {
throw new Error('invalid argument callback')
/**
* Resubscribes to presence subscription when connection is lost
*
* @package private
* @returns {void}
*/
_resubscribe () {
const callbacks = Object.keys(this._subscribeEmitter._callbacks || {})
if (callbacks.indexOf(C.ACTIONS.SUBSCRIBE) > -1) {
callbacks.splice(callbacks.indexOf(C.ACTIONS.SUBSCRIBE), 1)
this._sendGlobalSubscription(C.ACTIONS.SUBSCRIBE)
}
if (callbacks.length > 0) {
this._sendSubscriptionBulk(C.ACTIONS.SUBSCRIBE, callbacks)
}
}

this._emitter.off(C.TOPIC.PRESENCE, callback)
_flush () {
const pendingSubscribes = Object.keys(this._pendingSubscribes)
if (pendingSubscribes.length > 0) {
this._sendSubscriptionBulk(C.ACTIONS.SUBSCRIBE, pendingSubscribes)
this._pendingSubscribes = {}
}
const pendingUnsubscribes = Object.keys(this._pendingUnsubscribes)
if (pendingUnsubscribes.length > 0) {
this._sendSubscriptionBulk(C.ACTIONS.UNSUBSCRIBE, pendingUnsubscribes)
this._pendingUnsubscribes = {}
}
this._flushTimeout = null
}

if (!this._emitter.hasListeners(C.TOPIC.PRESENCE)) {
_sendSubscriptionBulk (action, names) {
const correlationId = this._counter++
this._ackTimeoutRegistry.add({
topic: C.TOPIC.PRESENCE,
action: C.ACTIONS.UNSUBSCRIBE,
name: C.TOPIC.PRESENCE
action,
name: correlationId
})
this._connection.sendMsg(C.TOPIC.PRESENCE, C.ACTIONS.UNSUBSCRIBE, [C.ACTIONS.UNSUBSCRIBE])
this._connection.sendMsg(C.TOPIC.PRESENCE, action, [correlationId, names])
}
}

/**
* Handles incoming messages from the server
*
* @param {Object} message parsed deepstream message
*
* @package private
* @returns {void}
*/
PresenceHandler.prototype._$handle = function (message) {
if (message.action === C.ACTIONS.ERROR && message.data[0] === C.EVENT.MESSAGE_DENIED) {
this._ackTimeoutRegistry.remove(C.TOPIC.PRESENCE, message.data[1])
message.processedError = true
this._client._$onError(C.TOPIC.PRESENCE, C.EVENT.MESSAGE_DENIED, message.data[1])
} else if (message.action === C.ACTIONS.ACK) {
this._ackTimeoutRegistry.clear(message)
} else if (message.action === C.ACTIONS.PRESENCE_JOIN) {
this._emitter.emit(C.TOPIC.PRESENCE, message.data[0], true)
} else if (message.action === C.ACTIONS.PRESENCE_LEAVE) {
this._emitter.emit(C.TOPIC.PRESENCE, message.data[0], false)
} else if (message.action === C.ACTIONS.QUERY) {
this._emitter.emit(C.ACTIONS.QUERY, message.data)
} else {
this._client._$onError(C.TOPIC.PRESENCE, C.EVENT.UNSOLICITED_MESSAGE, message.action)
}
}

/**
* Resubscribes to presence subscription when connection is lost
*
* @package private
* @returns {void}
*/
PresenceHandler.prototype._resubscribe = function () {
const callbacks = this._emitter._callbacks
if (callbacks && callbacks[C.TOPIC.PRESENCE]) {
this._connection.sendMsg(C.TOPIC.PRESENCE, C.ACTIONS.SUBSCRIBE, [C.ACTIONS.SUBSCRIBE])
_sendGlobalSubscription (action) {
this._ackTimeoutRegistry.add({
topic: C.TOPIC.PRESENCE,
action,
name: action
})
this._connection.sendMsg(C.TOPIC.PRESENCE, action, [action])
}
}

module.exports = PresenceHandler
41 changes: 37 additions & 4 deletions test-specs/steps/client/presence.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,23 @@ const subscribeCallback = sinon.spy()
setTimeout(callback, config.messageWaitTime)
})

When(/^the client queries if "([^"]*)" are online$/, (users, callback) => {
global.dsClient.presence.getAll(users.split(','), queryCallback)
setTimeout(callback, config.messageWaitTime)
})

Then(/^the client is notified that no clients are connected$/, () => {
sinon.assert.calledWith(queryCallback, [])
sinon.assert.calledOnce(queryCallback)
queryCallback.reset()
})

Then(/^the client is notified with '(.*)'$/, (users) => {
sinon.assert.calledWith(queryCallback, JSON.parse(users))
sinon.assert.calledOnce(queryCallback)
queryCallback.reset()
})

Then(/^the client is notified that clients "([^"]*)" are connected$/, (clients) => {
const connected_clients = clients.split(',')
sinon.assert.calledWith(queryCallback, connected_clients)
Expand All @@ -34,24 +45,46 @@ const subscribeCallback = sinon.spy()
/**
* Subscribes
*/
When(/^the client subscribes to presence events$/, (callback) => {
When(/^the client subscribes to all presence events$/, (callback) => {
global.dsClient.presence.subscribe(subscribeCallback)
setTimeout(callback, config.messageWaitTime)
})

When(/^the client unsubscribes to presence events$/, (callback) => {
When(/^the client unsubscribes to all presence events$/, (callback) => {
global.dsClient.presence.unsubscribe(subscribeCallback)
setTimeout(callback, config.messageWaitTime)
})

When(/^the client subscribes to presence events for "([^"]*)"$/, (usernames, callback) => {
usernames.split(',').forEach(username =>
global.dsClient.presence.subscribe(username, subscribeCallback)
)
setTimeout(callback, config.messageWaitTime)
})

When(/^the client unsubscribes to presence events for "([^"]*)"$/, (usernames, callback) => {
usernames.split(',').forEach(username =>
global.dsClient.presence.unsubscribe(username, subscribeCallback)
)
setTimeout(callback, config.messageWaitTime)
})

When(/^the client is notified that client "(\w*)" logged in$/, (username) => {
sinon.assert.calledWith(subscribeCallback, username, true)
try {
sinon.assert.calledWith(subscribeCallback, true, username)
} catch (e) {
sinon.assert.calledWith(subscribeCallback, username, true)
}
sinon.assert.calledOnce(subscribeCallback)
subscribeCallback.reset()
})

When(/^the client is notified that client "(\w*)" logged out$/, (username) => {
sinon.assert.calledWith(subscribeCallback, username, false)
try {
sinon.assert.calledWith(subscribeCallback, false, username)
} catch (e) {
sinon.assert.calledWith(subscribeCallback, username, false)
}
sinon.assert.calledOnce(subscribeCallback)
subscribeCallback.reset()
})
Expand Down
Loading

0 comments on commit 0a2f8be

Please sign in to comment.