-
Notifications
You must be signed in to change notification settings - Fork 8
/
index.js
299 lines (243 loc) · 8.92 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
module.exports = function (db, options) {
return wrap(db, new Version(options))
}
module.exports.Version = Version
var wrap = require("level-onion")
var fix = require("level-fix-range")
var concat = require("terminus").concat
var gc = require("./gc")
// TODO until https://github.com/rvagg/node-levelup/issues/218 is resolved
var NotFoundError = require("levelup/lib/errors").NotFoundError
var NativeBatch = require("levelup/lib/batch")
var through = require("through2")
var u = require("./util")
var makeKey = u.makeKey
var unmakeKey = u.unmakeKey
var wrapCb = u.wrapCb
var encode = u.encode
var decode = u.decode
/**
* @param {Object} options Wrapping options
* - gcMaxVersions [no default] When doing GC it will only keep gcMaxVersions for each key
* - gcMaxAge [no default] When doing GC only keep versions where (latest_version) - gcMaxAge > version
* - gcFreqMs [60000] How often the GC runs to apply GC rules. Only runs if a gcMax* option is set.
* - gcBackup [no default] A level-version instance to put gc-culled records into.
* - gcCallback [no default] A callback to execute when gc sweeps complete
* - defaultVersion [Date.now] A function to provide the default version if none is specified.
* - delimiter [\xff] The internal delimiter to use.
*/
function Version(options) {
this.type = "version"
this.unique = true
options = options || {}
this.options = options
if (!options.defaultVersion) options.defaultVersion = Date.now
if (typeof options.defaultVersion != "function")
throw new Error("defaultVersion generator must be a function.")
this.delimiter = (options.delimiter != null) ? options.delimiter : "\xff"
this.defaultVersion = options.defaultVersion
}
Version.prototype.install = function (db, parent) {
var self = this
var sep = this.delimiter
setTimeout(function () {
self.gc = gc(db, self.options)
}, self.options.gcFreqMs)
/* -- put -- */
db.put = function (key, value, options, cb) {
if (!cb && typeof options == "function") {
cb = options
options = {}
}
if (options == null) options = {}
var version = (options.version != null) ? options.version : self.defaultVersion()
return parent.put(makeKey(sep, key, version), value, options, wrapCb(version, cb))
}
/* -- get -- */
db.get = function (key, options, cb) {
if (!cb && typeof options == "function") {
cb = options
options = {}
}
if (options == null) options = {}
if (options.version == null) {
return db.getLast(key, options, cb)
}
return parent.get(makeKey(sep, key, options.version), options, wrapCb(options.version, cb))
}
function getEnd(reverse, key, options, cb) {
if (!cb && typeof options == "function") {
cb = options
options = undefined
}
if (!cb) throw new Error("Get with no callback?")
function collect(records) {
if (!records || !records.length) {
return cb(new NotFoundError("Key not found in database [" + key + "]"))
}
var r = records[0]
// TODO other options?
if (options && options.valueEncoding == "json") r.value = JSON.parse(r.value)
return cb(null, r.value, r.version)
}
db.createVersionStream(key, {limit: 1, reverse: reverse})
.pipe(concat({objectMode: true}, collect))
}
db.getLast = function (key, options, cb) {
return getEnd(false, key, options, cb)
}
db.getFirst = function (key, options, cb) {
return getEnd(true, key, options, cb)
}
/* -- del -- */
db.del = function (key, options, cb) {
if (!cb && typeof options == "function") {
cb = options
options = {}
}
var version = (options.version != null) ? options.version : self.defaultVersion()
return parent.del(makeKey(sep, key, version), options, wrapCb(version, cb))
}
/* -- batch -- */
function Batch() {
this._batch = new NativeBatch(parent)
}
Batch.prototype.put = function (key, value, options) {
if (options == null) options = {}
var version = (options.version != null) ? options.version : self.defaultVersion()
this._batch.put(makeKey(sep, key, version), value, options)
return this
}
Batch.prototype.del = function (key, options) {
var version = (options.version != null) ? options.version : self.defaultVersion()
this._batch.del(makeKey(sep, key, version), options)
return this
}
Batch.prototype.clear = function () {
this._batch.clear()
return this
}
Batch.prototype.write = function (callback) {
parent.once("batch", function (batch) {
// TODO this has a potential race condition if someone is simultaneously writing batches
// against the non-version-wrapped parent.
db.emit("batch", batch)
})
this._batch.write(callback)
return this
}
db.batch = function (arr, options, cb) {
if (!arguments.length) return new Batch()
var transformed = arr.map(function (e) {
var version = (e.version != null) ? e.version : self.defaultVersion()
e.key = makeKey(sep, e.key, version)
return e
})
parent.batch(transformed, options, cb)
}
/* -- STREAMS -- */
/* -- createReadStream -- */
db.createReadStream = function (options) {
// additional options:
// minVersion -- Only include versions >= minVersion
// maxVersion -- Only include version <= maxVersion
// versionLimit -- Only return versionLimit records per key
options = options || {}
if (options.max != null) options.max = options.max + sep + sep
if (options.end != null) options.end = options.end + sep + sep
if (options._start) options.start = options._start
if (options._end) options.end = options._end
if (options.maxVersion == null) options.maxVersion = u.MAX_VERSION
if (options.minVersion == null) options.minVersion = u.MIN_VERSION
var removeKeys = (options.keys === false) ? true : false
options.keys = true
var filter = through({objectMode: true}, function (record, encoding, cb) {
if (typeof record != "object") {
if (options.keys) record = {key: record}
if (options.values) record = {value: record}
// if both are true... wtf?
}
// split off version key & add it to record
var kv = unmakeKey(sep, record.key)
if (options.versionLimit) {
if (kv.key != this.currentKey) {
this.currentKey = kv.key
this.currentCount = 0
}
if (this.currentCount++ >= options.versionLimit) return cb()
}
if (kv.version >= options.minVersion && kv.version <= options.maxVersion) {
record.version = kv.version
record.key = kv.key
this.push(record)
}
cb()
})
parent.createReadStream(fix(options))
.pipe(filter)
if (removeKeys) {
var stripKeys = through({objectMode: true}, function (record, encoding, cb) {
record.key = undefined
this.push(record)
cb()
})
filter.pipe(stripKeys)
return stripKeys
}
return filter
}
db.readStream = db.createReadStream
/* -- createKeyStream -- */
db.createKeyStream = function (options) {
options = options || {}
options.keys = true
options.values = false
return db.createReadStream(options)
}
db.keyStream = db.createKeyStream
/* -- createValueStream -- */
// TODO this may break the contract of levelup.createValueStream
// as this puts it in objectMode vs Buffer mode with raw values...
db.createValueStream = function (options) {
options = options || {}
options.keys = false
options.values = true
return db.createReadStream(options)
}
db.valueStream = db.createValueStream
/* -- createVersionStream -- */
db.createVersionStream = function (key, options) {
if (key == null) throw new Error("Key required for createVersionStream")
options = options || {}
// Ignore start/min end/max
options.start = options.min = options.end = options.max = undefined
options._start = (options.minVersion != null)
? makeKey(sep, key, options.minVersion)
: key + sep + sep
options._end = (options.maxVersion != null)
? makeKey(sep, key, options.maxVersion)
: key + sep
return db.createReadStream(options)
}
db.versionStream = db.createVersionStream
/* -- createWriteStream -- */
db.createWriteStream = function (options) {
options = options || {}
var transform = through({objectMode: true}, function (record, encoding, cb) {
var version = (record.version != null) ? record.version : self.defaultVersion()
// Important to make a copy here in case we're saving this in multiple places.
var insert = {type: record.type, key: makeKey(sep, record.key, version), value: record.value}
this.push(insert)
cb()
})
var ws = parent.createWriteStream(options)
transform.pipe(ws)
return transform
}
db.writeStream = db.createWriteStream
/* -- close -- */
db.close = function (cb) {
if (this.gc) this.gc.stop()
return parent.close()
}
}