This repository has been archived by the owner on Dec 5, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfirehose.ts
104 lines (88 loc) · 2.54 KB
/
firehose.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import _ from 'lodash-firecloud';
import aws from 'aws-sdk';
import {
LambdaContext
} from './types';
// One PutRecordBatch request payload, plus local bookkeeping.
type RecordBatch = {
  DeliveryStreamName: aws.Firehose.DeliveryStreamName;
  Records: aws.Firehose.Record[];
  // Running total of Buffer.byteLength of the serialized Records.
  // Internal only — stripped before the request is sent to Firehose.
  byteSize: number;
};
// Service quotas for the PutRecordBatch API.
// see https://docs.aws.amazon.com/firehose/latest/dev/limits.html
export let limits = {
  batchByteSize: 4 * 1024 * 1024, // max 4 MiB total per PutRecordBatch call
  batchRecord: 500, // max 500 records per PutRecordBatch call
  recordByteSize: 1000 * 1024 // max 1,000 KiB per individual record (Data blob)
};
/**
 * Deliver the prepared batches to Firehose, one PutRecordBatch call each.
 *
 * PutRecordBatch can partially fail while still returning a 200 response
 * (signalled via FailedPutCount), so only records Firehose actually
 * acknowledged are counted.
 *
 * @param firehose - Firehose client exposing `putRecordBatch(...).promise()`.
 * @param recordBatches - batches prepared by `putRecords`.
 * @returns number of records Firehose acknowledged as ingested.
 */
let _putRecordBatches = async function({
  firehose,
  recordBatches
}): Promise<number> {
  let processedCount = 0;
  for (let recordBatch of recordBatches) {
    // byteSize is local bookkeeping, not a PutRecordBatch parameter;
    // strip it without mutating the caller's object (the original
    // `delete recordBatch.byteSize` is also a strict-mode TS error,
    // since byteSize is a required property of RecordBatch).
    let {byteSize: _byteSize, ...params} = recordBatch;
    let response = await firehose.putRecordBatch(params).promise();
    // Partial failures do not throw; subtract the records that failed.
    let failedCount = response.FailedPutCount ?? 0;
    processedCount = processedCount + (params.Records as any[]).length - failedCount;
  }
  return processedCount;
};
/**
 * Write records to a Firehose delivery stream, batching them to stay
 * within the PutRecordBatch service quotas (see `limits`).
 *
 * Each record is serialized as newline-terminated JSON; Firehose
 * concatenates Data blobs, so the trailing newline keeps consecutive
 * records separable downstream. Records whose serialized form exceeds
 * the per-record limit are skipped (and logged), then returned so the
 * caller can handle them separately.
 *
 * @param DeliveryStreamName - target Firehose delivery stream.
 * @param ctx - Lambda context providing `ctx.log.error`.
 * @param firehose - optional pre-configured client; a fresh `aws.Firehose`
 *   is created when omitted. (Previously typed as required despite the
 *   default value, which forced every caller to pass one.)
 * @param records - records to serialize and deliver.
 * @returns the records that were too large to send.
 * @throws Error when Firehose acknowledges fewer records than were sent.
 */
export let putRecords = async function({
  DeliveryStreamName,
  ctx,
  firehose = new aws.Firehose(),
  records
}: {
  DeliveryStreamName: aws.Firehose.DeliveryStreamName;
  ctx: LambdaContext;
  firehose?: aws.Firehose;
  records: aws.Firehose.Record[];
}): Promise<{
  largeRecords: aws.Firehose.Record[];
}> {
  let largeRecords = [] as aws.Firehose.Record[];
  let recordBatches = [] as RecordBatch[];
  let recordBatch = {
    DeliveryStreamName,
    Records: [],
    byteSize: 0
  } as RecordBatch;
  let toProcessCount = records.length;

  for (let record of records) {
    let Data = JSON.stringify(record);
    Data = `${Data}\n`;
    let dataByteSize = Buffer.byteLength(Data);

    if (dataByteSize > limits.recordByteSize) {
      // Too large for a single Firehose record; hand back to the caller.
      largeRecords.push(record);
      await ctx.log.error(`Skipping record larger than ${limits.recordByteSize / 1024} KB: \
${dataByteSize / 1024} KB.`, {
        record
      });
      toProcessCount = toProcessCount - 1;
      continue;
    }

    // Start a new batch when adding this record would exceed either
    // the byte-size or the record-count quota of PutRecordBatch.
    if (recordBatch.byteSize + dataByteSize > limits.batchByteSize ||
      recordBatch.Records.length + 1 > limits.batchRecord) {
      recordBatches.push(recordBatch);
      recordBatch = {
        DeliveryStreamName,
        Records: [],
        byteSize: 0
      } as RecordBatch;
    }

    recordBatch.byteSize = recordBatch.byteSize + dataByteSize;
    recordBatch.Records.push({
      Data
    });
  }

  recordBatches.push(recordBatch);
  // Drop the trailing batch if it never received a record
  // (e.g. empty input, or every record was oversized).
  recordBatches = _.reject(recordBatches, function(recordBatch) {
    return recordBatch.byteSize === 0;
  });

  let processedCount = await _putRecordBatches({firehose, recordBatches});
  if (processedCount !== toProcessCount) {
    throw new Error(`Not all records processed. Expected ${toProcessCount}, actually ${processedCount}.`);
  }

  return {
    largeRecords
  };
};