Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
jsumners committed Jul 14, 2020

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
0 parents commit 09ba958
Showing 27 changed files with 1,379 additions and 0 deletions.
158 changes: 158 additions & 0 deletions .eslintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
{
"parserOptions": {
"ecmaVersion": 2020,
"sourceType": "module"
},

"env": {
"es6": true,
"es2017": true,
"es2020": true,
"node": true
},

"globals": {
"document": false,
"navigator": false,
"window": false,
"app": true
},

"plugins": ["prettier"],

"rules": {
"arrow-spacing": ["error", { "before": true, "after": true }],
"callback-return": ["error", ["callback", "cb", "next", "done", "proceed"]],
"comma-spacing": ["error", { "before": false, "after": true }],
"comma-style": ["error", "last"],
"curly": ["error"],
"eol-last": ["error"],
"indent": ["error", 2, { "SwitchCase": 1 }],
"key-spacing": ["error", { "beforeColon": false, "afterColon": true }],
"linebreak-style": ["error", "unix"],
"prettier/prettier": ["error", { "singleQuote": true, "printWidth": 99 }],
"accessor-pairs": "error",
"constructor-super": "error",
"eqeqeq": ["error", "always", { "null": "ignore" }],
"handle-callback-err": ["error", "^(err|error)$"],
"new-cap": ["error", { "newIsCap": true, "capIsNew": false }],
"no-array-constructor": "error",
"no-caller": "error",
"no-class-assign": "error",
"no-compare-neg-zero": "error",
"no-cond-assign": "error",
"no-const-assign": "error",
"no-constant-condition": ["error", { "checkLoops": false }],
"no-control-regex": "error",
"no-debugger": "error",
"no-delete-var": "error",
"no-dupe-args": "error",
"no-dupe-class-members": "error",
"no-dupe-keys": "error",
"no-duplicate-case": "error",
"no-empty-character-class": "error",
"no-empty-pattern": "error",
"no-eval": "error",
"no-ex-assign": "error",
"no-extend-native": "error",
"no-extra-bind": "error",
"no-extra-boolean-cast": "error",
"no-fallthrough": "error",
"no-floating-decimal": "error",
"no-func-assign": "error",
"no-global-assign": "error",
"no-implied-eval": "error",
"no-inner-declarations": ["error", "functions"],
"no-invalid-regexp": "error",
"no-irregular-whitespace": "error",
"no-mixed-spaces-and-tabs": ["error", "smart-tabs"],
"no-mixed-operators": [
"error",
{
"groups": [
["==", "!=", "===", "!==", ">", ">=", "<", "<="],
["&&", "||"],
["in", "instanceof"]
],
"allowSamePrecedence": true
}
],
"no-multi-spaces": "error",
"no-iterator": "error",
"no-label-var": "error",
"no-labels": ["error", { "allowLoop": false, "allowSwitch": false }],
"no-lone-blocks": "error",
"no-multi-str": "error",
"no-multiple-empty-lines": ["error", { "max": 1, "maxEOF": 0 }],
"no-trailing-spaces": ["error"],
"no-negated-in-lhs": "error",
"no-new": "error",
"no-new-func": "error",
"no-new-object": "error",
"no-new-require": "error",
"no-new-symbol": "error",
"no-new-wrappers": "error",
"no-obj-calls": "error",
"no-octal": "error",
"no-octal-escape": "error",
"no-path-concat": "error",
"no-proto": "error",
"no-redeclare": "error",
"no-regex-spaces": "error",
"no-return-assign": ["error", "except-parens"],
"no-return-await": "error",
"no-self-assign": "error",
"no-self-compare": "error",
"no-sequences": "error",
"no-shadow-restricted-names": "error",
"no-sparse-arrays": "error",
"no-template-curly-in-string": "error",
"no-this-before-super": "error",
"no-throw-literal": "error",
"no-undef": "error",
"no-undef-init": "error",
"no-unexpected-multiline": "error",
"no-unmodified-loop-condition": "error",
"no-unneeded-ternary": ["error", { "defaultAssignment": false }],
"no-unreachable": "error",
"no-unsafe-finally": "error",
"no-unsafe-negation": "error",
"no-unused-expressions": [
"error",
{ "allowShortCircuit": true, "allowTernary": true, "allowTaggedTemplates": true }
],
"no-unused-vars": ["error", { "vars": "all", "args": "none", "ignoreRestSiblings": true }],
"no-use-before-define": [
"error",
{ "functions": false, "classes": false, "variables": false }
],
"no-useless-call": "error",
"no-useless-computed-key": "error",
"no-useless-constructor": "error",
"no-useless-escape": "error",
"no-useless-rename": "error",
"no-useless-return": "error",
"semi": ["error", "always"],
"no-with": "error",
"one-var": ["error", { "initialized": "never" }],
"prefer-promise-reject-errors": "error",
"spaced-comment": [
"error",
"always",
{
"line": { "markers": ["*package", "!", "/", ","] },
"block": {
"balanced": true,
"markers": ["*package", "!", ",", ":", "::", "flow-include"],
"exceptions": ["*"]
}
}
],
"symbol-description": "error",
"use-isnan": "error",
"valid-typeof": ["error", { "requireStringLiterals": true }],
"yoda": ["error", "never"]
},

"extends": ["prettier"]
}
23 changes: 23 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
name: Lint

on:
pull_request:
branches:
- master

jobs:
lint:
name: Lint Check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- uses: actions/setup-node@v1
with:
node-version: ${{ matrix.node }}
registry-url: "https://registry.npmjs.org"
- name: Install Packages
run: npm install
env:
NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }}
- name: Lint Code
run: npm run lint
40 changes: 40 additions & 0 deletions .github/workflows/unit.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
name: Unit

on:
pull_request:
branches:
- master

jobs:
run_tests:
name: Unit Tests
strategy:
matrix:
os:
- ubuntu-latest
node:
- 12.x
- 14.x
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v1
- uses: actions/setup-node@v1
with:
node-version: ${{ matrix.node }}
registry-url: "https://registry.npmjs.org"
- name: Install Packages
run: npm install
env:
NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }}
- name: Run Tests
run: npm run test
# - name: Coveralls Parallel
# uses: coverallsapp/github-action@master
# with:
# github-token: ${{ secrets.github_token }}
# parallel: true
# - name: Coveralls Finished
# uses: coverallsapp/github-action@master
# with:
# github-token: ${{ secrets.github_token }}
# parallel-finished: true
80 changes: 80 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Lock files
# pnpm-lock.yaml
# shrinkwrap.yaml
# package-lock.json
# yarn.lock

# Logs
logs
*.log
npm-debug.log*

# Runtime data
pids
*.pid
*.seed

# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov

# Coverage directory used by tools like istanbul
coverage

# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files)
.grunt

# node-waf configuration
.lock-wscript

# Compiled binary addons (http://nodejs.org/api/addons.html)
build/Release

# Dependency directory
node_modules

# Optional npm cache directory
.npm

# Optional REPL history
.node_repl_history

# 0x
.__browserify_string_empty.js
profile-*
*.flamegraph

# tap --cov
.nyc_output/

# JetBrains IntelliJ IDEA
.idea/
*.iml

# VS Code
.vscode/

# xcode
build/*
*.mode1
*.mode1v3
*.mode2v3
*.perspective
*.perspectivev3
*.pbxuser
*.xcworkspace
xcuserdata

# macOS
.DS_Store

# keys
*.pem
*.env.json
*.env
.vagrant/*
Vagrantfile
functions/*/function.prod.json
functions/*/function.dev.json
aws_creds
.aws
dev.aws
21 changes: 21 additions & 0 deletions .npmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
test/
.github/
.circleci/
.dependabot/
.vscode/
.idea/
.nyc_output/
.eslintrc
.prettierrc
.prettierignore
.lambda
.taprc
coverage/
lib-cov/
*.md
*.env
*.log
profile-*
*.flamegraph
npm-debug.log*
pnpmfile.js
3 changes: 3 additions & 0 deletions .npmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
save-prefix=^
package-lock=false
legacy-bundling=true
2 changes: 2 additions & 0 deletions .prettierignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
package.json
package-lock.json
6 changes: 6 additions & 0 deletions .prettierrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"printWidth": 99,
"singleQuote": true,
"trailingComma": "none",
"arrowParens": "avoid"
}
9 changes: 9 additions & 0 deletions .taprc
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
files: 'test/**/*.test.js'

# Disable extra loaders built-in to Tap
esm: false
jsx: false
ts: false

# Adjust accordingly if your tests need more time
# timeout: 120
1 change: 1 addition & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
(c) Copyright 2020 Knockaway, Inc., all rights reserved.
111 changes: 111 additions & 0 deletions Readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# @knockaway/sqsiphon

_sqsiphon_ is provides a framework for writing [Amazon SQS][sqs] polling
applications. It designed to poll, and process messages, as quickly as possible.
FIFO queues are supported.

[sqs]: https://aws.amazon.com/sqs/

## Queue Processing

1. _sqsiphon_ polls for up to 10 messages (the SQS allowed maximum).
2. The retrieved messages are inspected to determine if any are tagged as being
members of a FIFO queue.
3. Messages that are not tagged for a FIFO queue are placed into a general
processing batch. Any FIFO tagged messages are added to a batch specifically
for the tagged FIFO queue. For example, consider a poll event returns three
messages: message `A` is untagged, message `B` is tagged for "foo", and
message `C` is tagged for "bar". Message `A` will be put on the general
processing batch and two new batches will be created: "foo" and "bar", each
with one message added for processing.
4. All available processing batches are processed: the general batch's messages
are processed concurrently, and each FIFO batch is processed sequentially.
5. Messages that generate an error during processing are left on the queue.

**FIFO Errors:** When a message on a FIFO queue cannot be processed successfully,
the message, and any remaining messages in the batch, will be left on the queue.
It is recommened that a corresponding dead letter queue be configured so that
these messages will be moved there by SQS.

## Example

```js
const { SQS } = require('aws-sdk');
const sqs = new SQS({
apiVersion: '2012-11-05',
region: 'us-east-1'
});
const sqsiphonFactory = require('@knockaway/sqsiphon');
const app = sqsiphonFactory({
sqs,
queueUrl: 'url for the sqs queue',
handler: messageHandler
});

function shutdown(signal) {
app.stop();
}
['SIGTERM', 'SIGINT'].forEach(signal => process.on(signal, shutdown));

if ((require.main === module) === true) {
app.start();
}

async function messageHandler(message) {
// `message` is an SQS message as returned by the `aws-sdk`
// ...
// Do something with the message or `throw Error('failed')`
}
```

## Factory Options

This module exports a factory function which accepts an options object with the
following properties:

- `logger` (optional): An object that follows the Log4j logger interface.
The default is an instance of [`abstract-logging`](https://npm.im/abstract-logging).
- `sqs` (required): An instance of `SQS` from the [`aws-sdk`](https://npm.im/aws-sdk).
- `queueUrl` (required): A string URL pointing to the SQS instance to poll.
- `handler` (required): A function to handle received messages. Must be an
`async function`. The function will receive one parameter: `message`. The
parameter is an instance of
[SQS Message](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_Message.html).
If the message cannot be processed for any reason, an instance of `Error`
should be thrown. If no error is thrown, the message has been considered to
be successfully processed.
- `tracer` (optional): an OpenTracing compliant tracer instance.
- `receiveMessageParameters` (optional): an object that conforms to the object
described by
[`sqs.receiveMessage`](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html#receiveMessage-property).
The default has: `AttributeNames: ['All']`, `MaxNumberOfMessages: 10`,
`MessageAttributeNames: ['All']`, and `VisibilityTimeout: 30`. The `QueueUrl`
is always overridden by the passed in `queueUrl` value.

## App Instance

The factory returns an application instance. The application instance is
an event emitter.

### Instance Methods And Properties

- `isRunning` (boolean): indicates if the application is polling for messages
or not.
- `start()`: initiates the application to start polling for messages.
- `stop()`: initiates the application to stop polling for messages. Any
messages currently being processed will be completed.

### Instance Events

- `error`: fired when an unexpected error occurs. Receives an `Error` object.
- `request-error`: fired when a communication error occurs. Receives an
`Error` object.
- `processing-error`: fired when an error occurs while processing a message.
Receives an object with `error` and `message` properties.
- `fifo-processing-aborted`: fired when a FIFO batch stop processing due to an
error. Receives an object with `message` and `messages` properties. This event
will fire subsequent to a `processing-error` event.
- `received-messages`: fired when a new batch of messages has been received.
Receives an array of SQS message objects.
- `handled-message`: fired when a message has been successfully handled.
Receives an SQS message object.
185 changes: 185 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
'use strict';

const { EventEmitter } = require('events');
const opentracing = require('opentracing');
const poller = require('./lib/poller');
const keepAlive = require('./lib/keep-alive');

const {
symFifoSorter,
symHandler,
symLogger,
symQueueUrl,
symReceiveParams,
symRunning,
symSQS,
symTracer
} = require('./lib/symbols');

/**
* A utility that polls an AWS SQS queue for new messages and feeds them through
* a processor. It processes standard queue messages concurrently, but processes
* FIFO based queues in sequential order per partition.
*
* @typedef sqsiphon
*/
const proto = Object.create(EventEmitter.prototype, {
[Symbol.toStringTag]: { value: 'sqsiphon' },

[symRunning]: { value: false, writable: true },
/**
* Indicates if the instance is polling for new messages.
*
* @memberof sqsiphon
* @instance
*/
isRunning: {
get() {
return this[symRunning];
}
},

/**
* Initiates polling for new messages.
*
* @memberof sqsiphon
* @instance
*/
start: {
value: function start() {
if (this.isRunning) {
return;
}
this[symRunning] = true;
keepAlive.call(this);

setImmediate(doPoll.bind(this));

async function doPoll() {
if (this.isRunning === false) {
return;
}
await poller.call(this);
setImmediate(doPoll.bind(this));
}
}
},

/**
* Stops polling for new messages.
*
* @memberof sqsiphon
* @instance
*/
stop: {
value: function stop() {
this[symRunning] = false;
}
}
});

/**
* Fired when an unexpected error has occured.
*
* @event sqsiphon#error
* @type {Error}
*/

/**
* Fired when a communication error has occurred when attempting to retrieve
* messages from SQS.
*
* @event sqsiphon#request-error
* @type {Error}
*/

/**
* Fired when an error has occurred while handling a message. Either the
* message handler has failed or we were unable to delete the message from
* the queue.
*
* @event sqsiphon#processing-error
* @type {object}
* @property {Error} error
* @property {object} message The message that was being processed when the
* erorr occurred.
*/

/**
* Fired when the processing of a FIFO partition has been stopped due to a
* message handling error. The {@see sqsiphon#processing-error} event will be
* fired alongside this event.
*
* @event sqsiphon#fifo-processing-aborted
* @type {object}
* @property {object} message The message that was being handled when the
* error occurred.
* @property {object[]} messages The queue of messages that was being processed.
*/

/**
* Fired when a new batch of messages has been received from SQS.
*
* @event sqsiphon#received-messages
* @type {object[]} An array of SQS message objects.
*/

/**
* Fired when a message has been successfully handled by the user provided
* handler function.
*
* @event sqsiphon#handled-message
* @type {object} The SQS message that was handled.
*/

/** */
const defaultOptions = {
logger: require('abstract-logging'),
sqs: undefined,
queueUrl: undefined,
handler: undefined,
receiveMessageParameters: {
AttributeNames: ['All'],
MaxNumberOfMessages: 10,
MessageAttributeNames: ['All'],
VisibilityTimeout: 30
},
fifoSorter: () => {},
tracer: undefined
};

module.exports = function sqsiphonFactory(options) {
const opts = Object.assign({}, defaultOptions, options);

let tracer;
if (opts.tracer) {
opentracing.initGlobalTracer(opts.tracer);
tracer = opentracing.globalTracer();
} else {
tracer = new opentracing.Tracer();
}

const { sqs, queueUrl, handler, fifoSorter, logger } = opts;
if (!sqs) {
throw Error('must supply `sqs` instance');
}
if (!queueUrl || typeof queueUrl !== 'string') {
throw Error('must supply string for `queueUrl`');
}
if (!handler || Function.prototype.isPrototypeOf(handler) === false) {
throw Error('must provide `handler` function');
}

const receiveParams = { QueueUrl: queueUrl, ...opts.receiveMessageParameters };

const app = Object.create(proto, {
[symLogger]: { value: logger },
[symHandler]: { value: handler },
[symSQS]: { value: sqs },
[symQueueUrl]: { value: queueUrl },
[symReceiveParams]: { value: receiveParams },
[symFifoSorter]: { value: fifoSorter },
[symTracer]: { value: tracer }
});
return app;
};
29 changes: 29 additions & 0 deletions lib/get-messages.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
'use strict';

const { symReceiveParams, symSQS } = require('./symbols');

/**
* Query the configures SQS queue for new messages.
*
* @param {object} input
* @param {object} input.app An `sqsiphon` application instance that has an
* associated SQS client instance and message receive parameters object.
*
* @returns {object} Will have an `error` property if there was a communication
* error with AWS. Otherwise, will have a `value` property set to an array
* of SQS messages.
*/
module.exports = async function getMessages({ app }) {
const sqs = app[symSQS];
const rececieveParams = app[symReceiveParams];
const messages = [];

try {
const response = await sqs.receiveMessage(rececieveParams).promise();
Array.prototype.push.apply(messages, response.Messages || []);
} catch (error) {
return { error };
}

return { value: messages };
};
24 changes: 24 additions & 0 deletions lib/group-handler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
'use strict';

const _handleMessage = require('./handle-message');

/**
* Iterate a FIFO queue of `messages` in sequential order and feed each
* message through the configured message handler. Any error in processing a
* message will result in the remaining messages being left unprocessed.
*
* @param {object} input
* @param {object[]} input.messages A set of SQS messages from a FIFO partition.
* @param {object} input.app A fully configures `sqsiphon` application instance.
*
* @fires sqsiphon#fifo-processing-aborted
*/
module.exports = async function groupHandler({ messages, app, handleMessage = _handleMessage }) {
for (const message of messages) {
const result = await handleMessage({ message, app });
if (result === false) {
app.emit('fifo-processing-aborted', { message, messages });
break;
}
}
};
35 changes: 35 additions & 0 deletions lib/handle-message.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
'use strict';

const { symHandler, symQueueUrl, symSQS } = require('./symbols');

/**
* Invokes the user supplied message handler function and removes the message
* from the queue if the handler succeeds.
*
* @param {object} input
* @param {object} input.message An SQS message object.
* @param {object} input.app A fully configured `sqsiphon` instance.
*
* @fires sqsiphon#handled-message
* @fires sqsiphon#processing-error
*
* @returns {boolean} `true` if no error occured, `false` otherwise.
*/
module.exports = async function handleMessage({ message, app }) {
const sqs = app[symSQS];
const queueUrl = app[symQueueUrl];
const handler = app[symHandler];

try {
await handler(message);
await sqs
.deleteMessage({ QueueUrl: queueUrl, ReceiptHandle: message.ReceiptHandle })
.promise();
app.emit('handled-message', { message });
} catch (error) {
app.emit('processing-error', { error, message });
return false;
}

return true;
};
8 changes: 8 additions & 0 deletions lib/keep-alive.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
'use strict';

module.exports = function keepAlive() {
if (this.isRunning === false) {
return;
}
return setImmediate(keepAlive.bind(this));
};
36 changes: 36 additions & 0 deletions lib/partition-messages.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
'use strict';

/**
* Given an array of SQS message instances, iterate the messages to look for
* FIFO group identifier attributes. For messages that do not have such
* identifiers, add them to a default partition. For messages that do have
* such identifiers, segregate each message into a parition matching the FIFO
* group identifier of the message.
*
* @param {object} input
* @param {object[]} input.messages An array of SQS message objects.
*
* @returns {object} Standard, non-FIFO, messages are attached to the "default"
* property (partition). Any other properties on this object match the found
* FIFO group identifiers. Each property on the object is an array of SQS
* messages.
*/
module.exports = function partitionMessages({ messages }) {
const partitions = {
default: []
};
for (const message of messages) {
if (!message.Attributes || !message.Attributes.MessageGroupId) {
partitions.default.push(message);
continue;
}
const groupId = message.Attributes.MessageGroupId;
const queue = partitions[groupId];
if (Array.isArray(queue)) {
queue.push(message);
} else {
partitions[groupId] = [message];
}
}
return partitions;
};
62 changes: 62 additions & 0 deletions lib/poller.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
'use strict';

const { symLogger, symTracer } = require('./symbols');
const _getMessages = require('./get-messages');
const _processMessages = require('./process-messages');

/**
* Polls for new messages on an SQS queue and processes the messages. Must be
* invoked with the `this` context set to an "app" instance.
*
* @fires sqsiphon#request-error
* @fires sqsiphon#error
* @fires sqsiphon#received-messages
*/
module.exports = async function poller({
getMessages = _getMessages,
processMessages = _processMessages
} = {}) {
const app = this;
const log = app[symLogger];
const tracer = app[symTracer];

// TODO: extract any possible parent span and create new one as a child
const span = tracer.startSpan('sqs_poll');

log.trace('polling for new messages');
const getMessagesResult = await getMessages({ app });
if (getMessagesResult.error) {
const error = getMessagesResult.error;
span.setTag('error', true);
span.log({
event: 'error',
'error.object': error,
message: error.message,
stack: error.stack
});

log.trace('encountered communication error', { error });
if (app.listenerCount('request-error') > 0) {
app.emit('request-error', error);
} else {
// If the user isn't listening for the specific event we will fallback
// to the baseline error event so that Node will barf up stacks and such.
app.emit('error', error);
}

span.finish();
return;
}

const messages = getMessagesResult.value;
if (messages.length === 0) {
log.trace('zero messages received');
span.finish();
return;
}
app.emit('received-messages', messages);
log.trace('processing messages', { messagesCount: messages.length });
await processMessages({ messages, app });

span.finish();
};
41 changes: 41 additions & 0 deletions lib/process-messages.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
'use strict';

const _partitionMessages = require('./partition-messages');
const _handleMessage = require('./handle-message');
const _groupHandler = require('./group-handler');
const { symFifoSorter } = require('./symbols');

/**
* Iterates a set of SQS `messages` and feeds them through the configured
* message handler.
*
* @param {object} input
* @param {object[]} input.messages An array of SQS messages.
* @param {object} input.app A fully configured `sqsiphon` application instance.
*/
module.exports = async function processMessages({
messages,
app,
partitionMessages = _partitionMessages,
handleMessage = _handleMessage,
groupHandler = _groupHandler
}) {
const partitions = partitionMessages({ messages });
const { default: defaultPartition, ...fifoPartitions } = partitions;

// We can process the default queue concurrently because it does not contain
// any FIFO queue messages.
const promises = defaultPartition.map(msg => handleMessage({ message: msg, app }));
await Promise.all(promises);

// For FIFO queues we need to process each queue's messages sequentially.
// We also need to _stop_ processing a queue if one of the messages is not
// handled correctly.
const fifoSorter = app[symFifoSorter];
const fifoPromises = [];
for (const partitionId in fifoPartitions) {
const messages = fifoPartitions[partitionId].sort(fifoSorter);
fifoPromises.push(groupHandler({ messages, app }));
}
await Promise.all(fifoPromises);
};
21 changes: 21 additions & 0 deletions lib/symbols.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
'use strict';

const symLogger = Symbol('sqsiphon.logger');
const symRunning = Symbol('sqsiphon.isRunning');
const symSQS = Symbol('sqsiphon.sqs');
const symQueueUrl = Symbol('sqsiphon.queueUrl');
const symReceiveParams = Symbol('sqsiphon.receiveParams');
const symHandler = Symbol('sqsiphon.handler');
const symFifoSorter = Symbol('sqsiphon.fifoSorter');
const symTracer = Symbol('sqsiphon.tracer');

module.exports = {
symLogger,
symRunning,
symSQS,
symQueueUrl,
symReceiveParams,
symHandler,
symFifoSorter,
symTracer
};
36 changes: 36 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"name": "@knockaway/sqsiphon",
"main": "index.js",
"version": "1.0.0",
"homepage": "https://github.com/knockaway/@knockaway/sqsiphon",
"license": "LicenseRef-LICENSE",
"repository": {
"type": "git",
"url": "https://github.com/knockaway/@knockaway/sqsiphon.git"
},
"dependencies": {
"opentracing": "^0.14.4"
},
"devDependencies": {
"tap": "^14.10.7",
"eslint": "^7.3.1",
"eslint-config-prettier": "^6.11.0",
"eslint-plugin-prettier": "^3.1.4",
"husky": "^4.2.5",
"prettier": "^2.0.5"
},
"scripts": {
"check-format": "prettier --list-different '*.js' 'lib/**/*.js' 'test/**/*.js'",
"format": "prettier --write '*.js' 'lib/**/*.js' 'test/**/*.js'",
"lint": "eslint '*.js' 'lib/**/*.js' 'test/**/*.js'",
"test": "LOG_LEVEL=silent tap --no-cov",
"test:cov": "LOG_LEVEL=silent tap",
"test:cov:html": "LOG_LEVEL=silent tap --coverage-report=html",
"test:watch": "LOG_LEVEL=silent tap -n -w --no-coverage-report"
},
"husky": {
"hooks": {
"pre-commit": "npm run lint && npm run test"
}
}
}
52 changes: 52 additions & 0 deletions test/lib/get-messages.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
'use strict';

const tap = require('tap');
const symbols = require('../../lib/symbols');
const getMessages = require('../../lib/get-messages');

tap.test('returns error if cannot communicate with sqs', async t => {
const sqs = {
receiveMessage() {
return this;
},
promise() {
return Promise.reject(Error('broken sqs'));
}
};
const app = {
[symbols.symSQS]: sqs,
[symbols.symReceiveParams]: { QueueUrl: 'http://sqs.example.com' }
};

const response = await getMessages({ app });
t.match(response, {
error: {
message: 'broken sqs'
}
});
});

tap.test('returns messages on success', async t => {
const sqs = {
receiveMessage(params) {
t.deepEqual(params, { QueueUrl: 'http://sqs.example.com' });
return this;
},
promise() {
return Promise.resolve({ Messages: [{ message: 'one' }] });
}
};
const app = {
[symbols.symSQS]: sqs,
[symbols.symReceiveParams]: { QueueUrl: 'http://sqs.example.com' }
};

const response = await getMessages({ app });
t.deepEqual(response, {
value: [
{
message: 'one'
}
]
});
});
33 changes: 33 additions & 0 deletions test/lib/group-handler.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
'use strict';

const tap = require('tap');
const groupHandler = require('../../lib/group-handler');

tap.test('emits aborted for failed handler', async t => {
const app = {
emit(event, body) {
t.is(event, 'fifo-processing-aborted');
t.deepEqual(body, { message: 'foo', messages: ['foo'] });
}
};
async function handleMessage() {
return false;
}

await groupHandler({ messages: ['foo'], app, handleMessage });
});

tap.test('does not emit for all successes', async t => {
const app = {
emit() {
t.fail('should not be invoked');
}
};
async function handleMessage(params) {
t.is(params.app, app);
t.is(params.message, 'foo');
return true;
}

await groupHandler({ messages: ['foo'], app, handleMessage });
});
140 changes: 140 additions & 0 deletions test/lib/handle-message.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
'use strict';

const tap = require('tap');
const symbols = require('../../lib/symbols');
const handleMessage = require('../../lib/handle-message');

tap.test('emits processing-error for bad handler', async t => {
const sqs = {
deleteMessage() {
t.fail('should not be invoked');
return this;
},
promise() {
t.fail('should not be invoked');
return Promise.reject(Error('broken sqs'));
}
};
async function handler() {
throw Error('broken handler');
}
const app = {
[symbols.symSQS]: sqs,
[symbols.symQueueUrl]: 'http://sqs.example.com',
[symbols.symHandler]: handler,

emit(event, body) {
t.is(event, 'processing-error');
t.match(body, {
message: 'foo',
error: {
message: 'broken handler'
}
});
}
};

const result = await handleMessage({ message: 'foo', app });
t.is(result, false);
});

tap.test('emits processing-error for broken sqs', async t => {
const sqs = {
deleteMessage() {
return this;
},
promise() {
return Promise.reject(Error('broken sqs'));
}
};
async function handler() {
return true;
}
const app = {
[symbols.symSQS]: sqs,
[symbols.symQueueUrl]: 'http://sqs.example.com',
[symbols.symHandler]: handler,

emit(event, body) {
t.is(event, 'processing-error');
t.match(body, {
message: 'foo',
error: {
message: 'broken sqs'
}
});
}
};

const result = await handleMessage({ message: 'foo', app });
t.is(result, false);
});

tap.test('emits handled-message on success', async t => {
const sqs = {
deleteMessage(params) {
t.deepEqual(params, {
QueueUrl: 'http://sqs.example.com',
ReceiptHandle: 'handle-1'
});
return this;
},
promise() {
return Promise.resolve();
}
};
async function handler(message) {
t.deepEqual(message, { foo: 'foo', ReceiptHandle: 'handle-1' });
return true;
}
const app = {
[symbols.symSQS]: sqs,
[symbols.symQueueUrl]: 'http://sqs.example.com',
[symbols.symHandler]: handler,

emit(event, body) {
t.is(event, 'handled-message');
t.deepEqual(body, { message: { foo: 'foo', ReceiptHandle: 'handle-1' } });
}
};

const result = await handleMessage({ message: { foo: 'foo', ReceiptHandle: 'handle-1' }, app });
t.is(result, true);
});

tap.test('does not throw for rejections', async t => {
const sqs = {
deleteMessage() {
return this;
},
promise() {
return Promise.reject(Error('sqs failed'));
}
};
async function handler() {
return true;
}
const app = {
[symbols.symSQS]: sqs,
[symbols.symQueueUrl]: 'http://sqs.example.com',
[symbols.symHandler]: handler,

emit(event, body) {
t.is(event, 'processing-error');
t.deepEqual(body, {
error: { message: 'sqs failed', name: 'Error' },
message: { foo: 'foo', ReceiptHandle: 'handle-1' }
});
}
};

try {
const result = await handleMessage({
message: { foo: 'foo', ReceiptHandle: 'handle-1' },
app
});
t.is(result, false);
} catch (error) {
t.error(error);
}
});
23 changes: 23 additions & 0 deletions test/lib/partition-messages.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
'use strict';

const tap = require('tap');
const partitionMessages = require('../../lib/partition-messages');

tap.test('partitions messages based on attributes', async t => {
const messages = [
{ Attributes: { MessageGroupId: 'one' }, Body: 'foo' },
{ Body: 'non-fifo' },
{ Attributes: { MessageGroupId: 'one' }, Body: 'bar' },
{ Attributes: { MessageGroupId: 'two' }, Body: 'foo' }
];
const partitioned = partitionMessages({ messages });

t.deepEqual(partitioned, {
default: [{ Body: 'non-fifo' }],
one: [
{ Attributes: { MessageGroupId: 'one' }, Body: 'foo' },
{ Attributes: { MessageGroupId: 'one' }, Body: 'bar' }
],
two: [{ Attributes: { MessageGroupId: 'two' }, Body: 'foo' }]
});
});
99 changes: 99 additions & 0 deletions test/lib/poller.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
'use strict';

const tap = require('tap');
const opentracing = require('opentracing');
const symbols = require('../../lib/symbols');
const poller = require('../../lib/poller');

tap.test('fires error event if cannot get messages', async t => {
const app = {
[symbols.symLogger]: { trace() {} },
[symbols.symTracer]: new opentracing.MockTracer(),
listenerCount() {
return 0;
},
emit(event, body) {
t.is(event, 'error');
t.match(body, /broken messages/);
}
};
async function getMessages() {
return { error: Error('broken messages') };
}

const result = await poller.call(app, { getMessages });
t.is(result, undefined);
});

tap.test('fires request-error event if cannot get messages and listener registered', async t => {
const app = {
[symbols.symLogger]: { trace() {} },
[symbols.symTracer]: new opentracing.MockTracer(),
listenerCount(event) {
t.is(event, 'request-error');
return 1;
},
emit(event, body) {
t.is(event, 'request-error');
t.match(body, /broken messages/);
}
};
async function getMessages() {
return { error: Error('broken messages') };
}

const result = await poller.call(app, { getMessages });
t.is(result, undefined);
});

tap.test('merely logs when no messages to process', async t => {
const app = {
[symbols.symLogger]: {
trace(msg) {
t.true(['polling for new messages', 'zero messages received'].includes(msg));
}
},
[symbols.symTracer]: new opentracing.MockTracer(),
listenerCount() {
t.fail('should not be invoked');
},
emit() {
t.fail('should not be invoked');
}
};
async function getMessages() {
return { value: [] };
}

const result = await poller.call(app, { getMessages });
t.is(result, undefined);
});

tap.test('emits received-messages and invoked processor', async t => {
const app = {
[symbols.symLogger]: {
trace(msg) {
t.true(['polling for new messages', 'processing messages'].includes(msg));
}
},
[symbols.symTracer]: new opentracing.MockTracer(),
listenerCount() {
t.fail('should not be invoked');
},
emit(event, body) {
t.is(event, 'received-messages');
t.deepEqual(body, [{ foo: 'foo' }]);
}
};
async function getMessages({ app: _app }) {
t.is(_app, app);
return { value: [{ foo: 'foo' }] };
}
async function processMessages({ messages, app: _app }) {
t.is(_app, app);
t.deepEqual(messages, [{ foo: 'foo' }]);
}

const result = await poller.call(app, { getMessages, processMessages });
t.is(result, undefined);
});
101 changes: 101 additions & 0 deletions test/lib/process-messages.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
'use strict';

const tap = require('tap');
const symbols = require('../../lib/symbols');
const processMessages = require('../../lib/process-messages');

tap.test('throws if cannot parition messages', async t => {
function partitionMessages() {
throw Error('broken partition');
}
try {
await processMessages({ messages: ['foo'], partitionMessages });
t.fail('should not be invoked');
} catch (error) {
t.match(error, /broken partition/);
}
});

tap.test('throws if cannot handle messages', async t => {
function partitionMessages({ messages }) {
t.deepEqual(messages, ['foo']);
return { default: ['foo'] };
}

async function handleMessage() {
throw Error('cannot handle message');
}

try {
await processMessages({ messages: ['foo'], partitionMessages, handleMessage });
t.fail('should not be invoked');
} catch (error) {
t.match(error, /cannot handle message/);
}
});

tap.test('throws if cannot handle fifo messages', async t => {
const app = {
[symbols.symFifoSorter]: () => {}
};

function partitionMessages({ messages }) {
t.deepEqual(messages, ['foo']);
return { default: [], fifo: ['foo'] };
}

async function handleMessage() {
t.fail('should not be invoked');
}

async function groupHandler() {
throw Error('cannot handle fifo message');
}

try {
await processMessages({
messages: ['foo'],
app,
partitionMessages,
handleMessage,
groupHandler
});
t.fail('should not be invoked');
} catch (error) {
t.match(error, /cannot handle fifo message/);
}
});

tap.test('does not throw on all success', async t => {
const app = {
[symbols.symFifoSorter]: () => {}
};

function partitionMessages({ messages }) {
t.deepEqual(messages, ['foo']);
return { default: ['foo'], fifo: ['bar'] };
}

async function handleMessage({ message, app: _app }) {
t.is(_app, app);
t.is(message, 'foo');
}

async function groupHandler({ messages, app: _app }) {
t.is(_app, app);
t.deepEqual(messages, ['bar']);
}

try {
await processMessages({
messages: ['foo'],
app,
partitionMessages,
handleMessage,
groupHandler
});
t.pass();
} catch (error) {
t.error(error);
}
});

0 comments on commit 09ba958

Please sign in to comment.