Skip to content

Commit

Permalink
rocksdb: add db snapshots (#562)
Browse files Browse the repository at this point in the history
* rename clone/close to ref/unref

* add snapshot method to session state

* snapshots are not writable

* snapshotted session takes state snapshot

* fall back to core if snapshot does not have data

* review by @mafintosh

* gc snapshot

* fix snapshot teardown

* _snapshot -> storageSnapshot

* dry it a bit

---------

Co-authored-by: Mathias Buus <[email protected]>
  • Loading branch information
chm-diederichs and mafintosh authored Sep 7, 2024
1 parent 3cfc609 commit cade0cd
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 27 deletions.
54 changes: 34 additions & 20 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,8 @@ module.exports = class Hypercore extends EventEmitter {
this.encryption = o.encryption
this.writable = this._isWritable()
this.autoClose = o.autoClose
if (o.state) this.state = o.state.clone()

if (o.state) this.state = this.snapshotted ? o.state.snapshot() : o.state.ref()

if (o.core) this.tracer.setParent(o.core.tracer)

Expand Down Expand Up @@ -321,7 +322,7 @@ module.exports = class Hypercore extends EventEmitter {
if (opts.name) {
// todo: need to make named sessions safe before ready
// atm we always copy the state in passCapabilities
await this.state.close()
await this.state.unref()
this.state = await this.core.createSession(opts.name, opts.checkout, opts.refresh)

if (opts.checkout !== undefined) {
Expand Down Expand Up @@ -486,7 +487,7 @@ module.exports = class Hypercore extends EventEmitter {
this._findingPeers = 0

if (this.sessions.length || this.state.active > 1) {
await this.state.close()
await this.state.unref()

// if this is the last session and we are auto closing, trigger that first to enforce error handling
if (this.sessions.length === 1 && this.state.active === 1 && this.autoClose) await this.sessions[0].close(err)
Expand Down Expand Up @@ -838,7 +839,6 @@ module.exports = class Hypercore extends EventEmitter {
this.tracer.trace('get', { index })

if (this.closing !== null) throw SESSION_CLOSED()
if (this._snapshot !== null && index >= this._snapshot.compatLength) throw SNAPSHOT_NOT_AVAILABLE()

const encoding = (opts && opts.valueEncoding && c.from(opts.valueEncoding)) || this.valueEncoding

Expand Down Expand Up @@ -889,32 +889,34 @@ module.exports = class Hypercore extends EventEmitter {
async _get (index, opts) {
if (this.core.isFlushing) await this.core.flushed()

const reader = this.state.storage.createReadBatch()
const promise = reader.getBlock(index)
reader.tryFlush()
let block = await readBlock(this.state.createReadBatch(), index)

let block = await promise
// snapshot should check if core has block
if (block === null && this._snapshot !== null) {
checkSnapshot(this._snapshot, index)
block = await readBlock(this.core.state.createReadBatch(), index)
checkSnapshot(this._snapshot, index)
}

if (block !== null) {
if (this.cache) this.cache.set(index, Promise.resolve(block))
} else {
if (!this._shouldWait(opts, this.wait)) return null
return block
}

if (opts && opts.onwait) opts.onwait(index, this)
if (this.onwait) this.onwait(index, this)
if (!this._shouldWait(opts, this.wait)) return null

const activeRequests = (opts && opts.activeRequests) || this.activeRequests
if (opts && opts.onwait) opts.onwait(index, this)
if (this.onwait) this.onwait(index, this)

const req = this.replicator.addBlock(activeRequests, index)
req.snapshot = index < this.length
const activeRequests = (opts && opts.activeRequests) || this.activeRequests

const timeout = opts && opts.timeout !== undefined ? opts.timeout : this.timeout
if (timeout) req.context.setTimeout(req, timeout)
const req = this.replicator.addBlock(activeRequests, index)
req.snapshot = index < this.length

block = this._cacheOnResolve(index, req.promise, this.state.tree.fork)
}
const timeout = opts && opts.timeout !== undefined ? opts.timeout : this.timeout
if (timeout) req.context.setTimeout(req, timeout)

return block
return this._cacheOnResolve(index, req.promise, this.state.tree.fork)
}

async restoreBatch (length, blocks) {
Expand All @@ -930,6 +932,8 @@ module.exports = class Hypercore extends EventEmitter {
? unslab(resolved)
: resolved

if (this._snapshot !== null) checkSnapshot(this._snapshot, index)

if (this.cache && fork === this.core.tree.fork) {
this.cache.set(index, Promise.resolve(block))
}
Expand Down Expand Up @@ -1151,3 +1155,13 @@ function createCache (cache) {
function isValidIndex (index) {
return index === 0 || index > 0
}

function checkSnapshot (snapshot, index) {
if (index >= snapshot.compatLength) throw SNAPSHOT_NOT_AVAILABLE()
}

function readBlock (reader, index) {
const promise = reader.getBlock(index)
reader.tryFlush()
return promise
}
43 changes: 36 additions & 7 deletions lib/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,40 +113,69 @@ class Update {
}

class SessionState {
constructor (core, storage, blocks, tree, bitfield, treeLength) {
constructor (core, storage, blocks, tree, bitfield, treeLength, snapshot) {
this.core = core

this.storage = storage
this.storageSnapshot = snapshot || null

this.mutex = new Mutex()

this.blocks = blocks
this.tree = tree
this.bitfield = bitfield

this.treeLength = treeLength

this.active = 1

this._onflush = null
this._flushing = null
this._activeBatch = null
}

get isSnapshot () {
return this.storageSnapshot !== null
}

get isDefault () {
return this.core.state === this
}

async close () {
async unref () {
if (--this.active > 0) return Promise.resolve()

if (this.storageSnapshot) this.storageSnapshot.destroy()

await this.storage.close()
await this.mutex.destroy(new Error('Closed'))
}

clone () {
ref () {
this.active++
return this
}

snapshot () {
const snapshot = this.storage.snapshot()

const s = new SessionState(
this.core,
this.storage,
this.blocks,
this.tree,
this.bitfield,
this.treeLength,
snapshot
)

return s
}

createReadBatch () {
return this.storage.createReadBatch({ snapshot: this.storageSnapshot })
}

_clearActiveBatch (err) {
if (!this._activeBatch) return

Expand All @@ -159,7 +188,7 @@ class SessionState {
}

createUpdate () {
assert(!this._activeBatch)
assert(!this._activeBatch && !this.isSnapshot)

this._activeBatch = this.storage.createWriteBatch()
return new Update(this._activeBatch, this.bitfield, this.core.header, this)
Expand Down Expand Up @@ -391,7 +420,7 @@ module.exports = class Core {
this.sessions = sessions
this.globalCache = globalCache

this.state = new SessionState(this, storage, this.blocks, tree, bitfield, tree.length)
this.state = new SessionState(this, storage, this.blocks, tree, bitfield, tree.length, null)

this._manifestFlushed = !!header.manifest
this._maxOplogSize = 65536
Expand Down Expand Up @@ -420,7 +449,7 @@ module.exports = class Core {
length: (length === treeLength || !treeInfo) ? treeLength : treeInfo.length
})

return new SessionState(this, storage, this.blocks, tree, bitfield, treeLength)
return new SessionState(this, storage, this.blocks, tree, bitfield, treeLength, null)
}

static async open (db, opts = {}) {
Expand Down Expand Up @@ -1054,7 +1083,7 @@ module.exports = class Core {

async close () {
this.closed = true
await this.state.close() // TODO: add option where the storage is NOT closed for corestore
await this.state.unref() // TODO: add option where the storage is NOT closed for corestore
}
}

Expand Down

0 comments on commit cade0cd

Please sign in to comment.