Skip to content

Commit

Permalink
Merge pull request #558 from streamich/patch-log
Browse files Browse the repository at this point in the history
`PatchLog` improvements
  • Loading branch information
streamich authored Mar 26, 2024
2 parents 3ec6776 + 4d06e69 commit 0a1e5be
Show file tree
Hide file tree
Showing 7 changed files with 282 additions and 107 deletions.
23 changes: 11 additions & 12 deletions src/json-crdt/file/File.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {Model} from '../model';
import {PatchLog} from './PatchLog';
import {PatchLog} from '../history/PatchLog';
import {printTree} from '../../util/print/printTree';
import {decodeModel, decodeNdjsonComponents, decodePatch, decodeSeqCborComponents} from './util';
import {Patch} from '../../json-crdt-patch';
Expand Down Expand Up @@ -46,9 +46,8 @@ export class File implements Printable {
if (history) {
const [start, patches] = history;
if (start) {
const startModel = decodeModel(start);
log = new PatchLog(startModel);
for (const patch of patches) log.push(decodePatch(patch));
log = new PatchLog(() => decodeModel(start));
for (const patch of patches) log.end.applyPatch(decodePatch(patch));
}
}
if (!log) throw new Error('NO_HISTORY');
Expand All @@ -57,7 +56,7 @@ export class File implements Printable {
for (const patch of frontier) {
const patchDecoded = decodePatch(patch);
decodedModel.applyPatch(patchDecoded);
log.push(patchDecoded);
log.end.applyPatch(patchDecoded);
}
}
const file = new File(decodedModel, log);
Expand All @@ -75,7 +74,7 @@ export class File implements Printable {
}

public static fromModel(model: Model<any>, options: FileOptions = {}): File {
return new File(model, PatchLog.fromModel(model), options);
return new File(model, PatchLog.fromNewModel(model), options);
}

constructor(
Expand All @@ -88,7 +87,7 @@ export class File implements Printable {
const id = patch.getId();
if (!id) return;
this.model.applyPatch(patch);
this.log.push(patch);
this.log.end.applyPatch(patch);
}

/**
Expand All @@ -100,10 +99,10 @@ export class File implements Printable {
const api = model.api;
const autoflushUnsubscribe = api.autoFlush();
const onPatchUnsubscribe = api.onPatch.listen((patch) => {
log.push(patch);
log.end.applyPatch(patch);
});
const onFlushUnsubscribe = api.onFlush.listen((patch) => {
log.push(patch);
log.end.applyPatch(patch);
});
return () => {
autoflushUnsubscribe();
Expand Down Expand Up @@ -153,7 +152,7 @@ export class File implements Printable {
const patchFormat = params.history ?? 'binary';
switch (patchFormat) {
case 'binary': {
history[0] = this.log.start.toBinary();
history[0] = this.log.start().toBinary();
this.log.patches.forEach(({v}) => {
history[1].push(v.toBinary());
});
Expand All @@ -162,7 +161,7 @@ export class File implements Printable {
case 'compact': {
const encoder = this.options.structuralCompactEncoder;
if (!encoder) throw new Error('NO_COMPACT_ENCODER');
history[0] = encoder.encode(this.log.start);
history[0] = encoder.encode(this.log.start());
const encodeCompact = this.options.patchCompactEncoder;
if (!encodeCompact) throw new Error('NO_COMPACT_PATCH_ENCODER');
const list = history[1];
Expand All @@ -174,7 +173,7 @@ export class File implements Printable {
case 'verbose': {
const encoder = this.options.structuralVerboseEncoder;
if (!encoder) throw new Error('NO_VERBOSE_ENCODER');
history[0] = encoder.encode(this.log.start);
history[0] = encoder.encode(this.log.start());
const encodeVerbose = this.options.patchVerboseEncoder;
if (!encodeVerbose) throw new Error('NO_VERBOSE_PATCH_ENCODER');
const list = history[1];
Expand Down
61 changes: 0 additions & 61 deletions src/json-crdt/file/PatchLog.ts

This file was deleted.

10 changes: 5 additions & 5 deletions src/json-crdt/file/__tests__/File.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ test('can create File from new model', () => {
}),
);
const file = File.fromModel(model);
expect(file.log.start.view()).toBe(undefined);
expect(file.log.start().view()).toBe(undefined);
expect(file.model.view()).toEqual({
foo: 'bar',
});
expect(file.log.start.clock.sid).toBe(file.model.clock.sid);
expect(file.log.start().clock.sid).toBe(file.model.clock.sid);
});

test.todo('patches are flushed and stored in memory');
Expand Down Expand Up @@ -56,7 +56,7 @@ describe('.toBinary()', () => {
const file2 = File.fromNdjson(blob);
expect(file2.model.view()).toEqual({foo: 'bar'});
expect(file2.model !== file.model).toBe(true);
expect(file.log.start.view()).toEqual(undefined);
expect(file.log.start().view()).toEqual(undefined);
expect(file.log.replayToEnd().view()).toEqual({foo: 'bar'});
});

Expand All @@ -66,7 +66,7 @@ describe('.toBinary()', () => {
const file2 = File.fromSeqCbor(blob);
expect(file2.model.view()).toEqual({foo: 'bar'});
expect(file2.model !== file.model).toBe(true);
expect(file.log.start.view()).toEqual(undefined);
expect(file.log.start().view()).toEqual(undefined);
expect(file.log.replayToEnd().view()).toEqual({foo: 'bar'});
});
});
Expand All @@ -78,7 +78,7 @@ describe('.toBinary()', () => {
params.format === 'seq.cbor' ? File.fromSeqCbor(blob, fileEncoders) : File.fromNdjson(blob, fileEncoders);
expect(file2.model.view()).toEqual(file.model.view());
expect(file2.model !== file.model).toBe(true);
expect(file2.log.start.view()).toEqual(undefined);
expect(file2.log.start().view()).toEqual(undefined);
expect(file2.log.replayToEnd().view()).toEqual(file.model.view());
expect(file2.log.patches.size()).toBe(file.log.patches.size());
};
Expand Down
29 changes: 0 additions & 29 deletions src/json-crdt/file/__tests__/PatchLog.spec.ts

This file was deleted.

149 changes: 149 additions & 0 deletions src/json-crdt/history/PatchLog.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import {FanOutUnsubscribe} from 'thingies/es2020/fanout';
import {ITimestampStruct, Patch, compare} from '../../json-crdt-patch';
import {printTree} from '../../util/print/printTree';
import {AvlMap} from '../../util/trees/avl/AvlMap';
import {Model} from '../model';
import {first, next} from '../../util/trees/util';
import type {Printable} from '../../util/print/types';

export class PatchLog implements Printable {
/**
* Creates a `PatchLog` instance from a newly JSON CRDT model. Checks if
* the model API buffer has any initial operations applied, if yes, it
* uses them to create the initial state of the log.
*
* @param model A new JSON CRDT model, just created with
* `Model.withLogicalClock()` or `Model.withServerClock()`.
* @returns A new `PatchLog` instance.
*/
public static fromNewModel(model: Model<any>): PatchLog {
const clock = model.clock.clone();
const log = new PatchLog(() => new Model(clock));
const api = model.api;
if (api.builder.patch.ops.length) log.end.applyPatch(api.flush());
return log;
}

/**
* Model factory function that creates a new JSON CRDT model instance, which
* is used as the starting point of the log. It is called every time a new
* model is needed to replay the log.
*
* @readonly Internally this function may be updated, but externally it is
* read-only.
*/
public start: () => Model;

/**
* The end of the log, the current state of the document. It is the model
* instance that is used to apply new patches to the log.
*
* @readonly
*/
public readonly end: Model;

/**
* The patches in the log, stored in an AVL tree for efficient replaying. The
* collection of patches which are applied to the `start()` model to reach
* the `end` model.
*
* @readonly
*/
public readonly patches = new AvlMap<ITimestampStruct, Patch>(compare);

private __onPatch: FanOutUnsubscribe;
private __onFlush: FanOutUnsubscribe;

constructor(start: () => Model) {
this.start = start;
const end = (this.end = start());
const onPatch = (patch: Patch) => {
const id = patch.getId();
if (!id) return;
this.patches.set(id, patch);
};
const api = end.api;
this.__onPatch = api.onPatch.listen(onPatch);
this.__onFlush = api.onFlush.listen(onPatch);
}

/**
* Call this method to destroy the `PatchLog` instance. It unsubscribes patch
* and flush listeners from the `end` model and clears the patch log.
*/
public destroy() {
this.__onPatch();
this.__onFlush();
this.patches.clear();
}

/**
* Creates a new model instance using the `start()` factory function and
* replays all patches in the log to reach the current state of the document.
*
* @returns A new model instance with all patches replayed.
*/
public replayToEnd(): Model {
const clone = this.start().clone();
for (let node = first(this.patches.root); node; node = next(node)) clone.applyPatch(node.v);
return clone;
}

/**
* Replays the patch log until a specified timestamp, including the patch
* at the given timestamp. The model returned is a new instance of `start()`
* with patches replayed up to the given timestamp.
*
* @param ts Timestamp ID of the patch to replay to.
* @returns A new model instance with patches replayed up to the given timestamp.
*/
public replayTo(ts: ITimestampStruct): Model {
const clone = this.start().clone();
for (let node = first(this.patches.root); node && compare(ts, node.k) >= 0; node = next(node))
clone.applyPatch(node.v);
return clone;
}

/**
* Advance the start of the log to a specified timestamp, excluding the patch
* at the given timestamp. This method removes all patches from the log that
* are older than the given timestamp and updates the `start()` factory
* function to replay the log from the new start.
*
* @param ts Timestamp ID of the patch to advance to.
*/
public advanceTo(ts: ITimestampStruct): void {
const newStartPatches: Patch[] = [];
let node = first(this.patches.root);
for (; node && compare(ts, node.k) >= 0; node = next(node)) newStartPatches.push(node.v);
for (const patch of newStartPatches) this.patches.del(patch.getId()!);
const oldStart = this.start;
this.start = (): Model => {
const model = oldStart();
for (const patch of newStartPatches) model.applyPatch(patch);
return model;
};
}

// ---------------------------------------------------------------- Printable

public toString(tab?: string) {
const patches: Patch[] = [];
this.patches.forEach(({v}) => patches.push(v));
return (
`log` +
printTree(tab, [
(tab) => `start` + printTree(tab, [(tab) => this.start().toString(tab)]),
() => '',
(tab) =>
'history' +
printTree(
tab,
patches.map((patch, i) => (tab) => `${i}: ${patch.toString(tab)}`),
),
() => '',
(tab) => `end` + printTree(tab, [(tab) => this.end.toString(tab)]),
])
);
}
}
Loading

0 comments on commit 0a1e5be

Please sign in to comment.