From ad62f8493634a9d5ab62e909525c5823da6c9531 Mon Sep 17 00:00:00 2001 From: Jan Date: Fri, 2 Aug 2024 20:06:24 +0200 Subject: [PATCH 1/8] RGA first implementation --- packages/crdt/src/builtins/RGA/index.ts | 158 ++++++++++++++++++++++++ 1 file changed, 158 insertions(+) create mode 100644 packages/crdt/src/builtins/RGA/index.ts diff --git a/packages/crdt/src/builtins/RGA/index.ts b/packages/crdt/src/builtins/RGA/index.ts new file mode 100644 index 00000000..1323efd3 --- /dev/null +++ b/packages/crdt/src/builtins/RGA/index.ts @@ -0,0 +1,158 @@ +/// Replicable Growable Array (RGA) CRDT +type Identifier = { counter: number; nodeId: string }; + +class RGAElement { + public id: Identifier; + public value: T | null; + + constructor(id: Identifier, value: T | null) { + this.id = id; + /// If the value is null, the element is in the tombstone state + this.value = value; + } +} + +export class RGA { + /// The sequencer is used to generate unique identifiers for each element + private _sequencer: Identifier; + /// For now we are using a simple array to store elements + /// This can be optimized using a Btree + private _elements: RGAElement[]; + + constructor( + nodeId: string, + sequencer: Identifier = { counter: 0, nodeId: nodeId }, + elements: RGAElement[] = [ + new RGAElement({ counter: 0, nodeId: "" }, null), + ] + ) { + this._sequencer = sequencer; + this._elements = elements; + } + + elements(): RGAElement[] { + return this._elements; + } + + getElements(): T[] { + return this._elements + .filter((element) => element.value !== null) + .map((element) => element.value! 
as T); + } + + isTombstone(element: RGAElement): boolean { + return element.value === null; + } + + nextSeqNr(sequencer: Identifier): Identifier { + return { counter: sequencer.counter + 1, nodeId: sequencer.nodeId }; + } + + // Function to map a logical index (ignoring tombstones) to a physical index in the elements array + indexWithTombstones(index: number): number { + // Start from 1 to skip the head element + let offset = 1; + while (index > 0) { + if (!this.isTombstone(this._elements[offset])) index--; + offset++; + } + return offset; + } + + read(index: number): T | null { + const i = this.indexWithTombstones(index); + return this._elements[i].value; + } + + // Function to find the physical index of a vertex given its virtual pointer + indexOfVPtr(ptr: Identifier): number { + for (let offset = 0; offset < this._elements.length; offset++) { + if ( + ptr.counter === this._elements[offset].id.counter && + ptr.nodeId === this._elements[offset].id.nodeId + ) { + return offset; + } + } + throw new RangeError("Index not found"); + } + + // Function to find the correct insertion point for a new vertex + shift(offset: number, ptr: Identifier): number { + while (offset < this._elements.length) { + const next: Identifier = this._elements[offset].id; + if ( + next.counter < ptr.counter || + (next.counter === ptr.counter && next.nodeId < ptr.nodeId) + ) { + return offset; + } + offset++; + } + return offset; + } + + // Function to insert a new vertex in the graph + insert(index: number, value: T): void { + const i = this.indexWithTombstones(index); + const predecessor = this._elements[i - 1].id; + const ptr = this.nextSeqNr(this._sequencer); + + const predecessorIdx = this.indexOfVPtr(predecessor); + const insertIdx = this.shift(predecessorIdx + 1, ptr); + this._sequencer = { + counter: Math.max(this._sequencer.counter, ptr.counter), + nodeId: this._sequencer.nodeId, + }; + this._elements.splice(insertIdx, 0, new RGAElement(ptr, value)); + } + + // Function to delete a 
vertex from the graph + delete(index: number): void { + const i = this.indexWithTombstones(index); + const ptr = this._elements[i].id; + console.log("Ptr: ", ptr); + index = this.indexOfVPtr(ptr); + console.log("Index: ", index); + this._elements[index].value = null; + } + + // Function to update the value of a vertex + update(index: number, value: T): void { + const i = this.indexWithTombstones(index); + this._elements[i].value = value; + } + + // Merge another RGA instance into this one + merge(peerRGA: RGA): void { + const newVertices: RGAElement[] = []; + const combinedVertices = [...this._elements, ...peerRGA._elements]; + + // Sort the combined vertices by their Identifier + combinedVertices.sort((a, b) => { + if (a.id.counter === b.id.counter) { + return a.id.nodeId.localeCompare(b.id.nodeId); + } + return a.id.counter - b.id.counter; + }); + + // Deduplicate and merge the vertices + const seen: Set = new Set(); + for (const vertex of combinedVertices) { + const key = `${vertex.id.counter}_${vertex.id.nodeId}`; + if (!seen.has(key)) { + newVertices.push(vertex); + seen.add(key); + } else { + const existingIndex = newVertices.findIndex( + (v) => + v.id.counter === vertex.id.counter && + v.id.nodeId === vertex.id.nodeId + ); + if (existingIndex !== -1 && vertex.value === null) { + newVertices[existingIndex].value = null; // Ensure tombstone is applied + } + } + } + } +} From df44186b020d8c6a2f6750cddb92b621329c1ef6 Mon Sep 17 00:00:00 2001 From: Jan Date: Sat, 3 Aug 2024 16:44:14 +0200 Subject: [PATCH 2/8] fix: indexWithTombstones function --- packages/crdt/src/builtins/RGA/index.ts | 36 ++++++++++++++++--------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/packages/crdt/src/builtins/RGA/index.ts b/packages/crdt/src/builtins/RGA/index.ts index 1323efd3..4863ec38 100644 --- a/packages/crdt/src/builtins/RGA/index.ts +++ b/packages/crdt/src/builtins/RGA/index.ts @@ -40,6 +40,13 @@ export class RGA { .map((element) => element.value! 
as T); } + clear(): void { + this._sequencer = { counter: 0, nodeId: this._sequencer.nodeId }; + this._elements = [ + new RGAElement({ counter: 0, nodeId: "" }, null), + ] + } + isTombstone(element: RGAElement): boolean { return element.value === null; } @@ -52,10 +59,14 @@ export class RGA { indexWithTombstones(index: number): number { // Start from 1 to skip the head element let offset = 1; - while (index > 0) { + while (index >= 0 && offset < this._elements.length) { + if(index === 0 && !this.isTombstone(this._elements[offset])) break; if (!this.isTombstone(this._elements[offset])) index--; offset++; } + if (index > 0) { + throw new RangeError("Index not found"); + } return offset; } @@ -111,9 +122,7 @@ export class RGA { delete(index: number): void { const i = this.indexWithTombstones(index); const ptr = this._elements[i].id; - console.log("Ptr: ", ptr); index = this.indexOfVPtr(ptr); - console.log("Index: ", index); this._elements[index].value = null; } @@ -126,19 +135,14 @@ export class RGA { // Merge another RGA instance into this one merge(peerRGA: RGA): void { const newVertices: RGAElement[] = []; - const combinedVertices = [...this._elements, ...peerRGA._elements]; - - // Sort the combined vertices by their Identifier - combinedVertices.sort((a, b) => { - if (a.id.counter === b.id.counter) { - return a.id.nodeId.localeCompare(b.id.nodeId); - } - return a.id.counter - b.id.counter; - }); + + for (let i = 1; i < peerRGA._elements.length; i++) { + this.insert(i, peerRGA._elements[i].value!); + } // Deduplicate and merge the vertices const seen: Set = new Set(); - for (const vertex of combinedVertices) { + for (const vertex of this._elements) { const key = `${vertex.id.counter}_${vertex.id.nodeId}`; if (!seen.has(key)) { newVertices.push(vertex); @@ -154,5 +158,11 @@ export class RGA { } } } + + this._elements = newVertices; + this._sequencer = { + counter: Math.max(this._sequencer.counter, peerRGA._sequencer.counter), + nodeId: this._sequencer.nodeId, + }; } 
} From ad3d057eff6e24d24df02065ed761e17f8084a51 Mon Sep 17 00:00:00 2001 From: Jan Date: Sat, 3 Aug 2024 16:45:16 +0200 Subject: [PATCH 3/8] feat: added tests for RGA --- packages/crdt/tests/RGA.test.ts | 114 ++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) create mode 100644 packages/crdt/tests/RGA.test.ts diff --git a/packages/crdt/tests/RGA.test.ts b/packages/crdt/tests/RGA.test.ts new file mode 100644 index 00000000..43b42339 --- /dev/null +++ b/packages/crdt/tests/RGA.test.ts @@ -0,0 +1,114 @@ +import { describe, test, expect, beforeEach } from "vitest"; +import { RGA } from "../src/builtins/RGA"; // Adjust the import path according to your project structure + +describe("Replicable Growable Array Tests", () => { + let rga: RGA; + let peerRGA: RGA; + + beforeEach(() => { + rga = new RGA("node1"); + peerRGA = new RGA("node2"); + }); + + test("Test Insert", () => { + rga.insert(0, "A"); + rga.insert(1, "B"); + rga.insert(1, "C"); + rga.insert(0, "D"); + + expect(rga.getElements()).toEqual(["D", "A", "C", "B"]); + }); + + test("Test Read", () => { + rga.insert(0, "A"); + rga.insert(1, "B"); + rga.insert(1, "C"); + rga.delete(1); + + expect(rga.read(0)).toBe("A"); + expect(rga.read(1)).toBe("B"); + }); + + test("Test Insert and Delete", () => { + rga.insert(0, "A"); + rga.insert(1, "B"); + rga.insert(1, "C"); + rga.delete(0); + rga.delete(0); + expect(rga.getElements()).toEqual(["B"]); + + rga.clear(); + + rga.insert(0, "A"); + rga.insert(1, "B"); + rga.delete(0); + + expect(rga.getElements()).toEqual(["B"]); + + rga.insert(0, "C"); + rga.insert(1, "D"); + expect(rga.getElements()).toEqual(["C", "D", "B"]); + + rga.delete(1); + expect(rga.getElements()).toEqual(["C", "B"]); + + rga.delete(1); + expect(rga.getElements()).toEqual(["C"]); + + peerRGA.insert(0, "E"); + peerRGA.insert(0, "F"); + peerRGA.insert(2, "G"); + peerRGA.insert(3, "H"); + peerRGA.delete(1); + peerRGA.delete(1); + peerRGA.delete(1); + 
expect(peerRGA.getElements()).toEqual(["F"]); + }); + + test("Test Update", () => { + rga.insert(0, "A"); + rga.insert(1, "B"); + rga.update(0, "C"); + rga.update(1, "D"); + + expect(rga.getElements()).toEqual(["C", "D"]); + }); + + test("Test Merge Order", () => { + rga.insert(0, "A"); + rga.insert(1, "B"); + + peerRGA.insert(0, "C"); + peerRGA.insert(1, "D"); + + rga.merge(peerRGA); + + expect(rga.getElements()).toEqual(["A", "C", "B", "D"]); + }); + + test("Test Merge with Delete", () => { + rga.insert(0, 'A1'); + peerRGA.insert(0, 'B1'); + + // Sync both replicas, both should be ["A1", "B1"] + rga.merge(peerRGA); + peerRGA.merge(rga); + + + // console.log(rga.elements()); + // console.log(peerRGA.elements()); + rga.insert(1, 'A2'); + peerRGA.delete(1); + // console.log(rga.elements()); + // console.log(peerRGA.elements()); + + expect(rga.getElements()).toEqual(['A1', 'A2', 'B1']); + expect(peerRGA.getElements()).toEqual(['A1']); + + rga.merge(peerRGA); + peerRGA.merge(rga); + + expect(rga.getElements()).toEqual(peerRGA.getElements()); + }); + +}); From 715071ee3c6307f24633885b846cffe2d773a9da Mon Sep 17 00:00:00 2001 From: Jan Date: Mon, 5 Aug 2024 12:22:05 +0200 Subject: [PATCH 4/8] fix: consistent naming, function scope addressed --- packages/crdt/src/builtins/RGA/index.ts | 138 ++++++++++++------------ packages/crdt/tests/RGA.test.ts | 35 +++--- 2 files changed, 85 insertions(+), 88 deletions(-) diff --git a/packages/crdt/src/builtins/RGA/index.ts b/packages/crdt/src/builtins/RGA/index.ts index 4863ec38..90938728 100644 --- a/packages/crdt/src/builtins/RGA/index.ts +++ b/packages/crdt/src/builtins/RGA/index.ts @@ -2,11 +2,12 @@ type Identifier = { counter: number; nodeId: string }; class RGAElement { - public id: Identifier; + // Virtual identifier of the element + public vid: Identifier; public value: T | null; - constructor(id: Identifier, value: T | null) { - this.id = id; + constructor(vid: Identifier, value: T | null) { + this.vid = vid; /// If the 
value is null, the element is in the tombstone state this.value = value; } @@ -34,7 +35,7 @@ export class RGA { return this._elements; } - getElements(): T[] { + getArray(): T[] { return this._elements .filter((element) => element.value !== null) .map((element) => element.value! as T); @@ -42,45 +43,49 @@ export class RGA { clear(): void { this._sequencer = { counter: 0, nodeId: this._sequencer.nodeId }; - this._elements = [ - new RGAElement({ counter: 0, nodeId: "" }, null), - ] + this._elements = [new RGAElement({ counter: 0, nodeId: "" }, null)]; } - isTombstone(element: RGAElement): boolean { + private isTombstone(element: RGAElement): boolean { return element.value === null; } - nextSeqNr(sequencer: Identifier): Identifier { + // Function to generate the next unique identifier + private nextSeq(sequencer: Identifier): Identifier { return { counter: sequencer.counter + 1, nodeId: sequencer.nodeId }; } + // Check whether a < b, ids are never equal + private compareVIds(a: Identifier, b: Identifier): boolean { + if (a.counter !== b.counter) { + return a.counter < b.counter; + } + return a.nodeId < b.nodeId; + } + // Function to map a logical index (ignoring tombstones) to a physical index in the elements array - indexWithTombstones(index: number): number { - // Start from 1 to skip the head element - let offset = 1; - while (index >= 0 && offset < this._elements.length) { - if(index === 0 && !this.isTombstone(this._elements[offset])) break; + private indexWithTombstones(index: number): number { + let offset = 1; // Start from 1 to skip the head element + while (index > 0) { if (!this.isTombstone(this._elements[offset])) index--; offset++; } - if (index > 0) { - throw new RangeError("Index not found"); - } return offset; } + // Function to read the value at a given index read(index: number): T | null { - const i = this.indexWithTombstones(index); + let i = this.indexWithTombstones(index); + while (this.isTombstone(this._elements[i])) i++; return 
this._elements[i].value; } - // Function to find the physical index of a vertex given its virtual pointer - indexOfVPtr(ptr: Identifier): number { + // Function to find the physical index of an element given the virtual id + private indexOfVId(ptr: Identifier): number { for (let offset = 0; offset < this._elements.length; offset++) { if ( - ptr.counter === this._elements[offset].id.counter && - ptr.nodeId === this._elements[offset].id.nodeId + ptr.counter === this._elements[offset].vid.counter && + ptr.nodeId === this._elements[offset].vid.nodeId ) { return offset; } @@ -88,80 +93,73 @@ export class RGA { throw new RangeError("Index not found"); } - // Function to find the correct insertion point for a new vertex - shift(offset: number, ptr: Identifier): number { + // Function to find the correct insertion point for a new element + private shift(offset: number, id: Identifier): number { while (offset < this._elements.length) { - const next: Identifier = this._elements[offset].id; - if ( - next.counter < ptr.counter || - (next.counter === ptr.counter && next.nodeId < ptr.nodeId) - ) { - return offset; - } + const next: Identifier = this._elements[offset].vid; + if (this.compareVIds(next, id)) return offset; offset++; } return offset; } - // Function to insert a new vertex in the graph insert(index: number, value: T): void { const i = this.indexWithTombstones(index); - const predecessor = this._elements[i - 1].id; - const ptr = this.nextSeqNr(this._sequencer); + const predecessor = this._elements[i - 1].vid; + const newVId = this.nextSeq(this._sequencer); + this.insertElement(predecessor, new RGAElement(newVId, value)); + } - const predecessorIdx = this.indexOfVPtr(predecessor); - const insertIdx = this.shift(predecessorIdx + 1, ptr); + // Function to insert a new element into the array + private insertElement( + predecessor: Identifier, + element: RGAElement + ): void { + const predecessorIdx = this.indexOfVId(predecessor); + const insertIdx = 
this.shift(predecessorIdx + 1, element.vid); this._sequencer = { - counter: Math.max(this._sequencer.counter, ptr.counter), + counter: Math.max(this._sequencer.counter, element.vid.counter), nodeId: this._sequencer.nodeId, }; - this._elements.splice(insertIdx, 0, new RGAElement(ptr, value)); + // Check if its a duplicate + if ( + insertIdx < this._elements.length && + this._elements[insertIdx].vid.counter === element.vid.counter && + this._elements[insertIdx].vid.nodeId === element.vid.nodeId + ) { + return; + } + this._elements.splice(insertIdx, 0, element); } - // Function to delete a vertex from the graph + // Function to delete an element from the RGA delete(index: number): void { - const i = this.indexWithTombstones(index); - const ptr = this._elements[i].id; - index = this.indexOfVPtr(ptr); - this._elements[index].value = null; + let i = this.indexWithTombstones(index); + while (this.isTombstone(this._elements[i])) i++; + this._elements[i].value = null; } - // Function to update the value of a vertex + // Function to update the value of an element update(index: number, value: T): void { - const i = this.indexWithTombstones(index); + let i = this.indexWithTombstones(index); + while (this.isTombstone(this._elements[i])) i++; this._elements[i].value = value; } // Merge another RGA instance into this one merge(peerRGA: RGA): void { - const newVertices: RGAElement[] = []; - - for (let i = 1; i < peerRGA._elements.length; i++) { - this.insert(i, peerRGA._elements[i].value!); - } - - // Deduplicate and merge the vertices - const seen: Set = new Set(); - for (const vertex of this._elements) { - const key = `${vertex.id.counter}_${vertex.id.nodeId}`; - if (!seen.has(key)) { - newVertices.push(vertex); - seen.add(key); - } else { - const existingIndex = newVertices.findIndex( - (v) => - v.id.counter === vertex.id.counter && - v.id.nodeId === vertex.id.nodeId - ); - if (existingIndex !== -1 && vertex.value === null) { - newVertices[existingIndex].value = null; // Ensure 
tombstone is applied - } - } + for (let i = 1; i < peerRGA.elements().length; i++) { + this.insertElement( + peerRGA.elements()[i - 1].vid, + peerRGA.elements()[i] + ); } - this._elements = newVertices; this._sequencer = { - counter: Math.max(this._sequencer.counter, peerRGA._sequencer.counter), + counter: Math.max( + this._sequencer.counter, + peerRGA._sequencer.counter + ), nodeId: this._sequencer.nodeId, }; } diff --git a/packages/crdt/tests/RGA.test.ts b/packages/crdt/tests/RGA.test.ts index 43b42339..3afaf540 100644 --- a/packages/crdt/tests/RGA.test.ts +++ b/packages/crdt/tests/RGA.test.ts @@ -16,7 +16,7 @@ describe("Replicable Growable Array Tests", () => { rga.insert(1, "C"); rga.insert(0, "D"); - expect(rga.getElements()).toEqual(["D", "A", "C", "B"]); + expect(rga.getArray()).toEqual(["D", "A", "C", "B"]); }); test("Test Read", () => { @@ -35,7 +35,7 @@ describe("Replicable Growable Array Tests", () => { rga.insert(1, "C"); rga.delete(0); rga.delete(0); - expect(rga.getElements()).toEqual(["B"]); + expect(rga.getArray()).toEqual(["B"]); rga.clear(); @@ -43,17 +43,17 @@ describe("Replicable Growable Array Tests", () => { rga.insert(1, "B"); rga.delete(0); - expect(rga.getElements()).toEqual(["B"]); + expect(rga.getArray()).toEqual(["B"]); rga.insert(0, "C"); rga.insert(1, "D"); - expect(rga.getElements()).toEqual(["C", "D", "B"]); + expect(rga.getArray()).toEqual(["C", "D", "B"]); rga.delete(1); - expect(rga.getElements()).toEqual(["C", "B"]); + expect(rga.getArray()).toEqual(["C", "B"]); rga.delete(1); - expect(rga.getElements()).toEqual(["C"]); + expect(rga.getArray()).toEqual(["C"]); peerRGA.insert(0, "E"); peerRGA.insert(0, "F"); @@ -62,7 +62,7 @@ describe("Replicable Growable Array Tests", () => { peerRGA.delete(1); peerRGA.delete(1); peerRGA.delete(1); - expect(peerRGA.getElements()).toEqual(["F"]); + expect(peerRGA.getArray()).toEqual(["F"]); }); test("Test Update", () => { @@ -71,7 +71,7 @@ describe("Replicable Growable Array Tests", () => { 
rga.update(0, "C"); rga.update(1, "D"); - expect(rga.getElements()).toEqual(["C", "D"]); + expect(rga.getArray()).toEqual(["C", "D"]); }); test("Test Merge Order", () => { @@ -80,35 +80,34 @@ describe("Replicable Growable Array Tests", () => { peerRGA.insert(0, "C"); peerRGA.insert(1, "D"); + peerRGA.insert(2, "E"); rga.merge(peerRGA); - expect(rga.getElements()).toEqual(["A", "C", "B", "D"]); + expect(rga.getArray()).toEqual(["A", "C", "B", "D"]); }); test("Test Merge with Delete", () => { - rga.insert(0, 'A1'); - peerRGA.insert(0, 'B1'); + rga.insert(0, "A1"); + peerRGA.insert(0, "B1"); // Sync both replicas, both should be ["A1", "B1"] rga.merge(peerRGA); peerRGA.merge(rga); - - + // console.log(rga.elements()); // console.log(peerRGA.elements()); - rga.insert(1, 'A2'); + rga.insert(1, "A2"); peerRGA.delete(1); // console.log(rga.elements()); // console.log(peerRGA.elements()); - expect(rga.getElements()).toEqual(['A1', 'A2', 'B1']); - expect(peerRGA.getElements()).toEqual(['A1']); + expect(rga.getArray()).toEqual(["A1", "A2", "B1"]); + expect(peerRGA.getArray()).toEqual(["A1"]); rga.merge(peerRGA); peerRGA.merge(rga); - expect(rga.getElements()).toEqual(peerRGA.getElements()); + expect(rga.getArray()).toEqual(peerRGA.getArray()); }); - }); From 35831f606826222523c2757d4b1ff2b8ef32a201 Mon Sep 17 00:00:00 2001 From: Jan Date: Mon, 5 Aug 2024 22:48:50 +0200 Subject: [PATCH 5/8] fix: added a parent field to RGAElement, new insert and merge --- packages/crdt/src/builtins/RGA/index.ts | 52 ++++++++++++------------- packages/crdt/tests/RGA.test.ts | 30 ++------------ 2 files changed, 29 insertions(+), 53 deletions(-) diff --git a/packages/crdt/src/builtins/RGA/index.ts b/packages/crdt/src/builtins/RGA/index.ts index 90938728..e1b96c51 100644 --- a/packages/crdt/src/builtins/RGA/index.ts +++ b/packages/crdt/src/builtins/RGA/index.ts @@ -5,11 +5,17 @@ class RGAElement { // Virtual identifier of the element public vid: Identifier; public value: T | null; + public 
parent: Identifier | null; - constructor(vid: Identifier, value: T | null) { + constructor( + vid: Identifier, + value: T | null, + parent: Identifier | null = null + ) { this.vid = vid; /// If the value is null, the element is in the tombstone state this.value = value; + this.parent = parent; } } @@ -90,42 +96,37 @@ export class RGA { return offset; } } - throw new RangeError("Index not found"); - } - - // Function to find the correct insertion point for a new element - private shift(offset: number, id: Identifier): number { - while (offset < this._elements.length) { - const next: Identifier = this._elements[offset].vid; - if (this.compareVIds(next, id)) return offset; - offset++; - } - return offset; + throw new RangeError("Element not found"); } insert(index: number, value: T): void { const i = this.indexWithTombstones(index); - const predecessor = this._elements[i - 1].vid; + const parent = this._elements[i - 1].vid; const newVId = this.nextSeq(this._sequencer); - this.insertElement(predecessor, new RGAElement(newVId, value)); + this.insertElement(new RGAElement(newVId, value, parent)); } // Function to insert a new element into the array - private insertElement( - predecessor: Identifier, - element: RGAElement - ): void { - const predecessorIdx = this.indexOfVId(predecessor); - const insertIdx = this.shift(predecessorIdx + 1, element.vid); + private insertElement(element: RGAElement): void { + const parentIdx = this.indexOfVId(element.parent!); + let insertIdx = parentIdx + 1; + for (; insertIdx < this._elements.length; insertIdx++) { + let curr = this._elements[insertIdx]; + // if (element.vid.counter > curr.vid.counter) break; + let currParentIdx = this.indexOfVId(curr.parent!); + if (currParentIdx > parentIdx) break; + if (currParentIdx === parentIdx) { + if (this.compareVIds(curr.vid, element.vid)) break; + } + } this._sequencer = { counter: Math.max(this._sequencer.counter, element.vid.counter), nodeId: this._sequencer.nodeId, }; // Check if its a 
duplicate if ( - insertIdx < this._elements.length && - this._elements[insertIdx].vid.counter === element.vid.counter && - this._elements[insertIdx].vid.nodeId === element.vid.nodeId + this._elements[insertIdx - 1].vid.counter === element.vid.counter && + this._elements[insertIdx - 1].vid.nodeId === element.vid.nodeId ) { return; } @@ -149,10 +150,7 @@ export class RGA { // Merge another RGA instance into this one merge(peerRGA: RGA): void { for (let i = 1; i < peerRGA.elements().length; i++) { - this.insertElement( - peerRGA.elements()[i - 1].vid, - peerRGA.elements()[i] - ); + this.insertElement(peerRGA.elements()[i]); } this._sequencer = { diff --git a/packages/crdt/tests/RGA.test.ts b/packages/crdt/tests/RGA.test.ts index 3afaf540..5b2c70a8 100644 --- a/packages/crdt/tests/RGA.test.ts +++ b/packages/crdt/tests/RGA.test.ts @@ -74,40 +74,18 @@ describe("Replicable Growable Array Tests", () => { expect(rga.getArray()).toEqual(["C", "D"]); }); - test("Test Merge Order", () => { + test("Test Merge", () => { rga.insert(0, "A"); rga.insert(1, "B"); peerRGA.insert(0, "C"); peerRGA.insert(1, "D"); - peerRGA.insert(2, "E"); - - rga.merge(peerRGA); - - expect(rga.getArray()).toEqual(["A", "C", "B", "D"]); - }); - - test("Test Merge with Delete", () => { - rga.insert(0, "A1"); - peerRGA.insert(0, "B1"); + peerRGA.insert(0, "E"); - // Sync both replicas, both should be ["A1", "B1"] rga.merge(peerRGA); - peerRGA.merge(rga); - - // console.log(rga.elements()); - // console.log(peerRGA.elements()); - rga.insert(1, "A2"); - peerRGA.delete(1); - // console.log(rga.elements()); - // console.log(peerRGA.elements()); + expect(rga.getArray()).toEqual(["E", "C", "A", "D", "B"]); - expect(rga.getArray()).toEqual(["A1", "A2", "B1"]); - expect(peerRGA.getArray()).toEqual(["A1"]); - - rga.merge(peerRGA); peerRGA.merge(rga); - - expect(rga.getArray()).toEqual(peerRGA.getArray()); + expect(peerRGA.getArray()).toEqual(rga.getArray()); }); }); From 6bb2755ec669edeaf6b04075b5320a1656101ffa 
Mon Sep 17 00:00:00 2001 From: Jan Date: Tue, 6 Aug 2024 11:11:11 +0200 Subject: [PATCH 6/8] docs: added README, deleted commented line --- packages/crdt/src/builtins/RGA/README.md | 48 ++++++++++++++++++++++++ packages/crdt/src/builtins/RGA/index.ts | 1 - 2 files changed, 48 insertions(+), 1 deletion(-) create mode 100644 packages/crdt/src/builtins/RGA/README.md diff --git a/packages/crdt/src/builtins/RGA/README.md b/packages/crdt/src/builtins/RGA/README.md new file mode 100644 index 00000000..9269fc66 --- /dev/null +++ b/packages/crdt/src/builtins/RGA/README.md @@ -0,0 +1,48 @@ +# RGA (Replicated Growable Array) CRDT + +This is an implementation of a Replicated Growable Array (RGA) Conflict-free Replicated Data Type (CRDT). RGA is a data structure that allows for concurrent editing of an array by multiple nodes, while automatically resolving most conflicts. + +## Overview + +The structure is designed to maintain a consistent state across multiple replicas, even when they receive operations in different orders. It achieves this by using unique identifiers for each element and a tombstone mechanism for deletions. + +## Key Components + +### RGAElement + +Each element in the RGA is represented by an `RGAElement` object with the following properties: + +- `vid`: A unique identifier for the element, consisting of a counter and a node ID. +- `value`: The actual value stored in the element (or null if the element is deleted). +- `parent`: The identifier of the element that precedes this one in the logical order. + +### RGA Class + +The `RGA` class manages the sequencer, which generates new identifiers, and the array of `RGAElement` objects providing methods for manipulating the array. + +## Main Operations + +1. **Insert**: Adds a new element at a specified index. +2. **Delete**: Marks an element as deleted (tombstone) at a specified index. +3. **Update**: Changes the value of an element at a specified index. +4. 
**Merge**: Combines the current RGA with another RGA, resolving conflicts automatically. +5. **getArray**: Serialises the current state of the RGA to an array of values. + +## How It Works + +1. **Unique Identifiers**: Each element has a unique identifier (`vid`) generated using a sequencer. + +2. **Logical Ordering**: Elements are ordered based on their `parent` references and `vid` comparisons. + +3. **Tombstones**: Deleted elements are not removed but marked as tombstones (null value). + +4. **Conflict Resolution**: + - For concurrent inserts at the same position (same parent element), the element with the higher `vid` is placed first. + - If the parent elements are different, the elements are inserted in the order of their parent's index. + - Deletions are preserved due to the tombstone mechanism. + +5. **Merging**: When merging two RGAs, elements from the peer RGA are inserted into the current RGA, maintaining the correct order and resolving the conflicts. + +## Limitations + +The RGA may not be able to resolve complex backward interleaving scenarios. The insertElement method primarily relies on the parent index and a simple comparison of virtual IDs (vids) to determine the insertion position. This may lead to conflicts in some edge cases. 
diff --git a/packages/crdt/src/builtins/RGA/index.ts b/packages/crdt/src/builtins/RGA/index.ts index e1b96c51..fcea8dfb 100644 --- a/packages/crdt/src/builtins/RGA/index.ts +++ b/packages/crdt/src/builtins/RGA/index.ts @@ -112,7 +112,6 @@ export class RGA { let insertIdx = parentIdx + 1; for (; insertIdx < this._elements.length; insertIdx++) { let curr = this._elements[insertIdx]; - // if (element.vid.counter > curr.vid.counter) break; let currParentIdx = this.indexOfVId(curr.parent!); if (currParentIdx > parentIdx) break; if (currParentIdx === parentIdx) { From ce3fe5f21aa976137a2aff7992a671a61507287e Mon Sep 17 00:00:00 2001 From: Jan Lewandowski Date: Fri, 23 Aug 2024 17:06:29 +0900 Subject: [PATCH 7/8] refactor: dropped private attributes, added isDeleted instead of null, explained the need for root and updated the README --- packages/crdt/src/builtins/RGA/README.md | 18 +++-- packages/crdt/src/builtins/RGA/index.ts | 98 ++++++++++++------------ 2 files changed, 59 insertions(+), 57 deletions(-) diff --git a/packages/crdt/src/builtins/RGA/README.md b/packages/crdt/src/builtins/RGA/README.md index 9269fc66..d746614e 100644 --- a/packages/crdt/src/builtins/RGA/README.md +++ b/packages/crdt/src/builtins/RGA/README.md @@ -12,9 +12,10 @@ The structure is designed to maintain a consistent state across multiple replica Each element in the RGA is represented by an `RGAElement` object with the following properties: -- `vid`: A unique identifier for the element, consisting of a counter and a node ID. -- `value`: The actual value stored in the element (or null if the element is deleted). -- `parent`: The identifier of the element that precedes this one in the logical order. +- `vid`: A unique identifier for the element, consisting of a counter and a node ID. +- `value`: The actual value stored in the element (deletion is tracked by `isDeleted`, not by setting the value to null). +- `parent`: The identifier of the element that precedes this one in the logical order. 
+- `isDeleted`: A flag indicating whether the element has been deleted. ### RGA Class @@ -34,12 +35,13 @@ The `RGA` class manages the sequencer, which generates new identifiers, and the 2. **Logical Ordering**: Elements are ordered based on their `parent` references and `vid` comparisons. -3. **Tombstones**: Deleted elements are not removed but marked as tombstones (null value). +3. **Tombstones**: Deleted elements are not removed but marked as tombstones with the isDeleted property. -4. **Conflict Resolution**: - - For concurrent inserts at the same position (same parent element), the element with the higher `vid` is placed first. - - If the parent elements are different, the elements are inserted in the order of their parent's index. - - Deletions are preserved due to the tombstone mechanism. +4. **Conflict Resolution**: + + - For concurrent inserts at the same position (same parent element), the element with the higher `vid` is placed first. + - If the parent elements are different, the elements are inserted in the order of their parent's index. + - Deletions are preserved due to the tombstone mechanism. 5. **Merging**: When merging two RGAs, elements from the peer RGA are inserted into the current RGA, maintaining the correct order and resolving the conflicts. 
diff --git a/packages/crdt/src/builtins/RGA/index.ts b/packages/crdt/src/builtins/RGA/index.ts index fcea8dfb..89bae024 100644 --- a/packages/crdt/src/builtins/RGA/index.ts +++ b/packages/crdt/src/builtins/RGA/index.ts @@ -3,57 +3,57 @@ type Identifier = { counter: number; nodeId: string }; class RGAElement { // Virtual identifier of the element - public vid: Identifier; - public value: T | null; - public parent: Identifier | null; + vid: Identifier; + value: T | null; + parent: Identifier | null; + isDeleted: boolean; constructor( vid: Identifier, value: T | null, - parent: Identifier | null = null + parent: Identifier | null = null, + isDeleted: boolean = false ) { this.vid = vid; - /// If the value is null, the element is in the tombstone state this.value = value; this.parent = parent; + this.isDeleted = isDeleted; } } export class RGA { /// The sequencer is used to generate unique identifiers for each element - private _sequencer: Identifier; + sequencer: Identifier; /// For now we are using a simple array to store elements /// This can be optimized using a Btree - private _elements: RGAElement[]; + elements: RGAElement[]; + /* + We are using an empty element as the head of the array to simplify the logic of merging two RGA instances. + It acts as an anchor and is the same for all replicas. + */ constructor( nodeId: string, sequencer: Identifier = { counter: 0, nodeId: nodeId }, elements: RGAElement[] = [ - new RGAElement({ counter: 0, nodeId: "" }, null), + new RGAElement({ counter: 0, nodeId: "" }, null, null, true), ] ) { - this._sequencer = sequencer; - this._elements = elements; - } - - elements(): RGAElement[] { - return this._elements; + this.sequencer = sequencer; + this.elements = elements; } getArray(): T[] { - return this._elements - .filter((element) => element.value !== null) + return this.elements + .filter((element) => !element.isDeleted) .map((element) => element.value! 
as T); } clear(): void { - this._sequencer = { counter: 0, nodeId: this._sequencer.nodeId }; - this._elements = [new RGAElement({ counter: 0, nodeId: "" }, null)]; - } - - private isTombstone(element: RGAElement): boolean { - return element.value === null; + this.sequencer = { counter: 0, nodeId: this.sequencer.nodeId }; + this.elements = [ + new RGAElement({ counter: 0, nodeId: "" }, null, null, true), + ]; } // Function to generate the next unique identifier @@ -73,7 +73,7 @@ export class RGA { private indexWithTombstones(index: number): number { let offset = 1; // Start from 1 to skip the head element while (index > 0) { - if (!this.isTombstone(this._elements[offset])) index--; + if (!this.elements[offset].isDeleted) index--; offset++; } return offset; @@ -82,27 +82,27 @@ export class RGA { // Function to read the value at a given index read(index: number): T | null { let i = this.indexWithTombstones(index); - while (this.isTombstone(this._elements[i])) i++; - return this._elements[i].value; + while (this.elements[i].isDeleted) i++; + return this.elements[i].value; } // Function to find the physical index of an element given the virtual id private indexOfVId(ptr: Identifier): number { - for (let offset = 0; offset < this._elements.length; offset++) { + for (let offset = 0; offset < this.elements.length; offset++) { if ( - ptr.counter === this._elements[offset].vid.counter && - ptr.nodeId === this._elements[offset].vid.nodeId + ptr.counter === this.elements[offset].vid.counter && + ptr.nodeId === this.elements[offset].vid.nodeId ) { return offset; } } - throw new RangeError("Element not found"); + return -1; } insert(index: number, value: T): void { const i = this.indexWithTombstones(index); - const parent = this._elements[i - 1].vid; - const newVId = this.nextSeq(this._sequencer); + const parent = this.elements[i - 1].vid; + const newVId = this.nextSeq(this.sequencer); this.insertElement(new RGAElement(newVId, value, parent)); } @@ -110,54 +110,54 @@ export 
class RGA { private insertElement(element: RGAElement): void { const parentIdx = this.indexOfVId(element.parent!); let insertIdx = parentIdx + 1; - for (; insertIdx < this._elements.length; insertIdx++) { - let curr = this._elements[insertIdx]; + for (; insertIdx < this.elements.length; insertIdx++) { + let curr = this.elements[insertIdx]; let currParentIdx = this.indexOfVId(curr.parent!); if (currParentIdx > parentIdx) break; if (currParentIdx === parentIdx) { if (this.compareVIds(curr.vid, element.vid)) break; } } - this._sequencer = { - counter: Math.max(this._sequencer.counter, element.vid.counter), - nodeId: this._sequencer.nodeId, + this.sequencer = { + ...this.sequencer, + counter: Math.max(this.sequencer.counter, element.vid.counter), }; // Check if its a duplicate if ( - this._elements[insertIdx - 1].vid.counter === element.vid.counter && - this._elements[insertIdx - 1].vid.nodeId === element.vid.nodeId + this.elements[insertIdx - 1].vid.counter === element.vid.counter && + this.elements[insertIdx - 1].vid.nodeId === element.vid.nodeId ) { return; } - this._elements.splice(insertIdx, 0, element); + this.elements.splice(insertIdx, 0, element); } // Function to delete an element from the RGA delete(index: number): void { let i = this.indexWithTombstones(index); - while (this.isTombstone(this._elements[i])) i++; - this._elements[i].value = null; + while (this.elements[i].isDeleted) i++; + this.elements[i].isDeleted = true; } // Function to update the value of an element update(index: number, value: T): void { let i = this.indexWithTombstones(index); - while (this.isTombstone(this._elements[i])) i++; - this._elements[i].value = value; + while (this.elements[i].isDeleted) i++; + this.elements[i].value = value; } // Merge another RGA instance into this one merge(peerRGA: RGA): void { - for (let i = 1; i < peerRGA.elements().length; i++) { - this.insertElement(peerRGA.elements()[i]); + for (let i = 1; i < peerRGA.elements.length; i++) { + 
this.insertElement(peerRGA.elements[i]); } - this._sequencer = { + this.sequencer = { + ...this.sequencer, counter: Math.max( - this._sequencer.counter, - peerRGA._sequencer.counter + this.sequencer.counter, + peerRGA.sequencer.counter ), - nodeId: this._sequencer.nodeId, }; } } From 75f5099f22fa84433de83738af9f9b9c8ccfeb6f Mon Sep 17 00:00:00 2001 From: Jan Lewandowski Date: Fri, 23 Aug 2024 18:14:56 +0900 Subject: [PATCH 8/8] refactor: insert argument name changed, parent is not null by default --- packages/crdt/src/builtins/RGA/index.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/packages/crdt/src/builtins/RGA/index.ts b/packages/crdt/src/builtins/RGA/index.ts index 89bae024..5e812877 100644 --- a/packages/crdt/src/builtins/RGA/index.ts +++ b/packages/crdt/src/builtins/RGA/index.ts @@ -11,7 +11,7 @@ class RGAElement { constructor( vid: Identifier, value: T | null, - parent: Identifier | null = null, + parent: Identifier | null, isDeleted: boolean = false ) { this.vid = vid; @@ -28,7 +28,7 @@ export class RGA { /// This can be optimized using a Btree elements: RGAElement[]; - /* + /* We are using an empty element as the head of the array to simplify the logic of merging two RGA instances. It acts as an anchor and is the same for all replicas. */ @@ -99,8 +99,9 @@ export class RGA { return -1; } - insert(index: number, value: T): void { - const i = this.indexWithTombstones(index); + // Function to insert a new element after a given index, might not be immediately after because we look at parents + insert(parentIndex: number, value: T): void { + const i = this.indexWithTombstones(parentIndex); const parent = this.elements[i - 1].vid; const newVId = this.nextSeq(this.sequencer); this.insertElement(new RGAElement(newVId, value, parent));