Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Local snapshot #42

Merged
merged 8 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/adaptors/yjs-ndn-adaptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export class NdnSvsAdaptor {
this.syncAgent.publishUpdate(this.topic, content)
}

private async handleSyncUpdate(content: Uint8Array) {
public async handleSyncUpdate(content: Uint8Array) {
// Apply patch
// Remark: `applyUpdate` will trigger a transaction after the update is decoded.
// We can register "beforeTransaction" event and throw an exception there to do access control.
Expand Down
70 changes: 70 additions & 0 deletions src/adaptors/yjs-state-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import * as Y from 'yjs'
import { Storage } from '../backend/storage'

const StateKey = 'localState'
const SnapshotKey = 'localSnapshot'

/**
* YjsStateManager manages updates transported by SyncAgent.
* It receives and emits updates, encodes and loads local state as updates.
*/
export class YjsStateManager {

private readonly callback = this.docUpdateHandler.bind(this)
private counter = 0

constructor(
public readonly getState: () => Uint8Array,
public readonly doc: Y.Doc,
public readonly storage: Storage,
public readonly threshold = 50,
) {
doc.on('update', this.callback)
}

public destroy() {
this.saveLocalSnapshot(this.getState())
this.doc.off('update', this.callback)
}

private docUpdateHandler(_update: Uint8Array, origin: undefined) {
if (origin !== this) { // This condition must be true
this.counter += 1
if (this.counter >= this.threshold) {
this.saveLocalSnapshot(this.getState())
}
} else {
console.error('[FATAL] YjsStateManager is not supposed to apply updates itself.' +
'Call the NDN Adaptor instead.')
}
}

/**
* Save the current status into a local snapshot.
* This snapshot includes everyone's update and is not supposed to be published.
* Public snapshots use a different mechanism.
* @param state the current SVS state vector
*/
public async saveLocalSnapshot(state: Uint8Array): Promise<void> {
this.counter = 0
const update = Y.encodeStateAsUpdate(this.doc)
await this.storage.set(SnapshotKey, update)
await this.storage.set(StateKey, state)
}

/**
* Load from a local snapshot.
* @param replayer the NdnSvsAdaptor's update function that handles replay.
* @returns a SVS vector that can be used as a start point for `replayUpdates`.
*/
public async loadLocalSnapshot(replayer: (update: Uint8Array) => Promise<void>): Promise<Uint8Array | undefined> {
const state = await this.storage.get(StateKey)
const snapshot = await this.storage.get(SnapshotKey)
if (snapshot) {
await replayer(snapshot)
return state
} else {
return undefined
}
}
}
15 changes: 14 additions & 1 deletion src/backend/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import { Certificate, ECDSA, createSigner } from "@ndn/keychain"
import { v4 as uuidv4 } from "uuid"
import { base64ToBytes, encodeKey as encodePath, Signal as BackendSignal, openRoot } from "../utils"
import { Decoder } from "@ndn/tlv"
import { YjsStateManager } from "../adaptors/yjs-state-manager"
import { encodeSyncState, parseSyncState } from "./sync-agent/deliveries"

export const UseAutoAnnouncement = false

Expand All @@ -39,6 +41,7 @@ export let ownCertificate: Certificate | undefined
// TODO: Setup persistent storage using IndexDB
export let rootDoc: RootDocStore | undefined
export let yjsAdaptor: NdnSvsAdaptor | undefined
export let yjsSnapshotMgr: YjsStateManager | undefined

export let listener: PeerJsListener | undefined = undefined
export let nfdWsFace: FwFace | undefined = undefined
Expand Down Expand Up @@ -234,6 +237,12 @@ export async function bootstrapWorkspace(opts: {
getYjsDoc(rootDoc),
'doc'
)
yjsSnapshotMgr = new YjsStateManager(
() => encodeSyncState(syncAgent!.getUpdateSyncSV()),
getYjsDoc(rootDoc),
// No key conflict in this case. If we are worried, use anothe sub-folder.
persistStore,
)

// Load or create
if (opts.createNew) {
Expand All @@ -257,7 +266,8 @@ export async function bootstrapWorkspace(opts: {
}
rootDoc.latex[project.RootId].items.push(mainUuid)
} else {
await syncAgent.replayUpdates('doc')
const state = await yjsSnapshotMgr.loadLocalSnapshot(update => yjsAdaptor!.handleSyncUpdate(update))
await syncAgent.replayUpdates('doc', state ? parseSyncState(state) : undefined)
}

if (!opts.inMemory) {
Expand Down Expand Up @@ -288,6 +298,9 @@ export async function stopWorkspace() {

syncAgent!.ready = false

yjsSnapshotMgr!.destroy()
yjsSnapshotMgr = undefined

yjsAdaptor!.destroy()
yjsAdaptor = undefined

Expand Down
4 changes: 4 additions & 0 deletions src/backend/sync-agent/sync-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,10 @@ export class SyncAgent {
})
}

public getUpdateSyncSV() {
return new SvStateVector(this.atLeastOnce.syncState)
}

static async create(
nodeId: Name,
persistStorage: Storage,
Expand Down
Loading