From b0d2a3c43b4002e5742769659d62af2a2f44d47d Mon Sep 17 00:00:00 2001
From: Meno Abels
Date: Mon, 17 Feb 2025 15:56:36 +0100
Subject: [PATCH] WIP

---
 src/apply-head-queue.ts            | 11 +++++++++++
 src/blockstore/loader.ts           |  4 ++++
 src/crdt-clock.ts                  | 22 +++++++++++++++++++++-
 tests/fireproof/attachable.test.ts | 24 ++++++++++++++++++------
 4 files changed, 54 insertions(+), 7 deletions(-)

diff --git a/src/apply-head-queue.ts b/src/apply-head-queue.ts
index 2e8cf9f9d..81a96f223 100644
--- a/src/apply-head-queue.ts
+++ b/src/apply-head-queue.ts
@@ -26,31 +26,41 @@ export function applyHeadQueue(worker: ApplyHeadWorkerFuncti
   let isProcessing = false;
 
   async function* process() {
+    console.log("process:1", queue.length);
     if (isProcessing || queue.length === 0) return;
     isProcessing = true;
     const allUpdates: DocUpdate[] = [];
     try {
       while (queue.length > 0) {
+        console.log("process:1.1:", queue.length);
         queue.sort((a, b) => (b.updates ? 1 : -1));
         const task = queue.shift();
         if (!task) continue;
 
+        console.log("process:1.2:", queue.length, worker.toString());
         await worker(task.newHead, task.prevHead, task.updates !== undefined).catch((e) => {
+          console.log("process:1.2.1:", queue.length);
           throw logger.Error().Err(e).Msg("int_applyHead worker error").AsError();
         });
         // console.timeEnd('int_applyHead worker')
+        console.log("process:1.3:", queue.length);
 
         if (task.updates) {
           allUpdates.push(...task.updates);
         }
+        console.log("process:1.4:", queue.length);
         // Yield the updates if there are no tasks with updates left in the queue or the current task has updates
         if (!queue.some((t) => t.updates) || task.updates) {
           const allTasksHaveUpdates = queue.every((task) => task.updates !== null);
+          console.log("process:1.5:", queue.length);
           yield { updates: allUpdates, all: allTasksHaveUpdates };
+          console.log("process:1.6:", queue.length);
           allUpdates.length = 0;
         }
+        console.log("process:1.7:", queue.length);
       }
     } finally {
+      console.log("process:1.2");
       isProcessing = false;
       const generator = process();
       let result = await generator.next();
@@ -58,6 +68,7 @@ export function applyHeadQueue(worker: ApplyHeadWorkerFuncti
         result = await generator.next();
       }
     }
+    console.log("process:2");
   }
 
   return {
diff --git a/src/blockstore/loader.ts b/src/blockstore/loader.ts
index e7571a1ce..9ed2a12d2 100644
--- a/src/blockstore/loader.ts
+++ b/src/blockstore/loader.ts
@@ -223,7 +223,9 @@ export class Loader implements Loadable {
         this.logger.Error().Err(e).Msg("error getting more readers");
       }
       this.carLog = [...uniqueCids([meta.cars, ...this.carLog, ...carHeader.cars], this.seenCompacted)];
+      console.log(">>>>> pre applyMeta", this.ebOpts.applyMeta.toString())
       await this.ebOpts.applyMeta?.(carHeader.meta);
+      console.log(">>>>> post applyMeta")
     } finally {
       this.isCompacting = false;
     }
@@ -541,7 +543,9 @@ export class Loader implements Loadable {
 
   protected async getMoreReaders(cids: AnyLink[], store: ActiveStore) {
     for (const cid of cids) {
+      console.log("getMoreReaders>>>", cid.toString());
       await this.loadCar(cid, store);
     }
+    console.log("getMoreReaders<<<");
   }
 }
diff --git a/src/crdt-clock.ts b/src/crdt-clock.ts
index f5ba2535f..5915216f4 100644
--- a/src/crdt-clock.ts
+++ b/src/crdt-clock.ts
@@ -100,40 +100,58 @@ export class CRDTClockImpl {
     // if (!(this.head && prevHead && newHead)) {
     //   throw new Error("missing head");
     // }
+
+    console.log("int_applyHead:1")
     const noLoader = !localUpdates;
     // console.log("int_applyHead", this.applyHeadQueue.size(), this.head, newHead, prevHead, localUpdates);
     const ogHead = sortClockHead(this.head);
+    console.log("int_applyHead:2")
     newHead = sortClockHead(newHead);
+    console.log("int_applyHead:3")
     if (compareClockHeads(ogHead, newHead)) {
+      console.log("int_applyHead:4")
       return;
     }
 
+    console.log("int_applyHead:5")
     const ogPrev = sortClockHead(prevHead);
+    console.log("int_applyHead:6")
     if (compareClockHeads(ogHead, ogPrev)) {
+      console.log("int_applyHead:7")
       this.setHead(newHead);
       return;
     }
 
     // const noLoader = this.head.length === 1 && !updates?.length
+    console.log("int_applyHead:8")
     if (!this.blockstore) {
       throw this.logger.Error().Msg("missing blockstore").AsError();
     }
 
+    console.log("int_applyHead:9")
     await validateBlocks(this.logger, newHead, this.blockstore);
+    console.log("int_applyHead:10")
     if (!this.transaction) {
      this.transaction = this.blockstore.openTransaction({ noLoader, add: false });
     }
     const tblocks = this.transaction;
+    console.log("int_applyHead:11")
     const advancedHead = await advanceBlocks(this.logger, newHead, tblocks, this.head);
+    console.log("int_applyHead:12", tblocks, advancedHead)
     const result = await root(tblocks, advancedHead);
+    console.log("int_applyHead:12.x", result.additions.length)
     for (const { cid, bytes } of [
       ...result.additions,
       // ...result.removals
     ]) {
+      console.log("int_applyHead:12.y", result.additions.length)
       tblocks.putSync(cid, bytes);
     }
+    console.log("int_applyHead:12.1")
     if (!noLoader) {
+      console.log("int_applyHead:13")
       await this.blockstore.commitTransaction(tblocks, { head: advancedHead }, { add: false, noLoader });
+      console.log("int_applyHead:14")
       this.transaction = undefined;
     }
     this.setHead(advancedHead);
@@ -162,9 +180,11 @@ function compareClockHeads(head1: ClockHead, head2: ClockHead) {
 async function advanceBlocks(logger: Logger, newHead: ClockHead, tblocks: CarTransaction, head: ClockHead) {
   for (const cid of newHead) {
     try {
+      console.log("advanceBlocks:1", cid.toString(), newHead.length)
       head = await advance(tblocks, head, cid);
+      console.log("advanceBlocks:2", cid.toString(), head)
     } catch (e) {
-      logger.Debug().Err(e).Msg("failed to advance head");
+      logger.Error().Err(e).Msg("failed to advance head");
       // console.log('failed to advance head:', cid.toString(), e)
       continue;
     }
diff --git a/tests/fireproof/attachable.test.ts b/tests/fireproof/attachable.test.ts
index e83b53dfe..12c1fa024 100644
--- a/tests/fireproof/attachable.test.ts
+++ b/tests/fireproof/attachable.test.ts
@@ -42,6 +42,9 @@ describe("join function", () => {
   let db: Database;
   let joinableDBs: string[] = [];
   beforeAll(async () => {
+    const gdb = fireproof("gdb");
+    await gdb.put({ _id: `genesis`, value: `genesis` });
+    await gdb.close();
     const set = Math.random().toString(16);
     joinableDBs = await Promise.all(
       new Array(1).fill(1).map(async (_, i) => {
@@ -53,10 +56,11 @@ describe("join function", () => {
               car: `memory://car/${name}`,
               meta: `memory://meta/${name}`,
               file: `memory://file/${name}`,
-              wal: `memory://file/${name}`,
+              wal: `memory://wal/${name}`,
             },
           },
         });
+        await db.put({ _id: `genesis`, value: `genesis` });
         // await db.ready();
         for (let j = 0; j < 10; j++) {
           await db.put({ _id: `${i}-${j}`, value: `${i}-${set}` });
@@ -72,6 +76,7 @@ describe("join function", () => {
         base: `memory://db-${set}`,
       },
     });
+    await db.put({ _id: `genesis`, value: `genesis` });
     for (let j = 0; j < 10; j++) {
       await db.put({ _id: `db-${j}`, value: `db-${set}` });
     }
@@ -80,7 +85,7 @@ describe("join function", () => {
     await db.close();
   });
 
-  it.skip("it is joinable detachable", async () => {
+  it("it is joinable detachable", async () => {
    const my = fireproof("my", {
      storeUrls: {
base: "memory://my", @@ -90,7 +95,12 @@ describe("join function", () => { joinableDBs.map(async (name) => { const tmp = fireproof(name, { storeUrls: { - base: `memory://${name}`, + data: { + car: `memory://car/${name}`, + meta: `memory://meta/${name}`, + file: `memory://file/${name}`, + wal: `memory://wal/${name}`, + }, }, }); const res = await tmp.allDocs(); @@ -105,16 +115,18 @@ describe("join function", () => { expect(my.ledger.crdt.blockstore.loader.attachedStores.remotes().length).toBe(0); }); - it.skip("it is inbound syncing", async () => { + it("it is inbound syncing", async () => { + console.log("joinableDBs-1", joinableDBs); await Promise.all( joinableDBs.map(async (name) => { const attached = await db.attach(aJoinable(name)); expect(attached).toBeDefined(); }), ); + console.log("joinableDBs-2", joinableDBs); expect(db.ledger.crdt.blockstore.loader.attachedStores.remotes().length).toBe(joinableDBs.length); - await new Promise((resolve) => setTimeout(resolve, 1000)); const res = await db.allDocs(); + console.log("joinableDBs-3", joinableDBs); expect(res.rows.length).toBe(100); - }); + }, 1000000); });