diff --git a/.github/CONTRIBUTING.md b/.github/CONTRIBUTING.md new file mode 100644 index 0000000..b84b3d8 --- /dev/null +++ b/.github/CONTRIBUTING.md @@ -0,0 +1,103 @@ +# Contributing | Firebase Queue + +Thank you for contributing to the Firebase community! + + - [Have a usage question?](#question) + - [Think you found a bug?](#issue) + - [Have a feature request?](#feature) + - [Want to submit a pull request?](#submit) + - [Need to get set up locally?](#local-setup) + + +## Have a usage question? + +We get lots of those and we love helping you, but GitHub is not the best place for them. Issues +which just ask about usage will be closed. + +Start with the [guide](../docs/guide.md). If the official documentation doesn't help, try asking a +question through our [official support channels](https://firebase.google.com/support/). + +**Please avoid double posting across multiple channels!** + + +## Think you found a bug? + +Yeah, we're definitely not perfect! + +Search through [old issues](https://github.com/firebase/firebase-queue/issues) before submitting a new +issue as your question may have already been answered. + +If your issue appears to be a bug, and hasn't been reported, +[open a new issue](https://github.com/firebase/firebase-queue/issues/new). Please use the provided bug +report template and include a minimal repro. + +If you are up to the challenge, [submit a pull request](#submit) with a fix! + + +## Have a feature request? + +Great, we love hearing how we can improve our products! After making sure someone hasn't already +requested the feature in the [existing issues](https://github.com/firebase/firebase-queue/issues), go +ahead and [open a new issue](https://github.com/firebase/firebase-queue/issues/new). Feel free to remove +the bug report template and instead provide an explanation of your feature request. Provide code +samples if applicable. Consider the following: what will it allow you to do that you can't do today? 
How +will it make current workarounds straightforward? What potential bugs and edge cases does it help to +avoid? + + +## Want to submit a pull request? + +Sweet, we'd love to accept your contribution! [Open a new pull request](https://github.com/firebase/firebase-queue/pull/new/master) +and fill out the provided form. + +**If you want to implement a new feature, please open an issue with a proposal first so that we can +figure out if the feature makes sense and how it will work.** + +Make sure your changes pass our linter and the tests all pass on your local machine. We've hooked +up this repo with continuous integration to double check those things for you. + +Most non-trivial changes should include some extra test coverage. If you aren't sure how to add +tests, feel free to submit regardless and ask us for some advice. + +Finally, you will need to sign our [Contributor License Agreement](https://cla.developers.google.com/about/google-individual) +before we can accept your pull request. + + +## Need to get set up locally? + +If you'd like to contribute to Firebase Queue, you'll need to do the following to get your +environment set up. + +### Install Dependencies + +```bash +$ git clone https://github.com/firebase/firebase-queue.git +$ cd firebase-queue # go to the firebase-queue directory +$ npm install -g gulp # globally install gulp task runner +$ npm install # install local npm build / test dependencies +``` + +### Create a Firebase Project + +1. Create a Firebase project [here](https://console.firebase.google.com). +2. Set the `FB_QUEUE_TEST_DB_URL` environment variable to your project's database URL: + +```bash +$ export FB_QUEUE_TEST_DB_URL="https://.firebaseio.com" +``` + +### Download a Service Account JSON File + +1. Follow the instructions [here](https://firebase.google.com/docs/server/setup#add_firebase_to_your_app) +on how to create a service account for your project and furnish a private key. +2. Copy the credentials JSON file to `test/key.json`. 
+ +### Lint, Build, and Test + +```bash +$ gulp # lint, build, and test + +$ gulp lint # just lint +$ gulp build # just build +$ gulp test # just test +``` diff --git a/.github/ISSUE_TEMPLATE.md b/.github/ISSUE_TEMPLATE.md new file mode 100644 index 0000000..1db058b --- /dev/null +++ b/.github/ISSUE_TEMPLATE.md @@ -0,0 +1,65 @@ + + + +### Version info + + + +**Firebase:** + +**Firebase Queue:** + +**Node.js:** + +**Other (e.g. operating system) (if applicable):** + +### Test case + + + + +### Steps to reproduce + + + + +### Expected behavior + + + + +### Actual behavior + + diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 0000000..80efa77 --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1,31 @@ + + + +### Description + + + +### Code sample + + diff --git a/.gitignore b/.gitignore index 193bcf0..c95f32b 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,4 @@ dist/ coverage/ node_modules/ -key.json +test/key.json diff --git a/.travis.yml b/.travis.yml index 6d7822e..65f010e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,12 +6,14 @@ node_js: - '5' - stable sudo: false +env: + FB_QUEUE_TEST_DB_URL=https://fir-queue-test.firebaseio.com install: - npm install script: -- '[ -e key.json ] && npm run travis || false' +- '[ -e test/key.json ] && npm run travis || false' after_script: - cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js before_install: - openssl aes-256-cbc -K $encrypted_a237b3635f54_key -iv $encrypted_a237b3635f54_iv - -in key.json.enc -out key.json -d + -in test/key.json.enc -out test/key.json -d diff --git a/README.md b/README.md index 128e53a..7a3b63a 100644 --- a/README.md +++ b/README.md @@ -1,538 +1,40 @@ -# Firebase Queue +# Firebase Queue [![Build Status](https://travis-ci.org/firebase/firebase-queue.svg?branch=master)](https://travis-ci.org/firebase/firebase-queue) [![Coverage 
Status](https://img.shields.io/coveralls/firebase/firebase-queue.svg?branch=master&style=flat)](https://coveralls.io/r/firebase/firebase-queue) [![GitHub version](https://badge.fury.io/gh/firebase%2Ffirebase-queue.svg)](http://badge.fury.io/gh/firebase%2Ffirebase-queue) -[![Build Status](https://travis-ci.org/firebase/firebase-queue.svg?branch=master)](https://travis-ci.org/firebase/firebase-queue) -[![Coverage Status](https://img.shields.io/coveralls/firebase/firebase-queue.svg?branch=master&style=flat)](https://coveralls.io/r/firebase/firebase-queue) -[![GitHub version](https://badge.fury.io/gh/firebase%2Ffirebase-queue.svg)](http://badge.fury.io/gh/firebase%2Ffirebase-queue) +A fault-tolerant, multi-worker, multi-stage job pipeline built on the [Firebase Realtime +Database](https://firebase.google.com/docs/database/). -A fault-tolerant, multi-worker, multi-stage job pipeline built on the Firebase realtime database. +## Table of Contents -## Purpose of a Queue + * [Getting Started With Firebase](#getting-started-with-firebase) + * [Downloading Firebase Queue](#downloading-firebase-queue) + * [Documentation](#documentation) + * [Contributing](#contributing) -Queues can be used in your Firebase app to organize workers or perform background work like generating thumbnails of images, filtering message contents and censoring data, or fanning data out to multiple locations in your Firebase database. First, let's define a few terms we'll use when talking about a queue: - - `task` - a unit of work that a queue worker can process - - `spec` - a definition of an operation that the queue will perform on matching tasks - - `job` - one of more `spec`'s that specify a series of ordered operations to be performed - - `worker` - an individual process that picks up tasks with a certain spec and processes them -Let's take a look at a simple example to see how this works. Imagine you wanted to build a chat application that does two things: - 1. Sanitize chat message input - 2. 
Fan data out to multiple rooms and users +## Getting Started With Firebase -Since chat message sanitization can't happen purely on the client side, as that would allow a malicious client to circumvent client side restrictions, you'll have to run this process on a trusted server process. +Firebase Queue requires [Firebase](https://firebase.google.com/) in order to sync and store data. +Firebase is a suite of integrated products designed to help you develop your app, grow your user +base, and earn money. You can [sign up here for a free account](https://console.firebase.google.com/). -Using Firebase Queue, you can create specs for each of these tasks, and then use workers to process the individual tasks to complete the job. We'll explore the queue, adding tasks, assigning workers, and creating custom specs to create full jobs, then [revisit the example](#message-sanitization-revisited) above. -## The Queue in Your Firebase Database +## Downloading Firebase Queue -The queue relies on having a Firebase database reference to coordinate workers e.g. `https://databaseName.firebaseio.com/queue`. This queue can be stored at any path in your Firebase database, and you can have multiple queues as well. The queue will respond to tasks pushed onto the `tasks` subtree and optionally read specifications from a `specs` subtree. +You can download Firebase Queue via npm. You will also have to install Firebase separately (that is, +they are `peerDependencies`): +```bash +$ npm install firebase firebase-queue --save ``` -queue - - specs - - tasks -``` - -See [Custom references to tasks and specs](#custom-references-to-tasks-and-specs) for defining the locations of these other than the default. - -## Queue Workers - -The basic unit of the queue is the queue worker: the process that claims a task, performs the appropriate processing on the data, and either returns the transformed data, or an appropriate error. 
- -You can start a worker process by passing in a Firebase database [`ref`](https://firebase.google.com/docs/server/setup#initialize_the_sdk) along with a processing function ([described below](#the-processing-function)), as follows: - -```js -// my_queue_worker.js - -var Queue = require('firebase-queue'); -var firebase = require('firebase'); - -firebase.initializeApp({ - serviceAccount: 'path/to/serviceAccountCredentials.json', - databaseURL: '' -}); - -var ref = firebase.database().ref('queue'); -var queue = new Queue(ref, function(data, progress, resolve, reject) { - // Read and process task data - console.log(data); - - // Do some work - progress(50); - - // Finish the task asynchronously - setTimeout(function() { - resolve(); - }, 1000); -}); -``` - -```shell -node my_queue_worker.js -``` - -Multiple queue workers can be initialized on multiple machines and Firebase-Queue will ensure that only one worker is processing a single queue task at a time. - - -#### Queue Worker Options (Optional) - -Queue workers can take an optional options object to specify: - - `specId` - specifies the spec type for this worker. This is important when creating multiple specs. Defaults to `null` which uses the default spec. - - `numWorkers` - specifies the number of workers to run simultaneously for this node.js thread. Defaults to 1 worker. - - `sanitize` - specifies whether the `data` object passed to the processing function is sanitized of internal keys reserved for use by the queue. Defaults to `true`. - - `suppressStack` - specifies whether the queue will suppress error stack traces from being placed in the `_error_details` of the task if it's rejected with an Error. - -```js -... - -var options = { - 'specId': 'spec_1', - 'numWorkers': 5, - 'sanitize': false, - 'suppressStack': true -}; -var queue = new Queue(ref, options, function(data, progress, resolve, reject) { - ... 
-}); -``` - - -## Pushing Tasks Onto the Queue - -Using any Firebase client or the REST API, push an object with some data to the `tasks` subtree of your queue. Queue workers listening on that subtree will automatically pick up and process the new task. - -```shell -# Using curl in shell -curl -X POST -d '{"foo": "bar"}' https://databaseName.firebaseio.com/queue/tasks.json -``` -or -```js -// Using the web JavaScript client -var ref = firebase.database().ref('queue/tasks'); -ref.push({'foo': 'bar'}); -``` - -### Starting Tasks in Specific States (Optional) - -When using a custom spec, you can pass a `_state` key in with your object, which will allow a custom spec's worker(s) to pick up your task at a specific spec, rather than starting with the starting spec. - -```js -{ - "foo": "bar", - "boo": "baz", - "_state": "spec_n_start" -} -``` - - -## The Processing Function - -The processing function provides the body of the data transformation, and allows for completing tasks successfully or with error conditions, as well as reporting the progress of a task. As this function defines the work that the worker must do, this callback function is required. It should take the following four parameters: - -#### `data` - -A JavaScript object containing the claimed task's data, and can contain any keys and values with the exception of several reserved keys, which are used for tracking worker progress. - -The reserved keys are: - - `_state` - The current state of the task. Will always be the task's `in_progress_state` when passed to the processing function. - - `_state_changed` - The timestamp that the task changed into its current state. This will always be the server time when the processing function was called. - - `_owner` - A unique ID for the worker and task number combination to ensure only one worker is responsible for the task at any time. - - `_progress` - A number between 0 and 100, reset at the start of each task to 0. 
- - `_error_details` - An object containing the error details from a previous task execution. If present, it may contain a `previous_state` string (or `null` if there was no previous state, in the case of malformed input) capturing the state the task was in when it errored, an `error` string from the `reject()` callback of the previous task, and an `attempts` field containing the number of retries attempted before failing a task. If the `suppressStack` queue option is not set to `true`, there may also be a `error_stack` field containg a stack dump of any error passed into the `reject()` function. - - `_id` - The Firebase key of the task. - - By default the data is sanitized of these keys, but you can disable this behavior by setting `'sanitize': false` in the [queue options](#queue-worker-options-optional). - -#### `progress()` - -A callback function for reporting the progress of the task. `progress()` takes a single parameter that must be a number between 0 and 100, and returns a [RSVP.Promise](https://github.com/tildeio/rsvp.js) that's fulfilled when successfully updated. If this promise is rejected, it's likely that the task is no longer owned by this process (perhaps it has timed out or the task specification has changed) or the worker has lost its connection to Firebase. - -By catching when this call fails and cancelling the current task early, the worker can minimize the extra work it does and return to processing new queue tasks sooner: - -```js -... -var queue = new Queue(ref, options, function(data, progress, resolve, reject) { - ... - function stopProcessing() { - ... - } - ... - // report current progress - progress(currentProgress).catch(function(errorMessage) { - // we've lost the current task, so stop processing - stopProcessing(); - - // and reject the task so that we can pick up new tasks - reject(errorMessage); - }); - ... 
-}); -``` - -#### `resolve()` - -A callback function for reporting that the current task has been completed and the worker is ready to process another task. Any plain JavaScript object passed into the `resolve()` function will be written to the `tasks` location and will be available to the next task if the tasks are chained. When a task is resolved, the `_progress` field is updated to 100 and the `_state` is replaced with either the `_new_state` key of the object passed in, or the `finished_state` of the task spec. If the task does not have a `finished_state` or the `_new_state` key is set to `false`, the task will be removed from the queue. - -#### `reject()` - -A callback function for reporting that the current task failed and the worker is ready to process another task. Once this is called, the task will go into the `error_state` for the job with an additional `_error_details` object containing a `previous_state` key referencing this task's `in_progress_state`. If a string is passed into the `reject()` function, the `_error_details` will also contain an `error` key containing that string. If an Error is passed into the `reject()` function, the `error` key will contain the `error.message`, and if `suppressStack` option has not been specified the `error_stack` key will contain the `error.stack`. Note that if retries are enabled and there are remaining attempts, the task will be restarted in it's spec's `start_state`. - -## Queue Security - -Securing your queue is an important step in securely processing events that come in. Below is a sample set of security rules that can be tailored to your particular use case. 
- -In this example, there are three categories of users, represented using fields of a [Database Auth Variable Override](https://firebase.google.com/docs/database/server/start#authenticate-with-limited-privileges): -- `auth.canAddTasks`: Users who can add tasks to the queue (could be an authenticated client or a secure server) -- `auth.canProcessTasks`: Users who can process tasks (usually on a secure server) -- `auth.canAddSpecs`: Users who can create and view task specifications (usually on a secure server) - -These don't have to use a custom token, for instance you could use `auth != null` in place of `auth.canAddTasks` if application's users can write directly to the queue. Similarly, `auth.canProcessTasks` and `auth.canAddSpecs` could be `auth.admin === true` if a single trusted server process was used to perform all queue functions. - -```json -{ - "rules": { - "queue": { - "tasks": { - ".read": "auth.canProcessTasks", - ".write": "auth.canAddTasks || auth.canProcessTasks", - ".indexOn": "_state", - "$taskId": { - ".validate": "newData.hasChildren(['property_1', ..., 'property_n']) - || (auth.canProcessTasks - && newData.hasChildren(['_state', '_state_changed', '_progress']))", - "_state": { - ".validate": "newData.isString()" - }, - "_state_changed": { - ".validate": "newData.isNumber() && (newData.val() === now - || data.val() === newData.val())" - }, - "_owner": { - ".validate": "newData.isString()" - }, - "_progress": { - ".validate": "newData.isNumber() - && newData.val() >= 0 - && newData.val() <= 100" - }, - "_error_details": { - "error": { - ".validate": "newData.isString()" - }, - "error_stack": { - ".validate": "newData.isString()" - }, - "previous_state": { - ".validate": "newData.isString()" - }, - "original_task": { - /* This space intentionally left blank, for malformed tasks */ - }, - "attempts": { - ".validate": "newData.isNumber() && newData.val() > 0" - }, - "$other": { - ".validate": false - } - }, - "_id": { - ".validate": 
"newData.isString()" - }, - "property_1": { - ".validate": "/* Insert custom data validation code here */" - }, - ... - "property_n": { - ".validate": "/* Insert custom data validation code here */" - } - } - }, - "specs" : { - ".read": "auth.canAddSpecs || auth.canProcessTasks", - ".write": "auth.canAddSpecs", - "$specId": { - ".validate": "newData.hasChild('in_progress_state')", - "start_state": { - ".validate": "newData.isString()" - }, - "in_progress_state": { - ".validate": "newData.isString()" - }, - "finished_state": { - ".validate": "newData.isString()" - }, - "error_state": { - ".validate": "newData.isString()" - }, - "timeout": { - ".validate": "newData.isNumber() && newData.val() > 0" - }, - "$other": { - ".validate": false - } - } - } - } - } -} -``` - -## Defining Specs (Optional) - -#### Default Spec - -A default spec configuration is assumed if no specs are specified in the `specs` subtree of the queue. The default spec has the following characteristics: - -```js -{ - "default_spec": { - "start_state": null, - "in_progress_state": "in_progress", - "finished_state": null, - "error_state": "error", - "timeout": 300000, // 5 minutes - "retries": 0 // don't retry - } -} -``` - -- `start_state` - The default spec has no `start_state`, which means any task pushed into the `tasks` subtree without a `_state` key will be picked up by default spec workers. If `start_state` is specified, only tasks with that `_state` may be claimed by the worker. -- `in_progress_state` - When a worker picks up a task and begins processing it, it will change the tasks's `_state` to the value of `in_progress_state`. This is the only required spec property, and it cannot equal the `start_state`, `finished_state`, or `error_state`. -- `finished_state` - The default spec has no `finished_state` so the worker will remove tasks from the queue upon successful completion. 
If `finished_state` is specified, then the task's `_state` value will be updated to the `finished_state` upon task completion. Setting this value to another spec's `start_state` is useful for chaining tasks together to create a job. It's possible to override the `finished_state` on a per-task basis by setting the `_new_state` key of the object passed into `resolve()` to a string to set the `_state` explicitly, `null` to remove the `_state` so it gets picked up by any spec without a `start_state`, or `false` to remove the task from the queue. -- `error_state` - If the task gets rejected the `_state` will be updated to this value and an additional key `_error_details` will be populated with the `previous_state` and an optional error message from the `reject()` callback. If this isn't specified, it defaults to "error". This can be useful for specifying different error states for different tasks, or chaining errors so that they can be logged. -- `timeout` - The default timeout is 5 minutes. When a task has been claimed by a worker but has not completed within `timeout` milliseconds, the queue will report that task as timed out, and reset that task to be claimable once again. If this is not specified, a task claimed by a worker could be orphaned and left in an unclaimable state if the worker processing it dies before the task is resolved or rejected. -- `retries` - The default spec doesn't retry failed tasks. When a task fails, if there are any remaining attempts, the queue will restart the task by setting the task's `_state` to its spec's `start_state`. -#### Creating Jobs using Custom Specs and Task Chaining -In order to use a job specification other than the default, the specification must be defined in the Firebase under the `specs` subtree. This allows us to coordinate job specification changes between workers and enforce expected behavior with Firebase security rules. +## Documentation -In this example, we're chaining three specs to make a job. 
New tasks pushed onto the queue without a `_state` key will be picked up by "spec_1" and go into the `spec_1_in_progress` state. Once "spec_1" completes and the task goes into the `spec_1_finished` state, "spec_2" takes over and puts it into the `spec_2_in_progress` state. Again, once "spec_2" completes and the task goes into the `spec_2_finished` state, "spec_3" takes over and puts it into the `spec_3_in_progress` state. Finally, "spec_3" removes it once complete. If, during any stage in the process there's an error, the task will end up in an "error" state. +* [Guide](docs/guide.md) -``` -queue - - specs -``` -```json -{ - "spec_1": { - "in_progress_state": "spec_1_in_progress", - "finished_state": "spec_1_finished", - "timeout": 5000 - }, - "spec_2": { - "start_state": "spec_1_finished", - "in_progress_state": "spec_2_in_progress", - "finished_state": "spec_2_finished", - "timeout" : 20000 - }, - "spec_3": { - "start_state": "spec_2_finished", - "in_progress_state": "spec_3_in_progress", - "timeout": 3000 - } -} -``` - -## Graceful Shutdown - -Once initialized, a queue can be gracefully shutdown by calling its `shutdown()` function. This prevents workers from claiming new tasks, removes all Firebase listeners, and waits until all the current tasks have been completed before resolving the RSVP.Promise returned by the function. - -By intercepting for the `SIGINT` termination signal like this, you can ensure the queue shuts down gracefully so you don't have to rely on the jobs timing out and being picked up by another worker: - -```js -... -var queue = new Queue(ref, function(data, progress, resolve, reject) { - ... -}); - -process.on('SIGINT', function() { - console.log('Starting queue shutdown'); - queue.shutdown().then(function() { - console.log('Finished queue shutdown'); - process.exit(0); - }); -}); -``` - -## Message Sanitization, Revisited - -In our example at the beginning, you wanted to perform several actions on your chat system: - 1. 
Sanitize chat message input - 2. Fan data out to multiple rooms and users - -Together, these two actions form a job, and you can use custom specs, as shown above, to define the flow of tasks in this job. When you start, your Firebase should look like this: - -``` -root - - queue - - specs - - sanitize_message - - in_progress_state: "sanitize_message_in_progress" - - finished_state: "sanitize_message_finished" - - fanout_message - - start_state: "sanitize_message_finished" - - in_progress_state: "fanout_message_in_progress" - - error_state: "fanout_message_failed" - - retries: 3 - - tasks - /* null, no data */ -``` - -Let's imagine that you have some front end that allows your users to write their name and a message, and send that to your queue as it's `data`. Let's assume your user writes something like the following: - -```js -// Using the web JavaScript client -var tasksRef = firebase.database().ref('queue/tasks'); -tasksRef.push({ - 'message': 'Hello Firebase Queue Users!', - 'name': 'Chris' -}); -``` - -Your Firebase database should now look like this: - -``` -root - - queue - - specs - /* same as above */ - - tasks - - $taskId - - message: "Hello Firebase Queue Users!" - - name: "Chris" -``` - -When your users push `data` like the above into the `tasks` subtree, tasks will initially start in the `sanitize_message` spec because the task has no `start_state`. 
The associated queue can be specified using the following processing function: - -```js -// chat_message_sanitization.js - -var Queue = require('firebase-queue'); -var firebase = require('firebase'); - -firebase.initializeApp({ - serviceAccount: 'path/to/serviceAccountCredentials.json', - databaseURL: '' -}); - -var db = firebase.database(); -var queueRef = db.ref('queue'); -var messagesRef = db.ref('messages'); - -var options = { - 'specId': 'sanitize_message' -}; -var sanitizeQueue = new Queue(queueRef, options, function(data, progress, resolve, reject) { - // sanitize input message - data.message = sanitize(data.message); - - // pass sanitized message and username along to be fanned out - resolve(data); -}); - -... -``` - -The queue worker will take this task, begin to process it, and update the reserved keys of the task: - -``` -root - - queue - - specs - /* same as above */ - - tasks - - $taskId - - _owner: $workerUid - - _progress: 0 - - _state: "sanitize_message_in_progress" - - _state_changed: 1431475215737 - - _id: $taskId - - message: "Hello Firebase Queue Users!" - - name: "Chris" -``` - -Once the message is sanitized, it will be resolved and both the reserved keys and the data will be updated in the task (imagine for a minute that queue is a blacklisted word): - -``` -root - - queue - - specs - /* same as above */ - - tasks - - $taskId - - _owner: null - - _progress: 100 - - _state: "sanitize_message_finished" - - _state_changed: 1431475215918 - - _id: $taskId - - message: "Hello Firebase ***** Users!" - - name: "Chris" -``` - -Now, you want to fan the data out to the `messages` subtree of your Firebase database, using the spec, `fanout_message`, so you can set up a second processing function to find tasks whose `_state` is `sanitize_message_finished`: - -```js -... 
- -var options = { - 'specId': 'fanout_message', - 'numWorkers': 5 -}; -var fanoutQueue = new Queue(queueRef, options, function(data, progress, resolve, reject) { - // fan data out to /messages; ensure that errors are caught and cause the task to fail - messagesRef.push(data, function(error){ - if (error) { - reject(error.message); - } else { - resolve(data); - } - }); -}); -``` - -Since there is no `finished_state` in the `fanout_message` spec, the task will be purged from the queue after the data is fanned out to the messages node. If the `push` fails for some reason, the task will fail and retry, a maximum of three times (as specified in our spec). - -While this example is a little contrived since you could perform the sanitization and fanout in a single task, creating multiple specs for our tasks allows us to do things like add selective retries to certain tasks more likely to fail, put additional workers on more expensive tasks, or add expressive error states. -## Custom references to tasks and specs - -It is possible to specify the locations the queue uses for tasks and the specs explicitly instead of using the defaults. To do this, simply pass an object to the Queue constructor in place of the Firebase reference; this object must contain the keys `tasksRef` and `specsRef`, and each value must be a Firebase reference. - -```js -var Queue = require('firebase-queue'); -var firebase = require('firebase'); - -firebase.initializeApp({ - serviceAccount: 'path/to/serviceAccountCredentials.json', - databaseURL: '' -}); - -var db = firebase.database(); - -var jobsRef = db.ref('jobs'); -var specsRef = db.ref('specs'); - -var queue = new Queue({ tasksRef: jobsRef, specsRef: specsRef }, function(data, progress, resolve, reject) { - // process task -}); -``` - -## Wrap Up - -As you can see, Firebase Queue is a powerful tool that allows you to securely and robustly perform background work on your Firebase data, from sanitization to data fanout and more. 
We'd love to hear about how you're using Firebase-Queue in your project! Let us know on [Twitter](https://twitter.com/firebase), [Facebook](https://www.facebook.com/Firebase), or [G+](https://plus.google.com/115330003035930967645). If you have any questions, please direct them to our [Google Group](https://groups.google.com/forum/#!forum/firebase-talk) or [firebase-support@google.com](mailto:firebase-support@google.com). - -## Running the Tests - -To run the tests you first need to create a Firebase Realtime Database to test against in the [Firebase console](https://console.firebase.google.com), and create a [service account](https://console.firebase.google.com/iam-admin/serviceaccounts/project) for that project. The service account should have Editor permission on the project, and you'll need to furnish the account with JSON credentials. See [the documentation](https://firebase.google.com/docs/server/setup#add_firebase_to_your_app) for detailed instructions on creating service accounts. - -Once you have created and downloaded the service account credentials, place them in a `key.json` file at the root of this repository (you should **never** check this file in, it should be ignored by a rule in the `.gitignore` file). - -Then, to run the tests, simply run these commands: - -```sh -export FB_QUEUE_TEST_DB_URL=https://databaseName.firebaseio.com -npm test -``` +## Contributing -Where `databaseName` is the name of the Firebase Realtime Database you created for testing. +If you'd like to contribute to Firebase Queue, please first read through our [contribution +guidelines](.github/CONTRIBUTING.md). Local setup instructions are available [here](.github/CONTRIBUTING.md#local-setup). 
diff --git a/docs/guide.md b/docs/guide.md new file mode 100644 index 0000000..5bc94ea --- /dev/null +++ b/docs/guide.md @@ -0,0 +1,538 @@ +# Guide | Firebase Queue + + +## Table of Contents + + * [Purpose of a Queue](#purpose-of-a-queue) + * [The Queue in Your Firebase Database](#the-queue-in-your-firebase-database) + * [Queue Workers](#queue-workers) + * [Pushing Tasks Onto the Queue](#pushing-tasks-onto-the-queue) + * [Queue Security](#queue-security) + * [Defining Specs (Optional)](#defining-specs-optional) + * [Graceful Shutdown](#graceful-shutdown) + * [Message Sanitization, Revisited](#message-sanitization-revisited) + * [Custom references to tasks and specs](#custom-references-to-tasks-and-specs) + * [Wrap Up](#wrap-up) + * [Contributing](#contributing) + + +## Purpose of a Queue + +Queues can be used in your Firebase app to organize workers or perform background work like generating thumbnails of images, filtering message contents and censoring data, or fanning data out to multiple locations in your Firebase database. First, let's define a few terms we'll use when talking about a queue: + - `task` - a unit of work that a queue worker can process + - `spec` - a definition of an operation that the queue will perform on matching tasks + - `job` - one or more `spec`s that specify a series of ordered operations to be performed + - `worker` - an individual process that picks up tasks with a certain spec and processes them + +Let's take a look at a simple example to see how this works. Imagine you want to build a chat application that does two things: + 1. Sanitize chat message input + 2. Fan data out to multiple rooms and users + +Since chat message sanitization can't happen purely on the client side, as that would allow a malicious client to circumvent client-side restrictions, you'll have to run it in a trusted server process. 
+ +Using Firebase Queue, you can create specs for each of these tasks, and then use workers to process the individual tasks to complete the job. We'll explore the queue, adding tasks, assigning workers, and creating custom specs to create full jobs, then [revisit the example](#message-sanitization-revisited) above. + +## The Queue in Your Firebase Database + +The queue relies on having a Firebase database reference to coordinate workers, e.g. `https://databaseName.firebaseio.com/queue`. This queue can be stored at any path in your Firebase database, and you can have multiple queues as well. The queue will respond to tasks pushed onto the `tasks` subtree and optionally read specifications from a `specs` subtree. + +``` +queue + - specs + - tasks +``` + +See [Custom references to tasks and specs](#custom-references-to-tasks-and-specs) to store these at locations other than the defaults. + + +## Queue Workers + +The basic unit of the queue is the queue worker: the process that claims a task, performs the appropriate processing on the data, and returns either the transformed data or an appropriate error.
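The worker contract described above (claim a task, process its data, then report either the transformed data or an error) can be sketched without Firebase. This is only an illustration: `runTask` and `shout` are hypothetical names, not part of the firebase-queue API, and the sketch assumes nothing about how the library actually claims tasks or writes results back to the database. It only mirrors the four parameters (`data`, `progress`, `resolve`, `reject`) that a processing function receives.

```javascript
// Hypothetical stand-in for a queue worker; NOT part of firebase-queue.
// It calls a processing function with the four parameters a real worker
// passes (data, progress, resolve, reject) and reports the outcome.
function runTask(data, processingFunction) {
  return new Promise(function(done) {
    var lastProgress = 0;

    // Remember the most recent progress value that was reported
    function progress(percent) {
      lastProgress = percent;
      return Promise.resolve();
    }

    // The task completed; newData is the transformed task data
    function resolve(newData) {
      done({ state: 'finished', progress: 100, data: newData });
    }

    // The task failed; keep the error message and the last progress value
    function reject(error) {
      var message = error instanceof Error ? error.message : String(error);
      done({ state: 'error', progress: lastProgress, error: message });
    }

    processingFunction(data, progress, resolve, reject);
  });
}

// Example processing function: upper-case a message, or fail if it is missing
function shout(data, progress, resolve, reject) {
  if (typeof data.message !== 'string') {
    reject('message must be a string');
    return;
  }
  progress(50);
  resolve({ message: data.message.toUpperCase() });
}

runTask({ message: 'hello' }, shout).then(function(result) {
  console.log(result.state, result.data.message); // prints "finished HELLO"
});
```

Calling `runTask({}, shout)` instead settles with `state: 'error'`, which mirrors how a rejected task ends up in an error state rather than being resolved.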
+ +You can start a worker process by passing in a Firebase database [`ref`](https://firebase.google.com/docs/server/setup#initialize_the_sdk) along with a processing function ([described below](#the-processing-function)), as follows: + +```js +// my_queue_worker.js + +var Queue = require('firebase-queue'); +var firebase = require('firebase'); + +firebase.initializeApp({ + serviceAccount: 'path/to/serviceAccountCredentials.json', + databaseURL: '' +}); + +var ref = firebase.database().ref('queue'); +var queue = new Queue(ref, function(data, progress, resolve, reject) { + // Read and process task data + console.log(data); + + // Do some work + progress(50); + + // Finish the task asynchronously + setTimeout(function() { + resolve(); + }, 1000); +}); +``` + +```shell +node my_queue_worker.js +``` + +Multiple queue workers can be initialized on multiple machines and Firebase-Queue will ensure that only one worker is processing a single queue task at a time. + + +#### Queue Worker Options (Optional) + +Queue workers can take an optional options object to specify: + - `specId` - specifies the spec type for this worker. This is important when creating multiple specs. Defaults to `null` which uses the default spec. + - `numWorkers` - specifies the number of workers to run simultaneously for this node.js thread. Defaults to 1 worker. + - `sanitize` - specifies whether the `data` object passed to the processing function is sanitized of internal keys reserved for use by the queue. Defaults to `true`. + - `suppressStack` - specifies whether the queue will suppress error stack traces from being placed in the `_error_details` of the task if it's rejected with an Error. + +```js +... + +var options = { + 'specId': 'spec_1', + 'numWorkers': 5, + 'sanitize': false, + 'suppressStack': true +}; +var queue = new Queue(ref, options, function(data, progress, resolve, reject) { + ... 
+}); +``` + + +## Pushing Tasks Onto the Queue + +Using any Firebase client or the REST API, push an object with some data to the `tasks` subtree of your queue. Queue workers listening on that subtree will automatically pick up and process the new task. + +```shell +# Using curl in shell +curl -X POST -d '{"foo": "bar"}' https://databaseName.firebaseio.com/queue/tasks.json +``` +or +```js +// Using the web JavaScript client +var ref = firebase.database().ref('queue/tasks'); +ref.push({'foo': 'bar'}); +``` + +### Starting Tasks in Specific States (Optional) + +When using a custom spec, you can pass a `_state` key in with your object, which will allow a custom spec's worker(s) to pick up your task at a specific spec, rather than at the starting spec. + +```js +{ + "foo": "bar", + "boo": "baz", + "_state": "spec_n_start" +} +``` + + +## The Processing Function + +The processing function provides the body of the data transformation, and allows for completing tasks successfully or with error conditions, as well as reporting the progress of a task. As this function defines the work that the worker must do, this callback function is required. It should take the following four parameters: + +#### `data` + +A JavaScript object containing the claimed task's data. It can contain any keys and values except for several reserved keys, which are used for tracking worker progress. + +The reserved keys are: + - `_state` - The current state of the task. Will always be the task's `in_progress_state` when passed to the processing function. + - `_state_changed` - The timestamp at which the task changed into its current state. This will always be the server time when the processing function was called. + - `_owner` - A unique ID for the worker and task number combination to ensure only one worker is responsible for the task at any time. + - `_progress` - A number between 0 and 100, reset at the start of each task to 0.
+ - `_error_details` - An object containing the error details from a previous task execution. If present, it may contain a `previous_state` string (or `null` if there was no previous state, in the case of malformed input) capturing the state the task was in when it errored, an `error` string from the `reject()` callback of the previous task, and an `attempts` field containing the number of retries attempted before failing a task. If the `suppressStack` queue option is not set to `true`, there may also be an `error_stack` field containing a stack dump of any error passed into the `reject()` function. + - `_id` - The Firebase key of the task. + + By default the data is sanitized of these keys, but you can disable this behavior by setting `'sanitize': false` in the [queue options](#queue-worker-options-optional). + +#### `progress()` + +A callback function for reporting the progress of the task. `progress()` takes a single parameter that must be a number between 0 and 100, and returns an [RSVP.Promise](https://github.com/tildeio/rsvp.js) that's fulfilled when the progress has been successfully updated. If this promise is rejected, it's likely that the task is no longer owned by this process (perhaps it has timed out or the task specification has changed) or the worker has lost its connection to Firebase. + +By catching a failure of this call and cancelling the current task early, the worker can minimize the extra work it does and return to processing new queue tasks sooner: + +```js +... +var queue = new Queue(ref, options, function(data, progress, resolve, reject) { + ... + function stopProcessing() { + ... + } + ... + // report current progress + progress(currentProgress).catch(function(errorMessage) { + // we've lost the current task, so stop processing + stopProcessing(); + + // and reject the task so that we can pick up new tasks + reject(errorMessage); + }); + ...
+}); +``` + +#### `resolve()` + +A callback function for reporting that the current task has been completed and the worker is ready to process another task. Any plain JavaScript object passed into the `resolve()` function will be written to the `tasks` location and will be available to the next task if the tasks are chained. When a task is resolved, the `_progress` field is updated to 100 and the `_state` is replaced with either the value of the `_new_state` key of the object passed in, or the `finished_state` of the task spec. If the task spec does not have a `finished_state` or the `_new_state` key is set to `false`, the task will be removed from the queue. + +#### `reject()` + +A callback function for reporting that the current task failed and the worker is ready to process another task. Once this is called, the task will go into the `error_state` for the job with an additional `_error_details` object containing a `previous_state` key referencing this task's `in_progress_state`. If a string is passed into the `reject()` function, the `_error_details` will also contain an `error` key containing that string. If an Error is passed into the `reject()` function, the `error` key will contain the `error.message`, and if the `suppressStack` option has not been set, the `error_stack` key will contain the `error.stack`. Note that if retries are enabled and there are remaining attempts, the task will be restarted in its spec's `start_state`. + + +## Queue Security + +Securing your queue is an important step in securely processing events that come in. Below is a sample set of security rules that can be tailored to your particular use case.
+ +In this example, there are three categories of users, represented using fields of a [Database Auth Variable Override](https://firebase.google.com/docs/database/server/start#authenticate-with-limited-privileges): +- `auth.canAddTasks`: Users who can add tasks to the queue (could be an authenticated client or a secure server) +- `auth.canProcessTasks`: Users who can process tasks (usually on a secure server) +- `auth.canAddSpecs`: Users who can create and view task specifications (usually on a secure server) + +These don't have to use a custom token; for instance, you could use `auth != null` in place of `auth.canAddTasks` if your application's users can write directly to the queue. Similarly, `auth.canProcessTasks` and `auth.canAddSpecs` could be `auth.admin === true` if a single trusted server process is used to perform all queue functions. + +```json +{ + "rules": { + "queue": { + "tasks": { + ".read": "auth.canProcessTasks", + ".write": "auth.canAddTasks || auth.canProcessTasks", + ".indexOn": "_state", + "$taskId": { + ".validate": "newData.hasChildren(['property_1', ..., 'property_n']) + || (auth.canProcessTasks + && newData.hasChildren(['_state', '_state_changed', '_progress']))", + "_state": { + ".validate": "newData.isString()" + }, + "_state_changed": { + ".validate": "newData.isNumber() && (newData.val() === now + || data.val() === newData.val())" + }, + "_owner": { + ".validate": "newData.isString()" + }, + "_progress": { + ".validate": "newData.isNumber() + && newData.val() >= 0 + && newData.val() <= 100" + }, + "_error_details": { + "error": { + ".validate": "newData.isString()" + }, + "error_stack": { + ".validate": "newData.isString()" + }, + "previous_state": { + ".validate": "newData.isString()" + }, + "original_task": { + /* This space intentionally left blank, for malformed tasks */ + }, + "attempts": { + ".validate": "newData.isNumber() && newData.val() > 0" + }, + "$other": { + ".validate": false + } + }, + "_id": { + ".validate":
"newData.isString()" + }, + "property_1": { + ".validate": "/* Insert custom data validation code here */" + }, + ... + "property_n": { + ".validate": "/* Insert custom data validation code here */" + } + } + }, + "specs" : { + ".read": "auth.canAddSpecs || auth.canProcessTasks", + ".write": "auth.canAddSpecs", + "$specId": { + ".validate": "newData.hasChild('in_progress_state')", + "start_state": { + ".validate": "newData.isString()" + }, + "in_progress_state": { + ".validate": "newData.isString()" + }, + "finished_state": { + ".validate": "newData.isString()" + }, + "error_state": { + ".validate": "newData.isString()" + }, + "timeout": { + ".validate": "newData.isNumber() && newData.val() > 0" + }, + "$other": { + ".validate": false + } + } + } + } + } +} +``` + + +## Defining Specs (Optional) + +#### Default Spec + +A default spec configuration is assumed if no specs are specified in the `specs` subtree of the queue. The default spec has the following characteristics: + +```js +{ + "default_spec": { + "start_state": null, + "in_progress_state": "in_progress", + "finished_state": null, + "error_state": "error", + "timeout": 300000, // 5 minutes + "retries": 0 // don't retry + } +} +``` + +- `start_state` - The default spec has no `start_state`, which means any task pushed into the `tasks` subtree without a `_state` key will be picked up by default spec workers. If `start_state` is specified, only tasks with that `_state` may be claimed by the worker. +- `in_progress_state` - When a worker picks up a task and begins processing it, it will change the task's `_state` to the value of `in_progress_state`. This is the only required spec property, and it cannot equal the `start_state`, `finished_state`, or `error_state`. +- `finished_state` - The default spec has no `finished_state` so the worker will remove tasks from the queue upon successful completion.
If `finished_state` is specified, then the task's `_state` value will be updated to the `finished_state` upon task completion. Setting this value to another spec's `start_state` is useful for chaining tasks together to create a job. It's possible to override the `finished_state` on a per-task basis by setting the `_new_state` key of the object passed into `resolve()`: use a string to set the `_state` explicitly, `null` to remove the `_state` so the task gets picked up by any spec without a `start_state`, or `false` to remove the task from the queue. +- `error_state` - If the task gets rejected, the `_state` will be updated to this value and an additional key `_error_details` will be populated with the `previous_state` and an optional error message from the `reject()` callback. If this isn't specified, it defaults to "error". This can be useful for specifying different error states for different tasks, or chaining errors so that they can be logged. +- `timeout` - The default timeout is 5 minutes. When a task has been claimed by a worker but has not completed within `timeout` milliseconds, the queue will report that task as timed out, and reset that task to be claimable once again. If this is not specified, a task claimed by a worker could be orphaned and left in an unclaimable state if the worker processing it dies before the task is resolved or rejected. +- `retries` - The default spec doesn't retry failed tasks. When a task fails, if there are any remaining attempts, the queue will restart the task by setting the task's `_state` to its spec's `start_state`. + +#### Creating Jobs using Custom Specs and Task Chaining + +In order to use a job specification other than the default, the specification must be defined in your Firebase database under the `specs` subtree. This allows us to coordinate job specification changes between workers and enforce expected behavior with Firebase security rules. + +In this example, we're chaining three specs to make a job.
New tasks pushed onto the queue without a `_state` key will be picked up by "spec_1" and go into the `spec_1_in_progress` state. Once "spec_1" completes and the task goes into the `spec_1_finished` state, "spec_2" takes over and puts it into the `spec_2_in_progress` state. Again, once "spec_2" completes and the task goes into the `spec_2_finished` state, "spec_3" takes over and puts it into the `spec_3_in_progress` state. Finally, "spec_3" removes it once complete. If there's an error at any stage in the process, the task will end up in an "error" state. + +``` +queue + - specs +``` +```json +{ + "spec_1": { + "in_progress_state": "spec_1_in_progress", + "finished_state": "spec_1_finished", + "timeout": 5000 + }, + "spec_2": { + "start_state": "spec_1_finished", + "in_progress_state": "spec_2_in_progress", + "finished_state": "spec_2_finished", + "timeout" : 20000 + }, + "spec_3": { + "start_state": "spec_2_finished", + "in_progress_state": "spec_3_in_progress", + "timeout": 3000 + } +} +``` + + +## Graceful Shutdown + +Once initialized, a queue can be gracefully shut down by calling its `shutdown()` function. This prevents workers from claiming new tasks, removes all Firebase listeners, and waits until all the current tasks have been completed before resolving the RSVP.Promise returned by the function. + +By intercepting the `SIGINT` termination signal like this, you can ensure the queue shuts down gracefully so you don't have to rely on the jobs timing out and being picked up by another worker: + +```js +... +var queue = new Queue(ref, function(data, progress, resolve, reject) { + ... +}); + +process.on('SIGINT', function() { + console.log('Starting queue shutdown'); + queue.shutdown().then(function() { + console.log('Finished queue shutdown'); + process.exit(0); + }); +}); +``` + + +## Message Sanitization, Revisited + +In our example at the beginning, you wanted to perform several actions on your chat system: + 1. Sanitize chat message input + 2.
Fan data out to multiple rooms and users + +Together, these two actions form a job, and you can use custom specs, as shown above, to define the flow of tasks in this job. When you start, your Firebase database should look like this: + +``` +root + - queue + - specs + - sanitize_message + - in_progress_state: "sanitize_message_in_progress" + - finished_state: "sanitize_message_finished" + - fanout_message + - start_state: "sanitize_message_finished" + - in_progress_state: "fanout_message_in_progress" + - error_state: "fanout_message_failed" + - retries: 3 + - tasks + /* null, no data */ +``` + +Let's imagine that you have some front end that allows your users to write their name and a message, and send that to your queue as its `data`. Let's assume your user writes something like the following: + +```js +// Using the web JavaScript client +var tasksRef = firebase.database().ref('queue/tasks'); +tasksRef.push({ + 'message': 'Hello Firebase Queue Users!', + 'name': 'Chris' +}); +``` + +Your Firebase database should now look like this: + +``` +root + - queue + - specs + /* same as above */ + - tasks + - $taskId + - message: "Hello Firebase Queue Users!" + - name: "Chris" +``` + +When your users push `data` like the above into the `tasks` subtree, tasks will initially start in the `sanitize_message` spec because they have no `_state` key and that spec has no `start_state`.
The associated queue can be specified using the following processing function: + +```js +// chat_message_sanitization.js + +var Queue = require('firebase-queue'); +var firebase = require('firebase'); + +firebase.initializeApp({ + serviceAccount: 'path/to/serviceAccountCredentials.json', + databaseURL: '' +}); + +var db = firebase.database(); +var queueRef = db.ref('queue'); +var messagesRef = db.ref('messages'); + +var options = { + 'specId': 'sanitize_message' +}; +var sanitizeQueue = new Queue(queueRef, options, function(data, progress, resolve, reject) { + // sanitize input message + data.message = sanitize(data.message); + + // pass sanitized message and username along to be fanned out + resolve(data); +}); + +... +``` + +The queue worker will take this task, begin to process it, and update the reserved keys of the task: + +``` +root + - queue + - specs + /* same as above */ + - tasks + - $taskId + - _owner: $workerUid + - _progress: 0 + - _state: "sanitize_message_in_progress" + - _state_changed: 1431475215737 + - _id: $taskId + - message: "Hello Firebase Queue Users!" + - name: "Chris" +``` + +Once the message is sanitized, it will be resolved and both the reserved keys and the data will be updated in the task (imagine for a minute that queue is a blacklisted word): + +``` +root + - queue + - specs + /* same as above */ + - tasks + - $taskId + - _owner: null + - _progress: 100 + - _state: "sanitize_message_finished" + - _state_changed: 1431475215918 + - _id: $taskId + - message: "Hello Firebase ***** Users!" + - name: "Chris" +``` + +Now, you want to fan the data out to the `messages` subtree of your Firebase database, using the spec, `fanout_message`, so you can set up a second processing function to find tasks whose `_state` is `sanitize_message_finished`: + +```js +... 
+ +var options = { + 'specId': 'fanout_message', + 'numWorkers': 5 +}; +var fanoutQueue = new Queue(queueRef, options, function(data, progress, resolve, reject) { + // fan data out to /messages; ensure that errors are caught and cause the task to fail + messagesRef.push(data, function(error){ + if (error) { + reject(error.message); + } else { + resolve(data); + } + }); +}); +``` + +Since there is no `finished_state` in the `fanout_message` spec, the task will be purged from the queue after the data is fanned out to the messages node. If the `push` fails for some reason, the task will fail and retry, a maximum of three times (as specified in our spec). + +While this example is a little contrived since you could perform the sanitization and fanout in a single task, creating multiple specs for our tasks allows us to do things like add selective retries to certain tasks more likely to fail, put additional workers on more expensive tasks, or add expressive error states. + + +## Custom references to tasks and specs + +It is possible to specify the locations the queue uses for tasks and the specs explicitly instead of using the defaults. To do this, simply pass an object to the Queue constructor in place of the Firebase reference; this object must contain the keys `tasksRef` and `specsRef`, and each value must be a Firebase reference. + +```js +var Queue = require('firebase-queue'); +var firebase = require('firebase'); + +firebase.initializeApp({ + serviceAccount: 'path/to/serviceAccountCredentials.json', + databaseURL: '' +}); + +var db = firebase.database(); + +var jobsRef = db.ref('jobs'); +var specsRef = db.ref('specs'); + +var queue = new Queue({ tasksRef: jobsRef, specsRef: specsRef }, function(data, progress, resolve, reject) { + // process task +}); +``` + +## Wrap Up + +As you can see, Firebase Queue is a powerful tool that allows you to securely and robustly perform background work on your Firebase data, from sanitization to data fanout and more. 
We'd love to hear about how you're using Firebase-Queue in your project! Let us know on [Twitter](https://twitter.com/firebase), [Facebook](https://www.facebook.com/Firebase), or [G+](https://plus.google.com/115330003035930967645). If you have any questions, please direct them to our [Google Group](https://groups.google.com/forum/#!forum/firebase-talk) or [firebase-support@google.com](mailto:firebase-support@google.com). diff --git a/test/helpers.js b/test/helpers.js index 196ae24..05f95d5 100644 --- a/test/helpers.js +++ b/test/helpers.js @@ -1,12 +1,13 @@ 'use strict'; var _ = require('lodash'); +var path = require('path'); var util = require('util'); var firebase = require('firebase'); firebase.initializeApp({ - serviceAccount: 'key.json', - databaseURL: process.env.FB_QUEUE_TEST_DB_URL || 'https://fir-queue-test.firebaseio.com' + serviceAccount: path.resolve(__dirname, './key.json'), + databaseURL: process.env.FB_QUEUE_TEST_DB_URL }); module.exports = function() { diff --git a/key.json.enc b/test/key.json.enc similarity index 100% rename from key.json.enc rename to test/key.json.enc