From fa309f081aae74617edafbf561ce7b82a8e284ae Mon Sep 17 00:00:00 2001 From: Chris Raynor Date: Fri, 22 Jul 2016 17:27:22 -0700 Subject: [PATCH] [Feature] Add/Remove workers dynamically. (#72) --- changelog.txt | 1 + docs/guide.md | 12 +++++- src/lib/queue_worker.js | 4 ++ src/queue.js | 74 +++++++++++++++++++++++++++++++---- test/lib/queue_worker.spec.js | 23 ++++++++++- test/queue.spec.js | 59 ++++++++++++++++++++++++++++ 6 files changed, 162 insertions(+), 11 deletions(-) diff --git a/changelog.txt b/changelog.txt index e69de29..ecf79d0 100644 --- a/changelog.txt +++ b/changelog.txt @@ -0,0 +1 @@ +feature - Added ability to dynamically manage workers using `q.getWorkerCount()`, `q.addWorker()`, and `q.shutdownWorker()` (thanks to @startswithaj) diff --git a/docs/guide.md b/docs/guide.md index ca30900..f920aa3 100644 --- a/docs/guide.md +++ b/docs/guide.md @@ -10,6 +10,7 @@ * [Queue Security](#queue-security) * [Defining Specs (Optional)](#defining-specs-optional) * [Graceful Shutdown](#graceful-shutdown) + * [Dynamic Worker Count](#dynamic-worker-count) * [Message Sanitization, Revisited](#message-sanitization-revisited) * [Custom references to tasks and specs](#custom-references-to-tasks-and-specs) * [Wrap Up](#wrap-up) @@ -87,7 +88,7 @@ Multiple queue workers can be initialized on multiple machines and Firebase-Queu Queue workers can take an optional options object to specify: - `specId` - specifies the spec type for this worker. This is important when creating multiple specs. Defaults to `null` which uses the default spec. - - `numWorkers` - specifies the number of workers to run simultaneously for this node.js thread. Defaults to 1 worker. + - `numWorkers` - specifies the number of initial workers to run simultaneously for this node.js thread. Defaults to 1 worker, and can be updated once the queue has been initialized (see the [Dynamic Worker Count](#dynamic-worker-count) section). - `sanitize` - specifies whether the `data` object passed to the processing function is sanitized of internal keys reserved for use by the queue. Defaults to `true`. - `suppressStack` - specifies whether the queue will suppress error stack traces from being placed in the `_error_details` of the task if it's rejected with an Error. @@ -368,6 +369,15 @@ process.on('SIGINT', function() { ``` +## Dynamic Worker Count + +The number of workers running simultaneously in the same node.js thread can be managed dynamically using the following three methods on the instantiated Queue object: + +- `getWorkerCount()` - This method returns the current number of workers on a queue. +- `addWorker()` - This method instantiates a new worker with the queue's current specs. +- `shutdownWorker()` - This method gracefully shuts down a worker and returns a promise fulfilled when shutdown. If there are no more workers to shutdown, the promise will be rejected. + + ## Message Sanitization, Revisited In our example at the beginning, you wanted to perform several actions on your chat system: diff --git a/src/lib/queue_worker.js b/src/lib/queue_worker.js index f9d2bb8..0674c88 100644 --- a/src/lib/queue_worker.js +++ b/src/lib/queue_worker.js @@ -750,6 +750,10 @@ QueueWorker.prototype.setTaskSpec = function(taskSpec) { QueueWorker.prototype.shutdown = function() { var self = this; + if (!_.isNull(self.shutdownDeferred)) { + return self.shutdownDeferred.promise; + } + logger.debug(self._getLogEntry('shutting down')); // Set the global shutdown deferred promise, which signals we're shutting down diff --git a/src/queue.js b/src/queue.js index a9ba83b..2c3f1ac 100644 --- a/src/queue.js +++ b/src/queue.js @@ -63,6 +63,7 @@ function Queue() { self.sanitize = DEFAULT_SANITIZE; self.suppressStack = DEFAULT_SUPPRESS_STACK; self.initialized = false; + self.shuttingDown = false; self.specChangeListener = null; @@ -173,6 +174,7 @@ function Queue() { for (var k = 0; k < self.numWorkers; k++) { self.workers[k].setTaskSpec(taskSpec); } + self.currentTaskSpec = taskSpec; self.initialized = true; }, /* istanbul ignore next */ function(err) { logger.debug('Queue(): Error connecting to Firebase reference', @@ -183,25 +185,81 @@ function Queue() { return self; } - /** * Gracefully shuts down a queue. * @returns {RSVP.Promise} A promise fulfilled when all the worker processes * have finished their current tasks and are no longer listening for new ones. */ Queue.prototype.shutdown = function() { - var self = this; - + this.shuttingDown = true; logger.debug('Queue: Shutting down'); - if (!_.isNull(self.specChangeListener)) { - self.specsRef.child(self.specId).off('value', - self.specChangeListener); - self.specChangeListener = null; + if (!_.isNull(this.specChangeListener)) { + this.specsRef.child(this.specId).off('value', + this.specChangeListener); + this.specChangeListener = null; } - return RSVP.all(_.map(self.workers, function(worker) { + return RSVP.all(_.map(this.workers, function(worker) { return worker.shutdown(); })); }; +/** + * Gets queue worker count. + * @returns {Number} Total number of workers for this queue. + */ +Queue.prototype.getWorkerCount = function() { + return this.workers.length; +}; + +/** + * Adds a queue worker. + * @returns {QueueWorker} the worker created. + */ +Queue.prototype.addWorker = function() { + if (this.shuttingDown) { + throw new Error('Cannot add worker while queue is shutting down'); + } + + logger.debug('Queue: adding worker'); + var processId = (this.specId ? this.specId + ':' : '') + this.workers.length; + var worker = new QueueWorker( + this.tasksRef, + processId, + this.sanitize, + this.suppressStack, + this.processingFunction + ); + this.workers.push(worker); + + if (_.isUndefined(this.specId)) { + worker.setTaskSpec(DEFAULT_TASK_SPEC); + // if the currentTaskSpec is not yet set it will be called once it's fetched + } else if (!_.isUndefined(this.currentTaskSpec)) { + worker.setTaskSpec(this.currentTaskSpec); + } + + return worker; +}; + +/** + * Shutdowns a queue worker if one exists. + * @returns {RSVP.Promise} A promise fulfilled once the worker is shutdown + * or rejected if there are no workers left to shutdown. + */ +Queue.prototype.shutdownWorker = function() { + var worker = this.workers.pop(); + + var promise; + if (_.isUndefined(worker)) { + promise = RSVP.reject(new Error('No workers to shutdown')); + } else { + logger.debug('Queue: shutting down worker'); + promise = worker.shutdown(); + } + + return promise; +}; + + module.exports = Queue; diff --git a/test/lib/queue_worker.spec.js b/test/lib/queue_worker.spec.js index 1be2812..e121861 100644 --- a/test/lib/queue_worker.spec.js +++ b/test/lib/queue_worker.spec.js @@ -2117,7 +2117,7 @@ describe('QueueWorker', function() { setTimeout(function() { callbackComplete = true; resolve(); - }, 250); + }, 500); }); }); @@ -2147,7 +2147,26 @@ describe('QueueWorker', function() { } catch (errorB) { done(errorB); } - }, 100); + }, 500); + }); + }); + + it('should return the same shutdown promise if shutdown is called twice', function(done) { + qw.setTaskSpec(th.validBasicTaskSpec); + tasksRef.push({ + foo: 'bar' + }, function(errorA) { + if (errorA) { + return done(errorA); + } + try { + var firstPromise = qw.shutdown(); + var secondPromise = qw.shutdown(); + expect(firstPromise).to.deep.equal(secondPromise); + return done(); + } catch (errorB) { + return done(errorB); + } }); }); }); diff --git a/test/queue.spec.js b/test/queue.spec.js index e51d772..4f6c554 100644 --- a/test/queue.spec.js +++ b/test/queue.spec.js @@ -144,6 +144,65 @@ describe('Queue', function() { }); }); + describe('#getWorkerCount', function() { + it('should return worker count with options.numWorkers', function() { + var numWorkers = 10; + var q = new th.Queue(th.testRef, { numWorkers: numWorkers }, _.noop); + expect(q.getWorkerCount()).to.equal(numWorkers); + }); + }); + + describe('#addWorker', function() { + it('should add worker', function() { + var q = new th.Queue(th.testRef, _.noop); + expect(q.getWorkerCount()).to.equal(1); + q.addWorker(); + expect(q.getWorkerCount()).to.equal(2); + }); + + it('should add worker with correct process id', function() { + var specId = 'test_task'; + var q = new th.Queue(th.testRef, { specId: specId }, _.noop); + var worker = q.addWorker(); + var specRegex = new RegExp('^' + specId + ':1:[a-f0-9\\-]{36}$'); + expect(worker.processId).to.match(specRegex); + }); + + it('should not allow a worker to be added if the queue is shutting down', function() { + var q = new th.Queue(th.testRef, _.noop); + expect(q.getWorkerCount()).to.equal(1); + q.shutdown(); + expect(function() { + q.addWorker(); + }).to.throw('Cannot add worker while queue is shutting down'); + }); + }); + + describe('#shutdownWorker', function() { + it('should remove worker', function() { + var q = new th.Queue(th.testRef, _.noop); + expect(q.getWorkerCount()).to.equal(1); + q.shutdownWorker(); + expect(q.getWorkerCount()).to.equal(0); + }); + + it('should shutdown worker', function() { + var q = new th.Queue(th.testRef, _.noop); + expect(q.getWorkerCount()).to.equal(1); + var workerShutdownPromise = q.shutdownWorker(); + return workerShutdownPromise; + }); + + it('should reject when no workers remaining', function() { + var q = new th.Queue(th.testRef, _.noop); + expect(q.getWorkerCount()).to.equal(1); + q.shutdownWorker(); + return q.shutdownWorker().catch(function(error) { + expect(error.message).to.equal('No workers to shutdown'); + }); + }); + }); + describe('#shutdown', function() { var q;