diff --git a/src/lib/queue_worker.js b/src/lib/queue_worker.js index 38f8e00..97aff16 100644 --- a/src/lib/queue_worker.js +++ b/src/lib/queue_worker.js @@ -95,9 +95,12 @@ QueueWorker.prototype._getLogEntry = function(message) { * Returns the state of a task to the start state. * @param {firebase.database.Reference} taskRef Firebase Realtime Database * reference to the Firebase location of the task that's timed out. + * @param {Boolean} immediate Whether this is an immediate update to a task we + * expect this worker to own, or whether it's a timeout reset that we don't + * necessarily expect this worker to own. * @returns {RSVP.Promise} Whether the task was able to be reset. */ -QueueWorker.prototype._resetTask = function(taskRef, deferred) { +QueueWorker.prototype._resetTask = function(taskRef, immediate, deferred) { var self = this; var retries = 0; @@ -111,7 +114,12 @@ QueueWorker.prototype._resetTask = function(taskRef, deferred) { if (_.isNull(task)) { return task; } - if (task._state === self.inProgressState) { + var id = self.processId + ':' + self.taskNumber; + var correctState = (task._state === self.inProgressState); + var correctOwner = (task._owner === id || !immediate); + var timeSinceUpdate = Date.now() - _.get(task, '_state_changed', 0); + var timedOut = ((self.taskTimeout && timeSinceUpdate > self.taskTimeout) || immediate); + if (correctState && correctOwner && timedOut) { task._state = self.startState; task._state_changed = SERVER_TIMESTAMP; task._owner = null; @@ -125,7 +133,7 @@ QueueWorker.prototype._resetTask = function(taskRef, deferred) { if (error) { if (++retries < MAX_TRANSACTION_ATTEMPTS) { logger.debug(self._getLogEntry('reset task errored, retrying'), error); - setImmediate(self._resetTask.bind(self), taskRef, deferred); + setImmediate(self._resetTask.bind(self), taskRef, immediate, deferred); } else { var errorMsg = 'reset task errored too many times, no longer retrying'; logger.debug(self._getLogEntry(errorMsg), error); @@ -491,7 +499,7 @@ QueueWorker.prototype._tryToProcess = function(deferred) { if (self.busy) { // Worker has become busy while the transaction was processing // so give up the task for now so another worker can claim it - self._resetTask(nextTaskRef); + self._resetTask(nextTaskRef, true); } else { self.busy = true; self.taskNumber += 1; @@ -592,7 +600,7 @@ QueueWorker.prototype._setUpTimeouts = function() { self.expiryTimeouts[taskName] = setTimeout( self._resetTask.bind(self), expires, - ref); + ref, false); }; self.processingTaskAddedListener = self.processingTasksRef.on('child_added', @@ -704,7 +712,7 @@ QueueWorker.prototype.setTaskSpec = function(taskSpec) { self.currentTaskRef.child('_owner').off( 'value', self.currentTaskListener); - self._resetTask(self.currentTaskRef); + self._resetTask(self.currentTaskRef, true); self.currentTaskRef = null; self.currentTaskListener = null; } diff --git a/test/lib/queue_worker.spec.js b/test/lib/queue_worker.spec.js index e121861..540f0f9 100644 --- a/test/lib/queue_worker.spec.js +++ b/test/lib/queue_worker.spec.js @@ -105,6 +105,92 @@ describe('QueueWorker', function() { testRef = tasksRef.push({ '_state': th.validBasicTaskSpec.inProgressState, '_state_changed': new Date().getTime(), + '_owner': qw.processId + ':' + qw.taskNumber, + '_progress': 10 + }, function(errorA) { + if (errorA) { + return done(errorA); + } + qw.currentTaskRef = testRef; + var initial = true; + return testRef.on('value', function(snapshot) { + if (initial) { + initial = false; + qw._resetTask(testRef, true); + } else { + try { + var task = snapshot.val(); + expect(task).to.have.all.keys(['_state_changed']); + expect(task._state_changed).to.be.closeTo(new Date().getTime() + th.offset, 250); + done(); + } catch (errorB) { + done(errorB); + } + } + }); + }); + }); + + it('should not reset a task if immediate set but no longer owned by current worker', function(done) { + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); + qw.setTaskSpec(th.validBasicTaskSpec); + var originalTask = { + '_state': th.validBasicTaskSpec.inProgressState, + '_state_changed': new Date().getTime(), + '_owner': 'someone-else', + '_progress': 0 + }; + testRef = tasksRef.push(originalTask, function(errorA) { + if (errorA) { + return done(errorA); + } + qw.currentTaskRef = testRef; + return qw._resetTask(testRef, true).then(function() { + testRef.once('value', function(snapshot) { + try { + expect(snapshot.val()).to.deep.equal(originalTask); + done(); + } catch (errorB) { + done(errorB); + } + }); + }).catch(done); + }); + }); + + it('should not reset a task if immediate not set and it is has changed state recently', function(done) { + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); + qw.setTaskSpec(th.validBasicTaskSpec); + var originalTask = { + '_state': th.validBasicTaskSpec.inProgressState, + '_state_changed': new Date().getTime(), + '_owner': 'someone', + '_progress': 0 + }; + testRef = tasksRef.push(originalTask, function(errorA) { + if (errorA) { + return done(errorA); + } + qw.currentTaskRef = testRef; + return qw._resetTask(testRef, false).then(function() { + testRef.once('value', function(snapshot) { + try { + expect(snapshot.val()).to.deep.equal(originalTask); + done(); + } catch (errorB) { + done(errorB); + } + }); + }).catch(done); + }); + }); + + it('should reset a task that is currently in progress that has timed out', function(done) { + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); + qw.setTaskSpec(th.validTaskSpecWithTimeout); + testRef = tasksRef.push({ + '_state': th.validBasicTaskSpec.inProgressState, + '_state_changed': new Date().getTime() - th.validTaskSpecWithTimeout.timeout, '_owner': 'someone', '_progress': 10 }, function(errorA) { @@ -116,7 +202,7 @@ describe('QueueWorker', function() { return testRef.on('value', function(snapshot) { if (initial) { initial = false; - qw._resetTask(testRef); + qw._resetTask(testRef, false); } else { try { var task = snapshot.val(); @@ -137,7 +223,7 @@ describe('QueueWorker', function() { testRef = tasksRef.push(); qw.currentTaskRef = testRef; - qw._resetTask(testRef).then(function() { + qw._resetTask(testRef, true).then(function() { testRef.once('value', function(snapshot) { try { expect(snapshot.val()).to.be.null; @@ -163,7 +249,7 @@ describe('QueueWorker', function() { return done(errorA); } qw.currentTaskRef = testRef; - return qw._resetTask(testRef).then(function() { + return qw._resetTask(testRef, true).then(function() { testRef.once('value', function(snapshot) { try { expect(snapshot.val()).to.deep.equal(originalTask); @@ -189,7 +275,7 @@ describe('QueueWorker', function() { return done(errorA); } qw.currentTaskRef = testRef; - return qw._resetTask(testRef).then(function() { + return qw._resetTask(testRef, true).then(function() { testRef.once('value', function(snapshot) { try { expect(snapshot.val()).to.deep.equal(originalTask);