Skip to content

Commit

Permalink
[Feature] Add/Remove workers dynamically. (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Raynor authored Jul 23, 2016
1 parent f9915c9 commit fa309f0
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 11 deletions.
1 change: 1 addition & 0 deletions changelog.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
feature - Added ability to dynamically manage workers using `q.getWorkerCount()`, `q.addWorker()`, and `q.shutdownWorker()` (thanks to @startswithaj)
12 changes: 11 additions & 1 deletion docs/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* [Queue Security](#queue-security)
* [Defining Specs (Optional)](#defining-specs-optional)
* [Graceful Shutdown](#graceful-shutdown)
* [Dynamic Worker Count](#dynamic-worker-count)
* [Message Sanitization, Revisited](#message-sanitization-revisited)
* [Custom references to tasks and specs](#custom-references-to-tasks-and-specs)
* [Wrap Up](#wrap-up)
Expand Down Expand Up @@ -87,7 +88,7 @@ Multiple queue workers can be initialized on multiple machines and Firebase-Queu

Queue workers can take an optional options object to specify:
- `specId` - specifies the spec type for this worker. This is important when creating multiple specs. Defaults to `null` which uses the default spec.
- `numWorkers` - specifies the number of workers to run simultaneously for this node.js thread. Defaults to 1 worker.
- `numWorkers` - specifies the number of initial workers to run simultaneously for this node.js thread. Defaults to 1 worker, and can be updated once the queue has been initialized (see the [Dynamic Worker Count](#dynamic-worker-count) section).
- `sanitize` - specifies whether the `data` object passed to the processing function is sanitized of internal keys reserved for use by the queue. Defaults to `true`.
- `suppressStack` - specifies whether the queue will suppress error stack traces from being placed in the `_error_details` of the task if it's rejected with an Error.

Expand Down Expand Up @@ -368,6 +369,15 @@ process.on('SIGINT', function() {
```


## Dynamic Worker Count

The number of workers running simultaneously in the same node.js thread can be managed dynamically using the following three methods on the instantiated Queue object:

- `getWorkerCount()` - This method returns the current number of workers on a queue.
- `addWorker()` - This method instantiates a new worker with the queue's current specs.
- `shutdownWorker()` - This method gracefully shuts down a worker and returns a promise fulfilled when shutdown. If there are no more workers to shutdown, the promise will be rejected.


## Message Sanitization, Revisited

In our example at the beginning, you wanted to perform several actions on your chat system:
Expand Down
4 changes: 4 additions & 0 deletions src/lib/queue_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,10 @@ QueueWorker.prototype.setTaskSpec = function(taskSpec) {
QueueWorker.prototype.shutdown = function() {
var self = this;

if (!_.isNull(self.shutdownDeferred)) {
return self.shutdownDeferred.promise;
}

logger.debug(self._getLogEntry('shutting down'));

// Set the global shutdown deferred promise, which signals we're shutting down
Expand Down
74 changes: 66 additions & 8 deletions src/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ function Queue() {
self.sanitize = DEFAULT_SANITIZE;
self.suppressStack = DEFAULT_SUPPRESS_STACK;
self.initialized = false;
self.shuttingDown = false;

self.specChangeListener = null;

Expand Down Expand Up @@ -173,6 +174,7 @@ function Queue() {
for (var k = 0; k < self.numWorkers; k++) {
self.workers[k].setTaskSpec(taskSpec);
}
self.currentTaskSpec = taskSpec;
self.initialized = true;
}, /* istanbul ignore next */ function(err) {
logger.debug('Queue(): Error connecting to Firebase reference',
Expand All @@ -183,25 +185,81 @@ function Queue() {
return self;
}


/**
* Gracefully shuts down a queue.
* @returns {RSVP.Promise} A promise fulfilled when all the worker processes
* have finished their current tasks and are no longer listening for new ones.
*/
Queue.prototype.shutdown = function() {
var self = this;

this.shuttingDown = true;
logger.debug('Queue: Shutting down');
if (!_.isNull(self.specChangeListener)) {
self.specsRef.child(self.specId).off('value',
self.specChangeListener);
self.specChangeListener = null;
if (!_.isNull(this.specChangeListener)) {
this.specsRef.child(this.specId).off('value',
this.specChangeListener);
this.specChangeListener = null;
}

return RSVP.all(_.map(self.workers, function(worker) {
return RSVP.all(_.map(this.workers, function(worker) {
return worker.shutdown();
}));
};

/**
* Gets queue worker count.
* @returns {Number} Total number of workers for this queue.
*/
Queue.prototype.getWorkerCount = function() {
return this.workers.length;
};

/**
* Adds a queue worker.
* @returns {QueueWorker} the worker created.
*/
Queue.prototype.addWorker = function() {
if (this.shuttingDown) {
throw new Error('Cannot add worker while queue is shutting down');
}

logger.debug('Queue: adding worker');
var processId = (this.specId ? this.specId + ':' : '') + this.workers.length;
var worker = new QueueWorker(
this.tasksRef,
processId,
this.sanitize,
this.suppressStack,
this.processingFunction
);
this.workers.push(worker);

if (_.isUndefined(this.specId)) {
worker.setTaskSpec(DEFAULT_TASK_SPEC);
// if the currentTaskSpec is not yet set it will be called once it's fetched
} else if (!_.isUndefined(this.currentTaskSpec)) {
worker.setTaskSpec(this.currentTaskSpec);
}

return worker;
};

/**
* Shutdowns a queue worker if one exists.
* @returns {RSVP.Promise} A promise fulfilled once the worker is shutdown
* or rejected if there are no workers left to shutdown.
*/
Queue.prototype.shutdownWorker = function() {
var worker = this.workers.pop();

var promise;
if (_.isUndefined(worker)) {
promise = RSVP.reject(new Error('No workers to shutdown'));
} else {
logger.debug('Queue: shutting down worker');
promise = worker.shutdown();
}

return promise;
};


module.exports = Queue;
23 changes: 21 additions & 2 deletions test/lib/queue_worker.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2117,7 +2117,7 @@ describe('QueueWorker', function() {
setTimeout(function() {
callbackComplete = true;
resolve();
}, 250);
}, 500);
});
});

Expand Down Expand Up @@ -2147,7 +2147,26 @@ describe('QueueWorker', function() {
} catch (errorB) {
done(errorB);
}
}, 100);
}, 500);
});
});

it('should return the same shutdown promise if shutdown is called twice', function(done) {
qw.setTaskSpec(th.validBasicTaskSpec);
tasksRef.push({
foo: 'bar'
}, function(errorA) {
if (errorA) {
return done(errorA);
}
try {
var firstPromise = qw.shutdown();
var secondPromise = qw.shutdown();
expect(firstPromise).to.deep.equal(secondPromise);
return done();
} catch (errorB) {
return done(errorB);
}
});
});
});
Expand Down
59 changes: 59 additions & 0 deletions test/queue.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,65 @@ describe('Queue', function() {
});
});

describe('#getWorkerCount', function() {
it('should return worker count with options.numWorkers', function() {
var numWorkers = 10;
var q = new th.Queue(th.testRef, { numWorkers: numWorkers }, _.noop);
expect(q.getWorkerCount()).to.equal(numWorkers);
});
});

describe('#addWorker', function() {
it('should add worker', function() {
var q = new th.Queue(th.testRef, _.noop);
expect(q.getWorkerCount()).to.equal(1);
q.addWorker();
expect(q.getWorkerCount()).to.equal(2);
});

it('should add worker with correct process id', function() {
var specId = 'test_task';
var q = new th.Queue(th.testRef, { specId: specId }, _.noop);
var worker = q.addWorker();
var specRegex = new RegExp('^' + specId + ':1:[a-f0-9\\-]{36}$');
expect(worker.processId).to.match(specRegex);
});

it('should not allow a worker to be added if the queue is shutting down', function() {
var q = new th.Queue(th.testRef, _.noop);
expect(q.getWorkerCount()).to.equal(1);
q.shutdown();
expect(function() {
q.addWorker();
}).to.throw('Cannot add worker while queue is shutting down');
});
});

describe('#shutdownWorker', function() {
it('should remove worker', function() {
var q = new th.Queue(th.testRef, _.noop);
expect(q.getWorkerCount()).to.equal(1);
q.shutdownWorker();
expect(q.getWorkerCount()).to.equal(0);
});

it('should shutdown worker', function() {
var q = new th.Queue(th.testRef, _.noop);
expect(q.getWorkerCount()).to.equal(1);
var workerShutdownPromise = q.shutdownWorker();
return workerShutdownPromise;
});

it('should reject when no workers remaining', function() {
var q = new th.Queue(th.testRef, _.noop);
expect(q.getWorkerCount()).to.equal(1);
q.shutdownWorker();
return q.shutdownWorker().catch(function(error) {
expect(error.message).to.equal('No workers to shutdown');
});
});
});

describe('#shutdown', function() {
var q;

Expand Down

1 comment on commit fa309f0

@varungupta85
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@drtriumph how is this feature intended to be used? It would be great if you could add an example of how/when to add a worker and when to shut it down. I want to add a new worker when all the workers for a given queue are busy and then shut it down once the pre-configured workers are available again. But I am not sure how to detect when a queue had run out of workers?

Please sign in to comment.