Skip to content

Commit

Permalink
rocksdb: add force close option (#570)
Browse files Browse the repository at this point in the history
* add force close option

* do not handle requests if core is closed

* check replicator.destroyed

* close take opts object

* _close is called with force arg instead of opts obj

* add test for force close

* force close explicitly closes all sessions
  • Loading branch information
chm-diederichs authored Sep 25, 2024
1 parent d8a78e9 commit f946075
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 9 deletions.
31 changes: 23 additions & 8 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -456,13 +456,26 @@ module.exports = class Hypercore extends EventEmitter {
return !this._readonly && !!(this.keyPair && this.keyPair.secretKey)
}

close (err) {
close ({ error, force = !!error } = {}) {
if (this.closing) return this.closing
this.closing = this._close(err || null)

this.closing = this._close(error || null, force)
return this.closing
}

async _close (err) {
_forceClose (error) {
const sessions = [...this.sessions]

const closing = []
for (const session of sessions) {
if (session === this) continue
closing.push(session.close({ error, force: false }))
}

return Promise.all(closing)
}

async _close (error, force) {
if (this.opened === false) await this.opening

const i = this.sessions.indexOf(this)
Expand All @@ -482,18 +495,20 @@ module.exports = class Hypercore extends EventEmitter {

if (this.replicator !== null) {
this.replicator.findingPeers -= this._findingPeers
this.replicator.clearRequests(this.activeRequests, err)
this.replicator.clearRequests(this.activeRequests, error)
this.replicator.updateActivity(this._active ? -1 : 0)
}

this._findingPeers = 0

// check if there is still an active session
if (this.sessions.length || this.state.active > 1) {
if (force) {
await this._forceClose(error)
} else if (this.sessions.length || this.state.active > 1) {
// check if there is still an active session
await this.state.unref()

// if this is the last session and we are auto closing, trigger that first to enforce error handling
if (this.sessions.length === 1 && this.core.state.active === 1 && this.autoClose) await this.sessions[0].close(err)
if (this.sessions.length === 1 && this.core.state.active === 1 && this.autoClose) await this.sessions[0].close({ error })
// emit "fake" close as this is a session

this.emit('close', false)
Expand Down Expand Up @@ -638,7 +653,7 @@ module.exports = class Hypercore extends EventEmitter {
const sessions = [...this.sessions]

const all = []
for (const s of sessions) all.push(s.close(err))
for (const s of sessions) all.push(s.close({ error: err, force: false })) // force false or else infinite recursion
await Promise.allSettled(all)
}

Expand Down
4 changes: 3 additions & 1 deletion lib/replicator.js
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,8 @@ class Peer {
return
}

if (this.replicator.destroyed) return

await this._handleRequest(msg)
}

Expand All @@ -739,7 +741,7 @@ class Peer {
}

async _handleRequests () {
if (this.receiverBusy) return
if (this.receiverBusy || this.replicator.destroyed) return
this.receiverBusy = true
this.protomux.cork()

Expand Down
19 changes: 19 additions & 0 deletions test/replicate.js
Original file line number Diff line number Diff line change
Expand Up @@ -1264,6 +1264,25 @@ test('replication session keeps the core open', async function (t) {
t.alike(blk, b4a.from('c'), 'still replicating due to session')
})

test('force close kills replication session', async function (t) {
const a = await create(t)
const b = await create(t, a.key)

await a.append(['a', 'b', 'c', 'd', 'e'])

replicate(a, b, t, { session: true })

await a.close({ force: true })
await eventFlush()

const blk = b.get(2, { timeout: 1000 })

t.ok(a.core.closed)
t.ok(a.replicator.destroyed)

await t.exception(blk, /REQUEST_TIMEOUT/)
})

test('replicate range that fills initial size of bitfield page', async function (t) {
const a = await create(t)
await a.append(new Array(2 ** 15).fill('a'))
Expand Down

0 comments on commit f946075

Please sign in to comment.