forked from czzarr/node-stream-to-mongo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
39 lines (32 loc) · 981 Bytes
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
var Writable = require('stream').Writable;
var util = require('util');
var MongoClient = require('mongodb').MongoClient;
// Make StreamToMongo inherit from stream.Writable. This statement appears
// before the constructor's source position; that works because
// `function StreamToMongo` is a function declaration and is therefore hoisted.
util.inherits(StreamToMongo, Writable);
// The constructor itself is the module's only export.
module.exports = StreamToMongo;
/**
 * Object-mode writable stream constructor.
 *
 * May be invoked with or without `new`; a plain call is forwarded to
 * `new StreamToMongo(options)`.
 *
 * @param {Object} options - Stored unchanged on `this.options`. The write
 *   path reads `options.db`, `options.collection` and the optional
 *   `options.action` from it.
 * @returns {StreamToMongo} The stream instance.
 */
function StreamToMongo(options) {
  if (this instanceof StreamToMongo) {
    // Regular construction: initialise the Writable base in object mode,
    // then keep the caller's options for later use.
    Writable.call(this, { objectMode: true });
    this.options = options;
    return;
  }

  // Called as a plain function: behave like a factory.
  return new StreamToMongo(options);
}
/**
 * Writable `_write` implementation: hands each incoming object to the
 * configured action, connecting to MongoDB lazily on the first write.
 *
 * On the first call (no `this.db` yet) it connects using `options.db`,
 * stores the connection on `this.db`, resolves `options.collection` into
 * `this.collection`, and registers a 'finish' listener that closes the
 * connection. Later calls reuse the stored connection.
 *
 * The action is `options.action` when provided, otherwise a default that
 * inserts the object into `this.collection` with `{w: 1}`. It is invoked with
 * the stream as `this` and receives `(obj, cb)`.
 *
 * @param {*} obj - The object being written.
 * @param {string} encoding - Unused; objects are passed through as-is.
 * @param {Function} done - Stream callback. Receives the connection error if
 *   connecting fails; otherwise it is passed to the action as its callback.
 */
StreamToMongo.prototype._write = function (obj, encoding, done) {
  var self = this;

  // Custom action definition
  var action = this.options.action || function insert (obj, cb) {
    this.collection.insert(obj, {w: 1}, cb);
  };

  // Already connected: run the action straight away.
  if (this.db) {
    action.call(self, obj, done);
    return;
  }

  MongoClient.connect(this.options.db, function (err, db) {
    if (err) {
      // Report the failure through the stream callback. Throwing here would
      // happen inside an asynchronous callback, where no caller can catch it,
      // and would leave `done` uncalled.
      return done(err);
    }

    self.db = db;
    self.on('finish', function () {
      self.db.close();
    });
    self.collection = db.collection(self.options.collection);
    action.call(self, obj, done);
  });
};