Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connection cache for producer #46

Merged
merged 2 commits into from
May 4, 2014
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ lib-cov
*.out
*.pid
*.gz
*.swp

pids
logs
Expand Down
104 changes: 104 additions & 0 deletions lib/Connection.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
var Writable = require('readable-stream').Writable;
var net = require('net');

var Connection = function(port, host) {
Writable.call(this, {objectMode: true});
this.port = port;
this.host = host;

this.state = Connection.DISCONNECTED;
this._connection = null;
};

Connection.DISCONNECTED = 0;
Connection.CONNECTING = 1;
Connection.CONNECTED = 2;

Connection.prototype = Object.create(Writable.prototype);

Connection.prototype.connect = function() {
var that = this;

if (this.state === Connection.CONNECTED) {
this.emit('connect');
return;
} else if (this.state === Connection.CONNECTING) {
return;
}

this.state = Connection.CONNECTING;

this._connection = net.createConnection(this.port, this.host);
this._connection.setKeepAlive(true, 1000);

this._connection.once('connect', function() {
that.state = Connection.CONNECTED;
that.emit('connect');
});
this._connection.on('error', function(err) {
that.state = Connection.DISCONNECTED;
that.emit('error', err);
});
this._connection.on('close', function() {
that.state = Connection.DISCONNECTED;
that.emit('close');
});
};

Connection.prototype._reconnect = function(callback) {
var that = this;

if (this.state === Connection.CONNECTED) {
return callback();
}

var onConnect = function() {
that.removeListener('brokerReconnectError', onReconnectError);
that._connection.removeListener('error', onConnectError);
callback();
};

var onReconnectError = function() {
that.removeListener('connect', onConnect);
callback('brokerReconnectError');
};

var onConnectError = function(err) {
if (!!err.message && err.message === 'connect ECONNREFUSED') {
that.emit('brokerReconnectError', err);
} else {
callback(err);
}
};

this.once('connect', onConnect);
this.once('brokerReconnectError', onReconnectError);

if (this.state === Connection.CONNECTING) return;

this.connect();

this._connection.once('error', onConnectError);
};

Connection.prototype._write = function(data, encoding, callback) {
var that = this;

this._connection.write(data, function(err) {
if (!!err && err.message === 'This socket is closed.') {
that._reconnect(function(err) {
if (err) {
return callback(err);
}

that._connection.write(data, function(err) {
return callback(err);
});
});
} else {
callback(err);
}
});
};

module.exports = Connection;
24 changes: 24 additions & 0 deletions lib/ConnectionCache.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
var Connection = require('./Connection');
var net = require('net');

var ConnectionCache = function() {
this.connections = {};
}

ConnectionCache.prototype.clear = function() {
this.connections = {};
};

ConnectionCache.prototype.getConnection = function(port, host) {
var connectionString = host + ':' + port;
var connection = this.connections[connectionString];

if (!connection) {
connection = new Connection(port, host);
this.connections[connectionString] = connection;
}

return connection;
};

module.exports = ConnectionCache;
74 changes: 20 additions & 54 deletions lib/Producer.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
var net = require('net');
var EventEmitter = require('events').EventEmitter;
var _ = require('underscore');
var Message = require('./Message');
var ProduceRequest = require('./ProduceRequest');
var Connection = require('./Connection');
var ConnectionCache = require('./ConnectionCache');

var Producer = function(topic, options){
if (!topic || (!_.isString(topic))){
Expand All @@ -14,67 +15,38 @@ var Producer = function(topic, options){
this.partition = options.partition || 0;
this.host = options.host || 'localhost';
this.port = options.port || 9092;
this.useConnectionCache = options.connectionCache;

this.connection = null;
this.connecting = false;
};

Producer.prototype = Object.create(EventEmitter.prototype);

Producer.prototype.connect = function(){
var that = this;
this.connecting = true;
this.connection = net.createConnection(this.port, this.host);
this.connection.setKeepAlive(true, 1000);
if (this.useConnectionCache) {
this.connection = Producer._connectionCache.getConnection(this.port, this.host);
} else {
this.connection = new Connection(this.port, this.host);
}
this.connection.once('connect', function(){
that.connecting = false;
that.emit('connect');
});
this.connection.once('error', function(err){
if (!!err.message && err.message === 'connect ECONNREFUSED'){
this.connection.on('error', function(err){
if (!!err.message && err.message === 'connect ECONNREFUSED') {
that.emit('error', err);
that.connecting = false;
}
});
this.connection.connect();
};

Producer.prototype._reconnect = function(cb){
var producer = this;

var onConnect = function(){
producer.removeListener('brokerReconnectError', onBrokerReconnectError);
return cb();
};
producer.once('connect', onConnect);

var onBrokerReconnectError = function(err){
producer.removeListener('connect', onConnect);
return cb('brokerReconnectError');
};
producer.once('brokerReconnectError', onBrokerReconnectError);

if (!producer.connecting){
producer.connect();
producer.connection.on('error', function(err){
if (!!err.message && err.message === 'connect ECONNREFUSED'){
producer.emit("brokerReconnectError", err);
} else {
return cb(err);
}
});
} else {
// reconnect already in progress. wait.
}
};


Producer.prototype.send = function(messages, options, cb) {
var that = this;
if (arguments.length === 2){
// "options" is not a required parameter, so handle the
// case when it's not set.
cb = options;
options = {};
options = {};
}
if (!cb || (typeof cb != 'function')){
throw "A callback with an error parameter must be supplied";
Expand All @@ -83,20 +55,14 @@ Producer.prototype.send = function(messages, options, cb) {
options.topic = options.topic || this.topic;
messages = toListOfMessages(toArray(messages));
var request = new ProduceRequest(options.topic, options.partition, messages);
this.connection.write(request.toBytes(), function(err) {
if (!!err && err.message === 'This socket is closed.') {
that._reconnect(function(err){
if (err){
return cb(err);
}
that.connection.write(request.toBytes(), function(err) {
return cb(err);
});
});
} else {
cb(err);
}
});

this.connection.write(request.toBytes(), cb);
};

Producer._connectionCache = new ConnectionCache();

Producer.clearConnectionCache = function() {
Producer._connectionCache.clear();
};

module.exports = Producer;
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
"binary": "0.3.0",
"crc32": "0.2.2",
"buffer-crc32": "0.2.1",
"bignum": "0.6.2"
"bignum": "0.6.2",
"readable-stream": "1.0.x"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set this to 1.0.26 or something.

},
"devDependencies": {
"jscoverage": "0.3.6",
Expand Down
Loading