-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add rolling bloom filter, reliability utils and protobuf #4
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
# Package | ||
version = "0.1.0" | ||
author = "Waku Team" | ||
description = "E2E Reliability Protocol API" | ||
license = "MIT" | ||
srcDir = "src" | ||
|
||
# Dependencies | ||
requires "nim >= 2.0.8" | ||
requires "chronicles" | ||
requires "libp2p" | ||
|
||
# Tasks | ||
task test, "Run the test suite": | ||
exec "nim c -r tests/test_bloom.nim" | ||
exec "nim c -r tests/test_reliability.nim" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
import std/times | ||
|
||
type | ||
MessageID* = string | ||
|
||
Message* = object | ||
messageId*: MessageID | ||
lamportTimestamp*: int64 | ||
causalHistory*: seq[MessageID] | ||
channelId*: string | ||
content*: seq[byte] | ||
bloomFilter*: seq[byte] | ||
|
||
UnacknowledgedMessage* = object | ||
message*: Message | ||
sendTime*: Time | ||
resendAttempts*: int | ||
|
||
TimestampedMessageID* = object | ||
id*: MessageID | ||
timestamp*: Time | ||
|
||
const | ||
DefaultMaxMessageHistory* = 1000 | ||
DefaultMaxCausalHistory* = 10 | ||
DefaultResendInterval* = initDuration(seconds = 60) | ||
DefaultMaxResendAttempts* = 5 | ||
DefaultSyncMessageInterval* = initDuration(seconds = 30) | ||
DefaultBufferSweepInterval* = initDuration(seconds = 60) | ||
MaxMessageSize* = 1024 * 1024 # 1 MB |
Original file line number | Diff line number | Diff line change | ||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,122 @@ | ||||||||||||||||
import libp2p/protobuf/minprotobuf | ||||||||||||||||
import std/options | ||||||||||||||||
import ../src/[message, protobufutil, bloom, reliability_utils] | ||||||||||||||||
|
||||||||||||||||
proc toBytes(s: string): seq[byte] = | ||||||||||||||||
result = newSeq[byte](s.len) | ||||||||||||||||
copyMem(result[0].addr, s[0].unsafeAddr, s.len) | ||||||||||||||||
|
||||||||||||||||
proc encode*(msg: Message): ProtoBuffer = | ||||||||||||||||
var pb = initProtoBuffer() | ||||||||||||||||
|
||||||||||||||||
pb.write(1, msg.messageId) | ||||||||||||||||
pb.write(2, uint64(msg.lamportTimestamp)) | ||||||||||||||||
|
||||||||||||||||
for hist in msg.causalHistory: | ||||||||||||||||
pb.write(3, hist.toBytes) # Convert string to bytes for proper length handling | ||||||||||||||||
|
||||||||||||||||
pb.write(4, msg.channelId) | ||||||||||||||||
pb.write(5, msg.content) | ||||||||||||||||
pb.write(6, msg.bloomFilter) | ||||||||||||||||
pb.finish() | ||||||||||||||||
|
||||||||||||||||
pb | ||||||||||||||||
|
||||||||||||||||
proc decode*(T: type Message, buffer: seq[byte]): ProtobufResult[T] = | ||||||||||||||||
let pb = initProtoBuffer(buffer) | ||||||||||||||||
var msg = Message() | ||||||||||||||||
|
||||||||||||||||
if not ?pb.getField(1, msg.messageId): | ||||||||||||||||
return err(ProtobufError.missingRequiredField("messageId")) | ||||||||||||||||
|
||||||||||||||||
var timestamp: uint64 | ||||||||||||||||
if not ?pb.getField(2, timestamp): | ||||||||||||||||
return err(ProtobufError.missingRequiredField("lamportTimestamp")) | ||||||||||||||||
msg.lamportTimestamp = int64(timestamp) | ||||||||||||||||
|
||||||||||||||||
# Decode causal history | ||||||||||||||||
var causalHistory: seq[string] | ||||||||||||||||
let histResult = pb.getRepeatedField(3, causalHistory) | ||||||||||||||||
if histResult.isOk: | ||||||||||||||||
msg.causalHistory = causalHistory | ||||||||||||||||
|
||||||||||||||||
if not ?pb.getField(4, msg.channelId): | ||||||||||||||||
return err(ProtobufError.missingRequiredField("channelId")) | ||||||||||||||||
|
||||||||||||||||
if not ?pb.getField(5, msg.content): | ||||||||||||||||
return err(ProtobufError.missingRequiredField("content")) | ||||||||||||||||
|
||||||||||||||||
if not ?pb.getField(6, msg.bloomFilter): | ||||||||||||||||
msg.bloomFilter = @[] # Empty if not present | ||||||||||||||||
|
||||||||||||||||
ok(msg) | ||||||||||||||||
|
||||||||||||||||
proc serializeMessage*(msg: Message): Result[seq[byte], ReliabilityError] = | ||||||||||||||||
try: | ||||||||||||||||
let pb = encode(msg) | ||||||||||||||||
ok(pb.buffer) | ||||||||||||||||
except: | ||||||||||||||||
err(reSerializationError) | ||||||||||||||||
|
||||||||||||||||
proc deserializeMessage*(data: seq[byte]): Result[Message, ReliabilityError] = | ||||||||||||||||
try: | ||||||||||||||||
let msgResult = Message.decode(data) | ||||||||||||||||
if msgResult.isOk: | ||||||||||||||||
ok(msgResult.get) | ||||||||||||||||
else: | ||||||||||||||||
err(reSerializationError) | ||||||||||||||||
Comment on lines
+63
to
+67
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||
except: | ||||||||||||||||
err(reDeserializationError) | ||||||||||||||||
|
||||||||||||||||
proc serializeBloomFilter*(filter: BloomFilter): Result[seq[byte], ReliabilityError] = | ||||||||||||||||
try: | ||||||||||||||||
var pb = initProtoBuffer() | ||||||||||||||||
|
||||||||||||||||
# Convert intArray to bytes | ||||||||||||||||
var bytes = newSeq[byte](filter.intArray.len * sizeof(int)) | ||||||||||||||||
for i, val in filter.intArray: | ||||||||||||||||
let start = i * sizeof(int) | ||||||||||||||||
copyMem(addr bytes[start], unsafeAddr val, sizeof(int)) | ||||||||||||||||
|
||||||||||||||||
pb.write(1, bytes) | ||||||||||||||||
pb.write(2, uint64(filter.capacity)) | ||||||||||||||||
pb.write(3, uint64(filter.errorRate * 1_000_000)) | ||||||||||||||||
pb.write(4, uint64(filter.kHashes)) | ||||||||||||||||
pb.write(5, uint64(filter.mBits)) | ||||||||||||||||
|
||||||||||||||||
pb.finish() | ||||||||||||||||
ok(pb.buffer) | ||||||||||||||||
except: | ||||||||||||||||
err(reSerializationError) | ||||||||||||||||
|
||||||||||||||||
proc deserializeBloomFilter*(data: seq[byte]): Result[BloomFilter, ReliabilityError] = | ||||||||||||||||
if data.len == 0: | ||||||||||||||||
return err(reDeserializationError) | ||||||||||||||||
|
||||||||||||||||
try: | ||||||||||||||||
let pb = initProtoBuffer(data) | ||||||||||||||||
var bytes: seq[byte] | ||||||||||||||||
var cap, errRate, kHashes, mBits: uint64 | ||||||||||||||||
|
||||||||||||||||
if not pb.getField(1, bytes).get() or | ||||||||||||||||
not pb.getField(2, cap).get() or | ||||||||||||||||
not pb.getField(3, errRate).get() or | ||||||||||||||||
not pb.getField(4, kHashes).get() or | ||||||||||||||||
not pb.getField(5, mBits).get(): | ||||||||||||||||
return err(reDeserializationError) | ||||||||||||||||
|
||||||||||||||||
# Convert bytes back to intArray | ||||||||||||||||
var intArray = newSeq[int](bytes.len div sizeof(int)) | ||||||||||||||||
for i in 0 ..< intArray.len: | ||||||||||||||||
let start = i * sizeof(int) | ||||||||||||||||
copyMem(addr intArray[i], unsafeAddr bytes[start], sizeof(int)) | ||||||||||||||||
Comment on lines
+109
to
+112
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That looks beautiful but a couple of doubts:
wdyt @arnetheduck ? |
||||||||||||||||
|
||||||||||||||||
ok(BloomFilter( | ||||||||||||||||
intArray: intArray, | ||||||||||||||||
capacity: int(cap), | ||||||||||||||||
errorRate: float(errRate) / 1_000_000, | ||||||||||||||||
kHashes: int(kHashes), | ||||||||||||||||
mBits: int(mBits) | ||||||||||||||||
)) | ||||||||||||||||
except: | ||||||||||||||||
err(reDeserializationError) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
# adapted from https://github.com/waku-org/nwaku/blob/master/waku/common/protobuf.nim | ||
|
||
{.push raises: [].} | ||
|
||
import libp2p/protobuf/minprotobuf | ||
import libp2p/varint | ||
|
||
export minprotobuf, varint | ||
|
||
type | ||
ProtobufErrorKind* {.pure.} = enum | ||
DecodeFailure | ||
MissingRequiredField | ||
|
||
ProtobufError* = object | ||
case kind*: ProtobufErrorKind | ||
of DecodeFailure: | ||
error*: minprotobuf.ProtoError | ||
of MissingRequiredField: | ||
field*: string | ||
|
||
ProtobufResult*[T] = Result[T, ProtobufError] | ||
|
||
converter toProtobufError*(err: minprotobuf.ProtoError): ProtobufError = | ||
case err | ||
of minprotobuf.ProtoError.RequiredFieldMissing: | ||
ProtobufError(kind: ProtobufErrorKind.MissingRequiredField, field: "unknown") | ||
else: | ||
ProtobufError(kind: ProtobufErrorKind.DecodeFailure, error: err) | ||
|
||
proc missingRequiredField*(T: type ProtobufError, field: string): T = | ||
ProtobufError(kind: ProtobufErrorKind.MissingRequiredField, field: field) |
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,95 @@ | ||||||||||
import std/[times, locks] | ||||||||||
import ./[rolling_bloom_filter, message] | ||||||||||
|
||||||||||
type | ||||||||||
PeriodicSyncCallback* = proc() {.gcsafe, raises: [].} | ||||||||||
|
||||||||||
ReliabilityConfig* = object | ||||||||||
bloomFilterCapacity*: int | ||||||||||
bloomFilterErrorRate*: float | ||||||||||
bloomFilterWindow*: times.Duration | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I argue elsewhere that we shouldn't bring |
||||||||||
maxMessageHistory*: int | ||||||||||
maxCausalHistory*: int | ||||||||||
resendInterval*: times.Duration | ||||||||||
maxResendAttempts*: int | ||||||||||
syncMessageInterval*: times.Duration | ||||||||||
bufferSweepInterval*: times.Duration | ||||||||||
|
||||||||||
ReliabilityManager* = ref object | ||||||||||
lamportTimestamp*: int64 | ||||||||||
messageHistory*: seq[MessageID] | ||||||||||
bloomFilter*: RollingBloomFilter | ||||||||||
outgoingBuffer*: seq[UnacknowledgedMessage] | ||||||||||
incomingBuffer*: seq[Message] | ||||||||||
channelId*: string | ||||||||||
config*: ReliabilityConfig | ||||||||||
lock*: Lock | ||||||||||
onMessageReady*: proc(messageId: MessageID) {.gcsafe.} | ||||||||||
onMessageSent*: proc(messageId: MessageID) {.gcsafe.} | ||||||||||
onMissingDependencies*: proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} | ||||||||||
onPeriodicSync*: PeriodicSyncCallback | ||||||||||
|
||||||||||
ReliabilityError* = enum | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think is interesting to use
Suggested change
|
||||||||||
reInvalidArgument | ||||||||||
reOutOfMemory | ||||||||||
reInternalError | ||||||||||
reSerializationError | ||||||||||
reDeserializationError | ||||||||||
reMessageTooLarge | ||||||||||
|
||||||||||
proc defaultConfig*(): ReliabilityConfig = | ||||||||||
## Creates a default configuration for the ReliabilityManager. | ||||||||||
## | ||||||||||
## Returns: | ||||||||||
## A ReliabilityConfig object with default values. | ||||||||||
ReliabilityConfig( | ||||||||||
bloomFilterCapacity: DefaultBloomFilterCapacity, | ||||||||||
bloomFilterErrorRate: DefaultBloomFilterErrorRate, | ||||||||||
bloomFilterWindow: DefaultBloomFilterWindow, | ||||||||||
maxMessageHistory: DefaultMaxMessageHistory, | ||||||||||
maxCausalHistory: DefaultMaxCausalHistory, | ||||||||||
resendInterval: DefaultResendInterval, | ||||||||||
maxResendAttempts: DefaultMaxResendAttempts, | ||||||||||
syncMessageInterval: DefaultSyncMessageInterval, | ||||||||||
bufferSweepInterval: DefaultBufferSweepInterval | ||||||||||
) | ||||||||||
|
||||||||||
proc cleanup*(rm: ReliabilityManager) {.raises: [].} = | ||||||||||
if not rm.isNil: | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A nitpick comment. In "verbs" we tend to use parenthesis
Suggested change
|
||||||||||
{.gcsafe.}: | ||||||||||
try: | ||||||||||
rm.outgoingBuffer.setLen(0) | ||||||||||
rm.incomingBuffer.setLen(0) | ||||||||||
rm.messageHistory.setLen(0) | ||||||||||
except Exception as e: | ||||||||||
logError("Error during cleanup: " & e.msg) | ||||||||||
Comment on lines
+64
to
+65
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
It is also interesting to use the most-specific exception type. |
||||||||||
|
||||||||||
proc cleanBloomFilter*(rm: ReliabilityManager) {.gcsafe, raises: [].} = | ||||||||||
withLock rm.lock: | ||||||||||
try: | ||||||||||
rm.bloomFilter.clean() | ||||||||||
except Exception as e: | ||||||||||
logError("Failed to clean ReliabilityManager bloom filter: " & e.msg) | ||||||||||
|
||||||||||
proc addToHistory*(rm: ReliabilityManager, msgId: MessageID) {.gcsafe, raises: [].} = | ||||||||||
rm.messageHistory.add(msgId) | ||||||||||
if rm.messageHistory.len > rm.config.maxMessageHistory: | ||||||||||
rm.messageHistory.delete(0) | ||||||||||
|
||||||||||
proc updateLamportTimestamp*(rm: ReliabilityManager, msgTs: int64) {.gcsafe, raises: [].} = | ||||||||||
rm.lamportTimestamp = max(msgTs, rm.lamportTimestamp) + 1 | ||||||||||
|
||||||||||
proc getRecentMessageIDs*(rm: ReliabilityManager, n: int): seq[MessageID] = | ||||||||||
result = rm.messageHistory[max(0, rm.messageHistory.len - n) .. ^1] | ||||||||||
|
||||||||||
proc getMessageHistory*(rm: ReliabilityManager): seq[MessageID] = | ||||||||||
withLock rm.lock: | ||||||||||
result = rm.messageHistory | ||||||||||
|
||||||||||
proc getOutgoingBuffer*(rm: ReliabilityManager): seq[UnacknowledgedMessage] = | ||||||||||
withLock rm.lock: | ||||||||||
result = rm.outgoingBuffer | ||||||||||
|
||||||||||
proc getIncomingBuffer*(rm: ReliabilityManager): seq[Message] = | ||||||||||
withLock rm.lock: | ||||||||||
result = rm.incomingBuffer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where does the
reSerializationError
come from?