Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mafintosh committed Jun 14, 2024
1 parent 32fef46 commit daa4766
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 118 deletions.
241 changes: 123 additions & 118 deletions lib/merkle-tree.js
Original file line number Diff line number Diff line change
Expand Up @@ -424,25 +424,15 @@ class ByteSeeker {

module.exports = class MerkleTree {
constructor (storage, roots, fork, signature, prologue) {
this.storage = storage
this.crypto = crypto
this.fork = fork
this.roots = roots
this.length = roots.length ? totalSpan(roots) / 2 : 0
this.byteLength = totalSize(roots)
this.signature = signature
this.prologue = prologue

this.storage = storage
this.unflushed = new Map()
this.cache = new Xache({ maxSize: TREE_CACHE })
this.flushing = null
this.truncated = false
this.truncateTo = 0
}

addNode (node) {
if (node.size === 0 && b4a.equals(node.hash, BLANK_HASH)) node = blankNode(node.index)
this.unflushed.set(node.index, node)
}

batch () {
Expand Down Expand Up @@ -480,24 +470,27 @@ module.exports = class MerkleTree {
getRoots (length) {
const indexes = flat.fullRoots(2 * length)
const roots = new Array(indexes.length)
const batch = this.storage.createReadBatch()

for (let i = 0; i < indexes.length; i++) {
roots[i] = this.get(indexes[i], true)
roots[i] = batch.getTreeNode(indexes[i], true)
}

batch.tryFlush()

return Promise.all(roots)
}

setPrologue ({ hash, length }) {
this.prologue = { hash, length }
}

addNodes (nodes) {
for (let i = 0; i < nodes.length; i++) {
const node = nodes[i]
this.unflushed.set(node.index, node)
}
}
// addNodes (nodes) {
// for (let i = 0; i < nodes.length; i++) {
// const node = nodes[i]
// this.unflushed.set(node.index, node)
// }
// }

getNeededNodes (length, start, end) {
const nodes = new Map()
Expand Down Expand Up @@ -526,11 +519,14 @@ module.exports = class MerkleTree {
async upgradeable (length) {
const indexes = flat.fullRoots(2 * length)
const roots = new Array(indexes.length)
const batch = this.storage.createReadBatch()

for (let i = 0; i < indexes.length; i++) {
roots[i] = this.get(indexes[i], false)
roots[i] = batch.getTreeNode(indexes[i], false)
}

batch.tryFlush()

for (const node of await Promise.all(roots)) {
if (node === null) return false
}
Expand Down Expand Up @@ -568,74 +564,74 @@ module.exports = class MerkleTree {
return getStoredNode(this.storage, index, this.cache, error)
}

// Persist all unflushed nodes (and any pending truncation) to storage.
// On failure, the nodes that were being flushed are merged back into
// `unflushed` so a later flush can retry them.
async flush () {
  // Hand the pending map off to `flushing` and start a fresh one, so nodes
  // added while the flush is in flight land in the new map.
  this.flushing = this.unflushed
  this.unflushed = new Map()

  try {
    // Truncation must land before node writes so stale tail data is gone.
    if (this.truncated) await this._flushTruncation()
    await this._flushNodes()
  } catch (err) {
    // Restore the failed batch, but never clobber a node that was re-added
    // during the flush — the newer version wins.
    for (const node of this.flushing.values()) {
      if (!this.unflushed.has(node.index)) this.unflushed.set(node.index, node)
    }
    throw err
  } finally {
    this.flushing = null
  }
}

_flushTruncation () {
return new Promise((resolve, reject) => {
const t = this.truncateTo
const offset = t === 0 ? 0 : (t - 1) * 80 + 40

this.storage.truncate(offset, (err) => {
if (err) return reject(err)

if (this.truncateTo === t) {
this.truncateTo = 0
this.truncated = false
}

resolve()
})
})
}

// Write every node queued in `this.flushing` to storage, resolving once all
// writes have settled (rejecting with the last error seen, if any).
_flushNodes () {
  // TODO: write neighbors together etc etc
  // TODO: bench loading a full disk page and copy to that instead
  return new Promise((resolve, reject) => {
    // One 40-byte slot per node: an 8-byte uint64 size followed by the hash.
    const slab = b4a.allocUnsafe(40 * this.flushing.size)

    let error = null
    // One pending callback per write, plus one sentinel so `done` cannot
    // resolve before the loop below has issued every write.
    let missing = this.flushing.size + 1
    let offset = 0

    for (const node of this.flushing.values()) {
      // Encode this node into its private 40-byte view of the shared slab.
      const state = {
        start: 0,
        end: 40,
        buffer: slab.subarray(offset, offset += 40)
      }

      c.uint64.encode(state, node.size)
      c.raw.encode(state, node.hash)

      // Node records live at a fixed 40-byte stride by flat-tree index.
      this.storage.write(node.index * 40, state.buffer, done)
    }

    // Release the sentinel taken above.
    done(null)

    function done (err) {
      // Remember an error but keep waiting until every write has reported.
      if (err) error = err
      if (--missing > 0) return
      if (error) reject(error)
      else resolve()
    }
  })
}
// async flush () {
// this.flushing = this.unflushed
// this.unflushed = new Map()

// try {
// if (this.truncated) await this._flushTruncation()
// await this._flushNodes()
// } catch (err) {
// for (const node of this.flushing.values()) {
// if (!this.unflushed.has(node.index)) this.unflushed.set(node.index, node)
// }
// throw err
// } finally {
// this.flushing = null
// }
// }

// _flushTruncation () {
// return new Promise((resolve, reject) => {
// const t = this.truncateTo
// const offset = t === 0 ? 0 : (t - 1) * 80 + 40

// this.storage.truncate(offset, (err) => {
// if (err) return reject(err)

// if (this.truncateTo === t) {
// this.truncateTo = 0
// this.truncated = false
// }

// resolve()
// })
// })
// }

// _flushNodes () {
// // TODO: write neighbors together etc etc
// // TODO: bench loading a full disk page and copy to that instead
// return new Promise((resolve, reject) => {
// const slab = b4a.allocUnsafe(40 * this.flushing.size)

// let error = null
// let missing = this.flushing.size + 1
// let offset = 0

// for (const node of this.flushing.values()) {
// const state = {
// start: 0,
// end: 40,
// buffer: slab.subarray(offset, offset += 40)
// }

// c.uint64.encode(state, node.size)
// c.raw.encode(state, node.hash)

// this.storage.write(node.index * 40, state.buffer, done)
// }

// done(null)

// function done (err) {
// if (err) error = err
// if (--missing > 0) return
// if (error) reject(error)
// else resolve()
// }
// })
// }

clear () {
this.cache = new Xache({ maxSize: this.cache.maxSize })
Expand Down Expand Up @@ -773,7 +769,8 @@ module.exports = class MerkleTree {
if (iteRightSpan >= head) return 0

let cnt = 0
while (!ite.contains(head) && (await this.get(ite.index, false)) === null) {
// TODO: we could prop use a read batch here and do this in blocks of X for perf
while (!ite.contains(head) && !(await this.storage.hasTreeNode(ite.index, false))) {
cnt++
ite.parent()
}
Expand Down Expand Up @@ -804,36 +801,34 @@ module.exports = class MerkleTree {
}

// Open a merkle tree from `storage`, loading its committed roots.
// NOTE(review): this method is mid-refactor (wip commit) — the previous
// implementation below still runs and returns, leaving the iterator probe
// after it unreachable. TODO confirm the intended final shape.
static async open (storage, opts = {}) {
  // Reject storage that still holds the legacy (incompatible) tree layout.
  await new Promise((resolve, reject) => {
    storage.read(0, OLD_TREE.length, (err, buf) => {
      // A read error just means there is no old header to collide with.
      if (err) return resolve()
      if (b4a.equals(buf, OLD_TREE)) return reject(new Error('Storage contains an incompatible merkle tree'))
      resolve()
    })
  })

  // Use the caller-provided length when given, otherwise infer from storage.
  const length = typeof opts.length === 'number'
    ? opts.length
    : await autoLength(storage)

  // Load the stored node for every full root of a tree with `length` leaves.
  const roots = []
  for (const index of flat.fullRoots(2 * length)) {
    roots.push(await getStoredNode(storage, index, null, true))
  }

  return new MerkleTree(storage, roots, opts.fork || 0, opts.signature || null, opts.prologue || null)
  // Unreachable wip probe of the new storage iterator API — dead code.
  for await (const node of storage.iterator())
    console.log('sup')
  // await new Promise((resolve, reject) => {
  //   storage.read(0, OLD_TREE.length, (err, buf) => {
  //     if (err) return resolve()
  //     if (b4a.equals(buf, OLD_TREE)) return reject(new Error('Storage contains an incompatible merkle tree'))
  //     resolve()
  //   })
  // })

  // const length = typeof opts.length === 'number'
  //   ? opts.length
  //   : await autoLength(storage)

  // const roots = []
  // for (const index of flat.fullRoots(2 * length)) {
  //   roots.push(await getStoredNode(storage, index, null, true))
  // }

  // return new MerkleTree(storage, roots, opts.fork || 0, opts.signature || null, opts.prologue || null)
}
}

async function getByteRange (tree, index) {
const head = 2 * tree.length
if (((index & 1) === 0 ? index : flat.rightSpan(index)) >= head) {
throw BAD_ARGUMENT('Index is out of bounds')
}
return [await tree.byteOffset(index), (await tree.get(index)).size]
// Resolve a tree node through the given read batch and report its byte size.
// The `true` flag mirrors the original call — presumably "error if the node
// is missing"; TODO confirm against getTreeNode's contract.
async function getNodeSize (batch, index) {
  const node = await batch.getTreeNode(index, true)
  return node.size
}

async function getByteOffset (tree, index) {
async function getByteOffset (tree, index, batch) {
if (index === 2 * tree.length) return tree.byteLength
if ((index & 1) === 1) index = flat.leftSpan(index)

Expand All @@ -850,15 +845,21 @@ async function getByteOffset (tree, index) {

const ite = flat.iterator(node.index)

if (batch === null) batch = tree.storage.createReadBatch()
const promises = []

while (ite.index !== index) {
if (index < ite.index) {
ite.leftChild()
} else {
offset += (await tree.get(ite.leftChild())).size
promises.push(batch.getTreeNode(ite.leftChild(), true))
ite.sibling()
}
}

batch.tryFlush()
for (const node of await Promise.all(promises)) offset += node.size

return offset
}

Expand Down Expand Up @@ -1065,21 +1066,21 @@ function seekProof (tree, seekRoot, root, p) {
}
}

function blockAndSeekProof (tree, node, seek, seekRoot, root, p) {
function blockAndSeekProof (tree, batch, node, seek, seekRoot, root, p) {
if (!node) return seekProof(tree, seekRoot, root, p)

const ite = flat.iterator(node.index)

p.node = []
if (!node.value) p.node.push(tree.get(ite.index))
if (!node.value) p.node.push(batch.getTreeNode(ite.index, true))

while (ite.index !== root) {
ite.sibling()

if (seek && ite.contains(seekRoot) && ite.index !== seekRoot) {
seekProof(tree, seekRoot, ite.index, p)
} else {
p.node.push(tree.get(ite.index))
p.node.push(batch.getTreeNode(ite.index, true))
}

ite.parent()
Expand Down Expand Up @@ -1335,10 +1336,13 @@ async function generateProof (tree, block, hash, seek, upgrade) {
additionalUpgrade: null
}

// TODO: allow this to be passed...
const batch = tree.storage.createReadBatch()

if (node !== null && (!upgrade || node.lastIndex < upgrade.start)) {
subTree = nodesToRoot(node.index, node.nodes, to)
const seekRoot = seek ? await seekUntrustedTree(tree, subTree, seek.bytes, seek.padding) : head
blockAndSeekProof(tree, node, seek, seekRoot, subTree, p)
blockAndSeekProof(tree, batch, node, seek, seekRoot, subTree, p)
} else if ((node || seek) && upgrade) {
subTree = seek ? await seekFromHead(tree, to, seek.bytes, seek.padding) : node.index
}
Expand All @@ -1348,6 +1352,7 @@ async function generateProof (tree, block, hash, seek, upgrade) {
if (head > to) additionalUpgradeProof(tree, to, head, p)
}

batch.tryFlush()
const [pNode, pSeek, pUpgrade, pAdditional] = await settleProof(p)

if (block) {
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
"hypercore-crypto": "^3.2.1",
"hypercore-errors": "^1.2.0",
"hypercore-id-encoding": "^1.2.0",
"hypercore-on-the-rocks": "^0.0.0",
"hypertrace": "^1.2.1",
"is-options": "^1.0.1",
"protomux": "^3.5.0",
Expand Down

0 comments on commit daa4766

Please sign in to comment.