Skip to content

Commit

Permalink
Merge pull request #175 from MoonActive/feature/new-polling-wait-time-ms
Browse files Browse the repository at this point in the history
Feature/new polling wait time ms
  • Loading branch information
nspragg authored Sep 13, 2019
2 parents ac7b650 + 1db26a8 commit ee31f00
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 3 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ Creates a new SQS consumer.
* `terminateVisibilityTimeout` - _Boolean_ - If true, sets the message visibility timeout to 0 after a `processing_error` (defaults to `false`).
* `waitTimeSeconds` - _Number_ - The duration (in seconds) for which the call will wait for a message to arrive in the queue before returning.
* `authenticationErrorTimeout` - _Number_ - The duration (in milliseconds) to wait before retrying after an authentication error (defaults to `10000`).
* `pollingWaitTimeMs` - _Number_ - The duration (in milliseconds) to wait before repolling the queue (defaults to `0`).
* `sqs` - _Object_ - An optional [AWS SQS](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html) object to use if you need to configure the client manually

### `consumer.start()`
Expand Down
9 changes: 6 additions & 3 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export interface ConsumerOptions {
visibilityTimeout?: number;
waitTimeSeconds?: number;
authenticationErrorTimeout?: number;
pollingWaitTimeMs?: number;
terminateVisibilityTimeout?: boolean;
sqs?: SQS;
region?: string;
Expand All @@ -98,6 +99,7 @@ export class Consumer extends EventEmitter {
private visibilityTimeout: number;
private waitTimeSeconds: number;
private authenticationErrorTimeout: number;
private pollingWaitTimeMs: number;
private terminateVisibilityTimeout: boolean;
private sqs: SQS;

Expand All @@ -116,6 +118,7 @@ export class Consumer extends EventEmitter {
this.terminateVisibilityTimeout = options.terminateVisibilityTimeout || false;
this.waitTimeSeconds = options.waitTimeSeconds || 20;
this.authenticationErrorTimeout = options.authenticationErrorTimeout || 10000;
this.pollingWaitTimeMs = options.pollingWaitTimeMs || 0;

this.sqs = options.sqs || new SQS({
region: options.region || process.env.AWS_REGION || 'eu-west-1'
Expand Down Expand Up @@ -272,18 +275,18 @@ export class Consumer extends EventEmitter {
VisibilityTimeout: this.visibilityTimeout
};

let pollingTimeout = 0;
let currentPollingTimeout = this.pollingWaitTimeMs;
this.receiveMessage(receiveParams)
.then(this.handleSqsResponse)
.catch((err) => {
this.emit('error', err);
if (isConnectionError(err)) {
debug('There was an authentication error. Pausing before retrying.');
pollingTimeout = this.authenticationErrorTimeout;
currentPollingTimeout = this.authenticationErrorTimeout;
}
return;
}).then(() => {
setTimeout(this.poll, pollingTimeout);
setTimeout(this.poll, currentPollingTimeout);
}).catch((err) => {
this.emit('error', err);
});
Expand Down
26 changes: 26 additions & 0 deletions test/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { Consumer } from '../src/index';
const sandbox = sinon.createSandbox();

const AUTHENTICATION_ERROR_TIMEOUT = 20;
const POLLING_TIMEOUT = 100;

function stubResolve(value?: any): any {
return sandbox
Expand Down Expand Up @@ -314,6 +315,31 @@ describe('Consumer', () => {
});
});

it('waits before repolling when a polling timeout is set', async () => {
consumer = new Consumer({
queueUrl: 'some-queue-url',
region: 'some-region',
handleMessage,
sqs,
authenticationErrorTimeout: 20,
pollingWaitTimeMs: 100
});
return new Promise((resolve) => {
const timings = [];
const timeListener = sandbox.stub().callsFake(() => timings.push(new Date()));

timeListener.onThirdCall().callsFake(() => {
consumer.stop();
sandbox.assert.calledThrice(sqs.receiveMessage);
assert.isAtLeast(timings[1] - timings[0], POLLING_TIMEOUT);
resolve();
});

consumer.on('message_received', timeListener);
consumer.start();
});
});

it('fires a message_received event when a message is received', async () => {
consumer.start();
const message = await pEvent(consumer, 'message_received');
Expand Down

0 comments on commit ee31f00

Please sign in to comment.