diff --git a/index.js b/index.js index cb1a12e..b64dd5a 100644 --- a/index.js +++ b/index.js @@ -4,7 +4,7 @@ var _ = require('underscore'); exports.setup = function(server, client, options) { var self = this; - self.engine = options.engine || RedisEngine.create(options.servers); + self.engine = options.engine || RedisEngine.create(server, options.servers); self.channelRe = options.channelRe || /^\/presence\//; server.addExtension({ diff --git a/lib/redis_engine.js b/lib/redis_engine.js index e79d6d0..6039e03 100644 --- a/lib/redis_engine.js +++ b/lib/redis_engine.js @@ -1,26 +1,75 @@ var ShardManager = require('shard-manager'); var redis = require('redis'); +var Shavaluator = require('redis-evalsha'); var _ = require('underscore'); -var RedisEngine = function(addresses) { +var RedisEngine = function(server, addresses) { var shards = _.map(addresses, function(address, index) { var components = address.split(':'); var hostname = components[0]; var port = components[1]; + var client = redis.createClient(port, hostname); + var evalsha = new Shavaluator(client); + + /* + ARGV[1] = clientId + ARGV[2] = data + ARGV[3] = presenceId + ARGV[4] = currentDate + KEYS[1] = data + KEYS[2] = pid + KEYS[3] = cids + KEYS[4] = pids + */ + evalsha.add('subscribe_presence', ` + local exists = redis.call('EXISTS', KEYS[1]) + + redis.call('SET', KEYS[1], ARGV[2]) + redis.call('SET', KEYS[2], ARGV[3]) + redis.call('ZADD', KEYS[3], ARGV[4], ARGV[1]) + redis.call('ZADD', KEYS[4], ARGV[4], ARGV[3]) + + return exists + `); + + /* + ARGV[1] = clientId + ARGV[2] = presenceId + KEYS[1] = data + KEYS[2] = pid + KEYS[3] = cids + KEYS[4] = pids + */ + evalsha.add('unsubscribe_presence', ` + redis.call('DEL', KEYS[2]) + redis.call('ZREM', KEYS[3], ARGV[1]) + + if redis.call('EXISTS', KEYS[3]) == 1 then + return 1 + end + + redis.call('ZREM', KEYS[4], ARGV[2]) + redis.call('DEL', KEYS[1]) + + return 0 + `); + return { shardName: 'node' + index, shard: { - redis: redis.createClient(port, hostname) + evalsha: evalsha, + redis: client } }; }); + this._server = server._server; this._shardManager = new ShardManager(shards); } -RedisEngine.create = function(server, options) { - return new RedisEngine(server, options); +RedisEngine.create = function(server, addresses, options) { + return new RedisEngine(server, addresses, options); } function key() { @@ -64,29 +113,50 @@ RedisEngine.prototype = { // ZADD cids:$channel:$presenceId $now $clientId // ZADD pids:$channel $now $presenceId var self = this; - var client = self._shardManager.getShard(channel).redis; - var isNew = true; + self._server.debug('Registering client ? for presence channel ? ?', clientId, channel, presenceId); - client.multi() - .exists(key('data', channel, presenceId), function(err, exists) { - if (exists == 1) { - isNew = false; - } - }) - .set(key('data', channel, presenceId), data) - .set(key('pid', channel, clientId), presenceId) - .zadd(key('cids', channel, presenceId), Date.now(), clientId) - .zadd(key('pids', channel), Date.now(), presenceId) - .exec(function(err) { - if (err) { - console.log('redis error: ', err); - done(err); - return; - } + self._server._engine.clientExists(clientId, function (existsResult) { + if(!existsResult) { + self._server.debug('Will not register client ? for presence channel ? ?', clientId, channel, presenceId); + + return; + } - done(null, isNew); + self._shardManager.getShard(channel).evalsha.exec('subscribe_presence', [ + key('data', channel, presenceId), + key('pid', channel, clientId), + key('cids', channel, presenceId), + key('pids', channel), + ], [ + clientId, + data, + presenceId, + Date.now(), + ], function (err, results) { + self._server._engine.clientExists(clientId, function (existsResult) { + if(!existsResult) { + self._server.debug('Client ? disappeared while registering for presence channel ? ?', clientId, channel, presenceId); + + self.unsubscribe(channel, clientId); + + done(null, false); + + return; + } else if(results === 1) { + self._server.debug('Registered new client ? for presence channel ? ? ?', clientId, channel, presenceId); + + done(null, false); + } else if(results === 0) { + self._server.debug('Registered initial client ? for presence channel ? ? ?', clientId, channel, presenceId); + + done(null, true); + } else { + self._server.debug('Failed to register client ? for presence channel ? ? ? ?', clientId, channel, presenceId, err, results); + } + }); }); + }); }, getSubscribers: function(channel, done) { @@ -203,51 +273,35 @@ RedisEngine.prototype = { var client = self._shardManager.getShard(channel).redis; client.get(key('pid', channel, clientId), function(err, presenceId) { - if (err) { - console.log('error loading client presenceId', err); - done(err); + if(presenceId === null) { + self._server.debug('Will not deregister client ? for presence channel ? ?', clientId, channel, presenceId); + return; } - var isLast = false; - - client.multi() - .del(key('pid', channel, clientId), channel) - .zrem(key('cids', channel, presenceId), clientId) - .exists(key('cids', channel, presenceId), function(err, exists) { - if (err) { - console.log('error removing client subscription', err); - return; - } - - if (exists == 0) { - isLast = true; - } - }) - .exec(function(err) { - if (err) { - console.log('error removing client subscription', err); - done(err); - return; - } - - if (!isLast) { - done(null, presenceId, false); - } else { - client.multi() - .zrem(key('pids', channel), presenceId) - .del(key('data', channel, presenceId)) - .exec(function(err) { - if (err) { - console.log('error removing client subscription', err); - done(err); - return; - } - - done(null, presenceId, true); - }); - } - }); + self._server.debug('Deregistering client ? for presence channel ? ?', clientId, channel, presenceId); + + self._shardManager.getShard(channel).evalsha.exec('unsubscribe_presence', [ + key('data', channel, presenceId), + key('pid', channel, clientId), + key('cids', channel, presenceId), + key('pids', channel), + ], [ + clientId, + presenceId, + ], function (err, results) { + if(results === 1) { + self._server.debug('Deregistered client ? for presence channel ? ?', clientId, channel, presenceId); + + if(done) done(null, presenceId, false); + } else if(results === 0) { + self._server.debug('Deregistered final client ? for presence channel ? ?', clientId, channel, presenceId); + + if(done) done(null, presenceId, true); + } else { + self._server.debug('Failed to deregister client ? for presence channel ? ? ? ?', clientId, channel, presenceId, err, results); + } + }); }); } };