diff --git a/tests/factory/test_node_factory.nim b/tests/factory/test_node_factory.nim index bc3dc0f80d..c575c2b81a 100644 --- a/tests/factory/test_node_factory.nim +++ b/tests/factory/test_node_factory.nim @@ -17,7 +17,7 @@ suite "Node Factory": node.wakuStore.isNil() node.wakuFilter.isNil() not node.wakuStoreClient.isNil() - not node.rendezvous.isNil() + not node.wakuRendezvous.isNil() test "Set up a node with Store enabled": var conf = defaultTestWakuNodeConf() diff --git a/tests/test_waku_rendezvous.nim b/tests/test_waku_rendezvous.nim index c005a8f341..65967b79a5 100644 --- a/tests/test_waku_rendezvous.nim +++ b/tests/test_waku_rendezvous.nim @@ -1,51 +1,63 @@ {.used.} -import chronos, testutils/unittests, libp2p/builders, libp2p/protocols/rendezvous +import std/options, chronos, testutils/unittests, libp2p/builders -import waku/node/waku_switch, ./testlib/common, ./testlib/wakucore - -proc newRendezvousClientSwitch(rdv: RendezVous): Switch = - SwitchBuilder - .new() - .withRng(rng()) - .withAddresses(@[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]) - .withTcpTransport() - .withMplex() - .withNoise() - .withRendezVous(rdv) - .build() +import + waku/waku_core/peers, + waku/node/waku_node, + waku/node/peer_manager/peer_manager, + waku/waku_rendezvous/protocol, + ./testlib/[wakucore, wakunode] procSuite "Waku Rendezvous": - asyncTest "Waku Switch uses Rendezvous": - ## Setup - + asyncTest "Simple remote test": let - wakuClient = RendezVous.new() - sourceClient = RendezVous.new() - destClient = RendezVous.new() - wakuSwitch = newRendezvousClientSwitch(wakuClient) #rendezvous point - sourceSwitch = newRendezvousClientSwitch(sourceClient) #client - destSwitch = newRendezvousClientSwitch(destClient) #client - - # Setup client rendezvous - wakuClient.setup(wakuSwitch) - sourceClient.setup(sourceSwitch) - destClient.setup(destSwitch) - - await allFutures(wakuSwitch.start(), sourceSwitch.start(), destSwitch.start()) - - # Connect clients to the rendezvous point - await sourceSwitch.connect(wakuSwitch.peerInfo.peerId, wakuSwitch.peerInfo.addrs) - await destSwitch.connect(wakuSwitch.peerInfo.peerId, wakuSwitch.peerInfo.addrs) - - let res0 = await sourceClient.request("empty") - check res0.len == 0 - - # Check that source client gets peer info of dest client from rendezvous point - await sourceClient.advertise("foo") - let res1 = await destClient.request("foo") - check: - res1.len == 1 - res1[0] == sourceSwitch.peerInfo.signedPeerRecord.data + clusterId = 10.uint16 + node1 = newTestWakuNode( + generateSecp256k1Key(), + parseIpAddress("0.0.0.0"), + Port(0), + clusterId = clusterId, + ) + node2 = newTestWakuNode( + generateSecp256k1Key(), + parseIpAddress("0.0.0.0"), + Port(0), + clusterId = clusterId, + ) + node3 = newTestWakuNode( + generateSecp256k1Key(), + parseIpAddress("0.0.0.0"), + Port(0), + clusterId = clusterId, + ) + + await allFutures( + [node1.mountRendezvous(), node2.mountRendezvous(), node3.mountRendezvous()] + ) + await allFutures([node1.start(), node2.start(), node3.start()]) + + let peerInfo1 = node1.switch.peerInfo.toRemotePeerInfo() + let peerInfo2 = node2.switch.peerInfo.toRemotePeerInfo() + let peerInfo3 = node3.switch.peerInfo.toRemotePeerInfo() + + node1.peerManager.addPeer(peerInfo2) + node2.peerManager.addPeer(peerInfo1) + node2.peerManager.addPeer(peerInfo3) + node3.peerManager.addPeer(peerInfo2) + + let namespace = "test/name/space" + + let res = await node1.wakuRendezvous.batchAdvertise( + namespace, 60.seconds, @[peerInfo2.peerId] + ) + assert res.isOk(), $res.error + + let response = + await node3.wakuRendezvous.batchRequest(namespace, 1, @[peerInfo2.peerId]) + assert response.isOk(), $response.error + let records = response.get() - await allFutures(wakuSwitch.stop(), sourceSwitch.stop(), destSwitch.stop()) + check: + records.len == 1 + records[0].peerId == peerInfo1.peerId diff --git a/tests/testlib/wakunode.nim b/tests/testlib/wakunode.nim index 01e58697fa..8cacf5317c 100644 --- a/tests/testlib/wakunode.nim +++ b/tests/testlib/wakunode.nim @@ -40,6 +40,7 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf = clusterId: DefaultClusterId, shards: @[DefaultShardId], relay: true, + rendezvous: true, storeMessageDbUrl: "sqlite://store.sqlite3", ) diff --git a/tests/wakunode2/test_app.nim b/tests/wakunode2/test_app.nim index 67a6556c8b..73ffc8f932 100644 --- a/tests/wakunode2/test_app.nim +++ b/tests/wakunode2/test_app.nim @@ -62,7 +62,7 @@ suite "Wakunode2 - Waku initialization": node.wakuArchive.isNil() node.wakuStore.isNil() not node.wakuStoreClient.isNil() - not node.rendezvous.isNil() + not node.wakuRendezvous.isNil() ## Cleanup waitFor waku.stop() @@ -92,7 +92,7 @@ suite "Wakunode2 - Waku initialization": node.wakuArchive.isNil() node.wakuStore.isNil() not node.wakuStoreClient.isNil() - not node.rendezvous.isNil() + not node.wakuRendezvous.isNil() # DS structures are updated with dynamic ports typedNodeEnr.get().tcp.get() != 0 diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index 01db15fcb2..0c6b5b551c 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -647,6 +647,13 @@ with the drawback of consuming some more bandwitdh.""", name: "peer-exchange-node" .}: string + ## Rendez vous + rendezvous* {. + desc: "Enable waku rendezvous discovery server", + defaultValue: true, + name: "rendezvous" + .}: bool + ## websocket config websocketSupport* {. desc: "Enable websocket: true|false", diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index d98a99546d..bf45cb0d24 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -207,12 +207,9 @@ proc setupProtocols( protectedShard = shardKey.shard, publicKey = shardKey.key node.wakuRelay.addSignedShardsValidator(subscribedProtectedShards, conf.clusterId) - # Enable Rendezvous Discovery protocol when Relay is enabled - try: - await mountRendezvous(node) - except CatchableError: - return - err("failed to mount waku rendezvous protocol: " & getCurrentExceptionMsg()) + # Only relay nodes should be rendezvous points. + if conf.rendezvous: + await node.mountRendezvous() # Keepalive mounted on all nodes try: diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 48d75cfd15..55f32be36b 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -374,16 +374,6 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: if not waku[].deliveryMonitor.isNil(): waku[].deliveryMonitor.startDeliveryMonitor() - ## libp2p DiscoveryManager - waku[].discoveryMngr = DiscoveryManager() - waku[].discoveryMngr.add( - RendezVousInterface.new(rdv = waku[].node.rendezvous, tta = 1.minutes) - ) - if not isNil(waku[].node.wakuRelay): - for topic in waku[].node.wakuRelay.getSubscribedTopics(): - debug "advertise rendezvous namespace", topic - waku[].discoveryMngr.advertise(RdvNamespace(topic)) - return ok() # Waku shutdown diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 50b65bfc5a..591962472b 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -17,7 +17,6 @@ import libp2p/protocols/pubsub/rpc/messages, libp2p/protocols/connectivity/autonat/client, libp2p/protocols/connectivity/autonat/service, - libp2p/protocols/rendezvous, libp2p/builders, libp2p/transports/transport, libp2p/transports/tcptransport, @@ -39,6 +38,7 @@ import ../waku_filter_v2/client as filter_client, ../waku_filter_v2/subscriptions as filter_subscriptions, ../waku_metadata, + ../waku_rendezvous/protocol, ../waku_lightpush/client as lightpush_client, ../waku_lightpush/common, ../waku_lightpush/protocol, @@ -110,7 +110,7 @@ type enr*: enr.Record libp2pPing*: Ping rng*: ref rand.HmacDrbgContext - rendezvous*: RendezVous + wakuRendezvous*: WakuRendezVous announcedAddresses*: seq[MultiAddress] started*: bool # Indicates that node has started listening topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent] @@ -1217,22 +1217,16 @@ proc startKeepalive*(node: WakuNode, keepalive = 2.minutes) = proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} = info "mounting rendezvous discovery protocol" - try: - node.rendezvous = RendezVous.new(node.switch) - except Exception as e: - error "failed to create rendezvous", error = getCurrentExceptionMsg() + node.wakuRendezvous = WakuRendezVous.new(node.switch, node.peerManager, node.enr).valueOr: + error "initializing waku rendezvous failed", error = error return - if node.started: - try: - await node.rendezvous.start() - except CatchableError: - error "failed to start rendezvous", error = getCurrentExceptionMsg() + # Always start discovering peers at startup + (await node.wakuRendezvous.initialRequestAll()).isOkOr: + error "rendezvous failed initial requests", error = error - try: - node.switch.mount(node.rendezvous) - except LPError: - error "failed to mount rendezvous", error = getCurrentExceptionMsg() + if node.started: + await node.wakuRendezvous.start() proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool = let inputStr = $inputMultiAdd @@ -1304,6 +1298,9 @@ proc start*(node: WakuNode) {.async.} = if not node.wakuStoreResume.isNil(): await node.wakuStoreResume.start() + if not node.wakuRendezvous.isNil(): + await node.wakuRendezvous.start() + ## The switch uses this mapper to update peer info addrs ## with announced addrs after start let addressMapper = proc( @@ -1346,6 +1343,9 @@ proc stop*(node: WakuNode) {.async.} = if not node.wakuPeerExchange.isNil() and not node.wakuPeerExchange.pxLoopHandle.isNil(): await node.wakuPeerExchange.pxLoopHandle.cancelAndWait() + if not node.wakuRendezvous.isNil(): + await node.wakuRendezvous.stopWait() + node.started = false proc isReady*(node: WakuNode): Future[bool] {.async: (raises: [Exception]).} = diff --git a/waku/waku_rendezvous.nim b/waku/waku_rendezvous.nim new file mode 100644 index 0000000000..b07f1f7274 --- /dev/null +++ b/waku/waku_rendezvous.nim @@ -0,0 +1,3 @@ +import ./waku_rendezvous/protocol + +export protocol diff --git a/waku/waku_rendezvous/common.nim b/waku/waku_rendezvous/common.nim new file mode 100644 index 0000000000..9722347cfb --- /dev/null +++ b/waku/waku_rendezvous/common.nim @@ -0,0 +1,36 @@ +{.push raises: [].} + +import std/options, chronos + +import ../common/enr, ../waku_enr/capabilities, ../waku_enr/sharding + +const DiscoverLimit* = 1000 +const DefaultRegistrationTTL* = 60.seconds +const DefaultRegistrationInterval* = 10.seconds +const PeersRequestedCount* = 12 + +proc computeNamespace*(clusterId: uint16, shard: uint16): string = + var namespace = "rs/" + + namespace &= $clusterId + namespace &= '/' + namespace &= $shard + + return namespace + +proc computeNamespace*(clusterId: uint16, shard: uint16, cap: Capabilities): string = + var namespace = "rs/" + + namespace &= $clusterId + namespace &= '/' + namespace &= $shard + namespace &= '/' + namespace &= $cap + + return namespace + +proc getRelayShards*(enr: enr.Record): Option[RelayShards] = + let typedRecord = enr.toTyped().valueOr: + return none(RelayShards) + + return typedRecord.relaySharding() diff --git a/waku/waku_rendezvous/protocol.nim b/waku/waku_rendezvous/protocol.nim new file mode 100644 index 0000000000..221321c950 --- /dev/null +++ b/waku/waku_rendezvous/protocol.nim @@ -0,0 +1,267 @@ +{.push raises: [].} + +import + std/[sugar, options], + results, + chronos, + chronicles, + metrics, + libp2p/protocols/rendezvous, + libp2p/switch, + libp2p/utility + +import + ../node/peer_manager, + ../common/enr, + ../waku_enr/capabilities, + ../waku_enr/sharding, + ../waku_core/peers, + ../waku_core/topics, + ./common + +logScope: + topics = "waku rendezvous" + +declarePublicCounter rendezvousPeerFoundTotal, + "total number of peers found via rendezvous" + +type WakuRendezVous* = ref object + rendezvous: Rendezvous + peerManager: PeerManager + + relayShard: RelayShards + capabilities: seq[Capabilities] + + periodicRegistrationFut: Future[void] + +proc batchAdvertise*( + self: WakuRendezVous, + namespace: string, + ttl: Duration = DefaultRegistrationTTL, + peers: seq[PeerId], +): Future[Result[void, string]] {.async: (raises: []).} = + ## Register with all rendezvous peers under a namespace + + # rendezvous.advertise except already opened connections + # must dial first + var futs = collect(newSeq): + for peerId in peers: + self.peerManager.dialPeer(peerId, RendezVousCodec) + + let dialCatch = catch: + await allFinished(futs) + + if dialCatch.isErr(): + return err("batchAdvertise: " & dialCatch.error.msg) + + futs = dialCatch.get() + + let conns = collect(newSeq): + for fut in futs: + let catchable = catch: + fut.read() + + if catchable.isErr(): + error "rendezvous dial failed", error = catchable.error.msg + continue + + let connOpt = catchable.get() + + let conn = connOpt.valueOr: + continue + + conn + + let advertCatch = catch: + await self.rendezvous.advertise(namespace, ttl, peers) + + for conn in conns: + await conn.close() + + if advertCatch.isErr(): + return err("batchAdvertise: " & advertCatch.error.msg) + + return ok() + +proc batchRequest*( + self: WakuRendezVous, + namespace: string, + count: int = DiscoverLimit, + peers: seq[PeerId], +): Future[Result[seq[PeerRecord], string]] {.async: (raises: []).} = + ## Request all records from all rendezvous peers matching a namespace + + # rendezvous.request except already opened connections + # must dial first + var futs = collect(newSeq): + for peerId in peers: + self.peerManager.dialPeer(peerId, RendezVousCodec) + + let dialCatch = catch: + await allFinished(futs) + + if dialCatch.isErr(): + return err("batchRequest: " & dialCatch.error.msg) + + futs = dialCatch.get() + + let conns = collect(newSeq): + for fut in futs: + let catchable = catch: + fut.read() + + if catchable.isErr(): + error "rendezvous dial failed", error = catchable.error.msg + continue + + let connOpt = catchable.get() + + let conn = connOpt.valueOr: + continue + + conn + + let reqCatch = catch: + await self.rendezvous.request(namespace, count, peers) + + for conn in conns: + await conn.close() + + if reqCatch.isErr(): + return err("batchRequest: " & reqCatch.error.msg) + + return ok(reqCatch.get()) + +proc advertiseAll( + self: WakuRendezVous +): Future[Result[void, string]] {.async: (raises: []).} = + debug "waku rendezvous advertisements started" + + let pubsubTopics = self.relayShard.topics() + + let futs = collect(newSeq): + for pubsubTopic in pubsubTopics: + # Get a random RDV peer for that shard + let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr: + error "could not get a peer supporting RendezVousCodec" + continue + + let namespace = computeNamespace(pubsubTopic.clusterId, pubsubTopic.shardId) + + # Advertise yourself on that peer + self.batchAdvertise(namespace, DefaultRegistrationTTL, @[rpi.peerId]) + + let catchable = catch: + await allFinished(futs) + + if catchable.isErr(): + return err(catchable.error.msg) + + for fut in catchable.get(): + if fut.failed(): + error "rendezvous advertisement failed", error = fut.error.msg + + debug "waku rendezvous advertisements finished" + + return ok() + +proc initialRequestAll*( + self: WakuRendezVous +): Future[Result[void, string]] {.async: (raises: []).} = + debug "waku rendezvous initial requests started" + + let pubsubTopics = self.relayShard.topics() + + let futs = collect(newSeq): + for pubsubTopic in pubsubTopics: + let namespace = computeNamespace(pubsubTopic.clusterId, pubsubTopic.shardId) + + # Get a random RDV peer for that shard + let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr: + error "could not get a peer supporting RendezVousCodec" + continue + + # Ask for peer records for that shard + self.batchRequest(namespace, PeersRequestedCount, @[rpi.peerId]) + + let catchable = catch: + await allFinished(futs) + + if catchable.isErr(): + return err(catchable.error.msg) + + for fut in catchable.get(): + if fut.failed(): + error "rendezvous request failed", error = fut.error.msg + elif fut.finished(): + let res = fut.value() + + let records = res.valueOr: + return err($res.error) + + for record in records: + rendezvousPeerFoundTotal.inc() + self.peerManager.addPeer(record) + + debug "waku rendezvous initial requests finished" + + return ok() + +proc periodicRegistration(self: WakuRendezVous) {.async.} = + debug "waku rendezvous periodic registration started", + interval = DefaultRegistrationInterval + + # infinite loop + while true: + await sleepAsync(DefaultRegistrationInterval) + + (await self.advertiseAll()).isOkOr: + debug "waku rendezvous advertisements failed", error = error + +proc new*( + T: type WakuRendezVous, switch: Switch, peerManager: PeerManager, enr: Record +): Result[T, string] {.raises: [].} = + let relayshard = getRelayShards(enr).valueOr: + warn "Using default cluster id 0" + RelayShards(clusterID: 0, shardIds: @[]) + + let capabilities = enr.getCapabilities() + + let rvCatchable = catch: + RendezVous.new(switch = switch, minDuration = DefaultRegistrationTTL) + + if rvCatchable.isErr(): + return err(rvCatchable.error.msg) + + let rv = rvCatchable.get() + + let mountCatchable = catch: + switch.mount(rv) + + if mountCatchable.isErr(): + return err(mountCatchable.error.msg) + + var wrv = WakuRendezVous() + wrv.rendezvous = rv + wrv.peerManager = peerManager + wrv.relayshard = relayshard + wrv.capabilities = capabilities + + debug "waku rendezvous initialized", + cluster = relayshard.clusterId, + shards = relayshard.shardIds, + capabilities = capabilities + + return ok(wrv) + +proc start*(self: WakuRendezVous) {.async: (raises: []).} = + # start registering forever + self.periodicRegistrationFut = self.periodicRegistration() + + debug "waku rendezvous discovery started" + +proc stopWait*(self: WakuRendezVous) {.async: (raises: []).} = + if not self.periodicRegistrationFut.isNil(): + await self.periodicRegistrationFut.cancelAndWait() + + debug "waku rendezvous discovery stopped"