diff --git a/.gitignore b/.gitignore index 53ba5df..40f8f31 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ lib-cov *.out *.pid *.gz +*.swp pids logs diff --git a/lib/Connection.js b/lib/Connection.js new file mode 100644 index 0000000..60c5edb --- /dev/null +++ b/lib/Connection.js @@ -0,0 +1,104 @@ +var Writable = require('readable-stream').Writable; +var net = require('net'); + +var Connection = function(port, host) { + Writable.call(this, {objectMode: true}); + this.port = port; + this.host = host; + + this.state = Connection.DISCONNECTED; + this._connection = null; +}; + +Connection.DISCONNECTED = 0; +Connection.CONNECTING = 1; +Connection.CONNECTED = 2; + +Connection.prototype = Object.create(Writable.prototype); + +Connection.prototype.connect = function() { + var that = this; + + if (this.state === Connection.CONNECTED) { + this.emit('connect'); + return; + } else if (this.state === Connection.CONNECTING) { + return; + } + + this.state = Connection.CONNECTING; + + this._connection = net.createConnection(this.port, this.host); + this._connection.setKeepAlive(true, 1000); + + this._connection.once('connect', function() { + that.state = Connection.CONNECTED; + that.emit('connect'); + }); + this._connection.on('error', function(err) { + that.state = Connection.DISCONNECTED; + that.emit('error', err); + }); + this._connection.on('close', function() { + that.state = Connection.DISCONNECTED; + that.emit('close'); + }); +}; + +Connection.prototype._reconnect = function(callback) { + var that = this; + + if (this.state === Connection.CONNECTED) { + return callback(); + } + + var onConnect = function() { + that.removeListener('brokerReconnectError', onReconnectError); + that._connection.removeListener('error', onConnectError); + callback(); + }; + + var onReconnectError = function() { + that.removeListener('connect', onConnect); + callback('brokerReconnectError'); + }; + + var onConnectError = function(err) { + if (!!err.message && err.message === 'connect ECONNREFUSED') { + that.emit('brokerReconnectError', err); + } else { + callback(err); + } + }; + + this.once('connect', onConnect); + this.once('brokerReconnectError', onReconnectError); + + if (this.state === Connection.CONNECTING) return; + + this.connect(); + + this._connection.once('error', onConnectError); +}; + +Connection.prototype._write = function(data, encoding, callback) { + var that = this; + + this._connection.write(data, function(err) { + if (!!err && err.message === 'This socket is closed.') { + that._reconnect(function(err) { + if (err) { + return callback(err); + } + + that._connection.write(data, function(err) { + return callback(err); + }); + }); + } else { + callback(err); + } + }); +}; + +module.exports = Connection; diff --git a/lib/ConnectionCache.js b/lib/ConnectionCache.js new file mode 100644 index 0000000..77247b1 --- /dev/null +++ b/lib/ConnectionCache.js @@ -0,0 +1,24 @@ +var Connection = require('./Connection'); +var net = require('net'); + +var ConnectionCache = function() { + this.connections = {}; +} + +ConnectionCache.prototype.clear = function() { + this.connections = {}; +}; + +ConnectionCache.prototype.getConnection = function(port, host) { + var connectionString = host + ':' + port; + var connection = this.connections[connectionString]; + + if (!connection) { + connection = new Connection(port, host); + this.connections[connectionString] = connection; + } + + return connection; +}; + +module.exports = ConnectionCache; diff --git a/lib/Producer.js b/lib/Producer.js index 97a46c6..2d45f3a 100644 --- a/lib/Producer.js +++ b/lib/Producer.js @@ -1,8 +1,9 @@ -var net = require('net'); var EventEmitter = require('events').EventEmitter; var _ = require('underscore'); var Message = require('./Message'); var ProduceRequest = require('./ProduceRequest'); +var Connection = require('./Connection'); +var ConnectionCache = require('./ConnectionCache'); var Producer = function(topic, options){ if (!topic || (!_.isString(topic))){ @@ -14,67 +15,38 @@ var Producer = function(topic, options){ this.partition = options.partition || 0; this.host = options.host || 'localhost'; this.port = options.port || 9092; + this.useConnectionCache = options.connectionCache; this.connection = null; - this.connecting = false; }; Producer.prototype = Object.create(EventEmitter.prototype); Producer.prototype.connect = function(){ var that = this; - this.connecting = true; - this.connection = net.createConnection(this.port, this.host); - this.connection.setKeepAlive(true, 1000); + if (this.useConnectionCache) { + this.connection = Producer._connectionCache.getConnection(this.port, this.host); + } else { + this.connection = new Connection(this.port, this.host); + } this.connection.once('connect', function(){ - that.connecting = false; that.emit('connect'); }); - this.connection.once('error', function(err){ - if (!!err.message && err.message === 'connect ECONNREFUSED'){ + this.connection.on('error', function(err){ + if (!!err.message && err.message === 'connect ECONNREFUSED') { that.emit('error', err); - that.connecting = false; } }); + this.connection.connect(); }; -Producer.prototype._reconnect = function(cb){ - var producer = this; - - var onConnect = function(){ - producer.removeListener('brokerReconnectError', onBrokerReconnectError); - return cb(); - }; - producer.once('connect', onConnect); - - var onBrokerReconnectError = function(err){ - producer.removeListener('connect', onConnect); - return cb('brokerReconnectError'); - }; - producer.once('brokerReconnectError', onBrokerReconnectError); - - if (!producer.connecting){ - producer.connect(); - producer.connection.on('error', function(err){ - if (!!err.message && err.message === 'connect ECONNREFUSED'){ - producer.emit("brokerReconnectError", err); - } else { - return cb(err); - } - }); - } else { - // reconnect already in progress. wait. - } -}; - - Producer.prototype.send = function(messages, options, cb) { var that = this; if (arguments.length === 2){ // "options" is not a required parameter, so handle the // case when it's not set. cb = options; - options = {}; + options = {}; } if (!cb || (typeof cb != 'function')){ throw "A callback with an error parameter must be supplied"; @@ -83,20 +55,14 @@ Producer.prototype.send = function(messages, options, cb) { options.topic = options.topic || this.topic; messages = toListOfMessages(toArray(messages)); var request = new ProduceRequest(options.topic, options.partition, messages); - this.connection.write(request.toBytes(), function(err) { - if (!!err && err.message === 'This socket is closed.') { - that._reconnect(function(err){ - if (err){ - return cb(err); - } - that.connection.write(request.toBytes(), function(err) { - return cb(err); - }); - }); - } else { - cb(err); - } - }); + + this.connection.write(request.toBytes(), cb); +}; + +Producer._connectionCache = new ConnectionCache(); + +Producer.clearConnectionCache = function() { + Producer._connectionCache.clear(); }; module.exports = Producer; diff --git a/package.json b/package.json index a5da82a..6723459 100644 --- a/package.json +++ b/package.json @@ -6,7 +6,7 @@ "producer", "consumer" ], - "version": "0.6.4", + "version": "0.6.5", "bugs": { "url": "https://github.com/cainus/Prozess/issues" }, @@ -29,7 +29,8 @@ "binary": "0.3.0", "crc32": "0.2.2", "buffer-crc32": "0.2.1", - "bignum": "0.6.2" + "bignum": "0.6.2", + "readable-stream": "1.0.x" }, "devDependencies": { "jscoverage": "0.3.6", diff --git a/test/Producer.js b/test/Producer.js index 33c2521..3b6284f 100644 --- a/test/Producer.js +++ b/test/Producer.js @@ -1,4 +1,6 @@ var should = require('should'); +var assert = require('assert'); +var mockSocket = require('./lib/mock_socket'); var Producer = require('../index').Producer; var Message = require('../index').Message; var ProduceRequest = require('../index').ProduceRequest; @@ -9,278 +11,479 @@ var EventEmitter = require('events').EventEmitter; var sinon = require('../lib/sinonPatch'); function closeServer(server, cb){ + var called = false; + function callback() { + if (!called) { + called = true; + cb(); + } + } if (!!server){ try { server.close(function(){ - cb(); + callback(); }); server = null; - setTimeout(function(){cb();}, 1000); + setTimeout(function(){callback();}, 1000); } catch(ex){ server = null; - cb(); + callback(); } } else { - cb(); + callback(); } } - -describe("Producer", function(){ - beforeEach(function(done){ - this.producer = new Producer('test'); - if (!this.server) { - this.server = null; - } - closeServer(this.server, done); - }); - afterEach(function(){ - sinon.restoreAll(); - }); - - describe("Kafka Producer", function(){ - it("should have a default topic id", function(){ - this.producer.topic.should.equal('test'); - }); - - it("should have a default partition", function(){ - this.producer.partition.should.equal(0); +function testProducer(useConnCache) { + describe("Producer with" + (useConnCache ? "" : "out") + " connection cache", function() { + beforeEach(function(done){ + Producer.clearConnectionCache(); + this.producer = new Producer('test', { + connectionCache: useConnCache + }); + if (!this.server) { + this.server = null; + } + closeServer(this.server, done); }); - - it("should have a default host", function(){ - this.producer.host.should.equal('localhost'); + afterEach(function(){ + sinon.restoreAll(); }); - it("should have a default port", function(){ - this.producer.port.should.equal(9092); - }); + describe("Kafka Producer", function(){ + it("should have a default topic id", function(){ + this.producer.topic.should.equal('test'); + }); - it("should error if topic is not supplied", function(){ - try { - new Producer(); - should.fail("expected exception was not raised"); - } catch(ex){ - ex.should.equal("the first parameter, topic, is mandatory."); - } - }); - describe("no server is present", function(){ - it("emits an error", function(done) { - this.producer.port = 8542; - this.producer.on('error', function(err) { - should.exist(err); - done(); - }); - this.producer.connect(function(err) { - }); + it("should have a default partition", function(){ + this.producer.partition.should.equal(0); }); - }); - describe("#connect", function() { - it("sets keep-alive on underlying socket on connect", function(done) { - /* decorate underlying Socket function to set a property */ - var isSetKeepAliveSet = false; - var setKeepAlive = net.Socket.prototype.setKeepAlive; - net.Socket.prototype.setKeepAlive = function(setting, msecs) { - isSetKeepAliveSet = true; - setKeepAlive(setting, msecs); - }; - this.server = net.createServer(function(connection) { - }); - this.server.listen(9998); - this.producer.port = 9998; - this.producer.on('connect', function(){ - done(); - }); - this.producer.connect(); - isSetKeepAliveSet.should.equal(true); - net.Socket.prototype.setKeepAlive = setKeepAlive; + + it("should have a default host", function(){ + this.producer.host.should.equal('localhost'); }); - }); - describe("#send", function() { - it("should attempt a reconnect on send if disconnected", function(done) { - var connectionCount = 0; - sinon.stub(net, "createConnection", function(port, host){ - host.should.equal("localhost"); - port.should.equal(8544); - var fakeConn = new EventEmitter(); - fakeConn.setKeepAlive = function(somebool, interval){ - somebool.should.equal(true); - interval.should.equal(1000); - }; - fakeConn.write = function(data, cb){ - if (connectionCount === 0){ - connectionCount = connectionCount + 1; - cb(new Error('This socket is closed.')); - } else { - connectionCount = connectionCount + 1; - cb(); - } - }; - setTimeout(function(){ - fakeConn.emit('connect'); - }, 1); - return fakeConn; - }); - var that = this; - this.producer = new Producer('test'); - this.producer.port = 8544; - this.producer.once('connect', function(){ - that.producer.send('foo', function(err) { - should.not.exist(err); - connectionCount.should.equal(2); - done(); - }); - }); - this.producer.connect(); + it("should have a default port", function(){ + this.producer.port.should.equal(9092); }); - it("should coerce a non-Message object into a Message object before sending", function(done) { - var that = this; - this.server = net.createServer(function (socket) { -     socket.on('data', function(data){ - var unpacked = binary.parse(data) - .word32bu('length') - .word16bs('error') - .tap( function(vars) { - this.buffer('body', vars.length); - }) - .vars; - var request = ProduceRequest.fromBytes(data); - request.messages.length.should.equal(1); - request.messages[0].payload.toString().should.equal('this is not a message'); - done(); - }); + + it("should error if topic is not supplied", function(){ + try { + new Producer(); + should.fail("expected exception was not raised"); + } catch(ex){ + ex.should.equal("the first parameter, topic, is mandatory."); + } }); - this.server.listen(8542, function() { - that.producer.port = 8542; - that.producer.on('error', function() { - }); - that.producer.on('connect', function(){ - var messages = ['this is not a message']; - that.producer.send(messages, function(err){ - should.not.exist(err); + describe("no server is present", function(){ + it("emits an error", function(done) { + this.producer.port = 8542; + this.producer.on('error', function(err) { + should.exist(err); + done(); + }); + this.producer.connect(function(err) { }); }); - that.producer.connect(); }); - }); - it("if there's an error, report it in the callback", function(done) { - var that = this; - this.server = net.createServer(function(socket) {}); - this.server.listen(8542, function() { - that.producer.port = 8542; - that.producer.on('connect', function(){ - that.producer.connection.write = function(bytes, cb) { - should.exist(cb); - cb('some error'); + describe("#connect", function() { + it("sets keep-alive on underlying socket on connect", function(done) { + /* decorate underlying Socket function to set a property */ + var isSetKeepAliveSet = false; + var setKeepAlive = net.Socket.prototype.setKeepAlive; + net.Socket.prototype.setKeepAlive = function(setting, msecs) { + isSetKeepAliveSet = true; + setKeepAlive(setting, msecs); }; - var message = new Message('foo'); - that.producer.send(message, function(err) { - err.toString().should.equal('some error'); + this.server = net.createServer(function(connection) { + }); + this.server.listen(9998); + this.producer.port = 9998; + this.producer.on('connect', function(){ done(); }); + this.producer.connect(); + isSetKeepAliveSet.should.equal(true); + net.Socket.prototype.setKeepAlive = setKeepAlive; }); - that.producer.connect(); - }); - }); - it("should coerce a single message into a list", function(done) { - var that = this; - this.server = net.createServer(function (socket) { -     socket.on('data', function(data){ - var unpacked = binary.parse(data) - .word32bu('length') - .word16bs('error') - .tap( function(vars) { - this.buffer('body', vars.length); - }) - .vars; - var request = ProduceRequest.fromBytes(data); - request.messages.length.should.equal(1); - request.messages[0].payload.toString().should.equal("foo"); - }); + if (useConnCache) { + it("should reuse connections to the same host and port", function(done) { + this.server = net.createServer(function(connection) { + }); + this.server.listen(9998); + var connected = 0; + function onConnect() { + ++connected; + if (connected === 2) { + producer1.connection.should.equal(producer2.connection); + done(); + } + } + var producer1 = new Producer('test1', { + port: 9998, + connectionCache: true + }); + producer1.on('connect', onConnect); + var producer2 = new Producer('test2', { + port: 9998, + connectionCache: true + }); + producer2.on('connect', onConnect); + producer1.connect(); + producer2.connect(); + }); + } }); - this.server.listen(8542, function() { - that.producer.port = 8542; - that.producer.on('error', function(err) { - should.fail('should not get here'); + describe("#send", function() { + it("should attempt a reconnect on send if disconnected", function(done) { + var connectionCount = 0; + sinon.stub(net, "createConnection", function(port, host){ + host.should.equal("localhost"); + port.should.equal(8544); + var fakeConn = new EventEmitter(); + fakeConn.setKeepAlive = function(somebool, interval){ + somebool.should.equal(true); + interval.should.equal(1000); + }; + fakeConn.write = function(data, cb){ + if (connectionCount === 0){ + connectionCount = connectionCount + 1; + cb(new Error('This socket is closed.')); + } else { + connectionCount = connectionCount + 1; + cb(); + } + }; + setTimeout(function(){ + fakeConn.emit('connect'); + }, 1); + return fakeConn; + }); + var that = this; + this.producer = new Producer('test', { + connectionCache: useConnCache + }); + this.producer.port = 8544; + + this.producer.once('connect', function(){ + that.producer.send('foo', function(err) { + should.not.exist(err); + connectionCount.should.equal(2); + done(); + }); + }); + this.producer.connect(); }); - that.producer.on('connect', function(){ - var message = new Message('foo'); - that.producer.send(message, function(err) { - should.not.exist(err); + it("should coerce a non-Message object into a Message object before sending", function(done) { + var that = this; + this.server = net.createServer(function (socket) { +   socket.on('data', function(data){ + var unpacked = binary.parse(data) + .word32bu('length') + .word16bs('error') + .tap( function(vars) { + this.buffer('body', vars.length); + }) + .vars; + var request = ProduceRequest.fromBytes(data); + request.messages.length.should.equal(1); + request.messages[0].payload.toString().should.equal('this is not a message'); done(); + }); + }); + this.server.listen(8542, function() { + that.producer.port = 8542; + that.producer.on('error', function() { + }); + that.producer.on('connect', function(){ + var messages = ['this is not a message']; + that.producer.send(messages, function(err){ + should.not.exist(err); + }); + }); + that.producer.connect(); }); }); - that.producer.connect(); - }); - }); - it("should allow an options parameter to specify topic and partition", function(done) { - var that = this; - this.server = net.createServer(function (socket) { -     socket.on('data', function(data){ - var unpacked = binary.parse(data) - .word32bu('length') - .word16bs('error') - .tap( function(vars) { - this.buffer('body', vars.length); - }) - .vars; - var request = ProduceRequest.fromBytes(data); - request.messages.length.should.equal(1); - request.messages[0].payload.toString().should.equal("foo"); - request.topic.should.equal("newtopic"); - request.partition.should.equal(1337); + it("if there's an error, report it in the callback", function(done) { + var that = this; + this.server = net.createServer(function(socket) {}); + this.server.listen(8542, function() { + that.producer.port = 8542; + that.producer.on('connect', function(){ + that.producer.connection.write = function(bytes, cb) { + should.exist(cb); + cb('some error'); + }; + var message = new Message('foo'); + that.producer.send(message, function(err) { + err.toString().should.equal('some error'); + done(); + }); + }); + that.producer.connect(); + }); }); - }); - this.server.listen(8542, function() { - that.producer.port = 8542; - that.producer.on('error', function(err) { - should.fail('should not get here'); + it("should coerce a single message into a list", function(done) { + var that = this; + this.server = net.createServer(function (socket) { +   socket.on('data', function(data){ + var unpacked = binary.parse(data) + .word32bu('length') + .word16bs('error') + .tap( function(vars) { + this.buffer('body', vars.length); + }) + .vars; + var request = ProduceRequest.fromBytes(data); + request.messages.length.should.equal(1); + request.messages[0].payload.toString().should.equal("foo"); + }); + }); + this.server.listen(8542, function() { + that.producer.port = 8542; + that.producer.on('error', function(err) { + should.fail('should not get here'); + }); + that.producer.on('connect', function(){ + var message = new Message('foo'); + that.producer.send(message, function(err) { + should.not.exist(err); + done(); + }); + }); + that.producer.connect(); + }); }); - that.producer.on('connect', function(){ - var message = new Message('foo'); - var options = { - topic : "newtopic", - partition : 1337 - }; - that.producer.send(message, options, function(err) { - should.not.exist(err); - done(); + it("should allow an options parameter to specify topic and partition", function(done) { + var that = this; + this.server = net.createServer(function (socket) { +   socket.on('data', function(data){ + var unpacked = binary.parse(data) + .word32bu('length') + .word16bs('error') + .tap( function(vars) { + this.buffer('body', vars.length); + }) + .vars; + var request = ProduceRequest.fromBytes(data); + request.messages.length.should.equal(1); + request.messages[0].payload.toString().should.equal("foo"); + request.topic.should.equal("newtopic"); + request.partition.should.equal(1337); + }); + }); + this.server.listen(8542, function() { + that.producer.port = 8542; + that.producer.on('error', function(err) { + should.fail('should not get here'); + }); + that.producer.on('connect', function(){ + var message = new Message('foo'); + var options = { + topic : "newtopic", + partition : 1337 + }; + that.producer.send(message, options, function(err) { + should.not.exist(err); + done(); + }); + }); + that.producer.connect(); }); }); - that.producer.connect(); - }); - }); - it("handle non-ascii utf-8", function(done) { - var that = this; - var testString = "fo\u00a0o"; - this.server = net.createServer(function (socket) { -     socket.on('data', function(data){ - var unpacked = binary.parse(data) - .word32bu('length') - .word16bs('error') - .tap( function(vars) { - this.buffer('body', vars.length); - }) - .vars; - var request = ProduceRequest.fromBytes(data); - request.messages.length.should.equal(1); - request.messages[0].payload.toString().should.equal(testString); + it("handle non-ascii utf-8", function(done) { + var that = this; + var testString = "fo\u00a0o"; + this.server = net.createServer(function (socket) { +   socket.on('data', function(data){ + var unpacked = binary.parse(data) + .word32bu('length') + .word16bs('error') + .tap( function(vars) { + this.buffer('body', vars.length); + }) + .vars; + var request = ProduceRequest.fromBytes(data); + request.messages.length.should.equal(1); + request.messages[0].payload.toString().should.equal(testString); + }); + }); + this.server.listen(8542, function() { + that.producer.port = 8542; + that.producer.on('error', function(err) { + should.fail('should not get here'); + }); + that.producer.on('connect', function(){ + var message = new Message(testString); + that.producer.send(message, function(err) { + should.not.exist(err); + done(); + }); + }); + that.producer.connect(); + }); }); }); - this.server.listen(8542, function() { - that.producer.port = 8542; - that.producer.on('error', function(err) { - should.fail('should not get here'); + describe("With mock sockets", function() { + before(function() { + mockSocket.install(); + }); + after(function() { + mockSocket.restore(); }); - that.producer.on('connect', function(){ - var message = new Message(testString); - that.producer.send(message, function(err) { - should.not.exist(err); + beforeEach(function() { + mockSocket.socketBehavior = {}; + mockSocket.openSockets = {}; + Producer.clearConnectionCache(); + }); + it("handles connection failure", function(done) { + mockSocket.socketBehavior["localhost:9092"] = {connect: {type: 'error'}}; + var producer = new Producer("test", { + connectionCache: useConnCache + }); + producer.on('error', function(err) { done(); }); + producer.on('connect', function() { + done(new Error("should fail")); + }); + producer.connect(); + }); + it("handles write failure", function(done) { + mockSocket.socketBehavior["localhost:9092"] = {write: {type: 'error'}}; + var producer = new Producer("test", { + connectionCache: useConnCache + }); + producer.on('error', done); + producer.on('connect', function() { + producer.send("testingtesting", function(err) { + if (err) { + done(); + } else { + done(true); + } + }); + }); + producer.connect(); }); - that.producer.connect(); + it("reconnects on write failure", function(done) { + mockSocket.socketBehavior["localhost:9092"] = {write: {type: 'error', single: true}}; + var producer = new Producer("test", { + connectionCache: useConnCache + }); + producer.on('error', done); + producer.on('connect', function() { + producer.send("testingtesting", done); + }); + producer.connect(); + }); + if (useConnCache) { + it("should work when two are connecting simultaneously", function(done) { + var producer1 = new Producer("test1", { + connectionCache: useConnCache + }); + var producer2 = new Producer("test2", { + connectionCache: useConnCache + }); + mockSocket.socketBehavior["localhost:9092"] = { + connect: { + type: 'wait', + wait: function(callback) { + // after producer1 started to connect + producer2.on('connect', function() { + producer2.send("testingtesting", done); + }); + producer2.on('error', done); + producer2.connect(); + callback(); + } + } + }; + producer1.connect(); + }); + it("should work when one is connecting while the other reconnects", function(done) { + var producer1 = new Producer("test1", { + connectionCache: useConnCache + }); + var producer2 = new Producer("test2", { + connectionCache: useConnCache + }); + mockSocket.socketBehavior["localhost:9092"] = { + write: { + type: 'error', + single: true + }, + connect: { + type: 'series', + series: [ + {type: 'ok'}, + { + type: 'wait', + wait: function(callback) { + // after producer1 started to reconnect + producer2.on('connect', function() { + producer2.send("testingtesting", done); + }); + producer2.on('error', done); + producer2.connect(); + callback(); + } + } + ] + } + }; + producer1.on('connect', function() { + producer1.send("testing", function(err) { + assert(!err); + }); + }); + producer1.connect(); + }); + it("should work when one is writing while the other reconnects", function(done) { + var producer1 = new Producer("test1", { + connectionCache: useConnCache + }); + var producer2 = new Producer("test2", { + connectionCache: useConnCache + }); + mockSocket.socketBehavior["localhost:9092"] = { + write: { + type: 'error', + single: true + }, + connect: { + type: 'series', + series: [ + {type: 'ok'}, + { + type: 'wait', + wait: function(callback) { + // after producer1 started to reconnect + producer2.send("testingtesting", done); + callback(); + } + } + ] + } + }; + var connected = 0; + function onConnect() { + ++connected; + if (connected === 2) { + producer1.send("testing", function(err) { + assert(!err); + }); + } + } + producer1.on('connect', onConnect); + producer2.on('connect', onConnect); + producer1.connect(); + producer2.connect(); + }); + } }); }); }); -}); -}); +} + +testProducer(false); +testProducer(true); diff --git a/test/lib/mock_socket.js b/test/lib/mock_socket.js new file mode 100644 index 0000000..2e1db74 --- /dev/null +++ b/test/lib/mock_socket.js @@ -0,0 +1,160 @@ +var util = require('util'); +var EventEmitter = require('events').EventEmitter; +var net = require('net'); + +var MockSocket = function(opt) { + EventEmitter.call(this); + this.connected = false; +}; + +util.inherits(MockSocket, EventEmitter); + +exports.socketBehavior = {}; + +function handleBehavior(behavior, type, succ, fail) { + function handle(behaviorType) { + if (behaviorType.type === 'error') { + fail(); + } else if (behaviorType.type === 'wait') { + behaviorType.wait(function(err) { + if (err) { + fail(); + } else { + succ(); + } + }); + } else if (behaviorType.type === 'series') { + var sub = behaviorType.series.shift(); + if (sub) { + handle(sub); + } else { + succ(); + } + } else { + succ(); + } + } + if (behavior && behavior[type]) { + var behaviorType = behavior[type]; + if (behaviorType.single) { + delete behavior[type]; + } + handle(behaviorType); + } else { + succ(); + } +} + +MockSocket.prototype.connect = function(port, host, listener) { + if (!listener && typeof host === 'function') { + listener = host; + host = undefined; + } + if (listener) { + this.on('connect', listener); + } + if (!host) { + host = 'localhost'; + } + var path; + if (typeof port !== 'number') { + path = port; + port = undefined; + this.key = path; + } else { + this.key = host + ":" + port; + } + var behavior = exports.socketBehavior[this.key]; + + function fail() { + that.emit('error', new Error('connect ECONNREFUSED')); + } + function succeed() { + that.connected = true; + that.emit('connect'); + } + + var that = this; + setTimeout(function() { + handleBehavior(behavior, 'connect', succeed, fail); + }, 10); +}; + +MockSocket.prototype.setEncoding = function() {}; + +MockSocket.prototype.write = MockSocket.prototype._write = function(data, encoding, callback) { + var that = this; + if (!callback && typeof encoding === 'function') { + callback = encoding; + encoding = undefined; + } + + function fail() { + var err = new Error('This socket is closed.'); + that.emit('error', err); + if (callback) { + callback(err); + } + } + + if (!this.connected) { + return fail(); + } + var behavior = exports.socketBehavior[this.key]; + handleBehavior(behavior, 'write', callback, fail); + return true; +}; + +MockSocket.prototype._read = function(n) { +}; + +MockSocket.prototype.end = function(data, encoding) { + this.write(data, encoding); + this.connected = false; +}; + +MockSocket.prototype.destroy = function() { + this.connected = false; +}; + +MockSocket.prototype.pause = function() { +}; + +MockSocket.prototype.resume = function() { +}; + +MockSocket.prototype.setTimeout = function(timeout, callback) { + if (callback) { + this.once('timeout', callback); + } +}; + +MockSocket.prototype.setNoDelay = function() { +}; + +MockSocket.prototype.setKeepAlive = function() { +}; + +MockSocket.prototype.address = function() { + return {}; +}; + +MockSocket.prototype.unref = function() {}; +MockSocket.prototype.ref = function() {}; + +var oldSocket = net.Socket; +var oldConnect = net.createConnection; + +exports.install = function() { + net.Socket = MockSocket; + net.createConnection = function(port, host, listener) { + var sock = new MockSocket({}); + sock.connect(port, host, listener); + return sock; + }; +}; + +exports.restore = function() { + net.Socket = oldSocket; + net.createConnection = oldConnect; +};