Skip to content

Commit

Permalink
#24
Browse files Browse the repository at this point in the history
  • Loading branch information
veny committed Jun 26, 2015
1 parent be20436 commit 2c6c574
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 9 deletions.
6 changes: 0 additions & 6 deletions lib/gearmanode.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,3 @@ exports.worker = function (options) {
exports.Client = Client;
exports.Worker = Worker;
exports.Job = job.Job;


// TODOs
// review event model
// API documentation
// worker multiple function and ending
20 changes: 18 additions & 2 deletions lib/gearmanode/job-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ var JobServer = exports.JobServer = function (options) {

this.connected = false;
this.jobsWaiting4Created = []; // submited jobs waiting for JOB_CREATED response
this.wrongDisconnectAt = 0;
};

// inheritance
Expand Down Expand Up @@ -80,7 +81,7 @@ JobServer.prototype.connect = function (callback) {
this.socket = net.createConnection(this.port, this.host);

// fallback event registration, only for debugging purposes
eventNames = [ 'close', 'timeout', 'drain' ];
eventNames = [ 'lookup', 'timeout', 'drain' ];
eventNames.forEach(function (name) {
self.socket.on(name, function() {
JobServer.logger.log('warn', 'unhandled event, name=%s', name);
Expand All @@ -91,6 +92,7 @@ JobServer.prototype.connect = function (callback) {
this.socket.on('connect', function () {
self.socket.setKeepAlive(true);
self.connected = true;
self.wrongDisconnectAt = 0;
self.emit('ConnectInternal140319214558');
self.clientOrWorker.emit('socketConnect', self.getUid()); // trigger event
JobServer.logger.log('debug', 'connection established, uid=%s', self.getUid());
Expand Down Expand Up @@ -132,14 +134,25 @@ JobServer.prototype.connect = function (callback) {
// }
});

// emitted once the socket is fully closed
this.socket.on('close', function (had_error) {
if (had_error) { // if the socket was closed due to a transmission error
JobServer.logger.log('warn', 'connection closed due to an transmission error, uid=%s', self.getUid());
// inform load balancer that this server is invalid
if (self.clientOrWorker._type === 'Client') {
self.clientOrWorker.loadBalancer.badOne(self.clientOrWorker._getJobServerIndexByUid(self.getUid()));
}
self.disconnect(true); // true => simulates an error to set `wrongDisconnectAt` attribute
}
});

// emitted when the other end of the socket sends a FIN packet (termination of other end)
this.socket.on('end', function (err) {
JobServer.logger.log('warn', 'connection terminated, uid=%s', self.getUid());
// inform load balancer that this server is invalid
if (self.clientOrWorker._type === 'Client') {
self.clientOrWorker.loadBalancer.badOne(self.clientOrWorker._getJobServerIndexByUid(self.getUid()));
}

self.disconnect(err);
});
}
Expand All @@ -162,6 +175,9 @@ JobServer.prototype.disconnect = function (err) {

this.connected = false;
this.removeAllListeners();
if (err !== undefined) {
this.wrongDisconnectAt = new Date();
}

// close jobs waiting for packet JOB_CREATED
for (i = 0; i < this.jobsWaiting4Created.length; i ++) {
Expand Down
1 change: 1 addition & 0 deletions lib/gearmanode/version.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@


exports.VERSION_HISTORY = [
['0.9.1', '2015-06-26', 'PR #35, Enh #24'],
['0.9.0', '2015-06-19', 'PR #29, fully implemented Gearman Protocol'],
['0.2.2', '2015-02-04', 'BF #26'],
['0.2.1', '2015-01-04', 'BF #25'],
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "gearmanode",
"version": "0.9.0",
"version": "0.9.1",
"description": "Node.js library for the Gearman distributed job system with support for multiple servers",
"keywords": ["gearman", "distributed", "message queue", "job", "worker"],
"author": "Vaclav Sykora <[email protected]>",
Expand Down
13 changes: 13 additions & 0 deletions test/test-job-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ describe('JobServer', function() {
should.not.exist(js.clientOrWorker);
js.jobsWaiting4Created.length.should.equal(0);
js.getUid().should.equal('localhost:4730');
should.exist(js.wrongDisconnectAt);
js.wrongDisconnectAt.should.be.equal(0);
})
it('should return error when missing mandatory options', function() {
js = new JobServer();
Expand All @@ -42,6 +44,7 @@ describe('JobServer', function() {
it('should change inner state when connection OK', function(done) {
js.connect(function(err) {
js.connected.should.be.true;
js.wrongDisconnectAt.should.be.equal(0);
should.exist(js.socket);
js.socket.should.be.an.instanceof(net.Socket);
done();
Expand Down Expand Up @@ -75,6 +78,7 @@ describe('JobServer', function() {
err.should.be.an.instanceof(Error);
err.code.should.be.equal('ECONNREFUSED');
js.connected.should.be.false;
js.wrongDisconnectAt.should.be.greaterThan(0);
should.not.exist(js.socket);
done();
})
Expand Down Expand Up @@ -108,12 +112,21 @@ describe('JobServer', function() {
js.socket.listeners('connect').length.should.equal(2); // one is mine, the other from some infrastructure
js.disconnect();
js.connected.should.be.false;
js.wrongDisconnectAt.should.be.equal(0);
should.not.exist(js.socket);
should.exist(js.clientOrWorker);
js.jobsWaiting4Created.length.should.equal(0);
done();
})
})
it('should set `wrongDisconnectAt` when disconnect caused by a problem', function(done) {
var socket = js.connect(function(err, jobServer) {
should.not.exist(err);
js.disconnect(true); // true => simulate an error object
js.wrongDisconnectAt.should.be.greaterThan(0);
done();
})
})
it('should emit event on client/worker', function(done) {
js.clientOrWorker.emit = sinon.spy();
js.connect(function() {
Expand Down

0 comments on commit 2c6c574

Please sign in to comment.