forked from masumsoft/cassandra-exporter
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexport.js
118 lines (100 loc) · 3.87 KB
/
export.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
var Promise = require('bluebird');
var cassandra = require('cassandra-driver');
var fs = require('fs');
var jsonStream = require('JSONStream');
var HOST = process.env.HOST || '127.0.0.1';
var PORT = process.env.PORT || 9042;
var KEYSPACE = process.env.KEYSPACE;
if (!KEYSPACE) {
console.log('`KEYSPACE` must be specified as environment variable');
process.exit();
}
var USER = process.env.USER;
var PASSWORD = process.env.PASSWORD;
var authProvider;
if (USER && PASSWORD) {
authProvider = new cassandra.auth.PlainTextAuthProvider(USER, PASSWORD);
}
var systemClient = new cassandra.Client({contactPoints: [HOST], authProvider: authProvider, protocolOptions: {port: [PORT]}});
var client = new cassandra.Client({ contactPoints: [HOST], keyspace: KEYSPACE, authProvider: authProvider, protocolOptions: {port: [PORT]}});
function processTableExport(table) {
console.log('==================================================');
console.log('Reading table: ' + table);
return new Promise(function(resolve, reject) {
var jsonfile = fs.createWriteStream('data/' + table + '.json');
jsonfile.on('error', function (err) {
reject(err);
});
var processed = 0;
var startTime = Date.now();
jsonfile.on('finish', function () {
var timeTaken = (Date.now() - startTime) / 1000;
var throughput = processed / timeTaken;
console.log('Done with table, throughput: ' + throughput.toFixed(1) + ' rows/s');
resolve();
});
var writeStream = jsonStream.stringify('[', ',', ']');
writeStream.pipe(jsonfile);
var query = 'SELECT * FROM "' + table + '"';
var options = { prepare : true , fetchSize : 1000 };
client.eachRow(query, [], options, function (n, row) {
var rowObject = {};
row.forEach(function(value, key){
rowObject[key] = value;
});
processed++;
writeStream.write(rowObject);
}, function (err, result) {
if (err) {
reject(err);
return;
}
console.log('Streaming ' + processed + ' rows to: ' + table + '.json');
if (result.nextPage) {
result.nextPage();
return;
}
console.log('Finalizing writes into: ' + table + '.json');
writeStream.end();
});
});
}
systemClient.connect()
.then(function (){
var systemQuery = "SELECT columnfamily_name as table_name FROM system.schema_columnfamilies WHERE keyspace_name = ?";
if (systemClient.metadata.keyspaces.system_schema) {
systemQuery = "SELECT table_name FROM system_schema.tables WHERE keyspace_name = ?";
}
console.log('Finding tables in keyspace: ' + KEYSPACE);
return systemClient.execute(systemQuery, [KEYSPACE]);
})
.then(function (result){
var tables = [];
for(var i = 0; i < result.rows.length; i++) {
tables.push(result.rows[i].table_name);
}
if (process.env.TABLE) {
return processTableExport(process.env.TABLE);
}
return Promise.each(tables, function(table){
return processTableExport(table);
});
})
.then(function (){
console.log('==================================================');
console.log('Completed exporting all tables from keyspace: ' + KEYSPACE);
var gracefulShutdown = [];
gracefulShutdown.push(systemClient.shutdown());
gracefulShutdown.push(client.shutdown());
Promise.all(gracefulShutdown)
.then(function (){
process.exit();
})
.catch(function (err){
console.log(err);
process.exit(1);
});
})
.catch(function (err){
console.log(err);
});