Skip to content

Commit

Permalink
Merge pull request #88 from 92green/feature/batchput-max-size
Browse files Browse the repository at this point in the history
Limit EventBus batchPut max message size to prevent exceeding AWS limit
  • Loading branch information
melvey authored Nov 16, 2023
2 parents cfec1e3 + 585b7a1 commit 4ac1c10
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 33 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,9 @@ jobs:
persist-credentials: false
- name: Build
run: yarn prep && yarn build
env:
NODE_OPTIONS: "--openssl-legacy-provider"
- name: Test
run: yarn test-all
env:
NODE_OPTIONS: "--openssl-legacy-provider"
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
.cache
*.orig
*.log
*.*~


# Folders
Expand Down
2 changes: 1 addition & 1 deletion packages/92green-aws-rxjs/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@92green/92green-aws-rxjs",
"version": "0.10.0",
"version": "0.11.0",
"description": "Rxjs algorithms for use with AWS SDK v3",
"license": "MIT",
"author": "Blueflag",
Expand Down
21 changes: 21 additions & 0 deletions packages/92green-aws-rxjs/src/eventbus/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# 92Green AWS RXJS Eventbus Functions

## batchPut
Gather multiple eventbridge messages together before sending in a batch to reduce AWS API calls.
This function will group messages up until the batch count is reached or the total message size exceeds the maximum permitted size. It can also retry specified errors before they are eventually thrown.

### Parameter
| Parameter | Type | Required | Description |
| eventBridgeClient | EventBridgeClient | Yes | The AWS SDK V3 EventBridge client to use |
| source | string | Yes | The event source string to use for the event |
| eventBusName | string | Yes | The name of the EventBus to send events to |
| retryOn | string[] | No | An array of error code strings that should be retried. This will only work if maxAttempts is also set |
| maxAttempts | number | No | The number of times to retry errors defined in the retryOn parameter. Defaults to 0 for no retries. |
| throttleMs | number | No | The time in milliseconds to throttle retry attempts. Defaults to 500ms |
| maxMessageSize | number | No | The maximum size permitted for the message batch to reach before sending. Defaults to 256kb which is the maximum for EventBridge |
| maxBatchCount | number | No | The maximum number of messages to include in a single batch. Defaults to 10 |

### Handling errors
batchPut will throw an error when items fail to send. If errors are not appropriatly handled using rxjs `catchError` then any failed messages may cause the remainder of the batch to fail. The thrown `BatchPutError` extends Error with the following additional properties
* code: The AWS error code or `UNKNOWN` if it is not defined
* data: The original message data it was trying to send to eventbridge
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import batchPut from '../batchPut';
import batchPut, {BatchPutError} from '../batchPut';
import {from, lastValueFrom} from "rxjs";
import {EventBridgeClient, PutEventsCommand, PutEventsCommandOutput} from "@aws-sdk/client-eventbridge";
import {mockClient} from "aws-sdk-client-mock";
import { toArray } from 'rxjs/operators';
import { toArray, distinct } from 'rxjs/operators';

describe('batchPut', () => {
it('retrys on ThrottlingException', async () => {
Expand All @@ -29,7 +29,7 @@ describe('batchPut', () => {
mockClient(eventBridgeClient)
.on(PutEventsCommand)
.callsFake(fn)

await lastValueFrom(from([{test: 'test1'}])
.pipe(
batchPut({
Expand All @@ -44,6 +44,46 @@ describe('batchPut', () => {
)
expect(fn).toBeCalledTimes(2);
});

it('Throws error on alternative failure', async () => {
expect.assertions(3);
let eventBridgeClient = new EventBridgeClient({});
let responsePayload: PutEventsCommandOutput = {
$metadata: {},
Entries: [
{
ErrorCode: "DifferentError"
}
]
}

let fn = jest.fn()
.mockImplementationOnce(() => Promise.resolve(responsePayload))

mockClient(eventBridgeClient)
.on(PutEventsCommand)
.callsFake(fn)

const message = {test: 'test1'};
try {
await lastValueFrom(from([message])
.pipe(
batchPut({
eventBridgeClient,
detailType: 'test',
source: 'learningrecord.test',
eventBusName: 'eventbustest',
maxAttempts: 10,
throttleMs: 0
})
)
);
} catch (err) {
expect(err).toBeInstanceOf(Error);
expect((err as BatchPutError).code).toEqual('DifferentError');
expect((err as BatchPutError).data).toEqual(expect.objectContaining({Detail: JSON.stringify(message)}));
}
});
it('buffers entrys into one call', async () => {
let eventBridgeClient = new EventBridgeClient({});
let responsePayload = {
Expand All @@ -54,13 +94,16 @@ describe('batchPut', () => {
mockClient(eventBridgeClient)
.on(PutEventsCommand)
.callsFake(fn)
await lastValueFrom(from([

const messages = [
{test: 'test1'},
{test: 'test2'},
{test: 'test3'}

])
];
await lastValueFrom(
from(messages)
.pipe(
distinct(({test}) => test),
batchPut({
eventBridgeClient,
detailType: 'test',
Expand All @@ -71,8 +114,9 @@ describe('batchPut', () => {
}),
toArray()
))

expect(fn.mock.calls[0][0].Entries.length).toBe(3);

expect(fn).toHaveBeenCalled();
expect(fn.mock.calls[0][0].Entries).toHaveLength(messages.length);
});
it('uses the event bus name specified in the config', async () => {
let eventBridgeClient = new EventBridgeClient({});
Expand Down Expand Up @@ -126,7 +170,7 @@ describe('batchPut', () => {
})
// toArray()
)

await lastValueFrom($obs);
expect(fn.mock.calls.length).toBe(1);
});
Expand All @@ -141,7 +185,7 @@ describe('batchPut', () => {
mockClient(eventBridgeClient)
.on(PutEventsCommand)
.callsFake(fn)

let [event] = await lastValueFrom(from([{test: 'test1'}])
.pipe(
batchPut({
Expand Down Expand Up @@ -187,4 +231,72 @@ describe('batchPut', () => {
})
);
});


it('does not put groups of messages over the size limit', async () => {
const sizeLimit = 10 * 1024; // 10 KB
const largeMessage = { test: 'a'.repeat(sizeLimit / 2) }; // Creates a large message half the size of the limit
const messages = [largeMessage, largeMessage, largeMessage]; // This should exceed the limit when combined

let eventBridgeClient = new EventBridgeClient({});
let putCmd = jest.fn().mockImplementation(({Entries}) => Promise.resolve({
Entries: Entries.map((_: unknown, index: number) => ({ EventId: `event_${index}` }))
}));
mockClient(eventBridgeClient)
.on(PutEventsCommand)
.callsFake(putCmd)

await lastValueFrom(
from(messages).pipe(
batchPut({
eventBridgeClient,
detailType: 'test',
source: 'learningrecord.test',
eventBusName: 'eventbustest',
maxAttempts: 1,
throttleMs: 0,
maxMessageSize: sizeLimit,
maxBatchCount: 10 // Set a higher batch count to test size limit
}),
toArray()
)
);

// The function should have split the messages into separate batches due to size constraints
expect(putCmd).toHaveBeenCalledTimes(3);
});

it('does not put groups of messages over the count limit', async () => {
const countLimit = 10;
const messages = new Array(countLimit + 5).fill({ test: 'test' }); // Creates more messages than the count limit


let eventBridgeClient = new EventBridgeClient({});
let putCmd = jest.fn().mockImplementation(({Entries}) => Promise.resolve({
Entries: Entries.map((_: unknown, index: number) => ({ EventId: `event_${index}` }))
}));
mockClient(eventBridgeClient)
.on(PutEventsCommand)
.callsFake(putCmd)

await lastValueFrom(
from(messages).pipe(
batchPut({
eventBridgeClient,
detailType: 'test',
source: 'learningrecord.test',
eventBusName: 'eventbustest',
maxAttempts: 1,
throttleMs: 0,
maxBatchCount: countLimit // Set the batch count limit
}),
toArray()
)
);

// The function should have split the messages into 2 batches: one with the count limit and one with the remainder
expect(putCmd).toHaveBeenCalledTimes(2);
expect(putCmd.mock.calls[0][0].Entries).toHaveLength(countLimit);
expect(putCmd.mock.calls[1][0].Entries).toHaveLength(5);
});
})
109 changes: 87 additions & 22 deletions packages/92green-aws-rxjs/src/eventbus/batchPut.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,75 @@
// @flow
import {Observable, interval, zip, of, EMPTY} from "rxjs";
import {map, filter, expand, bufferCount, concatMap,share, throttle} from 'rxjs/operators';
const MAX_EVENT_BRIDGE_PUT = 10;
import {map, concatMap, filter, expand, scan, share, throttle, endWith} from 'rxjs/operators';
const DEFAULT_THROTTLE_MS = 500;
const MAX_TOTAL_MESSAGE_SIZE = 262144; // 256 * 1024 - https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-putevent-size.html
const DEFAULT_BATCH_SIZE = 10;

import type {EventBridgeClient, PutEventsRequestEntry, PutEventsResultEntry} from '@aws-sdk/client-eventbridge'
import {PutEventsCommand} from '@aws-sdk/client-eventbridge';

const END_SYMBOL = Symbol('END');

export class BatchPutError extends Error {
code: string;
data: unknown;

constructor(message: string, code: string, data: unknown) {
super(message);
this.code = code;
this.data = data;
}
}

export type BufferSizeParams<T> = {
bufferSize: number,
sizeFn: (data: T) => number,
maxBufferCount: number
};

export const bufferSize = <T>({bufferSize, sizeFn, maxBufferCount}: BufferSizeParams<T>) => (obs: Observable<T>): Observable<T[]> => {

return obs.pipe(
endWith(END_SYMBOL),
scan((acc, value) => {
if(value === END_SYMBOL) {
acc.result = acc.buffer;
acc.buffer = [];
acc.emit = true;
return acc;
}

const size = sizeFn(value);
if(size > bufferSize) {
throw new Error(`Message size ${size} is larger than buffer size ${bufferSize}`);
}
if (acc.size + size > bufferSize || acc.buffer.length >= maxBufferCount) {
acc.result = acc.buffer;
acc.buffer = [value];
acc.size = size;
acc.emit = true;
return acc;
}
acc.buffer.push(value);
acc.emit = false;
acc.size += size;
return acc;
}, {buffer: [] as T[], emit: false, size: 0, result: [] as T[]}),
filter(({emit}) => emit),
map(({result}) => result)
);
}


type Config = {
eventBridgeClient: EventBridgeClient,
retryOn?: string[],
detailType: string,
source: string,
eventBusName: string,
maxAttempts: number,
throttleMs: number
maxAttempts?: number,
throttleMs?: number,
maxMessageSize?: number,
maxBatchCount?: number
};

const RETRY_ON = [
Expand All @@ -29,13 +84,21 @@ type MetaData = {

type RecordTuple= [PutEventsRequestEntry, MetaData]

export default <T extends object>(config: Config) => (obs: Observable<T>) => {
export default <T>(config: Config) => (obs: Observable<T>) => {
const maxMessageSize = config.maxMessageSize || MAX_TOTAL_MESSAGE_SIZE;
const maxBatchCount = config.maxBatchCount || DEFAULT_BATCH_SIZE;
const maxAttempts = config.maxAttempts || 0;

const retryOn = config.retryOn || RETRY_ON;
const sendToEventBus = (obs: Observable<RecordTuple>): Observable<RecordTuple> => {
let input = obs.pipe(share());
let results = obs.pipe(
map(([record]) => record),
bufferCount(MAX_EVENT_BRIDGE_PUT),
map((tuple) => tuple[0]),
bufferSize({
sizeFn: (data: PutEventsRequestEntry) => JSON.stringify(data).length,
bufferSize: maxMessageSize,
maxBufferCount: maxBatchCount
}),
concatMap(records => config
.eventBridgeClient
.send(new PutEventsCommand({Entries: records})
Expand All @@ -49,7 +112,7 @@ export default <T extends object>(config: Config) => (obs: Observable<T>) => {
map(([input, result]) => {
let [record, info] = input;
return [record, {
...info,
...info,
result,
attempts: ++info.attempts
}];
Expand All @@ -69,26 +132,28 @@ export default <T extends object>(config: Config) => (obs: Observable<T>) => {
sendToEventBus,
expand(ii => of(ii)
.pipe(
filter(([, info]) => {
// Bail no result..
if(!info.result){
return false;
}
// Bail can't check error code;
if(!info.result.ErrorCode){
return false
}
filter(([item, info]) => {
// Event already pushed to EventBus, Don't retry;
if(info.result.EventId)
if(info.result?.EventId) {
return false
}

return retryOn.includes(info.result.ErrorCode) &&
info.attempts < config.maxAttempts;
const errorCode = info.result?.ErrorCode || 'UNKNOWN';

if(
info.attempts < maxAttempts
&& retryOn.includes(errorCode)
) {
return true;
}


throw new BatchPutError(`${errorCode} Error sending record to eventbridge`, errorCode, item);
}),
throttle(() => interval(config.throttleMs || DEFAULT_THROTTLE_MS)),
sendToEventBus
)
)

);
};
};

0 comments on commit 4ac1c10

Please sign in to comment.