Skip to content

Commit

Permalink
Merge pull request #399 from streamich/crdt-indexed-codec
Browse files Browse the repository at this point in the history
JSON CRDT indexed codec
  • Loading branch information
streamich authored Nov 6, 2023
2 parents 95826b1 + 40c43ba commit 38f29b3
Show file tree
Hide file tree
Showing 8 changed files with 365 additions and 219 deletions.
5 changes: 5 additions & 0 deletions src/json-crdt/__tests__/fuzzer/SessionLogical.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import {encode as encodeJson} from '../../../json-crdt-patch/codec/verbose/encod
import {Encoder as BinaryEncoder} from '../../codec/structural/binary/Encoder';
import {Encoder as CompactEncoder} from '../../codec/structural/compact/Encoder';
import {Encoder as JsonEncoder} from '../../codec/structural/verbose/Encoder';
import {Encoder as IndexedBinaryEncoder} from '../../codec/indexed/binary/Encoder';
import {Decoder as IndexedBinaryDecoder} from '../../codec/indexed/binary/Decoder';
import {generateInteger} from './util';
import {Model} from '../..';
import {Patch} from '../../../json-crdt-patch/Patch';
Expand All @@ -27,6 +29,8 @@ const compactEncoder = new CompactEncoder();
const compactDecoder = new CompactDecoder();
const binaryEncoder = new BinaryEncoder();
const binaryDecoder = new BinaryDecoder();
const indexedBinaryEncoder = new IndexedBinaryEncoder();
const indexedBinaryDecoder = new IndexedBinaryDecoder();

export class SessionLogical {
public models: Model[] = [];
Expand Down Expand Up @@ -192,6 +196,7 @@ export class SessionLogical {
if (randomU32(0, 1)) model = jsonDecoder.decode(jsonEncoder.encode(model));
if (randomU32(0, 1)) model = compactDecoder.decode(compactEncoder.encode(model));
if (randomU32(0, 1)) model = binaryDecoder.decode(binaryEncoder.encode(model));
if (randomU32(0, 1)) model = indexedBinaryDecoder.decode(indexedBinaryEncoder.encode(model));
}
for (let j = 0; j < this.concurrency; j++) {
const patches = this.patches[j];
Expand Down
189 changes: 98 additions & 91 deletions src/json-crdt/codec/indexed/binary/Decoder.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,21 @@
import {
ConNode,
JsonNode,
ValNode,
ArrNode,
ArrChunk,
BinNode,
BinChunk,
ObjNode,
StrNode,
StrChunk,
} from '../../../nodes';
import * as nodes from '../../../nodes';
import {ClockTable} from '../../../../json-crdt-patch/codec/clock/ClockTable';
import {CrdtReader} from '../../../../json-crdt-patch/util/binary/CrdtReader';
import {IndexedFields, FieldName, IndexedNodeFields} from './types';
import {ITimestampStruct, IVectorClock, Timestamp, VectorClock} from '../../../../json-crdt-patch/clock';
import {Model, UNDEFINED} from '../../../model/Model';
import {MsgPackDecoderFast} from '../../../../json-pack/msgpack';
import {CborDecoderBase} from '../../../../json-pack/cbor/CborDecoderBase';
import {CRDT_MAJOR} from '../../structural/binary/constants';

export class Decoder {
public readonly dec = new MsgPackDecoderFast<CrdtReader>(new CrdtReader());
public readonly dec: CborDecoderBase<CrdtReader>;
protected doc!: Model;
protected clockTable?: ClockTable;

constructor(reader?: CrdtReader) {
this.dec = new CborDecoderBase<CrdtReader>(reader || new CrdtReader());
}

public decode<M extends Model>(
fields: IndexedFields,
ModelConstructor: new (clock: IVectorClock) => M = Model as unknown as new (clock: IVectorClock) => M,
Expand All @@ -47,14 +41,17 @@ export class Decoder {
const rootValue = this.ts();
doc.root.set(rootValue);
}
const docIndex = doc.index;
for (const field in fields) {
const index = doc.index;
const keys = Object.keys(fields);
const length = keys.length;
for (let i = 0; i < length; i++) {
const field = keys[i];
if (field.length < 3) continue; // Skip "c" and "r".
const arr = fields[field as FieldName];
const id = clockTable.parseField(field as FieldName);
reader.reset(arr);
const node = this.decodeNode(id);
docIndex.set(node.id, node);
index.set(id, node);
}
return doc;
}
Expand All @@ -64,107 +61,117 @@ export class Decoder {
return new Timestamp(this.clockTable!.byIdx[sessionIndex].sid, timeDiff);
}

protected decodeNode(id: ITimestampStruct): JsonNode {
protected decodeNode(id: ITimestampStruct): nodes.JsonNode {
const reader = this.dec.reader;
const byte = reader.u8();
if (byte <= 0b10001111) return this.cObj(id, byte & 0b1111);
else if (byte <= 0b10011111) return this.cArr(id, byte & 0b1111);
else if (byte <= 0b10111111) return this.cStr(id, byte & 0b11111);
else {
switch (byte) {
case 0xc4:
return this.cBin(id, reader.u8());
case 0xc5:
return this.cBin(id, reader.u16());
case 0xc6:
return this.cBin(id, reader.u32());
case 0xd4:
return this.cConst(id);
case 0xd5:
return new ConNode(id, this.ts());
case 0xd6:
return this.cVal(id);
case 0xde:
return this.cObj(id, reader.u16());
case 0xdf:
return this.cObj(id, reader.u32());
case 0xdc:
return this.cArr(id, reader.u16());
case 0xdd:
return this.cArr(id, reader.u32());
case 0xd9:
return this.cStr(id, reader.u8());
case 0xda:
return this.cStr(id, reader.u16());
case 0xdb:
return this.cStr(id, reader.u32());
}
const octet = reader.u8();
const major = octet >> 5;
const minor = octet & 0b11111;
const length = minor < 24 ? minor : minor === 24 ? reader.u8() : minor === 25 ? reader.u16() : reader.u32();
switch (major) {
case CRDT_MAJOR.CON:
return this.decodeCon(id, length);
case CRDT_MAJOR.VAL:
return this.decodeVal(id);
case CRDT_MAJOR.OBJ:
return this.decodeObj(id, length);
case CRDT_MAJOR.VEC:
return this.decodeVec(id, length);
case CRDT_MAJOR.STR:
return this.decodeStr(id, length);
case CRDT_MAJOR.BIN:
return this.decodeBin(id, length);
case CRDT_MAJOR.ARR:
return this.decodeArr(id, length);
}

return UNDEFINED;
}

public cConst(id: ITimestampStruct): ConNode {
const val = this.dec.val();
return new ConNode(id, val);
public decodeCon(id: ITimestampStruct, length: number): nodes.ConNode {
const decoder = this.dec;
const data = !length ? decoder.val() : this.ts();
const node = new nodes.ConNode(id, data);
return node;
}

public cVal(id: ITimestampStruct): ValNode {
public decodeVal(id: ITimestampStruct): nodes.ValNode {
const val = this.ts();
return new ValNode(this.doc, id, val);
const node = new nodes.ValNode(this.doc, id, val);
return node;
}

public cObj(id: ITimestampStruct, length: number): ObjNode {
public decodeObj(id: ITimestampStruct, length: number): nodes.ObjNode {
const decoder = this.dec;
const obj = new ObjNode(this.doc, id);
const obj = new nodes.ObjNode(this.doc, id);
const keys = obj.keys;
for (let i = 0; i < length; i++) {
const key = String(decoder.val());
const key = decoder.val() + '';
const val = this.ts();
keys.set(key, val);
}
return obj;
}

protected cStr(id: ITimestampStruct, length: number): StrNode {
const decoder = this.dec;
const node = new StrNode(id);
node.ingest(length, () => {
const chunkId = this.ts();
const val = decoder.val();
if (typeof val === 'number') return new StrChunk(chunkId, val, '');
const data = String(val);
return new StrChunk(chunkId, data.length, data);
});
public decodeVec(id: ITimestampStruct, length: number): nodes.VecNode {
const reader = this.dec.reader;
const node = new nodes.VecNode(this.doc, id);
const elements = node.elements;
for (let i = 0; i < length; i++) {
const octet = reader.u8();
if (!octet) elements.push(undefined);
else elements.push(this.ts());
}
return node;
}

protected cBin(id: ITimestampStruct, length: number): BinNode {
const decoder = this.dec;
const reader = decoder.reader;
const node = new BinNode(id);
node.ingest(length, () => {
const chunkId = this.ts();
const [deleted, length] = reader.b1vu28();
if (deleted) return new BinChunk(chunkId, length, undefined);
const data = reader.buf(length);
return new BinChunk(chunkId, length, data);
});
protected decodeStr(id: ITimestampStruct, length: number): nodes.StrNode {
const node = new nodes.StrNode(id);
node.ingest(length, this.decodeStrChunk);
return node;
}

protected cArr(id: ITimestampStruct, length: number): ArrNode {
private decodeStrChunk = (): nodes.StrChunk => {
const decoder = this.dec;
const reader = decoder.reader;
const node = new ArrNode(this.doc, id);
node.ingest(length, () => {
const chunkId = this.ts();
const [deleted, length] = reader.b1vu28();
if (deleted) return new ArrChunk(chunkId, length, undefined);
const data: ITimestampStruct[] = [];
for (let i = 0; i < length; i++) data.push(this.ts());
return new ArrChunk(chunkId, length, data);
});
const id = this.ts();
const isTombstone = reader.uint8[reader.x] === 0;
if (isTombstone) {
reader.x++;
const length = reader.vu39();
return new nodes.StrChunk(id, length, '');
}
const text: string = decoder.readAsStr() as string;
return new nodes.StrChunk(id, text.length, text);
};

protected decodeBin(id: ITimestampStruct, length: number): nodes.BinNode {
const node = new nodes.BinNode(id);
node.ingest(length, this.decodeBinChunk);
return node;
}

private decodeBinChunk = (): nodes.BinChunk => {
const id = this.ts();
const reader = this.dec.reader;
const [deleted, length] = reader.b1vu56();
if (deleted) return new nodes.BinChunk(id, length, undefined);
else return new nodes.BinChunk(id, length, reader.buf(length));
};

protected decodeArr(id: ITimestampStruct, length: number): nodes.ArrNode {
const node = new nodes.ArrNode(this.doc, id);
node.ingest(length, this.decodeArrChunk);
return node;
}

private decodeArrChunk = (): nodes.ArrChunk => {
const id = this.ts();
const reader = this.dec.reader;
const [deleted, length] = reader.b1vu56();
if (deleted) return new nodes.ArrChunk(id, length, undefined);
else {
const data: ITimestampStruct[] = [];
for (let i = 0; i < length; i++) data.push(this.ts());
return new nodes.ArrChunk(id, length, data);
}
};
}
Loading

0 comments on commit 38f29b3

Please sign in to comment.