forked from crccheck/kinesis-console-consumer
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathindex.js
139 lines (122 loc) · 4.25 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
'use strict'
// http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html
const Readable = require('stream').Readable
const debug = require('debug')('kinesis-console-consumer')
/**
 * List the Kinesis streams visible to `client`.
 *
 * @param {Object} client - Kinesis client whose requests expose `.promise()`
 * @returns {Promise<Object>} the raw listStreams response, unmodified
 */
function getStreams (client) {
  const request = client.listStreams({})
  return request.promise()
}
/**
 * Collect the ids of every shard in a Kinesis stream.
 *
 * describeStream returns shards one page at a time and flags further pages
 * with `StreamDescription.HasMoreShards`. Keep requesting pages, resuming after
 * the last shard id received (`ExclusiveStartShardId`), until no more remain,
 * so streams with many shards are read in full.
 *
 * @param {Object} client - Kinesis client whose requests expose `.promise()`
 * @param {string} streamName - name of the stream to describe
 * @returns {Promise<string[]>} shard ids, in the order the API returned them
 * @throws {Error} rejects with 'No shards!' when the stream reports no shards
 */
function getShardId (client, streamName) {
  const shardIds = []

  // Fetch one describeStream page, then recurse while more pages exist.
  const fetchPage = (exclusiveStartShardId) => {
    const params = {
      StreamName: streamName,
    }
    if (exclusiveStartShardId) {
      params.ExclusiveStartShardId = exclusiveStartShardId
    }
    return client.describeStream(params).promise()
      .then((data) => {
        const description = data.StreamDescription
        const shards = description.Shards
        shards.forEach((x) => shardIds.push(x.ShardId))
        // The length guard stops an (unexpected) empty page from looping forever.
        if (description.HasMoreShards && shards.length) {
          return fetchPage(shards[shards.length - 1].ShardId)
        }
        if (!shardIds.length) {
          throw new Error('No shards!')
        }
        debug('getShardId found %d shards', shardIds.length)
        return shardIds
      })
  }

  return fetchPage()
}
/**
 * Request a shard iterator for a single shard of a stream.
 *
 * The request defaults to ShardIteratorType 'LATEST'. Any keys present in
 * `options` (e.g. ShardIteratorType, Timestamp, StartingSequenceNumber) are
 * copied over the defaults.
 *
 * @param {Object} client - Kinesis client whose requests expose `.promise()`
 * @param {string} streamName - stream the shard belongs to
 * @param {string} shardId - shard to position the iterator in
 * @param {Object} [options] - extra/overriding getShardIterator parameters
 * @returns {Promise<string>} the ShardIterator token from the response
 */
function getShardIterator (client, streamName, shardId, options) {
  const defaults = {
    ShardId: shardId,
    ShardIteratorType: 'LATEST',
    StreamName: streamName,
  }
  const params = Object.assign(defaults, options || {})

  const extractIterator = (response) => {
    const iterator = response.ShardIterator
    debug('getShardIterator got iterator id: %s', iterator)
    return iterator
  }

  return client.getShardIterator(params).promise().then(extractIterator)
}
/**
 * Readable stream that tails every shard of a Kinesis stream.
 *
 * Reading starts lazily on the first `_read()`: shard ids are listed, an
 * iterator is requested for each shard, and each shard is then polled with
 * callback-style `getRecords`. After each successful response, the next poll
 * of that shard is scheduled `options.interval` ms later, until Kinesis stops
 * returning a NextShardIterator for it. Each record's Data goes through
 * `options.parser`, optionally gets a trailing '\n' (`options.NewLine`), and is
 * pushed only when `options.filter.test(record)` is true.
 *
 * Extra event: 'checkpoint' (sequenceNumber) is emitted with the
 * SequenceNumber of the last record of every non-empty getRecords batch,
 * whether or not that record passed the filter.
 *
 * @param {Object} client - Kinesis client (describeStream/getShardIterator via
 *   `.promise()`, getRecords via callback)
 * @param {string} streamName - stream to read
 * @param {Object} [options]
 * @param {number} [options.interval=2000] - ms between getRecords polls of a shard
 * @param {Function} [options.parser] - maps each record's Data. Supplying one
 *   puts the stream in objectMode; the default is the identity function.
 * @param {string} [options.RegexFilter] - pattern used to build the default
 *   `filter`. When unset, the default filter matches everything.
 * @param {RegExp} [options.filter] - overrides the RegexFilter-built filter
 * @param {boolean} [options.NewLine] - append '\n' to each record before filtering
 * @param {string} [options.ShardIteratorType] - also Timestamp and
 *   StartingSequenceNumber; forwarded to getShardIterator
 */
class KinesisStreamReader extends Readable {
  constructor (client, streamName, options) {
    // Tolerate a missing options argument; every option has a default below.
    const opts = options || {}
    super({
      objectMode: !!opts.parser, // Should this always be true?
    })
    this.client = client
    this.streamName = streamName
    this.options = Object.assign({
      interval: 2000,
      parser: (x) => x,
      filter: new RegExp(opts.RegexFilter),
    }, opts)
    this._started = false // TODO this is probably built into Streams
    // Iterators whose getRecords request has been issued but not yet answered.
    this.iterators = new Set()
  }

  // Emit `err` to 'error' listeners, or log it when there are none.
  // EventEmitter throws on emit('error') without a listener, which inside the
  // getRecords callback would surface as an uncaught exception.
  _reportError (err) {
    if (this.listenerCount('error') > 0) {
      this.emit('error', err)
    } else {
      console.log(err, err.stack)
    }
  }

  // Resolve every shard's iterator and start polling each one.
  _startKinesis () {
    // Only these options are meaningful to getShardIterator.
    const whitelist = ['ShardIteratorType', 'Timestamp', 'StartingSequenceNumber']
    const shardIteratorOptions = Object.keys(this.options)
      .filter((x) => whitelist.includes(x))
      .reduce((result, key) => Object.assign(result, {[key]: this.options[key]}), {})
    return getShardId(this.client, this.streamName)
      .then((shardIds) => {
        const shardIterators = shardIds.map((shardId) =>
          getShardIterator(this.client, this.streamName, shardId, shardIteratorOptions))
        return Promise.all(shardIterators)
      })
      .then((shardIterators) => {
        shardIterators.forEach((shardIterator) => this.readShard(shardIterator))
      })
      .catch((err) => {
        this._reportError(err)
      })
  }

  // Fetch one batch from `shardIterator`, push matching records, and schedule
  // the next batch of the same shard.
  readShard (shardIterator) {
    this.iterators.add(shardIterator)
    debug('readShard starting from %s (out of %d)', shardIterator, this.iterators.size)
    const params = {
      ShardIterator: shardIterator,
      Limit: 10000, // https://github.com/awslabs/amazon-kinesis-client/issues/4#issuecomment-56859367
    }
    // Not written using Promises because they make it harder to keep the program alive here
    this.client.getRecords(params, (err, data) => {
      // The request for this iterator is finished whether it succeeded or not;
      // drop it now so a failed read does not leave a stale entry behind.
      this.iterators.delete(shardIterator)
      if (err) {
        // NOTE: no retry — polling of this shard stops here.
        this._reportError(err)
        return
      }
      if (data.MillisBehindLatest > 60 * 1000) {
        debug('warning: behind by %d milliseconds', data.MillisBehindLatest)
      }
      const filter = this.options.filter
      data.Records.forEach((x) => {
        let record = this.options.parser(x.Data)
        if (this.options.NewLine) record += '\n'
        // RegExp.test() on a /g or /y pattern resumes from lastIndex; rewind
        // so every record is matched from its start.
        // NOTE(review): non-string records are matched against their String() form.
        filter.lastIndex = 0
        if (filter.test(record)) this.push(record)
      })
      if (data.Records.length) {
        this.emit('checkpoint', data.Records[data.Records.length - 1].SequenceNumber)
      }
      if (!data.NextShardIterator) {
        debug('readShard.closed %s', shardIterator)
        // TODO this.end() when number of shards closed == number of shards being read
        // this._started = 0
        return
      }
      setTimeout(() => {
        this.readShard(data.NextShardIterator)
        // idleTimeBetweenReadsInMillis http://docs.aws.amazon.com/streams/latest/dev/kinesis-low-latency.html
      }, this.options.interval)
    })
  }

  // Polling drives itself via setTimeout once started, so only the first call
  // does anything. _started: 1 = starting, 2 = all shards started.
  _read (size) {
    if (this._started) {
      return
    }
    this._startKinesis()
      .then(() => {
        this._started = 2
      })
      .catch((err) => {
        this._reportError(err)
      })
    this._started = 1
  }
}
// EXPORTS
// Public API plus underscore-prefixed helpers exposed for testing.
//////////
Object.assign(exports, {
  getStreams,
  _getShardId: getShardId,
  _getShardIterator: getShardIterator,
  KinesisStreamReader,
})