From 37d0c491b26ced37208f4f68459c0fd8634de3a2 Mon Sep 17 00:00:00 2001 From: Yaoling Dong Date: Thu, 18 Jan 2018 15:53:08 -0800 Subject: [PATCH 1/8] gitignore --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 088da81..445e068 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,5 @@ *.swp /node_modules + +#webstorm +.idea \ No newline at end of file From 877f808f22ed4603140679f065bf536c6261b0b7 Mon Sep 17 00:00:00 2001 From: Yaoling Dong Date: Mon, 22 Jan 2018 12:14:01 -0800 Subject: [PATCH 2/8] change to new circuit breaker --- ShardedRedisClient.js | 61 ++++++++++++------------------------------- lib/WrappedClient.js | 22 +++++++++++++--- package.json | 2 ++ 3 files changed, 37 insertions(+), 48 deletions(-) diff --git a/ShardedRedisClient.js b/ShardedRedisClient.js index 4f0cd2c..de675de 100644 --- a/ShardedRedisClient.js +++ b/ShardedRedisClient.js @@ -254,7 +254,7 @@ class ShardedRedisClient extends EventEmitter { }); } -}; +} SHARDABLE.forEach((cmd) => { @@ -280,59 +280,30 @@ SHARDABLE.forEach((cmd) => { mainCb = _.once(mainCb); - const isReadCmd = READ_ONLY.indexOf(cmd) !== -1; - const timeout = isReadCmd ? this._readTimeout : this._writeTimeout; + // const isReadCmd = READ_ONLY.indexOf(cmd) !== -1; + // const timeout = isReadCmd ? this._readTimeout : this._writeTimeout; const _this = this; - let timeoutHandler = null; let breaker = client._breaker; - const timeoutCb = args[args.length - 1] = function (err) { - if (timeoutHandler) - clearTimeout(timeoutHandler); - - if (!err) { - if (breaker) - breaker.pass(); - - return mainCb.apply(this, arguments); - } - - // For now, both emit and log. Eventually, logging will be removed - _this.emit('err', new Error(`sharded-redis-client [${client.address}] err: ${err}`)); - console.error(new Date().toISOString(), `sharded-redis-client [${client.address }] err: ${err}`); + execute(); - if (breaker && err.message !== 'breaker open') - breaker.fail(); + function execute() { + breaker.exec(cmd, args).then(result => mainCb(null, result)).catch((err) => { + if(!client._isMaster) { + client = wrappedClient.slaves.next(client); - if (!client._isMaster) { - client = wrappedClient.slaves.next(client); + if (client._rrindex == startIndex) + client = findMasterClient(shardKey, _this._wrappedClients); - if (client._rrindex == startIndex) - client = findMasterClient(shardKey, _this._wrappedClients); + breaker = client._breaker; + execute(); + } - breaker = client._breaker; - - return wrappedCmd(client, args); - } - - mainCb.apply(this, arguments); - }; - - wrappedCmd(client, args); - - function wrappedCmd(client, args) { - if (!breaker || breaker.closed()) { - // Intentionally don't do this if timeout was set to 0 - if (timeout) - timeoutHandler = setTimeout(timeoutCb, timeout, new Error('Redis call timed out')); - - return client[cmd].apply(client, args); - } - - timeoutCb(new Error('breaker open')); + mainCb(err); + }); } - } + }; ShardedRedisClient.prototype[cmd] = function () { const args = arguments; diff --git a/lib/WrappedClient.js b/lib/WrappedClient.js index 4fc7e90..c6f5bb1 100644 --- a/lib/WrappedClient.js +++ b/lib/WrappedClient.js @@ -19,8 +19,9 @@ const _ = require('lodash'); const redis = require('redis'); const EventEmitter = require('events').EventEmitter; -const CircuitBreaker = require('circuit-breaker'); const RoundRobinSet = require('./RoundRobinSet'); +const Brakes = require('brakes'); +const Promise = require('bluebird'); function createClient(port, host, options) { const client = redis.createClient(port, host, options.redisOptions); @@ -30,8 +31,23 @@ function createClient(port, host, options) { if (options.usePing) setTimeout(() => setInterval(() => client.ping(_.noop), interval), timeout); - if (options.breakerConfig) - client._breaker = new CircuitBreaker(options.breakerConfig); + if (options.breakerConfig) { + const brake = new Brakes((cmd, args) => new Promise((resolve, reject) => { + args = _.cloneDeep(args); + args[length - 1] = (err, result) => { + if(err) + return reject(err); + return resolve(result); + }; + + return client[cmd].apply(client, args); + }), options.breakerConfig); + + brake.on('circuitOpen', () => console.log(`Circuit Opened at ${new Date().toISOString()}`)); + brake.on('circuitClosed', () => console.log(`Circuit Closed at ${new Date().toISOString()}`)); + + client._breaker = brake; + } client.on('error', (e) => console.log(`Redis Error [${host}:${port}]: ${e} : ${new Date().toISOString()}`)); client.on('end', (e) => console.log(`Redis End [${host}:${port}]: ${e} : ${new Date().toISOString()}`)); diff --git a/package.json b/package.json index 432b911..5603857 100644 --- a/package.json +++ b/package.json @@ -21,6 +21,8 @@ "homepage": "https://github.com/Tinder/sharded-redis-client", "dependencies": { "async": "2.6.0", + "bluebird": "3.5.1", + "brakes": "2.6.0", "circuit-breaker": "git://github.com/Tinder/circuit-breaker", "hiredis": "0.5.0", "lodash": "4.17.4", From fe616b930edca11cf83c916bf946d0853b29799d Mon Sep 17 00:00:00 2001 From: Yaoling Dong Date: Mon, 22 Jan 2018 12:26:32 -0800 Subject: [PATCH 3/8] remove --- ShardedRedisClient.js | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/ShardedRedisClient.js b/ShardedRedisClient.js index de675de..27c73b3 100644 --- a/ShardedRedisClient.js +++ b/ShardedRedisClient.js @@ -280,28 +280,27 @@ SHARDABLE.forEach((cmd) => { mainCb = _.once(mainCb); - // const isReadCmd = READ_ONLY.indexOf(cmd) !== -1; - // const timeout = isReadCmd ? this._readTimeout : this._writeTimeout; - const _this = this; let breaker = client._breaker; execute(); function execute() { - breaker.exec(cmd, args).then(result => mainCb(null, result)).catch((err) => { - if(!client._isMaster) { - client = wrappedClient.slaves.next(client); + breaker.exec(cmd, args) + .then(result => mainCb(null, result)) + .catch((err) => { + if(!client._isMaster) { + client = wrappedClient.slaves.next(client); - if (client._rrindex == startIndex) - client = findMasterClient(shardKey, _this._wrappedClients); + if (client._rrindex == startIndex) + client = findMasterClient(shardKey, _this._wrappedClients); - breaker = client._breaker; - execute(); - } + breaker = client._breaker; + return execute(); + } - mainCb(err); - }); + mainCb(err); + }); } }; From e25b01c6f7658aa832f8ef77a50522f60caf67f1 Mon Sep 17 00:00:00 2001 From: Yaoling Dong Date: Mon, 22 Jan 2018 12:51:42 -0800 Subject: [PATCH 4/8] emit and log --- ShardedRedisClient.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ShardedRedisClient.js b/ShardedRedisClient.js index 27c73b3..1b235cd 100644 --- a/ShardedRedisClient.js +++ b/ShardedRedisClient.js @@ -289,6 +289,9 @@ SHARDABLE.forEach((cmd) => { breaker.exec(cmd, args) .then(result => mainCb(null, result)) .catch((err) => { + _this.emit('err', new Error(`sharded-redis-client [${client.address}] err: ${err}`)); + console.error(new Date().toISOString(), `sharded-redis-client [${client.address }] err: ${err}`); + if(!client._isMaster) { client = wrappedClient.slaves.next(client); From a0524a83282e3d7bcdb17439d7b4266fcb889682 Mon Sep 17 00:00:00 2001 From: Yaoling Dong Date: Mon, 22 Jan 2018 13:08:58 -0800 Subject: [PATCH 5/8] brake config defaults and snapshot event --- lib/WrappedClient.js | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lib/WrappedClient.js b/lib/WrappedClient.js index c6f5bb1..8640675 100644 --- a/lib/WrappedClient.js +++ b/lib/WrappedClient.js @@ -32,6 +32,13 @@ function createClient(port, host, options) { setTimeout(() => setInterval(() => client.ping(_.noop), interval), timeout); if (options.breakerConfig) { + options.breakerConfig = _.defaults(options.breakerConfig, { + statInterval: 2500, + threshold: 0.5, + circuitDuration: 15000, + timeout: 5000 + }); + const brake = new Brakes((cmd, args) => new Promise((resolve, reject) => { args = _.cloneDeep(args); args[length - 1] = (err, result) => { @@ -43,6 +50,7 @@ function createClient(port, host, options) { return client[cmd].apply(client, args); }), options.breakerConfig); + brake.on('snapshot', (snapshot) => console.log(`Snapshot ${snapshot} received`)); brake.on('circuitOpen', () => console.log(`Circuit Opened at ${new Date().toISOString()}`)); brake.on('circuitClosed', () => console.log(`Circuit Closed at ${new Date().toISOString()}`)); From 8c11f7b57b7c66b618ce22eec88fd59e0a156a65 Mon Sep 17 00:00:00 2001 From: Yaoling Dong Date: Mon, 22 Jan 2018 15:20:57 -0800 Subject: [PATCH 6/8] fixed unit tests in breaker --- ShardedRedisClient.js | 2 +- lib/WrappedClient.js | 8 +++++++- test/breaker.test.js | 29 ++++++++++++----------------- 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/ShardedRedisClient.js b/ShardedRedisClient.js index 1b235cd..63ca287 100644 --- a/ShardedRedisClient.js +++ b/ShardedRedisClient.js @@ -302,7 +302,7 @@ SHARDABLE.forEach((cmd) => { return execute(); } - mainCb(err); + return mainCb(err); }); } }; diff --git a/lib/WrappedClient.js b/lib/WrappedClient.js index 8640675..15759b1 100644 --- a/lib/WrappedClient.js +++ b/lib/WrappedClient.js @@ -40,8 +40,14 @@ function createClient(port, host, options) { }); const brake = new Brakes((cmd, args) => new Promise((resolve, reject) => { + // convert args to array args = _.cloneDeep(args); - args[length - 1] = (err, result) => { + if(!args.length) { + args.length = Object.keys(args).length; + } + args = Array.from(args); + // modify callback + args[args.length - 1] = (err, result) => { if(err) return reject(err); return resolve(result); diff --git a/test/breaker.test.js b/test/breaker.test.js index c182420..2f2c0e6 100644 --- a/test/breaker.test.js +++ b/test/breaker.test.js @@ -30,48 +30,42 @@ describe('Test breakers', () => { const redisHosts = global.generateRedisHosts(numMasterHosts); const options = { breakerConfig: { - failure_rate: 0.5, - failure_count: 10, - reset_timeout: 30000 + threshold: 0.5, + circuitDuration: 15000, + timeout: 5000 } }; const shardedClient = new ShardedRedis(redisHosts, options); - const mockedClient = shardedClient._getWrappedClient(key).get(); - spyOn(mockedClient._breaker, 'pass').and.callThrough(); spyOn(MockRedisClient.prototype, 'get').and.callThrough(); shardedClient.get(key, (err) => { - expect(err).toBeUndefined(); + expect(err).toBe(null); expect(MockRedisClient.prototype.get).toHaveBeenCalledTimes(1); - expect(mockedClient._breaker.pass).toHaveBeenCalledTimes(1); done(); }); }); - it('should tell the breaker about a failed redis command', function (done) { + it('should receive error when redis command fails', function (done) { const key = 'key'; const numMasterHosts = 1; const redisHosts = global.generateRedisHosts(numMasterHosts); const options = { breakerConfig: { - failure_rate: 0.5, - failure_count: 10, - reset_timeout: 30000 + threshold: 0.5, + circuitDuration: 15000, + timeout: 5000 } }; const shardedClient = new ShardedRedis(redisHosts, options); - const mockedClient = shardedClient._getWrappedClient(key).get(); - spyOn(mockedClient._breaker, 'fail').and.callThrough(); spyOn(MockRedisClient.prototype, 'get').and.callFake((key, cb) => cb(new Error('an error from the redis client'))); shardedClient.get(key, (err) => { expect(err instanceof Error).toBeTrue(); expect(MockRedisClient.prototype.get).toHaveBeenCalledTimes(1); - expect(mockedClient._breaker.fail).toHaveBeenCalledTimes(1); done(); }); }); @@ -82,19 +76,20 @@ describe('Test breakers', () => { const redisHosts = global.generateRedisHosts(numMasterHosts); const options = { breakerConfig: { - failure_count: -1 + threshold: 0.5, + circuitDuration: 15000, + timeout: 5000 } }; const shardedClient = new ShardedRedis(redisHosts, options); const mockedClient = shardedClient._getWrappedClient(key).get(); - mockedClient._breaker.trip(); + mockedClient._breaker._open(); spyOn(MockRedisClient.prototype, 'get').and.callThrough(); shardedClient.get(key, (err) => { expect(err instanceof Error).toBeTrue(); - expect(err.message).toBe('breaker open'); expect(MockRedisClient.prototype.get).not.toHaveBeenCalled(); done(); }); From b1254daa1c691eb8b86e2d6fdebcac70acaf60ad Mon Sep 17 00:00:00 2001 From: Yaoling Dong Date: Mon, 22 Jan 2018 19:30:11 -0800 Subject: [PATCH 7/8] fixed unit tests in timeout.test.js --- ShardedRedisClient.js | 3 ++- lib/WrappedClient.js | 61 ++++++++++++++++++++++--------------------- test/breaker.test.js | 3 ++- test/timeouts.test.js | 41 ++++++++++++++++------------- 4 files changed, 58 insertions(+), 50 deletions(-) diff --git a/ShardedRedisClient.js b/ShardedRedisClient.js index 63ca287..ca7b37e 100644 --- a/ShardedRedisClient.js +++ b/ShardedRedisClient.js @@ -295,8 +295,9 @@ SHARDABLE.forEach((cmd) => { if(!client._isMaster) { client = wrappedClient.slaves.next(client); - if (client._rrindex == startIndex) + if (client._rrindex == startIndex) { client = findMasterClient(shardKey, _this._wrappedClients); + } breaker = client._breaker; return execute(); diff --git a/lib/WrappedClient.js b/lib/WrappedClient.js index 15759b1..cbf199d 100644 --- a/lib/WrappedClient.js +++ b/lib/WrappedClient.js @@ -31,37 +31,38 @@ function createClient(port, host, options) { if (options.usePing) setTimeout(() => setInterval(() => client.ping(_.noop), interval), timeout); - if (options.breakerConfig) { - options.breakerConfig = _.defaults(options.breakerConfig, { - statInterval: 2500, - threshold: 0.5, - circuitDuration: 15000, - timeout: 5000 - }); - - const brake = new Brakes((cmd, args) => new Promise((resolve, reject) => { - // convert args to array - args = _.cloneDeep(args); - if(!args.length) { - args.length = Object.keys(args).length; + /*** Circuit Breaker Config ***/ + options.breakerConfig = _.defaults(options.breakerConfig, { + statInterval: 2500, + threshold: 0, // the breaker will be always closed + circuitDuration: 15000, + timeout: 5000 + }); + + const brake = new Brakes((cmd, args) => new Promise((resolve, reject) => { + // convert args to array + args = _.cloneDeep(args); + if(!args.length) { + args.length = Object.keys(args).length; + } + args = Array.from(args); + // modify callback + args[args.length - 1] = (err, result) => { + if(err) { + return reject(err); } - args = Array.from(args); - // modify callback - args[args.length - 1] = (err, result) => { - if(err) - return reject(err); - return resolve(result); - }; - - return client[cmd].apply(client, args); - }), options.breakerConfig); - - brake.on('snapshot', (snapshot) => console.log(`Snapshot ${snapshot} received`)); - brake.on('circuitOpen', () => console.log(`Circuit Opened at ${new Date().toISOString()}`)); - brake.on('circuitClosed', () => console.log(`Circuit Closed at ${new Date().toISOString()}`)); - - client._breaker = brake; - } + return resolve(result); + }; + + return client[cmd].apply(client, args); + }), options.breakerConfig); + + // brake.on('snapshot', (snapshot) => console.log(`Snapshot ${JSON.stringify(snapshot)} received`)); + brake.on('circuitOpen', () => console.log(`Circuit Opened at ${new Date().toISOString()}`)); + brake.on('circuitClosed', () => console.log(`Circuit Closed at ${new Date().toISOString()}`)); + /*** Circuit Breaker Config ***/ + + client._breaker = brake; client.on('error', (e) => console.log(`Redis Error [${host}:${port}]: ${e} : ${new Date().toISOString()}`)); client.on('end', (e) => console.log(`Redis End [${host}:${port}]: ${e} : ${new Date().toISOString()}`)); diff --git a/test/breaker.test.js b/test/breaker.test.js index 2f2c0e6..ab9e632 100644 --- a/test/breaker.test.js +++ b/test/breaker.test.js @@ -84,7 +84,8 @@ describe('Test breakers', () => { const shardedClient = new ShardedRedis(redisHosts, options); const mockedClient = shardedClient._getWrappedClient(key).get(); - + + // manually open it first mockedClient._breaker._open(); spyOn(MockRedisClient.prototype, 'get').and.callThrough(); diff --git a/test/timeouts.test.js b/test/timeouts.test.js index b0f925a..1c7b494 100644 --- a/test/timeouts.test.js +++ b/test/timeouts.test.js @@ -25,13 +25,14 @@ describe('Test timeouts', function () { const WrappedClient = proxyquire('../lib/WrappedClient', { redis: MockRedisClient }); const ShardedRedis = proxyquire('../ShardedRedisClient', { './lib/WrappedClient': WrappedClient }); - beforeAll(() => jasmine.clock().install()); - afterAll(() => jasmine.clock().uninstall()); - it('should cb with an error on a read if redis is not responding if read timeouts enabled', function (done) { const key = 'key'; const numMasterHosts = 1; - const options = { readTimeout: 500 }; + const options = { + breakerConfig: { + timeout: 500 + } + }; const redisHosts = global.generateRedisHosts(numMasterHosts); const shardedClient = new ShardedRedis(redisHosts, options); @@ -40,18 +41,20 @@ describe('Test timeouts', function () { shardedClient.get(key, (err) => { expect(err instanceof Error).toBeTrue(); expect(MockRedisClient.prototype.get).toHaveBeenCalledTimes(1); - expect(err.message).toBe('Redis call timed out'); + expect(err.message).toBe('[Breaker: defaultBrake] Request Timed out'); done(); }); - - jasmine.clock().tick(500); }); it('should cb with an error on a write if redis is not responding if write timeouts enabled', function (done) { const key = 'key'; const value = 'value'; const numMasterHosts = 1; - const options = { writeTimeout: 1000 }; + const options = { + breakerConfig: { + timeout: 1000 + } + }; const redisHosts = global.generateRedisHosts(numMasterHosts); const shardedClient = new ShardedRedis(redisHosts, options); @@ -60,17 +63,19 @@ describe('Test timeouts', function () { shardedClient.set(key, value, (err) => { expect(err instanceof Error).toBeTrue(); expect(MockRedisClient.prototype.set).toHaveBeenCalledTimes(1); - expect(err.message).toBe('Redis call timed out'); + expect(err.message).toBe('[Breaker: defaultBrake] Request Timed out'); done(); }); - - jasmine.clock().tick(1000); }); it('should only call the cb once if a timeout is not triggered', function (done) { const key = 'key'; const numMasterHosts = 1; - const options = { readTimeout: 500 }; + const options = { + breakerConfig: { + timeout: 500 + } + }; const redisHosts = global.generateRedisHosts(numMasterHosts); const shardedClient = new ShardedRedis(redisHosts, options); const mainCbSpy = jasmine.createSpy('mainCb').and.callFake((err) => expect(MockRedisClient.prototype.get).toHaveBeenCalledTimes(1)); @@ -79,8 +84,6 @@ describe('Test timeouts', function () { shardedClient.get(key, mainCbSpy); setTimeout(() => expect(mainCbSpy).toHaveBeenCalledTimes(1) || done(), 600); - - jasmine.clock().tick(600); }); it('should move on to the next client if redis client times out on reads', function (done) { @@ -88,7 +91,11 @@ describe('Test timeouts', function () { const numPorts = 1; const numSlaves = 2; const numMasterHosts = 1; - const options = { readTimeout: 500 }; + const options = { + breakerConfig: { + timeout: 100 + } + }; const redisHosts = global.generateRedisHosts(numMasterHosts, numPorts, numSlaves); const shardedClient = new ShardedRedis(redisHosts, options); const wrappedClient = shardedClient._getWrappedClient(key); @@ -101,13 +108,11 @@ describe('Test timeouts', function () { spyOn(slaveClient2, 'get'); shardedClient.get(key, (err) => { - expect(err).toBeUndefined(); + expect(err).toBe(null); expect(slaveClient1.get).toHaveBeenCalledTimes(1); expect(slaveClient2.get).toHaveBeenCalledTimes(1); expect(masterClient.get).toHaveBeenCalledTimes(1); done(); }); - - jasmine.clock().tick(1000); }); }); From 4c3c03e0cc1c247404d51cf5a681e0a34f667915 Mon Sep 17 00:00:00 2001 From: Yaoling Dong Date: Tue, 23 Jan 2018 11:00:39 -0800 Subject: [PATCH 8/8] comment --- lib/WrappedClient.js | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/WrappedClient.js b/lib/WrappedClient.js index cbf199d..aa70d37 100644 --- a/lib/WrappedClient.js +++ b/lib/WrappedClient.js @@ -32,9 +32,10 @@ function createClient(port, host, options) { setTimeout(() => setInterval(() => client.ping(_.noop), interval), timeout); /*** Circuit Breaker Config ***/ + options.breakerConfig = _.defaults(options.breakerConfig, { - statInterval: 2500, - threshold: 0, // the breaker will be always closed + statInterval: 10000, + threshold: 0, // the breaker will be always closed by default circuitDuration: 15000, timeout: 5000 }); @@ -57,9 +58,10 @@ function createClient(port, host, options) { return client[cmd].apply(client, args); }), options.breakerConfig); - // brake.on('snapshot', (snapshot) => console.log(`Snapshot ${JSON.stringify(snapshot)} received`)); + brake.on('snapshot', (snapshot) => console.log(`Snapshot ${JSON.stringify(snapshot)} received`)); brake.on('circuitOpen', () => console.log(`Circuit Opened at ${new Date().toISOString()}`)); brake.on('circuitClosed', () => console.log(`Circuit Closed at ${new Date().toISOString()}`)); + /*** Circuit Breaker Config ***/ client._breaker = brake;