diff --git a/README.md b/README.md index 82e1dc2..3a7d08a 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ $ aws-move-queue-messages With all optional CLI arguments: ```sh -$ aws-move-queue-messages -r [AWS-REGION] -y +$ aws-move-queue-messages -r [AWS-REGION] -m 100 -y ``` Any CLI argument or option you do not specify will fallback to a CLI prompt. The `-y` CLI option will answer the confirmation prompt automatically with "yes". diff --git a/package.json b/package.json index 44d2983..0c69d7e 100644 --- a/package.json +++ b/package.json @@ -7,6 +7,7 @@ "license": "MIT", "scripts": { "lint": "eslint . --cache", + "lint:fix": "eslint . --fix --cache", "test": "jest", "test:watch": "yarn test --watch --onlyChanged", "test:coverage": "yarn test --coverage", diff --git a/src/helper.js b/src/helper.js index dc9ff35..4f24c50 100644 --- a/src/helper.js +++ b/src/helper.js @@ -6,6 +6,16 @@ const validateUrl = (input) => { return valid; }; +const validateMaxMessages = (input) => { + let valid = true; + if (!input.match(/^[1-9][0-9]*$/)) { + valid = 'Please enter a valid max number of messages greater than 0!'; + } + + return valid; +}; + module.exports = { validateUrl, + validateMaxMessages, }; diff --git a/src/helper.spec.js b/src/helper.spec.js index 2191218..827521a 100644 --- a/src/helper.spec.js +++ b/src/helper.spec.js @@ -2,7 +2,7 @@ * @jest-environment node */ -const { validateUrl } = require('./helper'); +const { validateUrl, validateMaxMessages } = require('./helper'); describe('validateUrl', () => { test('to pass invalid url', () => { @@ -20,3 +20,30 @@ describe('validateUrl', () => { expect(validate).toBe(true); }); }); + +describe('validateMaxMessages', () => { + test('to pass an invalid value with numeric and alpha chars', () => { + const validate = validateMaxMessages('1abc'); + expect(validate).toEqual('Please enter a valid max number of messages greater than 0!'); + }); + + test('to pass an invalid value with only alpha chars', () => { + const validate = validateMaxMessages('abc'); + expect(validate).toEqual('Please enter a valid max number of messages greater than 0!'); + }); + + test('to pass an empty value', () => { + const validate = validateMaxMessages(''); + expect(validate).toEqual('Please enter a valid max number of messages greater than 0!'); + }); + + test('to pass negative number', () => { + const validate = validateMaxMessages('-1'); + expect(validate).toEqual('Please enter a valid max number of messages greater than 0!'); + }); + + test('to pass valid number', () => { + const validate = validateMaxMessages('10'); + expect(validate).toBe(true); + }); +}); diff --git a/src/index.js b/src/index.js index 9d6d313..32d11f7 100644 --- a/src/index.js +++ b/src/index.js @@ -3,7 +3,7 @@ const aws = require('aws-sdk'); const program = require('commander'); const { prompt } = require('inquirer'); -const { validateUrl } = require('./helper'); +const { validateMaxMessages, validateUrl } = require('./helper'); const { handle } = require('./main'); const { createClient } = require('./sqs'); @@ -28,13 +28,31 @@ const toQuestion = { validate: validateUrl, }; +const maxQuestion = { + type: 'input', + name: 'maxMessages', + message: 'Enter the max number of messages:', + validate: validateMaxMessages, +}; + const handleAction = (from, to, options) => { const questions = []; + let copy = false; + let copiedOrMoved = 'moved'; if (!options.region) { questions.push(regionQuestion); } + if (options.copy) { + copy = true; + copiedOrMoved = 'copied'; + } + + if (!options.maxMessages) { + questions.push(maxQuestion); + } + if (!from) { questions.push(fromQuestion); } @@ -45,6 +63,7 @@ const handleAction = (from, to, options) => { prompt(questions).then(async ({ awsRegion = options.region, + maxMessages = options.maxMessages, sourceQueueUrl = from, targetQueueUrl = to, }) => { @@ -57,6 +76,8 @@ const handleAction = (from, to, options) => { count = await handle({ sourceQueueUrl, targetQueueUrl, + maxMessages, + copy, sqs, prompt, skipPrompt: options.yes, @@ -66,13 +87,15 @@ const handleAction = (from, to, options) => { process.exit(1); } - console.log(`${count} messages moved from ${sourceQueueUrl} to ${targetQueueUrl}!`); + console.log(`${count} messages ${copiedOrMoved} from ${sourceQueueUrl} to ${targetQueueUrl}!`); }); }; program .arguments('[from] [to]') .option('-r, --region [value]', 'The AWS region') + .option('-m, --maxMessages [value]', 'Max number of messages') .option('-y, --yes', 'Non interactive message moving') + .option('-c, --copy', 'Copy messages to new queue, do not delete') .action(handleAction) .parse(process.argv); diff --git a/src/main.js b/src/main.js index 9e84174..27e858f 100644 --- a/src/main.js +++ b/src/main.js @@ -3,6 +3,8 @@ const { Spinner } = require('clui'); const handle = async ({ sourceQueueUrl, targetQueueUrl, + maxMessages, + copy, sqs, prompt, skipPrompt, @@ -10,16 +12,27 @@ const handle = async ({ const count = await sqs.getCount(sourceQueueUrl); await sqs.getCount(targetQueueUrl); - if (count === 0) { + if (parseInt(count, 10) === 0) { throw new Error(`The queue ${sourceQueueUrl} is empty!`); } + let copyOrMove = 'move'; + if (copy) { + copyOrMove = 'copy'; + } + + const maxCount = parseInt(maxMessages, 10); + let moveCount = count; + if (count > maxCount) { + moveCount = maxCount; + } + if (!skipPrompt) { const { move } = await prompt([ { type: 'confirm', name: 'move', - message: `Do you want to move ${count} messages?`, + message: `Do you want to ${copyOrMove} ${moveCount} of ${count} messages?`, default: false, }, ]); @@ -29,15 +42,13 @@ const handle = async ({ } } - const spinner = new Spinner(`Moving ${count} messages...`); + const spinner = new Spinner(`Moving ${moveCount} messages...`); spinner.start(); const promises = []; - for (let i = 0; i < count; i += 1) { - promises.push(sqs.moveMessage(sourceQueueUrl, targetQueueUrl)); - } - + promises.push(sqs.moveMessage(sourceQueueUrl, targetQueueUrl, copy)); + await Promise.all(promises).then(() => { spinner.stop(); }).catch((e) => { @@ -45,7 +56,7 @@ const handle = async ({ throw new Error(e.message); }); - return count; + return moveCount; }; module.exports = { diff --git a/src/main.spec.js b/src/main.spec.js index 3373f66..c8ff312 100644 --- a/src/main.spec.js +++ b/src/main.spec.js @@ -5,41 +5,97 @@ const { handle } = require('./main'); describe('handle', () => { + const sourceQueueUrl = 'https://sqs.region.amazonaws.com/123456789/srcQueue'; + const targetQueueUrl = 'https://sqs.region.amazonaws.com/123456789/targetQueue'; + const maxMessages = 5; + let getCountVal = 3; + test('to move messages', async () => { + const sqs = { + getCount: jest.fn(() => getCountVal), + moveMessage: jest.fn(), + }; + + const prompt = jest.fn(() => ({ move: true })); + + await handle({ + sourceQueueUrl, + targetQueueUrl, + maxMessages, + sqs, + prompt, + }); + + expect(sqs.getCount).toHaveBeenCalled(); + expect(sqs.moveMessage).toHaveBeenCalledTimes(getCountVal); + }); + + test('to move messages with maxCount', async () => { + const sqs = { + getCount: jest.fn(() => getCountVal), + moveMessage: jest.fn(), + }; + + const prompt = jest.fn(() => ({ move: true })); + const copy = false; + + await handle({ + sourceQueueUrl: sourceQueueUrl, + targetQueueUrl: targetQueueUrl, + copy, + sqs, + prompt, + }); + + expect(sqs.getCount).toHaveBeenCalled(); + expect(sqs.moveMessage).toHaveBeenCalledWith(sourceQueueUrl, targetQueueUrl, copy); + }); + + test('to copy messages', async () => { const sqs = { getCount: jest.fn(() => 3), moveMessage: jest.fn(), }; const prompt = jest.fn(() => ({ move: true })); + const copy = true; + getCountVal = 10; await handle({ - sourceQueueUrl: 'https://sqs.region.amazonaws.com/123456789/srcQueue', - targetQueueUrl: 'https://sqs.region.amazonaws.com/123456789/targetQueue', + sourceQueueUrl, + targetQueueUrl, + maxMessages, + copy, sqs, prompt, }); expect(sqs.getCount).toHaveBeenCalled(); + expect(sqs.moveMessage).toHaveBeenCalledWith(sourceQueueUrl, targetQueueUrl, copy); + expect(sqs.moveMessage).toHaveBeenCalledTimes(maxMessages); }); test('to move messages without prompt', async () => { const sqs = { - getCount: jest.fn(() => 3), + getCount: jest.fn(() => getCountVal), moveMessage: jest.fn(), }; const prompt = jest.fn(); + const copy = false; await handle({ - sourceQueueUrl: 'https://sqs.region.amazonaws.com/123456789/srcQueue', - targetQueueUrl: 'https://sqs.region.amazonaws.com/123456789/targetQueue', + sourceQueueUrl, + targetQueueUrl, + maxMessages, + copy, sqs, prompt, skipPrompt: true, }); expect(sqs.getCount).toHaveBeenCalled(); + expect(sqs.moveMessage).toHaveBeenCalledWith(sourceQueueUrl, targetQueueUrl, copy); }); describe('reject promise', () => { @@ -50,10 +106,13 @@ describe('handle', () => { }; const prompt = jest.fn(() => ({ move: true })); + const copy = false; expect(handle({ - sourceQueueUrl: 'https://sqs.region.amazonaws.com/123456789/srcQueue', - targetQueueUrl: 'https://sqs.region.amazonaws.com/123456789/targetQueue', + sourceQueueUrl, + targetQueueUrl, + maxMessages, + copy, sqs, prompt, })).rejects.toEqual({ message: 'getCount' }); @@ -66,10 +125,13 @@ describe('handle', () => { }; const prompt = jest.fn(() => ({ move: true })); + const copy = false; expect(handle({ - sourceQueueUrl: 'https://sqs.region.amazonaws.com/123456789/srcQueue', - targetQueueUrl: 'https://sqs.region.amazonaws.com/123456789/targetQueue', + sourceQueueUrl, + targetQueueUrl, + maxMessages, + copy, sqs, prompt, })).rejects.toEqual(new Error('moveMessage')); @@ -82,10 +144,13 @@ describe('handle', () => { }; const prompt = jest.fn(() => ({ move: true })); + const copy = false; expect(handle({ - sourceQueueUrl: 'https://sqs.region.amazonaws.com/123456789/srcQueue', - targetQueueUrl: 'https://sqs.region.amazonaws.com/123456789/targetQueue', + sourceQueueUrl, + targetQueueUrl, + maxMessages, + copy, sqs, prompt, })).rejects.toEqual(new Error('The queue https://sqs.region.amazonaws.com/123456789/srcQueue is empty!')); diff --git a/src/sqs.js b/src/sqs.js index 6cb41b7..8ac5ae0 100644 --- a/src/sqs.js +++ b/src/sqs.js @@ -1,14 +1,14 @@ const createClient = (sqs) => { - const sendMessage = (QueueUrl, MessageBody) => new Promise((resolve, reject) => { + const sendMessage = (QueueUrl, MessageBody, MessageAttributes) => new Promise((resolve, rej) => { sqs.sendMessage( - { QueueUrl, MessageBody }, - (error, data) => (error ? reject(error) : resolve(data)), + { QueueUrl, MessageBody, MessageAttributes }, + (error, data) => (error ? rej(error) : resolve(data)), ); }); const receiveMessage = QueueUrl => new Promise((resolve, reject) => { sqs.receiveMessage( - { QueueUrl }, + { QueueUrl, MessageAttributeNames: ['All'] }, (error, data) => (error ? reject(error) : resolve(data.Messages[0])), ); }); @@ -36,7 +36,7 @@ const createClient = (sqs) => { ); }); - const moveMessage = (sourceQueueUrl, targetQueueUrl) => ( + const moveMessage = (sourceQueueUrl, targetQueueUrl, copy) => ( new Promise(async (resolve, reject) => { try { const receivedMessage = await receiveMessage(sourceQueueUrl); @@ -45,10 +45,26 @@ const createClient = (sqs) => { throw 'Queue is empty'; // eslint-disable-line } - const { Body, ReceiptHandle } = receivedMessage; + const { Body, ReceiptHandle, MessageAttributes } = receivedMessage; + + const deleteArrays = (obj) => { + if (typeof obj !== 'undefined') { + Object.keys(obj).forEach((key) => { + if (Array.isArray(obj[key])) { + // Remove invalid Array values in MessageAttributes + delete obj[key]; // eslint-disable-line + } else if (typeof obj[key] === 'object') { + deleteArrays(obj[key]); + } + }); + } + }; + deleteArrays(MessageAttributes); - await sendMessage(targetQueueUrl, Body); - await deleteMessage(sourceQueueUrl, ReceiptHandle); + await sendMessage(targetQueueUrl, Body, MessageAttributes); + if (!copy) { + await deleteMessage(sourceQueueUrl, ReceiptHandle); + } resolve(ReceiptHandle); } catch (error) { diff --git a/src/sqs.spec.js b/src/sqs.spec.js index 8c20373..32ed272 100644 --- a/src/sqs.spec.js +++ b/src/sqs.spec.js @@ -10,6 +10,9 @@ const mockCallbackFunction = (error, data) => ( } ); +const sourceQueueUrl = 'https://sqs.region.amazonaws.com/123456789/srcQueue'; +const targetQueueUrl = 'https://sqs.region.amazonaws.com/123456789/targetQueue'; + describe('getCount', () => { test('to resolve promise', async () => { const sqs = createClient({ @@ -19,7 +22,7 @@ describe('getCount', () => { ), }); - const count = await sqs.getCount('https://sqs.region.amazonaws.com/123456789/queue'); + const count = await sqs.getCount(sourceQueueUrl); expect(count).toEqual(3); }); @@ -28,7 +31,7 @@ describe('getCount', () => { getQueueAttributes: mockCallbackFunction({ message: 'error' }, null), }); - expect(sqs.getCount('https://sqs.region.amazonaws.com/123456789/queue')).rejects.toEqual({ message: 'error' }); + expect(sqs.getCount(sourceQueueUrl)).rejects.toEqual({ message: 'error' }); }); test('to pass queue url', async () => { @@ -42,10 +45,10 @@ describe('getCount', () => { const sqs = createClient(mockSqs); - await sqs.getCount('https://sqs.region.amazonaws.com/123456789/queue'); + await sqs.getCount(sourceQueueUrl); expect(mockSqs.getQueueAttributes).toHaveBeenCalledWith( { - QueueUrl: 'https://sqs.region.amazonaws.com/123456789/queue', + QueueUrl: sourceQueueUrl, AttributeNames: [ 'ApproximateNumberOfMessages', ], @@ -63,6 +66,7 @@ describe('moveMessage', () => { { Body: 'Body', ReceiptHandle: 'ReceiptHandle', + MessageAttributes: 'MessageAttributes', }, ], }), @@ -70,10 +74,24 @@ describe('moveMessage', () => { deleteMessage: mockCallbackFunction(null, {}), }); - const handle = await sqs.moveMessage( - 'https://sqs.region.amazonaws.com/123456789/srcQueue', - 'https://sqs.region.amazonaws.com/123456789/targetQueue', - ); + const handle = await sqs.moveMessage(sourceQueueUrl, targetQueueUrl, false); + expect(handle).toEqual('ReceiptHandle'); + }); + + test('to resolve promise - copy', async () => { + const sqs = createClient({ + receiveMessage: mockCallbackFunction(null, { + Messages: [ + { + Body: 'Body', + ReceiptHandle: 'ReceiptHandle', + }, + ], + }), + sendMessage: mockCallbackFunction(null, {}), + }); + + const handle = await sqs.moveMessage(sourceQueueUrl, targetQueueUrl, true); expect(handle).toEqual('ReceiptHandle'); }); @@ -84,10 +102,7 @@ describe('moveMessage', () => { deleteMessage: mockCallbackFunction(null, {}), }); - expect(sqs.moveMessage( - 'https://sqs.region.amazonaws.com/123456789/srcQueue', - 'https://sqs.region.amazonaws.com/123456789/targetQueue', - )).rejects.toEqual({ message: 'error' }); + expect(sqs.moveMessage(sourceQueueUrl, targetQueueUrl, false)).rejects.toEqual({ message: 'error' }); }); test('to call receiveMessage with expected parameters', async () => { @@ -107,23 +122,60 @@ describe('moveMessage', () => { const sqs = createClient(mockSqs); - await sqs.moveMessage( - 'https://sqs.region.amazonaws.com/123456789/srcQueue', - 'https://sqs.region.amazonaws.com/123456789/targetQueue', - ); + await sqs.moveMessage(sourceQueueUrl, targetQueueUrl, false); expect(mockSqs.receiveMessage).toHaveBeenCalledWith( - { QueueUrl: 'https://sqs.region.amazonaws.com/123456789/srcQueue' }, + { + QueueUrl: sourceQueueUrl, + MessageAttributeNames: ['All'], + }, expect.any(Function), ); }); test('to call sendMessage with expected parameters', async () => { + const messageAttributes = 'MessageAttributes'; + const mockSqs = { + receiveMessage: mockCallbackFunction(null, { + Messages: [ + { + Body: 'Body', + ReceiptHandle: 'ReceiptHandle', + MessageAttributes: messageAttributes, + }, + ], + }), + sendMessage: mockCallbackFunction(null, {}), + deleteMessage: mockCallbackFunction(null, {}), + }; + jest.spyOn(mockSqs, 'sendMessage'); + + const sqs = createClient(mockSqs); + + await sqs.moveMessage(sourceQueueUrl, targetQueueUrl, false); + expect(mockSqs.sendMessage).toHaveBeenCalledWith( + { + MessageBody: 'Body', + QueueUrl: targetQueueUrl, + MessageAttributes: messageAttributes, + }, + expect.any(Function), + ); + }); + + test('to call sendMessage with MessageAttributes', async () => { + const messageAttributes = { + messageType: { + Type: 'String', + StringValue: 'messageType1', + }, + }; const mockSqs = { receiveMessage: mockCallbackFunction(null, { Messages: [ { Body: 'Body', ReceiptHandle: 'ReceiptHandle', + MessageAttributes: messageAttributes, }, ], }), @@ -134,14 +186,55 @@ describe('moveMessage', () => { const sqs = createClient(mockSqs); - await sqs.moveMessage( - 'https://sqs.region.amazonaws.com/123456789/srcQueue', - 'https://sqs.region.amazonaws.com/123456789/targetQueue', + await sqs.moveMessage(sourceQueueUrl, targetQueueUrl); + expect(mockSqs.sendMessage).toHaveBeenCalledWith( + { + MessageBody: 'Body', + QueueUrl: targetQueueUrl, + MessageAttributes: messageAttributes, + }, + expect.any(Function), ); + }); + + test('to call sendMessage with MessageAttributes adjusted', async () => { + const expectedMessageAttributes = { + messageType: { + Type: 'String', + StringValue: 'messageType1', + }, + }; + + const messageAttributes = { + messageType: { + Type: 'String', + StringValue: 'messageType1', + }, + }; + + const mockSqs = { + receiveMessage: mockCallbackFunction(null, { + Messages: [ + { + Body: 'Body', + ReceiptHandle: 'ReceiptHandle', + MessageAttributes: messageAttributes, + }, + ], + }), + sendMessage: mockCallbackFunction(null, {}), + deleteMessage: mockCallbackFunction(null, {}), + }; + jest.spyOn(mockSqs, 'sendMessage'); + + const sqs = createClient(mockSqs); + + await sqs.moveMessage(sourceQueueUrl, targetQueueUrl); expect(mockSqs.sendMessage).toHaveBeenCalledWith( { MessageBody: 'Body', - QueueUrl: 'https://sqs.region.amazonaws.com/123456789/targetQueue', + QueueUrl: targetQueueUrl, + MessageAttributes: expectedMessageAttributes, }, expect.any(Function), ); @@ -164,14 +257,11 @@ describe('moveMessage', () => { const sqs = createClient(mockSqs); - await sqs.moveMessage( - 'https://sqs.region.amazonaws.com/123456789/srcQueue', - 'https://sqs.region.amazonaws.com/123456789/targetQueue', - ); + await sqs.moveMessage(sourceQueueUrl, targetQueueUrl, false); expect(mockSqs.deleteMessage).toHaveBeenCalledWith( { ReceiptHandle: 'ReceiptHandle', - QueueUrl: 'https://sqs.region.amazonaws.com/123456789/srcQueue', + QueueUrl: sourceQueueUrl, }, expect.any(Function), );