-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add rolling bloom filter, reliability utils and protobuf #4
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for it! Some comments so far :) Ping me if any comment is unclear or you need me to review again
let msgResult = Message.decode(data) | ||
if msgResult.isOk: | ||
ok(msgResult.get) | ||
else: | ||
err(reSerializationError) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let msgResult = Message.decode(data) | |
if msgResult.isOk: | |
ok(msgResult.get) | |
else: | |
err(reSerializationError) | |
let msg = Message.decode(data).valueOr: | |
return err("error in deserializeMessage: " & $error) |
let pb = encode(msg) | ||
ok(pb.buffer) | ||
except: | ||
err(reSerializationError) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where does the reSerializationError
come from?
var intArray = newSeq[int](bytes.len div sizeof(int)) | ||
for i in 0 ..< intArray.len: | ||
let start = i * sizeof(int) | ||
copyMem(addr intArray[i], unsafeAddr bytes[start], sizeof(int)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That looks beautiful but a couple of doubts:
- Do we have a Nim module that already does that?
- Should we care about the desired endianness?
wdyt @arnetheduck ?
onMissingDependencies*: proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} | ||
onPeriodicSync*: PeriodicSyncCallback | ||
|
||
ReliabilityError* = enum |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think is interesting to use pure
enum which will show the string representation when converting to string
($
). e.g., reSerializationError
. It also makes the code more readable as it enforces the use of ReliabilityError.reSerializationError
, f.e.
ReliabilityError* = enum | |
ReliabilityError* {.pure.} = enum |
) | ||
|
||
proc cleanup*(rm: ReliabilityManager) {.raises: [].} = | ||
if not rm.isNil: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A nitpick comment. In "verbs" we tend to use parenthesis
if not rm.isNil: | |
if not rm.isNil(): |
except Exception as e: | ||
logError("Error during cleanup: " & e.msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
except Exception as e: | |
logError("Error during cleanup: " & e.msg) | |
except Exception: | |
logError("Error during cleanup: " & getCurrentExceptionMsg()) |
It is also interesting to use the most-specific exception type.
except: | ||
logError("Failed to initialize bloom filter") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
except: | |
logError("Failed to initialize bloom filter") | |
except: | |
logError("Failed to initialize bloom filter: " & getCurrentExceptionMsg()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally looks good! I've requested a change wrt the rolling bloom filter design being based on duration - I think we'll get more predictable behaviour if we simply design it based on a maximum capacity with some variability.
for msg in rbf.messages: | ||
if msg.timestamp > cutoff: | ||
newMessages.add(msg) | ||
newFilter.insert(msg.id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if the number of messages within the cutoff window is much higher than our capacity
? From glancing at https://github.com/waku-org/nim-sds/blob/master/src/bloom.nim I don't think there will be any errors, but our false positive rate design will be way off.
To me it seems unnecessary to bring a time window consideration into the design here at all - we want SDS to work the same for a group communication at any rate. Perhaps therefore the rolling filter can operate on a "min" and "max" number of entries around the configured capacity
? E.g. we can allow up to capacity + 20%
of messages before triggering a clean, after which we add back the last capacity - 20%
of messages? Something like that, but in any case avoiding the highly variable time element.
ReliabilityConfig* = object | ||
bloomFilterCapacity*: int | ||
bloomFilterErrorRate*: float | ||
bloomFilterWindow*: times.Duration |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I argue elsewhere that we shouldn't bring Duration
into the bloom filter design (as only number of inserted items affect the false positive design and message rates over specific time periods are extremely variable).
Side note: I think it's a good idea to mention the deliverable that this forms part of in the PR description. :) waku-org/pm#194 |
This PR adds the rolling bloom filter to the SDS API, that is built upon bloom.nim. This PR also adds protobuf for message, bloom filter serialization and deserialization, definitions & utility functions for the ReliabilityManager. The core functions and unit tests for ReliabilityManager would be in a subsequent PR.
Would greatly appreciate nim related review and comments from the nwaku team !!