diff --git a/index.js b/index.js index 58a43bc8..ab7f5935 100644 --- a/index.js +++ b/index.js @@ -264,7 +264,8 @@ module.exports = class Hypercore extends EventEmitter { this.encryption = o.encryption this.writable = this._isWritable() this.autoClose = o.autoClose - if (o.state) this.state = o.state.clone() + + if (o.state) this.state = this.snapshotted ? o.state.snapshot() : o.state.ref() if (o.core) this.tracer.setParent(o.core.tracer) @@ -321,7 +322,7 @@ module.exports = class Hypercore extends EventEmitter { if (opts.name) { // todo: need to make named sessions safe before ready // atm we always copy the state in passCapabilities - await this.state.close() + await this.state.unref() this.state = await this.core.createSession(opts.name, opts.checkout, opts.refresh) if (opts.checkout !== undefined) { @@ -486,7 +487,7 @@ module.exports = class Hypercore extends EventEmitter { this._findingPeers = 0 if (this.sessions.length || this.state.active > 1) { - await this.state.close() + await this.state.unref() // if this is the last session and we are auto closing, trigger that first to enforce error handling if (this.sessions.length === 1 && this.state.active === 1 && this.autoClose) await this.sessions[0].close(err) @@ -838,7 +839,6 @@ module.exports = class Hypercore extends EventEmitter { this.tracer.trace('get', { index }) if (this.closing !== null) throw SESSION_CLOSED() - if (this._snapshot !== null && index >= this._snapshot.compatLength) throw SNAPSHOT_NOT_AVAILABLE() const encoding = (opts && opts.valueEncoding && c.from(opts.valueEncoding)) || this.valueEncoding @@ -889,32 +889,34 @@ module.exports = class Hypercore extends EventEmitter { async _get (index, opts) { if (this.core.isFlushing) await this.core.flushed() - const reader = this.state.storage.createReadBatch() - const promise = reader.getBlock(index) - reader.tryFlush() + let block = await readBlock(this.state.createReadBatch(), index) - let block = await promise + // snapshot should check if core has block + if (block === null && this._snapshot !== null) { + checkSnapshot(this._snapshot, index) + block = await readBlock(this.core.state.createReadBatch(), index) + checkSnapshot(this._snapshot, index) + } if (block !== null) { if (this.cache) this.cache.set(index, Promise.resolve(block)) - } else { - if (!this._shouldWait(opts, this.wait)) return null + return block + } - if (opts && opts.onwait) opts.onwait(index, this) - if (this.onwait) this.onwait(index, this) + if (!this._shouldWait(opts, this.wait)) return null - const activeRequests = (opts && opts.activeRequests) || this.activeRequests + if (opts && opts.onwait) opts.onwait(index, this) + if (this.onwait) this.onwait(index, this) - const req = this.replicator.addBlock(activeRequests, index) - req.snapshot = index < this.length + const activeRequests = (opts && opts.activeRequests) || this.activeRequests - const timeout = opts && opts.timeout !== undefined ? opts.timeout : this.timeout - if (timeout) req.context.setTimeout(req, timeout) + const req = this.replicator.addBlock(activeRequests, index) + req.snapshot = index < this.length - block = this._cacheOnResolve(index, req.promise, this.state.tree.fork) - } + const timeout = opts && opts.timeout !== undefined ? opts.timeout : this.timeout + if (timeout) req.context.setTimeout(req, timeout) - return block + return this._cacheOnResolve(index, req.promise, this.state.tree.fork) } async restoreBatch (length, blocks) { @@ -930,6 +932,8 @@ module.exports = class Hypercore extends EventEmitter { ? unslab(resolved) : resolved + if (this._snapshot !== null) checkSnapshot(this._snapshot, index) + if (this.cache && fork === this.core.tree.fork) { this.cache.set(index, Promise.resolve(block)) } @@ -1151,3 +1155,13 @@ function createCache (cache) { function isValidIndex (index) { return index === 0 || index > 0 } + +function checkSnapshot (snapshot, index) { + if (index >= snapshot.compatLength) throw SNAPSHOT_NOT_AVAILABLE() +} + +function readBlock (reader, index) { + const promise = reader.getBlock(index) + reader.tryFlush() + return promise +} diff --git a/lib/core.js b/lib/core.js index d025e789..c48cf51f 100644 --- a/lib/core.js +++ b/lib/core.js @@ -113,10 +113,12 @@ class Update { } class SessionState { - constructor (core, storage, blocks, tree, bitfield, treeLength) { + constructor (core, storage, blocks, tree, bitfield, treeLength, snapshot) { this.core = core this.storage = storage + this.storageSnapshot = snapshot || null + this.mutex = new Mutex() this.blocks = blocks @@ -124,6 +126,7 @@ class SessionState { this.bitfield = bitfield this.treeLength = treeLength + this.active = 1 this._onflush = null @@ -131,22 +134,48 @@ class SessionState { this._activeBatch = null } + get isSnapshot () { + return this.storageSnapshot !== null + } + get isDefault () { return this.core.state === this } - async close () { + async unref () { if (--this.active > 0) return Promise.resolve() + if (this.storageSnapshot) this.storageSnapshot.destroy() + await this.storage.close() await this.mutex.destroy(new Error('Closed')) } - clone () { + ref () { this.active++ return this } + snapshot () { + const snapshot = this.storage.snapshot() + + const s = new SessionState( + this.core, + this.storage, + this.blocks, + this.tree, + this.bitfield, + this.treeLength, + snapshot + ) + + return s + } + + createReadBatch () { + return this.storage.createReadBatch({ snapshot: this.storageSnapshot }) + } + _clearActiveBatch (err) { if (!this._activeBatch) return @@ -159,7 +188,7 @@ class SessionState { } createUpdate () { - assert(!this._activeBatch) + assert(!this._activeBatch && !this.isSnapshot) this._activeBatch = this.storage.createWriteBatch() return new Update(this._activeBatch, this.bitfield, this.core.header, this) @@ -391,7 +420,7 @@ module.exports = class Core { this.sessions = sessions this.globalCache = globalCache - this.state = new SessionState(this, storage, this.blocks, tree, bitfield, tree.length) + this.state = new SessionState(this, storage, this.blocks, tree, bitfield, tree.length, null) this._manifestFlushed = !!header.manifest this._maxOplogSize = 65536 @@ -420,7 +449,7 @@ module.exports = class Core { length: (length === treeLength || !treeInfo) ? treeLength : treeInfo.length }) - return new SessionState(this, storage, this.blocks, tree, bitfield, treeLength) + return new SessionState(this, storage, this.blocks, tree, bitfield, treeLength, null) } static async open (db, opts = {}) { @@ -1054,7 +1083,7 @@ module.exports = class Core { async close () { this.closed = true - await this.state.close() // TODO: add option where the storage is NOT closed for corestore + await this.state.unref() // TODO: add option where the storage is NOT closed for corestore } }