From 8c5feb9f33931555ca5681718ab943fd5981b82f Mon Sep 17 00:00:00 2001 From: Christophe Diederichs <45171645+chm-diederichs@users.noreply.github.com> Date: Thu, 5 Sep 2024 18:54:14 +0200 Subject: [PATCH] rocksdb: add named sessions (#539) * define scoped states per session * return byteLength from updated batch * pass length to createNamedSession * add tree method for merging one tree into another * add core method for committing state * state stores treeLength * session gets from state * handle case when no tree nodes are added * fix session get and truncate * core truncate passes entire state down * batch has all info after being reconciled * update batch tests * session uses state props * pass whole batch to be flushed * copy over bitfield to named session * minor corrections * update batch tests to be on named sessions * update core tests * make sure we pass state to createUpdate * add checkout option for batches * optionally pass discovery key to core * each state has an independent mutex * bitfield should full copy entire buffer * user data is available sync for default state * named session is always writable * commit can specify length * corestore owns storage * update flushed length prop * expose restore batch on main class * encryption should use session state * rebase main * fix storage usage * test core length is increasing * standard fixes * use state props and pass tree to multisig * fix bitfield page copy * update tests * move to session state to dedicated abstraction * pass parent state as capability * ensure we have correct treeLength and up to date bitfield when opening state * only write each bitfield page once per flush * truncate and clear mutate treeLength * fixes for batch tests * enable batch tests * overwrite if session when opts.refresh is set * enable batch clear test * close storage when we close the core * storage closes automatically * auto teardown any created core * we have to pass an error to mutex destruction... * close all test cores * more missing close * more closes * missing session close * close db in createIfMissing test * more closes * make sure all sessions are closed too * checkout should only truncate named session * state tracks active sessions on open and close * screen for failing test * core closes state * close existing state when creating named session * missing session close * more closes * pass runner to helper * close core instead * close state first and fix teardown order * close state last * missing close * missing close * missing close --------- Co-authored-by: Mathias Buus --- index.js | 93 ++++-- lib/bit-interlude.js | 46 ++- lib/bitfield.js | 19 ++ lib/core.js | 737 ++++++++++++++++++++++++------------------ lib/merkle-tree.js | 81 +++++ lib/replicator.js | 12 +- lib/verifier.js | 2 +- test/all.js | 2 +- test/basic.js | 76 ++++- test/batch.js | 484 +++++++++++++++++---------- test/bitfield.js | 2 +- test/cache.js | 10 + test/clear.js | 5 + test/compat.js | 2 + test/core.js | 118 +++---- test/encryption.js | 20 ++ test/helpers/index.js | 10 +- test/manifest.js | 14 +- test/merkle-tree.js | 2 +- test/preload.js | 8 + test/replicate.js | 17 +- test/sessions.js | 20 ++ test/snapshots.js | 8 + test/storage.js | 4 + test/timeouts.js | 12 + 25 files changed, 1168 insertions(+), 636 deletions(-) diff --git a/index.js b/index.js index 81fdbd05..1c0f865f 100644 --- a/index.js +++ b/index.js @@ -60,6 +60,7 @@ module.exports = class Hypercore extends EventEmitter { this.storage = null this.crypto = opts.crypto || hypercoreCrypto this.core = null + this.state = null this.replicator = null this.encryption = null this.extensions = new Map() @@ -241,7 +242,6 @@ module.exports = class Hypercore extends EventEmitter { _addSession (s) { this.sessions.push(s) - if (this.core) this.core.active++ } async setEncryptionKey (encryptionKey, opts) { @@ -264,6 +264,7 @@ module.exports = class Hypercore extends EventEmitter { this.encryption = o.encryption this.writable = this._isWritable() this.autoClose = o.autoClose + if (o.state) this.state = o.state.clone() if (o.core) this.tracer.setParent(o.core.tracer) @@ -317,6 +318,17 @@ module.exports = class Hypercore extends EventEmitter { ensureEncryption(this, opts) } + if (opts.name) { + // todo: need to make named sessions safe before ready + // atm we always copy the state in passCapabilities + await this.state.close() + this.state = await this.core.createSession(opts.name, opts.checkout, opts.refresh) + + if (opts.checkout !== undefined) { + await this.state.truncate(opts.checkout, this.fork) + } + } + if (opts.manifest && !this.core.header.manifest) { await this.core.setManifest(opts.manifest) } @@ -367,6 +379,7 @@ module.exports = class Hypercore extends EventEmitter { sessions: this.sessions, createIfMissing: opts.createIfMissing, readonly: unlocked, + discoveryKey: opts.discoveryKey, overwrite: opts.overwrite, key, keyPair: opts.keyPair, @@ -379,8 +392,10 @@ module.exports = class Hypercore extends EventEmitter { }) this.tracer.setParent(this.core.tracer) + this.state = this.core.state + if (opts.userData) { - const batch = this.core.storage.createWriteBatch() + const batch = this.state.storage.createWriteBatch() for (const [key, value] of Object.entries(opts.userData)) { this.core.userData(batch, key, value) } @@ -411,18 +426,18 @@ module.exports = class Hypercore extends EventEmitter { _getSnapshot () { if (this.sparse) { return { - length: this.core.tree.length, - byteLength: this.core.tree.byteLength, - fork: this.core.tree.fork, - compatLength: this.core.tree.length + length: this.state.tree.length, + byteLength: this.state.tree.byteLength, + fork: this.state.tree.fork, + compatLength: this.state.tree.length } } return { - length: this.core.header.hints.contiguousLength, + length: this.state.header.hints.contiguousLength, byteLength: 0, - fork: this.core.tree.fork, - compatLength: this.core.header.hints.contiguousLength + fork: this.state.tree.fork, + compatLength: this.state.header.hints.contiguousLength } } @@ -451,7 +466,6 @@ module.exports = class Hypercore extends EventEmitter { if (i === -1) return this.sessions.splice(i, 1) - this.core.active-- this.readable = false this.writable = false this.closed = true @@ -471,10 +485,13 @@ module.exports = class Hypercore extends EventEmitter { this._findingPeers = 0 - if (this.sessions.length || this.core.active > 0) { + if (this.sessions.length || this.state.active > 1) { + await this.state.close() + // if this is the last session and we are auto closing, trigger that first to enforce error handling - if (this.sessions.length === 1 && this.core.active === 1 && this.autoClose) await this.sessions[0].close(err) + if (this.sessions.length === 1 && this.state.active === 1 && this.autoClose) await this.sessions[0].close(err) // emit "fake" close as this is a session + this.emit('close', false) return } @@ -538,11 +555,11 @@ module.exports = class Hypercore extends EventEmitter { if (this._snapshot) return this._snapshot.length if (this.core === null) return 0 if (!this.sparse) return this.contiguousLength - return this.core.tree.length + return this.state.tree.length } - get indexedLength () { - return this.length + get flushedLength () { + return this.state === this.core.state ? this.core.tree.length : this.state.treeLength } /** @@ -552,7 +569,7 @@ module.exports = class Hypercore extends EventEmitter { if (this._snapshot) return this._snapshot.byteLength if (this.core === null) return 0 if (!this.sparse) return this.contiguousByteLength - return this.core.tree.byteLength - (this.core.tree.length * this.padding) + return this.state.tree.byteLength - (this.state.tree.length * this.padding) } get contiguousLength () { @@ -711,9 +728,7 @@ module.exports = class Hypercore extends EventEmitter { async setUserData (key, value, { flush = false } = {}) { if (this.opened === false) await this.opening - const batch = this.core.storage.createWriteBatch() - this.core.userData(batch, key, value) - await batch.flush() + await this.state.setUserData(key, value) } async getUserData (key) { @@ -725,7 +740,7 @@ module.exports = class Hypercore extends EventEmitter { } createTreeBatch () { - return this.core.tree.batch() + return this.state.tree.batch() } findingPeers () { @@ -810,9 +825,9 @@ module.exports = class Hypercore extends EventEmitter { if (this.opened === false) await this.opening if (!isValidIndex(start) || !isValidIndex(end)) throw ASSERTION('has range is invalid') - if (end === start + 1) return this.core.bitfield.get(start) + if (end === start + 1) return this.state.bitfield.get(start) - const i = this.core.bitfield.firstUnset(start) + const i = this.state.bitfield.firstUnset(start) return i === -1 || i >= end } @@ -861,7 +876,7 @@ module.exports = class Hypercore extends EventEmitter { if (start >= end) return cleared if (start >= this.length) return cleared - await this.core.clear(start, end, cleared) + await this.state.clear(start, end, cleared) return cleared } @@ -876,8 +891,8 @@ module.exports = class Hypercore extends EventEmitter { if (this.core.isFlushing) await this.core.flushed() - if (this.core.bitfield.get(index)) { - const reader = this.core.storage.createReadBatch() + if (this.state.bitfield.get(index)) { + const reader = this.state.storage.createReadBatch() block = this.core.blocks.get(reader, index) if (this.cache) this.cache.set(index, block) @@ -896,12 +911,17 @@ module.exports = class Hypercore extends EventEmitter { const timeout = opts && opts.timeout !== undefined ? opts.timeout : this.timeout if (timeout) req.context.setTimeout(req, timeout) - block = this._cacheOnResolve(index, req.promise, this.core.tree.fork) + block = this._cacheOnResolve(index, req.promise, this.state.tree.fork) } return block } + async restoreBatch (length, blocks) { + if (this.opened === false) await this.opening + return this.state.tree.restoreBatch(length) + } + async _cacheOnResolve (index, req, fork) { const resolved = await req @@ -970,27 +990,30 @@ module.exports = class Hypercore extends EventEmitter { if (this.opened === false) await this.opening const { - fork = this.core.tree.fork + 1, + fork = this.state.tree.fork + 1, keyPair = this.keyPair, signature = null } = typeof opts === 'number' ? { fork: opts } : opts + const isDefault = this.state === this.core.state const writable = !this._readonly && !!(signature || (keyPair && keyPair.secretKey)) - if (writable === false && (newLength > 0 || fork !== this.core.tree.fork)) throw SESSION_NOT_WRITABLE() + if (isDefault && writable === false && (newLength > 0 || fork !== this.state.tree.fork)) throw SESSION_NOT_WRITABLE() - await this.core.truncate(newLength, fork, { keyPair, signature }) + await this.state.truncate(newLength, fork, { keyPair, signature }) // TODO: Should propagate from an event triggered by the oplog - this.replicator.updateAll() + if (this.state === this.core.state) this.replicator.updateAll() } async append (blocks, opts = {}) { if (this.opened === false) await this.opening + const isDefault = this.state === this.core.state + const { keyPair = this.keyPair, signature = null } = opts const writable = !this._readonly && !!(signature || (keyPair && keyPair.secretKey)) - if (writable === false) throw SESSION_NOT_WRITABLE() + if (isDefault && writable === false) throw SESSION_NOT_WRITABLE() blocks = Array.isArray(blocks) ? blocks : [blocks] this.tracer.trace('append', { blocks }) @@ -1010,7 +1033,7 @@ module.exports = class Hypercore extends EventEmitter { } } - return this.core.append(buffers, { keyPair, signature, preappend }) + return this.state.append(buffers, { keyPair, signature, preappend }) } async treeHash (length) { @@ -1019,7 +1042,7 @@ module.exports = class Hypercore extends EventEmitter { length = this.core.tree.length } - const roots = await this.core.tree.getRoots(length) + const roots = await this.state.tree.getRoots(length) return this.crypto.tree(roots) } @@ -1105,8 +1128,8 @@ function toHex (buf) { } function preappend (blocks) { - const offset = this.core.tree.length - const fork = this.core.tree.fork + const offset = this.state.tree.length + const fork = this.state.tree.fork for (let i = 0; i < blocks.length; i++) { this.encryption.encrypt(offset + i, blocks[i], fork) diff --git a/lib/bit-interlude.js b/lib/bit-interlude.js index e4944c2e..eacfdc64 100644 --- a/lib/bit-interlude.js +++ b/lib/bit-interlude.js @@ -100,27 +100,47 @@ module.exports = class BitInterlude { this.ranges.push({ start, end }) } - flush (writer) { - for (const { start, end } of this.ranges) { - let index = start + flush (writer, debug) { + if (!this.ranges.length) return { ranges: [], drop: this.drop } - while (index < end) { - const page = this.bitfield.getPage(index, this.drop === false) + let index = this.ranges[0].start + const final = this.ranges[this.ranges.length - 1].end - const buf = b4a.allocUnsafe(page.bitfield.byteLength) - buf.set(page.bitfield) + let i = 0 - const last = (page.index + 1) * (buf.byteLength << 3) - const stop = end < last ? end : last + while (index < final) { + const page = this.bitfield.getPage(index, this.drop === false) + const buf = b4a.allocUnsafe(page.bitfield.byteLength) - const offset = page.index * buf.byteLength << 3 + const view = new DataView( + buf.buffer, + buf.byteOffset, + buf.byteLength + ) - quickbit.fill(buf, this.drop === false, index - offset, stop - offset) + for (let i = 0; i < page.bitfield.length; i++) { + view.setUint32(i * 4, page.bitfield[i], true) + } + + const last = (page.index + 1) * (buf.byteLength << 3) + const offset = page.index * buf.byteLength << 3 + + while (i < this.ranges.length) { + const { start, end } = this.ranges[i] + + const from = start < index ? index : start + const to = end < last ? end : last - writer.putBitfieldPage(page.index, buf) + quickbit.fill(buf, this.drop === false, from - offset, to - offset) - index = stop + index = to + + if (to === last) break + + i++ } + + writer.putBitfieldPage(page.index, buf) } return { diff --git a/lib/bitfield.js b/lib/bitfield.js index 90f840ce..d05f3983 100644 --- a/lib/bitfield.js +++ b/lib/bitfield.js @@ -210,6 +210,10 @@ module.exports = class Bitfield { } } + static from (bitfield) { + return new Bitfield(bitfield.toBuffer(bitfield._pages.maxLength * BITS_PER_PAGE)) + } + toBuffer (length) { const pages = Math.ceil(length / BITS_PER_PAGE) const buffer = b4a.allocUnsafe(pages * BYTES_PER_PAGE) @@ -242,6 +246,21 @@ module.exports = class Bitfield { return p || null } + merge (bitfield, length) { + let i = 0 + + while (i < length) { + const start = bitfield.firstSet(i) + i = bitfield.firstUnset(start) + + if (i === -1 || i > length) i = length + + this.setRange(start, i, true) + + if (i >= length) break + } + } + get (index) { const j = index & (BITS_PER_PAGE - 1) const i = (index - j) / BITS_PER_PAGE diff --git a/lib/core.js b/lib/core.js index 8c346d0f..7c37ecf4 100644 --- a/lib/core.js +++ b/lib/core.js @@ -19,28 +19,31 @@ const CORE = Symbol.for('core') const CONTIG = Symbol.for('contig') const TREE = Symbol.for('tree') const BITFIELD = Symbol.for('bitfield') +const USER_DATA = Symbol.for('user-data') class Update { - constructor (batch, bitfield, header) { + constructor (batch, bitfield, header, state) { this.batch = batch this.bitfield = new BitInterlude(bitfield) + this.state = state + this.contiguousLength = header.hints.contiguousLength this.tree = null - this._updates = [] + this.updates = [] this._coreUpdates = [] } async flushBitfield () { const update = await this.bitfield.flush(this.batch) - if (update) this._updates.push({ type: BITFIELD, update }) + if (update) this.updates.push({ type: BITFIELD, update }) } flushTreeBatch (batch) { const update = batch.commit(this.batch) - this._updates.push({ type: TREE, update }) + this.updates.push({ type: TREE, update }) if (batch.upgraded) { this.tree = { @@ -52,6 +55,11 @@ class Update { } } + setUserData (key, value) { + this.updates.push({ type: USER_DATA, update: { key, value } }) + this.batch.setUserData(key, value) + } + coreUpdate (update) { let { bitfield, status, value, from } = update @@ -74,17 +82,290 @@ class Update { if (this.tree) { this.batch.setCoreHead(this.tree) - this._updates.push({ type: HEAD, update: this.tree }) + this.updates.push({ type: HEAD, update: this.tree }) } // bitfield flushed before core updates for (const upd of this._coreUpdates) { - this._updates.push(upd) + this.updates.push(upd) } await this.batch.flush() - return this._updates + return this.updates + } + + async truncate (batch, from) { + const bitfield = { + drop: true, + start: batch.ancestors, + length: batch.treeLength - batch.ancestors + } + + this.bitfield.setRange(batch.ancestors, batch.treeLength, false) + + const status = (batch.length > batch.ancestors) ? 0b0011 : 0b0010 + + this.flushTreeBatch(batch) + this.coreUpdate({ status, bitfield, value: null, from }) + } +} + +class SessionState { + constructor (core, storage, blocks, tree, bitfield, treeLength) { + this.core = core + + this.storage = storage + this.mutex = new Mutex() + + this.blocks = blocks + this.tree = tree + this.bitfield = bitfield + + this.treeLength = treeLength + this.active = 1 + + this._onflush = null + this._flushing = null + this._activeBatch = null + } + + get isDefault () { + return this.core.state === this + } + + async close () { + if (--this.active > 0) return Promise.resolve() + + await this.storage.close() + await this.mutex.destroy(new Error('Closed')) + } + + clone () { + this.active++ + return this + } + + _clearActiveBatch (err) { + if (!this._activeBatch) return + + if (this._onflush) this._onflush(err) + + this._onflush = null + this._flushing = null + + this._activeBatch = null + } + + createUpdate () { + assert(!this._activeBatch) + + this._activeBatch = this.storage.createWriteBatch() + return new Update(this._activeBatch, this.bitfield, this.core.header, this) + } + + async flushUpdate (u) { + const flushing = this._flushUpdateBatch(u) + + try { + if (!this._flushing) this._flushing = flushing + + await flushing + } finally { + this._clearActiveBatch(this) + } + } + + flushed () { + if (!this._activeBatch) return + + if (this._flushing) return this._flushing + + this._flushing = new Promise(resolve => { + this._onflush = resolve + }) + + return this._flushing + } + + async _flushUpdateBatch (u) { + await u.flush() + + if (!u.updates.length) return + + for (const { type, update } of u.updates) { + switch (type) { + case TREE: // tree + if (!this.isDefault) this.tree.onupdate(update) + break + + case BITFIELD: // bitfield + this.bitfield.onupdate(update) + break + } + } + + if (!this.isDefault) return + + this.core._processUpdates(u.updates) + } + + async setUserData (key, value) { + await this.mutex.lock() + + try { + const update = this.createUpdate() + update.setUserData(key, value) + + return await this.flushUpdate(update) + } finally { + this._clearActiveBatch() + this.mutex.unlock() + } + } + + async truncate (length, fork, { signature, keyPair } = {}) { + if (this.tree.prologue && length < this.tree.prologue.length) { + throw INVALID_OPERATION('Truncation breaks prologue') + } + + if (!keyPair && this.isDefault) keyPair = this.core.header.keyPair + + await this.mutex.lock() + + try { + const batch = await this.tree.truncate(length, fork) + + if (!signature && keyPair && length > 0) signature = this.core.verifier.sign(batch, keyPair) + if (signature) batch.signature = signature + + const update = this.createUpdate() + + // upsert compat manifest + if (this.core.verifier === null && keyPair) this.core._setManifest(update, null, keyPair) + + await update.truncate(batch, null) + + if (batch.length < this.treeLength) this.treeLength = batch.length + + await this.flushUpdate(update) + } finally { + this._clearActiveBatch() + this.mutex.unlock() + } + } + + async clear (start, end, cleared) { + await this.mutex.lock() + + try { + const bitfield = { + start, + length: end - start, + drop: true + } + + const update = this.createUpdate() + + update.bitfield.setRange(start, end, false) + + start = this.bitfield.firstSet(start + 1) + + // TODO: verify this: + // start = state.bitfield.lastSet(start) + 1 + // end = state.bitfield.firstSet(end) + + if (end === -1) end = this.tree.length + if (start === -1 || start >= this.tree.length) return + + this.blocks.clear(update.batch, start, end - start) + + update.coreUpdate({ status: 0, bitfield, value: null, from: null }) + + if (start < this.treeLength) this.treeLength = start + + await this.flushUpdate(update) + } finally { + this._clearActiveBatch() + this.mutex.unlock() + } + } + + async append (values, { signature, keyPair, preappend } = {}) { + if (!keyPair && this.isDefault) keyPair = this.core.header.keyPair + + await this.mutex.lock() + + try { + const update = this.createUpdate() + + // upsert compat manifest + if (this.core.verifier === null && keyPair) this.core._setManifest(update, null, keyPair) + + if (preappend) await preappend(values) + + if (!values.length) { + await this.flushUpdate(update) + return { length: this.tree.length, byteLength: this.tree.byteLength } + } + + const batch = this.tree.batch() + for (const val of values) batch.append(val) + + // only multisig can have prologue so signature is always present + if (this.tree.prologue && batch.length < this.tree.prologue.length) { + throw INVALID_OPERATION('Append is not consistent with prologue') + } + + if (!signature && keyPair) signature = this.core.verifier.sign(batch, keyPair) + if (signature) batch.signature = signature + + update.flushTreeBatch(batch) + update.bitfield.setRange(batch.ancestors, batch.length, true) + + this.blocks.putBatch(update.batch, this.tree.length, values) + + const bitfield = { + drop: false, + start: batch.ancestors, + length: values.length + } + + update.coreUpdate({ + bitfield, + status: 0b0001, + value: null, + from: null + }) + + await this.flushUpdate(update) + + return { length: batch.length, byteLength: batch.byteLength } + } finally { + this._clearActiveBatch() + this.mutex.unlock() + } + } + + async upgrade (start, end, batch, values, keyPair) { + await this.mutex.lock() + + const update = this.createUpdate() + + try { + // upsert compat manifest + if (this.core.verifier === null && keyPair) this.core._setManifest(update, null, keyPair) + + this.blocks.putBatch(update.batch, start, values) + + update.bitfield.setRange(start, end, true) + update.flushTreeBatch(batch) + + await this.flushUpdate(update) + } finally { + this._clearActiveBatch() + this.mutex.unlock() + } } } @@ -110,6 +391,8 @@ module.exports = class Core { this.sessions = sessions this.globalCache = globalCache + this.state = new SessionState(this, storage, this.blocks, tree, bitfield, tree.length) + this._manifestFlushed = !!header.manifest this._maxOplogSize = 65536 this._autoFlush = 1 @@ -119,10 +402,27 @@ module.exports = class Core { this._bitfield = null this._verifies = null this._verifiesFlushed = null - this._mutex = new Mutex() this._legacy = legacy } + async createSession (name, length, overwrite) { + const treeLength = length === undefined ? this.tree.length : length + + const storage = await this.storage.registerBatch(name, treeLength, overwrite) + const treeInfo = await storage.getCoreHead() + const bitfield = await Bitfield.open(storage) + + bitfield.merge(this.bitfield, treeLength) + + const tree = await MerkleTree.open(storage, { + crypto: this.crypto, + prologue: this.tree.prologue, + length: (length === treeLength || !treeInfo) ? treeLength : treeInfo.length + }) + + return new SessionState(this, storage, this.blocks, tree, bitfield, treeLength) + } + static async open (db, opts = {}) { const discoveryKey = opts.discoveryKey || (opts.key && hypercoreCrypto.discoveryKey(opts.key)) const storage = db.get(discoveryKey) @@ -246,36 +546,38 @@ module.exports = class Core { return new this(storage, header, compat, crypto, tree, blocks, bitfield, verifier, opts.sessions || [], legacy, opts.globalCache || null, opts.onupdate || noop, opts.onconflict || noop) } - async audit () { - await this._mutex.lock() + async audit (state = this.state) { + await state.mutex.lock() try { - const update = this._createUpdate() + const update = state.createUpdate() const corrections = await audit(this, update) if (corrections.blocks || corrections.tree) { - await this._flushUpdate(update) + await state.flushUpdate(update) } return corrections } finally { - await this._mutex.unlock() + state._clearActiveBatch() + await state.mutex.unlock() } } async setManifest (manifest) { - await this._mutex.lock() + await this.state.mutex.lock() try { if (manifest && this.header.manifest === null) { if (!Verifier.isValidManifest(this.header.key, manifest)) throw INVALID_CHECKSUM('Manifest hash does not match') - const update = this._createUpdate() + const update = this.state.createUpdate() this._setManifest(update, Verifier.createManifest(manifest), null) - await this._flushUpdate(update) + await this.state.flushUpdate(update) } } finally { - this._mutex.unlock() + this.state._clearActiveBatch() + this.state.mutex.unlock() } } @@ -299,17 +601,17 @@ module.exports = class Core { } async copyPrologue (src, { additional = [] } = {}) { - await this._mutex.lock() + await this.state.mutex.lock() try { - await src._mutex.lock() + await src.mutex.lock() } catch (err) { - this._mutex.unlock() + this.state.mutex.unlock() throw err } try { - const update = this._createUpdate() + const update = this.state.createUpdate() const prologue = this.header.manifest && this.header.manifest.prologue if (!prologue) throw INVALID_OPERATION('No prologue present') @@ -400,157 +702,119 @@ module.exports = class Core { this.userData(update.batch, key, value) } - await this._flushUpdate(update) + await this.state.flushUpdate(update) } finally { - this._clearActiveBatch() - src._mutex.unlock() - this._mutex.unlock() + this.state._clearActiveBatch() + src.mutex.unlock() + this.state.mutex.unlock() } } // async flush () { - // await this._mutex.lock() + // await this.state.mutex.lock() // try { // this._manifestFlushed = true // this._autoFlush = 4 // await this._flushBitfield(writer) // } finally { - // this._mutex.unlock() + // this.state.mutex.unlock() // } // } get isFlushing () { - return !!(this._flushing || this._activeBatch) + return !!(this._flushing || this.state._activeBatch) } - async _flushUpdateBatch (u) { - const updates = await u.flush() - await this._flushUpdates(updates) - } - - _clearActiveBatch (err) { - if (this._onflush) this._onflush(err) - - this._onflush = null - this._flushing = null - this._activeBatch = null - } - - _createUpdate () { - assert(!this._activeBatch) - - this._activeBatch = this.storage.createWriteBatch() - - return new Update(this._activeBatch, this.bitfield, this.header) + flushed () { + return this.state.flushed() } - async _flushUpdate (u) { - const flushing = this._flushUpdateBatch(u) + async _processUpdates (updates) { + for (const { type, update } of updates) { + switch (type) { + case HEAD: { + this.header.tree = update + break + } - if (!this._flushing) this._flushing = flushing + case CORE: { // core + this.onupdate(update) + break + } - await flushing + case CONTIG: { // contig + this.header.hints.contiguousLength = update + break + } - this._clearActiveBatch() - } + case TREE: // tree + if (update.truncated) addReorgHint(this.header.hints.reorgs, this.tree, update) + this.tree.onupdate(update) + break - flushed () { - if (!this._activeBatch) return + case BITFIELD: // bitfield + if (this.skipBitfield !== null) this._updateSkipBitfield(update) + break - if (this._flushing) return this._flushing + case USER_DATA: { // user data + let exists = false + for (const entry of this.header.userData) { + if (entry.key !== update.key) continue - this._flushing = new Promise(resolve => { - this._onflush = resolve - }) + entry.value = update.value + exists = true + break + } - return this._flushing - } + if (exists) continue - _appendBlocks (writer, values) { - return this.blocks.putBatch(writer, this.tree.length, values) + this.header.userData.push({ key: update.key, value: update.value }) + break + } + } + } } _writeBlock (writer, index, value) { this.blocks.put(writer, index, value) } - userData (writer, key, value) { - return writer.setUserData(key, value) + userData (update, key, value) { + return update.setUserData(key, value) } - async truncate (length, fork, { signature, keyPair = this.header.keyPair } = {}) { - if (this.tree.prologue && length < this.tree.prologue.length) { - throw INVALID_OPERATION('Truncation breaks prologue') - } - - this.truncating++ - await this._mutex.lock() - - // upsert compat manifest - if (this.verifier === null && keyPair) this._setManifest(null, keyPair) - - try { - const batch = await this.tree.truncate(length, fork) - if (length > 0) batch.signature = signature || this.verifier.sign(batch, keyPair) - - const update = this._createUpdate() - await this._truncate(update, batch, null) - await this._flushUpdate(update) - } finally { - this.truncating-- - this._mutex.unlock() - } - } - - async clearBatch () { - await this._mutex.lock() + async commit (state, { signature, keyPair = this.header.keyPair, length = state.tree.length, treeLength = state.treeLength } = {}) { + if (this.tree.fork !== state.tree.fork) return null - try { - const update = this._createUpdate() - - const len = this.bitfield.findFirst(false, this.tree.length) - if (len <= this.tree.length) return - - const batch = await this.tree.truncate(this.tree.length, this.tree.fork) - - batch.signature = this.tree.signature // same sig + if (this.tree.length > state.tree.length) return null // TODO: partial commit in the future if possible - update.bitfield.setRange(batch.ancestors, len, false) - update.flushTreeBatch(batch) - - await this._flushUpdate() - } finally { - this._mutex.unlock() - } - } - - async clear (start, end, cleared) { - await this._mutex.lock() - - try { - const bitfield = { - start, - length: end - start, - drop: true + if (this.tree.length > treeLength) { + for (const root of this.tree.roots) { + const batchRoot = await state.tree.get(root.index) + if (batchRoot.size !== root.size || !b4a.equals(batchRoot.hash, root.hash)) { + return null + } } + } - const update = this._createUpdate() + const promises = [] - update.bitfield.setRange(start, end, false) + const reader = state.storage.createReadBatch() + for (let i = treeLength; i < length; i++) promises.push(reader.getBlock(i)) + reader.tryFlush() - start = this.bitfield.firstSet(start + 1) + const values = await Promise.all(promises) - if (end === -1) end = this.tree.length - if (start === -1 || start >= this.tree.length) return + const batch = await this.tree.reconcile(state.tree, length, treeLength) + if (batch.upgraded) batch.signature = signature || this.verifier.sign(batch, keyPair) - this.blocks.clear(update.batch, start, end - start) + await this.state.upgrade(treeLength, length, batch, values, keyPair) - update.coreUpdate({ status: 0, bitfield, value: null, from: null }) + state.treeLength = batch.length - await this._flushUpdate(update) - } finally { - this._clearActiveBatch() - this._mutex.unlock() + return { + length: batch.length, + byteLength: batch.byteLength } } @@ -573,128 +837,6 @@ module.exports = class Core { // }) // } - // async insertBatch (batch, values, { signature, keyPair = this.header.keyPair, pending = false, treeLength = batch.treeLength } = {}) { - // await this._mutex.lock() - - // try { - // // upsert compat manifest - // if (this.verifier === null && keyPair) this._setManifest(null, keyPair) - - // if (this.tree.fork !== batch.fork) return null - - // if (this.tree.length > batch.treeLength) { - // if (this.tree.length > batch.length) return null // TODO: partial commit in the future if possible - - // for (const root of this.tree.roots) { - // const batchRoot = await batch.get(root.index) - // if (batchRoot.size !== root.size || !b4a.equals(batchRoot.hash, root.hash)) { - // return null - // } - // } - // } - - // const writer = this._createWriteBatch() - - // const adding = batch.length - treeLength - - // batch.upgraded = !pending && batch.length > this.tree.length - // batch.treeLength = this.tree.length - // batch.ancestors = this.tree.length - // if (batch.upgraded && !pending) batch.signature = signature || this.verifier.sign(batch, keyPair) - - // if (pending === true) batch.upgraded = false - - // const treeUpgrade = batch.upgraded ? batch : null - // const bitfield = { - // drop: false, - // start: treeLength, - // length: adding - // } - - // await this.blocks.putBatch(writer, treeLength, adding < values.length ? values.slice(0, adding) : values) - - // this._setBitfieldRange(bitfield.start, bitfield.length, true) - - // this._flushTreeBatch(writer, batch) - - // if (batch.upgraded) { - // this.header.tree.length = batch.length - // this.header.tree.rootHash = batch.hash() - // this.header.tree.signature = batch.signature - // } - - // const status = (batch.upgraded ? 0b0001 : 0) | updateContig(this.header, bitfield, this._bitfield) - // if (!pending) { - // // we already commit this, and now we signed it, so tell others - // if (treeUpgrade && treeLength > batch.treeLength) { - // bitfield.start = batch.treeLength - // bitfield.length = treeLength - batch.treeLength - // } - - // this.onupdate(status, bitfield, null, null) - // } - - // if (this._shouldFlush()) await this._flushBitfield(writer) - // } finally { - // this._mutex.unlock() - // } - - // return { length: batch.length, byteLength: batch.byteLength } - // } - - async append (values, { signature, keyPair = this.header.keyPair, preappend } = {}) { - await this._mutex.lock() - - try { - const update = this._createUpdate() - - // upsert compat manifest - if (this.verifier === null && keyPair) this._setManifest(update, null, keyPair) - - if (preappend) await preappend(values) - - if (!values.length) { - await this._flushUpdate(update) - return { length: this.tree.length, byteLength: this.tree.byteLength } - } - - const batch = this.tree.batch() - for (const val of values) batch.append(val) - - // only multisig can have prologue so signature is always present - if (this.tree.prologue && batch.length < this.tree.prologue.length) { - throw INVALID_OPERATION('Append is not consistent with prologue') - } - - batch.signature = signature || this.verifier.sign(batch, keyPair) - - update.flushTreeBatch(batch) - update.bitfield.setRange(batch.ancestors, batch.length, true) - - const byteLength = this._appendBlocks(update.batch, values) - - const bitfield = { - drop: false, - start: batch.ancestors, - length: values.length - } - - update.coreUpdate({ - bitfield, - status: 0b0001, - value: null, - from: null - }) - - await this._flushUpdate(update) - - return { length: batch.length, byteLength } - } finally { - this._clearActiveBatch() - this._mutex.unlock() - } - } - _verifyBatchUpgrade (update, batch, manifest) { if (!this.header.manifest) { if (!manifest && this.compat) manifest = Verifier.defaultSignerManifest(this.header.key) @@ -716,9 +858,9 @@ module.exports = class Core { } async _verifyExclusive ({ batch, bitfield, value, manifest, from }) { - await this._mutex.lock() + await this.state.mutex.lock() - const update = this._createUpdate() + const update = this.state.createUpdate() try { this._verifyBatchUpgrade(update, batch, manifest) @@ -736,11 +878,11 @@ module.exports = class Core { update.coreUpdate({ status: 0b0001, bitfield, value, from }) update.flushTreeBatch(batch) - await this._flushUpdate(update) + await this.state.flushUpdate(update) } finally { - this._clearActiveBatch() + this.state._clearActiveBatch() this.updating = false - this._mutex.unlock() + this.state.mutex.unlock() } return true @@ -749,9 +891,9 @@ module.exports = class Core { async _verifyShared () { if (!this._verifies.length) return false - await this._mutex.lock() + await this.state.mutex.lock() - const update = this._createUpdate() + const update = this.state.createUpdate() const verifies = this._verifies this._verifies = null @@ -788,10 +930,10 @@ module.exports = class Core { update.flushTreeBatch(batch) } - await this._flushUpdate(update) + await this.state.flushUpdate(update) } finally { - this._clearActiveBatch() - this._mutex.unlock() + this.state._clearActiveBatch() + this.state.mutex.unlock() } return verifies[0] !== null @@ -805,12 +947,15 @@ module.exports = class Core { const batch = this.tree.verifyFullyRemote(proof) + await this.state.mutex.lock() + try { - const update = this._createUpdate() + const update = this.state.createUpdate() this._verifyBatchUpgrade(update, batch, proof.manifest) - await this._flushUpdate(update) + await this.state.flushUpdate(update) } catch { + this.state.mutex.unlock() return true } @@ -825,11 +970,11 @@ module.exports = class Core { async verifyReorg (proof) { const batch = await this.tree.reorg(proof) - const update = this._createUpdate() + const update = this.state.createUpdate() this._verifyBatchUpgrade(update, batch, proof.manifest) - await this._flushUpdate(update) + await this.state.flushUpdate(update) return batch } @@ -874,38 +1019,24 @@ module.exports = class Core { if (!batch.commitable()) return false this.truncating++ - await this._mutex.lock() + await this.state.mutex.lock() try { if (!batch.commitable()) return false - const update = this._createUpdate() - await this._truncate(update, batch, from) - await this._flushUpdate(update) + const update = this.state.createUpdate() + await update.truncate(batch, from) + + await this.state.flushUpdate(update) } finally { - this._clearActiveBatch() + this.state._clearActiveBatch() this.truncating-- - this._mutex.unlock() + this.state.mutex.unlock() } return true } - async _truncate (update, batch, from) { - const bitfield = { - drop: true, - start: batch.ancestors, - length: this.tree.length - batch.ancestors - } - - update.bitfield.setRange(batch.ancestors, this.tree.length, false) - - const status = (batch.length > batch.ancestors) ? 0b0011 : 0b0010 - - update.flushTreeBatch(batch) - update.coreUpdate({ status, bitfield, value: null, from }) - } - openSkipBitfield () { if (this.skipBitfield !== null) return this.skipBitfield this.skipBitfield = new RemoteBitfield() @@ -915,39 +1046,6 @@ module.exports = class Core { return this.skipBitfield } - async _flushUpdates (updates) { - if (!updates.length) return - - for (const { type, update } of updates) { - switch (type) { - case HEAD: { - this.header.tree = update - break - } - - case CORE: { // core - this.onupdate(update) - break - } - - case CONTIG: { // contig - this.header.hints.contiguousLength = update - break - } - - case TREE: // tree - if (update.truncated) addReorgHint(this.header.hints.reorgs, this.tree, update) - this.tree.onupdate(update) - break - - case BITFIELD: // bitfield - this.bitfield.onupdate(update) - if (this.skipBitfield !== null) this._updateSkipBitfield(update) - break - } - } - } - _updateSkipBitfield ({ ranges, drop }) { for (const { start, end } of ranges) { this.skipBitfield.setRange(start, end - start, drop === false) @@ -956,8 +1054,7 @@ module.exports = class Core { async close () { this.closed = true - await this._mutex.destroy() - await this.storage.close() // TODO: add option where the storage is NOT closed for corestore + await this.state.close() // TODO: add option where the storage is NOT closed for corestore } } diff --git a/lib/merkle-tree.js b/lib/merkle-tree.js index 16afb9aa..23c4a644 100644 --- a/lib/merkle-tree.js +++ b/lib/merkle-tree.js @@ -458,12 +458,14 @@ class TreeProof { const [pNode, pSeek, pUpgrade, pAdditional] = await settleProof(this.pending) if (this.block) { + if (pNode === null) throw INVALID_OPERATION('Invalid block request') result.block = { index: this.block.index, value: null, // populated upstream, alloc it here for simplicity nodes: pNode } } else if (this.hash) { + if (pNode === null) throw INVALID_OPERATION('Invalid block request') result.hash = { index: this.hash.index, nodes: pNode @@ -527,6 +529,85 @@ module.exports = class MerkleTree { return batch } + async reconcile (tree, length, treeLength) { + const nodes = [] + const data = [] + + const from = this.length * 2 + const to = length * 2 + + const reader = tree.storage.createReadBatch() + + // upgrade + for (const ite = flat.iterator(0); ite.fullRoot(to); ite.nextTree()) { + // check if they already have the node + if (ite.index + ite.factor / 2 < from) continue + + if (nodes.length === 0 && ite.contains(from - 2)) { + const root = ite.index + const target = from - 2 + + ite.seek(target) + + while (ite.index !== root) { + ite.sibling() + if (ite.index > target) { + nodes.push(reader.getTreeNode(ite.index)) + } + ite.parent() + } + + continue + } + + nodes.push(reader.getTreeNode(ite.index)) + } + + for (let i = treeLength * 2; i < length * 2; i++) { + data.push(reader.getTreeNode(i)) + } + + reader.tryFlush() + + const batch = this.batch() + + // copy tree nodes + for (const node of await Promise.all(data)) { + if (node) batch.nodes.push(node) + } + + // no nodes to add + if (!nodes.length) return batch + + const q = new NodeQueue(await Promise.all(nodes), null) + + let grow = batch.roots.length > 0 + let i = 0 + + for (const ite = flat.iterator(0); ite.fullRoot(to); ite.nextTree()) { + if (i < batch.roots.length && batch.roots[i].index === ite.index) { + i++ + continue + } + + if (grow) { + grow = false + const root = ite.index + if (i < batch.roots.length) { + ite.seek(batch.roots[batch.roots.length - 1].index) + while (ite.index !== root) { + batch.appendRoot(q.shift(ite.sibling()), ite) + } + continue + } + } + + batch.appendRoot(q.shift(ite.index), ite) + } + + return batch + } + seek (bytes, padding) { return new ByteSeeker(this, bytes, padding) } diff --git a/lib/replicator.js b/lib/replicator.js index 3d0f3920..d6469eb9 100644 --- a/lib/replicator.js +++ b/lib/replicator.js @@ -1534,7 +1534,9 @@ module.exports = class Replicator { for (const peer of this.peers) peer.signalUpgrade() if (this._blocks.isEmpty() === false) this._resolveBlocksLocally() if (this._upgrade !== null) this._resolveUpgradeRequest(null) - if (this._ranges.length !== 0 || this._seeks.length !== 0) this._updateNonPrimary(true) + if (!this._blocks.isEmpty() || this._ranges.length !== 0 || this._seeks.length !== 0) { + this._updateNonPrimary(true) + } } // Called externally when a conflict has been detected and verified @@ -2223,17 +2225,17 @@ module.exports = class Replicator { } _closeSession () { - this.core.active-- + this.core.state.active-- // we were the last active ref, so lets shut things down - if (this.core.active === 0 && this.core.sessions.length === 0) { + if (this.core.state.active === 0 && this.core.sessions.length === 0) { this.destroy() this.core.close().catch(safetyCatch) return } // in case one session is still alive but its been marked for auto close also kill it - if (this.core.sessions.length === 1 && this.core.active === 1 && this.core.sessions[0].autoClose) { + if (this.core.sessions.length === 1 && this.core.state.active === 1 && this.core.sessions[0].autoClose) { this.core.sessions[0].close().catch(safetyCatch) } } @@ -2244,7 +2246,7 @@ module.exports = class Replicator { attachTo (protomux, useSession) { if (useSession) { - this.core.active++ + this.core.state.active++ } const makePeer = this._makePeer.bind(this, protomux, useSession) diff --git a/lib/verifier.js b/lib/verifier.js index dc2f2640..4c6c484a 100644 --- a/lib/verifier.js +++ b/lib/verifier.js @@ -153,7 +153,7 @@ module.exports = class Verifier { } } - throw new BAD_ARGUMENT('Public key is not a declared signer') + throw BAD_ARGUMENT('Public key is not a declared signer') } assemble (inputs) { diff --git a/test/all.js b/test/all.js index a7c5102c..dc6a6a13 100644 --- a/test/all.js +++ b/test/all.js @@ -8,7 +8,7 @@ async function runTests () { test.pause() await import('./basic.js') // todo: implement storageInfo API - // await import('./batch.js') // todo: implement batch api + await import('./batch.js') // todo: implement batch api await import('./bitfield.js') await import('./cache.js') await import('./clear.js') // todo: replace Info.bytesUsed API diff --git a/test/basic.js b/test/basic.js index 4f5466b1..67b529cf 100644 --- a/test/basic.js +++ b/test/basic.js @@ -18,7 +18,9 @@ test('basic', async function (t) { }) await core.append('hello') + t.is(core.length, 1) await core.append('world') + t.is(core.length, 2) const info = await core.info() @@ -37,6 +39,8 @@ test('core id', async function (t) { await core.ready() t.is(core.id, 'cfosnambcfosnambcfosnambcfosnambcfosnambcfosnambcfoo') + + await core.close() }) test('session id', async function (t) { @@ -50,6 +54,9 @@ test('session id', async function (t) { await session.ready() t.is(session.id, 'cfosnambcfosnambcfosnambcfosnambcfosnambcfosnambcfoo') + + await core.close() + await session.close() }) test('session', async function (t) { @@ -60,6 +67,8 @@ test('session', async function (t) { await session.append('test') t.alike(await core.get(0), b4a.from('test')) t.alike(await session.get(0), b4a.from('test')) + + await session.close() }) test('close', async function (t) { @@ -99,6 +108,8 @@ test('storage options', async function (t) { const core = new Hypercore({ storage: db }) await core.append('hello') t.alike(await core.get(0), b4a.from('hello')) + + await core.close() }) test.skip( @@ -111,6 +122,8 @@ test.skip( t.is(core.key, key) t.pass('creating a core with more than 32 byteLength key did not throw') + + await core.close() } ) @@ -119,6 +132,7 @@ test('createIfMissing', async function (t) { const core = new Hypercore(db, { createIfMissing: false }) await t.exception(core.ready()) + await db.close() }) test('reopen and overwrite', async function (t) { @@ -128,18 +142,22 @@ test('reopen and overwrite', async function (t) { const core = new Hypercore(await open()) await core.ready() + await core.close() const key = core.key const reopen = new Hypercore(await open()) await reopen.ready() t.alike(reopen.key, key, 'reopened the core') + await reopen.close() const overwritten = new Hypercore(await open(), { overwrite: true }) await overwritten.ready() t.unlike(overwritten.key, key, 'overwrote the core') + await overwritten.close() + async function open () { if (storage) await storage.close() storage = await createStorage(t, dir) @@ -159,6 +177,7 @@ test('truncate event has truncated-length and fork', async function (t) { await core.append(['a', 'b', 'c']) await core.truncate(2) + await core.close() }) test('treeHash gets the tree hash at a given core length', async function (t) { @@ -177,6 +196,8 @@ test('treeHash gets the tree hash at a given core length', async function (t) { for (let i = 0; i < 10; i++) { t.alike(await core.treeHash(i), hashes[i]) } + + await core.close() }) test('treeHash with default length', async function (t) { @@ -190,6 +211,9 @@ test('treeHash with default length', async function (t) { await core.append('a') t.unlike(await core.treeHash(), await core2.treeHash()) + + await core.close() + await core2.close() }) test('snapshot locks the state', async function (t) { @@ -209,6 +233,10 @@ test('snapshot locks the state', async function (t) { t.is(a.length, 0) t.is(b.length, 1) + + await core.close() + await a.close() + await b.close() }) test('downloading local range', async function (t) { @@ -225,6 +253,8 @@ test('downloading local range', async function (t) { await range.destroy() t.pass('did not throw') + + await core.close() }) test('read ahead', async function (t) { @@ -241,6 +271,8 @@ test('read ahead', async function (t) { await core.append('b') t.alike(await blk, 'b') + + await core.close() }) test('defaults for wait', async function (t) { @@ -271,6 +303,7 @@ test('defaults for wait', async function (t) { t.is(await s2.get(1), null) await s.close() + await s2.close() await core.close() }) @@ -292,6 +325,8 @@ test('has', async function (t) { t.ok(await core.has(i), `has ${i}`) } } + + await core.close() }) test('has range', async function (t) { @@ -306,6 +341,8 @@ test('has range', async function (t) { t.absent(await core.has(0, 5), 'does not have 0 to 4') t.ok(await core.has(0, 2), 'has 0 to 1') t.ok(await core.has(3, 5), 'has 3 to 4') + + await core.close() }) test.skip('storage info', async function (t) { @@ -315,6 +352,8 @@ test.skip('storage info', async function (t) { const info = await core.info({ storage: true }) t.snapshot(info.storage) + + await core.close() }) test('storage info, off by default', async function (t) { @@ -324,16 +363,20 @@ test('storage info, off by default', async function (t) { const info = await core.info() t.is(info.storage, null) + + await core.close() }) test('indexedLength mirrors core length (linearised core compat)', async function (t) { const core = await create(t) t.is(core.length, 0) - t.is(core.indexedLength, core.length) + t.is(core.flushedLength, core.length) await core.append(['a', 'b']) t.is(core.length, 2) - t.is(core.indexedLength, core.length) + t.is(core.flushedLength, core.length) + + await core.close() }) test('key is set sync', async function (t) { @@ -350,10 +393,10 @@ test('key is set sync', async function (t) { const core4 = new Hypercore(dir4, { }) // flush all db ops before teardown - t.teardown(() => core1.ready()) - t.teardown(() => core2.ready()) - t.teardown(() => core3.ready()) - t.teardown(() => core4.ready()) + t.teardown(() => core1.close()) + t.teardown(() => core2.close()) + t.teardown(() => core3.close()) + t.teardown(() => core4.close()) t.alike(core1.key, key) t.is(core2.key, null) @@ -375,6 +418,8 @@ test('disable writable option', async function (t) { } catch (err) { t.pass(err.code, 'SESSION_NOT_WRITABLE') } + + await core.close() }) test('disable session writable option', async function (t) { @@ -396,6 +441,9 @@ test('disable session writable option', async function (t) { } catch (err) { t.pass(err.code, 'SESSION_NOT_WRITABLE') } + + await session.close() + await core.close() }) test('session of a session with the writable option disabled', async function (t) { @@ -411,6 +459,10 @@ test('session of a session with the writable option disabled', async function (t } catch (err) { t.pass(err.code, 'SESSION_NOT_WRITABLE') } + + await s1.close() + await s2.close() + await core.close() }) test('writable session on a readable only core', async function (t) { @@ -430,6 +482,10 @@ test('writable session on a readable only core', async function (t) { } catch (err) { t.pass(err.code, 'SESSION_NOT_WRITABLE') } + + await s.close() + await a.close() + await core.close() }) test('append above the max suggested block size', async function (t) { @@ -448,6 +504,8 @@ test('append above the max suggested block size', async function (t) { } catch { t.pass('should throw') } + + await core.close() }) test('get undefined block is not allowed', async function (t) { @@ -461,6 +519,8 @@ test('get undefined block is not allowed', async function (t) { } catch (err) { t.pass(err.code, 'ERR_ASSERTION') } + + await core.close() }) test('valid manifest passed to a session is stored', async function (t) { @@ -485,4 +545,8 @@ test('valid manifest passed to a session is stored', async function (t) { await b.ready() t.alike(b.manifest, core.manifest) + + await a.close() + await b.close() + await core.close() }) diff --git a/test/batch.js b/test/batch.js index 2a5197f7..71843d84 100644 --- a/test/batch.js +++ b/test/batch.js @@ -1,116 +1,146 @@ const test = require('brittle') +const createTempDir = require('test-tmp') const b4a = require('b4a') +const Hypercore = require('../') +const { create, createStorage, replicate, eventFlush } = require('./helpers') + const NS = b4a.alloc(32) -const { create, replicate, eventFlush } = require('./helpers') test('batch append', async function (t) { - const core = await create() + const core = await create(t) await core.append(['a', 'b', 'c']) - const b = core.batch() + const b = core.session({ name: 'batch' }) + await b.ready() // todo: we shouldn't have to wait for ready + + t.unlike(b.state, core.state) + const info = await b.append(['de', 'fg']) t.is(core.length, 3) + + t.is(b.length, 5) t.alike(info, { length: 5, byteLength: 7 }) t.alike(await b.get(3), b4a.from('de')) t.alike(await b.get(4), b4a.from('fg')) - await b.flush() + t.is(core.length, 3) + + await core.core.commit(b.state) + t.is(core.length, 5) + + await b.close() }) test('batch has', async function (t) { - const core = await create() + const core = await create(t) await core.append(['a', 'b', 'c']) - const b = core.batch() + const b = core.session({ name: 'batch' }) await b.append(['de', 'fg']) for (let i = 0; i < b.length; i++) { t.ok(await b.has(i)) } + + await b.close() }) -test('append to core during batch', async function (t) { - const core = await create() +test.skip('append to core during batch', async function (t) { + const core = await create(t) await core.append(['a', 'b', 'c']) - const b = core.batch() + const b = core.session({ name: 'batch' }) await core.append('d') await b.append('e') t.absent(await b.flush()) t.is(core.length, 4) + + await b.close() }) test('append to session during batch, create before batch', async function (t) { - const core = await create() + const core = await create(t) await core.append(['a', 'b', 'c']) const s = core.session() - const b = core.batch() + const b = core.session({ name: 'batch' }) await b.append('d') await s.append('d') - t.ok(await b.flush()) + t.ok(await core.core.commit(b.state)) t.is(s.length, 4) + + await b.close() + await s.close() }) test('append to session during batch, create after batch', async function (t) { - const core = await create() + const core = await create(t) await core.append(['a', 'b', 'c']) - const b = core.batch() + const b = core.session({ name: 'batch' }) await b.append('d') const s = core.session() await s.append('d') - t.ok(await b.flush()) + t.ok(await core.core.commit(b.state)) t.is(s.length, 4) + + await s.close() + await b.close() }) test('batch truncate', async function (t) { - const core = await create() + const core = await create(t) await core.append(['a', 'b', 'c']) - const b = core.batch() + const b = core.session({ name: 'batch' }) await b.append(['de', 'fg']) - await b.truncate(4) + await b.truncate(4, { fork: 0 }) t.alike(await b.get(3), b4a.from('de')) - await t.exception(b.get(4)) + t.alike(await b.get(4, { wait: false }), null) - await b.flush() + await core.core.commit(b.state) t.is(core.length, 4) + + await b.close() }) test('truncate core during batch', async function (t) { - const core = await create() + const core = await create(t) await core.append(['a', 'b', 'c']) - const b = core.batch() + const b = core.session({ name: 'batch' }) await b.append('a') await core.truncate(2) - t.absent(await b.flush()) + t.absent(await core.core.commit(b.state)) t.is(core.length, 2) + + await b.close() }) -test('batch truncate committed', async function (t) { - const core = await create() +test.skip('batch truncate committed', async function (t) { + const core = await create(t) await core.append(['a', 'b', 'c']) - const b = core.batch() + const b = core.session({ name: 'batch' }) await b.append(['de', 'fg']) await t.exception(b.truncate(2)) + + await b.close() }) test('batch close', async function (t) { - const core = await create() + const core = await create(t) await core.append(['a', 'b', 'c']) - const b = core.batch() + const b = core.session({ name: 'batch' }) await b.append(['de', 'fg']) await b.close() t.is(core.length, 3) @@ -120,28 +150,32 @@ test('batch close', async function (t) { }) test('batch close after flush', async function (t) { - const core = await create() + const core = await create(t) await core.append(['a', 'b', 'c']) - const b = core.batch() - await b.flush() + const b = core.session({ name: 'batch' }) + await b.ready() + + await core.core.commit(b.state) await b.close() }) -test('batch flush after close', async function (t) { - const core = await create() +test.skip('batch flush after close', async function (t) { + const core = await create(t) await core.append(['a', 'b', 'c']) - const b = core.batch() + const b = core.session({ name: 'batch' }) + await b.ready() + await b.close() - await t.exception(b.flush()) + await t.exception(core.core.commit(b.state)) }) -test('batch info', async function (t) { - const core = await create() +test.skip('batch info', async function (t) { + const core = await create(t) await core.append(['a', 'b', 'c']) - const b = core.batch() + const b = core.session({ name: 'batch' }) await b.append(['de', 'fg']) const info = await b.info() @@ -150,45 +184,55 @@ test('batch info', async function (t) { t.is(info.byteLength, 7) t.unlike(await core.info(), info) - await b.flush() + await core.core.commit(b.state) t.alike(await core.info(), info) + + await b.close() }) test('simultaneous batches', async function (t) { - const core = await create() + const core = await create(t) - const b = core.batch() - const c = core.batch() - const d = core.batch() + const b = core.session({ name: '1' }) + const c = core.session({ name: '2' }) + const d = core.session({ name: '3' }) await b.append('a') await c.append(['a', 'c']) await d.append('c') - t.ok(await b.flush()) - t.ok(await c.flush()) - t.absent(await d.flush()) + t.ok(await core.core.commit(b.state)) + t.ok(await core.core.commit(c.state)) + t.absent(await core.core.commit(d.state)) + + await b.close() + await c.close() + await d.close() }) test('multiple batches', async function (t) { - const core = await create() + const core = await create(t) const session = core.session() - const b = core.batch() + const b = core.session({ name: 'batch1' }) await b.append('a') - await b.flush() + await core.core.commit(b.state) - const b2 = session.batch() + const b2 = session.session({ name: 'batch2' }) await b2.append('b') - await b2.flush() + await core.core.commit(b2.state) t.is(core.length, 2) + + await session.close() + await b.close() + await b2.close() }) -test('partial flush', async function (t) { - const core = await create() +test.skip('partial flush', async function (t) { + const core = await create(t) - const b = core.batch({ autoClose: false }) + const b = core.session({ name: 'batch' }) await b.append(['a', 'b', 'c', 'd']) @@ -220,67 +264,84 @@ test('partial flush', async function (t) { }) test('can make a tree batch', async function (t) { - const core = await create() + const core = await create(t) - const b = core.batch() + const b = core.session({ name: 'batch' }) await b.append('a') const batchTreeBatch = b.createTreeBatch() const batchHash = batchTreeBatch.hash() - await b.flush() + await core.core.commit(b.state) const treeBatch = core.createTreeBatch() const hash = treeBatch.hash() t.alike(hash, batchHash) + + await b.close() }) test('batched tree batch contains new nodes', async function (t) { - const core = await create() + const core = await create(t) - const b = core.batch() + const b = core.session({ name: 'batch' }) await b.append('a') const batchTreeBatch = b.createTreeBatch() const batchNode = await batchTreeBatch.get(0) - await b.flush() + await core.core.commit(b.state) const treeBatch = core.createTreeBatch() const node = await treeBatch.get(0) t.alike(node, batchNode) + + await b.close() }) test('batched tree batch proofs are equivalent', async function (t) { - const core = await create() + const core = await create(t) - const b = core.batch() + const b = core.session({ name: 'batch' }) await b.append(['a', 'b', 'c']) + const reader = b.state.storage.createReadBatch() const batchTreeBatch = b.createTreeBatch() - const batchProof = await batchTreeBatch.proof({ upgrade: { start: 0, length: 2 } }) + const batchProofIntermediate = await batchTreeBatch.proof(reader, { upgrade: { start: 0, length: 2 } }) + + await reader.flush() + + const batchProof = await batchProofIntermediate.settle() - await b.flush() + await core.core.commit(b.state) + const reader1 = core.state.storage.createReadBatch() const treeBatch = core.createTreeBatch() - const proof = await treeBatch.proof({ upgrade: { start: 0, length: 2 } }) - const treeProof = await core.core.tree.proof({ upgrade: { start: 0, length: 2 } }) + const proofIntermediate = await treeBatch.proof(reader, { upgrade: { start: 0, length: 2 } }) + const treeProofIntermediate = await core.core.tree.proof(reader1, { upgrade: { start: 0, length: 2 } }) + + await reader1.flush() + + const proof = await proofIntermediate.settle() + const treeProof = await treeProofIntermediate.settle() treeProof.upgrade.signature = null t.alike(proof, batchProof) t.alike(treeProof, batchProof) + + await b.close() }) -test('create tree batches', async function (t) { - const core = await create() +test.skip('create tree batches', async function (t) { + const core = await create(t) - const b = core.batch() + const b = core.session({ name: 'batch' }) await b.append('a') await b.append('b') @@ -324,11 +385,11 @@ test('create tree batches', async function (t) { t.absent(b.createTreeBatch(6)) t.absent(b.createTreeBatch(8, blocks)) - await b.flush() + await core.core.commit(b.state) t.is(core.length, 5) - const b2 = core.batch() + const b2 = core.session({ name: 'batch2' }) await b2.ready() t.absent(b2.createTreeBatch(3)) @@ -345,18 +406,21 @@ test('create tree batches', async function (t) { await b2.append('g') t.alike(b2.createTreeBatch().signable(NS), t7.signable(NS)) + + await b.close() + await b2.close() }) test('flush with bg activity', async function (t) { - const core = await create() - const clone = await create(core.key) + const core = await create(t) + const clone = await create(t, { keyPair: core.core.header.keyPair }) replicate(core, clone, t) await core.append('a') await clone.get(0) - const b = clone.batch({ autoClose: false }) + const b = clone.session({ name: 'batch' }) // bg await core.append('b') @@ -367,23 +431,25 @@ test('flush with bg activity', async function (t) { await b.append('b') - t.absent(await b.flush(), 'core is ahead, not flushing') + t.absent(await core.core.commit(b.state), 'core is ahead, not flushing') await b.append('c') - t.ok(await b.flush(), 'flushed!') + t.ok(await core.core.commit(b.state), 'flushed!') + + await b.close() }) test('flush with bg activity persists non conflicting values', async function (t) { - const core = await create() - const clone = await create(core.key) + const core = await create(t) + const clone = await create(t, core.key) replicate(core, clone, t) await core.append('a') await clone.get(0) - const b = clone.batch() + const b = clone.session({ name: 'batch' }) // bg await core.append('b') @@ -394,26 +460,28 @@ test('flush with bg activity persists non conflicting values', async function (t await eventFlush() - t.ok(await b.flush(), 'flushed!') + t.ok(await clone.core.commit(b.state), 'flushed!') t.alike(await clone.get(0, { wait: false }), b4a.from('a')) t.alike(await clone.get(1, { wait: false }), b4a.from('b')) t.alike(await clone.get(2, { wait: false }), b4a.from('c')) t.is(b.byteLength, clone.byteLength) - t.is(b.indexedLength, b.length, 'nothing buffered') + t.is(b.flushedLength, b.length, 'nothing buffered') + + await b.close() }) test('flush with conflicting bg activity', async function (t) { - const core = await create() - const clone = await create(core.key) + const core = await create(t) + const clone = await create(t, core.key) replicate(core, clone, t) await core.append('a') await clone.get(0) - const b = clone.batch({ autoClose: false }) + const b = clone.session({ name: 'batch' }) // bg await core.append('b') @@ -425,11 +493,13 @@ test('flush with conflicting bg activity', async function (t) { await b.append('c') await b.append('c') - t.absent(await b.flush(), 'cannot flush a batch with conflicts') + t.absent(await clone.core.commit(b.state), 'cannot flush a batch with conflicts') + + await b.close() }) -test('checkout batch', async function (t) { - const core = await create() +test.skip('checkout batch', async function (t) { + const core = await create(t) await core.append(['a', 'b']) const hash = core.createTreeBatch().hash() @@ -451,25 +521,29 @@ test('checkout batch', async function (t) { await b.truncate(3, b.fork) await b.append('d') t.ok(await b.flush()) + + await b.close() }) test('encryption and batches', async function (t) { - const core = await create({ encryptionKey: b4a.alloc(32) }) + const core = await create(t, { encryptionKey: b4a.alloc(32) }) await core.append(['a', 'b']) - const batch = core.batch() + const batch = core.session({ name: 'batch' }) + + await batch.ready() t.alike(await batch.get(0), b4a.from('a')) t.alike(await batch.get(1), b4a.from('b')) - const pre = batch.createTreeBatch(3, [b4a.from('c')]) + // const pre = batch.createTreeBatch(3, [b4a.from('c')]) await batch.append('c') const post = batch.createTreeBatch(3) t.is(batch.byteLength, 3) t.alike(await batch.get(2), b4a.from('c')) - await batch.flush() + await core.core.commit(batch.state) t.is(core.byteLength, 3) t.is(core.length, 3) @@ -478,20 +552,22 @@ test('encryption and batches', async function (t) { const final = core.createTreeBatch() - t.alike(pre.hash(), final.hash()) + // t.alike(pre.hash(), final.hash()) t.alike(post.hash(), final.hash()) + + await batch.close() }) test('encryption and bigger batches', async function (t) { - const core = await create({ encryptionKey: b4a.alloc(32) }) + const core = await create(t, { encryptionKey: b4a.alloc(32) }) await core.append(['a', 'b']) - const batch = core.batch() + const batch = core.session({ name: 'batch' }) t.alike(await batch.get(0), b4a.from('a')) t.alike(await batch.get(1), b4a.from('b')) - const pre = batch.createTreeBatch(5, [b4a.from('c'), b4a.from('d'), b4a.from('e')]) + // const pre = batch.createTreeBatch(5, [b4a.from('c'), b4a.from('d'), b4a.from('e')]) await batch.append(['c', 'd', 'e']) const post = batch.createTreeBatch(5) @@ -500,7 +576,7 @@ test('encryption and bigger batches', async function (t) { t.alike(await batch.get(3), b4a.from('d')) t.alike(await batch.get(4), b4a.from('e')) - await batch.flush() + await core.core.commit(batch.state) t.is(core.byteLength, 5) t.is(core.length, 5) @@ -511,166 +587,212 @@ test('encryption and bigger batches', async function (t) { const final = core.createTreeBatch() - t.alike(pre.hash(), final.hash()) + // t.alike(pre.hash(), final.hash()) t.alike(post.hash(), final.hash()) + + await batch.close() }) -test('persistent batch', async function (t) { - const core = await create() +// test('persistent batch', async function (t) { +// const core = await create(t) - await core.append(['a', 'b', 'c']) +// await core.append(['a', 'b', 'c']) - let batch = core.batch() +// let batch = core.session({ name: 'batch' }) - await batch.ready() - await batch.append(['d', 'e', 'f']) - await batch.flush({ pending: true }) +// await batch.ready() +// await batch.append(['d', 'e', 'f']) +// await batch.flush({ pending: true }) - batch = core.batch({ restore: true, autoClose: false }) +// batch = core.batch({ restore: true, autoClose: false }) - await batch.ready() +// await batch.ready() - t.is(batch.length, 6) - t.is(batch.byteLength, 6) - t.is(batch.indexedLength, 3) - t.alike(await batch.seek(4), [4, 0]) +// t.is(batch.length, 6) +// t.is(batch.byteLength, 6) +// t.is(batch.flushedLength, 3) +// t.alike(await batch.seek(4), [4, 0]) - const clone = await create(core.key) +// const clone = await create(t, core.key) - replicate(core, clone, t) +// replicate(core, clone, t) - clone.download() +// clone.download() - await t.test('download', async function (sub) { - const downloaded = [] - clone.on('download', function (index) { - downloaded.push(index) - }) - await eventFlush() - sub.alike(downloaded.sort(), [0, 1, 2], 'got non pending blocks') - }) +// await t.test('download', async function (sub) { +// const downloaded = [] +// clone.on('download', function (index) { +// downloaded.push(index) +// }) +// await eventFlush() +// sub.alike(downloaded.sort(), [0, 1, 2], 'got non pending blocks') +// }) - await batch.flush({ length: 5 }) +// await batch.flush({ length: 5 }) - t.is(core.length, 5) +// t.is(core.length, 5) - await t.test('download', async function (sub) { - const downloaded = [] - clone.on('download', function (index) { - downloaded.push(index) - }) - await eventFlush() - sub.alike(downloaded.sort(), [3, 4], 'got non pending blocks') - }) +// await t.test('download', async function (sub) { +// const downloaded = [] +// clone.on('download', function (index) { +// downloaded.push(index) +// }) +// await eventFlush() +// sub.alike(downloaded.sort(), [3, 4], 'got non pending blocks') +// }) - await batch.flush({ length: 6 }) +// await batch.flush({ length: 6 }) - t.is(core.length, 6) +// t.is(core.length, 6) - await t.test('download', async function (sub) { - const downloaded = [] - clone.on('download', function (index) { - downloaded.push(index) - }) - await eventFlush() - sub.alike(downloaded.sort(), [5], 'got non pending blocks') - }) +// await t.test('download', async function (sub) { +// const downloaded = [] +// clone.on('download', function (index) { +// downloaded.push(index) +// }) +// await eventFlush() +// sub.alike(downloaded.sort(), [5], 'got non pending blocks') +// }) - await batch.append('g') +// await batch.append('g') - t.is(batch.length, 7) +// t.is(batch.length, 7) - await batch.flush({ pending: true }) +// await batch.flush({ pending: true }) - t.is(core.length, 6) +// t.is(core.length, 6) - await batch.append('h') +// await batch.append('h') - t.is(batch.length, 8) +// t.is(batch.length, 8) - await batch.flush({ pending: true }) +// await batch.flush({ pending: true }) - t.is(batch.length, 8) +// t.is(batch.length, 8) - t.is(core.length, 6) +// t.is(core.length, 6) - await batch.flush() +// await batch.flush() - t.is(batch.length, 8) - t.is(core.length, 8) +// t.is(batch.length, 8) +// t.is(core.length, 8) - await t.test('download', async function (sub) { - const downloaded = [] - clone.on('download', function (index) { - downloaded.push(index) - }) - await eventFlush() - sub.alike(downloaded.sort(), [6, 7], 'got non pending blocks') - }) +// await t.test('download', async function (sub) { +// const downloaded = [] +// clone.on('download', function (index) { +// downloaded.push(index) +// }) +// await eventFlush() +// sub.alike(downloaded.sort(), [6, 7], 'got non pending blocks') +// }) +// }) + +test('persistent batch', async function (t) { + const dir = await createTempDir() + let storage = null + + const core = new Hypercore(await open()) + await core.ready() + + await core.append(['a', 'b', 'c']) + + const batch = core.session({ name: 'batch' }) + await batch.ready() + + await batch.append(['d', 'e', 'f']) + + t.is(batch.length, 6) + t.is(batch.byteLength, 6) + // t.is(batch.flushedLength, 3) + // t.alike(await batch.seek(4), [4, 0]) + + await core.close() + + const reopen = new Hypercore(await open()) + await reopen.ready() + + const reopened = reopen.session({ name: 'batch' }) + await reopened.ready() + + t.is(reopened.length, 6) + t.is(reopened.byteLength, 6) + // t.is(batch.flushedLength, 3) + // t.alike(await batch.seek(4), [4, 0]) + + await reopened.close() + await reopen.close() + + async function open () { + if (storage) await storage.close() + storage = await createStorage(t, dir) + return storage + } }) test('clear', async function (t) { - const core = await create() + const core = await create(t) await core.append('hello') - const clone = await create(core.key) + const clone = await create(t, core.key) - const b = clone.batch() + const b = clone.session({ name: 'b' }) await b.append('hello') - await b.flush() - await b.close() const [s1, s2] = replicate(core, clone, t) - await eventFlush() + await new Promise(resolve => clone.on('append', resolve)) + + await clone.core.commit(b.state) + await b.close() + t.ok(!!(await clone.get(0)), 'got block 0 proof') s1.destroy() s2.destroy() - const b1 = clone.batch() + const b1 = clone.session({ name: 'b1' }) await b1.ready() await b1.append('foo') - await b1.flush() + await t.exception(clone.core.commit(b1.state)) await b1.close() t.is(clone.length, 1, 'clone length is still 1') - const b2 = clone.batch({ clear: true }) + const b2 = clone.batch() await b2.ready() t.is(b2.length, 1, 'reset the batch') + + await b2.close() }) test('copy from with encrypted batch', async function (t) { const encryptionKey = b4a.alloc(32, 2) - const core = await create({ encryptionKey }) + const core = await create(t, { encryptionKey }) const blocks = 290 - const b = core.batch({ autoClose: false }) + const b = core.session({ name: 'batch' }) for (let i = 0; i < blocks; i++) { await b.append('block' + i) } - await b.flush({ keyPair: null }) - t.is(core.length, 0) - t.is(b._sessionLength, blocks) + t.is(b.length, blocks) const manifest = { prologue: { - length: b._sessionLength, + length: b.length, hash: b.createTreeBatch().hash() }, encryptionKey } - const clone = await create({ + const clone = await create(t, { manifest, encryptionKey }) @@ -682,19 +804,23 @@ test('copy from with encrypted batch', async function (t) { } t.alike(tree.hash(), manifest.prologue.hash) + + await b.close() }) test('batch append with huge batch', async function (t) { // Context: array.append(...otherArray) stops working after a certain amount of entries // due to a limit on the amount of function args // This caused a bug on large batches - const core = await create() + const core = await create(t) const bigBatch = (new Array(200_000)).fill('o') - const b = core.batch() + const b = core.session({ name: 'batch' }) await b.append(bigBatch) // Actually flushing such a big batch takes multiple minutes // so we only ensure that nothing crashed while appending t.pass('Can append a big batch') + + await b.close() }) diff --git a/test/bitfield.js b/test/bitfield.js index 78e00dd3..0fba8810 100644 --- a/test/bitfield.js +++ b/test/bitfield.js @@ -63,7 +63,7 @@ test('bitfield - reload', async function (t) { b.setRange(40000, 40001, true) b.setRange(1424242424, 1424242425, true) await flush(storage, b) - await storage.close() + await storage.db.close() } { diff --git a/test/cache.js b/test/cache.js index 07458209..ab9cec2b 100644 --- a/test/cache.js +++ b/test/cache.js @@ -24,6 +24,8 @@ test('session cache inheritance', async function (t) { const q = s.get(0) t.is(await p, await q, 'blocks are identical') + + await s.close() }) test('session cache opt-out', async function (t) { @@ -36,6 +38,8 @@ test('session cache opt-out', async function (t) { const q = s.get(0) t.not(await p, await q, 'blocks are not identical') + + await s.close() }) test('session cache override', async function (t) { @@ -50,6 +54,8 @@ test('session cache override', async function (t) { t.not(await p, await q, 'blocks are not identical') t.is(await q, await r, 'blocks are identical') + + await s.close() }) test('clear cache on truncate', async function (t) { @@ -98,6 +104,8 @@ test('session cache with different encodings', async function (t) { t.alike(await p, b4a.from('a')) t.is(await q, 'a') + + await s.close() }) test('cache is set through preload', async function (t) { @@ -118,4 +126,6 @@ test('globalCache set if passed in, and shared among sessions', async function ( const session = a.session() t.is(session.globalCache, globalCache, 'passed on to sessions') + + await session.close() }) diff --git a/test/clear.js b/test/clear.js index 2a08ff6d..b1f13ca8 100644 --- a/test/clear.js +++ b/test/clear.js @@ -18,6 +18,8 @@ test('clear', async function (t) { t.ok(await a.has(0), 'has 0') t.absent(await a.has(1), 'has not 1') t.ok(await a.has(2), 'has 2') + + await a.close() }) test('clear + replication', async function (t) { @@ -35,6 +37,9 @@ test('clear + replication', async function (t) { t.ok(await b.has(1), 'b not cleared') t.alike(await a.get(1), b4a.from('b'), 'a downloaded from b') + + await a.close() + await b.close() }) test('clear + replication, gossip', async function (t) { diff --git a/test/compat.js b/test/compat.js index 3ca09d99..cce755d6 100644 --- a/test/compat.js +++ b/test/compat.js @@ -30,5 +30,7 @@ for (const abi of abis) { } t.pass('blocks match') + + await core.close() }) } diff --git a/test/core.js b/test/core.js index 7fc0c94e..82546e04 100644 --- a/test/core.js +++ b/test/core.js @@ -8,13 +8,12 @@ test('core - append', async function (t) { const { core } = await create(t) { - const info = await core.append([ + const info = await core.state.append([ b4a.from('hello'), b4a.from('world') ]) - // t.alike(info, { length: 2, byteLength: 10 }) - t.alike(info, { length: 2, byteLength: undefined }) + t.alike(info, { length: 2, byteLength: 10 }) t.is(core.tree.length, 2) t.is(core.tree.byteLength, 10) t.alike([ @@ -27,12 +26,11 @@ test('core - append', async function (t) { } { - const info = await core.append([ + const info = await core.state.append([ b4a.from('hej') ]) - // t.alike(info, { length: 3, byteLength: 13 }) - t.alike(info, { length: 3, byteLength: undefined }) + t.alike(info, { length: 3, byteLength: 13 }) t.is(core.tree.length, 3) t.is(core.tree.byteLength, 13) t.alike([ @@ -50,49 +48,49 @@ test('core - append', async function (t) { test('core - append and truncate', async function (t) { const { core, reopen } = await create(t) - await core.append([ + await core.state.append([ b4a.from('hello'), b4a.from('world'), b4a.from('fo'), b4a.from('ooo') ]) - await core.truncate(3, 1) + await core.state.truncate(3, 1) t.is(core.tree.length, 3) t.is(core.tree.byteLength, 12) t.is(core.tree.fork, 1) t.alike(core.header.hints.reorgs, [{ from: 0, to: 1, ancestors: 3 }]) - await core.append([ + await core.state.append([ b4a.from('a'), b4a.from('b'), b4a.from('c'), b4a.from('d') ]) - await core.truncate(3, 2) + await core.state.truncate(3, 2) t.is(core.tree.length, 3) t.is(core.tree.byteLength, 12) t.is(core.tree.fork, 2) t.alike(core.header.hints.reorgs, [{ from: 0, to: 1, ancestors: 3 }, { from: 1, to: 2, ancestors: 3 }]) - await core.truncate(2, 3) + await core.state.truncate(2, 3) t.alike(core.header.hints.reorgs, [{ from: 2, to: 3, ancestors: 2 }]) - await core.append([b4a.from('a')]) - await core.truncate(2, 4) + await core.state.append([b4a.from('a')]) + await core.state.truncate(2, 4) - await core.append([b4a.from('a')]) - await core.truncate(2, 5) + await core.state.append([b4a.from('a')]) + await core.state.truncate(2, 5) - await core.append([b4a.from('a')]) - await core.truncate(2, 6) + await core.state.append([b4a.from('a')]) + await core.state.truncate(2, 6) - await core.append([b4a.from('a')]) - await core.truncate(2, 7) + await core.state.append([b4a.from('a')]) + await core.state.truncate(2, 7) t.is(core.header.hints.reorgs.length, 4) @@ -134,7 +132,7 @@ test('core - verify', async function (t) { t.is(clone.header.keyPair.publicKey, core.header.keyPair.publicKey) - await core.append([b4a.from('a'), b4a.from('b')]) + await core.state.append([b4a.from('a'), b4a.from('b')]) { const p = await getProof(core, { upgrade: { start: 0, length: 2 } }) @@ -159,7 +157,7 @@ test('core - verify parallel upgrades', async function (t) { t.is(clone.header.keyPair.publicKey, core.header.keyPair.publicKey) - await core.append([b4a.from('a'), b4a.from('b'), b4a.from('c'), b4a.from('d')]) + await core.state.append([b4a.from('a'), b4a.from('b'), b4a.from('c'), b4a.from('d')]) { const p1 = await getProof(core, { upgrade: { start: 0, length: 2 } }) @@ -192,7 +190,7 @@ test('core - update hook is triggered', async function (t) { ran |= 1 } - await core.append([b4a.from('a'), b4a.from('b'), b4a.from('c'), b4a.from('d')]) + await core.state.append([b4a.from('a'), b4a.from('b'), b4a.from('c'), b4a.from('d')]) const peer = {} @@ -231,7 +229,7 @@ test('core - update hook is triggered', async function (t) { ran |= 8 } - await core.truncate(1, 1) + await core.state.truncate(1, 1) core.onupdate = ({ status, bitfield, value, from }) => { t.ok(status & 0b01, 'was appended') @@ -240,7 +238,7 @@ test('core - update hook is triggered', async function (t) { ran |= 16 } - await core.append([b4a.from('e')]) + await core.state.append([b4a.from('e')]) clone.onupdate = ({ status, bitfield, value, from }) => { t.ok(status & 0b11, 'was appended and truncated') @@ -262,7 +260,7 @@ test('core - update hook is triggered', async function (t) { ran |= 64 } - await core.truncate(1, 2) + await core.state.truncate(1, 2) clone.onupdate = ({ status, bitfield, value, from }) => { t.ok(status & 0b10, 'was truncated') @@ -286,7 +284,7 @@ test('core - clone', async function (t) { await setUserData(core, 'hello', b4a.from('world')) - await core.append([ + await core.state.append([ b4a.from('hello'), b4a.from('world') ]) @@ -294,7 +292,7 @@ test('core - clone', async function (t) { const manifest = { prologue: { hash: await core.tree.hash(), length: core.tree.length } } const { core: copy } = (await create(t, { manifest })) - await copy.copyPrologue(core) + await copy.copyPrologue(core.state) const userData = [] const str = copy.storage.createUserDataStream() @@ -320,7 +318,7 @@ test('core - clone', async function (t) { ) } - await core.append([b4a.from('c')]) + await core.state.append([b4a.from('c')]) // copy should be independent t.alike(copy.tree.signature, signature) @@ -331,16 +329,16 @@ test('core - clone', async function (t) { test('core - clone verify', async function (t) { const { core } = await create(t) - await core.append([b4a.from('a'), b4a.from('b')]) + await core.state.append([b4a.from('a'), b4a.from('b')]) const manifest = { prologue: { hash: await core.tree.hash(), length: core.tree.length } } const { core: copy } = await create(t, { manifest }) const { core: clone } = await create(t, { manifest }) - await copy.copyPrologue(core) + await copy.copyPrologue(core.state) // copy should be independent - await core.append([b4a.from('c')]) + await core.state.append([b4a.from('c')]) { const p = await getProof(copy, { upgrade: { start: 0, length: 2 } }) @@ -365,17 +363,17 @@ test('core - clone verify', async function (t) { test('core - partial clone', async function (t) { const { core } = await create(t) - await core.append([b4a.from('0')]) - await core.append([b4a.from('1')]) + await core.state.append([b4a.from('0')]) + await core.state.append([b4a.from('1')]) const manifest = { prologue: { hash: await core.tree.hash(), length: core.tree.length } } - await core.append([b4a.from('2')]) - await core.append([b4a.from('3')]) + await core.state.append([b4a.from('2')]) + await core.state.append([b4a.from('3')]) const { core: copy } = (await create(t, { manifest })) - await copy.copyPrologue(core) + await copy.copyPrologue(core.state) t.is(core.tree.length, 4) t.is(copy.tree.length, 2) @@ -397,20 +395,20 @@ test('core - partial clone', async function (t) { test('core - clone with additional', async function (t) { const { core } = await create(t) - await core.append([b4a.from('a'), b4a.from('b')]) + await core.state.append([b4a.from('a'), b4a.from('b')]) const manifest = { prologue: { hash: await core.tree.hash(), length: core.tree.length } } const { core: copy } = await create(t, { manifest }) - await copy.copyPrologue(core, core.tree.signature) + await copy.copyPrologue(core.state, core.tree.signature) // copy should be independent - await core.append([b4a.from('c')]) + await core.state.append([b4a.from('c')]) const secondManifest = { prologue: { hash: await core.tree.hash(), length: core.tree.length } } const { core: clone } = await create(t, { manifest: secondManifest }) - await clone.copyPrologue(copy, { additional: [b4a.from('c')] }) + await clone.copyPrologue(copy.state, { additional: [b4a.from('c')] }) t.is(clone.header.tree.length, 3) @@ -429,12 +427,12 @@ test('core - clone with additional', async function (t) { test('core - clone with additional, larger tree', async function (t) { const { core } = await create(t) - await core.append([b4a.from('a'), b4a.from('b')]) + await core.state.append([b4a.from('a'), b4a.from('b')]) const manifest = { prologue: { hash: await core.tree.hash(), length: core.tree.length } } const { core: copy } = await create(t, { manifest }) - await copy.copyPrologue(core) + await copy.copyPrologue(core.state) const additional = [ b4a.from('c'), @@ -447,13 +445,13 @@ test('core - clone with additional, larger tree', async function (t) { b4a.from('j') ] - await core.append(additional) + await core.state.append(additional) const secondManifest = { prologue: { hash: await core.tree.hash(), length: core.tree.length } } const { core: clone } = await create(t, { manifest: secondManifest }) // copy should be independent - await clone.copyPrologue(copy, { additional }) + await clone.copyPrologue(copy.state, { additional }) t.is(clone.header.tree.length, core.header.tree.length) @@ -481,9 +479,9 @@ test('core - copyPrologue bails if core is not the same', async function (t) { const { core: copy } = await create(t, { manifest: { prologue: { hash: b4a.alloc(32), length: 1 } } }) // copy should be independent - await core.append([b4a.from('a')]) + await core.state.append([b4a.from('a')]) - await t.exception(copy.copyPrologue(core)) + await t.exception(copy.copyPrologue(core.state)) t.is(copy.header.hints.contiguousLength, 0) }) @@ -491,23 +489,23 @@ test('core - copyPrologue bails if core is not the same', async function (t) { test('core - copyPrologue can recover from bad additional', async function (t) { const { core } = await create(t) - await core.append([b4a.from('a'), b4a.from('b')]) + await core.state.append([b4a.from('a'), b4a.from('b')]) const manifest = { prologue: { hash: await core.tree.hash(), length: core.tree.length } } const { core: copy } = await create(t, { manifest }) - await copy.copyPrologue(core) + await copy.copyPrologue(core.state) // copy should be independent - await core.append([b4a.from('c')]) + await core.state.append([b4a.from('c')]) const secondManifest = { prologue: { hash: await core.tree.hash(), length: core.tree.length } } const { core: clone } = await create(t, { manifest: secondManifest }) - await t.exception(clone.copyPrologue(copy, { additional: [b4a.from('d')] })) + await t.exception(clone.copyPrologue(copy.state, { additional: [b4a.from('d')] })) t.is(clone.header.hints.contiguousLength, 0) - await t.execution(clone.copyPrologue(copy, { additional: [b4a.from('c')] })) + await t.execution(clone.copyPrologue(copy.state, { additional: [b4a.from('c')] })) t.is(clone.header.hints.contiguousLength, 3) t.is(clone.header.tree.length, 3) @@ -523,7 +521,7 @@ test('core - copyPrologue can recover from bad additional', async function (t) { test('core - copyPrologue many', async function (t) { const { core } = await create(t, { compat: false, version: 1 }) - await core.append([b4a.from('a'), b4a.from('b')]) + await core.state.append([b4a.from('a'), b4a.from('b')]) const manifest = { ...core.header.manifest } manifest.prologue = { length: core.tree.length, hash: core.tree.hash() } @@ -532,7 +530,7 @@ test('core - copyPrologue many', async function (t) { const { core: copy2 } = await create(t, { manifest }) const { core: copy3 } = await create(t, { manifest }) - await copy.copyPrologue(core) + await copy.copyPrologue(core.state) t.alike(copy.header.manifest.signers[0].publicKey, core.header.manifest.signers[0].publicKey) @@ -540,7 +538,7 @@ test('core - copyPrologue many', async function (t) { t.is(copy.tree.byteLength, core.tree.byteLength) // copy should be independent - await core.append([b4a.from('c')]) + await core.state.append([b4a.from('c')]) // upgrade clone { @@ -550,8 +548,8 @@ test('core - copyPrologue many', async function (t) { t.ok(await copy2.verify(p)) } - await t.execution(copy2.copyPrologue(core)) - await t.execution(copy3.copyPrologue(core)) + await t.execution(copy2.copyPrologue(core.state)) + await t.execution(copy3.copyPrologue(core.state)) t.is(copy2.tree.length, core.tree.length) t.is(copy.tree.length, copy3.tree.length) @@ -564,7 +562,7 @@ test('core - copyPrologue many', async function (t) { manifest.prologue = { length: core.tree.length, hash: core.tree.hash() } const { core: copy4 } = await create(t, { manifest }) - await copy4.copyPrologue(copy2) + await copy4.copyPrologue(copy2.state) t.is(copy4.tree.length, 3) t.is(copy4.header.tree.length, 3) @@ -585,6 +583,8 @@ async function create (t, opts = {}) { const dkey = b4a.alloc(32, 1) let db = null + t.teardown(teardown, { order: 1 }) + const reopen = async () => { if (db) await db.close() @@ -600,6 +600,10 @@ async function create (t, opts = {}) { const core = await reopen() return { core, reopen } + + async function teardown () { + if (db) await db.close() + } } async function getBlock (core, i) { diff --git a/test/encryption.js b/test/encryption.js index b118baf2..0fb25929 100644 --- a/test/encryption.js +++ b/test/encryption.js @@ -125,6 +125,8 @@ test('encrypted session', async function (t) { const encrypted = await getBlock(s, 1) t.absent(encrypted.includes('world')) t.alike(await getBlock(a, 1), encrypted) + + await s.close() }) test('encrypted session before ready core', async function (t) { @@ -139,6 +141,9 @@ test('encrypted session before ready core', async function (t) { await a.append(['hello']) t.alike(await s.get(0), b4a.from('hello')) + + await s.close() + await a.close() }) test('encrypted session on unencrypted core', async function (t) { @@ -155,6 +160,8 @@ test('encrypted session on unencrypted core', async function (t) { const encrypted = await a.get(0) t.absent(encrypted.includes('hello')) + + await s.close() }) test('encrypted session on encrypted core, same key', async function (t) { @@ -168,6 +175,8 @@ test('encrypted session on encrypted core, same key', async function (t) { const unencrypted = await s.get(0) t.alike(unencrypted, b4a.from('hello')) t.alike(unencrypted, await a.get(0)) + + await s.close() }) test('encrypted session on encrypted core, different keys', async function (t) { @@ -183,6 +192,8 @@ test('encrypted session on encrypted core, different keys', async function (t) { const encrypted = await a.get(0) t.absent(encrypted.includes('hello')) + + await s.close() }) test('multiple gets to replicated, encrypted block', async function (t) { @@ -211,6 +222,8 @@ test('encrypted core from existing unencrypted core', async function (t) { const unencrypted = await b.get(0) t.alike(unencrypted, b4a.from('hello')) + + await b.close() }) test('from session sessions pass encryption', async function (t) { @@ -227,6 +240,10 @@ test('from session sessions pass encryption', async function (t) { t.absent(a.encryptionKey) t.ok(b.encryptionKey) t.ok(c.encryptionKey) + + await c.close() + await b.close() + await a.close() }) test('session keeps encryption', async function (t) { @@ -237,6 +254,9 @@ test('session keeps encryption', async function (t) { await b.ready() t.alike(b.encryptionKey, encryptionKey) + + await b.close() + await a.close() }) function getBlock (core, index) { diff --git a/test/helpers/index.js b/test/helpers/index.js index 6a053c90..fb846651 100644 --- a/test/helpers/index.js +++ b/test/helpers/index.js @@ -7,21 +7,17 @@ exports.create = async function (t, ...args) { const db = new CoreStorage(dir) - t.teardown(() => db.close(), { order: 1 }) - const core = new Hypercore(db, ...args) await core.ready() + t.teardown(() => core.close(), { order: 1 }) + return core } const createStorage = exports.createStorage = async function (t, dir) { if (!dir) dir = await createTempDir(t) - const db = new CoreStorage(dir) - - t.teardown(() => db.close(), { order: 1 }) - - return db + return new CoreStorage(dir) } exports.createStored = async function (t) { diff --git a/test/manifest.js b/test/manifest.js index ce7fb251..c3d89e1f 100644 --- a/test/manifest.js +++ b/test/manifest.js @@ -275,7 +275,9 @@ test('multisig - append', async function (t) { t.is(len, 1) - const batch = await core.batch() + const batch = await core.session({ name: 'batch' }) + batch.keyPair = null + await batch.append(b4a.from('0')) const sigBatch = batch.createTreeBatch() @@ -313,6 +315,8 @@ test('multisig - append', async function (t) { await core2.download({ start: 0, end: core.length }).downloaded() t.alike(await core2.get(0), b4a.from('0')) + + await batch.close() }) test('multisig - batch failed', async function (t) { @@ -336,7 +340,9 @@ test('multisig - batch failed', async function (t) { t.is(len, 1) - const batch = await core.batch() + const batch = await core.session({ name: 'batch' }) + batch.keyPair = null + await batch.append(b4a.from('0')) const sigBatch = batch.createTreeBatch() @@ -367,6 +373,8 @@ test('multisig - batch failed', async function (t) { await t.exception(p) t.is(core2.length, 0) + + await batch.close() }) test('multisig - patches', async function (t) { @@ -1460,6 +1468,8 @@ test('create verifier - open existing core with manifest', async function (t) { const compatCore = await create(null, { manifest, compat: true }) await t.execution(compatCore.ready()) // compat flag is unset internally + + await compatCore.close() }) function createMultiManifest (signers, prologue = null) { diff --git a/test/merkle-tree.js b/test/merkle-tree.js index d3372916..142fdefd 100644 --- a/test/merkle-tree.js +++ b/test/merkle-tree.js @@ -803,7 +803,7 @@ test('reopen a tree', async t => { t.alike(a.tree.roots.map(n => n.index), [31]) - await a.storage.close() + await a.storage.db.close() const a1 = await create(t, 0, dir) diff --git a/test/preload.js b/test/preload.js index bb797305..7d76433a 100644 --- a/test/preload.js +++ b/test/preload.js @@ -17,6 +17,8 @@ test('preload - storage', async function (t) { await core.append('hello world') t.is(core.length, 1) t.alike(await core.get(0), b4a.from('hello world')) + + await core.close() }) test('preload - from another core', async function (t) { @@ -33,6 +35,8 @@ test('preload - from another core', async function (t) { t.alike(first.key, second.key) t.is(first.sessions, second.sessions) + + await second.close() }) test('preload - custom keypair', async function (t) { @@ -48,6 +52,8 @@ test('preload - custom keypair', async function (t) { t.ok(core.writable) t.alike(core.key, keyPair.publicKey) + + await core.close() }) test('preload - sign/storage', async function (t) { @@ -68,4 +74,6 @@ test('preload - sign/storage', async function (t) { await core.append('hello world') t.is(core.length, 1) t.is(await core.get(0), 'hello world') + + await core.close() }) diff --git a/test/replicate.js b/test/replicate.js index 15f4b6cb..7c6036ad 100644 --- a/test/replicate.js +++ b/test/replicate.js @@ -1123,8 +1123,8 @@ test('replication session', async function (t) { t.is(a.sessions.length, 1) t.is(b.sessions.length, 1) - t.is(a.core.active, 2) - t.is(b.core.active, 2) + t.is(a.core.state.active, 2) + t.is(b.core.state.active, 2) s1.destroy() s2.destroy() @@ -1133,8 +1133,8 @@ test('replication session', async function (t) { t.is(a.sessions.length, 1) t.is(b.sessions.length, 1) - t.is(a.core.active, 1) - t.is(b.core.active, 1) + t.is(a.core.state.active, 1) + t.is(b.core.state.active, 1) }) test('replication session after stream opened', async function (t) { @@ -1150,8 +1150,8 @@ test('replication session after stream opened', async function (t) { t.is(a.sessions.length, 1) t.is(b.sessions.length, 1) - t.is(a.core.active, 2) - t.is(b.core.active, 2) + t.is(a.core.state.active, 2) + t.is(b.core.state.active, 2) s1.destroy() s2.destroy() @@ -1160,8 +1160,8 @@ test('replication session after stream opened', async function (t) { t.is(a.sessions.length, 1) t.is(b.sessions.length, 1) - t.is(a.core.active, 1) - t.is(b.core.active, 1) + t.is(a.core.state.active, 1) + t.is(b.core.state.active, 1) }) test('replication session keeps the core open', async function (t) { @@ -1261,6 +1261,7 @@ test('cancel block', async function (t) { await a.close() await b.close() + await session.close() }) test('try cancel block from a different session', async function (t) { diff --git a/test/sessions.js b/test/sessions.js index 44b98ffe..09fe4659 100644 --- a/test/sessions.js +++ b/test/sessions.js @@ -36,6 +36,9 @@ test('sessions - can create writable sessions from a read-only core', async func } t.is(core.length, 1) + + await session.close() + await core.close() }) test('sessions - auto close', async function (t) { @@ -124,6 +127,7 @@ test('sessions - close with from option', async function (t) { t.absent(core1.closed) t.alike(await core1.get(0), b4a.from('hello world')) + await core1.close() }) test('sessions - custom valueEncoding on session', async function (t) { @@ -136,6 +140,9 @@ test('sessions - custom valueEncoding on session', async function (t) { t.alike(await core2.get(0), { a: 1 }) t.alike(await core2.get(1), { b: 2 }) + + await core2.close() + await core1.close() }) test('sessions - custom preload hook on first/later sessions', async function (t) { @@ -158,6 +165,9 @@ test('sessions - custom preload hook on first/later sessions', async function (t await core2.ready() await preloadsTest + + await core2.close() + await core1.close() }) test('session inherits non-sparse setting', async function (t) { @@ -165,6 +175,9 @@ test('session inherits non-sparse setting', async function (t) { const s = a.session() t.is(s.sparse, false) + + await s.close() + await a.close() }) test('session on a from instance, pre-ready', async function (t) { @@ -179,6 +192,9 @@ test('session on a from instance, pre-ready', async function (t) { t.is(a.sessions, b.sessions) t.is(a.sessions, c.sessions) + + await b.close() + await c.close() }) test('session on a from instance does not inject itself to other sessions', async function (t) { @@ -198,4 +214,8 @@ test('session on a from instance does not inject itself to other sessions', asyn t.absent(b.encryption) t.ok(c.encryption) t.absent(d.encryption) + + await b.close() + await c.close() + await d.close() }) diff --git a/test/snapshots.js b/test/snapshots.js index 3701ddf1..e9eabb34 100644 --- a/test/snapshots.js +++ b/test/snapshots.js @@ -103,6 +103,12 @@ test('snapshots wait for ready', async function (t) { t.is(s3.length, 4, 'no changes') t.is(s4.length, 4, 'no changes') + + await coreCopy.close() + await s1.close() + await s2.close() + await s3.close() + await s4.close() }) test('snapshots are consistent', async function (t) { @@ -138,4 +144,6 @@ test('snapshots are consistent', async function (t) { t.exception(snapshot.get(1)) t.exception(snapshot.get(2)) t.is(await b, 'block #0.0') + + await snapshot.close() }) diff --git a/test/storage.js b/test/storage.js index afb0b8c4..37bdb8ed 100644 --- a/test/storage.js +++ b/test/storage.js @@ -17,6 +17,8 @@ test('storage layout', async function (t) { } snapshot(t, core) + + await core.close() }) test('encrypted storage layout', async function (t) { @@ -27,6 +29,8 @@ test('encrypted storage layout', async function (t) { } snapshot(t, core) + + await core.close() }) function snapshot (t, core) { diff --git a/test/timeouts.js b/test/timeouts.js index 7c37ff7e..697bef4c 100644 --- a/test/timeouts.js +++ b/test/timeouts.js @@ -17,6 +17,10 @@ test('core and session timeout property', async function (t) { t.is(b.timeout, 50) await new Promise(resolve => setTimeout(resolve, 100)) + + await core.close() + await a.close() + await b.close() }) test('core session inherits timeout property', async function (t) { @@ -33,6 +37,10 @@ test('core session inherits timeout property', async function (t) { t.is(b.timeout, 0) await new Promise(resolve => setTimeout(resolve, 100)) + + await core.close() + await a.close() + await b.close() }) test('get before timeout', async function (t) { @@ -83,6 +91,8 @@ test('session get after timeout', async function (t) { } catch (err) { t.is(err.code, 'REQUEST_TIMEOUT') } + + await session.close() }) test('session get after inherited timeout', async function (t) { @@ -97,6 +107,8 @@ test('session get after inherited timeout', async function (t) { } catch (err) { t.is(err.code, 'REQUEST_TIMEOUT') } + + await session.close() }) test('core constructor timeout but disable on get', async function (t) {