From e227d058e5b09bd94a5ad9558e88debad53fa33e Mon Sep 17 00:00:00 2001 From: SionoiS Date: Thu, 29 Aug 2024 11:38:07 -0400 Subject: [PATCH] add specific ttl and peer selection --- vendor/negentropy | 1 + vendor/nph | 1 + waku/factory/node_factory.nim | 14 +-- waku/node/waku_node.nim | 8 +- waku/waku_rendezvous/common.nim | 3 +- waku/waku_rendezvous/protocol.nim | 144 +++++++++--------------------- 6 files changed, 60 insertions(+), 111 deletions(-) create mode 160000 vendor/negentropy create mode 160000 vendor/nph diff --git a/vendor/negentropy b/vendor/negentropy new file mode 160000 index 0000000000..f152076994 --- /dev/null +++ b/vendor/negentropy @@ -0,0 +1 @@ +Subproject commit f15207699493a129ac94448f930aa3d3fa748886 diff --git a/vendor/nph b/vendor/nph new file mode 160000 index 0000000000..de5cd4823e --- /dev/null +++ b/vendor/nph @@ -0,0 +1 @@ +Subproject commit de5cd4823e63424adb58ef3717524348ae6c4d87 diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 7aafa3b14e..ea5f6666c1 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -186,6 +186,14 @@ proc setupProtocols( protectedTopic = topicKey.topic, publicKey = topicKey.key node.wakuRelay.addSignedTopicsValidator(subscribedProtectedTopics) + # Only relay nodes can be rendezvous points. + if conf.rendezvous: + try: + await mountRendezvous(node) + except CatchableError: + return + err("failed to mount waku rendezvous protocol: " & getCurrentExceptionMsg()) + # Keepalive mounted on all nodes try: await mountLibp2pPing(node) @@ -346,12 +354,6 @@ proc setupProtocols( return err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error) - # Enable Rendezvous Discovery protocol - try: - await mountRendezvous(node, conf.rendezvous) - except CatchableError: - return err("failed to mount waku rendezvous protocol: " & getCurrentExceptionMsg()) - return ok() ## Start node diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 56e9a6a10d..b651cc35e6 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -1214,11 +1214,10 @@ proc startKeepalive*(node: WakuNode) = asyncSpawn node.keepaliveLoop(defaultKeepalive) -proc mountRendezvous*(node: WakuNode, enabled: bool) {.async: (raises: []).} = +proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} = info "mounting rendezvous discovery protocol" - node.wakuRendezvous = - WakuRendezVous.new(node.switch, node.peerManager, node.enr, enabled) + node.wakuRendezvous = WakuRendezVous.new(node.switch, node.peerManager, node.enr) if node.started: try: @@ -1231,6 +1230,9 @@ proc mountRendezvous*(node: WakuNode, enabled: bool) {.async: (raises: []).} = except LPError: error "failed to mount rendezvous", error = getCurrentExceptionMsg() + # Always start discovering peers at startup + await node.wakuRendezvous.initialRequestAll() + proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool = let inputStr = $inputMultiAdd if inputStr.contains("0.0.0.0/tcp/0") or inputStr.contains("127.0.0.1/tcp/0"): diff --git a/waku/waku_rendezvous/common.nim b/waku/waku_rendezvous/common.nim index 5ce8b2a7f2..06e1e494c7 100644 --- a/waku/waku_rendezvous/common.nim +++ b/waku/waku_rendezvous/common.nim @@ -5,7 +5,8 @@ import chronos import ../waku_enr/capabilities const DiscoverLimit* = 1000 -const DefaultRegistrationInterval* = 1.minutes +const DefaultRegistrationTTL* = 60.seconds +const DefaultRegistrationInterval* = 10.seconds proc computeNamespace*(clusterId: uint16, shard: uint16): string = var namespace = "rs/" diff --git a/waku/waku_rendezvous/protocol.nim b/waku/waku_rendezvous/protocol.nim index 0d6290a73a..eaf921ae90 100644 --- a/waku/waku_rendezvous/protocol.nim +++ b/waku/waku_rendezvous/protocol.nim @@ -15,13 +15,14 @@ import ../common/enr, ../waku_enr/capabilities, ../waku_enr/sharding, - ../waku_core/peers as peers, + ../waku_core/peers, + ../waku_core/topics, ./common logScope: topics = "waku rendez vous" -declarePublicCounter peerFound, "number of peers found via rendezvous" +declarePublicCounter peerFoundTotal, "total number of peers found via rendezvous" type WakuRendezVous* = ref object of RendezVous peerManager: PeerManager @@ -30,53 +31,38 @@ type WakuRendezVous* = ref object of RendezVous periodicRegistrationFut: Future[void] - enabled: bool - -proc advertise( - self: WakuRendezVous, namespace: string, ttl: Duration = MinimumDuration +proc batchAdvertise*( + self: WakuRendezVous, + namespace: string, + ttl: Duration = MinimumDuration, + peers: seq[PeerId], ): Future[Result[void, string]] {.async.} = ## Register with all rendez vous peers under a namespace let catchable = catch: - await procCall RendezVous(self).advertise(namespace, ttl) + await procCall RendezVous(self).batchAdvertise(namespace, ttl, peers) if catchable.isErr(): return err(catchable.error.msg) return ok() -proc request( - self: WakuRendezVous, namespace: string, count: int = DiscoverLimit +proc batchRequest*( + self: WakuRendezVous, + namespace: string, + count: int = DiscoverLimit, + peers: seq[PeerId], ): Future[Result[seq[PeerRecord], string]] {.async.} = ## Request all records from all rendez vous peers with matching a namespace let catchable = catch: - await RendezVous(self).request(namespace, count) + await RendezVous(self).batchRequest(namespace, count, peers) if catchable.isErr(): return err(catchable.error.msg) return ok(catchable.get()) -proc requestLocally(self: WakuRendezVous, namespace: string): seq[PeerRecord] = - RendezVous(self).requestLocally(namespace) - -proc unsubscribeLocally(self: WakuRendezVous, namespace: string) = - RendezVous(self).unsubscribeLocally(namespace) - -proc unsubscribe( - self: WakuRendezVous, namespace: string -): Future[Result[void, string]] {.async.} = - ## Unsubscribe from all rendez vous peers including locally - - let catchable = catch: - await RendezVous(self).unsubscribe(namespace) - - if catchable.isErr(): - return err(catchable.error.msg) - - return ok() - proc getRelayShards(enr: enr.Record): Option[RelayShards] = let typedRecord = enr.toTyped().valueOr: return none(RelayShards) @@ -84,11 +70,7 @@ proc getRelayShards(enr: enr.Record): Option[RelayShards] = return typedRecord.relaySharding() proc new*( - T: type WakuRendezVous, - switch: Switch, - peerManager: PeerManager, - enr: Record, - enabled: bool, + T: type WakuRendezVous, switch: Switch, peerManager: PeerManager, enr: Record ): T = let relayshard = getRelayShards(enr).valueOr: warn "Using default cluster id 0" @@ -97,10 +79,7 @@ proc new*( let capabilities = enr.getCapabilities() let wrv = WakuRendezVous( - peerManager: peerManager, - relayshard: relayshard, - capabilities: capabilities, - enabled: enabled, + peerManager: peerManager, relayshard: relayshard, capabilities: capabilities ) RendezVous(wrv).setup(switch) @@ -112,35 +91,19 @@ proc new*( return wrv -proc advertisementNamespaces(self: WakuRendezVous): seq[string] = - let namespaces = collect(newSeq): - for shard in self.relayShard.shardIds: - for cap in self.capabilities: - computeNamespace(self.relayShard.clusterId, shard, cap) - - return namespaces - -proc requestNamespaces(self: WakuRendezVous): seq[string] = - let namespaces = collect(newSeq): - for shard in self.relayShard.shardIds: - for cap in Capabilities: - computeNamespace(self.relayShard.clusterId, shard, cap) - - return namespaces +proc advertiseAll(self: WakuRendezVous) {.async.} = + let pubsubTopics = self.relayShard.topics() -proc shardOnlyNamespaces(self: WakuRendezVous): seq[string] = - let namespaces = collect(newSeq): - for shard in self.relayShard.shardIds: - computeNamespace(self.relayShard.clusterId, shard) - - return namespaces + let futs = collect(newSeq): + for pubsubTopic in pubsubTopics: + let namespace = computeNamespace(pubsubTopic.clusterId, pubsubTopic.shardId) -proc advertiseAll*(self: WakuRendezVous) {.async.} = - let namespaces = self.shardOnlyNamespaces() + # Get a random RDV peer for that shard + let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr: + continue - let futs = collect(newSeq): - for namespace in namespaces: - self.advertise(namespace, 1.minutes) + # Advertise yourself on that peer + self.batchAdvertise(namespace, DefaultRegistrationTTL, @[rpi.peerId]) let handles = await allFinished(futs) @@ -148,16 +111,23 @@ proc advertiseAll*(self: WakuRendezVous) {.async.} = let res = fut.read if res.isErr(): - warn "failed to advertise", error = res.error + warn "rendezvous advertise failed", error = res.error debug "waku rendezvous advertisements finished", adverCount = handles.len -proc requestAll*(self: WakuRendezVous) {.async.} = - let namespaces = self.shardOnlyNamespaces() +proc initialRequestAll*(self: WakuRendezVous) {.async.} = + let pubsubTopics = self.relayShard.topics() let futs = collect(newSeq): - for namespace in namespaces: - self.request(namespace) + for pubsubTopic in pubsubTopics: + let namespace = computeNamespace(pubsubTopic.clusterId, pubsubTopic.shardId) + + # Get a random RDV peer for that shard + let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr: + continue + + # Ask for 12 peer records for that shard + self.batchRequest(namespace, 12, @[rpi.peerId]) let handles = await allFinished(futs) @@ -165,59 +135,31 @@ proc requestAll*(self: WakuRendezVous) {.async.} = let res = fut.read if res.isErr(): - warn "failed to request", error = res.error + warn "rendezvous request failed", error = res.error else: for peer in res.get(): - peerFound.inc() + peerFoundTotal.inc() self.peerManager.addPeer(peer) debug "waku rendezvous requests finished", requestCount = handles.len -proc unsubcribeAll*(self: WakuRendezVous) {.async.} = - let namespaces = self.shardOnlyNamespaces() - - let futs = collect(newSeq): - for namespace in namespaces: - self.unsubscribe(namespace) - - let handles = await allFinished(futs) - - for fut in handles: - let res = fut.read - - if res.isErr(): - warn "failed to unsubcribe", error = res.error - - debug "waku rendezvous unsubscriptions finished", unsubCount = handles.len - - return - proc periodicRegistration(self: WakuRendezVous) {.async.} = debug "waku rendezvous periodic registration started", interval = DefaultRegistrationInterval # infinite loop while true: - # default ttl of registration is the same as default interval - await self.advertiseAll() - await sleepAsync(DefaultRegistrationInterval) -proc start*(self: WakuRendezVous) {.async.} = - await self.requestAll() - - if not self.enabled: - return + await self.advertiseAll() +proc start*(self: WakuRendezVous) {.async.} = debug "starting waku rendezvous discovery" # start registering forever self.periodicRegistrationFut = self.periodicRegistration() proc stopWait*(self: WakuRendezVous) {.async.} = - if not self.enabled: - return - debug "stopping waku rendezvous discovery" if not self.periodicRegistrationFut.isNil():