From 09ba958ce74b040d78015ba783eabe21834a6691 Mon Sep 17 00:00:00 2001 From: James Sumners Date: Tue, 14 Jul 2020 09:21:15 -0400 Subject: [PATCH] Initial commit --- .eslintrc | 158 ++++++++++++++++++++++++ .github/workflows/lint.yml | 23 ++++ .github/workflows/unit.yml | 40 ++++++ .gitignore | 80 ++++++++++++ .npmignore | 21 ++++ .npmrc | 3 + .prettierignore | 2 + .prettierrc | 6 + .taprc | 9 ++ LICENSE | 1 + Readme.md | 111 +++++++++++++++++ index.js | 185 ++++++++++++++++++++++++++++ lib/get-messages.js | 29 +++++ lib/group-handler.js | 24 ++++ lib/handle-message.js | 35 ++++++ lib/keep-alive.js | 8 ++ lib/partition-messages.js | 36 ++++++ lib/poller.js | 62 ++++++++++ lib/process-messages.js | 41 ++++++ lib/symbols.js | 21 ++++ package.json | 36 ++++++ test/lib/get-messages.test.js | 52 ++++++++ test/lib/group-handler.test.js | 33 +++++ test/lib/handle-message.test.js | 140 +++++++++++++++++++++ test/lib/partition-messages.test.js | 23 ++++ test/lib/poller.test.js | 99 +++++++++++++++ test/lib/process-messages.test.js | 101 +++++++++++++++ 27 files changed, 1379 insertions(+) create mode 100644 .eslintrc create mode 100644 .github/workflows/lint.yml create mode 100644 .github/workflows/unit.yml create mode 100644 .gitignore create mode 100644 .npmignore create mode 100644 .npmrc create mode 100644 .prettierignore create mode 100644 .prettierrc create mode 100644 .taprc create mode 100644 LICENSE create mode 100644 Readme.md create mode 100644 index.js create mode 100644 lib/get-messages.js create mode 100644 lib/group-handler.js create mode 100644 lib/handle-message.js create mode 100644 lib/keep-alive.js create mode 100644 lib/partition-messages.js create mode 100644 lib/poller.js create mode 100644 lib/process-messages.js create mode 100644 lib/symbols.js create mode 100644 package.json create mode 100644 test/lib/get-messages.test.js create mode 100644 test/lib/group-handler.test.js create mode 100644 test/lib/handle-message.test.js create mode 100644 test/lib/partition-messages.test.js create mode 100644 test/lib/poller.test.js create mode 100644 test/lib/process-messages.test.js diff --git a/.eslintrc b/.eslintrc new file mode 100644 index 0000000..54cd496 --- /dev/null +++ b/.eslintrc @@ -0,0 +1,158 @@ +{ + "parserOptions": { + "ecmaVersion": 2020, + "sourceType": "module" + }, + + "env": { + "es6": true, + "es2017": true, + "es2020": true, + "node": true + }, + + "globals": { + "document": false, + "navigator": false, + "window": false, + "app": true + }, + + "plugins": ["prettier"], + + "rules": { + "arrow-spacing": ["error", { "before": true, "after": true }], + "callback-return": ["error", ["callback", "cb", "next", "done", "proceed"]], + "comma-spacing": ["error", { "before": false, "after": true }], + "comma-style": ["error", "last"], + "curly": ["error"], + "eol-last": ["error"], + "indent": ["error", 2, { "SwitchCase": 1 }], + "key-spacing": ["error", { "beforeColon": false, "afterColon": true }], + "linebreak-style": ["error", "unix"], + "prettier/prettier": ["error", { "singleQuote": true, "printWidth": 99 }], + "accessor-pairs": "error", + "constructor-super": "error", + "eqeqeq": ["error", "always", { "null": "ignore" }], + "handle-callback-err": ["error", "^(err|error)$"], + "new-cap": ["error", { "newIsCap": true, "capIsNew": false }], + "no-array-constructor": "error", + "no-caller": "error", + "no-class-assign": "error", + "no-compare-neg-zero": "error", + "no-cond-assign": "error", + "no-const-assign": "error", + "no-constant-condition": ["error", { "checkLoops": false }], + "no-control-regex": "error", + "no-debugger": "error", + "no-delete-var": "error", + "no-dupe-args": "error", + "no-dupe-class-members": "error", + "no-dupe-keys": "error", + "no-duplicate-case": "error", + "no-empty-character-class": "error", + "no-empty-pattern": "error", + "no-eval": "error", + "no-ex-assign": "error", + "no-extend-native": "error", + "no-extra-bind": "error", + "no-extra-boolean-cast": "error", + "no-fallthrough": "error", + "no-floating-decimal": "error", + "no-func-assign": "error", + "no-global-assign": "error", + "no-implied-eval": "error", + "no-inner-declarations": ["error", "functions"], + "no-invalid-regexp": "error", + "no-irregular-whitespace": "error", + "no-mixed-spaces-and-tabs": ["error", "smart-tabs"], + "no-mixed-operators": [ + "error", + { + "groups": [ + ["==", "!=", "===", "!==", ">", ">=", "<", "<="], + ["&&", "||"], + ["in", "instanceof"] + ], + "allowSamePrecedence": true + } + ], + "no-multi-spaces": "error", + "no-iterator": "error", + "no-label-var": "error", + "no-labels": ["error", { "allowLoop": false, "allowSwitch": false }], + "no-lone-blocks": "error", + "no-multi-str": "error", + "no-multiple-empty-lines": ["error", { "max": 1, "maxEOF": 0 }], + "no-trailing-spaces": ["error"], + "no-negated-in-lhs": "error", + "no-new": "error", + "no-new-func": "error", + "no-new-object": "error", + "no-new-require": "error", + "no-new-symbol": "error", + "no-new-wrappers": "error", + "no-obj-calls": "error", + "no-octal": "error", + "no-octal-escape": "error", + "no-path-concat": "error", + "no-proto": "error", + "no-redeclare": "error", + "no-regex-spaces": "error", + "no-return-assign": ["error", "except-parens"], + "no-return-await": "error", + "no-self-assign": "error", + "no-self-compare": "error", + "no-sequences": "error", + "no-shadow-restricted-names": "error", + "no-sparse-arrays": "error", + "no-template-curly-in-string": "error", + "no-this-before-super": "error", + "no-throw-literal": "error", + "no-undef": "error", + "no-undef-init": "error", + "no-unexpected-multiline": "error", + "no-unmodified-loop-condition": "error", + "no-unneeded-ternary": ["error", { "defaultAssignment": false }], + "no-unreachable": "error", + "no-unsafe-finally": "error", + "no-unsafe-negation": "error", + "no-unused-expressions": [ + "error", + { "allowShortCircuit": true, "allowTernary": true, "allowTaggedTemplates": true } + ], + "no-unused-vars": ["error", { "vars": "all", "args": "none", "ignoreRestSiblings": true }], + "no-use-before-define": [ + "error", + { "functions": false, "classes": false, "variables": false } + ], + "no-useless-call": "error", + "no-useless-computed-key": "error", + "no-useless-constructor": "error", + "no-useless-escape": "error", + "no-useless-rename": "error", + "no-useless-return": "error", + "semi": ["error", "always"], + "no-with": "error", + "one-var": ["error", { "initialized": "never" }], + "prefer-promise-reject-errors": "error", + "spaced-comment": [ + "error", + "always", + { + "line": { "markers": ["*package", "!", "/", ","] }, + "block": { + "balanced": true, + "markers": ["*package", "!", ",", ":", "::", "flow-include"], + "exceptions": ["*"] + } + } + ], + "symbol-description": "error", + "use-isnan": "error", + "valid-typeof": ["error", { "requireStringLiterals": true }], + "yoda": ["error", "never"] + }, + + "extends": ["prettier"] +} diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 0000000..3a4bf9d --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,23 @@ +name: Lint + +on: + pull_request: + branches: + - master + +jobs: + lint: + name: Lint Check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v1 + - uses: actions/setup-node@v1 + with: + node-version: ${{ matrix.node }} + registry-url: "https://registry.npmjs.org" + - name: Install Packages + run: npm install + env: + NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }} + - name: Lint Code + run: npm run lint diff --git a/.github/workflows/unit.yml b/.github/workflows/unit.yml new file mode 100644 index 0000000..e8a3f92 --- /dev/null +++ b/.github/workflows/unit.yml @@ -0,0 +1,40 @@ +name: Unit + +on: + pull_request: + branches: + - master + +jobs: + run_tests: + name: Unit Tests + strategy: + matrix: + os: + - ubuntu-latest + node: + - 12.x + - 14.x + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@v1 + - uses: actions/setup-node@v1 + with: + node-version: ${{ matrix.node }} + registry-url: "https://registry.npmjs.org" + - name: Install Packages + run: npm install + env: + NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }} + - name: Run Tests + run: npm run test + # - name: Coveralls Parallel + # uses: coverallsapp/github-action@master + # with: + # github-token: ${{ secrets.github_token }} + # parallel: true + # - name: Coveralls Finished + # uses: coverallsapp/github-action@master + # with: + # github-token: ${{ secrets.github_token }} + # parallel-finished: true diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b558069 --- /dev/null +++ b/.gitignore @@ -0,0 +1,80 @@ +# Lock files +# pnpm-lock.yaml +# shrinkwrap.yaml +# package-lock.json +# yarn.lock + +# Logs +logs +*.log +npm-debug.log* + +# Runtime data +pids +*.pid +*.seed + +# Directory for instrumented libs generated by jscoverage/JSCover +lib-cov + +# Coverage directory used by tools like istanbul +coverage + +# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files) +.grunt + +# node-waf configuration +.lock-wscript + +# Compiled binary addons (http://nodejs.org/api/addons.html) +build/Release + +# Dependency directory +node_modules + +# Optional npm cache directory +.npm + +# Optional REPL history +.node_repl_history + +# 0x +.__browserify_string_empty.js +profile-* +*.flamegraph + +# tap --cov +.nyc_output/ + +# JetBrains IntelliJ IDEA +.idea/ +*.iml + +# VS Code +.vscode/ + +# xcode +build/* +*.mode1 +*.mode1v3 +*.mode2v3 +*.perspective +*.perspectivev3 +*.pbxuser +*.xcworkspace +xcuserdata + +# macOS +.DS_Store + +# keys +*.pem +*.env.json +*.env +.vagrant/* +Vagrantfile +functions/*/function.prod.json +functions/*/function.dev.json +aws_creds +.aws +dev.aws diff --git a/.npmignore b/.npmignore new file mode 100644 index 0000000..3c27f33 --- /dev/null +++ b/.npmignore @@ -0,0 +1,21 @@ +test/ +.github/ +.circleci/ +.dependabot/ +.vscode/ +.idea/ +.nyc_output/ +.eslintrc +.prettierrc +.prettierignore +.lambda +.taprc +coverage/ +lib-cov/ +*.md +*.env +*.log +profile-* +*.flamegraph +npm-debug.log* +pnpmfile.js diff --git a/.npmrc b/.npmrc new file mode 100644 index 0000000..6a6020f --- /dev/null +++ b/.npmrc @@ -0,0 +1,3 @@ +save-prefix=^ +package-lock=false +legacy-bundling=true diff --git a/.prettierignore b/.prettierignore new file mode 100644 index 0000000..cce0279 --- /dev/null +++ b/.prettierignore @@ -0,0 +1,2 @@ +package.json +package-lock.json diff --git a/.prettierrc b/.prettierrc new file mode 100644 index 0000000..45c5bc5 --- /dev/null +++ b/.prettierrc @@ -0,0 +1,6 @@ +{ + "printWidth": 99, + "singleQuote": true, + "trailingComma": "none", + "arrowParens": "avoid" +} diff --git a/.taprc b/.taprc new file mode 100644 index 0000000..426bf95 --- /dev/null +++ b/.taprc @@ -0,0 +1,9 @@ +files: 'test/**/*.test.js' + +# Disable extra loaders built-in to Tap +esm: false +jsx: false +ts: false + +# Adjust accordingly if your tests need more time +# timeout: 120 diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..1527da7 --- /dev/null +++ b/LICENSE @@ -0,0 +1 @@ +(c) Copyright 2020 Knockaway, Inc., all rights reserved. diff --git a/Readme.md b/Readme.md new file mode 100644 index 0000000..d8863da --- /dev/null +++ b/Readme.md @@ -0,0 +1,111 @@ +# @knockaway/sqsiphon + +_sqsiphon_ is provides a framework for writing [Amazon SQS][sqs] polling +applications. It designed to poll, and process messages, as quickly as possible. +FIFO queues are supported. + +[sqs]: https://aws.amazon.com/sqs/ + +## Queue Processing + +1. _sqsiphon_ polls for up to 10 messages (the SQS allowed maximum). +2. The retrieved messages are inspected to determine if any are tagged as being + members of a FIFO queue. +3. Messages that are not tagged for a FIFO queue are placed into a general + processing batch. Any FIFO tagged messages are added to a batch specifically + for the tagged FIFO queue. For example, consider a poll event returns three + messages: message `A` is untagged, message `B` is tagged for "foo", and + message `C` is tagged for "bar". Message `A` will be put on the general + processing batch and two new batches will be created: "foo" and "bar", each + with one message added for processing. +4. All available processing batches are processed: the general batch's messages + are processed concurrently, and each FIFO batch is processed sequentially. +5. Messages that generate an error during processing are left on the queue. + +**FIFO Errors:** When a message on a FIFO queue cannot be processed successfully, +the message, and any remaining messages in the batch, will be left on the queue. +It is recommened that a corresponding dead letter queue be configured so that +these messages will be moved there by SQS. + +## Example + +```js +const { SQS } = require('aws-sdk'); +const sqs = new SQS({ + apiVersion: '2012-11-05', + region: 'us-east-1' +}); +const sqsiphonFactory = require('@knockaway/sqsiphon'); +const app = sqsiphonFactory({ + sqs, + queueUrl: 'url for the sqs queue', + handler: messageHandler +}); + +function shutdown(signal) { + app.stop(); +} +['SIGTERM', 'SIGINT'].forEach(signal => process.on(signal, shutdown)); + +if ((require.main === module) === true) { + app.start(); +} + +async function messageHandler(message) { + // `message` is an SQS message as returned by the `aws-sdk` + // ... + // Do something with the message or `throw Error('failed')` +} +``` + +## Factory Options + +This module exports a factory function which accepts an options object with the +following properties: + +- `logger` (optional): An object that follows the Log4j logger interface. + The default is an instance of [`abstract-logging`](https://npm.im/abstract-logging). +- `sqs` (required): An instance of `SQS` from the [`aws-sdk`](https://npm.im/aws-sdk). +- `queueUrl` (required): A string URL pointing to the SQS instance to poll. +- `handler` (required): A function to handle received messages. Must be an + `async function`. The function will receive one parameter: `message`. The + parameter is an instance of + [SQS Message](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_Message.html). + If the message cannot be processed for any reason, an instance of `Error` + should be thrown. If no error is thrown, the message has been considered to + be successfully processed. +- `tracer` (optional): an OpenTracing compliant tracer instance. +- `receiveMessageParameters` (optional): an object that conforms to the object + described by + [`sqs.receiveMessage`](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html#receiveMessage-property). + The default has: `AttributeNames: ['All']`, `MaxNumberOfMessages: 10`, + `MessageAttributeNames: ['All']`, and `VisibilityTimeout: 30`. The `QueueUrl` + is always overridden by the passed in `queueUrl` value. + +## App Instance + +The factory returns an application instance. The application instance is +an event emitter. + +### Instance Methods And Properties + +- `isRunning` (boolean): indicates if the application is polling for messages + or not. +- `start()`: initiates the application to start polling for messages. +- `stop()`: initiates the application to stop polling for messages. Any + messages currently being processed will be completed. + +### Instance Events + +- `error`: fired when an unexpected error occurs. Receives an `Error` object. +- `request-error`: fired when a communication error occurs. Receives an + `Error` object. +- `processing-error`: fired when an error occurs while processing a message. + Receives an object with `error` and `message` properties. +- `fifo-processing-aborted`: fired when a FIFO batch stop processing due to an + error. Receives an object with `message` and `messages` properties. This event + will fire subsequent to a `processing-error` event. +- `received-messages`: fired when a new batch of messages has been received. + Receives an array of SQS message objects. +- `handled-message`: fired when a message has been successfully handled. + Receives an SQS message object. diff --git a/index.js b/index.js new file mode 100644 index 0000000..1b56e2a --- /dev/null +++ b/index.js @@ -0,0 +1,185 @@ +'use strict'; + +const { EventEmitter } = require('events'); +const opentracing = require('opentracing'); +const poller = require('./lib/poller'); +const keepAlive = require('./lib/keep-alive'); + +const { + symFifoSorter, + symHandler, + symLogger, + symQueueUrl, + symReceiveParams, + symRunning, + symSQS, + symTracer +} = require('./lib/symbols'); + +/** + * A utility that polls an AWS SQS queue for new messages and feeds them through + * a processor. It processes standard queue messages concurrently, but processes + * FIFO based queues in sequential order per partition. + * + * @typedef sqsiphon + */ +const proto = Object.create(EventEmitter.prototype, { + [Symbol.toStringTag]: { value: 'sqsiphon' }, + + [symRunning]: { value: false, writable: true }, + /** + * Indicates if the instance is polling for new messages. + * + * @memberof sqsiphon + * @instance + */ + isRunning: { + get() { + return this[symRunning]; + } + }, + + /** + * Initiates polling for new messages. + * + * @memberof sqsiphon + * @instance + */ + start: { + value: function start() { + if (this.isRunning) { + return; + } + this[symRunning] = true; + keepAlive.call(this); + + setImmediate(doPoll.bind(this)); + + async function doPoll() { + if (this.isRunning === false) { + return; + } + await poller.call(this); + setImmediate(doPoll.bind(this)); + } + } + }, + + /** + * Stops polling for new messages. + * + * @memberof sqsiphon + * @instance + */ + stop: { + value: function stop() { + this[symRunning] = false; + } + } +}); + +/** + * Fired when an unexpected error has occured. + * + * @event sqsiphon#error + * @type {Error} + */ + +/** + * Fired when a communication error has occurred when attempting to retrieve + * messages from SQS. + * + * @event sqsiphon#request-error + * @type {Error} + */ + +/** + * Fired when an error has occurred while handling a message. Either the + * message handler has failed or we were unable to delete the message from + * the queue. + * + * @event sqsiphon#processing-error + * @type {object} + * @property {Error} error + * @property {object} message The message that was being processed when the + * erorr occurred. + */ + +/** + * Fired when the processing of a FIFO partition has been stopped due to a + * message handling error. The {@see sqsiphon#processing-error} event will be + * fired alongside this event. + * + * @event sqsiphon#fifo-processing-aborted + * @type {object} + * @property {object} message The message that was being handled when the + * error occurred. + * @property {object[]} messages The queue of messages that was being processed. + */ + +/** + * Fired when a new batch of messages has been received from SQS. + * + * @event sqsiphon#received-messages + * @type {object[]} An array of SQS message objects. + */ + +/** + * Fired when a message has been successfully handled by the user provided + * handler function. + * + * @event sqsiphon#handled-message + * @type {object} The SQS message that was handled. + */ + +/** */ +const defaultOptions = { + logger: require('abstract-logging'), + sqs: undefined, + queueUrl: undefined, + handler: undefined, + receiveMessageParameters: { + AttributeNames: ['All'], + MaxNumberOfMessages: 10, + MessageAttributeNames: ['All'], + VisibilityTimeout: 30 + }, + fifoSorter: () => {}, + tracer: undefined +}; + +module.exports = function sqsiphonFactory(options) { + const opts = Object.assign({}, defaultOptions, options); + + let tracer; + if (opts.tracer) { + opentracing.initGlobalTracer(opts.tracer); + tracer = opentracing.globalTracer(); + } else { + tracer = new opentracing.Tracer(); + } + + const { sqs, queueUrl, handler, fifoSorter, logger } = opts; + if (!sqs) { + throw Error('must supply `sqs` instance'); + } + if (!queueUrl || typeof queueUrl !== 'string') { + throw Error('must supply string for `queueUrl`'); + } + if (!handler || Function.prototype.isPrototypeOf(handler) === false) { + throw Error('must provide `handler` function'); + } + + const receiveParams = { QueueUrl: queueUrl, ...opts.receiveMessageParameters }; + + const app = Object.create(proto, { + [symLogger]: { value: logger }, + [symHandler]: { value: handler }, + [symSQS]: { value: sqs }, + [symQueueUrl]: { value: queueUrl }, + [symReceiveParams]: { value: receiveParams }, + [symFifoSorter]: { value: fifoSorter }, + [symTracer]: { value: tracer } + }); + return app; +}; diff --git a/lib/get-messages.js b/lib/get-messages.js new file mode 100644 index 0000000..559e937 --- /dev/null +++ b/lib/get-messages.js @@ -0,0 +1,29 @@ +'use strict'; + +const { symReceiveParams, symSQS } = require('./symbols'); + +/** + * Query the configures SQS queue for new messages. + * + * @param {object} input + * @param {object} input.app An `sqsiphon` application instance that has an + * associated SQS client instance and message receive parameters object. + * + * @returns {object} Will have an `error` property if there was a communication + * error with AWS. Otherwise, will have a `value` property set to an array + * of SQS messages. + */ +module.exports = async function getMessages({ app }) { + const sqs = app[symSQS]; + const rececieveParams = app[symReceiveParams]; + const messages = []; + + try { + const response = await sqs.receiveMessage(rececieveParams).promise(); + Array.prototype.push.apply(messages, response.Messages || []); + } catch (error) { + return { error }; + } + + return { value: messages }; +}; diff --git a/lib/group-handler.js b/lib/group-handler.js new file mode 100644 index 0000000..e2657e9 --- /dev/null +++ b/lib/group-handler.js @@ -0,0 +1,24 @@ +'use strict'; + +const _handleMessage = require('./handle-message'); + +/** + * Iterate a FIFO queue of `messages` in sequential order and feed each + * message through the configured message handler. Any error in processing a + * message will result in the remaining messages being left unprocessed. + * + * @param {object} input + * @param {object[]} input.messages A set of SQS messages from a FIFO partition. + * @param {object} input.app A fully configures `sqsiphon` application instance. + * + * @fires sqsiphon#fifo-processing-aborted + */ +module.exports = async function groupHandler({ messages, app, handleMessage = _handleMessage }) { + for (const message of messages) { + const result = await handleMessage({ message, app }); + if (result === false) { + app.emit('fifo-processing-aborted', { message, messages }); + break; + } + } +}; diff --git a/lib/handle-message.js b/lib/handle-message.js new file mode 100644 index 0000000..c35fac4 --- /dev/null +++ b/lib/handle-message.js @@ -0,0 +1,35 @@ +'use strict'; + +const { symHandler, symQueueUrl, symSQS } = require('./symbols'); + +/** + * Invokes the user supplied message handler function and removes the message + * from the queue if the handler succeeds. + * + * @param {object} input + * @param {object} input.message An SQS message object. + * @param {object} input.app A fully configured `sqsiphon` instance. + * + * @fires sqsiphon#handled-message + * @fires sqsiphon#processing-error + * + * @returns {boolean} `true` if no error occured, `false` otherwise. + */ +module.exports = async function handleMessage({ message, app }) { + const sqs = app[symSQS]; + const queueUrl = app[symQueueUrl]; + const handler = app[symHandler]; + + try { + await handler(message); + await sqs + .deleteMessage({ QueueUrl: queueUrl, ReceiptHandle: message.ReceiptHandle }) + .promise(); + app.emit('handled-message', { message }); + } catch (error) { + app.emit('processing-error', { error, message }); + return false; + } + + return true; +}; diff --git a/lib/keep-alive.js b/lib/keep-alive.js new file mode 100644 index 0000000..21b9758 --- /dev/null +++ b/lib/keep-alive.js @@ -0,0 +1,8 @@ +'use strict'; + +module.exports = function keepAlive() { + if (this.isRunning === false) { + return; + } + return setImmediate(keepAlive.bind(this)); +}; diff --git a/lib/partition-messages.js b/lib/partition-messages.js new file mode 100644 index 0000000..2997db6 --- /dev/null +++ b/lib/partition-messages.js @@ -0,0 +1,36 @@ +'use strict'; + +/** + * Given an array of SQS message instances, iterate the messages to look for + * FIFO group identifier attributes. For messages that do not have such + * identifiers, add them to a default partition. For messages that do have + * such identifiers, segregate each message into a parition matching the FIFO + * group identifier of the message. + * + * @param {object} input + * @param {object[]} input.messages An array of SQS message objects. + * + * @returns {object} Standard, non-FIFO, messages are attached to the "default" + * property (partition). Any other properties on this object match the found + * FIFO group identifiers. Each property on the object is an array of SQS + * messages. + */ +module.exports = function partitionMessages({ messages }) { + const partitions = { + default: [] + }; + for (const message of messages) { + if (!message.Attributes || !message.Attributes.MessageGroupId) { + partitions.default.push(message); + continue; + } + const groupId = message.Attributes.MessageGroupId; + const queue = partitions[groupId]; + if (Array.isArray(queue)) { + queue.push(message); + } else { + partitions[groupId] = [message]; + } + } + return partitions; +}; diff --git a/lib/poller.js b/lib/poller.js new file mode 100644 index 0000000..60ea724 --- /dev/null +++ b/lib/poller.js @@ -0,0 +1,62 @@ +'use strict'; + +const { symLogger, symTracer } = require('./symbols'); +const _getMessages = require('./get-messages'); +const _processMessages = require('./process-messages'); + +/** + * Polls for new messages on an SQS queue and processes the messages. Must be + * invoked with the `this` context set to an "app" instance. + * + * @fires sqsiphon#request-error + * @fires sqsiphon#error + * @fires sqsiphon#received-messages + */ +module.exports = async function poller({ + getMessages = _getMessages, + processMessages = _processMessages +} = {}) { + const app = this; + const log = app[symLogger]; + const tracer = app[symTracer]; + + // TODO: extract any possible parent span and create new one as a child + const span = tracer.startSpan('sqs_poll'); + + log.trace('polling for new messages'); + const getMessagesResult = await getMessages({ app }); + if (getMessagesResult.error) { + const error = getMessagesResult.error; + span.setTag('error', true); + span.log({ + event: 'error', + 'error.object': error, + message: error.message, + stack: error.stack + }); + + log.trace('encountered communication error', { error }); + if (app.listenerCount('request-error') > 0) { + app.emit('request-error', error); + } else { + // If the user isn't listening for the specific event we will fallback + // to the baseline error event so that Node will barf up stacks and such. + app.emit('error', error); + } + + span.finish(); + return; + } + + const messages = getMessagesResult.value; + if (messages.length === 0) { + log.trace('zero messages received'); + span.finish(); + return; + } + app.emit('received-messages', messages); + log.trace('processing messages', { messagesCount: messages.length }); + await processMessages({ messages, app }); + + span.finish(); +}; diff --git a/lib/process-messages.js b/lib/process-messages.js new file mode 100644 index 0000000..c4fac65 --- /dev/null +++ b/lib/process-messages.js @@ -0,0 +1,41 @@ +'use strict'; + +const _partitionMessages = require('./partition-messages'); +const _handleMessage = require('./handle-message'); +const _groupHandler = require('./group-handler'); +const { symFifoSorter } = require('./symbols'); + +/** + * Iterates a set of SQS `messages` and feeds them through the configured + * message handler. + * + * @param {object} input + * @param {object[]} input.messages An array of SQS messages. + * @param {object} input.app A fully configured `sqsiphon` application instance. + */ +module.exports = async function processMessages({ + messages, + app, + partitionMessages = _partitionMessages, + handleMessage = _handleMessage, + groupHandler = _groupHandler +}) { + const partitions = partitionMessages({ messages }); + const { default: defaultPartition, ...fifoPartitions } = partitions; + + // We can process the default queue concurrently because it does not contain + // any FIFO queue messages. + const promises = defaultPartition.map(msg => handleMessage({ message: msg, app })); + await Promise.all(promises); + + // For FIFO queues we need to process each queue's messages sequentially. + // We also need to _stop_ processing a queue if one of the messages is not + // handled correctly. + const fifoSorter = app[symFifoSorter]; + const fifoPromises = []; + for (const partitionId in fifoPartitions) { + const messages = fifoPartitions[partitionId].sort(fifoSorter); + fifoPromises.push(groupHandler({ messages, app })); + } + await Promise.all(fifoPromises); +}; diff --git a/lib/symbols.js b/lib/symbols.js new file mode 100644 index 0000000..bac04a4 --- /dev/null +++ b/lib/symbols.js @@ -0,0 +1,21 @@ +'use strict'; + +const symLogger = Symbol('sqsiphon.logger'); +const symRunning = Symbol('sqsiphon.isRunning'); +const symSQS = Symbol('sqsiphon.sqs'); +const symQueueUrl = Symbol('sqsiphon.queueUrl'); +const symReceiveParams = Symbol('sqsiphon.receiveParams'); +const symHandler = Symbol('sqsiphon.handler'); +const symFifoSorter = Symbol('sqsiphon.fifoSorter'); +const symTracer = Symbol('sqsiphon.tracer'); + +module.exports = { + symLogger, + symRunning, + symSQS, + symQueueUrl, + symReceiveParams, + symHandler, + symFifoSorter, + symTracer +}; diff --git a/package.json b/package.json new file mode 100644 index 0000000..7c11edd --- /dev/null +++ b/package.json @@ -0,0 +1,36 @@ +{ + "name": "@knockaway/sqsiphon", + "main": "index.js", + "version": "1.0.0", + "homepage": "https://github.com/knockaway/@knockaway/sqsiphon", + "license": "LicenseRef-LICENSE", + "repository": { + "type": "git", + "url": "https://github.com/knockaway/@knockaway/sqsiphon.git" + }, + "dependencies": { + "opentracing": "^0.14.4" + }, + "devDependencies": { + "tap": "^14.10.7", + "eslint": "^7.3.1", + "eslint-config-prettier": "^6.11.0", + "eslint-plugin-prettier": "^3.1.4", + "husky": "^4.2.5", + "prettier": "^2.0.5" + }, + "scripts": { + "check-format": "prettier --list-different '*.js' 'lib/**/*.js' 'test/**/*.js'", + "format": "prettier --write '*.js' 'lib/**/*.js' 'test/**/*.js'", + "lint": "eslint '*.js' 'lib/**/*.js' 'test/**/*.js'", + "test": "LOG_LEVEL=silent tap --no-cov", + "test:cov": "LOG_LEVEL=silent tap", + "test:cov:html": "LOG_LEVEL=silent tap --coverage-report=html", + "test:watch": "LOG_LEVEL=silent tap -n -w --no-coverage-report" + }, + "husky": { + "hooks": { + "pre-commit": "npm run lint && npm run test" + } + } +} diff --git a/test/lib/get-messages.test.js b/test/lib/get-messages.test.js new file mode 100644 index 0000000..89b3d97 --- /dev/null +++ b/test/lib/get-messages.test.js @@ -0,0 +1,52 @@ +'use strict'; + +const tap = require('tap'); +const symbols = require('../../lib/symbols'); +const getMessages = require('../../lib/get-messages'); + +tap.test('returns error if cannot communicate with sqs', async t => { + const sqs = { + receiveMessage() { + return this; + }, + promise() { + return Promise.reject(Error('broken sqs')); + } + }; + const app = { + [symbols.symSQS]: sqs, + [symbols.symReceiveParams]: { QueueUrl: 'http://sqs.example.com' } + }; + + const response = await getMessages({ app }); + t.match(response, { + error: { + message: 'broken sqs' + } + }); +}); + +tap.test('returns messages on success', async t => { + const sqs = { + receiveMessage(params) { + t.deepEqual(params, { QueueUrl: 'http://sqs.example.com' }); + return this; + }, + promise() { + return Promise.resolve({ Messages: [{ message: 'one' }] }); + } + }; + const app = { + [symbols.symSQS]: sqs, + [symbols.symReceiveParams]: { QueueUrl: 'http://sqs.example.com' } + }; + + const response = await getMessages({ app }); + t.deepEqual(response, { + value: [ + { + message: 'one' + } + ] + }); +}); diff --git a/test/lib/group-handler.test.js b/test/lib/group-handler.test.js new file mode 100644 index 0000000..49b2483 --- /dev/null +++ b/test/lib/group-handler.test.js @@ -0,0 +1,33 @@ +'use strict'; + +const tap = require('tap'); +const groupHandler = require('../../lib/group-handler'); + +tap.test('emits aborted for failed handler', async t => { + const app = { + emit(event, body) { + t.is(event, 'fifo-processing-aborted'); + t.deepEqual(body, { message: 'foo', messages: ['foo'] }); + } + }; + async function handleMessage() { + return false; + } + + await groupHandler({ messages: ['foo'], app, handleMessage }); +}); + +tap.test('does not emit for all successes', async t => { + const app = { + emit() { + t.fail('should not be invoked'); + } + }; + async function handleMessage(params) { + t.is(params.app, app); + t.is(params.message, 'foo'); + return true; + } + + await groupHandler({ messages: ['foo'], app, handleMessage }); +}); diff --git a/test/lib/handle-message.test.js b/test/lib/handle-message.test.js new file mode 100644 index 0000000..a35af91 --- /dev/null +++ b/test/lib/handle-message.test.js @@ -0,0 +1,140 @@ +'use strict'; + +const tap = require('tap'); +const symbols = require('../../lib/symbols'); +const handleMessage = require('../../lib/handle-message'); + +tap.test('emits processing-error for bad handler', async t => { + const sqs = { + deleteMessage() { + t.fail('should not be invoked'); + return this; + }, + promise() { + t.fail('should not be invoked'); + return Promise.reject(Error('broken sqs')); + } + }; + async function handler() { + throw Error('broken handler'); + } + const app = { + [symbols.symSQS]: sqs, + [symbols.symQueueUrl]: 'http://sqs.example.com', + [symbols.symHandler]: handler, + + emit(event, body) { + t.is(event, 'processing-error'); + t.match(body, { + message: 'foo', + error: { + message: 'broken handler' + } + }); + } + }; + + const result = await handleMessage({ message: 'foo', app }); + t.is(result, false); +}); + +tap.test('emits processing-error for broken sqs', async t => { + const sqs = { + deleteMessage() { + return this; + }, + promise() { + return Promise.reject(Error('broken sqs')); + } + }; + async function handler() { + return true; + } + const app = { + [symbols.symSQS]: sqs, + [symbols.symQueueUrl]: 'http://sqs.example.com', + [symbols.symHandler]: handler, + + emit(event, body) { + t.is(event, 'processing-error'); + t.match(body, { + message: 'foo', + error: { + message: 'broken sqs' + } + }); + } + }; + + const result = await handleMessage({ message: 'foo', app }); + t.is(result, false); +}); + +tap.test('emits handled-message on success', async t => { + const sqs = { + deleteMessage(params) { + t.deepEqual(params, { + QueueUrl: 'http://sqs.example.com', + ReceiptHandle: 'handle-1' + }); + return this; + }, + promise() { + return Promise.resolve(); + } + }; + async function handler(message) { + t.deepEqual(message, { foo: 'foo', ReceiptHandle: 'handle-1' }); + return true; + } + const app = { + [symbols.symSQS]: sqs, + [symbols.symQueueUrl]: 'http://sqs.example.com', + [symbols.symHandler]: handler, + + emit(event, body) { + t.is(event, 'handled-message'); + t.deepEqual(body, { message: { foo: 'foo', ReceiptHandle: 'handle-1' } }); + } + }; + + const result = await handleMessage({ message: { foo: 'foo', ReceiptHandle: 'handle-1' }, app }); + t.is(result, true); +}); + +tap.test('does not throw for rejections', async t => { + const sqs = { + deleteMessage() { + return this; + }, + promise() { + return Promise.reject(Error('sqs failed')); + } + }; + async function handler() { + return true; + } + const app = { + [symbols.symSQS]: sqs, + [symbols.symQueueUrl]: 'http://sqs.example.com', + [symbols.symHandler]: handler, + + emit(event, body) { + t.is(event, 'processing-error'); + t.deepEqual(body, { + error: { message: 'sqs failed', name: 'Error' }, + message: { foo: 'foo', ReceiptHandle: 'handle-1' } + }); + } + }; + + try { + const result = await handleMessage({ + message: { foo: 'foo', ReceiptHandle: 'handle-1' }, + app + }); + t.is(result, false); + } catch (error) { + t.error(error); + } +}); diff --git a/test/lib/partition-messages.test.js b/test/lib/partition-messages.test.js new file mode 100644 index 0000000..748aeca --- /dev/null +++ b/test/lib/partition-messages.test.js @@ -0,0 +1,23 @@ +'use strict'; + +const tap = require('tap'); +const partitionMessages = require('../../lib/partition-messages'); + +tap.test('partitions messages based on attributes', async t => { + const messages = [ + { Attributes: { MessageGroupId: 'one' }, Body: 'foo' }, + { Body: 'non-fifo' }, + { Attributes: { MessageGroupId: 'one' }, Body: 'bar' }, + { Attributes: { MessageGroupId: 'two' }, Body: 'foo' } + ]; + const partitioned = partitionMessages({ messages }); + + t.deepEqual(partitioned, { + default: [{ Body: 'non-fifo' }], + one: [ + { Attributes: { MessageGroupId: 'one' }, Body: 'foo' }, + { Attributes: { MessageGroupId: 'one' }, Body: 'bar' } + ], + two: [{ Attributes: { MessageGroupId: 'two' }, Body: 'foo' }] + }); +}); diff --git a/test/lib/poller.test.js b/test/lib/poller.test.js new file mode 100644 index 0000000..5506ed6 --- /dev/null +++ b/test/lib/poller.test.js @@ -0,0 +1,99 @@ +'use strict'; + +const tap = require('tap'); +const opentracing = require('opentracing'); +const symbols = require('../../lib/symbols'); +const poller = require('../../lib/poller'); + +tap.test('fires error event if cannot get messages', async t => { + const app = { + [symbols.symLogger]: { trace() {} }, + [symbols.symTracer]: new opentracing.MockTracer(), + listenerCount() { + return 0; + }, + emit(event, body) { + t.is(event, 'error'); + t.match(body, /broken messages/); + } + }; + async function getMessages() { + return { error: Error('broken messages') }; + } + + const result = await poller.call(app, { getMessages }); + t.is(result, undefined); +}); + +tap.test('fires request-error event if cannot get messages and listener registered', async t => { + const app = { + [symbols.symLogger]: { trace() {} }, + [symbols.symTracer]: new opentracing.MockTracer(), + listenerCount(event) { + t.is(event, 'request-error'); + return 1; + }, + emit(event, body) { + t.is(event, 'request-error'); + t.match(body, /broken messages/); + } + }; + async function getMessages() { + return { error: Error('broken messages') }; + } + + const result = await poller.call(app, { getMessages }); + t.is(result, undefined); +}); + +tap.test('merely logs when no messages to process', async t => { + const app = { + [symbols.symLogger]: { + trace(msg) { + t.true(['polling for new messages', 'zero messages received'].includes(msg)); + } + }, + [symbols.symTracer]: new opentracing.MockTracer(), + listenerCount() { + t.fail('should not be invoked'); + }, + emit() { + t.fail('should not be invoked'); + } + }; + async function getMessages() { + return { value: [] }; + } + + const result = await poller.call(app, { getMessages }); + t.is(result, undefined); +}); + +tap.test('emits received-messages and invoked processor', async t => { + const app = { + [symbols.symLogger]: { + trace(msg) { + t.true(['polling for new messages', 'processing messages'].includes(msg)); + } + }, + [symbols.symTracer]: new opentracing.MockTracer(), + listenerCount() { + t.fail('should not be invoked'); + }, + emit(event, body) { + t.is(event, 'received-messages'); + t.deepEqual(body, [{ foo: 'foo' }]); + } + }; + async function getMessages({ app: _app }) { + t.is(_app, app); + return { value: [{ foo: 'foo' }] }; + } + async function processMessages({ messages, app: _app }) { + t.is(_app, app); + t.deepEqual(messages, [{ foo: 'foo' }]); + } + + const result = await poller.call(app, { getMessages, processMessages }); + t.is(result, undefined); +}); diff --git a/test/lib/process-messages.test.js b/test/lib/process-messages.test.js new file mode 100644 index 0000000..b67607d --- /dev/null +++ b/test/lib/process-messages.test.js @@ -0,0 +1,101 @@ +'use strict'; + +const tap = require('tap'); +const symbols = require('../../lib/symbols'); +const processMessages = require('../../lib/process-messages'); + +tap.test('throws if cannot parition messages', async t => { + function partitionMessages() { + throw Error('broken partition'); + } + try { + await processMessages({ messages: ['foo'], partitionMessages }); + t.fail('should not be invoked'); + } catch (error) { + t.match(error, /broken partition/); + } +}); + +tap.test('throws if cannot handle messages', async t => { + function partitionMessages({ messages }) { + t.deepEqual(messages, ['foo']); + return { default: ['foo'] }; + } + + async function handleMessage() { + throw Error('cannot handle message'); + } + + try { + await processMessages({ messages: ['foo'], partitionMessages, handleMessage }); + t.fail('should not be invoked'); + } catch (error) { + t.match(error, /cannot handle message/); + } +}); + +tap.test('throws if cannot handle fifo messages', async t => { + const app = { + [symbols.symFifoSorter]: () => {} + }; + + function partitionMessages({ messages }) { + t.deepEqual(messages, ['foo']); + return { default: [], fifo: ['foo'] }; + } + + async function handleMessage() { + t.fail('should not be invoked'); + } + + async function groupHandler() { + throw Error('cannot handle fifo message'); + } + + try { + await processMessages({ + messages: ['foo'], + app, + partitionMessages, + handleMessage, + groupHandler + }); + t.fail('should not be invoked'); + } catch (error) { + t.match(error, /cannot handle fifo message/); + } +}); + +tap.test('does not throw on all success', async t => { + const app = { + [symbols.symFifoSorter]: () => {} + }; + + function partitionMessages({ messages }) { + t.deepEqual(messages, ['foo']); + return { default: ['foo'], fifo: ['bar'] }; + } + + async function handleMessage({ message, app: _app }) { + t.is(_app, app); + t.is(message, 'foo'); + } + + async function groupHandler({ messages, app: _app }) { + t.is(_app, app); + t.deepEqual(messages, ['bar']); + } + + try { + await processMessages({ + messages: ['foo'], + app, + partitionMessages, + handleMessage, + groupHandler + }); + t.pass(); + } catch (error) { + t.error(error); + } +});