From a5d93105d7035c9dd49348d9c65c331084277c32 Mon Sep 17 00:00:00 2001 From: SionoiS Date: Thu, 8 Aug 2024 10:43:19 -0400 Subject: [PATCH 01/17] waku rendezvous --- waku/factory/external_config.nim | 7 + waku/factory/node_factory.nim | 13 +- waku/node/waku_node.nim | 22 ++- waku/waku_rendezvous.nim | 3 + waku/waku_rendezvous/common.nim | 28 ++++ waku/waku_rendezvous/protocol.nim | 226 ++++++++++++++++++++++++++++++ 6 files changed, 286 insertions(+), 13 deletions(-) create mode 100644 waku/waku_rendezvous.nim create mode 100644 waku/waku_rendezvous/common.nim create mode 100644 waku/waku_rendezvous/protocol.nim diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index 01db15fcb2..07f574b9a3 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -647,6 +647,13 @@ with the drawback of consuming some more bandwitdh.""", name: "peer-exchange-node" .}: string + ## Rendez vous + rendezvous* {. + desc: "Enable waku rendezvous discovery server", + defaultValue: false, + name: "rendezvous" + .}: bool + ## websocket config websocketSupport* {. desc: "Enable websocket: true|false", diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index d98a99546d..ce2cd4eff3 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -207,13 +207,6 @@ proc setupProtocols( protectedShard = shardKey.shard, publicKey = shardKey.key node.wakuRelay.addSignedShardsValidator(subscribedProtectedShards, conf.clusterId) - # Enable Rendezvous Discovery protocol when Relay is enabled - try: - await mountRendezvous(node) - except CatchableError: - return - err("failed to mount waku rendezvous protocol: " & getCurrentExceptionMsg()) - # Keepalive mounted on all nodes try: await mountLibp2pPing(node) @@ -372,6 +365,12 @@ proc setupProtocols( return err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error) + # Enable Rendezvous Discovery protocol + try: + await mountRendezvous(node, conf.rendezvous) + except CatchableError: + return err("failed to mount waku rendezvous protocol: " & getCurrentExceptionMsg()) + return ok() ## Start node diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 50b65bfc5a..2ae9a3cace 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -17,7 +17,6 @@ import libp2p/protocols/pubsub/rpc/messages, libp2p/protocols/connectivity/autonat/client, libp2p/protocols/connectivity/autonat/service, - libp2p/protocols/rendezvous, libp2p/builders, libp2p/transports/transport, libp2p/transports/tcptransport, @@ -39,6 +38,7 @@ import ../waku_filter_v2/client as filter_client, ../waku_filter_v2/subscriptions as filter_subscriptions, ../waku_metadata, + ../waku_rendezvous/protocol, ../waku_lightpush/client as lightpush_client, ../waku_lightpush/common, ../waku_lightpush/protocol, @@ -110,7 +110,7 @@ type enr*: enr.Record libp2pPing*: Ping rng*: ref rand.HmacDrbgContext - rendezvous*: RendezVous + wakuRendezvous*: WakuRendezVous announcedAddresses*: seq[MultiAddress] started*: bool # Indicates that node has started listening topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent] @@ -1214,23 +1214,24 @@ proc startKeepalive*(node: WakuNode, keepalive = 2.minutes) = asyncSpawn node.keepaliveLoop(keepalive) -proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} = +proc mountRendezvous*(node: WakuNode, enabled: bool) {.async: (raises: []).} = info "mounting rendezvous discovery protocol" try: - node.rendezvous = RendezVous.new(node.switch) + node.wakuRendezvous = + WakuRendezVous.new(node.switch, node.peerManager, node.enr, enabled) except Exception as e: error "failed to create rendezvous", error = getCurrentExceptionMsg() return if node.started: try: - await node.rendezvous.start() + await node.wakuRendezvous.start() except CatchableError: error "failed to start rendezvous", error = getCurrentExceptionMsg() try: - node.switch.mount(node.rendezvous) + node.switch.mount(node.wakuRendezvous) except LPError: error "failed to mount rendezvous", error = getCurrentExceptionMsg() @@ -1304,6 +1305,12 @@ proc start*(node: WakuNode) {.async.} = if not node.wakuStoreResume.isNil(): await node.wakuStoreResume.start() + if not node.wakuRendezvous.isNil(): + try: + await node.wakuRendezvous.start() + except CatchableError: + error "failed to start rendezvous", error = getCurrentExceptionMsg() + ## The switch uses this mapper to update peer info addrs ## with announced addrs after start let addressMapper = proc( @@ -1346,6 +1353,9 @@ proc stop*(node: WakuNode) {.async.} = if not node.wakuPeerExchange.isNil() and not node.wakuPeerExchange.pxLoopHandle.isNil(): await node.wakuPeerExchange.pxLoopHandle.cancelAndWait() + if not node.wakuRendezvous.isNil(): + await node.wakuRendezvous.stop() + node.started = false proc isReady*(node: WakuNode): Future[bool] {.async: (raises: [Exception]).} = diff --git a/waku/waku_rendezvous.nim b/waku/waku_rendezvous.nim new file mode 100644 index 0000000000..b07f1f7274 --- /dev/null +++ b/waku/waku_rendezvous.nim @@ -0,0 +1,3 @@ +import ./waku_rendezvous/protocol + +export protocol diff --git a/waku/waku_rendezvous/common.nim b/waku/waku_rendezvous/common.nim new file mode 100644 index 0000000000..2488025bda --- /dev/null +++ b/waku/waku_rendezvous/common.nim @@ -0,0 +1,28 @@ +{.push raises: [].} + +import chronos + +import ../waku_enr/capabilities + +const DiscoverLimit* = 1000 +const DefaultRegistrationInterval* = 2.hours + +proc computeNamespace*(clusterId: uint16, shard: uint16): string = + var namespace = "rs/" + + namespace &= $clusterId + namespace &= '/' + namespace &= $shard + + return namespace + +proc computeNamespace*(clusterId: uint16, shard: uint16, cap: Capabilities): string = + var namespace = "rs/" + + namespace &= $clusterId + namespace &= '/' + namespace &= $shard + namespace &= '/' + namespace &= $cap + + return namespace diff --git a/waku/waku_rendezvous/protocol.nim b/waku/waku_rendezvous/protocol.nim new file mode 100644 index 0000000000..f26746d4f9 --- /dev/null +++ b/waku/waku_rendezvous/protocol.nim @@ -0,0 +1,226 @@ +{.push raises: [].} + +import + std/[sugar, options], + results, + chronos, + chronicles, + metrics, + libp2p/protocols/rendezvous, + libp2p/switch, + libp2p/utility + +import + ../node/peer_manager, + ../common/enr, + ../waku_enr/capabilities, + ../waku_enr/sharding, + ../waku_core/peers as peers, + ./common + +logScope: + topics = "waku rendez vous" + +declarePublicCounter peerFound, "number of peers found via rendezvous" + +type WakuRendezVous* = ref object of RendezVous + peerManager: PeerManager + relayShard: RelayShards + capabilities: seq[Capabilities] + + periodicRegistrationFut: Future[void] + + clientOnly: bool + +proc advertise( + self: WakuRendezVous, namespace: string, ttl: Duration = MinimumDuration +): Future[Result[void, string]] {.async.} = + ## Register with all rendez vous peers under a namespace + + let catchable = catch: + await procCall RendezVous(self).advertise(namespace, ttl) + + if catchable.isErr(): + return err(catchable.error.msg) + + return ok() + +proc request( + self: WakuRendezVous, namespace: string, count: int = DiscoverLimit +): Future[Result[seq[PeerRecord], string]] {.async.} = + ## Request all records from all rendez vous peers with matching a namespace + + let catchable = catch: + await RendezVous(self).request(namespace, count) + + if catchable.isErr(): + return err(catchable.error.msg) + + return ok(catchable.get()) + +proc requestLocally(self: WakuRendezVous, namespace: string): seq[PeerRecord] = + RendezVous(self).requestLocally(namespace) + +proc unsubscribeLocally(self: WakuRendezVous, namespace: string) = + RendezVous(self).unsubscribeLocally(namespace) + +proc unsubscribe( + self: WakuRendezVous, namespace: string +): Future[Result[void, string]] {.async.} = + ## Unsubscribe from all rendez vous peers including locally + + let catchable = catch: + await RendezVous(self).unsubscribe(namespace) + + if catchable.isErr(): + return err(catchable.error.msg) + + return ok() + +proc getRelayShards(enr: enr.Record): Option[RelayShards] = + let typedRecord = enr.toTyped().valueOr: + return none(RelayShards) + + return typedRecord.relaySharding() + +proc new*( + T: type WakuRendezVous, + switch: Switch, + peerManager: PeerManager, + enr: Record, + enabled: bool, +): T = + let relayshard = getRelayShards(enr).valueOr: + warn "Using default cluster id 0" + RelayShards(clusterID: 0, shardIds: @[]) + + let capabilities = enr.getCapabilities() + + let wrv = WakuRendezVous( + peerManager: peerManager, + relayshard: relayshard, + capabilities: capabilities, + clientOnly: clientOnly, + ) + + RendezVous(wrv).setup(switch) + + debug "waku rendezvous initialized", + cluster = relayshard.clusterId, + shards = relayshard.shardIds, + capabilities = capabilities + + return wrv + +proc advertisementNamespaces(self: WakuRendezVous): seq[string] = + let namespaces = collect(newSeq): + for shard in self.relayShard.shardIds: + for cap in self.capabilities: + computeNamespace(self.relayShard.clusterId, shard, cap) + + return namespaces + +proc requestNamespaces(self: WakuRendezVous): seq[string] = + let namespaces = collect(newSeq): + for shard in self.relayShard.shardIds: + for cap in Capabilities: + computeNamespace(self.relayShard.clusterId, shard, cap) + + return namespaces + +proc shardOnlyNamespaces(self: WakuRendezVous): seq[string] = + let namespaces = collect(newSeq): + for shard in self.relayShard.shardIds: + computeNamespace(self.relayShard.clusterId, shard) + + return namespaces + +proc advertiseAll*(self: WakuRendezVous) {.async.} = + let namespaces = self.shardOnlyNamespaces() + + let futs = collect(newSeq): + for namespace in namespaces: + self.advertise(namespace) + + let handles = await allFinished(futs) + + for fut in handles: + let res = fut.read + + if res.isErr(): + warn "failed to advertise", error = res.error + + debug "waku rendezvous advertisements finished", adverCount = handles.len + +proc requestAll*(self: WakuRendezVous) {.async.} = + let namespaces = self.shardOnlyNamespaces() + + let futs = collect(newSeq): + for namespace in namespaces: + self.request(namespace) + + let handles = await allFinished(futs) + + for fut in handles: + let res = fut.read + + if res.isErr(): + warn "failed to request", error = res.error + else: + for peer in res.get(): + peerFound.inc() + self.peerManager.addPeer(peer) + + debug "waku rendezvous requests finished", requestCount = handles.len + +proc unsubcribeAll*(self: WakuRendezVous) {.async.} = + let namespaces = self.shardOnlyNamespaces() + + let futs = collect(newSeq): + for namespace in namespaces: + self.unsubscribe(namespace) + + let handles = await allFinished(futs) + + for fut in handles: + let res = fut.read + + if res.isErr(): + warn "failed to unsubcribe", error = res.error + + debug "waku rendezvous unsubscriptions finished", unsubCount = handles.len + + return + +proc periodicRegistration(self: WakuRendezVous) {.async.} = + debug "waku rendezvous periodic registration started", + interval = DefaultRegistrationInterval + + # infinite loop + while true: + # default ttl of registration is the same as default interval + await self.advertiseAll() + + await sleepAsync(DefaultRegistrationInterval) + +proc start*(self: WakuRendezVous) {.async.} = + await self.requestAll() + + if not self.enabled: + return + + debug "starting waku rendezvous discovery" + + # start registering forever + self.periodicRegistrationFut = self.periodicRegistration() + +proc stopWait*(self: WakuRendezVous) {.async.} = + if not self.enabled: + return + + debug "stopping waku rendezvous discovery" + + await self.unsubcribeAll() + + if not self.periodicRegistrationFut.isNil(): + await self.periodicRegistrationFut.cancelAndWait() From aed39247eda5cddd8a36f2267889389e1328148a Mon Sep 17 00:00:00 2001 From: SionoiS Date: Tue, 13 Aug 2024 07:17:26 -0400 Subject: [PATCH 02/17] changed interval & fixes --- waku/waku_rendezvous/common.nim | 2 +- waku/waku_rendezvous/protocol.nim | 8 +++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/waku/waku_rendezvous/common.nim b/waku/waku_rendezvous/common.nim index 2488025bda..5ce8b2a7f2 100644 --- a/waku/waku_rendezvous/common.nim +++ b/waku/waku_rendezvous/common.nim @@ -5,7 +5,7 @@ import chronos import ../waku_enr/capabilities const DiscoverLimit* = 1000 -const DefaultRegistrationInterval* = 2.hours +const DefaultRegistrationInterval* = 1.minutes proc computeNamespace*(clusterId: uint16, shard: uint16): string = var namespace = "rs/" diff --git a/waku/waku_rendezvous/protocol.nim b/waku/waku_rendezvous/protocol.nim index f26746d4f9..0d6290a73a 100644 --- a/waku/waku_rendezvous/protocol.nim +++ b/waku/waku_rendezvous/protocol.nim @@ -30,7 +30,7 @@ type WakuRendezVous* = ref object of RendezVous periodicRegistrationFut: Future[void] - clientOnly: bool + enabled: bool proc advertise( self: WakuRendezVous, namespace: string, ttl: Duration = MinimumDuration @@ -100,7 +100,7 @@ proc new*( peerManager: peerManager, relayshard: relayshard, capabilities: capabilities, - clientOnly: clientOnly, + enabled: enabled, ) RendezVous(wrv).setup(switch) @@ -140,7 +140,7 @@ proc advertiseAll*(self: WakuRendezVous) {.async.} = let futs = collect(newSeq): for namespace in namespaces: - self.advertise(namespace) + self.advertise(namespace, 1.minutes) let handles = await allFinished(futs) @@ -220,7 +220,5 @@ proc stopWait*(self: WakuRendezVous) {.async.} = debug "stopping waku rendezvous discovery" - await self.unsubcribeAll() - if not self.periodicRegistrationFut.isNil(): await self.periodicRegistrationFut.cancelAndWait() From f6a82db05826ab49809e1283879c13999d401ac8 Mon Sep 17 00:00:00 2001 From: SionoiS Date: Thu, 29 Aug 2024 11:38:07 -0400 Subject: [PATCH 03/17] add specific ttl and peer selection --- vendor/negentropy | 1 + waku/factory/node_factory.nim | 14 +-- waku/node/waku_node.nim | 8 +- waku/waku_rendezvous/common.nim | 3 +- waku/waku_rendezvous/protocol.nim | 144 +++++++++--------------------- 5 files changed, 59 insertions(+), 111 deletions(-) create mode 160000 vendor/negentropy diff --git a/vendor/negentropy b/vendor/negentropy new file mode 160000 index 0000000000..f152076994 --- /dev/null +++ b/vendor/negentropy @@ -0,0 +1 @@ +Subproject commit f15207699493a129ac94448f930aa3d3fa748886 diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index ce2cd4eff3..84025fd7f3 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -207,6 +207,14 @@ proc setupProtocols( protectedShard = shardKey.shard, publicKey = shardKey.key node.wakuRelay.addSignedShardsValidator(subscribedProtectedShards, conf.clusterId) + # Only relay nodes can be rendezvous points. + if conf.rendezvous: + try: + await mountRendezvous(node) + except CatchableError: + return + err("failed to mount waku rendezvous protocol: " & getCurrentExceptionMsg()) + # Keepalive mounted on all nodes try: await mountLibp2pPing(node) @@ -365,12 +373,6 @@ proc setupProtocols( return err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error) - # Enable Rendezvous Discovery protocol - try: - await mountRendezvous(node, conf.rendezvous) - except CatchableError: - return err("failed to mount waku rendezvous protocol: " & getCurrentExceptionMsg()) - return ok() ## Start node diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 2ae9a3cace..4d985f3db6 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -1214,12 +1214,11 @@ proc startKeepalive*(node: WakuNode, keepalive = 2.minutes) = asyncSpawn node.keepaliveLoop(keepalive) -proc mountRendezvous*(node: WakuNode, enabled: bool) {.async: (raises: []).} = +proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} = info "mounting rendezvous discovery protocol" try: - node.wakuRendezvous = - WakuRendezVous.new(node.switch, node.peerManager, node.enr, enabled) + node.wakuRendezvous = WakuRendezVous.new(node.switch, node.peerManager, node.enr) except Exception as e: error "failed to create rendezvous", error = getCurrentExceptionMsg() return @@ -1235,6 +1234,9 @@ proc mountRendezvous*(node: WakuNode, enabled: bool) {.async: (raises: []).} = except LPError: error "failed to mount rendezvous", error = getCurrentExceptionMsg() + # Always start discovering peers at startup + await node.wakuRendezvous.initialRequestAll() + proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool = let inputStr = $inputMultiAdd if inputStr.contains("0.0.0.0/tcp/0") or inputStr.contains("127.0.0.1/tcp/0"): diff --git a/waku/waku_rendezvous/common.nim b/waku/waku_rendezvous/common.nim index 5ce8b2a7f2..06e1e494c7 100644 --- a/waku/waku_rendezvous/common.nim +++ b/waku/waku_rendezvous/common.nim @@ -5,7 +5,8 @@ import chronos import ../waku_enr/capabilities const DiscoverLimit* = 1000 -const DefaultRegistrationInterval* = 1.minutes +const DefaultRegistrationTTL* = 60.seconds +const DefaultRegistrationInterval* = 10.seconds proc computeNamespace*(clusterId: uint16, shard: uint16): string = var namespace = "rs/" diff --git a/waku/waku_rendezvous/protocol.nim b/waku/waku_rendezvous/protocol.nim index 0d6290a73a..eaf921ae90 100644 --- a/waku/waku_rendezvous/protocol.nim +++ b/waku/waku_rendezvous/protocol.nim @@ -15,13 +15,14 @@ import ../common/enr, ../waku_enr/capabilities, ../waku_enr/sharding, - ../waku_core/peers as peers, + ../waku_core/peers, + ../waku_core/topics, ./common logScope: topics = "waku rendez vous" -declarePublicCounter peerFound, "number of peers found via rendezvous" +declarePublicCounter peerFoundTotal, "total number of peers found via rendezvous" type WakuRendezVous* = ref object of RendezVous peerManager: PeerManager @@ -30,53 +31,38 @@ type WakuRendezVous* = ref object of RendezVous periodicRegistrationFut: Future[void] - enabled: bool - -proc advertise( - self: WakuRendezVous, namespace: string, ttl: Duration = MinimumDuration +proc batchAdvertise*( + self: WakuRendezVous, + namespace: string, + ttl: Duration = MinimumDuration, + peers: seq[PeerId], ): Future[Result[void, string]] {.async.} = ## Register with all rendez vous peers under a namespace let catchable = catch: - await procCall RendezVous(self).advertise(namespace, ttl) + await procCall RendezVous(self).batchAdvertise(namespace, ttl, peers) if catchable.isErr(): return err(catchable.error.msg) return ok() -proc request( - self: WakuRendezVous, namespace: string, count: int = DiscoverLimit +proc batchRequest*( + self: WakuRendezVous, + namespace: string, + count: int = DiscoverLimit, + peers: seq[PeerId], ): Future[Result[seq[PeerRecord], string]] {.async.} = ## Request all records from all rendez vous peers with matching a namespace let catchable = catch: - await RendezVous(self).request(namespace, count) + await RendezVous(self).batchRequest(namespace, count, peers) if catchable.isErr(): return err(catchable.error.msg) return ok(catchable.get()) -proc requestLocally(self: WakuRendezVous, namespace: string): seq[PeerRecord] = - RendezVous(self).requestLocally(namespace) - -proc unsubscribeLocally(self: WakuRendezVous, namespace: string) = - RendezVous(self).unsubscribeLocally(namespace) - -proc unsubscribe( - self: WakuRendezVous, namespace: string -): Future[Result[void, string]] {.async.} = - ## Unsubscribe from all rendez vous peers including locally - - let catchable = catch: - await RendezVous(self).unsubscribe(namespace) - - if catchable.isErr(): - return err(catchable.error.msg) - - return ok() - proc getRelayShards(enr: enr.Record): Option[RelayShards] = let typedRecord = enr.toTyped().valueOr: return none(RelayShards) @@ -84,11 +70,7 @@ proc getRelayShards(enr: enr.Record): Option[RelayShards] = return typedRecord.relaySharding() proc new*( - T: type WakuRendezVous, - switch: Switch, - peerManager: PeerManager, - enr: Record, - enabled: bool, + T: type WakuRendezVous, switch: Switch, peerManager: PeerManager, enr: Record ): T = let relayshard = getRelayShards(enr).valueOr: warn "Using default cluster id 0" @@ -97,10 +79,7 @@ proc new*( let capabilities = enr.getCapabilities() let wrv = WakuRendezVous( - peerManager: peerManager, - relayshard: relayshard, - capabilities: capabilities, - enabled: enabled, + peerManager: peerManager, relayshard: relayshard, capabilities: capabilities ) RendezVous(wrv).setup(switch) @@ -112,35 +91,19 @@ proc new*( return wrv -proc advertisementNamespaces(self: WakuRendezVous): seq[string] = - let namespaces = collect(newSeq): - for shard in self.relayShard.shardIds: - for cap in self.capabilities: - computeNamespace(self.relayShard.clusterId, shard, cap) - - return namespaces - -proc requestNamespaces(self: WakuRendezVous): seq[string] = - let namespaces = collect(newSeq): - for shard in self.relayShard.shardIds: - for cap in Capabilities: - computeNamespace(self.relayShard.clusterId, shard, cap) - - return namespaces +proc advertiseAll(self: WakuRendezVous) {.async.} = + let pubsubTopics = self.relayShard.topics() -proc shardOnlyNamespaces(self: WakuRendezVous): seq[string] = - let namespaces = collect(newSeq): - for shard in self.relayShard.shardIds: - computeNamespace(self.relayShard.clusterId, shard) - - return namespaces + let futs = collect(newSeq): + for pubsubTopic in pubsubTopics: + let namespace = computeNamespace(pubsubTopic.clusterId, pubsubTopic.shardId) -proc advertiseAll*(self: WakuRendezVous) {.async.} = - let namespaces = self.shardOnlyNamespaces() + # Get a random RDV peer for that shard + let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr: + continue - let futs = collect(newSeq): - for namespace in namespaces: - self.advertise(namespace, 1.minutes) + # Advertise yourself on that peer + self.batchAdvertise(namespace, DefaultRegistrationTTL, @[rpi.peerId]) let handles = await allFinished(futs) @@ -148,16 +111,23 @@ proc advertiseAll*(self: WakuRendezVous) {.async.} = let res = fut.read if res.isErr(): - warn "failed to advertise", error = res.error + warn "rendezvous advertise failed", error = res.error debug "waku rendezvous advertisements finished", adverCount = handles.len -proc requestAll*(self: WakuRendezVous) {.async.} = - let namespaces = self.shardOnlyNamespaces() +proc initialRequestAll*(self: WakuRendezVous) {.async.} = + let pubsubTopics = self.relayShard.topics() let futs = collect(newSeq): - for namespace in namespaces: - self.request(namespace) + for pubsubTopic in pubsubTopics: + let namespace = computeNamespace(pubsubTopic.clusterId, pubsubTopic.shardId) + + # Get a random RDV peer for that shard + let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr: + continue + + # Ask for 12 peer records for that shard + self.batchRequest(namespace, 12, @[rpi.peerId]) let handles = await allFinished(futs) @@ -165,59 +135,31 @@ proc requestAll*(self: WakuRendezVous) {.async.} = let res = fut.read if res.isErr(): - warn "failed to request", error = res.error + warn "rendezvous request failed", error = res.error else: for peer in res.get(): - peerFound.inc() + peerFoundTotal.inc() self.peerManager.addPeer(peer) debug "waku rendezvous requests finished", requestCount = handles.len -proc unsubcribeAll*(self: WakuRendezVous) {.async.} = - let namespaces = self.shardOnlyNamespaces() - - let futs = collect(newSeq): - for namespace in namespaces: - self.unsubscribe(namespace) - - let handles = await allFinished(futs) - - for fut in handles: - let res = fut.read - - if res.isErr(): - warn "failed to unsubcribe", error = res.error - - debug "waku rendezvous unsubscriptions finished", unsubCount = handles.len - - return - proc periodicRegistration(self: WakuRendezVous) {.async.} = debug "waku rendezvous periodic registration started", interval = DefaultRegistrationInterval # infinite loop while true: - # default ttl of registration is the same as default interval - await self.advertiseAll() - await sleepAsync(DefaultRegistrationInterval) -proc start*(self: WakuRendezVous) {.async.} = - await self.requestAll() - - if not self.enabled: - return + await self.advertiseAll() +proc start*(self: WakuRendezVous) {.async.} = debug "starting waku rendezvous discovery" # start registering forever self.periodicRegistrationFut = self.periodicRegistration() proc stopWait*(self: WakuRendezVous) {.async.} = - if not self.enabled: - return - debug "stopping waku rendezvous discovery" if not self.periodicRegistrationFut.isNil(): From e181df939668e065860aa7ce13ceef7c5337a09d Mon Sep 17 00:00:00 2001 From: SionoiS Date: Wed, 25 Sep 2024 09:27:13 -0400 Subject: [PATCH 04/17] remove vendor --- vendor/negentropy | 1 - 1 file changed, 1 deletion(-) delete mode 160000 vendor/negentropy diff --git a/vendor/negentropy b/vendor/negentropy deleted file mode 160000 index f152076994..0000000000 --- a/vendor/negentropy +++ /dev/null @@ -1 +0,0 @@ -Subproject commit f15207699493a129ac94448f930aa3d3fa748886 From 1470d1c77c5a28d8c8e6d022150fde16a519e39b Mon Sep 17 00:00:00 2001 From: SionoiS Date: Wed, 25 Sep 2024 09:29:37 -0400 Subject: [PATCH 05/17] named procs --- waku/waku_rendezvous/protocol.nim | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/waku/waku_rendezvous/protocol.nim b/waku/waku_rendezvous/protocol.nim index eaf921ae90..163ceb1f65 100644 --- a/waku/waku_rendezvous/protocol.nim +++ b/waku/waku_rendezvous/protocol.nim @@ -40,7 +40,7 @@ proc batchAdvertise*( ## Register with all rendez vous peers under a namespace let catchable = catch: - await procCall RendezVous(self).batchAdvertise(namespace, ttl, peers) + await procCall RendezVous(self).advertise(namespace, ttl, peers) if catchable.isErr(): return err(catchable.error.msg) @@ -56,7 +56,7 @@ proc batchRequest*( ## Request all records from all rendez vous peers with matching a namespace let catchable = catch: - await RendezVous(self).batchRequest(namespace, count, peers) + await RendezVous(self).request(namespace, count, peers) if catchable.isErr(): return err(catchable.error.msg) @@ -103,7 +103,7 @@ proc advertiseAll(self: WakuRendezVous) {.async.} = continue # Advertise yourself on that peer - self.batchAdvertise(namespace, DefaultRegistrationTTL, @[rpi.peerId]) + self.advertise(namespace, DefaultRegistrationTTL, @[rpi.peerId]) let handles = await allFinished(futs) From 601428383d98859043768d6d501ce9dafd604469 Mon Sep 17 00:00:00 2001 From: SionoiS Date: Fri, 29 Nov 2024 11:14:19 -0500 Subject: [PATCH 06/17] error handling & clean-up --- waku/factory/waku.nim | 2 +- waku/node/waku_node.nim | 18 ++--- waku/waku_rendezvous/protocol.nim | 119 +++++++++++++++++------------- 3 files changed, 74 insertions(+), 65 deletions(-) diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 48d75cfd15..e9fbb48955 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -377,7 +377,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: ## libp2p DiscoveryManager waku[].discoveryMngr = DiscoveryManager() waku[].discoveryMngr.add( - RendezVousInterface.new(rdv = waku[].node.rendezvous, tta = 1.minutes) + RendezVousInterface.new(rdv = waku[].node.wakuRendezvous, tta = 1.minutes) ) if not isNil(waku[].node.wakuRelay): for topic in waku[].node.wakuRelay.getSubscribedTopics(): diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 4d985f3db6..fe279fad11 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -1217,26 +1217,20 @@ proc startKeepalive*(node: WakuNode, keepalive = 2.minutes) = proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} = info "mounting rendezvous discovery protocol" - try: - node.wakuRendezvous = WakuRendezVous.new(node.switch, node.peerManager, node.enr) - except Exception as e: - error "failed to create rendezvous", error = getCurrentExceptionMsg() - return + node.wakuRendezvous = WakuRendezVous.new(node.switch, node.peerManager, node.enr) + + # Always start discovering peers at startup + (await node.wakuRendezvous.initialRequestAll()).isOkOr: + error "rendezvous failed initial requests", error = error if node.started: - try: - await node.wakuRendezvous.start() - except CatchableError: - error "failed to start rendezvous", error = getCurrentExceptionMsg() + await node.wakuRendezvous.start() try: node.switch.mount(node.wakuRendezvous) except LPError: error "failed to mount rendezvous", error = getCurrentExceptionMsg() - # Always start discovering peers at startup - await node.wakuRendezvous.initialRequestAll() - proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool = let inputStr = $inputMultiAdd if inputStr.contains("0.0.0.0/tcp/0") or inputStr.contains("127.0.0.1/tcp/0"): diff --git a/waku/waku_rendezvous/protocol.nim b/waku/waku_rendezvous/protocol.nim index 163ceb1f65..1e7bba8a68 100644 --- a/waku/waku_rendezvous/protocol.nim +++ b/waku/waku_rendezvous/protocol.nim @@ -36,8 +36,8 @@ proc batchAdvertise*( namespace: string, ttl: Duration = MinimumDuration, peers: seq[PeerId], -): Future[Result[void, string]] {.async.} = - ## Register with all rendez vous peers under a namespace +): Future[Result[void, string]] {.async: (raises: []).} = + ## Register with all rendezvous peers under a namespace let catchable = catch: await procCall RendezVous(self).advertise(namespace, ttl, peers) @@ -52,8 +52,8 @@ proc batchRequest*( namespace: string, count: int = DiscoverLimit, peers: seq[PeerId], -): Future[Result[seq[PeerRecord], string]] {.async.} = - ## Request all records from all rendez vous peers with matching a namespace +): Future[Result[seq[PeerRecord], string]] {.async: (raises: []).} = + ## Request all records from all rendezvous peers matching a namespace let catchable = catch: await RendezVous(self).request(namespace, count, peers) @@ -63,35 +63,9 @@ proc batchRequest*( return ok(catchable.get()) -proc getRelayShards(enr: enr.Record): Option[RelayShards] = - let typedRecord = enr.toTyped().valueOr: - return none(RelayShards) - - return typedRecord.relaySharding() - -proc new*( - T: type WakuRendezVous, switch: Switch, peerManager: PeerManager, enr: Record -): T = - let relayshard = getRelayShards(enr).valueOr: - warn "Using default cluster id 0" - RelayShards(clusterID: 0, shardIds: @[]) - - let capabilities = enr.getCapabilities() - - let wrv = WakuRendezVous( - peerManager: peerManager, relayshard: relayshard, capabilities: capabilities - ) - - RendezVous(wrv).setup(switch) - - debug "waku rendezvous initialized", - cluster = relayshard.clusterId, - shards = relayshard.shardIds, - capabilities = capabilities - - return wrv - -proc advertiseAll(self: WakuRendezVous) {.async.} = +proc advertiseAll( + self: WakuRendezVous +): Future[Result[void, string]] {.async: (raises: []).} = let pubsubTopics = self.relayShard.topics() let futs = collect(newSeq): @@ -105,17 +79,23 @@ proc advertiseAll(self: WakuRendezVous) {.async.} = # Advertise yourself on that peer self.advertise(namespace, DefaultRegistrationTTL, @[rpi.peerId]) - let handles = await allFinished(futs) + let catchable = catch: + await allFinished(futs) + + if catchable.isErr(): + return err(catchable.error.msg) - for fut in handles: - let res = fut.read + for fut in catchable.get(): + if fut.failed(): + warn "rendezvous advertisement failed", error = fut.error.msg - if res.isErr(): - warn "rendezvous advertise failed", error = res.error + debug "waku rendezvous advertisements finished" - debug "waku rendezvous advertisements finished", adverCount = handles.len + return ok() -proc initialRequestAll*(self: WakuRendezVous) {.async.} = +proc initialRequestAll*( + self: WakuRendezVous +): Future[Result[void, string]] {.async: (raises: []).} = let pubsubTopics = self.relayShard.topics() let futs = collect(newSeq): @@ -127,21 +107,27 @@ proc initialRequestAll*(self: WakuRendezVous) {.async.} = continue # Ask for 12 peer records for that shard - self.batchRequest(namespace, 12, @[rpi.peerId]) + self.request(namespace, 12, @[rpi.peerId]) - let handles = await allFinished(futs) + let catchable = catch: + await allFinished(futs) - for fut in handles: - let res = fut.read + if catchable.isErr(): + return err(catchable.error.msg) - if res.isErr(): - warn "rendezvous request failed", error = res.error - else: - for peer in res.get(): + for fut in catchable.get(): + if fut.failed(): + warn "rendezvous request failed", error = fut.error.msg + elif fut.finished(): + let peers = fut.value() + + for peer in peers: peerFoundTotal.inc() self.peerManager.addPeer(peer) - debug "waku rendezvous requests finished", requestCount = handles.len + debug "waku rendezvous requests finished" + + return ok() proc periodicRegistration(self: WakuRendezVous) {.async.} = debug "waku rendezvous periodic registration started", @@ -151,15 +137,44 @@ proc periodicRegistration(self: WakuRendezVous) {.async.} = while true: await sleepAsync(DefaultRegistrationInterval) - await self.advertiseAll() + (await self.advertiseAll()).isOkOr: + debug "waku rendezvous advertisements failed", error = error + +proc getRelayShards(enr: enr.Record): Option[RelayShards] = + let typedRecord = enr.toTyped().valueOr: + return none(RelayShards) + + return typedRecord.relaySharding() + +proc new*( + T: type WakuRendezVous, switch: Switch, peerManager: PeerManager, enr: Record +): T {.raises: [].} = + let relayshard = getRelayShards(enr).valueOr: + warn "Using default cluster id 0" + RelayShards(clusterID: 0, shardIds: @[]) + + let capabilities = enr.getCapabilities() + + let wrv = WakuRendezVous( + peerManager: peerManager, relayshard: relayshard, capabilities: capabilities + ) + + RendezVous(wrv).setup(switch) + + debug "waku rendezvous initialized", + cluster = relayshard.clusterId, + shards = relayshard.shardIds, + capabilities = capabilities + + return wrv -proc start*(self: WakuRendezVous) {.async.} = +proc start*(self: WakuRendezVous) {.async: (raises: []).} = debug "starting waku rendezvous discovery" # start registering forever self.periodicRegistrationFut = self.periodicRegistration() -proc stopWait*(self: WakuRendezVous) {.async.} = +proc stopWait*(self: WakuRendezVous) {.async: (raises: []).} = debug "stopping waku rendezvous discovery" if not self.periodicRegistrationFut.isNil(): From 316088eec2d59f123374fe81af674983dfe66d7f Mon Sep 17 00:00:00 2001 From: SionoiS Date: Fri, 29 Nov 2024 11:20:27 -0500 Subject: [PATCH 07/17] remove useless try/catch --- waku/node/waku_node.nim | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index fe279fad11..00e3934082 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -1302,10 +1302,7 @@ proc start*(node: WakuNode) {.async.} = await node.wakuStoreResume.start() if not node.wakuRendezvous.isNil(): - try: - await node.wakuRendezvous.start() - except CatchableError: - error "failed to start rendezvous", error = getCurrentExceptionMsg() + await node.wakuRendezvous.start() ## The switch uses this mapper to update peer info addrs ## with announced addrs after start From c6e341f6dbfb1204793b7161bb41a68b8bf1f04e Mon Sep 17 00:00:00 2001 From: SionoiS Date: Fri, 29 Nov 2024 11:23:37 -0500 Subject: [PATCH 08/17] removing another try/catch --- waku/factory/node_factory.nim | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 84025fd7f3..fe4da7c9d0 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -207,13 +207,9 @@ proc setupProtocols( protectedShard = shardKey.shard, publicKey = shardKey.key node.wakuRelay.addSignedShardsValidator(subscribedProtectedShards, conf.clusterId) - # Only relay nodes can be rendezvous points. + # Only relay nodes should be rendezvous points. if conf.rendezvous: - try: - await mountRendezvous(node) - except CatchableError: - return - err("failed to mount waku rendezvous protocol: " & getCurrentExceptionMsg()) + await mountRendezvous(node) # Keepalive mounted on all nodes try: From 3dbd22f391602eb2c61e605441d2e995cb11a329 Mon Sep 17 00:00:00 2001 From: SionoiS Date: Fri, 29 Nov 2024 11:32:12 -0500 Subject: [PATCH 09/17] fix test --- tests/factory/test_node_factory.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/factory/test_node_factory.nim b/tests/factory/test_node_factory.nim index bc3dc0f80d..c575c2b81a 100644 --- a/tests/factory/test_node_factory.nim +++ b/tests/factory/test_node_factory.nim @@ -17,7 +17,7 @@ suite "Node Factory": node.wakuStore.isNil() node.wakuFilter.isNil() not node.wakuStoreClient.isNil() - not node.rendezvous.isNil() + not node.wakuRendezvous.isNil() test "Set up a node with Store enabled": var conf = defaultTestWakuNodeConf() From ad9dc419674ab0da2851cc147523c7c552a8d035 Mon Sep 17 00:00:00 2001 From: SionoiS Date: Fri, 29 Nov 2024 15:47:33 -0500 Subject: [PATCH 10/17] rendezvous default true --- waku/factory/external_config.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index 07f574b9a3..0c6b5b551c 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -650,7 +650,7 @@ with the drawback of consuming some more bandwitdh.""", ## Rendez vous rendezvous* {. desc: "Enable waku rendezvous discovery server", - defaultValue: false, + defaultValue: true, name: "rendezvous" .}: bool From b679a548a634a6e314ef399f54802495c192ce58 Mon Sep 17 00:00:00 2001 From: SionoiS Date: Tue, 3 Dec 2024 11:37:00 -0500 Subject: [PATCH 11/17] test & fixes --- tests/test_waku_rendezvous.nim | 88 +++++++++++++++---------------- waku/node/waku_node.nim | 6 ++- waku/waku_rendezvous/common.nim | 1 + waku/waku_rendezvous/protocol.nim | 43 +++++++++------ 4 files changed, 77 insertions(+), 61 deletions(-) diff --git a/tests/test_waku_rendezvous.nim b/tests/test_waku_rendezvous.nim index c005a8f341..438c4aedc9 100644 --- a/tests/test_waku_rendezvous.nim +++ b/tests/test_waku_rendezvous.nim @@ -1,51 +1,51 @@ {.used.} -import chronos, testutils/unittests, libp2p/builders, libp2p/protocols/rendezvous +import chronos, testutils/unittests, libp2p/builders -import waku/node/waku_switch, ./testlib/common, ./testlib/wakucore - -proc newRendezvousClientSwitch(rdv: RendezVous): Switch = - SwitchBuilder - .new() - .withRng(rng()) - .withAddresses(@[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]) - .withTcpTransport() - .withMplex() - .withNoise() - .withRendezVous(rdv) - .build() +import + waku/waku_core/peers, + waku/node/waku_node, + waku/node/peer_manager/peer_manager, + waku/waku_rendezvous/protocol, + ./testlib/[wakucore, wakunode] procSuite "Waku Rendezvous": - asyncTest "Waku Switch uses Rendezvous": - ## Setup - + asyncTest "Simple remote test": let - wakuClient = RendezVous.new() - sourceClient = RendezVous.new() - destClient = RendezVous.new() - wakuSwitch = newRendezvousClientSwitch(wakuClient) #rendezvous point - sourceSwitch = newRendezvousClientSwitch(sourceClient) #client - destSwitch = newRendezvousClientSwitch(destClient) #client - - # Setup client rendezvous - wakuClient.setup(wakuSwitch) - sourceClient.setup(sourceSwitch) - destClient.setup(destSwitch) - - await allFutures(wakuSwitch.start(), sourceSwitch.start(), destSwitch.start()) - - # Connect clients to the rendezvous point - await sourceSwitch.connect(wakuSwitch.peerInfo.peerId, wakuSwitch.peerInfo.addrs) - await destSwitch.connect(wakuSwitch.peerInfo.peerId, wakuSwitch.peerInfo.addrs) - - let res0 = await sourceClient.request("empty") - check res0.len == 0 - - # Check that source client gets peer info of dest client from rendezvous point - await sourceClient.advertise("foo") - let res1 = await destClient.request("foo") - check: - res1.len == 1 - res1[0] == sourceSwitch.peerInfo.signedPeerRecord.data + clusterId = 10.uint16 + node1 = newTestWakuNode( + generateSecp256k1Key(), + parseIpAddress("0.0.0.0"), + Port(0), + clusterId = clusterId, + ) + node2 = newTestWakuNode( + generateSecp256k1Key(), + parseIpAddress("0.0.0.0"), + Port(0), + clusterId = clusterId, + ) + + # Start nodes + await allFutures([node1.start(), node2.start()]) + + let remotePeerInfo1 = node1.switch.peerInfo.toRemotePeerInfo() + let remotePeerInfo2 = node2.switch.peerInfo.toRemotePeerInfo() + + node2.peerManager.addPeer(remotePeerInfo1) - await allFutures(wakuSwitch.stop(), sourceSwitch.stop(), destSwitch.stop()) + let namespace = "test/name/space" + + let res = await node2.wakuRendezvous.batchAdvertise( + namespace, peers = @[remotePeerInfo1.peerId] + ) + assert res.isOk(), $res.error + + let response = + await node2.wakuRendezvous.batchRequest(namespace, 1, @[remotePeerInfo1.peerId]) + assert response.isOk(), $response.error + + let records = response.get() + + check: + records[0].peerId == remotePeerInfo2.peerId diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 00e3934082..d361f03b57 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -1217,7 +1217,9 @@ proc startKeepalive*(node: WakuNode, keepalive = 2.minutes) = proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} = info "mounting rendezvous discovery protocol" - node.wakuRendezvous = WakuRendezVous.new(node.switch, node.peerManager, node.enr) + node.wakuRendezvous = WakuRendezVous.new(node.switch, node.peerManager, node.enr).valueOr: + error "rendezvous new failed", error = error + return # Always start discovering peers at startup (await node.wakuRendezvous.initialRequestAll()).isOkOr: @@ -1347,7 +1349,7 @@ proc stop*(node: WakuNode) {.async.} = await node.wakuPeerExchange.pxLoopHandle.cancelAndWait() if not node.wakuRendezvous.isNil(): - await node.wakuRendezvous.stop() + await node.wakuRendezvous.stopWait() node.started = false diff --git a/waku/waku_rendezvous/common.nim b/waku/waku_rendezvous/common.nim index 06e1e494c7..d2edfc5574 100644 --- a/waku/waku_rendezvous/common.nim +++ b/waku/waku_rendezvous/common.nim @@ -7,6 +7,7 @@ import ../waku_enr/capabilities const DiscoverLimit* = 1000 const DefaultRegistrationTTL* = 60.seconds const DefaultRegistrationInterval* = 10.seconds +const PeersRequestedCount* = 12 proc computeNamespace*(clusterId: uint16, shard: uint16): string = var namespace = "rs/" diff --git a/waku/waku_rendezvous/protocol.nim b/waku/waku_rendezvous/protocol.nim index 1e7bba8a68..cfaefdee6a 100644 --- a/waku/waku_rendezvous/protocol.nim +++ b/waku/waku_rendezvous/protocol.nim @@ -20,7 +20,7 @@ import ./common logScope: - topics = "waku rendez vous" + topics = "waku rendezvous" declarePublicCounter peerFoundTotal, "total number of peers found via rendezvous" @@ -40,7 +40,8 @@ proc batchAdvertise*( ## Register with all rendezvous peers under a namespace let catchable = catch: - await procCall RendezVous(self).advertise(namespace, ttl, peers) + #await procCall RendezVous(self).advertise(namespace, ttl, peers) + await self.advertise(namespace, ttl, peers) if catchable.isErr(): return err(catchable.error.msg) @@ -56,7 +57,8 @@ proc batchRequest*( ## Request all records from all rendezvous peers matching a namespace let catchable = catch: - await RendezVous(self).request(namespace, count, peers) + #await procCall RendezVous(self).request(namespace, count, peers) + await self.request(namespace, count, peers) if catchable.isErr(): return err(catchable.error.msg) @@ -77,6 +79,9 @@ proc advertiseAll( continue # Advertise yourself on that peer + #[ procCall RendezVous(self).advertise( + namespace, DefaultRegistrationTTL, @[rpi.peerId] + ) ]# self.advertise(namespace, DefaultRegistrationTTL, @[rpi.peerId]) let catchable = catch: @@ -106,8 +111,9 @@ proc initialRequestAll*( let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr: continue - # Ask for 12 peer records for that shard - self.request(namespace, 12, @[rpi.peerId]) + # Ask for peer records for that shard + #procCall RendezVous(self).request(namespace, PeersRequestedCount, @[rpi.peerId]) + self.request(namespace, PeersRequestedCount, @[rpi.peerId]) let catchable = catch: await allFinished(futs) @@ -148,34 +154,41 @@ proc getRelayShards(enr: enr.Record): Option[RelayShards] = proc new*( T: type WakuRendezVous, switch: Switch, peerManager: PeerManager, enr: Record -): T {.raises: [].} = +): Result[T, string] {.raises: [].} = let relayshard = getRelayShards(enr).valueOr: warn "Using default cluster id 0" RelayShards(clusterID: 0, shardIds: @[]) let capabilities = enr.getCapabilities() - let wrv = WakuRendezVous( - peerManager: peerManager, relayshard: relayshard, capabilities: capabilities - ) + let catchable = catch: + procCall RendezVous.new(switch) + + if catchable.isErr(): + return err(catchable.error.msg) + + let rv = catchable.get() - RendezVous(wrv).setup(switch) + var wrv = WakuRendezVous(rv) + wrv.peerManager = peerManager + wrv.relayshard = relayshard + wrv.capabilities = capabilities debug "waku rendezvous initialized", cluster = relayshard.clusterId, shards = relayshard.shardIds, capabilities = capabilities - return wrv + return ok(wrv) proc start*(self: WakuRendezVous) {.async: (raises: []).} = - debug "starting waku rendezvous discovery" - # start registering forever self.periodicRegistrationFut = self.periodicRegistration() -proc stopWait*(self: WakuRendezVous) {.async: (raises: []).} = - debug "stopping waku rendezvous discovery" + debug "waku rendezvous discovery started" +proc stopWait*(self: WakuRendezVous) {.async: (raises: []).} = if not self.periodicRegistrationFut.isNil(): await self.periodicRegistrationFut.cancelAndWait() + + debug "waku rendezvous discovery stopped" From 06d153f680d55b09e79b49001085007e1e97a51e Mon Sep 17 00:00:00 2001 From: SionoiS Date: Thu, 5 Dec 2024 09:03:36 -0500 Subject: [PATCH 12/17] fixes --- tests/test_waku_rendezvous.nim | 38 +++++++++---- waku/factory/node_factory.nim | 2 +- waku/node/waku_node.nim | 9 +--- waku/waku_rendezvous/common.nim | 10 +++- waku/waku_rendezvous/protocol.nim | 90 +++++++++++++++++++------------ 5 files changed, 93 insertions(+), 56 deletions(-) diff --git a/tests/test_waku_rendezvous.nim b/tests/test_waku_rendezvous.nim index 438c4aedc9..f23876326e 100644 --- a/tests/test_waku_rendezvous.nim +++ b/tests/test_waku_rendezvous.nim @@ -1,6 +1,6 @@ {.used.} -import chronos, testutils/unittests, libp2p/builders +import std/options, chronos, testutils/unittests, libp2p/builders import waku/waku_core/peers, @@ -25,27 +25,43 @@ procSuite "Waku Rendezvous": Port(0), clusterId = clusterId, ) + node3 = newTestWakuNode( + generateSecp256k1Key(), + parseIpAddress("0.0.0.0"), + Port(0), + clusterId = clusterId, + ) - # Start nodes - await allFutures([node1.start(), node2.start()]) + await allFutures( + [ + node1.mountWakuRendezvous(), + node2.mountWakuRendezvous(), + node3.mountWakuRendezvous(), + ] + ) + await allFutures([node1.start(), node2.start(), node3.start()]) - let remotePeerInfo1 = node1.switch.peerInfo.toRemotePeerInfo() - let remotePeerInfo2 = node2.switch.peerInfo.toRemotePeerInfo() + let peerInfo1 = node1.switch.peerInfo.toRemotePeerInfo() + let peerInfo2 = node2.switch.peerInfo.toRemotePeerInfo() + let peerInfo3 = node3.switch.peerInfo.toRemotePeerInfo() - node2.peerManager.addPeer(remotePeerInfo1) + node1.peerManager.addPeer(peerInfo2) + node2.peerManager.addPeer(peerInfo1) + node2.peerManager.addPeer(peerInfo3) + node3.peerManager.addPeer(peerInfo2) let namespace = "test/name/space" - let res = await node2.wakuRendezvous.batchAdvertise( - namespace, peers = @[remotePeerInfo1.peerId] + let res = await node1.wakuRendezvous.batchAdvertise( + namespace, 60.seconds, @[peerInfo2.peerId] ) assert res.isOk(), $res.error let response = - await node2.wakuRendezvous.batchRequest(namespace, 1, @[remotePeerInfo1.peerId]) + await node3.wakuRendezvous.batchRequest(namespace, 1, @[peerInfo2.peerId]) assert response.isOk(), $response.error - let records = response.get() check: - records[0].peerId == remotePeerInfo2.peerId + records.len == 1 + records[0].peerId == peerInfo1.peerId diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index fe4da7c9d0..76e57f7c43 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -209,7 +209,7 @@ proc setupProtocols( # Only relay nodes should be rendezvous points. if conf.rendezvous: - await mountRendezvous(node) + await mountWakuRendezvous(node) # Keepalive mounted on all nodes try: diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index d361f03b57..df6f1906bf 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -1214,11 +1214,11 @@ proc startKeepalive*(node: WakuNode, keepalive = 2.minutes) = asyncSpawn node.keepaliveLoop(keepalive) -proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} = +proc mountWakuRendezvous*(node: WakuNode) {.async: (raises: []).} = info "mounting rendezvous discovery protocol" node.wakuRendezvous = WakuRendezVous.new(node.switch, node.peerManager, node.enr).valueOr: - error "rendezvous new failed", error = error + error "initializing waku rendezvous failed", error = error return # Always start discovering peers at startup @@ -1228,11 +1228,6 @@ proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} = if node.started: await node.wakuRendezvous.start() - try: - node.switch.mount(node.wakuRendezvous) - except LPError: - error "failed to mount rendezvous", error = getCurrentExceptionMsg() - proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool = let inputStr = $inputMultiAdd if inputStr.contains("0.0.0.0/tcp/0") or inputStr.contains("127.0.0.1/tcp/0"): diff --git a/waku/waku_rendezvous/common.nim b/waku/waku_rendezvous/common.nim index d2edfc5574..9722347cfb 100644 --- a/waku/waku_rendezvous/common.nim +++ b/waku/waku_rendezvous/common.nim @@ -1,8 +1,8 @@ {.push raises: [].} -import chronos +import std/options, chronos -import ../waku_enr/capabilities +import ../common/enr, ../waku_enr/capabilities, ../waku_enr/sharding const DiscoverLimit* = 1000 const DefaultRegistrationTTL* = 60.seconds @@ -28,3 +28,9 @@ proc computeNamespace*(clusterId: uint16, shard: uint16, cap: Capabilities): str namespace &= $cap return namespace + +proc getRelayShards*(enr: enr.Record): Option[RelayShards] = + let typedRecord = enr.toTyped().valueOr: + return none(RelayShards) + + return typedRecord.relaySharding() diff --git a/waku/waku_rendezvous/protocol.nim b/waku/waku_rendezvous/protocol.nim index cfaefdee6a..a87adb62ab 100644 --- a/waku/waku_rendezvous/protocol.nim +++ b/waku/waku_rendezvous/protocol.nim @@ -24,8 +24,10 @@ logScope: declarePublicCounter peerFoundTotal, "total number of peers found via rendezvous" -type WakuRendezVous* = ref object of RendezVous +type WakuRendezVous* = ref object of RootObj + rendezvous: Rendezvous peerManager: PeerManager + relayShard: RelayShards capabilities: seq[Capabilities] @@ -34,17 +36,26 @@ type WakuRendezVous* = ref object of RendezVous proc batchAdvertise*( self: WakuRendezVous, namespace: string, - ttl: Duration = MinimumDuration, + ttl: Duration = DefaultRegistrationTTL, peers: seq[PeerId], ): Future[Result[void, string]] {.async: (raises: []).} = ## Register with all rendezvous peers under a namespace - let catchable = catch: - #await procCall RendezVous(self).advertise(namespace, ttl, peers) - await self.advertise(namespace, ttl, peers) + var futs = collect(newSeq): + for peerId in peers: + self.peerManager.dialPeer(peerId, RendezVousCodec) - if catchable.isErr(): - return err(catchable.error.msg) + let dialCatch = catch: + await allFinished(futs) + + if dialCatch.isErr(): + return err(dialCatch.error.msg) + + let advertCatch = catch: + await self.rendezvous.advertise(namespace, ttl, peers) + + if advertCatch.isErr(): + return err(advertCatch.error.msg) return ok() @@ -56,14 +67,23 @@ proc batchRequest*( ): Future[Result[seq[PeerRecord], string]] {.async: (raises: []).} = ## Request all records from all rendezvous peers matching a namespace - let catchable = catch: - #await procCall RendezVous(self).request(namespace, count, peers) - await self.request(namespace, count, peers) + var futs = collect(newSeq): + for peerId in peers: + self.peerManager.dialPeer(peerId, RendezVousCodec) - if catchable.isErr(): - return err(catchable.error.msg) + let dialCatch = catch: + await allFinished(futs) - return ok(catchable.get()) + if dialCatch.isErr(): + return err(dialCatch.error.msg) + + let reqCatch = catch: + await self.rendezvous.request(namespace, count, peers) + + if reqCatch.isErr(): + return err(reqCatch.error.msg) + + return ok(reqCatch.get()) proc advertiseAll( self: WakuRendezVous @@ -79,10 +99,7 @@ proc advertiseAll( continue # Advertise yourself on that peer - #[ procCall RendezVous(self).advertise( - namespace, DefaultRegistrationTTL, @[rpi.peerId] - ) ]# - self.advertise(namespace, DefaultRegistrationTTL, @[rpi.peerId]) + self.batchAdvertise(namespace, DefaultRegistrationTTL, @[rpi.peerId]) let catchable = catch: await allFinished(futs) @@ -112,8 +129,7 @@ proc initialRequestAll*( continue # Ask for peer records for that shard - #procCall RendezVous(self).request(namespace, PeersRequestedCount, @[rpi.peerId]) - self.request(namespace, PeersRequestedCount, @[rpi.peerId]) + self.batchRequest(namespace, PeersRequestedCount, @[rpi.peerId]) let catchable = catch: await allFinished(futs) @@ -125,13 +141,16 @@ proc initialRequestAll*( if fut.failed(): warn "rendezvous request failed", error = fut.error.msg elif fut.finished(): - let peers = fut.value() + let res = fut.value() - for peer in peers: + let records = res.valueOr: + return err($res.error) + + for record in records: peerFoundTotal.inc() - self.peerManager.addPeer(peer) + self.peerManager.addPeer(record) - debug "waku rendezvous requests finished" + debug "waku rendezvous initial requests finished" return ok() @@ -146,12 +165,6 @@ proc periodicRegistration(self: WakuRendezVous) {.async.} = (await self.advertiseAll()).isOkOr: debug "waku rendezvous advertisements failed", error = error -proc getRelayShards(enr: enr.Record): Option[RelayShards] = - let typedRecord = enr.toTyped().valueOr: - return none(RelayShards) - - return typedRecord.relaySharding() - proc new*( T: type WakuRendezVous, switch: Switch, peerManager: PeerManager, enr: Record ): Result[T, string] {.raises: [].} = @@ -161,15 +174,22 @@ proc new*( let capabilities = enr.getCapabilities() - let catchable = catch: - procCall RendezVous.new(switch) + let rvCatchable = catch: + RendezVous.new(switch = switch, minDuration = DefaultRegistrationTTL) - if catchable.isErr(): - return err(catchable.error.msg) + if rvCatchable.isErr(): + return err(rvCatchable.error.msg) + + let rv = rvCatchable.get() + + let mountCatchable = catch: + switch.mount(rv) - let rv = catchable.get() + if mountCatchable.isErr(): + return err(mountCatchable.error.msg) - var wrv = WakuRendezVous(rv) + var wrv = WakuRendezVous() + wrv.rendezvous = rv wrv.peerManager = peerManager wrv.relayshard = relayshard wrv.capabilities = capabilities From 6505eab9ad53335b610b463fe2a8b0734cb752ec Mon Sep 17 00:00:00 2001 From: SionoiS Date: Thu, 5 Dec 2024 09:28:58 -0500 Subject: [PATCH 13/17] remove libp2p discovery manager --- waku/factory/waku.nim | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index e9fbb48955..55f32be36b 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -374,16 +374,6 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: if not waku[].deliveryMonitor.isNil(): waku[].deliveryMonitor.startDeliveryMonitor() - ## libp2p DiscoveryManager - waku[].discoveryMngr = DiscoveryManager() - waku[].discoveryMngr.add( - RendezVousInterface.new(rdv = waku[].node.wakuRendezvous, tta = 1.minutes) - ) - if not isNil(waku[].node.wakuRelay): - for topic in waku[].node.wakuRelay.getSubscribedTopics(): - debug "advertise rendezvous namespace", topic - waku[].discoveryMngr.advertise(RdvNamespace(topic)) - return ok() # Waku shutdown From 482024bc176408969b4e733d087fea3047cbc98e Mon Sep 17 00:00:00 2001 From: SionoiS Date: Fri, 6 Dec 2024 08:09:57 -0500 Subject: [PATCH 14/17] close conn & fixes --- tests/test_waku_rendezvous.nim | 6 +-- waku/node/waku_node.nim | 2 +- waku/waku_rendezvous/protocol.nim | 75 ++++++++++++++++++++++++++----- 3 files changed, 66 insertions(+), 17 deletions(-) diff --git a/tests/test_waku_rendezvous.nim b/tests/test_waku_rendezvous.nim index f23876326e..65967b79a5 100644 --- a/tests/test_waku_rendezvous.nim +++ b/tests/test_waku_rendezvous.nim @@ -33,11 +33,7 @@ procSuite "Waku Rendezvous": ) await allFutures( - [ - node1.mountWakuRendezvous(), - node2.mountWakuRendezvous(), - node3.mountWakuRendezvous(), - ] + [node1.mountRendezvous(), node2.mountRendezvous(), node3.mountRendezvous()] ) await allFutures([node1.start(), node2.start(), node3.start()]) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index df6f1906bf..591962472b 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -1214,7 +1214,7 @@ proc startKeepalive*(node: WakuNode, keepalive = 2.minutes) = asyncSpawn node.keepaliveLoop(keepalive) -proc mountWakuRendezvous*(node: WakuNode) {.async: (raises: []).} = +proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} = info "mounting rendezvous discovery protocol" node.wakuRendezvous = WakuRendezVous.new(node.switch, node.peerManager, node.enr).valueOr: diff --git a/waku/waku_rendezvous/protocol.nim b/waku/waku_rendezvous/protocol.nim index a87adb62ab..221321c950 100644 --- a/waku/waku_rendezvous/protocol.nim +++ b/waku/waku_rendezvous/protocol.nim @@ -22,9 +22,10 @@ import logScope: topics = "waku rendezvous" -declarePublicCounter peerFoundTotal, "total number of peers found via rendezvous" +declarePublicCounter rendezvousPeerFoundTotal, + "total number of peers found via rendezvous" -type WakuRendezVous* = ref object of RootObj +type WakuRendezVous* = ref object rendezvous: Rendezvous peerManager: PeerManager @@ -41,6 +42,8 @@ proc batchAdvertise*( ): Future[Result[void, string]] {.async: (raises: []).} = ## Register with all rendezvous peers under a namespace + # rendezvous.advertise except already opened connections + # must dial first var futs = collect(newSeq): for peerId in peers: self.peerManager.dialPeer(peerId, RendezVousCodec) @@ -49,13 +52,34 @@ proc batchAdvertise*( await allFinished(futs) if dialCatch.isErr(): - return err(dialCatch.error.msg) + return err("batchAdvertise: " & dialCatch.error.msg) + + futs = dialCatch.get() + + let conns = collect(newSeq): + for fut in futs: + let catchable = catch: + fut.read() + + if catchable.isErr(): + error "rendezvous dial failed", error = catchable.error.msg + continue + + let connOpt = catchable.get() + + let conn = connOpt.valueOr: + continue + + conn let advertCatch = catch: await self.rendezvous.advertise(namespace, ttl, peers) + for conn in conns: + await conn.close() + if advertCatch.isErr(): - return err(advertCatch.error.msg) + return err("batchAdvertise: " & advertCatch.error.msg) return ok() @@ -67,6 +91,8 @@ proc batchRequest*( ): Future[Result[seq[PeerRecord], string]] {.async: (raises: []).} = ## Request all records from all rendezvous peers matching a namespace + # rendezvous.request except already opened connections + # must dial first var futs = collect(newSeq): for peerId in peers: self.peerManager.dialPeer(peerId, RendezVousCodec) @@ -75,29 +101,53 @@ proc batchRequest*( await allFinished(futs) if dialCatch.isErr(): - return err(dialCatch.error.msg) + return err("batchRequest: " & dialCatch.error.msg) + + futs = dialCatch.get() + + let conns = collect(newSeq): + for fut in futs: + let catchable = catch: + fut.read() + + if catchable.isErr(): + error "rendezvous dial failed", error = catchable.error.msg + continue + + let connOpt = catchable.get() + + let conn = connOpt.valueOr: + continue + + conn let reqCatch = catch: await self.rendezvous.request(namespace, count, peers) + for conn in conns: + await conn.close() + if reqCatch.isErr(): - return err(reqCatch.error.msg) + return err("batchRequest: " & reqCatch.error.msg) return ok(reqCatch.get()) proc advertiseAll( self: WakuRendezVous ): Future[Result[void, string]] {.async: (raises: []).} = + debug "waku rendezvous advertisements started" + let pubsubTopics = self.relayShard.topics() let futs = collect(newSeq): for pubsubTopic in pubsubTopics: - let namespace = computeNamespace(pubsubTopic.clusterId, pubsubTopic.shardId) - # Get a random RDV peer for that shard let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr: + error "could not get a peer supporting RendezVousCodec" continue + let namespace = computeNamespace(pubsubTopic.clusterId, pubsubTopic.shardId) + # Advertise yourself on that peer self.batchAdvertise(namespace, DefaultRegistrationTTL, @[rpi.peerId]) @@ -109,7 +159,7 @@ proc advertiseAll( for fut in catchable.get(): if fut.failed(): - warn "rendezvous advertisement failed", error = fut.error.msg + error "rendezvous advertisement failed", error = fut.error.msg debug "waku rendezvous advertisements finished" @@ -118,6 +168,8 @@ proc advertiseAll( proc initialRequestAll*( self: WakuRendezVous ): Future[Result[void, string]] {.async: (raises: []).} = + debug "waku rendezvous initial requests started" + let pubsubTopics = self.relayShard.topics() let futs = collect(newSeq): @@ -126,6 +178,7 @@ proc initialRequestAll*( # Get a random RDV peer for that shard let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr: + error "could not get a peer supporting RendezVousCodec" continue # Ask for peer records for that shard @@ -139,7 +192,7 @@ proc initialRequestAll*( for fut in catchable.get(): if fut.failed(): - warn "rendezvous request failed", error = fut.error.msg + error "rendezvous request failed", error = fut.error.msg elif fut.finished(): let res = fut.value() @@ -147,7 +200,7 @@ proc initialRequestAll*( return err($res.error) for record in records: - peerFoundTotal.inc() + rendezvousPeerFoundTotal.inc() self.peerManager.addPeer(record) debug "waku rendezvous initial requests finished" From aadb70a42aec268e9e462546831bf9681c863795 Mon Sep 17 00:00:00 2001 From: SionoiS Date: Fri, 6 Dec 2024 08:35:03 -0500 Subject: [PATCH 15/17] rename mount proc --- waku/factory/node_factory.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 76e57f7c43..fe4da7c9d0 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -209,7 +209,7 @@ proc setupProtocols( # Only relay nodes should be rendezvous points. if conf.rendezvous: - await mountWakuRendezvous(node) + await mountRendezvous(node) # Keepalive mounted on all nodes try: From f8f204d001a25c8a261bcb0b876383deb13795b4 Mon Sep 17 00:00:00 2001 From: SionoiS Date: Mon, 9 Dec 2024 09:47:10 -0500 Subject: [PATCH 16/17] default enable rendezvous --- tests/testlib/wakunode.nim | 1 + waku/factory/node_factory.nim | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/testlib/wakunode.nim b/tests/testlib/wakunode.nim index 01e58697fa..8cacf5317c 100644 --- a/tests/testlib/wakunode.nim +++ b/tests/testlib/wakunode.nim @@ -40,6 +40,7 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf = clusterId: DefaultClusterId, shards: @[DefaultShardId], relay: true, + rendezvous: true, storeMessageDbUrl: "sqlite://store.sqlite3", ) diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index fe4da7c9d0..bf45cb0d24 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -209,7 +209,7 @@ proc setupProtocols( # Only relay nodes should be rendezvous points. if conf.rendezvous: - await mountRendezvous(node) + await node.mountRendezvous() # Keepalive mounted on all nodes try: From d876488745ed7161295a53aba6daca7507dead61 Mon Sep 17 00:00:00 2001 From: SionoiS Date: Mon, 9 Dec 2024 10:31:04 -0500 Subject: [PATCH 17/17] fix tests --- tests/wakunode2/test_app.nim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/wakunode2/test_app.nim b/tests/wakunode2/test_app.nim index 67a6556c8b..73ffc8f932 100644 --- a/tests/wakunode2/test_app.nim +++ b/tests/wakunode2/test_app.nim @@ -62,7 +62,7 @@ suite "Wakunode2 - Waku initialization": node.wakuArchive.isNil() node.wakuStore.isNil() not node.wakuStoreClient.isNil() - not node.rendezvous.isNil() + not node.wakuRendezvous.isNil() ## Cleanup waitFor waku.stop() @@ -92,7 +92,7 @@ suite "Wakunode2 - Waku initialization": node.wakuArchive.isNil() node.wakuStore.isNil() not node.wakuStoreClient.isNil() - not node.rendezvous.isNil() + not node.wakuRendezvous.isNil() # DS structures are updated with dynamic ports typedNodeEnr.get().tcp.get() != 0