Skip to content

Commit

Permalink
rocksdb: merge main (#564)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: HDegroote <[email protected]>
Co-authored-by: Mathias Buus <[email protected]>
Co-authored-by: rafapaezbas <[email protected]>
  • Loading branch information
4 people authored Sep 11, 2024
1 parent 6c213d7 commit 5516503
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 22 deletions.
19 changes: 16 additions & 3 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ const {
BAD_ARGUMENT,
SESSION_CLOSED,
SESSION_NOT_WRITABLE,
SNAPSHOT_NOT_AVAILABLE
SNAPSHOT_NOT_AVAILABLE,
DECODING_ERROR
} = require('hypercore-errors')

const promises = Symbol.for('hypercore.promises')
Expand Down Expand Up @@ -247,6 +248,14 @@ module.exports = class Hypercore extends EventEmitter {
this.writable = this._isWritable()
}

setActive (bool) {
const active = !!bool
if (active === this._active || this.closing) return
this._active = active
if (!this.opened) return
this.replicator.updateActivity(this._active ? 1 : -1)
}

_passCapabilities (o) {
if (!this.keyPair) this.keyPair = o.keyPair
this.crypto = o.crypto
Expand Down Expand Up @@ -390,7 +399,7 @@ module.exports = class Hypercore extends EventEmitter {
if (opts.userData) {
const batch = this.state.storage.createWriteBatch()
for (const [key, value] of Object.entries(opts.userData)) {
this.core.userData(batch, key, value)
this.core.setUserData(batch, key, value)
}
await batch.flush()
}
Expand Down Expand Up @@ -1092,7 +1101,11 @@ module.exports = class Hypercore extends EventEmitter {

_decode (enc, block) {
if (this.padding) block = block.subarray(this.padding)
if (enc) return c.decode(enc, block)
try {
if (enc) return c.decode(enc, block)
} catch {
throw DECODING_ERROR()
}
return block
}
}
Expand Down
24 changes: 20 additions & 4 deletions lib/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ module.exports = class Core {
key: opts.key || (compat ? manifest.signers[0].publicKey : Verifier.manifestHash(manifest)),
manifest,
external: null,
keyPair,
keyPair: keyPair ? { publicKey: keyPair.publicKey, secretKey: keyPair.secretKey || null } : null,
userData: [],
tree: {
fork: 0,
Expand Down Expand Up @@ -528,6 +528,11 @@ module.exports = class Core {
header.tree.signature = unslab(header.tree.signature)
}

if (header.keyPair) {
header.keyPair.publicKey = unslab(header.keyPair.publicKey)
header.keyPair.secretKey = unslab(header.keyPair.secretKey)
}

if (opts.manifest) {
// if we provide a manifest and no key, verify that the stored key is the same
if (!opts.key && !Verifier.isValidManifest(header.key, Verifier.createManifest(opts.manifest))) {
Expand Down Expand Up @@ -562,6 +567,10 @@ module.exports = class Core {
await writer.flush()
}

for await (const { key, value } of storage.createUserDataStream()) {
header.userData.push({ key, value: unslab(value) })
}

// compat from earlier version that do not store contig length
// if (header.hints.contiguousLength === 0) {
// while (bitfield.get(header.hints.contiguousLength)) header.hints.contiguousLength++
Expand Down Expand Up @@ -728,7 +737,7 @@ module.exports = class Core {
update.flushTreeBatch(batch)

for await (const { key, value } of src.storage.createUserDataStream()) {
this.userData(update.batch, key, value)
this.setUserData(update.batch, key, value)
}

await this.state.flushUpdate(update)
Expand Down Expand Up @@ -797,7 +806,7 @@ module.exports = class Core {

if (exists) continue

this.header.userData.push({ key: update.key, value: update.value })
this.header.userData.push({ key: update.key, value: unslab(update.value) })
break
}
}
Expand All @@ -808,7 +817,14 @@ module.exports = class Core {
this.blocks.put(writer, index, value)
}

userData (update, key, value) {
userData (key, value) {
const update = this.state.createUpdate()
this.setUserData(update, key, value)

return this.state.flushUpdate(update)
}

setUserData (update, key, value) {
return update.setUserData(key, value)
}

Expand Down
73 changes: 69 additions & 4 deletions lib/replicator.js
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,18 @@ class Peer {
this.wireRange = this.channel.messages[8]
this.wireExtension = this.channel.messages[9]

// Same stats as replicator, but for this specific peer
this.stats = {
wireSync: { tx: 0, rx: 0 },
wireRequest: { tx: 0, rx: 0 },
wireCancel: { tx: 0, rx: 0 },
wireData: { tx: 0, rx: 0 },
wireWant: { tx: 0, rx: 0 },
wireBitfield: { tx: 0, rx: 0 },
wireRange: { tx: 0, rx: 0 },
wireExtension: { tx: 0, rx: 0 }
}

this.receiverQueue = new ReceiverQueue()
this.receiverBusy = false

Expand Down Expand Up @@ -466,10 +478,12 @@ class Peer {
start,
length
})
incrementTx(this.stats.wireRange, this.replicator.stats.wireRange)
}

extension (name, message) {
this.wireExtension.send({ name: name === this.lastExtensionSent ? '' : name, message })
incrementTx(this.stats.wireExtension, this.replicator.stats.wireExtension)
this.lastExtensionSent = name
}

Expand Down Expand Up @@ -501,6 +515,7 @@ class Peer {
downloading: this.replicator.isDownloading(),
hasManifest: !!this.core.header.manifest && this.core.compat === false
})
incrementTx(this.stats.wireSync, this.replicator.stats.wireSync)
}

onopen ({ seeks, capability }) {
Expand Down Expand Up @@ -752,6 +767,7 @@ class Peer {
if (req.msg.manifest && this.core.header.manifest) {
const manifest = this.core.header.manifest
this.wireData.send({ request: req.msg.id, fork: this.core.tree.fork, block: null, hash: null, seek: null, upgrade: null, manifest })
incrementTx(this.stats.wireData, this.replicator.stats.wireData)
return
}

Expand All @@ -772,6 +788,7 @@ class Peer {
upgrade: proof.upgrade,
manifest: proof.manifest
})
incrementTx(this.stats.wireData, this.replicator.stats.wireData)
}

_cancelRequest (req) {
Expand All @@ -789,6 +806,7 @@ class Peer {
if (this.roundtripQueue === null) this.roundtripQueue = new RoundtripQueue()
this.roundtripQueue.add(req.id)
this.wireCancel.send({ request: req.id })
incrementTx(this.stats.wireCancel, this.replicator.stats.wireCancel)
}

_checkIfConflict () {
Expand All @@ -808,6 +826,8 @@ class Peer {
length
}
})

incrementTx(this.stats.wireRequest, this.replicator.stats.wireRequest)
}

async ondata (data) {
Expand Down Expand Up @@ -994,8 +1014,8 @@ class Peer {
this._remoteContiguousLength = start
}

if (start === 0 && drop === false && length > this._remoteContiguousLength) {
this._remoteContiguousLength = length
if (start === 0 && drop === false) {
if (length > this._remoteContiguousLength) this._remoteContiguousLength = length
} else if (length === 1) {
const bitfield = this.core.skipBitfield === null ? this.core.bitfield : this.core.skipBitfield
this.remoteBitfield.set(start, has)
Expand Down Expand Up @@ -1117,7 +1137,7 @@ class Peer {
let index = off + i
if (index > s.seeker.end) index -= len

if (this.remoteBitfield.get(index) === false) continue
if (this._remoteHasBlock(index) === false) continue
if (this.core.bitfield.get(index) === true) continue
if (!this._hasTreeParent(index)) continue

Expand Down Expand Up @@ -1323,7 +1343,6 @@ class Peer {
if (this._remoteHasBlock(index) === false) continue

const req = this._makeRequest(false, 0, 0)
if (req === null) continue

req.hash = { index: 2 * index, nodes: f.batch.want.nodes }

Expand Down Expand Up @@ -1351,6 +1370,7 @@ class Peer {
start: i * DEFAULT_SEGMENT_SIZE,
length: DEFAULT_SEGMENT_SIZE
})
incrementTx(this.stats.wireWant, this.replicator.stats.wireWant)
}
}

Expand Down Expand Up @@ -1394,6 +1414,7 @@ class Peer {
this.tracer.trace('send', req)

this.wireRequest.send(req)
incrementTx(this.stats.wireRequest, this.replicator.stats.wireRequest)
}
}

Expand Down Expand Up @@ -1427,6 +1448,19 @@ module.exports = class Replicator {

this.inflightRange = inflightRange || DEFAULT_MAX_INFLIGHT

// Note: nodata and unwant not currently tracked
// tx = transmitted, rx = received
this.stats = {
wireSync: { tx: 0, rx: 0 },
wireRequest: { tx: 0, rx: 0 },
wireCancel: { tx: 0, rx: 0 },
wireData: { tx: 0, rx: 0 },
wireWant: { tx: 0, rx: 0 },
wireBitfield: { tx: 0, rx: 0 },
wireRange: { tx: 0, rx: 0 },
wireExtension: { tx: 0, rx: 0 }
}

this._attached = new Set()
this._inflight = new InflightTracker()
this._blocks = new BlockTracker()
Expand Down Expand Up @@ -2026,12 +2060,25 @@ module.exports = class Replicator {
}

_onwant (peer, start, length) {
const contig = Math.min(this.core.tree.length, this.core.header.hints.contiguousLength)

if (start + length < contig || (this.core.tree.length === contig)) {
peer.wireRange.send({
drop: false,
start: 0,
length: contig
})
incrementTx(peer.stats.wireRange, this.stats.wireRange)
return
}

length = Math.min(length, this.core.tree.length - start)

peer.protomux.cork()

for (const msg of this.core.bitfield.want(start, length)) {
peer.wireBitfield.send(msg)
incrementTx(peer.stats.wireBitfield, this.stats.wireBitfield)
}

peer.protomux.uncork()
Expand Down Expand Up @@ -2405,18 +2452,22 @@ function onwiredrain (c) {
}

function onwiresync (m, c) {
incrementRx(c.userData.stats.wireSync, c.userData.replicator.stats.wireSync)
return c.userData.onsync(m)
}

function onwirerequest (m, c) {
incrementRx(c.userData.stats.wireRequest, c.userData.replicator.stats.wireRequest)
return c.userData.onrequest(m)
}

function onwirecancel (m, c) {
incrementRx(c.userData.stats.wireCancel, c.userData.replicator.stats.wireCancel)
return c.userData.oncancel(m)
}

function onwiredata (m, c) {
incrementRx(c.userData.stats.wireData, c.userData.replicator.stats.wireData)
return c.userData.ondata(m)
}

Expand All @@ -2425,6 +2476,7 @@ function onwirenodata (m, c) {
}

function onwirewant (m, c) {
incrementRx(c.userData.stats.wireWant, c.userData.replicator.stats.wireWant)
return c.userData.onwant(m)
}

Expand All @@ -2433,14 +2485,17 @@ function onwireunwant (m, c) {
}

function onwirebitfield (m, c) {
incrementRx(c.userData.stats.wireBitfield, c.userData.replicator.stats.wireBitfield)
return c.userData.onbitfield(m)
}

function onwirerange (m, c) {
incrementRx(c.userData.stats.wireRange, c.userData.replicator.stats.wireRange)
return c.userData.onrange(m)
}

function onwireextension (m, c) {
incrementRx(c.userData.stats.wireExtension, c.userData.replicator.stats.wireExtension)
return c.userData.onextension(m)
}

Expand All @@ -2455,3 +2510,13 @@ function isBlockRequest (req) {
function isUpgradeRequest (req) {
return req !== null && req.upgrade !== null
}

function incrementTx (stats1, stats2) {
stats1.tx++
stats2.tx++
}

function incrementRx (stats1, stats2) {
stats1.rx++
stats2.rx++
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "hypercore",
"version": "10.37.15",
"version": "10.37.20",
"description": "Hypercore is a secure, distributed append-only log",
"main": "index.js",
"scripts": {
Expand Down
31 changes: 27 additions & 4 deletions test/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,32 @@ test('core - user data', async function (t) {
t.alike(await coreReopen.storage.getUserData('hej'), b4a.from('world'))
})

test('core - header does not retain slabs', async function (t) {
const { core, reopen } = await create(t)
await setUserData(core, 'hello', b4a.from('world'))

t.is(core.header.key.buffer.byteLength, 32, 'unslabbed key')
t.is(core.header.keyPair.publicKey.buffer.byteLength, 32, 'unslabbed public key')
t.is(core.header.keyPair.secretKey.buffer.byteLength, 64, 'unslabbed private key')
t.is(core.header.manifest.signers[0].namespace.buffer.byteLength, 32, 'unslabbed signers namespace')
t.is(core.header.manifest.signers[0].publicKey.buffer.byteLength, 32, 'unslabbed signers publicKey')

t.is(core.header.userData[0].value.buffer.byteLength, 5, 'unslabbed the userdata value')

// check the different code path when re-opening
const coreReopen = await reopen()

t.is(coreReopen.header.key.buffer.byteLength, 32, 'reopen unslabbed key')
t.is(coreReopen.header.keyPair.publicKey.buffer.byteLength, 32, 'reopen unslabbed public key')
t.is(coreReopen.header.keyPair.secretKey.buffer.byteLength, 64, 'reopen unslabbed secret key')
t.is(coreReopen.header.manifest.signers[0].namespace.buffer.byteLength, 32, 'reopen unslabbed signers namespace')
t.is(coreReopen.header.manifest.signers[0].publicKey.buffer.byteLength, 32, 'reopen unslabbed signers publicKey')

t.is(coreReopen.header.userData[0].value.buffer.byteLength, 5, 'reopen unslabbed the userdata value')

await coreReopen.close()
})

test('core - verify', async function (t) {
const { core } = await create(t)
const { core: clone } = await create(t, { keyPair: { publicKey: core.header.keyPair.publicKey } })
Expand Down Expand Up @@ -614,10 +640,7 @@ async function getBlock (core, i) {
}

async function setUserData (core, key, value) {
const w = core.storage.createWriteBatch()
const p = core.userData(w, key, value)
await w.flush()
return p
return core.userData(key, value)
}

async function getProof (core, req) {
Expand Down
Loading

0 comments on commit 5516503

Please sign in to comment.