Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add rolling bloom filter, reliability utils and protobuf #4

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

shash256
Copy link
Collaborator

@shash256 shash256 commented Jan 13, 2025

This PR adds the rolling bloom filter to the SDS API, that is built upon bloom.nim. This PR also adds protobuf for message, bloom filter serialization and deserialization, definitions & utility functions for the ReliabilityManager. The core functions and unit tests for ReliabilityManager would be in a subsequent PR.

Would greatly appreciate nim related review and comments from the nwaku team !!

Copy link

@Ivansete-status Ivansete-status left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for it! Some comments so far :) Ping me if any comment is unclear or you need me to review again

Comment on lines +63 to +67
let msgResult = Message.decode(data)
if msgResult.isOk:
ok(msgResult.get)
else:
err(reSerializationError)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let msgResult = Message.decode(data)
if msgResult.isOk:
ok(msgResult.get)
else:
err(reSerializationError)
let msg = Message.decode(data).valueOr:
return err("error in deserializeMessage: " & $error)

let pb = encode(msg)
ok(pb.buffer)
except:
err(reSerializationError)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where does the reSerializationError come from?

Comment on lines +109 to +112
var intArray = newSeq[int](bytes.len div sizeof(int))
for i in 0 ..< intArray.len:
let start = i * sizeof(int)
copyMem(addr intArray[i], unsafeAddr bytes[start], sizeof(int))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That looks beautiful but a couple of doubts:

  1. Do we have a Nim module that already does that?
  2. Should we care about the desired endianness?

wdyt @arnetheduck ?

onMissingDependencies*: proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.}
onPeriodicSync*: PeriodicSyncCallback

ReliabilityError* = enum

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think is interesting to use pure enum which will show the string representation when converting to string ($). e.g., reSerializationError. It also makes the code more readable as it enforces the use of ReliabilityError.reSerializationError, f.e.

Suggested change
ReliabilityError* = enum
ReliabilityError* {.pure.} = enum

)

proc cleanup*(rm: ReliabilityManager) {.raises: [].} =
if not rm.isNil:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A nitpick comment. In "verbs" we tend to use parenthesis

Suggested change
if not rm.isNil:
if not rm.isNil():

Comment on lines +64 to +65
except Exception as e:
logError("Error during cleanup: " & e.msg)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
except Exception as e:
logError("Error during cleanup: " & e.msg)
except Exception:
logError("Error during cleanup: " & getCurrentExceptionMsg())

It is also interesting to use the most-specific exception type.

Comment on lines +40 to +41
except:
logError("Failed to initialize bloom filter")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
except:
logError("Failed to initialize bloom filter")
except:
logError("Failed to initialize bloom filter: " & getCurrentExceptionMsg())

Copy link

@jm-clius jm-clius left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally looks good! I've requested a change wrt the rolling bloom filter design being based on duration - I think we'll get more predictable behaviour if we simply design it based on a maximum capacity with some variability.

Comment on lines +87 to +90
for msg in rbf.messages:
if msg.timestamp > cutoff:
newMessages.add(msg)
newFilter.insert(msg.id)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the number of messages within the cutoff window is much higher than our capacity? From glancing at https://github.com/waku-org/nim-sds/blob/master/src/bloom.nim I don't think there will be any errors, but our false positive rate design will be way off.

To me it seems unnecessary to bring a time window consideration into the design here at all - we want SDS to work the same for a group communication at any rate. Perhaps therefore the rolling filter can operate on a "min" and "max" number of entries around the configured capacity? E.g. we can allow up to capacity + 20% of messages before triggering a clean, after which we add back the last capacity - 20% of messages? Something like that, but in any case avoiding the highly variable time element.

ReliabilityConfig* = object
bloomFilterCapacity*: int
bloomFilterErrorRate*: float
bloomFilterWindow*: times.Duration

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I argue elsewhere that we shouldn't bring Duration into the bloom filter design (as only number of inserted items affect the false positive design and message rates over specific time periods are extremely variable).

@jm-clius
Copy link

Side note: I think it's a good idea to mention the deliverable that this forms part of in the PR description. :) waku-org/pm#194

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants