forked from crccheck/kinesis-console-consumer
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcli.js
executable file
·73 lines (66 loc) · 2.49 KB
/
cli.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#!/usr/bin/env node
'use strict'
const fs = require('fs')
const AWS = require('aws-sdk')
const program = require('commander')
const updateNotifier = require('update-notifier')
const index = require('./')
// Load this package's own manifest (the package.json sitting next to this
// script); `pkg.version` feeds --version and `pkg` is given to update-notifier.
const manifestBytes = fs.readFileSync(`${__dirname}/package.json`)
const pkg = JSON.parse(manifestBytes)
// Single Kinesis client, constructed without explicit configuration, shared by
// the stream reader and the stream listing below.
const client = new AWS.Kinesis()
/**
 * Translate parsed command-line flags into the options object handed to
 * `KinesisStreamReader`.
 *
 * Exactly one starting position is selected. When several `--type-*` flags are
 * supplied the precedence is: timestamp, after, at, oldest; with none of them
 * the iterator type is LATEST.
 *
 * @param {object} flags - parsed flags (the commander `program` object)
 * @param {string} [flags.typeTimestamp] - value of --type-timestamp
 * @param {string} [flags.typeAfter] - value of --type-after
 * @param {string} [flags.typeAt] - value of --type-at
 * @param {boolean} [flags.typeOldest] - set by --type-oldest
 * @param {boolean} [flags.newLine] - set by --new-line
 * @param {string} [flags.regexFilter] - value of --regex-filter
 * @returns {object} options containing `ShardIteratorType`, `NewLine` and
 *   `RegexFilter`, plus `Timestamp` or `StartingSequenceNumber` when the
 *   chosen iterator type needs one
 */
function buildReaderOptions (flags) {
  const options = {}

  if (flags.typeTimestamp) {
    options.ShardIteratorType = 'AT_TIMESTAMP'
    // Numeric input is forwarded as a number, anything else as the raw string.
    // The check and the conversion both use Number() so that a value accepted
    // as numeric (e.g. "1.5e9" or "1500000000.5") is not mangled by parseInt.
    const seconds = Number(flags.typeTimestamp)
    options.Timestamp = Number.isNaN(seconds) ? flags.typeTimestamp : seconds
  } else if (flags.typeAfter) {
    options.ShardIteratorType = 'AFTER_SEQUENCE_NUMBER'
    options.StartingSequenceNumber = flags.typeAfter
  } else if (flags.typeAt) {
    options.ShardIteratorType = 'AT_SEQUENCE_NUMBER'
    options.StartingSequenceNumber = flags.typeAt
  } else if (flags.typeOldest) {
    options.ShardIteratorType = 'TRIM_HORIZON'
  } else {
    options.ShardIteratorType = 'LATEST'
  }

  // Use the same `NewLine` key for both states so the value is never split
  // across two differently-cased properties.
  options.NewLine = Boolean(flags.newLine)
  // Without an explicit filter, fall back to a pattern that matches everything.
  options.RegexFilter = flags.regexFilter || '.*'

  return options
}

// Command definition: one optional-in-practice <stream_name> argument plus the
// flags consumed by buildReaderOptions. The action only fires when a stream
// name was supplied on the command line.
program
  .version(pkg.version)
  .arguments('<stream_name>')
  // XXX --list works as an accident of not specifying a stream_name
  .option('--list', 'Just list all streams and exit')
  .option('--type-latest', '(DEFAULT) start reading any new data (LATEST)')
  .option('--type-oldest', 'start reading from the oldest data (TRIM_HORIZON)')
  .option('--type-at <sequence_number>', 'start reading from this sequence number (AT_SEQUENCE_NUMBER)')
  .option('--type-after <sequence_number>', 'start reading after this sequence number (AFTER_SEQUENCE_NUMBER)')
  .option('--type-timestamp <timestamp>', 'start reading after this time (units: epoch seconds) (AT_TIMESTAMP)')
  .option('--new-line', 'print each record to a new line')
  .option('--regex-filter <regexFilter>', 'filter data using this regular expression')
  .action((streamName) => {
    if (program.list) {
      // Hack program.args to be empty so the getStreams block below will run instead
      program.args = []
      return
    }

    // Stream the selected Kinesis stream straight to standard output.
    const reader = new index.KinesisStreamReader(client, streamName, buildReaderOptions(program))
    reader.pipe(process.stdout)
  })
  .parse(process.argv)
// When no positional argument is left after parsing (none was given, or the
// --list handler emptied `program.args`), print the result of getStreams
// instead of reading a stream. Failures are written to stderr.
const hasStreamArgument = program.args.length > 0
if (!hasStreamArgument) {
  index
    .getStreams(client)
    .then((streams) => console.log(streams))
    .catch((err) => console.error(err))
}

// Hand the package manifest to update-notifier and trigger its notification.
updateNotifier({ pkg }).notify()