Skip to content

Commit 7b3d59c

Browse files
committed
Merge pull request #46 from uber/connection-cache
Connection cache for producer
2 parents d7a6e4f + 8d6051b commit 7b3d59c

7 files changed

+744
-285
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ lib-cov
66
*.out
77
*.pid
88
*.gz
9+
*.swp
910

1011
pids
1112
logs

lib/Connection.js

+104
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
var Writable = require('readable-stream').Writable;
2+
var net = require('net');
3+
4+
var Connection = function(port, host) {
5+
Writable.call(this, {objectMode: true});
6+
this.port = port;
7+
this.host = host;
8+
9+
this.state = Connection.DISCONNECTED;
10+
this._connection = null;
11+
};
12+
13+
Connection.DISCONNECTED = 0;
14+
Connection.CONNECTING = 1;
15+
Connection.CONNECTED = 2;
16+
17+
Connection.prototype = Object.create(Writable.prototype);
18+
19+
Connection.prototype.connect = function() {
20+
var that = this;
21+
22+
if (this.state === Connection.CONNECTED) {
23+
this.emit('connect');
24+
return;
25+
} else if (this.state === Connection.CONNECTING) {
26+
return;
27+
}
28+
29+
this.state = Connection.CONNECTING;
30+
31+
this._connection = net.createConnection(this.port, this.host);
32+
this._connection.setKeepAlive(true, 1000);
33+
34+
this._connection.once('connect', function() {
35+
that.state = Connection.CONNECTED;
36+
that.emit('connect');
37+
});
38+
this._connection.on('error', function(err) {
39+
that.state = Connection.DISCONNECTED;
40+
that.emit('error', err);
41+
});
42+
this._connection.on('close', function() {
43+
that.state = Connection.DISCONNECTED;
44+
that.emit('close');
45+
});
46+
};
47+
48+
Connection.prototype._reconnect = function(callback) {
49+
var that = this;
50+
51+
if (this.state === Connection.CONNECTED) {
52+
return callback();
53+
}
54+
55+
var onConnect = function() {
56+
that.removeListener('brokerReconnectError', onReconnectError);
57+
that._connection.removeListener('error', onConnectError);
58+
callback();
59+
};
60+
61+
var onReconnectError = function() {
62+
that.removeListener('connect', onConnect);
63+
callback('brokerReconnectError');
64+
};
65+
66+
var onConnectError = function(err) {
67+
if (!!err.message && err.message === 'connect ECONNREFUSED') {
68+
that.emit('brokerReconnectError', err);
69+
} else {
70+
callback(err);
71+
}
72+
};
73+
74+
this.once('connect', onConnect);
75+
this.once('brokerReconnectError', onReconnectError);
76+
77+
if (this.state === Connection.CONNECTING) return;
78+
79+
this.connect();
80+
81+
this._connection.once('error', onConnectError);
82+
};
83+
84+
Connection.prototype._write = function(data, encoding, callback) {
85+
var that = this;
86+
87+
this._connection.write(data, function(err) {
88+
if (!!err && err.message === 'This socket is closed.') {
89+
that._reconnect(function(err) {
90+
if (err) {
91+
return callback(err);
92+
}
93+
94+
that._connection.write(data, function(err) {
95+
return callback(err);
96+
});
97+
});
98+
} else {
99+
callback(err);
100+
}
101+
});
102+
};
103+
104+
module.exports = Connection;

lib/ConnectionCache.js

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
var Connection = require('./Connection');
2+
var net = require('net');
3+
4+
var ConnectionCache = function() {
5+
this.connections = {};
6+
}
7+
8+
ConnectionCache.prototype.clear = function() {
9+
this.connections = {};
10+
};
11+
12+
ConnectionCache.prototype.getConnection = function(port, host) {
13+
var connectionString = host + ':' + port;
14+
var connection = this.connections[connectionString];
15+
16+
if (!connection) {
17+
connection = new Connection(port, host);
18+
this.connections[connectionString] = connection;
19+
}
20+
21+
return connection;
22+
};
23+
24+
module.exports = ConnectionCache;

lib/Producer.js

+20-54
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
var net = require('net');
21
var EventEmitter = require('events').EventEmitter;
32
var _ = require('underscore');
43
var Message = require('./Message');
54
var ProduceRequest = require('./ProduceRequest');
5+
var Connection = require('./Connection');
6+
var ConnectionCache = require('./ConnectionCache');
67

78
var Producer = function(topic, options){
89
if (!topic || (!_.isString(topic))){
@@ -14,67 +15,38 @@ var Producer = function(topic, options){
1415
this.partition = options.partition || 0;
1516
this.host = options.host || 'localhost';
1617
this.port = options.port || 9092;
18+
this.useConnectionCache = options.connectionCache;
1719

1820
this.connection = null;
19-
this.connecting = false;
2021
};
2122

2223
Producer.prototype = Object.create(EventEmitter.prototype);
2324

2425
Producer.prototype.connect = function(){
2526
var that = this;
26-
this.connecting = true;
27-
this.connection = net.createConnection(this.port, this.host);
28-
this.connection.setKeepAlive(true, 1000);
27+
if (this.useConnectionCache) {
28+
this.connection = Producer._connectionCache.getConnection(this.port, this.host);
29+
} else {
30+
this.connection = new Connection(this.port, this.host);
31+
}
2932
this.connection.once('connect', function(){
30-
that.connecting = false;
3133
that.emit('connect');
3234
});
33-
this.connection.once('error', function(err){
34-
if (!!err.message && err.message === 'connect ECONNREFUSED'){
35+
this.connection.on('error', function(err){
36+
if (!!err.message && err.message === 'connect ECONNREFUSED') {
3537
that.emit('error', err);
36-
that.connecting = false;
3738
}
3839
});
40+
this.connection.connect();
3941
};
4042

41-
Producer.prototype._reconnect = function(cb){
42-
var producer = this;
43-
44-
var onConnect = function(){
45-
producer.removeListener('brokerReconnectError', onBrokerReconnectError);
46-
return cb();
47-
};
48-
producer.once('connect', onConnect);
49-
50-
var onBrokerReconnectError = function(err){
51-
producer.removeListener('connect', onConnect);
52-
return cb('brokerReconnectError');
53-
};
54-
producer.once('brokerReconnectError', onBrokerReconnectError);
55-
56-
if (!producer.connecting){
57-
producer.connect();
58-
producer.connection.on('error', function(err){
59-
if (!!err.message && err.message === 'connect ECONNREFUSED'){
60-
producer.emit("brokerReconnectError", err);
61-
} else {
62-
return cb(err);
63-
}
64-
});
65-
} else {
66-
// reconnect already in progress. wait.
67-
}
68-
};
69-
70-
7143
Producer.prototype.send = function(messages, options, cb) {
7244
var that = this;
7345
if (arguments.length === 2){
7446
// "options" is not a required parameter, so handle the
7547
// case when it's not set.
7648
cb = options;
77-
options = {};
49+
options = {};
7850
}
7951
if (!cb || (typeof cb != 'function')){
8052
throw "A callback with an error parameter must be supplied";
@@ -83,20 +55,14 @@ Producer.prototype.send = function(messages, options, cb) {
8355
options.topic = options.topic || this.topic;
8456
messages = toListOfMessages(toArray(messages));
8557
var request = new ProduceRequest(options.topic, options.partition, messages);
86-
this.connection.write(request.toBytes(), function(err) {
87-
if (!!err && err.message === 'This socket is closed.') {
88-
that._reconnect(function(err){
89-
if (err){
90-
return cb(err);
91-
}
92-
that.connection.write(request.toBytes(), function(err) {
93-
return cb(err);
94-
});
95-
});
96-
} else {
97-
cb(err);
98-
}
99-
});
58+
59+
this.connection.write(request.toBytes(), cb);
60+
};
61+
62+
Producer._connectionCache = new ConnectionCache();
63+
64+
Producer.clearConnectionCache = function() {
65+
Producer._connectionCache.clear();
10066
};
10167

10268
module.exports = Producer;

package.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@
2929
"binary": "0.3.0",
3030
"crc32": "0.2.2",
3131
"buffer-crc32": "0.2.1",
32-
"bignum": "0.6.2"
32+
"bignum": "0.6.2",
33+
"readable-stream": "1.0.26"
3334
},
3435
"devDependencies": {
3536
"jscoverage": "0.3.6",

0 commit comments

Comments
 (0)