rocksdb: add named sessions (#539)
* define scoped states per session

* return byteLength from updated batch

* pass length to createNamedSession

* add tree method for merging one tree into another

* add core method for committing state

* state stores treeLength

* session gets from state

* handle case when no tree nodes are added

* fix session get and truncate

* core truncate passes entire state down

* batch has all info after being reconciled

* update batch tests

* session uses state props

* pass whole batch to be flushed

* copy over bitfield to named session

* minor corrections

* update batch tests to be on named sessions

* update core tests

* make sure we pass state to createUpdate

* add checkout option for batches

* optionally pass discovery key to core

* each state has an independent mutex

* bitfield should fully copy the entire buffer

* user data is available sync for default state

* named session is always writable

* commit can specify length

* corestore owns storage

* update flushed length prop

* expose restore batch on main class

* encryption should use session state

* rebase main

* fix storage usage

* test core length is increasing

* standard fixes

* use state props and pass tree to multisig

* fix bitfield page copy

* update tests

* move session state to a dedicated abstraction

* pass parent state as capability

* ensure we have correct treeLength and an up-to-date bitfield when opening state

* only write each bitfield page once per flush

* truncate and clear mutate treeLength

* fixes for batch tests

* enable batch tests

* overwrite existing session when opts.refresh is set

* enable batch clear test

* close storage when we close the core

* storage closes automatically

* auto teardown any created core

* we have to pass an error to mutex destruction...

* close all test cores

* more missing close

* more closes

* missing session close

* close db in createIfMissing test

* more closes

* make sure all sessions are closed too

* checkout should only truncate named session

* state tracks active sessions on open and close

* screen for failing test

* core closes state

* close existing state when creating named session

* missing session close

* more closes

* pass runner to helper

* close core instead

* close state first and fix teardown order

* close state last

* missing close

* missing close

* missing close

---------

Co-authored-by: Mathias Buus <[email protected]>
chm-diederichs and mafintosh authored Sep 5, 2024
1 parent 342c1df commit 8c5feb9
Showing 25 changed files with 1,168 additions and 636 deletions.
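
For orientation, here is a minimal usage sketch of the feature this commit introduces. It is not taken from the commit itself: it assumes named sessions are opened through the existing session() options (name, checkout, refresh) that the index.js diff below handles, and it takes an already-constructed Hypercore as given since the storage/corestore wiring also changes on this branch.

// Minimal sketch, not part of this commit. Assumes core.session({ name, ... })
// maps onto the opts.name / opts.checkout / opts.refresh handling added below.
async function example (core) {
  await core.ready()

  // A named session gets its own scoped state: tree, bitfield and mutex.
  const batch = core.session({ name: 'my-batch' })
  await batch.ready()

  // Named sessions are always writable, so this appends to the session's
  // own tree without touching the default state of the core.
  await batch.append(['a', 'b', 'c'])

  await batch.close()

  // Reopening with checkout truncates the named session back to that length;
  // refresh: true would instead overwrite the existing session state.
  const rewound = core.session({ name: 'my-batch', checkout: 1 })
  await rewound.ready()
  console.log(rewound.length) // 1, assuming the checkout took effect

  await rewound.close()
}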
93 changes: 58 additions & 35 deletions index.js
@@ -60,6 +60,7 @@ module.exports = class Hypercore extends EventEmitter {
this.storage = null
this.crypto = opts.crypto || hypercoreCrypto
this.core = null
this.state = null
this.replicator = null
this.encryption = null
this.extensions = new Map()
@@ -241,7 +242,6 @@ module.exports = class Hypercore extends EventEmitter {

_addSession (s) {
this.sessions.push(s)
if (this.core) this.core.active++
}

async setEncryptionKey (encryptionKey, opts) {
@@ -264,6 +264,7 @@
this.encryption = o.encryption
this.writable = this._isWritable()
this.autoClose = o.autoClose
if (o.state) this.state = o.state.clone()

if (o.core) this.tracer.setParent(o.core.tracer)

@@ -317,6 +318,17 @@
ensureEncryption(this, opts)
}

if (opts.name) {
// todo: need to make named sessions safe before ready
// atm we always copy the state in passCapabilities
await this.state.close()
this.state = await this.core.createSession(opts.name, opts.checkout, opts.refresh)

if (opts.checkout !== undefined) {
await this.state.truncate(opts.checkout, this.fork)
}
}

if (opts.manifest && !this.core.header.manifest) {
await this.core.setManifest(opts.manifest)
}
@@ -367,6 +379,7 @@
sessions: this.sessions,
createIfMissing: opts.createIfMissing,
readonly: unlocked,
discoveryKey: opts.discoveryKey,
overwrite: opts.overwrite,
key,
keyPair: opts.keyPair,
@@ -379,8 +392,10 @@
})
this.tracer.setParent(this.core.tracer)

this.state = this.core.state

if (opts.userData) {
const batch = this.core.storage.createWriteBatch()
const batch = this.state.storage.createWriteBatch()
for (const [key, value] of Object.entries(opts.userData)) {
this.core.userData(batch, key, value)
}
@@ -411,18 +426,18 @@
_getSnapshot () {
if (this.sparse) {
return {
length: this.core.tree.length,
byteLength: this.core.tree.byteLength,
fork: this.core.tree.fork,
compatLength: this.core.tree.length
length: this.state.tree.length,
byteLength: this.state.tree.byteLength,
fork: this.state.tree.fork,
compatLength: this.state.tree.length
}
}

return {
length: this.core.header.hints.contiguousLength,
length: this.state.header.hints.contiguousLength,
byteLength: 0,
fork: this.core.tree.fork,
compatLength: this.core.header.hints.contiguousLength
fork: this.state.tree.fork,
compatLength: this.state.header.hints.contiguousLength
}
}

@@ -451,7 +466,6 @@
if (i === -1) return

this.sessions.splice(i, 1)
this.core.active--
this.readable = false
this.writable = false
this.closed = true
@@ -471,10 +485,13 @@

this._findingPeers = 0

if (this.sessions.length || this.core.active > 0) {
if (this.sessions.length || this.state.active > 1) {
await this.state.close()

// if this is the last session and we are auto closing, trigger that first to enforce error handling
if (this.sessions.length === 1 && this.core.active === 1 && this.autoClose) await this.sessions[0].close(err)
if (this.sessions.length === 1 && this.state.active === 1 && this.autoClose) await this.sessions[0].close(err)
// emit "fake" close as this is a session

this.emit('close', false)
return
}
@@ -538,11 +555,11 @@
if (this._snapshot) return this._snapshot.length
if (this.core === null) return 0
if (!this.sparse) return this.contiguousLength
return this.core.tree.length
return this.state.tree.length
}

get indexedLength () {
return this.length
get flushedLength () {
return this.state === this.core.state ? this.core.tree.length : this.state.treeLength
}

/**
@@ -552,7 +569,7 @@
if (this._snapshot) return this._snapshot.byteLength
if (this.core === null) return 0
if (!this.sparse) return this.contiguousByteLength
return this.core.tree.byteLength - (this.core.tree.length * this.padding)
return this.state.tree.byteLength - (this.state.tree.length * this.padding)
}

get contiguousLength () {
@@ -711,9 +728,7 @@

async setUserData (key, value, { flush = false } = {}) {
if (this.opened === false) await this.opening
const batch = this.core.storage.createWriteBatch()
this.core.userData(batch, key, value)
await batch.flush()
await this.state.setUserData(key, value)
}

async getUserData (key) {
@@ -725,7 +740,7 @@
}

createTreeBatch () {
return this.core.tree.batch()
return this.state.tree.batch()
}

findingPeers () {
@@ -810,9 +825,9 @@
if (this.opened === false) await this.opening
if (!isValidIndex(start) || !isValidIndex(end)) throw ASSERTION('has range is invalid')

if (end === start + 1) return this.core.bitfield.get(start)
if (end === start + 1) return this.state.bitfield.get(start)

const i = this.core.bitfield.firstUnset(start)
const i = this.state.bitfield.firstUnset(start)
return i === -1 || i >= end
}

@@ -861,7 +876,7 @@
if (start >= end) return cleared
if (start >= this.length) return cleared

await this.core.clear(start, end, cleared)
await this.state.clear(start, end, cleared)

return cleared
}
@@ -876,8 +891,8 @@

if (this.core.isFlushing) await this.core.flushed()

if (this.core.bitfield.get(index)) {
const reader = this.core.storage.createReadBatch()
if (this.state.bitfield.get(index)) {
const reader = this.state.storage.createReadBatch()
block = this.core.blocks.get(reader, index)

if (this.cache) this.cache.set(index, block)
@@ -896,12 +911,17 @@
const timeout = opts && opts.timeout !== undefined ? opts.timeout : this.timeout
if (timeout) req.context.setTimeout(req, timeout)

block = this._cacheOnResolve(index, req.promise, this.core.tree.fork)
block = this._cacheOnResolve(index, req.promise, this.state.tree.fork)
}

return block
}

async restoreBatch (length, blocks) {
if (this.opened === false) await this.opening
return this.state.tree.restoreBatch(length)
}

async _cacheOnResolve (index, req, fork) {
const resolved = await req

@@ -970,27 +990,30 @@
if (this.opened === false) await this.opening

const {
fork = this.core.tree.fork + 1,
fork = this.state.tree.fork + 1,
keyPair = this.keyPair,
signature = null
} = typeof opts === 'number' ? { fork: opts } : opts

const isDefault = this.state === this.core.state
const writable = !this._readonly && !!(signature || (keyPair && keyPair.secretKey))
if (writable === false && (newLength > 0 || fork !== this.core.tree.fork)) throw SESSION_NOT_WRITABLE()
if (isDefault && writable === false && (newLength > 0 || fork !== this.state.tree.fork)) throw SESSION_NOT_WRITABLE()

await this.core.truncate(newLength, fork, { keyPair, signature })
await this.state.truncate(newLength, fork, { keyPair, signature })

// TODO: Should propagate from an event triggered by the oplog
this.replicator.updateAll()
if (this.state === this.core.state) this.replicator.updateAll()
}

async append (blocks, opts = {}) {
if (this.opened === false) await this.opening

const isDefault = this.state === this.core.state

const { keyPair = this.keyPair, signature = null } = opts
const writable = !this._readonly && !!(signature || (keyPair && keyPair.secretKey))

if (writable === false) throw SESSION_NOT_WRITABLE()
if (isDefault && writable === false) throw SESSION_NOT_WRITABLE()

blocks = Array.isArray(blocks) ? blocks : [blocks]
this.tracer.trace('append', { blocks })
Expand All @@ -1010,7 +1033,7 @@ module.exports = class Hypercore extends EventEmitter {
}
}

return this.core.append(buffers, { keyPair, signature, preappend })
return this.state.append(buffers, { keyPair, signature, preappend })
}

async treeHash (length) {
@@ -1019,7 +1042,7 @@
length = this.core.tree.length
}

const roots = await this.core.tree.getRoots(length)
const roots = await this.state.tree.getRoots(length)
return this.crypto.tree(roots)
}

@@ -1105,8 +1128,8 @@ function toHex (buf) {
}

function preappend (blocks) {
const offset = this.core.tree.length
const fork = this.core.tree.fork
const offset = this.state.tree.length
const fork = this.state.tree.fork

for (let i = 0; i < blocks.length; i++) {
this.encryption.encrypt(offset + i, blocks[i], fork)
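
To make the new getters above concrete, here is a rough reading sketch. The session({ name }) convention and the logged values are assumptions for illustration, not output from this commit's tests.

// Sketch only: illustrates flushedLength vs length and restoreBatch as added
// in the index.js diff above; 'staging' and the comments are illustrative.
async function inspect (core) {
  const batch = core.session({ name: 'staging' })
  await batch.ready()

  await batch.append(['x', 'y'])

  // For the default state, flushedLength mirrors core.tree.length.
  // For a named session it reports state.treeLength, i.e. how much of the
  // session has already been flushed back into the main core, while length
  // also counts the blocks appended above.
  console.log(batch.length, batch.flushedLength)

  // restoreBatch rebuilds a tree batch at an earlier length, e.g. for proofs.
  const tree = await core.restoreBatch(core.length)
  console.log(tree.length === core.length) // true

  await batch.close()
}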
46 changes: 33 additions & 13 deletions lib/bit-interlude.js
@@ -100,27 +100,47 @@ module.exports = class BitInterlude {
this.ranges.push({ start, end })
}

flush (writer) {
for (const { start, end } of this.ranges) {
let index = start
flush (writer, debug) {
if (!this.ranges.length) return { ranges: [], drop: this.drop }

while (index < end) {
const page = this.bitfield.getPage(index, this.drop === false)
let index = this.ranges[0].start
const final = this.ranges[this.ranges.length - 1].end

const buf = b4a.allocUnsafe(page.bitfield.byteLength)
buf.set(page.bitfield)
let i = 0

const last = (page.index + 1) * (buf.byteLength << 3)
const stop = end < last ? end : last
while (index < final) {
const page = this.bitfield.getPage(index, this.drop === false)
const buf = b4a.allocUnsafe(page.bitfield.byteLength)

const offset = page.index * buf.byteLength << 3
const view = new DataView(
buf.buffer,
buf.byteOffset,
buf.byteLength
)

quickbit.fill(buf, this.drop === false, index - offset, stop - offset)
for (let i = 0; i < page.bitfield.length; i++) {
view.setUint32(i * 4, page.bitfield[i], true)
}

const last = (page.index + 1) * (buf.byteLength << 3)
const offset = page.index * buf.byteLength << 3

while (i < this.ranges.length) {
const { start, end } = this.ranges[i]

const from = start < index ? index : start
const to = end < last ? end : last

writer.putBitfieldPage(page.index, buf)
quickbit.fill(buf, this.drop === false, from - offset, to - offset)

index = stop
index = to

if (to === last) break

i++
}

writer.putBitfieldPage(page.index, buf)
}

return {
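
The rewritten flush above walks the pending ranges page by page, so each bitfield page buffer is filled and written at most once per flush (the "only write each bitfield page once per flush" item in the commit message). A standalone toy version of that grouping, with an assumed page size and no relation to the library's real types:

// Illustration only: group set/clear ranges by a fixed page size so each page
// is written once, no matter how many ranges touch it.
const PAGE_BITS = 32768 // assumed page size in bits, for the example

function groupRangesByPage (ranges) {
  const pages = new Map()

  for (const { start, end } of ranges) {
    for (let p = Math.floor(start / PAGE_BITS); p * PAGE_BITS < end; p++) {
      if (!pages.has(p)) pages.set(p, [])
      pages.get(p).push({
        from: Math.max(start, p * PAGE_BITS),
        to: Math.min(end, (p + 1) * PAGE_BITS)
      })
    }
  }

  return pages // one entry (and so one putBitfieldPage call) per page
}

console.log(groupRangesByPage([{ start: 10, end: 20 }, { start: 30, end: 40 }]))
// Map(1) { 0 => [ { from: 10, to: 20 }, { from: 30, to: 40 } ] }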
19 changes: 19 additions & 0 deletions lib/bitfield.js
@@ -210,6 +210,10 @@
}
}

static from (bitfield) {
return new Bitfield(bitfield.toBuffer(bitfield._pages.maxLength * BITS_PER_PAGE))
}

toBuffer (length) {
const pages = Math.ceil(length / BITS_PER_PAGE)
const buffer = b4a.allocUnsafe(pages * BYTES_PER_PAGE)
@@ -242,6 +246,21 @@
return p || null
}

merge (bitfield, length) {
let i = 0

while (i < length) {
const start = bitfield.firstSet(i)
i = bitfield.firstUnset(start)

if (i === -1 || i > length) i = length

this.setRange(start, i, true)

if (i >= length) break
}
}

get (index) {
const j = index & (BITS_PER_PAGE - 1)
const i = (index - j) / BITS_PER_PAGE
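
The new Bitfield.from and merge helpers above let a named session start from a full copy of the core's bitfield and later fold its set ranges back in. A plain-array sketch of the merge loop, using indexOf where the real code uses firstSet/firstUnset:

// Toy version of merge: copy every set run of `src` into `dst`, clamped to
// `length`. Boolean arrays stand in for the real paged bitfields.
function merge (dst, src, length) {
  let i = 0

  while (i < length) {
    const start = src.indexOf(true, i) // firstSet
    if (start === -1 || start >= length) break

    let end = src.indexOf(false, start) // firstUnset
    if (end === -1 || end > length) end = length

    for (let j = start; j < end; j++) dst[j] = true // setRange(start, end, true)

    i = end
  }
}

const dst = new Array(8).fill(false)
const src = [true, true, false, true, true, true, false, false]
merge(dst, src, 5)
console.log(dst) // [ true, true, false, true, true, false, false, false ]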