diff --git a/packages/node/src/sync/decoder.ts b/packages/node/src/sync/decoder.ts new file mode 100644 index 000000000..49599439a --- /dev/null +++ b/packages/node/src/sync/decoder.ts @@ -0,0 +1,130 @@ +import type {Symbol} from "./symbol.js" +import type { CodedSymbol, HashedSymbol } from "./symbol.js"; + + +export class Decoder> { + // Coded symbols received so far + private cs: CodedSymbol[] = []; + // Set of source symbols that are exclusive to the decoder + private local: CodingWindow; + // Set of source symbols that the decoder initially has + private window: CodingWindow; + // Set of source symbols that are exclusive to the encoder + private remote: CodingWindow; + // Indices of coded symbols that can be decoded, i.e., degree equal to -1 + // or 1 and sum of hash equal to hash of sum, or degree equal to 0 and sum + // of hash equal to 0 + private decodable: number[] = []; + // Number of coded symbols that are decoded + private decoded: number = 0; + + constructor() { + this.local = new CodingWindow(); + this.window = new CodingWindow(); + this.remote = new CodingWindow(); + } + + // Decoded returns true if and only if every existing coded symbols d received + // so far have been decoded. + public decoded(): boolean { + return this.decoded === this.cs.length; + } + + // Local returns the list of source symbols that are present in B but not in A. + public localSymbols(): HashedSymbol[] { + return this.local.symbols; + } + + // Remote returns the list of source symbols that are present in A but not in B. + public remoteSymbols(): HashedSymbol[] { + return this.remote.symbols; + } + + // AddSymbol adds a source symbol to B, the Decoder's local set. + public addSymbol(s: T): void { + const th = new HashedSymbol(s, s.hash()); + this.addHashedSymbol(th); + } + + // AddHashedSymbol adds a source symbol to B, the Decoder's local set. + public addHashedSymbol(s: HashedSymbol): void { + this.window.addHashedSymbol(s); + } + + // AddCodedSymbol passes the next coded symbol in A's sequence to the Decoder. + public addCodedSymbol(c: CodedSymbol): void { + // Scan through decoded symbols to peel off matching ones + c = this.window.applyWindow(c, "remove"); + c = this.remote.applyWindow(c, "remove"); + c = this.local.applyWindow(c, "add"); + // Insert the new coded symbol + this.cs.push(c); + // Check if the coded symbol is decodable, and insert into decodable list if so + if ((c.count === 1 || c.count === -1) && c.hash === c.symbol.hash()) { + this.decodable.push(this.cs.length - 1); + } else if (c.count === 0 && c.hash === 0) { + this.decodable.push(this.cs.length - 1); + } + } + + // Apply a new symbol and modify the corresponding coded symbols + private applyNewSymbol(t: HashedSymbol, direction: number): RandomMapping { + let m = new RandomMapping(t.hash, 0); + while (m.lastIdx < this.cs.length) { + const cidx = m.lastIdx; + this.cs[cidx] = this.cs[cidx].apply(t, direction); + // Check if the coded symbol is now decodable + if ( + (this.cs[cidx].count === -1 || this.cs[cidx].count === 1) && + this.cs[cidx].hash === this.cs[cidx].symbol.hash() + ) { + this.decodable.push(cidx); + } + m.nextIndex(); + } + return m; + } + + // TryDecode tries to decode all coded symbols received so far. + public tryDecode(): void { + for (const didx of this.decodable) { + const cidx = this.decodable[didx]; + const c = this.cs[cidx]; + switch (c.count) { + case 1: + // Allocate a symbol and then XOR with the sum + const ns1 = new HashedSymbol(); + ns1.symbol = ns1.symbol.xor(c.symbol); + ns1.hash = c.hash; + const m1 = this.applyNewSymbol(ns1, -1); + this.remote.addHashedSymbolWithMapping(ns1, m1); + this.decoded += 1; + break; + case -1: + const ns2 = new HashedSymbol(); + ns2.symbol = ns2.symbol.xor(c.symbol); + ns2.hash = c.hash; + const m2 = this.applyNewSymbol(ns2, 1); + this.local.addHashedSymbolWithMapping(ns2, m2); + this.decoded += 1; + break; + case 0: + this.decoded += 1; + break; + default: + throw new Error("Invalid degree for decodable coded symbol"); + } + } + this.decodable = []; + } + + // Reset clears the decoder. + public reset(): void { + this.cs = []; + this.decodable = []; + this.local.reset(); + this.remote.reset(); + this.window.reset(); + this.decoded = 0; + } +} diff --git a/packages/node/src/sync/encoder.ts b/packages/node/src/sync/encoder.ts new file mode 100644 index 000000000..41415b594 --- /dev/null +++ b/packages/node/src/sync/encoder.ts @@ -0,0 +1,156 @@ +class SymbolMapping { + sourceIdx: number; + codedIdx: number; + + constructor(sourceIdx: number, codedIdx: number) { + this.sourceIdx = sourceIdx; + this.codedIdx = codedIdx; + } +} + +class MappingHeap { + private heap: SymbolMapping[]; + + constructor() { + this.heap = []; + } + + private fixHead(): void { + let curr = 0; + while (true) { + let child = curr * 2 + 1; + if (child >= this.heap.length) { + break; // No left child + } + if ( + child + 1 < this.heap.length && + this.heap[child + 1].codedIdx < this.heap[child].codedIdx + ) { + child = child + 1; // Right child has higher priority + } + if (this.heap[curr].codedIdx <= this.heap[child].codedIdx) { + break; // Invariant is satisfied + } + // Swap current and child + [this.heap[curr], this.heap[child]] = [this.heap[child], this.heap[curr]]; + curr = child; + } + } + + private fixTail(): void { + let curr = this.heap.length - 1; + while (curr > 0) { + const parent = Math.floor((curr - 1) / 2); + if (this.heap[parent].codedIdx <= this.heap[curr].codedIdx) { + break; + } + // Swap parent and current + [this.heap[parent], this.heap[curr]] = [ + this.heap[curr], + this.heap[parent], + ]; + curr = parent; + } + } + + push(mapping: SymbolMapping): void { + this.heap.push(mapping); + this.fixTail(); + } + + pop(): SymbolMapping | undefined { + if (this.heap.length === 0) return undefined; + const root = this.heap[0]; + this.heap[0] = this.heap[this.heap.length - 1]; + this.heap.pop(); + this.fixHead(); + return root; + } + + get size(): number { + return this.heap.length; + } + + get top(): SymbolMapping | undefined { + return this.heap[0]; + } +} + +class CodingWindow> { + private symbols: HashedSymbol[]; + private mappings: RandomMapping[]; + private queue: MappingHeap; + private nextIdx: number; + + constructor() { + this.symbols = []; + this.mappings = []; + this.queue = new MappingHeap(); + this.nextIdx = 0; + } + + addSymbol(symbol: T): void { + const hashedSymbol = new HashedSymbol(symbol, symbol.hash()); + this.addHashedSymbol(hashedSymbol); + } + + addHashedSymbol(hashedSymbol: HashedSymbol): void { + const mapping = new RandomMapping(hashedSymbol.hash, 0); + this.addHashedSymbolWithMapping(hashedSymbol, mapping); + } + + addHashedSymbolWithMapping( + hashedSymbol: HashedSymbol, + mapping: RandomMapping, + ): void { + this.symbols.push(hashedSymbol); + this.mappings.push(mapping); + this.queue.push( + new SymbolMapping(this.symbols.length - 1, mapping.lastIdx), + ); + } + + applyWindow(cw: CodedSymbol, direction: number): CodedSymbol { + if (this.queue.size === 0) { + this.nextIdx += 1; + return cw; + } + + while (this.queue.top?.codedIdx === this.nextIdx) { + const mapping = this.queue.pop(); + if (mapping) { + cw = cw.apply(this.symbols[mapping.sourceIdx], direction); + const nextMap = this.mappings[mapping.sourceIdx].nextIndex(); + mapping.codedIdx = nextMap; + this.queue.push(mapping); + } + } + this.nextIdx += 1; + return cw; + } + + reset(): void { + this.symbols = []; + this.mappings = []; + this.queue = new MappingHeap(); + this.nextIdx = 0; + } +} + +class Encoder> extends CodingWindow { + addSymbol(s: T): void { + super.addSymbol(s); + } + + addHashedSymbol(s: HashedSymbol): void { + super.addHashedSymbol(s); + } + + produceNextCodedSymbol(): CodedSymbol { + return super.applyWindow(new CodedSymbol(), 1); + } + + reset(): void { + super.reset(); + } +} diff --git a/packages/node/src/sync/index.ts b/packages/node/src/sync/index.ts new file mode 100644 index 000000000..e69de29bb diff --git a/packages/node/src/sync/mapping.ts b/packages/node/src/sync/mapping.ts new file mode 100644 index 000000000..756fae9a2 --- /dev/null +++ b/packages/node/src/sync/mapping.ts @@ -0,0 +1,17 @@ +export class RandomMapping { + private prng: bigint; + lastIdx: number; + + constructor(prng: number, lastIdx = 0) { + this.prng = BigInt(prng); + this.lastIdx = lastIdx; + } + + nextIndex(): number { + const multiplier = 0xda942042e4dd58b5n; + this.prng = (this.prng * multiplier) & 0xffffffffffffffffn; + + this.lastIdx += Math.ceil((this.lastIdx + 1.5) * ((2 ** 32) / Math.sqrt(Number(this.prng) + 1) - 1)); + return this.lastIdx; + } +} diff --git a/packages/node/src/sync/sketch.ts b/packages/node/src/sync/sketch.ts new file mode 100644 index 000000000..2c5f4e802 --- /dev/null +++ b/packages/node/src/sync/sketch.ts @@ -0,0 +1,67 @@ +import { Symbol } from "./symbol.js" +import {HashedSymbol, CodedSymbol} from "./symbol.js" +import {RandomMapping} from "./mapping.js" +import {Decoder} from "./decoder.js" + + + +class Sketch> { + s: CodedSymbol[] = [] + + addHashedSymbol(t: HashedSymbol) { + const m = new RandomMapping(t.Hash, 0); + while (m.lastIdx < this.s.length) { + const idx = m.lastIdx; + this.s[idx].Symbol = this.s[idx].Symbol.XOR(t.Symbol); + this.s[idx].Count += 1; + this.s[idx].Hash ^= t.Hash; + m.nextIndex(); + } + } + + removeHashedSymbol(t: HashedSymbol) { + const m = new RandomMapping(t.Hash, 0); + while (m.lastIdx < this.s.length) { + const idx = m.lastIdx; + this.s[idx].Symbol = this.s[idx].Symbol.XOR(t.Symbol); + this.s[idx].Count -= 1; + this.s[idx].Hash ^= t.Hash; + m.nextIndex(); + } + } + + addSymbol(t: T) { + const hs = new HashedSymbol(t, t.Hash()); + this.addHashedSymbol(hs); + } + + removeSymbol(t: T) { + const hs = new HashedSymbol(t, t.Hash()); + this.removeHashedSymbol(hs); + } + + subtract(s2: Sketch) { + if (this.s.length !== s2.s.length) { + throw Error("subtracting sketches of different sizes"); + } + + for (let i = 0; i < this.s.length; i++) { + this.s[i].Symbol = this.s[i].Symbol.XOR(s2.s[i].Symbol); + this.s[i].Count -= s2.s[i].Count; + this.s[i].Hash ^= s2.s[i].Hash; + } + } + + decode(): { fwd: HashedSymbol[]; rev: HashedSymbol[]; succ: boolean } { + const dec = new Decoder(); + for (const c of this.s) { + dec.addCodedSymbol(c); + } + dec.tryDecode(); + return { + fwd: dec.remote(), + rev: dec.local(), + succ: dec.decoded(), + }; + } +} diff --git a/packages/node/src/sync/symbol.ts b/packages/node/src/sync/symbol.ts new file mode 100644 index 000000000..44e2216c6 --- /dev/null +++ b/packages/node/src/sync/symbol.ts @@ -0,0 +1,33 @@ +export interface Symbol { + XOR(t2: T): T; + Hash(): number; +} + +export class HashedSymbol> { + Symbol: T; + Hash: number; + + constructor(symbol: T, hash: number) { + this.Symbol = symbol; + this.Hash = hash; + } +} + +export class CodedSymbol> extends HashedSymbol { + Count: number; + + constructor(symbol: T, hash: number, count = 0) { + super(symbol, hash); + this.Count = count; + } + + static readonly ADD = 1; + static readonly REMOVE = -1; + + apply(s: HashedSymbol, direction: number): CodedSymbol { + this.Symbol = this.Symbol.XOR(s.Symbol); + this.Hash ^= s.Hash; + this.Count += direction; + return this; + } +}