Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Circuit breaker #18

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
*.swp
/node_modules

#webstorm
.idea
64 changes: 19 additions & 45 deletions ShardedRedisClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ class ShardedRedisClient extends EventEmitter {
});
}

};
}

SHARDABLE.forEach((cmd) => {

Expand All @@ -280,59 +280,33 @@ SHARDABLE.forEach((cmd) => {

mainCb = _.once(mainCb);

const isReadCmd = READ_ONLY.indexOf(cmd) !== -1;
const timeout = isReadCmd ? this._readTimeout : this._writeTimeout;

const _this = this;
let timeoutHandler = null;
let breaker = client._breaker;

const timeoutCb = args[args.length - 1] = function (err) {
if (timeoutHandler)
clearTimeout(timeoutHandler);

if (!err) {
if (breaker)
breaker.pass();

return mainCb.apply(this, arguments);
}

// For now, both emit and log. Eventually, logging will be removed
_this.emit('err', new Error(`sharded-redis-client [${client.address}] err: ${err}`));
console.error(new Date().toISOString(), `sharded-redis-client [${client.address }] err: ${err}`);

if (breaker && err.message !== 'breaker open')
breaker.fail();

if (!client._isMaster) {
client = wrappedClient.slaves.next(client);
execute();

if (client._rrindex == startIndex)
client = findMasterClient(shardKey, _this._wrappedClients);
function execute() {
breaker.exec(cmd, args)
.then(result => mainCb(null, result))
.catch((err) => {
_this.emit('err', new Error(`sharded-redis-client [${client.address}] err: ${err}`));
console.error(new Date().toISOString(), `sharded-redis-client [${client.address }] err: ${err}`);

breaker = client._breaker;
if(!client._isMaster) {
client = wrappedClient.slaves.next(client);

return wrappedCmd(client, args);
}

mainCb.apply(this, arguments);
};

wrappedCmd(client, args);
if (client._rrindex == startIndex) {
client = findMasterClient(shardKey, _this._wrappedClients);
}

function wrappedCmd(client, args) {
if (!breaker || breaker.closed()) {
// Intentionally don't do this if timeout was set to 0
if (timeout)
timeoutHandler = setTimeout(timeoutCb, timeout, new Error('Redis call timed out'));
breaker = client._breaker;
return execute();
}

return client[cmd].apply(client, args);
}

timeoutCb(new Error('breaker open'));
return mainCb(err);
});
}
}
};

ShardedRedisClient.prototype[cmd] = function () {
const args = arguments;
Expand Down
39 changes: 36 additions & 3 deletions lib/WrappedClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
const _ = require('lodash');
const redis = require('redis');
const EventEmitter = require('events').EventEmitter;
const CircuitBreaker = require('circuit-breaker');
const RoundRobinSet = require('./RoundRobinSet');
const Brakes = require('brakes');
const Promise = require('bluebird');

function createClient(port, host, options) {
const client = redis.createClient(port, host, options.redisOptions);
Expand All @@ -30,8 +31,40 @@ function createClient(port, host, options) {
if (options.usePing)
setTimeout(() => setInterval(() => client.ping(_.noop), interval), timeout);

if (options.breakerConfig)
client._breaker = new CircuitBreaker(options.breakerConfig);
/*** Circuit Breaker Config ***/

options.breakerConfig = _.defaults(options.breakerConfig, {
statInterval: 10000,
threshold: 0, // the breaker will be always closed by default
circuitDuration: 15000,
timeout: 5000
});

const brake = new Brakes((cmd, args) => new Promise((resolve, reject) => {
// convert args to array
args = _.cloneDeep(args);
if(!args.length) {
args.length = Object.keys(args).length;
}
args = Array.from(args);
// modify callback
args[args.length - 1] = (err, result) => {
if(err) {
return reject(err);
}
return resolve(result);
};

return client[cmd].apply(client, args);
}), options.breakerConfig);

brake.on('snapshot', (snapshot) => console.log(`Snapshot ${JSON.stringify(snapshot)} received`));
brake.on('circuitOpen', () => console.log(`Circuit Opened at ${new Date().toISOString()}`));
brake.on('circuitClosed', () => console.log(`Circuit Closed at ${new Date().toISOString()}`));

/*** Circuit Breaker Config ***/

client._breaker = brake;

client.on('error', (e) => console.log(`Redis Error [${host}:${port}]: ${e} : ${new Date().toISOString()}`));
client.on('end', (e) => console.log(`Redis End [${host}:${port}]: ${e} : ${new Date().toISOString()}`));
Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
"homepage": "https://github.com/Tinder/sharded-redis-client",
"dependencies": {
"async": "2.6.0",
"bluebird": "3.5.1",
"brakes": "2.6.0",
"circuit-breaker": "git://github.com/Tinder/circuit-breaker",
"hiredis": "0.5.0",
"lodash": "4.17.4",
Expand Down
32 changes: 14 additions & 18 deletions test/breaker.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,48 +30,42 @@ describe('Test breakers', () => {
const redisHosts = global.generateRedisHosts(numMasterHosts);
const options = {
breakerConfig: {
failure_rate: 0.5,
failure_count: 10,
reset_timeout: 30000
threshold: 0.5,
circuitDuration: 15000,
timeout: 5000
}
};

const shardedClient = new ShardedRedis(redisHosts, options);
const mockedClient = shardedClient._getWrappedClient(key).get();

spyOn(mockedClient._breaker, 'pass').and.callThrough();
spyOn(MockRedisClient.prototype, 'get').and.callThrough();

shardedClient.get(key, (err) => {
expect(err).toBeUndefined();
expect(err).toBe(null);
expect(MockRedisClient.prototype.get).toHaveBeenCalledTimes(1);
expect(mockedClient._breaker.pass).toHaveBeenCalledTimes(1);
done();
});
});

it('should tell the breaker about a failed redis command', function (done) {
it('should receive error when redis command fails', function (done) {
const key = 'key';
const numMasterHosts = 1;
const redisHosts = global.generateRedisHosts(numMasterHosts);
const options = {
breakerConfig: {
failure_rate: 0.5,
failure_count: 10,
reset_timeout: 30000
threshold: 0.5,
circuitDuration: 15000,
timeout: 5000
}
};

const shardedClient = new ShardedRedis(redisHosts, options);
const mockedClient = shardedClient._getWrappedClient(key).get();

spyOn(mockedClient._breaker, 'fail').and.callThrough();
spyOn(MockRedisClient.prototype, 'get').and.callFake((key, cb) => cb(new Error('an error from the redis client')));

shardedClient.get(key, (err) => {
expect(err instanceof Error).toBeTrue();
expect(MockRedisClient.prototype.get).toHaveBeenCalledTimes(1);
expect(mockedClient._breaker.fail).toHaveBeenCalledTimes(1);
done();
});
});
Expand All @@ -82,19 +76,21 @@ describe('Test breakers', () => {
const redisHosts = global.generateRedisHosts(numMasterHosts);
const options = {
breakerConfig: {
failure_count: -1
threshold: 0.5,
circuitDuration: 15000,
timeout: 5000
}
};

const shardedClient = new ShardedRedis(redisHosts, options);
const mockedClient = shardedClient._getWrappedClient(key).get();

mockedClient._breaker.trip();

// manually open it first
mockedClient._breaker._open();
spyOn(MockRedisClient.prototype, 'get').and.callThrough();

shardedClient.get(key, (err) => {
expect(err instanceof Error).toBeTrue();
expect(err.message).toBe('breaker open');
expect(MockRedisClient.prototype.get).not.toHaveBeenCalled();
done();
});
Expand Down
41 changes: 23 additions & 18 deletions test/timeouts.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ describe('Test timeouts', function () {
const WrappedClient = proxyquire('../lib/WrappedClient', { redis: MockRedisClient });
const ShardedRedis = proxyquire('../ShardedRedisClient', { './lib/WrappedClient': WrappedClient });

beforeAll(() => jasmine.clock().install());
afterAll(() => jasmine.clock().uninstall());

it('should cb with an error on a read if redis is not responding if read timeouts enabled', function (done) {
const key = 'key';
const numMasterHosts = 1;
const options = { readTimeout: 500 };
const options = {
breakerConfig: {
timeout: 500
}
};
const redisHosts = global.generateRedisHosts(numMasterHosts);
const shardedClient = new ShardedRedis(redisHosts, options);

Expand All @@ -40,18 +41,20 @@ describe('Test timeouts', function () {
shardedClient.get(key, (err) => {
expect(err instanceof Error).toBeTrue();
expect(MockRedisClient.prototype.get).toHaveBeenCalledTimes(1);
expect(err.message).toBe('Redis call timed out');
expect(err.message).toBe('[Breaker: defaultBrake] Request Timed out');
done();
});

jasmine.clock().tick(500);
});

it('should cb with an error on a write if redis is not responding if write timeouts enabled', function (done) {
const key = 'key';
const value = 'value';
const numMasterHosts = 1;
const options = { writeTimeout: 1000 };
const options = {
breakerConfig: {
timeout: 1000
}
};
const redisHosts = global.generateRedisHosts(numMasterHosts);
const shardedClient = new ShardedRedis(redisHosts, options);

Expand All @@ -60,17 +63,19 @@ describe('Test timeouts', function () {
shardedClient.set(key, value, (err) => {
expect(err instanceof Error).toBeTrue();
expect(MockRedisClient.prototype.set).toHaveBeenCalledTimes(1);
expect(err.message).toBe('Redis call timed out');
expect(err.message).toBe('[Breaker: defaultBrake] Request Timed out');
done();
});

jasmine.clock().tick(1000);
});

it('should only call the cb once if a timeout is not triggered', function (done) {
const key = 'key';
const numMasterHosts = 1;
const options = { readTimeout: 500 };
const options = {
breakerConfig: {
timeout: 500
}
};
const redisHosts = global.generateRedisHosts(numMasterHosts);
const shardedClient = new ShardedRedis(redisHosts, options);
const mainCbSpy = jasmine.createSpy('mainCb').and.callFake((err) => expect(MockRedisClient.prototype.get).toHaveBeenCalledTimes(1));
Expand All @@ -79,16 +84,18 @@ describe('Test timeouts', function () {

shardedClient.get(key, mainCbSpy);
setTimeout(() => expect(mainCbSpy).toHaveBeenCalledTimes(1) || done(), 600);

jasmine.clock().tick(600);
});

it('should move on to the next client if redis client times out on reads', function (done) {
const key = 'key';
const numPorts = 1;
const numSlaves = 2;
const numMasterHosts = 1;
const options = { readTimeout: 500 };
const options = {
breakerConfig: {
timeout: 100
}
};
const redisHosts = global.generateRedisHosts(numMasterHosts, numPorts, numSlaves);
const shardedClient = new ShardedRedis(redisHosts, options);
const wrappedClient = shardedClient._getWrappedClient(key);
Expand All @@ -101,13 +108,11 @@ describe('Test timeouts', function () {
spyOn(slaveClient2, 'get');

shardedClient.get(key, (err) => {
expect(err).toBeUndefined();
expect(err).toBe(null);
expect(slaveClient1.get).toHaveBeenCalledTimes(1);
expect(slaveClient2.get).toHaveBeenCalledTimes(1);
expect(masterClient.get).toHaveBeenCalledTimes(1);
done();
});

jasmine.clock().tick(1000);
});
});