Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
mabels committed Feb 17, 2025
1 parent f5b8600 commit b0d2a3c
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 7 deletions.
11 changes: 11 additions & 0 deletions src/apply-head-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,38 +26,49 @@ export function applyHeadQueue<T extends DocTypes>(worker: ApplyHeadWorkerFuncti
let isProcessing = false;

// NOTE(review): excerpt from a WIP commit — the numerous console.log calls are
// debug tracing and should be stripped before merge.
// Drains the closed-over `queue` (state of the enclosing applyHeadQueue),
// invoking `worker` once per task and yielding accumulated DocUpdates in
// batches. `isProcessing` is a re-entrancy guard shared via the closure.
async function* process() {
console.log("process:1", queue.length);
// Bail out if another drain is already running or there is nothing queued.
if (isProcessing || queue.length === 0) return;
isProcessing = true;
const allUpdates: DocUpdate<T>[] = [];
try {
while (queue.length > 0) {
console.log("process:1.1:", queue.length);
// NOTE(review): comparator ignores `a` — `(b.updates ? 1 : -1)` is not a
// consistent total order, so the resulting order is engine-dependent;
// confirm the intended priority of update-carrying tasks.
queue.sort((a, b) => (b.updates ? 1 : -1));
const task = queue.shift();
if (!task) continue;

console.log("process:1.2:", queue.length, worker.toString());
// Worker failures are wrapped in a logged error and abort the drain
// (the throw propagates out of the generator via the catch callback).
await worker(task.newHead, task.prevHead, task.updates !== undefined).catch((e) => {
console.log("process:1.2.1:", queue.length);
throw logger.Error().Err(e).Msg("int_applyHead worker error").AsError();
});
// console.timeEnd('int_applyHead worker')

console.log("process:1.3:", queue.length);
if (task.updates) {
allUpdates.push(...task.updates);
}
console.log("process:1.4:", queue.length);
// Yield the updates if there are no tasks with updates left in the queue or the current task has updates
if (!queue.some((t) => t.updates) || task.updates) {
// NOTE(review): elsewhere in this function absence of updates is tested
// with `!== undefined` / truthiness, so `!== null` here is likely always
// true — verify whether `!== undefined` was intended.
const allTasksHaveUpdates = queue.every((task) => task.updates !== null);
console.log("process:1.5:", queue.length);
yield { updates: allUpdates, all: allTasksHaveUpdates };
console.log("process:1.6:", queue.length);
allUpdates.length = 0;
}
console.log("process:1.7:", queue.length);
}
} finally {
console.log("process:1.2");
// Clear the guard first so the recursive drain below is not a no-op.
isProcessing = false;
// NOTE(review): this recursive self-drain (added in this WIP commit) runs
// tasks that were enqueued while the loop above was awaiting, but it
// discards everything the inner generator yields — confirm that dropping
// those batched updates is acceptable.
const generator = process();
let result = await generator.next();
while (!result.done) {
result = await generator.next();
}
}
console.log("process:2");
}

return {
Expand Down
4 changes: 4 additions & 0 deletions src/blockstore/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,9 @@ export class Loader implements Loadable {
this.logger.Error().Err(e).Msg("error getting more readers");
}
this.carLog = [...uniqueCids([meta.cars, ...this.carLog, ...carHeader.cars], this.seenCompacted)];
console.log(">>>>> pre applyMeta", this.ebOpts.applyMeta.toString())
await this.ebOpts.applyMeta?.(carHeader.meta);
console.log(">>>>> post applyMeta")
} finally {
this.isCompacting = false;
}
Expand Down Expand Up @@ -541,7 +543,9 @@ export class Loader implements Loadable {

// Loads the CAR file for each of the given CIDs into this Loader, one at a
// time (sequential awaits — loadCar is invoked strictly in order).
// NOTE(review): the console.log calls are WIP debug tracing; remove before merge.
protected async getMoreReaders(cids: AnyLink[], store: ActiveStore) {
for (const cid of cids) {
console.log("getMoreReaders>>>", cid.toString());
await this.loadCar(cid, store);
}
console.log("getMoreReaders<<<");
}
}
22 changes: 21 additions & 1 deletion src/crdt-clock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,40 +100,58 @@ export class CRDTClockImpl {
// if (!(this.head && prevHead && newHead)) {
// throw new Error("missing head");
// }

console.log("int_applyHead:1")
const noLoader = !localUpdates;

// console.log("int_applyHead", this.applyHeadQueue.size(), this.head, newHead, prevHead, localUpdates);
const ogHead = sortClockHead(this.head);
console.log("int_applyHead:2")
newHead = sortClockHead(newHead);
console.log("int_applyHead:3")
if (compareClockHeads(ogHead, newHead)) {
console.log("int_applyHead:4")
return;
}
console.log("int_applyHead:5")
const ogPrev = sortClockHead(prevHead);
console.log("int_applyHead:6")
if (compareClockHeads(ogHead, ogPrev)) {
console.log("int_applyHead:7")
this.setHead(newHead);
return;
}

// const noLoader = this.head.length === 1 && !updates?.length
console.log("int_applyHead:8")
if (!this.blockstore) {
throw this.logger.Error().Msg("missing blockstore").AsError();
}
console.log("int_applyHead:9")
await validateBlocks(this.logger, newHead, this.blockstore);
console.log("int_applyHead:10")
if (!this.transaction) {
this.transaction = this.blockstore.openTransaction({ noLoader, add: false });
}
const tblocks = this.transaction;

console.log("int_applyHead:11")
const advancedHead = await advanceBlocks(this.logger, newHead, tblocks, this.head);
console.log("int_applyHead:12", tblocks, advancedHead)
const result = await root(tblocks, advancedHead);
console.log("int_applyHead:12.x", result.additions.length)
for (const { cid, bytes } of [
...result.additions,
// ...result.removals
]) {
console.log("int_applyHead:12.y", result.additions.length)
tblocks.putSync(cid, bytes);
}
console.log("int_applyHead:12.1")
if (!noLoader) {
console.log("int_applyHead:13")
await this.blockstore.commitTransaction(tblocks, { head: advancedHead }, { add: false, noLoader });
console.log("int_applyHead:14")
this.transaction = undefined;
}
this.setHead(advancedHead);
Expand Down Expand Up @@ -162,9 +180,11 @@ function compareClockHeads(head1: ClockHead, head2: ClockHead) {
async function advanceBlocks(logger: Logger, newHead: ClockHead, tblocks: CarTransaction, head: ClockHead) {
for (const cid of newHead) {
try {
console.log("advanceBlocks:1", cid.toString(), newHead.length)
head = await advance(tblocks, head, cid);
console.log("advanceBlocks:2", cid.toString(), head)
} catch (e) {
logger.Debug().Err(e).Msg("failed to advance head");
logger.Error().Err(e).Msg("failed to advance head");
// console.log('failed to advance head:', cid.toString(), e)
continue;
}
Expand Down
24 changes: 18 additions & 6 deletions tests/fireproof/attachable.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ describe("join function", () => {
let db: Database;
let joinableDBs: string[] = [];
beforeAll(async () => {
const gdb = fireproof("gdb");
await gdb.put({ _id: `genesis`, value: `genesis` });
await gdb.close();
const set = Math.random().toString(16);
joinableDBs = await Promise.all(
new Array(1).fill(1).map(async (_, i) => {
Expand All @@ -53,10 +56,11 @@ describe("join function", () => {
car: `memory://car/${name}`,
meta: `memory://meta/${name}`,
file: `memory://file/${name}`,
wal: `memory://file/${name}`,
wal: `memory://wal/${name}`,
},
},
});
await db.put({ _id: `genesis`, value: `genesis` });
// await db.ready();
for (let j = 0; j < 10; j++) {
await db.put({ _id: `${i}-${j}`, value: `${i}-${set}` });
Expand All @@ -72,6 +76,7 @@ describe("join function", () => {
base: `memory://db-${set}`,
},
});
await db.put({ _id: `genesis`, value: `genesis` });
for (let j = 0; j < 10; j++) {
await db.put({ _id: `db-${j}`, value: `db-${set}` });
}
Expand All @@ -80,7 +85,7 @@ describe("join function", () => {
await db.close();
});

it.skip("it is joinable detachable", async () => {
it("it is joinable detachable", async () => {
const my = fireproof("my", {
storeUrls: {
base: "memory://my",
Expand All @@ -90,7 +95,12 @@ describe("join function", () => {
joinableDBs.map(async (name) => {
const tmp = fireproof(name, {
storeUrls: {
base: `memory://${name}`,
data: {
car: `memory://car/${name}`,
meta: `memory://meta/${name}`,
file: `memory://file/${name}`,
wal: `memory://wal/${name}`,
},
},
});
const res = await tmp.allDocs();
Expand All @@ -105,16 +115,18 @@ describe("join function", () => {
expect(my.ledger.crdt.blockstore.loader.attachedStores.remotes().length).toBe(0);
});

it.skip("it is inbound syncing", async () => {
it("it is inbound syncing", async () => {
console.log("joinableDBs-1", joinableDBs);
await Promise.all(
joinableDBs.map(async (name) => {
const attached = await db.attach(aJoinable(name));
expect(attached).toBeDefined();
}),
);
console.log("joinableDBs-2", joinableDBs);
expect(db.ledger.crdt.blockstore.loader.attachedStores.remotes().length).toBe(joinableDBs.length);
await new Promise((resolve) => setTimeout(resolve, 1000));
const res = await db.allDocs();
console.log("joinableDBs-3", joinableDBs);
expect(res.rows.length).toBe(100);
});
}, 1000000);
});

0 comments on commit b0d2a3c

Please sign in to comment.