Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Replicated Growable Array CRDT and Tests #93

Merged
merged 8 commits into from
Aug 24, 2024
168 changes: 168 additions & 0 deletions packages/crdt/src/builtins/RGA/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/// Replicable Growable Array (RGA) CRDT
type Identifier = { counter: number; nodeId: string };

class RGAElement<T> {
public id: Identifier;
public value: T | null;

constructor(id: Identifier, value: T | null) {
this.id = id;
/// If the value is null, the element is in the tombstone state
this.value = value;
}
}

export class RGA<T> {
/// The sequencer is used to generate unique identifiers for each element
private _sequencer: Identifier;
/// For now we are using a simple array to store elements
/// This can be optimized using a Btree
private _elements: RGAElement<T>[];

constructor(
nodeId: string,
sequencer: Identifier = { counter: 0, nodeId: nodeId },
elements: RGAElement<T>[] = [
new RGAElement<T>({ counter: 0, nodeId: "" }, null),
]
) {
this._sequencer = sequencer;
this._elements = elements;
}

elements(): RGAElement<T>[] {
return this._elements;
}

getElements(): T[] {
return this._elements
.filter((element) => element.value !== null)
.map((element) => element.value! as T);
}

clear(): void {
this._sequencer = { counter: 0, nodeId: this._sequencer.nodeId };
this._elements = [
new RGAElement<T>({ counter: 0, nodeId: "" }, null),
]
}

isTombstone(element: RGAElement<T>): boolean {
return element.value === null;
}

nextSeqNr(sequencer: Identifier): Identifier {
return { counter: sequencer.counter + 1, nodeId: sequencer.nodeId };
}

// Function to map a logical index (ignoring tombstones) to a physical index in the elements array
indexWithTombstones(index: number): number {
// Start from 1 to skip the head element
let offset = 1;
while (index >= 0 && offset < this._elements.length) {
if(index === 0 && !this.isTombstone(this._elements[offset])) break;
if (!this.isTombstone(this._elements[offset])) index--;
offset++;
}
if (index > 0) {
throw new RangeError("Index not found");
}
return offset;
}

read(index: number): T | null {
const i = this.indexWithTombstones(index);
return this._elements[i].value;
}

// Function to find the physical index of a vertex given its virtual pointer
indexOfVPtr(ptr: Identifier): number {
for (let offset = 0; offset < this._elements.length; offset++) {
if (
ptr.counter === this._elements[offset].id.counter &&
ptr.nodeId === this._elements[offset].id.nodeId
) {
return offset;
}
}
throw new RangeError("Index not found");
}

// Function to find the correct insertion point for a new vertex
shift(offset: number, ptr: Identifier): number {
while (offset < this._elements.length) {
const next: Identifier = this._elements[offset].id;
if (
next.counter < ptr.counter ||
(next.counter === ptr.counter && next.nodeId < ptr.nodeId)
) {
return offset;
}
offset++;
}
return offset;
}

// Function to insert a new vertex in the graph
insert(index: number, value: T): void {
const i = this.indexWithTombstones(index);
const predecessor = this._elements[i - 1].id;
const ptr = this.nextSeqNr(this._sequencer);

const predecessorIdx = this.indexOfVPtr(predecessor);
const insertIdx = this.shift(predecessorIdx + 1, ptr);
this._sequencer = {
counter: Math.max(this._sequencer.counter, ptr.counter),
nodeId: this._sequencer.nodeId,
};
this._elements.splice(insertIdx, 0, new RGAElement(ptr, value));
}

// Function to delete a vertex from the graph
delete(index: number): void {
const i = this.indexWithTombstones(index);
const ptr = this._elements[i].id;
index = this.indexOfVPtr(ptr);
this._elements[index].value = null;
}

// Function to update the value of a vertex
update(index: number, value: T): void {
const i = this.indexWithTombstones(index);
this._elements[i].value = value;
}

// Merge another RGA instance into this one
merge(peerRGA: RGA<T>): void {
const newVertices: RGAElement<T>[] = [];

for (let i = 1; i < peerRGA._elements.length; i++) {
this.insert(i, peerRGA._elements[i].value!);
}

// Deduplicate and merge the vertices
const seen: Set<string> = new Set();
for (const vertex of this._elements) {
const key = `${vertex.id.counter}_${vertex.id.nodeId}`;
if (!seen.has(key)) {
newVertices.push(vertex);
seen.add(key);
} else {
const existingIndex = newVertices.findIndex(
(v) =>
v.id.counter === vertex.id.counter &&
v.id.nodeId === vertex.id.nodeId
);
if (existingIndex !== -1 && vertex.value === null) {
newVertices[existingIndex].value = null; // Ensure tombstone is applied
}
}
}

this._elements = newVertices;
this._sequencer = {
counter: Math.max(this._sequencer.counter, peerRGA._sequencer.counter),
nodeId: this._sequencer.nodeId,
};
}
}
114 changes: 114 additions & 0 deletions packages/crdt/tests/RGA.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import { describe, test, expect, beforeEach } from "vitest";
import { RGA } from "../src/builtins/RGA"; // Adjust the import path according to your project structure

describe("Replicable Growable Array Tests", () => {
let rga: RGA<string>;
let peerRGA: RGA<string>;

beforeEach(() => {
rga = new RGA<string>("node1");
peerRGA = new RGA<string>("node2");
});

test("Test Insert", () => {
rga.insert(0, "A");
rga.insert(1, "B");
rga.insert(1, "C");
rga.insert(0, "D");

expect(rga.getElements()).toEqual(["D", "A", "C", "B"]);
});

test("Test Read", () => {
rga.insert(0, "A");
rga.insert(1, "B");
rga.insert(1, "C");
rga.delete(1);

expect(rga.read(0)).toBe("A");
expect(rga.read(1)).toBe("B");
});

test("Test Insert and Delete", () => {
rga.insert(0, "A");
rga.insert(1, "B");
rga.insert(1, "C");
rga.delete(0);
rga.delete(0);
expect(rga.getElements()).toEqual(["B"]);

rga.clear();

rga.insert(0, "A");
rga.insert(1, "B");
rga.delete(0);

expect(rga.getElements()).toEqual(["B"]);

rga.insert(0, "C");
rga.insert(1, "D");
expect(rga.getElements()).toEqual(["C", "D", "B"]);

rga.delete(1);
expect(rga.getElements()).toEqual(["C", "B"]);

rga.delete(1);
expect(rga.getElements()).toEqual(["C"]);

peerRGA.insert(0, "E");
peerRGA.insert(0, "F");
peerRGA.insert(2, "G");
peerRGA.insert(3, "H");
peerRGA.delete(1);
peerRGA.delete(1);
peerRGA.delete(1);
expect(peerRGA.getElements()).toEqual(["F"]);
});

test("Test Update", () => {
rga.insert(0, "A");
rga.insert(1, "B");
rga.update(0, "C");
rga.update(1, "D");

expect(rga.getElements()).toEqual(["C", "D"]);
});

test("Test Merge Order", () => {
rga.insert(0, "A");
rga.insert(1, "B");

peerRGA.insert(0, "C");
peerRGA.insert(1, "D");

rga.merge(peerRGA);

expect(rga.getElements()).toEqual(["A", "C", "B", "D"]);

Check failure on line 86 in packages/crdt/tests/RGA.test.ts

View workflow job for this annotation

GitHub Actions / tests

packages/crdt/tests/RGA.test.ts > Replicable Growable Array Tests > Test Merge Order

AssertionError: expected [ 'A', 'C', 'D', 'B' ] to deeply equal [ 'A', 'C', 'B', 'D' ] - Expected + Received Array [ "A", "C", - "B", "D", + "B", ] ❯ packages/crdt/tests/RGA.test.ts:86:35
});

test("Test Merge with Delete", () => {
rga.insert(0, 'A1');
peerRGA.insert(0, 'B1');

// Sync both replicas, both should be ["A1", "B1"]
rga.merge(peerRGA);
peerRGA.merge(rga);


// console.log(rga.elements());
// console.log(peerRGA.elements());
rga.insert(1, 'A2');
peerRGA.delete(1);
// console.log(rga.elements());
// console.log(peerRGA.elements());

expect(rga.getElements()).toEqual(['A1', 'A2', 'B1']);
expect(peerRGA.getElements()).toEqual(['A1']);

Check failure on line 106 in packages/crdt/tests/RGA.test.ts

View workflow job for this annotation

GitHub Actions / tests

packages/crdt/tests/RGA.test.ts > Replicable Growable Array Tests > Test Merge with Delete

AssertionError: expected [ 'B1', 'B1' ] to deeply equal [ 'A1' ] - Expected + Received Array [ - "A1", + "B1", + "B1", ] ❯ packages/crdt/tests/RGA.test.ts:106:39

rga.merge(peerRGA);
peerRGA.merge(rga);

expect(rga.getElements()).toEqual(peerRGA.getElements());
});

});
Loading