feat: Replicated Growable Array CRDT and Tests #93

Merged
merged 8 commits into from
Aug 24, 2024
50 changes: 50 additions & 0 deletions packages/crdt/src/builtins/RGA/README.md
@@ -0,0 +1,50 @@
# RGA (Replicated Growable Array) CRDT

This is an implementation of a Replicated Growable Array (RGA) Conflict-free Replicated Data Type (CRDT). RGA is a data structure that allows for concurrent editing of an array by multiple nodes, while automatically resolving most conflicts.

## Overview

The structure is designed to maintain a consistent state across multiple replicas, even when they receive operations in different orders. It achieves this by using unique identifiers for each element and a tombstone mechanism for deletions.

## Key Components

### RGAElement

Each element in the RGA is represented by an `RGAElement` object with the following properties:

- `vid`: A unique identifier for the element, consisting of a counter and a node ID.
- `value`: The actual value stored in the element (or null if the element is deleted).
- `parent`: The identifier of the element that precedes this one in the logical order.
- `isDeleted`: A flag indicating whether the element has been deleted.
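As a rough illustration of the properties above, the sketch below models an element as a plain object and orders identifiers by counter first, with the node ID as a tie-breaker (the same rule as `compareVIds` in `index.ts`). The names `ElementSketch` and `isLess` are hypothetical and exist only for this example.

```typescript
// Hypothetical standalone types that mirror the properties listed above.
type Identifier = { counter: number; nodeId: string };

interface ElementSketch<T> {
  vid: Identifier;
  value: T | null;
  parent: Identifier | null;
  isDeleted: boolean;
}

// a < b: compare counters first, then node IDs as a tie-breaker.
function isLess(a: Identifier, b: Identifier): boolean {
  if (a.counter !== b.counter) return a.counter < b.counter;
  return a.nodeId < b.nodeId;
}

// The shared head anchor, followed by two concurrent inserts after it.
const head: ElementSketch<string> = {
  vid: { counter: 0, nodeId: "" },
  value: null,
  parent: null,
  isDeleted: true,
};
const a: ElementSketch<string> = {
  vid: { counter: 1, nodeId: "node1" },
  value: "A",
  parent: head.vid,
  isDeleted: false,
};
const b: ElementSketch<string> = {
  vid: { counter: 1, nodeId: "node2" },
  value: "B",
  parent: head.vid,
  isDeleted: false,
};

console.log(isLess(a.vid, b.vid)); // true: equal counters, "node1" < "node2"
```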

### RGA Class

The `RGA` class holds the sequencer, which generates new identifiers, and the array of `RGAElement` objects. It also provides the methods for manipulating the array.
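One way to picture the sequencer is as a per-node counter that advances on local inserts and catches up whenever a larger remote counter is seen. The sketch below assumes a hypothetical `Sequencer` class for clarity; the package itself keeps the sequencer as an `Identifier` field on `RGA` rather than a separate class.

```typescript
type Identifier = { counter: number; nodeId: string };

// Hypothetical helper, not part of the package API.
class Sequencer {
  private counter = 0;
  private readonly nodeId: string;

  constructor(nodeId: string) {
    this.nodeId = nodeId;
  }

  // Produce the identifier for the next locally created element.
  next(): Identifier {
    this.counter += 1;
    return { counter: this.counter, nodeId: this.nodeId };
  }

  // Catch up with an identifier received from another replica.
  observe(remote: Identifier): void {
    this.counter = Math.max(this.counter, remote.counter);
  }
}

const seq = new Sequencer("node1");
const first = seq.next(); // { counter: 1, nodeId: "node1" }
seq.observe({ counter: 5, nodeId: "node2" });
const second = seq.next(); // { counter: 6, nodeId: "node1" }
console.log(first, second);
```

Because the counter never falls behind the largest one observed, a new local identifier always compares greater than every identifier the replica has already seen.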

## Main Operations

1. **Insert**: Adds a new element at a specified index.
2. **Delete**: Marks an element as deleted (tombstone) at a specified index.
3. **Update**: Changes the value of an element at a specified index.
4. **Merge**: Combines the current RGA with another RGA, resolving conflicts automatically.
5. **getArray**: Serialises the current state of the RGA to an array of values.
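The indices used by these operations are logical: they count only live elements and skip tombstones. The following sketch shows that mapping on a simplified slot list. `Slot`, `liveValues`, `physicalIndex` and `deleteAt` are hypothetical names for this example, and the range check is an addition that the class in this package does not perform.

```typescript
// Hypothetical simplified element: only what index mapping needs.
type Slot<T> = { value: T | null; isDeleted: boolean };

// Values of all live (non-tombstoned) slots, similar to getArray.
function liveValues<T>(slots: Slot<T>[]): T[] {
  return slots.filter((s) => !s.isDeleted).map((s) => s.value as T);
}

// Physical position of the live slot at logical `index`, or -1 if out of range.
function physicalIndex<T>(slots: Slot<T>[], index: number): number {
  let seen = 0;
  for (let i = 0; i < slots.length; i++) {
    if (slots[i].isDeleted) continue;
    if (seen === index) return i;
    seen++;
  }
  return -1;
}

// Tombstone the live slot at logical `index` instead of removing it.
function deleteAt<T>(slots: Slot<T>[], index: number): void {
  const i = physicalIndex(slots, index);
  if (i === -1) throw new RangeError(`index ${index} is out of range`);
  slots[i].isDeleted = true;
}

const slots: Slot<string>[] = [
  { value: null, isDeleted: true }, // head anchor, always a tombstone
  { value: "A", isDeleted: false },
  { value: "C", isDeleted: false },
  { value: "B", isDeleted: false },
];

deleteAt(slots, 1); // tombstones "C"
console.log(liveValues(slots)); // ["A", "B"]
console.log(physicalIndex(slots, 1)); // 3: "B" still sits in physical slot 3
```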

## How It Works

1. **Unique Identifiers**: Each element has a unique identifier (`vid`) generated using a sequencer.

2. **Logical Ordering**: Elements are ordered based on their `parent` references and `vid` comparisons.

3. **Tombstones**: Deleted elements are not removed but marked as tombstones with the `isDeleted` property.

4. **Conflict Resolution**:

   - For concurrent inserts at the same position (same parent element), the element with the higher `vid` is placed first.
   - If the parent elements are different, the elements are inserted in the order of their parent's index.
   - Deletions are preserved due to the tombstone mechanism.

5. **Merging**: When merging two RGAs, elements from the peer RGA are inserted into the current RGA, maintaining the correct order and resolving the conflicts.
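The ordering rules above can be sketched with a simplified integration function that follows the same idea as `insertElement`. This is an illustrative sketch under stated assumptions, not the package's code: `Elem`, `integrate`, `newReplica` and `values` are hypothetical names, duplicates are detected by an identifier lookup, and tombstones are left out for brevity.

```typescript
type Id = { counter: number; nodeId: string };
type Elem = { vid: Id; value: string | null; parent: Id | null };

const HEAD: Id = { counter: 0, nodeId: "" };

function sameId(a: Id, b: Id): boolean {
  return a.counter === b.counter && a.nodeId === b.nodeId;
}

// a < b: compare counters first, then node IDs as a tie-breaker.
function isLess(a: Id, b: Id): boolean {
  return a.counter !== b.counter ? a.counter < b.counter : a.nodeId < b.nodeId;
}

function indexOfId(list: Elem[], id: Id): number {
  return list.findIndex((e) => sameId(e.vid, id));
}

// Place `el` after its parent, ahead of the first sibling with a smaller vid.
function integrate(list: Elem[], el: Elem): void {
  if (indexOfId(list, el.vid) !== -1) return; // already integrated
  if (el.parent === null) throw new Error("only the head has no parent");
  const parentIdx = indexOfId(list, el.parent);
  if (parentIdx === -1) throw new Error("parent is unknown to this replica");
  let at = parentIdx + 1;
  for (; at < list.length; at++) {
    const p = list[at].parent;
    const currParentIdx = p === null ? -1 : indexOfId(list, p);
    // Stop at an element whose parent sits later in the list than ours.
    if (currParentIdx > parentIdx) break;
    // Among siblings, stop at the first one with a smaller vid.
    if (currParentIdx === parentIdx && isLess(list[at].vid, el.vid)) break;
  }
  list.splice(at, 0, el);
}

function newReplica(): Elem[] {
  return [{ vid: HEAD, value: null, parent: null }];
}

function values(list: Elem[]): string[] {
  return list.slice(1).map((e) => e.value as string);
}

// Two concurrent inserts with the same parent (the head anchor).
const x: Elem = { vid: { counter: 1, nodeId: "node1" }, value: "X", parent: HEAD };
const y: Elem = { vid: { counter: 1, nodeId: "node2" }, value: "Y", parent: HEAD };

const replicaA = newReplica();
integrate(replicaA, x);
integrate(replicaA, y);

const replicaB = newReplica();
integrate(replicaB, y);
integrate(replicaB, x);
integrate(replicaB, x); // duplicate delivery is ignored

// Both replicas end up with Y before X, since Y has the higher vid.
console.log(values(replicaA), values(replicaB));
```

Both replicas receive the same two elements in opposite orders and still converge, because the position of each element depends only on its parent and on the `vid` comparison with its siblings.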

## Limitations

The RGA may not be able to resolve complex backward interleaving scenarios. The `insertElement` method relies primarily on the parent's index and a simple comparison of virtual IDs (`vid`s) to determine the insertion position, which may lead to conflicts in some edge cases.
164 changes: 164 additions & 0 deletions packages/crdt/src/builtins/RGA/index.ts
@@ -0,0 +1,164 @@
/// Replicated Growable Array (RGA) CRDT
type Identifier = { counter: number; nodeId: string };

class RGAElement<T> {
  // Virtual identifier of the element
  vid: Identifier;
  value: T | null;
  parent: Identifier | null;
  isDeleted: boolean;

  constructor(
    vid: Identifier,
    value: T | null,
    parent: Identifier | null,
    isDeleted: boolean = false
  ) {
    this.vid = vid;
    this.value = value;
    this.parent = parent;
    this.isDeleted = isDeleted;
  }
}

export class RGA<T> {
  /// The sequencer is used to generate unique identifiers for each element
  sequencer: Identifier;
  /// For now we are using a simple array to store elements
  /// This can be optimized using a Btree
  elements: RGAElement<T>[];

  /*
    We are using an empty element as the head of the array to simplify the logic of merging two RGA instances.
    It acts as an anchor and is the same for all replicas.
  */
  constructor(
    nodeId: string,
    sequencer: Identifier = { counter: 0, nodeId: nodeId },
    elements: RGAElement<T>[] = [
      new RGAElement<T>({ counter: 0, nodeId: "" }, null, null, true),
    ]
  ) {
    this.sequencer = sequencer;
    this.elements = elements;
  }

  getArray(): T[] {
    return this.elements
      .filter((element) => !element.isDeleted)
      .map((element) => element.value! as T);
  }

  clear(): void {
    this.sequencer = { counter: 0, nodeId: this.sequencer.nodeId };
    this.elements = [
      new RGAElement<T>({ counter: 0, nodeId: "" }, null, null, true),
    ];
  }

  // Function to generate the next unique identifier
  private nextSeq(sequencer: Identifier): Identifier {
    return { counter: sequencer.counter + 1, nodeId: sequencer.nodeId };
  }

  // Check whether a < b, ids are never equal
  private compareVIds(a: Identifier, b: Identifier): boolean {
    if (a.counter !== b.counter) {
      return a.counter < b.counter;
    }
    return a.nodeId < b.nodeId;
  }

  // Function to map a logical index (ignoring tombstones) to a physical index in the elements array
  private indexWithTombstones(index: number): number {
    let offset = 1; // Start from 1 to skip the head element
    while (index > 0) {
      if (!this.elements[offset].isDeleted) index--;
      offset++;
    }
    return offset;
  }

  // Function to read the value at a given index
  read(index: number): T | null {
    let i = this.indexWithTombstones(index);
    while (this.elements[i].isDeleted) i++;
    return this.elements[i].value;
  }

  // Function to find the physical index of an element given the virtual id
  private indexOfVId(ptr: Identifier): number {
    for (let offset = 0; offset < this.elements.length; offset++) {
      if (
        ptr.counter === this.elements[offset].vid.counter &&
        ptr.nodeId === this.elements[offset].vid.nodeId
      ) {
        return offset;
      }
    }
    return -1;
  }

  // Function to insert a new element after a given index; it might not land immediately after because we also look at parents
  insert(parentIndex: number, value: T): void {
    const i = this.indexWithTombstones(parentIndex);
    const parent = this.elements[i - 1].vid;
    const newVId = this.nextSeq(this.sequencer);
    this.insertElement(new RGAElement(newVId, value, parent));
  }

  // Function to insert a new element into the array
  private insertElement(element: RGAElement<T>): void {
    this.sequencer = {
      ...this.sequencer,
      counter: Math.max(this.sequencer.counter, element.vid.counter),
    };
    // Check if it's a duplicate; if so, only carry over an incoming tombstone
    const existingIdx = this.indexOfVId(element.vid);
    if (existingIdx !== -1) {
      if (element.isDeleted) this.elements[existingIdx].isDeleted = true;
      return;
    }
    const parentIdx = this.indexOfVId(element.parent!);
    let insertIdx = parentIdx + 1;
    for (; insertIdx < this.elements.length; insertIdx++) {
      const curr = this.elements[insertIdx];
      const currParentIdx = this.indexOfVId(curr.parent!);
      // Stop at an element whose parent sits later in the array than ours
      if (currParentIdx > parentIdx) break;
      // Among siblings, stop at the first one with a smaller vid
      if (currParentIdx === parentIdx) {
        if (this.compareVIds(curr.vid, element.vid)) break;
      }
    }
    // Store a copy so that replicas do not share mutable element objects after a merge
    this.elements.splice(
      insertIdx,
      0,
      new RGAElement(element.vid, element.value, element.parent, element.isDeleted)
    );
  }

  // Function to delete an element from the RGA
  delete(index: number): void {
    let i = this.indexWithTombstones(index);
    while (this.elements[i].isDeleted) i++;
    this.elements[i].isDeleted = true;
  }

  // Function to update the value of an element
  update(index: number, value: T): void {
    let i = this.indexWithTombstones(index);
    while (this.elements[i].isDeleted) i++;
    this.elements[i].value = value;
  }

  // Merge another RGA instance into this one
  merge(peerRGA: RGA<T>): void {
    for (let i = 1; i < peerRGA.elements.length; i++) {
      this.insertElement(peerRGA.elements[i]);
    }

    this.sequencer = {
      ...this.sequencer,
      counter: Math.max(
        this.sequencer.counter,
        peerRGA.sequencer.counter
      ),
    };
  }
}
91 changes: 91 additions & 0 deletions packages/crdt/tests/RGA.test.ts
@@ -0,0 +1,91 @@
import { describe, test, expect, beforeEach } from "vitest";
import { RGA } from "../src/builtins/RGA"; // Adjust the import path according to your project structure

describe("Replicable Growable Array Tests", () => {
  let rga: RGA<string>;
  let peerRGA: RGA<string>;

  beforeEach(() => {
    rga = new RGA<string>("node1");
    peerRGA = new RGA<string>("node2");
  });

  test("Test Insert", () => {
    rga.insert(0, "A");
    rga.insert(1, "B");
    rga.insert(1, "C");
    rga.insert(0, "D");

    expect(rga.getArray()).toEqual(["D", "A", "C", "B"]);
  });

  test("Test Read", () => {
    rga.insert(0, "A");
    rga.insert(1, "B");
    rga.insert(1, "C");
    rga.delete(1);

    expect(rga.read(0)).toBe("A");
    expect(rga.read(1)).toBe("B");
  });

  test("Test Insert and Delete", () => {
    rga.insert(0, "A");
    rga.insert(1, "B");
    rga.insert(1, "C");
    rga.delete(0);
    rga.delete(0);
    expect(rga.getArray()).toEqual(["B"]);

    rga.clear();

    rga.insert(0, "A");
    rga.insert(1, "B");
    rga.delete(0);

    expect(rga.getArray()).toEqual(["B"]);

    rga.insert(0, "C");
    rga.insert(1, "D");
    expect(rga.getArray()).toEqual(["C", "D", "B"]);

    rga.delete(1);
    expect(rga.getArray()).toEqual(["C", "B"]);

    rga.delete(1);
    expect(rga.getArray()).toEqual(["C"]);

    peerRGA.insert(0, "E");
    peerRGA.insert(0, "F");
    peerRGA.insert(2, "G");
    peerRGA.insert(3, "H");
    peerRGA.delete(1);
    peerRGA.delete(1);
    peerRGA.delete(1);
    expect(peerRGA.getArray()).toEqual(["F"]);
  });

  test("Test Update", () => {
    rga.insert(0, "A");
    rga.insert(1, "B");
    rga.update(0, "C");
    rga.update(1, "D");

    expect(rga.getArray()).toEqual(["C", "D"]);
  });

  test("Test Merge", () => {
    rga.insert(0, "A");
    rga.insert(1, "B");

    peerRGA.insert(0, "C");
    peerRGA.insert(1, "D");
    peerRGA.insert(0, "E");

    rga.merge(peerRGA);
    expect(rga.getArray()).toEqual(["E", "C", "A", "D", "B"]);

    peerRGA.merge(rga);
    expect(peerRGA.getArray()).toEqual(rga.getArray());
  });
});