- Purpose of a Queue
- The Queue in Your Firebase Database
- Queue Workers
- Pushing Tasks Onto the Queue
- Queue Security
- Defining Specs (Optional)
- Graceful Shutdown
- Dynamic Worker Count
- Message Sanitization, Revisited
- Custom references to tasks and specs
- Wrap Up
Queues can be used in your Firebase app to organize workers or perform background work like generating thumbnails of images, filtering message contents and censoring data, or fanning data out to multiple locations in your Firebase Database. First, let's define a few terms we'll use when talking about a queue:
- task - a unit of work that a queue worker can process
- spec - a definition of an operation that the queue will perform on matching tasks
- job - one or more specs that specify a series of ordered operations to be performed
- worker - an individual process that picks up tasks with a certain spec and processes them
Let's take a look at a simple example to see how this works. Imagine you wanted to build a chat application that does two things:
- Sanitize chat message input
- Fan data out to multiple rooms and users
Chat message sanitization can't happen purely on the client side, because a malicious client could circumvent client-side restrictions, so you'll have to run it in a trusted server process.
Using Firebase Queue, you can create specs for each of these tasks, and then use workers to process the individual tasks to complete the job. We'll explore the queue, adding tasks, assigning workers, and creating custom specs to create full jobs, then revisit the example above.
The queue relies on a Firebase Database reference to coordinate workers, e.g. https://databaseName.firebaseio.com/queue. This queue can be stored at any path in your Firebase Database, and you can have multiple queues as well. The queue will respond to tasks pushed onto the tasks subtree and optionally read specifications from a specs subtree.
queue
- specs
- tasks
See Custom references to tasks and specs for how to place these subtrees somewhere other than their default locations.
Firebase Queue works with a Firebase Database reference from either the firebase-admin (for admin access) or firebase (for end-user access) npm package, though it is mainly intended to perform administrative actions. Check out this blog post for an introduction to firebase-admin.
The basic unit of the queue is the queue worker: the function that claims a task, performs the appropriate processing on the data, and either returns the transformed data, or an appropriate error.
You can start a worker process by passing in a Firebase Database ref along with a processing function (described below), as follows:
// my_queue_worker.js
var Queue = require('firebase-queue');
var admin = require('firebase-admin');
var serviceAccount = require('path/to/serviceAccountCredentials.json');
admin.initializeApp({
credential: admin.credential.cert(serviceAccount),
databaseURL: '<your-database-url>'
});
var ref = admin.database().ref('queue');
var queue = new Queue(ref, function(data, progress, resolve, reject) {
// Read and process task data
console.log(data);
// Do some work
progress(50);
// Finish the task asynchronously
setTimeout(function() {
resolve();
}, 1000);
});
node my_queue_worker.js
Multiple queue workers can be initialized on multiple machines and Firebase-Queue will ensure that only one worker is processing a single queue task at a time.
Queue workers can take an optional options object to specify:
- specId - specifies the spec type for this worker. This is important when creating multiple specs. Defaults to null, which uses the default spec.
- numWorkers - specifies the number of initial workers to run simultaneously on a single node.js thread. Defaults to 1 worker, and can be updated once the queue has been initialized (see the Dynamic Worker Count section).
- sanitize - specifies whether the data object passed to the processing function is sanitized of internal keys reserved for use by the queue. Defaults to true.
- suppressStack - specifies whether the queue will suppress error stack traces from being placed in the _error_details of the task if it's rejected with an Error. Defaults to false.
...
var options = {
'specId': 'spec_1',
'numWorkers': 5,
'sanitize': false,
'suppressStack': true
};
var queue = new Queue(ref, options, function(data, progress, resolve, reject) {
...
});
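The defaults listed above can be collected in one place. The sketch below is a hypothetical helper (`withDefaultOptions` and `DEFAULT_OPTIONS` are not part of firebase-queue) that merges caller options over those documented defaults and rejects obviously wrong types, which can be handy if you build the options object from environment variables or config files.

```javascript
// Hypothetical helper, not part of firebase-queue: fills in the documented
// defaults for the optional worker options and sanity-checks their types.
const DEFAULT_OPTIONS = Object.freeze({
  specId: null,         // null selects the default spec
  numWorkers: 1,        // initial workers on this node.js thread
  sanitize: true,       // strip reserved keys such as _state from `data`
  suppressStack: false  // keep error_stack in _error_details unless true
});

function withDefaultOptions(options = {}) {
  const merged = Object.assign({}, DEFAULT_OPTIONS, options);
  if (merged.specId !== null && typeof merged.specId !== 'string') {
    throw new TypeError('specId must be a string or null');
  }
  if (!Number.isInteger(merged.numWorkers) || merged.numWorkers < 1) {
    throw new RangeError('numWorkers must be a positive integer');
  }
  if (typeof merged.sanitize !== 'boolean' || typeof merged.suppressStack !== 'boolean') {
    throw new TypeError('sanitize and suppressStack must be booleans');
  }
  return merged;
}
```

The result can then be passed as the options argument, e.g. `new Queue(ref, withDefaultOptions({ specId: 'spec_1' }), processFn)`.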
Using any Firebase client or the REST API, push an object with some data to the tasks subtree of your queue. Queue workers listening on that subtree will automatically pick up and process the new task.
# Using curl in shell
curl -X POST -d '{"foo": "bar"}' https://databaseName.firebaseio.com/queue/tasks.json
or
// Using the web JavaScript client
var ref = firebase.database().ref('queue/tasks');
ref.push({'foo': 'bar'});
When using a custom spec, you can include a _state key in the object you push. This lets the worker(s) of the spec whose start_state matches that value pick up your task directly, rather than the task starting at the spec with no start_state.
{
"foo": "bar",
"boo": "baz",
"_state": "spec_n_start"
}
The processing function provides the body of the data transformation, and allows for completing tasks successfully or with error conditions, as well as reporting the progress of a task. As this function defines the work that the worker must do, this callback function is required. It should take the following four parameters:
data - A JavaScript object containing the claimed task's data. It can contain any keys and values except for several reserved keys, which are used for tracking worker progress.
The reserved keys are:
- _state - The current state of the task. Will always be the task's in_progress_state when passed to the processing function.
- _state_changed - The timestamp at which the task changed into its current state. This will always be the server time when the processing function was called.
- _owner - A unique ID for the worker and task number combination to ensure only one worker is responsible for the task at any time.
- _progress - A number between 0 and 100, reset to 0 at the start of each task.
- _error_details - An object containing the error details from a previous task execution. If present, it may contain a previous_state string (or null if there was no previous state, in the case of malformed input) capturing the state the task was in when it errored, an error string from the reject() callback of the previous task, and an attempts field containing the number of retries attempted before failing a task. If the suppressStack queue option is not set to true, there may also be an error_stack field containing a stack dump of any error passed into the reject() function.
- _id - The Firebase key of the task.
By default the data is sanitized of these keys, but you can disable this behavior by setting 'sanitize': false in the queue options.
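To make the effect of sanitization concrete, here is an illustrative approximation of what the processing function receives with `sanitize: true`: a copy of the task without the reserved keys listed above. `RESERVED_KEYS` and `stripReservedKeys` are hypothetical names; this is not the library's own code.

```javascript
// Illustrative only: approximates what `sanitize: true` hands to the
// processing function by copying the task minus the reserved keys.
const RESERVED_KEYS = ['_state', '_state_changed', '_owner', '_progress', '_error_details', '_id'];

function stripReservedKeys(task) {
  const copy = {};
  for (const [key, value] of Object.entries(task)) {
    if (!RESERVED_KEYS.includes(key)) {
      copy[key] = value; // user data passes through unchanged
    }
  }
  return copy;
}
```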
progress - A callback function for reporting the progress of the task. progress() takes a single parameter that must be a number between 0 and 100, and returns an RSVP.Promise that's fulfilled when the progress is successfully updated. If this promise is rejected, it's likely that the task is no longer owned by this process (perhaps it has timed out or the task specification has changed) or the worker has lost its connection to Firebase.
By catching when this call fails and cancelling the current task early, the worker can minimize the extra work it does and return to processing new queue tasks sooner:
...
var queue = new Queue(ref, options, function(data, progress, resolve, reject) {
...
function stopProcessing() {
...
}
...
// report current progress
progress(currentProgress).catch(function(errorMessage) {
// we've lost the current task, so stop processing
stopProcessing();
// and reject the task so that we can pick up new tasks
reject(errorMessage);
});
...
});
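When a task works through a known number of items, the percentage and the ownership check can be wrapped together. The sketch below assumes a hypothetical helper named `makeProgressReporter`; it only relies on `progress()` returning a promise, as described above, and resolves to `false` when the update is rejected so the caller can stop early.

```javascript
// Hypothetical helper, not part of firebase-queue: converts "done out of total"
// into the 0-100 number progress() expects and resolves to whether the update
// succeeded, i.e. whether this worker most likely still owns the task.
function makeProgressReporter(progress, total) {
  return function report(done) {
    const pct = total > 0 ? Math.round((done / total) * 100) : 100;
    const clamped = Math.min(100, Math.max(0, pct));
    return Promise.resolve(progress(clamped)).then(
      () => true,  // progress written; keep working
      () => false  // task lost (timeout, spec change, or disconnect); stop early
    );
  };
}
```

Inside a processing function, `report(i).then(function(owned) { if (!owned) { stopProcessing(); reject('task lost'); } })` mirrors the pattern in the snippet above.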
resolve - A callback function for reporting that the current task has been completed and the worker is ready to process another task. Any plain JavaScript object passed into the resolve() function will be written to the tasks location and will be available to the next task if the tasks are chained. When a task is resolved, the _progress field is updated to 100 and the _state is replaced with either the _new_state key of the object passed in, or the finished_state of the task spec. If the spec has no finished_state (and no _new_state is given), or the _new_state key is set to false, the task will be removed from the queue.
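The rules for the task's next _state after resolve() combine this paragraph with the _new_state options described under finished_state below. The following sketch encodes those documented rules as a pure function (`outcomeOfResolve` is a hypothetical name, not the library's implementation), which can be useful for reasoning about a chained job before deploying it.

```javascript
// Illustrative only: the documented rules for what resolve() does to a task's
// _state. Returns { removed: true } when the task would leave the queue.
function outcomeOfResolve(spec, result) {
  const newState = result && typeof result === 'object' ? result._new_state : undefined;
  if (newState === false) return { removed: true };                         // explicit removal
  if (typeof newState === 'string') return { removed: false, state: newState }; // explicit state
  if (newState === null) return { removed: false, state: null };            // _state cleared
  // No _new_state given: fall back to the spec's finished_state, if any.
  if (spec.finished_state) return { removed: false, state: spec.finished_state };
  return { removed: true };
}
```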
reject - A callback function for reporting that the current task failed and the worker is ready to process another task. Once this is called, the task will go into the spec's error_state with an additional _error_details object containing a previous_state key referencing this task's in_progress_state. If a string is passed into the reject() function, the _error_details will also contain an error key containing that string. If an Error is passed into the reject() function, the error key will contain the error.message, and if the suppressStack option has not been specified, the error_stack key will contain the error.stack. Note that if retries are enabled and there are remaining attempts, the task will be restarted in its spec's start_state.
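The reject() behavior can likewise be sketched as a pure function. This is a hypothetical illustration of the documented rules, not library source, and it assumes that `attempts` counts failures including the current one, so a spec with `retries: 3` restarts the task after failures 1 to 3 and moves it to the error_state on the 4th.

```javascript
// Illustrative only: builds _error_details and picks the next _state following
// the documented reject() rules. Assumption: `attempts` counts failures so far,
// including this one.
function outcomeOfReject(spec, reason, attempts, suppressStack = false) {
  const details = { previous_state: spec.in_progress_state, attempts };
  if (typeof reason === 'string') {
    details.error = reason;
  } else if (reason instanceof Error) {
    details.error = reason.message;
    if (!suppressStack) details.error_stack = reason.stack;
  }
  const retries = spec.retries || 0;
  const retrying = attempts <= retries;
  const state = retrying
    ? (spec.start_state === undefined ? null : spec.start_state) // restart the spec
    : (spec.error_state || 'error');                             // give up
  return { _state: state, _error_details: details };
}
```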
Securing your queue is an important step in safely processing incoming events. Below is a sample set of security rules that can be tailored to your particular use case.
In this example, there are three categories of users, represented using fields of a Database Auth Variable Override:
- auth.canAddTasks: Users who can add tasks to the queue (could be an authenticated client or a secure server)
- auth.canProcessTasks: Users who can process tasks (usually on a secure server)
- auth.canAddSpecs: Users who can create and view task specifications (usually on a secure server)
These don't have to use a custom token. For instance, you could use auth != null in place of auth.canAddTasks if your application's users can write directly to the queue. Similarly, auth.canProcessTasks and auth.canAddSpecs could be auth.admin === true if a single trusted server process is used to perform all queue functions.
{
"rules": {
"queue": {
"tasks": {
".read": "auth.canProcessTasks",
".write": "auth.canAddTasks || auth.canProcessTasks",
".indexOn": "_state",
"$taskId": {
".validate": "newData.hasChildren(['property_1', ..., 'property_n'])
|| (auth.canProcessTasks
&& newData.hasChildren(['_state', '_state_changed', '_progress']))",
"_state": {
".validate": "newData.isString()"
},
"_state_changed": {
".validate": "newData.isNumber() && (newData.val() === now
|| data.val() === newData.val())"
},
"_owner": {
".validate": "newData.isString()"
},
"_progress": {
".validate": "newData.isNumber()
&& newData.val() >= 0
&& newData.val() <= 100"
},
"_error_details": {
"error": {
".validate": "newData.isString()"
},
"error_stack": {
".validate": "newData.isString()"
},
"previous_state": {
".validate": "newData.isString()"
},
"original_task": {
/* This space intentionally left blank, for malformed tasks */
},
"attempts": {
".validate": "newData.isNumber() && newData.val() > 0"
},
"$other": {
".validate": false
}
},
"_id": {
".validate": "newData.isString()"
},
"property_1": {
".validate": "/* Insert custom data validation code here */"
},
...
"property_n": {
".validate": "/* Insert custom data validation code here */"
}
}
},
"specs" : {
".read": "auth.canAddSpecs || auth.canProcessTasks",
".write": "auth.canAddSpecs",
"$specId": {
".validate": "newData.hasChild('in_progress_state')",
"start_state": {
".validate": "newData.isString()"
},
"in_progress_state": {
".validate": "newData.isString()"
},
"finished_state": {
".validate": "newData.isString()"
},
"error_state": {
".validate": "newData.isString()"
},
"timeout": {
".validate": "newData.isNumber() && newData.val() > 0"
},
"retries": {
".validate": "newData.isNumber() && newData.val() >= 0"
},
"$other": {
".validate": false
}
}
}
}
}
}
A default spec configuration is assumed if no specs are specified in the specs subtree of the queue. The default spec has the following characteristics:
{
"default_spec": {
"start_state": null,
"in_progress_state": "in_progress",
"finished_state": null,
"error_state": "error",
"timeout": 300000, // 5 minutes
"retries": 0 // don't retry
}
}
- start_state - The default spec has no start_state, which means any task pushed into the tasks subtree without a _state key will be picked up by default spec workers. If start_state is specified, only tasks with that _state may be claimed by the worker.
- in_progress_state - When a worker picks up a task and begins processing it, it will change the task's _state to the value of in_progress_state. This is the only required spec property, and it cannot equal the start_state, finished_state, or error_state. The in_progress_state is important for timeouts: if a timeout occurs on a task, any worker defined with the in_progress_state in question will try to pick it up and reset it to its own start_state. That means it is important that in_progress_state is unique, unless you want other workers to pick the task up when it times out.
- finished_state - The default spec has no finished_state, so the worker will remove tasks from the queue upon successful completion. If finished_state is specified, then the task's _state value will be updated to the finished_state upon task completion. Setting this value to another spec's start_state is useful for chaining tasks together to create a job. It's possible to override the finished_state on a per-task basis by setting the _new_state key of the object passed into resolve() to a string to set the _state explicitly, null to remove the _state so it gets picked up by any spec without a start_state, or false to remove the task from the queue.
- error_state - If the task gets rejected, the _state will be updated to this value and an additional key _error_details will be populated with the previous_state and an optional error message from the reject() callback. If this isn't specified, it defaults to "error". This can be useful for specifying different error states for different tasks, or chaining errors so that they can be logged.
- timeout - The default spec's timeout is 5 minutes. When a task has been claimed by a worker but has not completed within timeout milliseconds, the queue will report that task as timed out and reset it to be claimable once again. If this is not specified, a task claimed by a worker could be orphaned and left in an unclaimable state if the worker processing it dies before the task is resolved or rejected.
- retries - The default spec doesn't retry failed tasks. When a task fails, if there are any remaining attempts, the queue will restart the task by setting the task's _state to its spec's start_state.
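The spec properties above can be summarized as a small, illustrative model. The sketch below uses hypothetical names (`normalizeSpec`, `canClaim`, `isTimedOut`) and is not firebase-queue source. It assumes, per the descriptions above, that a custom spec without a timeout never times out and that an unspecified retries count means 0; only error_state gets a documented default ("error").

```javascript
// Illustrative only (not firebase-queue source): normalizes a spec according to
// the rules described above and answers two questions a worker asks of a task.
function normalizeSpec(spec) {
  const s = {
    start_state: spec.start_state == null ? null : spec.start_state,
    in_progress_state: spec.in_progress_state,
    finished_state: spec.finished_state == null ? null : spec.finished_state,
    error_state: spec.error_state == null ? 'error' : spec.error_state,
    timeout: spec.timeout == null ? null : spec.timeout, // null: claimed tasks never time out
    retries: spec.retries == null ? 0 : spec.retries     // assumption: unspecified means no retries
  };
  if (typeof s.in_progress_state !== 'string') {
    throw new Error('in_progress_state is required');
  }
  for (const key of ['start_state', 'finished_state', 'error_state']) {
    if (s[key] === s.in_progress_state) {
      throw new Error('in_progress_state must differ from ' + key);
    }
  }
  return s;
}

// A worker may claim a task whose _state equals its start_state
// (a missing _state matches a spec with no start_state).
function canClaim(spec, task) {
  const state = task._state == null ? null : task._state;
  return state === spec.start_state;
}

// A claimed task has timed out once it has stayed in in_progress_state for
// longer than the spec's timeout (milliseconds since _state_changed).
function isTimedOut(spec, task, now) {
  return spec.timeout !== null &&
    task._state === spec.in_progress_state &&
    now - task._state_changed > spec.timeout;
}
```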
In order to use a job specification other than the default, the specification must be defined in your Firebase Database under the specs subtree. This allows us to coordinate job specification changes between workers and enforce expected behavior with Firebase security rules.
In this example, we're chaining three specs to make a job. New tasks pushed onto the queue without a _state key will be picked up by "spec_1" and go into the spec_1_in_progress state. Once "spec_1" completes and the task goes into the spec_1_finished state, "spec_2" takes over and puts it into the spec_2_in_progress state. Again, once "spec_2" completes and the task goes into the spec_2_finished state, "spec_3" takes over and puts it into the spec_3_in_progress state. Finally, "spec_3" removes it once complete. If there's an error during any stage in the process, the task will end up in an "error" state.
queue
- specs
{
"spec_1": {
"in_progress_state": "spec_1_in_progress",
"finished_state": "spec_1_finished",
"timeout": 5000
},
"spec_2": {
"start_state": "spec_1_finished",
"in_progress_state": "spec_2_in_progress",
"finished_state": "spec_2_finished",
"timeout" : 20000
},
"spec_3": {
"start_state": "spec_2_finished",
"in_progress_state": "spec_3_in_progress",
"timeout": 3000
}
}
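The chaining works by matching each spec's finished_state to the next spec's start_state. As an illustration (the function name `successPath` is hypothetical and not part of firebase-queue), this sketch walks that success path through a specs object such as the one above; if two specs share a start_state it simply takes the first, whereas real workers for both specs could compete for the task.

```javascript
// Illustrative only: follows the success path of a chained job by matching
// each spec's finished_state to the next spec's start_state.
function successPath(specs, initialState = null) {
  const path = [];
  const visited = new Set(); // guards against accidental cycles
  let state = initialState;
  for (;;) {
    const entry = Object.entries(specs).find(([, spec]) =>
      (spec.start_state == null ? null : spec.start_state) === state);
    if (!entry || visited.has(entry[0])) break;
    const [specId, spec] = entry;
    visited.add(specId);
    path.push(specId);
    if (spec.finished_state == null) break; // no finished_state: task is removed
    state = spec.finished_state;
  }
  return path;
}
```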
Once initialized, a queue can be gracefully shut down by calling its shutdown() function. This prevents workers from claiming new tasks, removes all Firebase listeners, and waits until all the current tasks have been completed before resolving the RSVP.Promise returned by the function.
By listening for the SIGINT termination signal like this, you can ensure the queue shuts down gracefully, so you don't have to rely on the jobs timing out and being picked up by another worker:
...
var queue = new Queue(ref, function(data, progress, resolve, reject) {
...
});
process.on('SIGINT', function() {
console.log('Starting queue shutdown');
queue.shutdown().then(function() {
console.log('Finished queue shutdown');
process.exit(0);
});
});
The number of workers running simultaneously in the same node.js thread can be managed dynamically using the following three methods on the instantiated Queue object:
- getWorkerCount() - This method returns the current number of workers on a queue.
- addWorker() - This method instantiates a new worker with the queue's current specs.
- shutdownWorker() - This method gracefully shuts down a worker and returns a promise that is fulfilled once the worker has shut down. If there are no more workers to shut down, the promise will be rejected.
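These three methods are enough to resize a worker pool to a target size, for example in response to load. The sketch below is a hypothetical helper (`scaleWorkers` is not part of firebase-queue) that assumes `queue` is a Queue instance, or any object exposing the same three methods; it waits for each shutdown promise before requesting the next one.

```javascript
// Hypothetical helper: grow or shrink a queue's worker pool to `target` workers
// using getWorkerCount(), addWorker() and shutdownWorker() as described above.
async function scaleWorkers(queue, target) {
  if (!Number.isInteger(target) || target < 0) {
    throw new RangeError('target must be a non-negative integer');
  }
  const current = queue.getWorkerCount();
  // One addWorker() call per missing worker.
  for (let n = current; n < target; n++) {
    queue.addWorker();
  }
  // Shut down surplus workers one at a time, waiting for each to finish.
  for (let n = current; n > target; n--) {
    await queue.shutdownWorker();
  }
  return queue.getWorkerCount();
}
```

Because shutdownWorker() is graceful, scaling down may take as long as the slowest in-flight task.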
In our example at the beginning, you wanted to perform several actions on your chat system:
- Sanitize chat message input
- Fan data out to multiple rooms and users
Together, these two actions form a job, and you can use custom specs, as shown above, to define the flow of tasks in this job. When you start, your Firebase Database should look like this:
root
- queue
- specs
- sanitize_message
- in_progress_state: "sanitize_message_in_progress"
- finished_state: "sanitize_message_finished"
- fanout_message
- start_state: "sanitize_message_finished"
- in_progress_state: "fanout_message_in_progress"
- error_state: "fanout_message_failed"
- retries: 3
- tasks
/* null, no data */
Let's imagine that you have some front end that allows your users to write their name and a message, and send that to your queue as its data. Let's assume your user writes something like the following:
// Using the web JavaScript client
var tasksRef = firebase.database().ref('queue/tasks');
tasksRef.push({
'message': 'Hello Firebase Queue Users!',
'name': 'Chris'
});
Your Firebase Database should now look like this:
root
- queue
- specs
/* same as above */
- tasks
- $taskId
- message: "Hello Firebase Queue Users!"
- name: "Chris"
When your users push data like the above into the tasks subtree, tasks will initially start in the sanitize_message spec, because the task has no _state and the sanitize_message spec has no start_state. The associated queue can be specified using the following processing function:
// chat_message_sanitization.js
var Queue = require('firebase-queue');
var admin = require('firebase-admin');
var serviceAccount = require('path/to/serviceAccountCredentials.json');
admin.initializeApp({
credential: admin.credential.cert(serviceAccount),
databaseURL: '<your-database-url>'
});
var db = admin.database();
var queueRef = db.ref('queue');
var messagesRef = db.ref('messages');
var options = {
'specId': 'sanitize_message'
};
var sanitizeQueue = new Queue(queueRef, options, function(data, progress, resolve, reject) {
// sanitize input message
data.message = sanitize(data.message);
// pass sanitized message and username along to be fanned out
resolve(data);
});
...
The queue worker will take this task, begin to process it, and update the reserved keys of the task:
root
- queue
- specs
/* same as above */
- tasks
- $taskId
- _owner: $workerUid
- _progress: 0
- _state: "sanitize_message_in_progress"
- _state_changed: 1431475215737
- _id: $taskId
- message: "Hello Firebase Queue Users!"
- name: "Chris"
Once the message is sanitized, it will be resolved and both the reserved keys and the data will be updated in the task (imagine for a minute that queue is a blacklisted word):
root
- queue
- specs
/* same as above */
- tasks
- $taskId
- _owner: null
- _progress: 100
- _state: "sanitize_message_finished"
- _state_changed: 1431475215918
- _id: $taskId
- message: "Hello Firebase ***** Users!"
- name: "Chris"
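The processing function earlier calls a sanitize() helper that this guide doesn't define. One possible implementation, under the assumption that sanitizing means masking blocklisted words, is sketched below; `BLOCKLIST` and `escapeRegExp` are hypothetical, and the word list is yours to supply. With "queue" on the list it produces the masked message shown above.

```javascript
// Hypothetical sanitize() for the chat example: masks each blocklisted word
// (matched case-insensitively as a whole word) with one asterisk per character.
const BLOCKLIST = ['queue']; // assumption: supply your own word list

function escapeRegExp(s) {
  return s.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}

function sanitize(message) {
  if (typeof message !== 'string') {
    throw new TypeError('message must be a string');
  }
  const pattern = new RegExp('\\b(' + BLOCKLIST.map(escapeRegExp).join('|') + ')\\b', 'gi');
  return message.replace(pattern, (word) => '*'.repeat(word.length));
}
```

Since sanitize() throws on non-string input, the processing function could wrap the call in try/catch and pass the error to reject() so malformed tasks land in an error state.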
Now, you want to fan the data out to the messages subtree of your Firebase Database using the fanout_message spec, so you can set up a second processing function to find tasks whose _state is sanitize_message_finished:
...
var options = {
'specId': 'fanout_message',
'numWorkers': 5
};
var fanoutQueue = new Queue(queueRef, options, function(data, progress, resolve, reject) {
// fan data out to /messages; ensure that errors are caught and cause the task to fail
messagesRef.push(data, function(error){
if (error) {
reject(error.message);
} else {
resolve(data);
}
});
});
Since there is no finished_state in the fanout_message spec, the task will be purged from the queue after the data is fanned out to the messages node. If the push fails for some reason, the task will fail and be retried a maximum of three times (as specified in our spec), after which it ends up in the fanout_message_failed state.
While this example is a little contrived since you could perform the sanitization and fanout in a single task, creating multiple specs for our tasks allows us to do things like add selective retries to certain tasks more likely to fail, put additional workers on more expensive tasks, or add expressive error states.
It is possible to specify the locations the queue uses for tasks and specs explicitly instead of using the defaults. To do this, pass an object to the Queue constructor in place of the Firebase reference; this object must contain the keys tasksRef and specsRef, and each value must be a Firebase reference.
var Queue = require('firebase-queue');
var admin = require('firebase-admin');
var serviceAccount = require('path/to/serviceAccountCredentials.json');
admin.initializeApp({
credential: admin.credential.cert(serviceAccount),
databaseURL: '<your-database-url>'
});
var db = admin.database();
var jobsRef = db.ref('jobs');
var specsRef = db.ref('specs');
var queue = new Queue({ tasksRef: jobsRef, specsRef: specsRef }, function(data, progress, resolve, reject) {
// process task
});
As you can see, Firebase Queue is a powerful tool that allows you to securely and robustly perform background work on your Firebase data, from sanitization to data fanout and more. We'd love to hear about how you're using Firebase-Queue in your project! Let us know on Twitter, Facebook, or G+. If you have any questions, please direct them to our Google Group or [email protected].