feat: Replicated Growable Array CRDT and Tests #93

Merged
merged 8 commits into from
Aug 24, 2024
48 changes: 48 additions & 0 deletions packages/crdt/src/builtins/RGA/README.md
@@ -0,0 +1,48 @@
# RGA (Replicated Growable Array) CRDT

This is an implementation of a Replicated Growable Array (RGA) Conflict-free Replicated Data Type (CRDT). RGA is a data structure that allows for concurrent editing of an array by multiple nodes, while automatically resolving most conflicts.

## Overview

The structure is designed to maintain a consistent state across multiple replicas, even when they receive operations in different orders. It achieves this by using unique identifiers for each element and a tombstone mechanism for deletions.

## Key Components

### RGAElement

Each element in the RGA is represented by an `RGAElement` object with the following properties:

- `vid`: A unique identifier for the element, consisting of a counter and a node ID.
- `value`: The actual value stored in the element, or `null` if the element has been deleted (a tombstone).
- `parent`: The identifier of the element that precedes this one in the logical order.
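
As a rough illustration of the shape described above, the following TypeScript sketch models an identifier and an element as plain types. The names `ElementSketch`, `isTombstone`, `head`, and `first` are chosen for this example only and are not a description of the class's public API.

```typescript
// Illustrative types only; they mirror the three properties listed above.
type Identifier = { counter: number; nodeId: string };

interface ElementSketch<T> {
  vid: Identifier; // unique id made of a counter and a node ID
  value: T | null; // null marks a deleted element (tombstone)
  parent: Identifier | null; // id of the element that logically precedes this one
}

// Hypothetical helper: an element is a tombstone when its value is null.
function isTombstone<T>(element: ElementSketch<T>): boolean {
  return element.value === null;
}

// A sentinel element with no parent, followed by one real element that points at it.
const head: ElementSketch<string> = {
  vid: { counter: 0, nodeId: "" },
  value: null,
  parent: null,
};
const first: ElementSketch<string> = {
  vid: { counter: 1, nodeId: "node1" },
  value: "A",
  parent: head.vid,
};
```

Because the sentinel holds no value, it reads as a tombstone and never shows up in the visible array.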

### RGA Class

The `RGA` class manages the sequencer, which generates new identifiers, and the array of `RGAElement` objects. It also provides the methods for manipulating the array.

## Main Operations

1. **Insert**: Adds a new element at a specified index.
2. **Delete**: Marks an element as deleted (tombstone) at a specified index.
3. **Update**: Changes the value of an element at a specified index.
4. **Merge**: Combines the current RGA with another RGA, resolving conflicts automatically.
5. **getArray**: Serialises the current state of the RGA to an array of values.
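
The operations above address elements by their index in the visible array, while deleted elements stay in storage as tombstones. The sketch below shows one way such an index could be translated into a storage position. It assumes a flat array in which `null` stands for a tombstone and no sentinel element is present; `physicalIndex` is a hypothetical name used only for this example.

```typescript
// Maps an index in the visible array to a position in storage,
// where storage keeps tombstones (null) in place.
function physicalIndex<T>(storage: (T | null)[], visibleIndex: number): number {
  let seen = 0; // number of live elements passed so far
  for (let i = 0; i < storage.length; i++) {
    if (storage[i] === null) continue; // tombstones are invisible to callers
    if (seen === visibleIndex) return i;
    seen++;
  }
  throw new RangeError("Index out of bounds");
}

const storage: (string | null)[] = ["A", null, "B", null, "C"];
// The visible array is what remains once tombstones are filtered out.
const visible = storage.filter((v): v is string => v !== null);
```

With this storage, visible index 1 refers to `"B"`, which sits at storage position 2.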

## How It Works

1. **Unique Identifiers**: Each element has a unique identifier (`vid`) generated using a sequencer.

2. **Logical Ordering**: Elements are ordered based on their `parent` references and `vid` comparisons.

3. **Tombstones**: Deleted elements are not removed from the array; their value is set to `null` to mark them as tombstones.

4. **Conflict Resolution**:
   - For concurrent inserts at the same position (same parent element), the element with the higher `vid` is placed first.
   - If the parent elements are different, the elements are inserted in the order of their parents' indices.
   - Deletions are preserved due to the tombstone mechanism.

5. **Merging**: When merging two RGAs, elements from the peer RGA are inserted into the current RGA, maintaining the correct order and resolving the conflicts.
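
The tie-breaking rule for concurrent inserts under the same parent can be sketched as follows. This is a simplified illustration, assuming identifiers are compared by counter first and by node ID second; `isLess` and `orderSiblings` are hypothetical names for this example, and sorting a list of siblings is only a stand-in for the incremental insertion the class performs.

```typescript
type Identifier = { counter: number; nodeId: string };

// Returns true when a orders before b: lower counter first, then lower nodeId.
function isLess(a: Identifier, b: Identifier): boolean {
  if (a.counter !== b.counter) return a.counter < b.counter;
  return a.nodeId < b.nodeId;
}

// Siblings (elements with the same parent) are laid out with the higher vid first.
function orderSiblings(vids: Identifier[]): Identifier[] {
  return [...vids].sort((a, b) => (isLess(a, b) ? 1 : isLess(b, a) ? -1 : 0));
}

// Three inserts that all target the same parent, two of them with equal counters.
const concurrent: Identifier[] = [
  { counter: 1, nodeId: "node1" },
  { counter: 1, nodeId: "node2" },
  { counter: 3, nodeId: "node2" },
];
const ordered = orderSiblings(concurrent).map((v) => `${v.counter}@${v.nodeId}`);
// ordered is ["3@node2", "1@node2", "1@node1"]
```

Because the comparison depends only on the identifiers, every replica arrives at the same sibling order regardless of the order in which it receives the inserts.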

## Limitations

The RGA may not be able to resolve complex backward interleaving scenarios. The `insertElement` method relies primarily on the parent index and a simple comparison of virtual IDs (`vid`s) to determine the insertion position. In some edge cases this can place elements in an unexpected order.
163 changes: 163 additions & 0 deletions packages/crdt/src/builtins/RGA/index.ts
@@ -0,0 +1,163 @@
/// Replicated Growable Array (RGA) CRDT
type Identifier = { counter: number; nodeId: string };

class RGAElement<T> {
  // Virtual identifier of the element
  public vid: Identifier;
  public value: T | null;
  public parent: Identifier | null;

  constructor(
    vid: Identifier,
    value: T | null,
    parent: Identifier | null = null
  ) {
    this.vid = vid;
    /// If the value is null, the element is in the tombstone state
    this.value = value;
    this.parent = parent;
  }
}

export class RGA<T> {
  /// The sequencer is used to generate unique identifiers for each element
  private _sequencer: Identifier;
  /// For now we are using a simple array to store elements
  /// This can be optimized using a B-tree
  private _elements: RGAElement<T>[];

  constructor(
    nodeId: string,
    sequencer: Identifier = { counter: 0, nodeId: nodeId },
    elements: RGAElement<T>[] = [
      new RGAElement<T>({ counter: 0, nodeId: "" }, null),
    ]
  ) {
    this._sequencer = sequencer;
    this._elements = elements;
  }

  elements(): RGAElement<T>[] {
    return this._elements;
  }

  getArray(): T[] {
    return this._elements
      .filter((element) => element.value !== null)
      .map((element) => element.value! as T);
  }

  clear(): void {
    this._sequencer = { counter: 0, nodeId: this._sequencer.nodeId };
    this._elements = [new RGAElement<T>({ counter: 0, nodeId: "" }, null)];
  }

  private isTombstone(element: RGAElement<T>): boolean {
    return element.value === null;
  }

  // Function to generate the next unique identifier
  private nextSeq(sequencer: Identifier): Identifier {
    return { counter: sequencer.counter + 1, nodeId: sequencer.nodeId };
  }

  // Returns true when a < b, comparing counters first and node IDs second.
  // Two distinct elements never share an id.
  private compareVIds(a: Identifier, b: Identifier): boolean {
    if (a.counter !== b.counter) {
      return a.counter < b.counter;
    }
    return a.nodeId < b.nodeId;
  }

  // Function to map a logical index (ignoring tombstones) to a physical index in the elements array
  private indexWithTombstones(index: number): number {
    let offset = 1; // Start from 1 to skip the head element
    while (index > 0) {
      // Fail with a clear error instead of reading past the end of the array
      if (offset >= this._elements.length) {
        throw new RangeError("Index out of bounds");
      }
      if (!this.isTombstone(this._elements[offset])) index--;
      offset++;
    }
    return offset;
  }

  // Function to read the value at a given index
  read(index: number): T | null {
    let i = this.indexWithTombstones(index);
    // Skip tombstones to reach the next live element, without running off the end
    while (i < this._elements.length && this.isTombstone(this._elements[i])) i++;
    if (i >= this._elements.length) throw new RangeError("Index out of bounds");
    return this._elements[i].value;
  }

  // Function to find the physical index of an element given the virtual id
  private indexOfVId(ptr: Identifier): number {
    for (let offset = 0; offset < this._elements.length; offset++) {
      if (
        ptr.counter === this._elements[offset].vid.counter &&
        ptr.nodeId === this._elements[offset].vid.nodeId
      ) {
        return offset;
      }
    }
    throw new RangeError("Element not found");
  }

  insert(index: number, value: T): void {
    const i = this.indexWithTombstones(index);
    // The new element is attached to the element that currently precedes the target position
    const parent = this._elements[i - 1].vid;
    const newVId = this.nextSeq(this._sequencer);
    this.insertElement(new RGAElement(newVId, value, parent));
  }

  // Function to insert a new element into the array
  private insertElement(element: RGAElement<T>): void {
    const parentIdx = this.indexOfVId(element.parent!);
    let insertIdx = parentIdx + 1;
    for (; insertIdx < this._elements.length; insertIdx++) {
      const curr = this._elements[insertIdx];
      const currParentIdx = this.indexOfVId(curr.parent!);
      // An element attached to a later parent ends the scan
      if (currParentIdx > parentIdx) break;
      // Among siblings, stop at the first one with a lower vid so the higher vid comes first
      if (currParentIdx === parentIdx) {
        if (this.compareVIds(curr.vid, element.vid)) break;
      }
    }
    // Keep the local counter at least as high as every id seen so far
    this._sequencer = {
      counter: Math.max(this._sequencer.counter, element.vid.counter),
      nodeId: this._sequencer.nodeId,
    };
    // Check if it's a duplicate: search the whole array rather than a single position
    const isDuplicate = this._elements.some(
      (e) =>
        e.vid.counter === element.vid.counter &&
        e.vid.nodeId === element.vid.nodeId
    );
    if (isDuplicate) return;
    this._elements.splice(insertIdx, 0, element);
  }

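  // Function to delete an element from the RGA
  delete(index: number): void {
    let i = this.indexWithTombstones(index);
    // Skip tombstones to reach the next live element, without running off the end
    while (i < this._elements.length && this.isTombstone(this._elements[i])) i++;
    if (i >= this._elements.length) throw new RangeError("Index out of bounds");
    this._elements[i].value = null;
  }

  // Function to update the value of an element
  update(index: number, value: T): void {
    let i = this.indexWithTombstones(index);
    // Skip tombstones to reach the next live element, without running off the end
    while (i < this._elements.length && this.isTombstone(this._elements[i])) i++;
    if (i >= this._elements.length) throw new RangeError("Index out of bounds");
    this._elements[i].value = value;
  }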

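  // Merge another RGA instance into this one.
  // Elements already present locally are left untouched, so their values are not overwritten.
  merge(peerRGA: RGA<T>): void {
    const peerElements = peerRGA.elements();
    // Start from 1 to skip the head element
    for (let i = 1; i < peerElements.length; i++) {
      const { vid, value, parent } = peerElements[i];
      // Insert a copy so the two replicas do not share mutable element objects
      this.insertElement(new RGAElement<T>(vid, value, parent));
    }

    this._sequencer = {
      counter: Math.max(
        this._sequencer.counter,
        peerRGA._sequencer.counter
      ),
      nodeId: this._sequencer.nodeId,
    };
  }
}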
91 changes: 91 additions & 0 deletions packages/crdt/tests/RGA.test.ts
@@ -0,0 +1,91 @@
import { describe, test, expect, beforeEach } from "vitest";
import { RGA } from "../src/builtins/RGA"; // Adjust the import path according to your project structure

describe("Replicated Growable Array Tests", () => {
  let rga: RGA<string>;
  let peerRGA: RGA<string>;

  beforeEach(() => {
    rga = new RGA<string>("node1");
    peerRGA = new RGA<string>("node2");
  });

  test("Test Insert", () => {
    rga.insert(0, "A");
    rga.insert(1, "B");
    rga.insert(1, "C");
    rga.insert(0, "D");

    expect(rga.getArray()).toEqual(["D", "A", "C", "B"]);
  });

  test("Test Read", () => {
    rga.insert(0, "A");
    rga.insert(1, "B");
    rga.insert(1, "C");
    rga.delete(1);

    expect(rga.read(0)).toBe("A");
    expect(rga.read(1)).toBe("B");
  });

  test("Test Insert and Delete", () => {
    rga.insert(0, "A");
    rga.insert(1, "B");
    rga.insert(1, "C");
    rga.delete(0);
    rga.delete(0);
    expect(rga.getArray()).toEqual(["B"]);

    rga.clear();

    rga.insert(0, "A");
    rga.insert(1, "B");
    rga.delete(0);

    expect(rga.getArray()).toEqual(["B"]);

    rga.insert(0, "C");
    rga.insert(1, "D");
    expect(rga.getArray()).toEqual(["C", "D", "B"]);

    rga.delete(1);
    expect(rga.getArray()).toEqual(["C", "B"]);

    rga.delete(1);
    expect(rga.getArray()).toEqual(["C"]);

    peerRGA.insert(0, "E");
    peerRGA.insert(0, "F");
    peerRGA.insert(2, "G");
    peerRGA.insert(3, "H");
    peerRGA.delete(1);
    peerRGA.delete(1);
    peerRGA.delete(1);
    expect(peerRGA.getArray()).toEqual(["F"]);
  });

  test("Test Update", () => {
    rga.insert(0, "A");
    rga.insert(1, "B");
    rga.update(0, "C");
    rga.update(1, "D");

    expect(rga.getArray()).toEqual(["C", "D"]);
  });

  test("Test Merge", () => {
    rga.insert(0, "A");
    rga.insert(1, "B");

    peerRGA.insert(0, "C");
    peerRGA.insert(1, "D");
    peerRGA.insert(0, "E");

    rga.merge(peerRGA);
    expect(rga.getArray()).toEqual(["E", "C", "A", "D", "B"]);

    peerRGA.merge(rga);
    expect(peerRGA.getArray()).toEqual(rga.getArray());
  });
});