diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b8c1a4d..b55538b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -10,5 +10,9 @@ jobs: persist-credentials: false - name: Build run: yarn prep && yarn build + env: + NODE_OPTIONS: "--openssl-legacy-provider" - name: Test run: yarn test-all + env: + NODE_OPTIONS: "--openssl-legacy-provider" diff --git a/.gitignore b/.gitignore index d3a5923..395674e 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ .cache *.orig *.log +*.*~ # Folders diff --git a/packages/92green-aws-rxjs/package.json b/packages/92green-aws-rxjs/package.json index 75e6266..6d68930 100644 --- a/packages/92green-aws-rxjs/package.json +++ b/packages/92green-aws-rxjs/package.json @@ -1,6 +1,6 @@ { "name": "@92green/92green-aws-rxjs", - "version": "0.10.0", + "version": "0.11.0", "description": "Rxjs algorithms for use with AWS SDK v3", "license": "MIT", "author": "Blueflag", diff --git a/packages/92green-aws-rxjs/src/eventbus/README.md b/packages/92green-aws-rxjs/src/eventbus/README.md new file mode 100644 index 0000000..0f3ceee --- /dev/null +++ b/packages/92green-aws-rxjs/src/eventbus/README.md @@ -0,0 +1,22 @@ +# 92Green AWS RXJS Eventbus Functions + +## batchPut +Gather multiple eventbridge messages together before sending in a batch to reduce AWS API calls. +This function will group messages up until the batch count is reached or the total message size exceeds the maximum permitted size. It can also retry specified errors before they are eventually thrown. + +### Parameters +| Parameter | Type | Required | Description | +| --- | --- | --- | --- | +| eventBridgeClient | EventBridgeClient | Yes | The AWS SDK V3 EventBridge client to use | +| source | string | Yes | The event source string to use for the event | +| eventBusName | string | Yes | The name of the EventBus to send events to | +| retryOn | string[] | No | An array of error code strings that should be retried. 
This will only work if maxAttempts is also set | +| maxAttempts | number | No | The number of times to retry errors defined in the retryOn parameter. Defaults to 0 for no retries. | +| throttleMs | number | No | The time in milliseconds to throttle retry attempts. Defaults to 500ms | +| maxMessageSize | number | No | The maximum size permitted for the message batch to reach before sending. Defaults to 256 KB which is the maximum for EventBridge | +| maxBatchCount | number | No | The maximum number of messages to include in a single batch. Defaults to 10 | + +### Handling errors +batchPut will throw an error when items fail to send. If errors are not appropriately handled using rxjs `catchError` then any failed messages may cause the remainder of the batch to fail. The thrown `BatchPutError` extends Error with the following additional properties: +* code: The AWS error code or `UNKNOWN` if it is not defined +* data: The original message data it was trying to send to EventBridge diff --git a/packages/92green-aws-rxjs/src/eventbus/__tests__/batchPutWithRetry.spec.ts b/packages/92green-aws-rxjs/src/eventbus/__tests__/batchPutWithRetry.spec.ts index 9e11e73..83fc89c 100644 --- a/packages/92green-aws-rxjs/src/eventbus/__tests__/batchPutWithRetry.spec.ts +++ b/packages/92green-aws-rxjs/src/eventbus/__tests__/batchPutWithRetry.spec.ts @@ -1,8 +1,8 @@ -import batchPut from '../batchPut'; +import batchPut, {BatchPutError} from '../batchPut'; import {from, lastValueFrom} from "rxjs"; import {EventBridgeClient, PutEventsCommand, PutEventsCommandOutput} from "@aws-sdk/client-eventbridge"; import {mockClient} from "aws-sdk-client-mock"; -import { toArray } from 'rxjs/operators'; +import { toArray, distinct } from 'rxjs/operators'; describe('batchPut', () => { it('retrys on ThrottlingException', async () => { @@ -29,7 +29,7 @@ describe('batchPut', () => { mockClient(eventBridgeClient) .on(PutEventsCommand) .callsFake(fn) - + await lastValueFrom(from([{test: 'test1'}]) .pipe( 
batchPut({ @@ -44,6 +44,46 @@ describe('batchPut', () => { ) expect(fn).toBeCalledTimes(2); }); + + it('Throws error on alternative failure', async () => { + expect.assertions(3); + let eventBridgeClient = new EventBridgeClient({}); + let responsePayload: PutEventsCommandOutput = { + $metadata: {}, + Entries: [ + { + ErrorCode: "DifferentError" + } + ] + } + + let fn = jest.fn() + .mockImplementationOnce(() => Promise.resolve(responsePayload)) + + mockClient(eventBridgeClient) + .on(PutEventsCommand) + .callsFake(fn) + + const message = {test: 'test1'}; + try { + await lastValueFrom(from([message]) + .pipe( + batchPut({ + eventBridgeClient, + detailType: 'test', + source: 'learningrecord.test', + eventBusName: 'eventbustest', + maxAttempts: 10, + throttleMs: 0 + }) + ) + ); + } catch (err) { + expect(err).toBeInstanceOf(Error); + expect((err as BatchPutError).code).toEqual('DifferentError'); + expect((err as BatchPutError).data).toEqual(expect.objectContaining({Detail: JSON.stringify(message)})); + } + }); it('buffers entrys into one call', async () => { let eventBridgeClient = new EventBridgeClient({}); let responsePayload = { @@ -54,13 +94,16 @@ describe('batchPut', () => { mockClient(eventBridgeClient) .on(PutEventsCommand) .callsFake(fn) - await lastValueFrom(from([ + + const messages = [ {test: 'test1'}, {test: 'test2'}, {test: 'test3'} - - ]) + ]; + await lastValueFrom( + from(messages) .pipe( + distinct(({test}) => test), batchPut({ eventBridgeClient, detailType: 'test', @@ -71,8 +114,9 @@ describe('batchPut', () => { }), toArray() )) - - expect(fn.mock.calls[0][0].Entries.length).toBe(3); + + expect(fn).toHaveBeenCalled(); + expect(fn.mock.calls[0][0].Entries).toHaveLength(messages.length); }); it('uses the event bus name specified in the config', async () => { let eventBridgeClient = new EventBridgeClient({}); @@ -126,7 +170,7 @@ describe('batchPut', () => { }) // toArray() ) - + await lastValueFrom($obs); expect(fn.mock.calls.length).toBe(1); }); @@ 
-141,7 +185,7 @@ describe('batchPut', () => { mockClient(eventBridgeClient) .on(PutEventsCommand) .callsFake(fn) - + let [event] = await lastValueFrom(from([{test: 'test1'}]) .pipe( batchPut({ @@ -187,4 +231,72 @@ describe('batchPut', () => { }) ); }); + + + it('does not put groups of messages over the size limit', async () => { + const sizeLimit = 10 * 1024; // 10 KB + const largeMessage = { test: 'a'.repeat(sizeLimit / 2) }; // Creates a large message half the size of the limit + const messages = [largeMessage, largeMessage, largeMessage]; // This should exceed the limit when combined + + let eventBridgeClient = new EventBridgeClient({}); + let putCmd = jest.fn().mockImplementation(({Entries}) => Promise.resolve({ + Entries: Entries.map((_: unknown, index: number) => ({ EventId: `event_${index}` })) + })); + mockClient(eventBridgeClient) + .on(PutEventsCommand) + .callsFake(putCmd) + + await lastValueFrom( + from(messages).pipe( + batchPut({ + eventBridgeClient, + detailType: 'test', + source: 'learningrecord.test', + eventBusName: 'eventbustest', + maxAttempts: 1, + throttleMs: 0, + maxMessageSize: sizeLimit, + maxBatchCount: 10 // Set a higher batch count to test size limit + }), + toArray() + ) + ); + + // The function should have split the messages into separate batches due to size constraints + expect(putCmd).toHaveBeenCalledTimes(3); + }); + + it('does not put groups of messages over the count limit', async () => { + const countLimit = 10; + const messages = new Array(countLimit + 5).fill({ test: 'test' }); // Creates more messages than the count limit + + + let eventBridgeClient = new EventBridgeClient({}); + let putCmd = jest.fn().mockImplementation(({Entries}) => Promise.resolve({ + Entries: Entries.map((_: unknown, index: number) => ({ EventId: `event_${index}` })) + })); + mockClient(eventBridgeClient) + .on(PutEventsCommand) + .callsFake(putCmd) + + await lastValueFrom( + from(messages).pipe( + batchPut({ + eventBridgeClient, + detailType: 'test', + 
source: 'learningrecord.test', + eventBusName: 'eventbustest', + maxAttempts: 1, + throttleMs: 0, + maxBatchCount: countLimit // Set the batch count limit + }), + toArray() + ) + ); + + // The function should have split the messages into 2 batches: one with the count limit and one with the remainder + expect(putCmd).toHaveBeenCalledTimes(2); + expect(putCmd.mock.calls[0][0].Entries).toHaveLength(countLimit); + expect(putCmd.mock.calls[1][0].Entries).toHaveLength(5); + }); }) diff --git a/packages/92green-aws-rxjs/src/eventbus/batchPut.ts b/packages/92green-aws-rxjs/src/eventbus/batchPut.ts index fc5ac44..369bfaa 100644 --- a/packages/92green-aws-rxjs/src/eventbus/batchPut.ts +++ b/packages/92green-aws-rxjs/src/eventbus/batchPut.ts @@ -1,20 +1,75 @@ -// @flow import {Observable, interval, zip, of, EMPTY} from "rxjs"; -import {map, filter, expand, bufferCount, concatMap,share, throttle} from 'rxjs/operators'; -const MAX_EVENT_BRIDGE_PUT = 10; +import {map, concatMap, filter, expand, scan, share, throttle, endWith} from 'rxjs/operators'; const DEFAULT_THROTTLE_MS = 500; +const MAX_TOTAL_MESSAGE_SIZE = 262144; // 256 * 1024 - https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-putevent-size.html +const DEFAULT_BATCH_SIZE = 10; import type {EventBridgeClient, PutEventsRequestEntry, PutEventsResultEntry} from '@aws-sdk/client-eventbridge' import {PutEventsCommand} from '@aws-sdk/client-eventbridge'; +const END_SYMBOL = Symbol('END'); + +export class BatchPutError extends Error { + code: string; + data: unknown; + + constructor(message: string, code: string, data: unknown) { + super(message); + this.code = code; + this.data = data; + } +} + +export type BufferSizeParams = { + bufferSize: number, + sizeFn: (data: T) => number, + maxBufferCount: number +}; + +export const bufferSize = ({bufferSize, sizeFn, maxBufferCount}: BufferSizeParams) => (obs: Observable): Observable => { + + return obs.pipe( + endWith(END_SYMBOL), + scan((acc, value) => { + if(value === 
END_SYMBOL) { + acc.result = acc.buffer; + acc.buffer = []; + acc.emit = true; + return acc; + } + + const size = sizeFn(value); + if(size > bufferSize) { + throw new Error(`Message size ${size} is larger than buffer size ${bufferSize}`); + } + if (acc.size + size > bufferSize || acc.buffer.length >= maxBufferCount) { + acc.result = acc.buffer; + acc.buffer = [value]; + acc.size = size; + acc.emit = true; + return acc; + } + acc.buffer.push(value); + acc.emit = false; + acc.size += size; + return acc; + }, {buffer: [] as T[], emit: false, size: 0, result: [] as T[]}), + filter(({emit}) => emit), + map(({result}) => result) + ); +} + + type Config = { eventBridgeClient: EventBridgeClient, retryOn?: string[], detailType: string, source: string, eventBusName: string, - maxAttempts: number, - throttleMs: number + maxAttempts?: number, + throttleMs?: number, + maxMessageSize?: number, + maxBatchCount?: number }; const RETRY_ON = [ @@ -29,13 +84,21 @@ type MetaData = { type RecordTuple= [PutEventsRequestEntry, MetaData] -export default (config: Config) => (obs: Observable) => { +export default (config: Config) => (obs: Observable) => { + const maxMessageSize = config.maxMessageSize || MAX_TOTAL_MESSAGE_SIZE; + const maxBatchCount = config.maxBatchCount || DEFAULT_BATCH_SIZE; + const maxAttempts = config.maxAttempts || 0; + const retryOn = config.retryOn || RETRY_ON; const sendToEventBus = (obs: Observable): Observable => { let input = obs.pipe(share()); let results = obs.pipe( - map(([record]) => record), - bufferCount(MAX_EVENT_BRIDGE_PUT), + map((tuple) => tuple[0]), + bufferSize({ + sizeFn: (data: PutEventsRequestEntry) => JSON.stringify(data).length, + bufferSize: maxMessageSize, + maxBufferCount: maxBatchCount + }), concatMap(records => config .eventBridgeClient .send(new PutEventsCommand({Entries: records}) @@ -49,7 +112,7 @@ export default (config: Config) => (obs: Observable) => { map(([input, result]) => { let [record, info] = input; return [record, { - ...info, 
+ ...info, result, attempts: ++info.attempts }]; @@ -69,21 +132,23 @@ export default (config: Config) => (obs: Observable) => { sendToEventBus, expand(ii => of(ii) .pipe( - filter(([, info]) => { - // Bail no result.. - if(!info.result){ - return false; - } - // Bail can't check error code; - if(!info.result.ErrorCode){ - return false - } + filter(([item, info]) => { // Event already pushed to EventBus, Don't retry; - if(info.result.EventId) + if(info.result?.EventId) { return false + } - return retryOn.includes(info.result.ErrorCode) && - info.attempts < config.maxAttempts; + const errorCode = info.result?.ErrorCode || 'UNKNOWN'; + + if( + info.attempts < maxAttempts + && retryOn.includes(errorCode) + ) { + return true; + } + + + throw new BatchPutError(`${errorCode} Error sending record to eventbridge`, errorCode, item); }), throttle(() => interval(config.throttleMs || DEFAULT_THROTTLE_MS)), sendToEventBus @@ -91,4 +156,4 @@ export default (config: Config) => (obs: Observable) => { ) ); -}; \ No newline at end of file +};