Skip to content

Commit

Permalink
Merge pull request #21 from pablovilas/master
Browse files Browse the repository at this point in the history
Added 'AttributeNames' option
  • Loading branch information
hjerling committed Dec 30, 2015
2 parents a3d1934 + e6c57c1 commit 6b37673
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ Creates a new SQS consumer.
* `queueUrl` - _String_ - The SQS queue URL
* `region` - _String_ - The AWS region (default `eu-west-1`)
* `handleMessage` - _Function_ - A function to be called whenever a message is received. Receives an SQS message object as its first argument and a function to call when the message has been handled as its second argument (i.e. `handleMessage(message, done)`).
* `attributeNames` - _Array_ - List of queue attributes to retrieve (i.e. `['All', 'ApproximateFirstReceiveTimestamp', 'ApproximateReceiveCount']`).
* `messageAttributeNames` - _Array_ - List of message attributes to retrieve (i.e. `['name', 'address']`).
* `batchSize` - _Number_ - The number of messages to request from SQS when polling (default `1`). This cannot be higher than the AWS limit of 10.
* `visibilityTimeout` - _Number_ - The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request.
Expand Down
3 changes: 3 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ function validate(options) {
* @param {string} options.queueUrl
* @param {string} options.region
* @param {function} options.handleMessage
* @param {array} options.attributeNames
* @param {array} options.messageAttributeNames
* @param {number} options.batchSize
* @param {object} options.sqs
Expand All @@ -48,6 +49,7 @@ function Consumer(options) {

this.queueUrl = options.queueUrl;
this.handleMessage = options.handleMessage;
this.attributeNames = options.attributeNames || [];
this.messageAttributeNames = options.messageAttributeNames || [];
this.stopped = true;
this.batchSize = options.batchSize || 1;
Expand Down Expand Up @@ -93,6 +95,7 @@ Consumer.prototype.stop = function () {
Consumer.prototype._poll = function () {
var receiveParams = {
QueueUrl: this.queueUrl,
AttributeNames: this.attributeNames,
MessageAttributeNames: this.messageAttributeNames,
MaxNumberOfMessages: this.batchSize,
WaitTimeSeconds: this.waitTimeSeconds,
Expand Down
40 changes: 40 additions & 0 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ describe('Consumer', function () {
setTimeout(function () {
sinon.assert.calledWith(sqs.receiveMessage, {
QueueUrl: 'some-queue-url',
AttributeNames: [],
MessageAttributeNames: ['attribute-1', 'attribute-2'],
MaxNumberOfMessages: 3,
WaitTimeSeconds: 20,
Expand All @@ -249,6 +250,45 @@ describe('Consumer', function () {
done();
}, 10);
});

it('consumes messages with message attibute \'ApproximateReceiveCount\'', function (done) {

var messageWithAttr = {
ReceiptHandle: 'receipt-handle-1',
MessageId: '1',
Body: 'body-1',
Attributes: {
ApproximateReceiveCount: 1
}
};

sqs.receiveMessage.yieldsAsync(null, {
Messages: [messageWithAttr]
});

consumer = new Consumer({
queueUrl: 'some-queue-url',
attributeNames: ['ApproximateReceiveCount'],
region: 'some-region',
handleMessage: handleMessage,
sqs: sqs
});

consumer.on('message_received', function (message) {
sinon.assert.calledWith(sqs.receiveMessage, {
QueueUrl: 'some-queue-url',
AttributeNames: ['ApproximateReceiveCount'],
MessageAttributeNames: [],
MaxNumberOfMessages: 1,
WaitTimeSeconds: 20,
VisibilityTimeout: undefined
});
assert.equal(message, messageWithAttr);
done();
});

consumer.start();
});
});

describe('.stop', function () {
Expand Down

0 comments on commit 6b37673

Please sign in to comment.