Skip to content

Commit

Permalink
Merge pull request #3 from LOKE/feature/update-aws-sdk
Browse files Browse the repository at this point in the history
[ORD-1813] Upgrade AWS SDK to v3
  • Loading branch information
J-tt authored Aug 1, 2024
2 parents a4a8cdb + 01304bc commit 8fb201e
Show file tree
Hide file tree
Showing 6 changed files with 3,723 additions and 1,130 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/npm-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:

strategy:
matrix:
node-version: [14.x, 16.x, 18.x]
node-version: [18.x, 20.x]

steps:
- uses: actions/checkout@v3
Expand All @@ -37,7 +37,7 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-node@v3
with:
node-version: 14.x
node-version: 18.x
- run: npm install
- uses: JS-DevTools/npm-publish@v1
with:
Expand Down
1 change: 0 additions & 1 deletion lib/rabbit.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import test, { ExecutionContext } from "ava";
import { connect } from "amqplib";
import { ulid } from "ulid";
import { AbortController } from "node-abort-controller";

import { RabbitHelper } from "./rabbit";

Expand Down
66 changes: 36 additions & 30 deletions lib/sqs.test.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
import test from "ava";
import SQS from "aws-sdk/clients/sqs";
import { AbortController } from "node-abort-controller";
import { SQSClient } from "@aws-sdk/client-sqs";
import { ulid } from "ulid";

import { SQSData, SQSHelper } from "./sqs";
import { SQS, SQSData, SQSHelper } from "./sqs";

const noopLogger = {
error: () => undefined,
};

function setup() {
if (process.env.SQS_QUEUE_URL) {
const queueUrl = new URL(process.env.SQS_QUEUE_URL);
const region = queueUrl.hostname.split(".")[1];
return {
queueUrl: process.env.SQS_QUEUE_URL,
sqs: new SQSHelper({
sqs: new SQS(),
sqs: new SQSClient({ region: region }),
logger: noopLogger,
}),
};
Expand Down Expand Up @@ -160,10 +161,26 @@ interface MockRawMessage {
Visibility: boolean;
}

class MockSQS {
class MockSQS implements SQS {
private queues: { [key: string]: MockRawMessage[] } = {};

changeMessageVisibility(args: {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
async send(arg: { input: any }) {
switch (arg.constructor.name) {
case "ReceiveMessageCommand":
return this.receiveMessage(arg.input);
case "ChangeMessageVisibilityCommand":
return this.changeMessageVisibility(arg.input);
case "DeleteMessageCommand":
return this.deleteMessage(arg.input);
case "SendMessageCommand":
return this.sendMessage(arg.input);
default:
throw new Error("command not implemented");
}
}

private async changeMessageVisibility(args: {
ReceiptHandle: string;
QueueUrl: string;
VisibilityTimeout: number;
Expand All @@ -173,23 +190,20 @@ class MockSQS {
if (msg) {
msg.Visibility = true;
}

return {
promise: () => Promise.resolve(),
};
}
deleteMessage(args: { ReceiptHandle: string; QueueUrl: string }) {

private async deleteMessage(args: {
ReceiptHandle: string;
QueueUrl: string;
}) {
const queue = this.getQueue(args.QueueUrl);
const i = queue.findIndex((m) => m.ReceiptHandle === args.ReceiptHandle);
if (i >= 0) {
queue.splice(i, 1);
}

return {
promise: () => Promise.resolve(),
};
}
receiveMessage(args: {

private async receiveMessage(args: {
MaxNumberOfMessages: number;
QueueUrl: string;
WaitTimeSeconds: number;
Expand All @@ -208,19 +222,15 @@ class MockSQS {
}

if (nextMessages.length === 0) {
return {
promise: () =>
new Promise<{ Messages: undefined }>((resolve) =>
setTimeout(() => resolve({ Messages: undefined }), 10)
),
};
return await new Promise<{ Messages: undefined }>((resolve) =>
setTimeout(() => resolve({ Messages: undefined }), 10)
);
}

return {
promise: () => Promise.resolve({ Messages: nextMessages }),
};
return { Messages: nextMessages };
}
sendMessage(args: { QueueUrl: string; MessageBody: string }) {

private async sendMessage(args: { QueueUrl: string; MessageBody: string }) {
const id = ulid();

this.getQueue(args.QueueUrl).push({
Expand All @@ -229,10 +239,6 @@ class MockSQS {
ReceiptHandle: "rh-" + id,
Visibility: true,
});

return {
promise: () => Promise.resolve(),
};
}

private getQueue(queueUrl: string) {
Expand Down
84 changes: 32 additions & 52 deletions lib/sqs.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,14 @@
import util from "util";
import { Logger, MessageHandler } from "./common";
import {
ChangeMessageVisibilityCommand,
DeleteMessageCommand,
ReceiveMessageCommand,
SendMessageCommand,
SQSClient,
} from "@aws-sdk/client-sqs";

interface RawMessage {
MessageId?: string;
Body?: string;
ReceiptHandle?: string;
}

export interface SQS {
changeMessageVisibility: (args: {
ReceiptHandle: string;
QueueUrl: string;
VisibilityTimeout: number;
}) => {
promise: () => Promise<unknown>;
};
deleteMessage: (args: { ReceiptHandle: string; QueueUrl: string }) => {
promise: () => Promise<unknown>;
};
receiveMessage: (args: {
MaxNumberOfMessages: number;
QueueUrl: string;
WaitTimeSeconds: number;
}) => { promise: () => Promise<{ Messages?: RawMessage[] }> };
sendMessage: (args: { QueueUrl: string; MessageBody: string }) => {
promise: () => Promise<unknown>;
};
}
export type SQS = Pick<SQSClient, "send">;

export type SQSData<T> = { id?: string; body: T };

Expand Down Expand Up @@ -103,25 +85,25 @@ export class SQSHelper {
}

async sendToQueue(queueUrl: string, payload: unknown): Promise<void> {
await this.sqs
.sendMessage({
QueueUrl: queueUrl,
MessageBody: JSON.stringify(payload),
})
.promise();
const message = new SendMessageCommand({
QueueUrl: queueUrl,
MessageBody: JSON.stringify(payload),
});

await this.sqs.send(message);
}

private async receiveMessages<T>(
queueUrl: string,
max: number
): Promise<SQSMessage<T>[]> {
const rawSqsMessages = await this.sqs
.receiveMessage({
MaxNumberOfMessages: Math.min(10, max),
QueueUrl: queueUrl,
WaitTimeSeconds: 20,
})
.promise();
const message = new ReceiveMessageCommand({
QueueUrl: queueUrl,
MaxNumberOfMessages: Math.min(10, max),
WaitTimeSeconds: 20,
});

const rawSqsMessages = await this.sqs.send(message);

if (!rawSqsMessages.Messages) {
return [];
Expand All @@ -135,22 +117,20 @@ export class SQSHelper {
},
ack: async () => {
if (!m.ReceiptHandle) return;
await this.sqs
.deleteMessage({
ReceiptHandle: m.ReceiptHandle,
QueueUrl: queueUrl,
})
.promise();
const message = new DeleteMessageCommand({
QueueUrl: queueUrl,
ReceiptHandle: m.ReceiptHandle,
});
await this.sqs.send(message);
},
nack: async () => {
if (!m.ReceiptHandle) return;
await this.sqs
.changeMessageVisibility({
ReceiptHandle: m.ReceiptHandle,
QueueUrl: queueUrl,
VisibilityTimeout: 0,
})
.promise();
const message = new ChangeMessageVisibilityCommand({
QueueUrl: queueUrl,
ReceiptHandle: m.ReceiptHandle,
VisibilityTimeout: 0,
});
await this.sqs.send(message);
},
};
});
Expand Down
Loading

0 comments on commit 8fb201e

Please sign in to comment.