Skip to content

Commit

Permalink
fix even more redis callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
behrad committed Nov 5, 2016
1 parent 8c11c1b commit d632a72
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 10 deletions.
7 changes: 7 additions & 0 deletions History.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
0.11.5 / 2016-11-05
===================

* Fix even more redis command callbacks
* Fix redis commands SLC integration #978


0.11.4 / 2016-10-21
===================

Expand Down
9 changes: 7 additions & 2 deletions lib/kue.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ var EventEmitter = require('events').EventEmitter
, Job = require('./queue/job')
, Warlock = require('node-redis-warlock')
, _ = require('lodash')
, redis = require('./redis');
, redis = require('./redis')
, noop = function(){};

/**
* Expose `Queue`.
Expand Down Expand Up @@ -315,6 +316,7 @@ Queue.prototype.watchStuckJobs = function( ms ) {
*/

Queue.prototype.setting = function( name, fn ) {
fn = fn || noop;
this.client.hget(this.client.getKey('settings'), name, fn);
return this;
};
Expand Down Expand Up @@ -342,7 +344,7 @@ Queue.prototype.process = function( type, n, fn ) {
worker.on('job complete', function( job ) {
// guard against emit after shutdown
if( self.client ) {
self.client.incrby(self.client.getKey('stats:work-time'), job.duration, function () {});
self.client.incrby(self.client.getKey('stats:work-time'), job.duration, noop);
}
});
// Save worker so we can access it later
Expand Down Expand Up @@ -449,6 +451,7 @@ Queue.prototype.shutdown = function( timeout, type, fn ) {
*/

Queue.prototype.types = function( fn ) {
fn = fn || noop;
this.client.smembers(this.client.getKey('job:types'), fn);
return this;
};
Expand Down Expand Up @@ -501,6 +504,7 @@ Queue.prototype.workTime = function( fn ) {
*/

Queue.prototype.cardByType = function( type, state, fn ) {
fn = fn || noop;
this.client.zcard(this.client.getKey('jobs:' + type + ':' + state), fn);
return this;
};
Expand All @@ -515,6 +519,7 @@ Queue.prototype.cardByType = function( type, state, fn ) {
*/

Queue.prototype.card = function( state, fn ) {
fn = fn || noop;
this.client.zcard(this.client.getKey('jobs:' + state), fn);
return this;
};
Expand Down
11 changes: 7 additions & 4 deletions lib/queue/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ var EventEmitter = require('events').EventEmitter
, redis = require('../redis')
, _ = require('lodash')
, util = require('util')
, noop = function() {
};
, noop = function() {};

/**
* Expose `Job`.
Expand Down Expand Up @@ -339,7 +338,11 @@ Job.prototype.toJSON = function() {


Job.prototype.refreshTtl = function() {
('active' === this.state() && this._ttl > 0) ? this.client.zadd(this.client.getKey('jobs:' + this.state()), Date.now() + parseInt(this._ttl), this.zid) : noop();
('active' === this.state() && this._ttl > 0)
?
this.client.zadd(this.client.getKey('jobs:' + this.state()), Date.now() + parseInt(this._ttl), this.zid, noop)
:
noop();
};


Expand Down Expand Up @@ -370,7 +373,7 @@ Job.prototype.log = function( str ) {
}else{
var formatted = util.inspect(str);
}
this.client.rpush(this.client.getKey('job:' + this.id + ':log'), formatted);
this.client.rpush(this.client.getKey('job:' + this.id + ':log'), formatted, noop);
this.set('updated_at', Date.now());
return this;
};
Expand Down
7 changes: 3 additions & 4 deletions lib/queue/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ var EventEmitter = require('events').EventEmitter
, redis = require('../redis')
, events = require('./events')
, Job = require('./job')
, noop = function() {
};
, noop = function() {};

/**
* Expose `Worker`.
Expand Down Expand Up @@ -272,7 +271,7 @@ Worker.prototype.getJob = function( fn ) {
client.blpop(client.getKey(self.type + ':jobs'), 0, function( err ) {
if( err || !self.running ) {
if( self.client && self.client.connected && !self.client.closing ) {
self.client.lpush(self.client.getKey(self.type + ':jobs'), 1);
self.client.lpush(self.client.getKey(self.type + ':jobs'), 1, noop);
}
return fn(err); // SAE: Added to avoid crashing redis on zpop
}
Expand Down Expand Up @@ -326,7 +325,7 @@ Worker.prototype.shutdown = function( timeout, fn ) {
delete clients[ self.type ];
self.cleaned_up = true;
//fix half-blob job fetches if any
self.client.lpush(self.client.getKey(self.type + ':jobs'), 1, fn);
self.client.lpush(self.client.getKey(self.type + ':jobs'), 1, fn || noop);
};

if( !this.running ) return _fn();
Expand Down

0 comments on commit d632a72

Please sign in to comment.