Skip to content

Commit

Permalink
feat: waku store sync 2.0 storage & tests (#3215)
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS authored Jan 23, 2025
1 parent dfd06fa commit 54a7a68
Show file tree
Hide file tree
Showing 7 changed files with 643 additions and 32 deletions.
10 changes: 5 additions & 5 deletions tests/waku_store_sync/sync_utils.nim
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import std/[options, random], chronos, chronicles

import waku/[node/peer_manager, waku_core, waku_store_sync], ../testlib/wakucore
import waku/[node/peer_manager, waku_core, waku_store_sync/common], ../testlib/wakucore

randomize()

Expand All @@ -12,7 +12,7 @@ proc randomHash*(rng: var Rand): WakuMessageHash =

return hash

proc newTestWakuRecon*(
#[ proc newTestWakuRecon*(
switch: Switch,
idsRx: AsyncQueue[SyncID],
wantsTx: AsyncQueue[(PeerId, Fingerprint)],
Expand All @@ -34,9 +34,9 @@ proc newTestWakuRecon*(
proto.start()
switch.mount(proto)
return proto
return proto ]#

proc newTestWakuTransfer*(
#[ proc newTestWakuTransfer*(
switch: Switch,
idsTx: AsyncQueue[SyncID],
wantsRx: AsyncQueue[(PeerId, Fingerprint)],
Expand All @@ -55,4 +55,4 @@ proc newTestWakuTransfer*(
proto.start()
switch.mount(proto)
return proto
return proto ]#
23 changes: 11 additions & 12 deletions tests/waku_store_sync/test_codec.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import
../../waku/waku_core,
../../waku/waku_core/message/digest,
../../waku/waku_core/time,
../../waku/waku_store_sync,
../../waku/waku_store_sync/common,
../../waku/waku_store_sync/codec,
./sync_utils
Expand All @@ -19,12 +18,12 @@ proc randomItemSet(count: int, startTime: Timestamp, rng: var Rand): ItemSet =
for i in 0 ..< count:
let diff = rng.rand(9.uint8) + 1

let timestamp = lastTime + diff * 1_000_000_000
let timestamp = lastTime + diff * 1_000
lastTime = timestamp

let hash = randomHash(rng)

let id = SyncID(time: Timestamp(timestamp), fingerprint: hash)
let id = SyncID(time: Timestamp(timestamp), hash: hash)

elements.add(id)

Expand All @@ -40,8 +39,8 @@ proc randomSetRange(
ub = itemSet.elements[^1]

#for test check equality
lb.fingerprint = EmptyFingerprint
ub.fingerprint = EmptyFingerprint
lb.hash = EmptyWakuMessageHash
ub.hash = EmptyWakuMessageHash

let bounds = lb .. ub

Expand Down Expand Up @@ -90,9 +89,9 @@ suite "Waku Store Sync Codec":
time = getNowInNanosecondTime()

let (bounds1, itemSet1) = randomSetRange(count, time, rng)
let (bounds2, itemSet2) = randomSetRange(count, time + 10_000_000_000, rng)
let (bounds3, itemSet3) = randomSetRange(count, time + 20_000_000_000, rng)
let (bounds4, itemSet4) = randomSetRange(count, time + 30_000_000_000, rng)
let (bounds2, itemSet2) = randomSetRange(count, time + 11_000_000, rng)
let (bounds3, itemSet3) = randomSetRange(count, time + 21_000_000, rng)
let (bounds4, itemSet4) = randomSetRange(count, time + 31_000_000, rng)

let range1 = (bounds1, RangeType.ItemSet)
let range2 = (bounds2, RangeType.ItemSet)
Expand Down Expand Up @@ -128,12 +127,12 @@ suite "Waku Store Sync Codec":
ranges = newSeqOfCap[(Slice[SyncID], RangeType)](4)

for i in 0 ..< count:
let lb = SyncID(time: Timestamp(lastTime), fingerprint: EmptyFingerprint)
let lb = SyncID(time: Timestamp(lastTime), hash: EmptyWakuMessageHash)

let nowTime = lastTime + 10_000_000_000 # 10s

lastTime = nowTime
let ub = SyncID(time: Timestamp(nowTime), fingerprint: EmptyFingerprint)
let ub = SyncID(time: Timestamp(nowTime), hash: EmptyWakuMessageHash)
let bounds = lb .. ub
let range = (bounds, RangeType.Fingerprint)

Expand Down Expand Up @@ -171,10 +170,10 @@ suite "Waku Store Sync Codec":
fingerprints = newSeqOfCap[Fingerprint](4)

for i in 1 .. count:
let lb = SyncID(time: Timestamp(lastTime), fingerprint: EmptyFingerprint)
let lb = SyncID(time: Timestamp(lastTime), hash: EmptyWakuMessageHash)
let nowTime = lastTime + 10_000_000_000 # 10s
lastTime = nowTime
let ub = SyncID(time: Timestamp(nowTime), fingerprint: EmptyFingerprint)
let ub = SyncID(time: Timestamp(nowTime), hash: EmptyWakuMessageHash)
let bounds = lb .. ub
let range = (bounds, RangeType.Fingerprint)

Expand Down
204 changes: 204 additions & 0 deletions tests/waku_store_sync/test_storage.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
{.used.}

import std/[options, random], testutils/unittests, chronos

import
../../waku/waku_core,
../../waku/waku_core/message/digest,
../../waku/waku_store_sync/common,
../../waku/waku_store_sync/storage/seq_storage,
./sync_utils

suite "Waku Sync Storage":
test "process hash range":
var rng = initRand()
let count = 10_000
var elements = newSeqOfCap[SyncID](count)

for i in 0 ..< count:
let id = SyncID(time: Timestamp(i), hash: randomHash(rng))

elements.add(id)

var storage1 = SeqStorage.new(elements)
var storage2 = SeqStorage.new(elements)

let lb = elements[0]
let ub = elements[count - 1]
let bounds = lb .. ub
let fingerprint1 = storage1.computeFingerprint(bounds)

var outputPayload: RangesData

storage2.processFingerprintRange(bounds, fingerprint1, outputPayload)

let expected =
RangesData(ranges: @[(bounds, RangeType.Skip)], fingerprints: @[], itemSets: @[])

check:
outputPayload == expected

test "process item set range":
var rng = initRand()
let count = 1000
var elements1 = newSeqOfCap[SyncID](count)
var elements2 = newSeqOfCap[SyncID](count)
var diffs: seq[Fingerprint]

for i in 0 ..< count:
let id = SyncID(time: Timestamp(i), hash: randomHash(rng))

elements1.add(id)
if rng.rand(0 .. 9) == 0:
elements2.add(id)
else:
diffs.add(id.hash)

var storage1 = SeqStorage.new(elements1)

let lb = elements1[0]
let ub = elements1[count - 1]
let bounds = lb .. ub

let itemSet2 = ItemSet(elements: elements2, reconciled: true)

var
toSend: seq[Fingerprint]
toRecv: seq[Fingerprint]
outputPayload: RangesData

storage1.processItemSetRange(bounds, itemSet2, toSend, toRecv, outputPayload)

check:
toSend == diffs

test "insert new element":
var rng = initRand()

let storage = SeqStorage.new(10)

let element1 = SyncID(time: Timestamp(1000), hash: randomHash(rng))
let element2 = SyncID(time: Timestamp(2000), hash: randomHash(rng))

let res1 = storage.insert(element1)
assert res1.isOk(), $res1.error
let count1 = storage.length()

let res2 = storage.insert(element2)
assert res2.isOk(), $res2.error
let count2 = storage.length()

check:
count1 == 1
count2 == 2

test "insert duplicate":
var rng = initRand()

let element = SyncID(time: Timestamp(1000), hash: randomHash(rng))

let storage = SeqStorage.new(@[element])

let res = storage.insert(element)

check:
res.isErr() == true

test "prune elements":
var rng = initRand()
let count = 1000
var elements = newSeqOfCap[SyncID](count)

for i in 0 ..< count:
let id = SyncID(time: Timestamp(i), hash: randomHash(rng))

elements.add(id)

let storage = SeqStorage.new(elements)

let beforeCount = storage.length()

let pruned = storage.prune(Timestamp(500))

let afterCount = storage.length()

check:
beforeCount == 1000
pruned == 500
afterCount == 500

## disabled tests are rough benchmark
#[ test "10M fingerprint":
var rng = initRand()
let count = 10_000_000
var elements = newSeqOfCap[SyncID](count)
for i in 0 .. count:
let id = SyncID(time: Timestamp(i), hash: randomHash(rng))
elements.add(id)
let storage = SeqStorage.new(elements)
let before = getMonoTime()
discard storage.fingerprinting(some(0 .. count))
let after = getMonoTime()
echo "Fingerprint Time: " & $(after - before) ]#

#[ test "random inserts":
var rng = initRand()
let count = 10_000_000
var elements = newSeqOfCap[SyncID](count)
for i in 0 .. count:
let id = SyncID(time: Timestamp(i), hash: randomHash(rng))
elements.add(id)
var storage = SeqStorage.new(elements)
var avg: times.Duration
for i in 0 ..< 1000:
let newId =
SyncID(time: Timestamp(rng.rand(0 .. count)), hash: randomHash(rng))
let before = getMonoTime()
discard storage.insert(newId)
let after = getMonoTime()
avg += after - before
avg = avg div 1000
echo "Avg Time 1K Inserts: " & $avg ]#

#[ test "trim":
var rng = initRand()
let count = 10_000_000
var elements = newSeqOfCap[SyncID](count)
for i in 0 .. count:
let id = SyncID(time: Timestamp(i), hash: randomHash(rng))
elements.add(id)
var storage = SeqStorage.new(elements)
let before = getMonoTime()
discard storage.trim(Timestamp(count div 4))
let after = getMonoTime()
echo "Trim Time: " & $(after - before) ]#
Loading

0 comments on commit 54a7a68

Please sign in to comment.