diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index dd2694cf77..f844deada7 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -6,7 +6,7 @@ when not (compileOption("threads")): {.push raises: [].} -import std/[strformat, strutils, times, options, random] +import std/[strformat, strutils, times, options, random, sequtils] import confutils, chronicles, @@ -379,7 +379,9 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = raise newException(ConfigurationError, "rln-relay-cred-path MUST be passed") if conf.relay: - await node.mountRelay(conf.topics.split(" ")) + let shards = + conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it))) + await node.mountRelay(shards) await node.mountLibp2pPing() diff --git a/apps/chat2/config_chat2.nim b/apps/chat2/config_chat2.nim index 417d298a3f..5086aef51b 100644 --- a/apps/chat2/config_chat2.nim +++ b/apps/chat2/config_chat2.nim @@ -83,11 +83,19 @@ type name: "keep-alive" .}: bool - topics* {. - desc: "Default topics to subscribe to (space separated list).", - defaultValue: "/waku/2/rs/0/0", - name: "topics" - .}: string + clusterId* {. + desc: + "Cluster id that the node is running in. Node in a different cluster id is disconnected.", + defaultValue: 0, + name: "cluster-id" + .}: uint16 + + shards* {. + desc: + "Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.", + defaultValue: @[uint16(0)], + name: "shard" + .}: seq[uint16] ## Store config store* {. diff --git a/apps/chat2bridge/config_chat2bridge.nim b/apps/chat2bridge/config_chat2bridge.nim index c858fdb3b2..c7d8bb56a8 100644 --- a/apps/chat2bridge/config_chat2bridge.nim +++ b/apps/chat2bridge/config_chat2bridge.nim @@ -67,12 +67,6 @@ type Chat2MatterbridgeConf* = object name: "nodekey" .}: crypto.PrivateKey - topics* {. - desc: "Default topics to subscribe to (space separated list)", - defaultValue: "/waku/2/rs/0/0", - name: "topics" - .}: string - store* {. desc: "Flag whether to start store protocol", defaultValue: true, name: "store" .}: bool diff --git a/apps/liteprotocoltester/tester_config.nim b/apps/liteprotocoltester/tester_config.nim index 5683b544f5..35f214f8d0 100644 --- a/apps/liteprotocoltester/tester_config.nim +++ b/apps/liteprotocoltester/tester_config.nim @@ -98,7 +98,7 @@ type LiteProtocolTesterConf* = object ## TODO: extend lite protocol tester configuration based on testing needs # shards* {. - # desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.", + # desc: "Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. 
Argument may be repeated.", # defaultValue: @[], # name: "shard" # .}: seq[uint16] diff --git a/apps/networkmonitor/networkmonitor.nim b/apps/networkmonitor/networkmonitor.nim index 40e42010a9..09a64c8d54 100644 --- a/apps/networkmonitor/networkmonitor.nim +++ b/apps/networkmonitor/networkmonitor.nim @@ -441,10 +441,12 @@ proc initAndStartApp( ipAddr = some(extIp), tcpPort = some(nodeTcpPort), udpPort = some(nodeUdpPort) ) builder.withWakuCapabilities(flags) - let addShardedTopics = builder.withShardedTopics(conf.pubsubTopics) - if addShardedTopics.isErr(): - error "failed to add sharded topics to ENR", error = addShardedTopics.error - return err($addShardedTopics.error) + + builder.withWakuRelaySharding( + RelayShards(clusterId: conf.clusterId, shardIds: conf.shards) + ).isOkOr: + error "failed to add sharded topics to ENR", error = error + return err("failed to add sharded topics to ENR: " & $error) let recordRes = builder.build() let record = @@ -561,11 +563,14 @@ when isMainModule: let twnClusterConf = ClusterConf.TheWakuNetworkConf() conf.bootstrapNodes = twnClusterConf.discv5BootstrapNodes - conf.pubsubTopics = twnClusterConf.pubsubTopics conf.rlnRelayDynamic = twnClusterConf.rlnRelayDynamic conf.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit + conf.numShardsInNetwork = twnClusterConf.numShardsInNetwork + + if conf.shards.len == 0: + conf.shards = toSeq(uint16(0) .. uint16(twnClusterConf.numShardsInNetwork - 1)) if conf.logLevel != LogLevel.NONE: setLogLevel(conf.logLevel) @@ -631,9 +636,11 @@ when isMainModule: error "failed to mount waku metadata protocol: ", err = error quit 1 - for pubsubTopic in conf.pubsubTopics: - # Subscribe the node to the default pubsubtopic, to count messages - subscribeAndHandleMessages(node, pubsubTopic, msgPerContentTopic) + for shard in conf.shards: + # Subscribe the node to the shards, to count messages + subscribeAndHandleMessages( + node, $RelayShard(shardId: shard, clusterId: conf.clusterId), msgPerContentTopic + ) # spawn the routine that crawls the network # TODO: split into 3 routines (discovery, connections, ip2location) diff --git a/apps/networkmonitor/networkmonitor_config.nim b/apps/networkmonitor/networkmonitor_config.nim index 22b74b5865..291144c5ba 100644 --- a/apps/networkmonitor/networkmonitor_config.nim +++ b/apps/networkmonitor/networkmonitor_config.nim @@ -38,10 +38,15 @@ type NetworkMonitorConf* = object name: "dns-discovery-url" .}: string - pubsubTopics* {. - desc: "Default pubsub topic to subscribe to. Argument may be repeated.", - name: "pubsub-topic" - .}: seq[string] + shards* {. + desc: + "Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.", + name: "shard" + .}: seq[uint16] + + numShardsInNetwork* {. + desc: "Number of shards in the network", name: "num-shards-in-network" + .}: uint32 refreshInterval* {. desc: "How often new peers are discovered and connected to (in seconds)", @@ -55,7 +60,7 @@ type NetworkMonitorConf* = object "Cluster id that the node is running in. Node in a different cluster id is disconnected.", defaultValue: 1, name: "cluster-id" - .}: uint32 + .}: uint16 rlnRelay* {. 
desc: "Enable spam protection through rln-relay: true|false", diff --git a/apps/wakucanary/wakucanary.nim b/apps/wakucanary/wakucanary.nim index a67534f44d..9d7f5450e1 100644 --- a/apps/wakucanary/wakucanary.nim +++ b/apps/wakucanary/wakucanary.nim @@ -81,7 +81,8 @@ type WakuCanaryConf* = object .}: bool shards* {. - desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.", + desc: + "Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.", defaultValue: @[], name: "shard", abbr: "s" diff --git a/docs/operators/how-to/run.md b/docs/operators/how-to/run.md index b85efe1bb6..bc90394792 100644 --- a/docs/operators/how-to/run.md +++ b/docs/operators/how-to/run.md @@ -18,7 +18,7 @@ By default a nwaku node will: See [this tutorial](./configure-key.md) if you want to generate and configure a persistent private key. - listen for incoming libp2p connections on the default TCP port (`60000`) - enable `relay` protocol -- subscribe to the default pubsub topic, namely `/waku/2/rs/0/0` +- subscribe to the default clusterId (0) and shard (0) - enable `store` protocol, but only as a client. This implies that the nwaku node will not persist any historical messages itself, but can query `store` service peers who do so. diff --git a/tests/node/peer_manager/test_peer_manager.nim b/tests/node/peer_manager/test_peer_manager.nim index 3d0cb08c2c..57acf13df4 100644 --- a/tests/node/peer_manager/test_peer_manager.nim +++ b/tests/node/peer_manager/test_peer_manager.nim @@ -20,8 +20,6 @@ suite "Peer Manager": serverKey {.threadvar.}: PrivateKey clientKey {.threadvar.}: PrivateKey clusterId {.threadvar.}: uint64 - shardTopic0 {.threadvar.}: string - shardTopic1 {.threadvar.}: string asyncSetup: listenPort = Port(0) @@ -29,17 +27,15 @@ suite "Peer Manager": serverKey = generateSecp256k1Key() clientKey = generateSecp256k1Key() clusterId = 1 - shardTopic0 = "/waku/2/rs/" & $clusterId & "/0" - shardTopic1 = "/waku/2/rs/" & $clusterId & "/1" asyncTest "light client is not disconnected": # Given two nodes with the same shardId let server = newTestWakuNode( - serverKey, listenAddress, listenPort, pubsubTopics = @[shardTopic0] + serverKey, listenAddress, listenPort, clusterId = clusterId, shards = @[0] ) client = newTestWakuNode( - clientKey, listenAddress, listenPort, pubsubTopics = @[shardTopic1] + clientKey, listenAddress, listenPort, clusterId = clusterId, shards = @[1] ) # And both mount metadata and filter @@ -71,10 +67,10 @@ suite "Peer Manager": # Given two nodes with the same shardId let server = newTestWakuNode( - serverKey, listenAddress, listenPort, pubsubTopics = @[shardTopic0] + serverKey, listenAddress, listenPort, clusterId = clusterId, shards = @[0] ) client = newTestWakuNode( - clientKey, listenAddress, listenPort, pubsubTopics = @[shardTopic0] + clientKey, listenAddress, listenPort, clusterId = clusterId, shards = @[1] ) # And both mount metadata and relay @@ -104,10 +100,10 @@ suite "Peer Manager": # Given two nodes with different shardIds let server = newTestWakuNode( - serverKey, listenAddress, listenPort, pubsubTopics = @[shardTopic0] + serverKey, listenAddress, listenPort, clusterId = clusterId, shards = @[0] ) client = newTestWakuNode( - clientKey, listenAddress, listenPort, pubsubTopics = @[shardTopic1] + clientKey, listenAddress, listenPort, clusterId = clusterId, shards = @[1] ) # And both mount metadata and relay diff --git a/tests/node/test_wakunode_relay_rln.nim b/tests/node/test_wakunode_relay_rln.nim index e1bdb7c6a7..1304575c7d 100644 
--- a/tests/node/test_wakunode_relay_rln.nim +++ b/tests/node/test_wakunode_relay_rln.nim @@ -96,9 +96,9 @@ proc getWakuRlnConfigOnChain*( ) proc setupRelayWithOnChainRln*( - node: WakuNode, pubsubTopics: seq[string], wakuRlnConfig: WakuRlnConfig + node: WakuNode, shards: seq[RelayShard], wakuRlnConfig: WakuRlnConfig ) {.async.} = - await node.mountRelay(pubsubTopics) + await node.mountRelay(shards) await node.mountRlnRelay(wakuRlnConfig) suite "Waku RlnRelay - End to End - Static": @@ -223,7 +223,7 @@ suite "Waku RlnRelay - End to End - Static": nodekey = generateSecp256k1Key() node = newTestWakuNode(nodekey, parseIpAddress("0.0.0.0"), Port(0)) - await node.mountRelay(@[DefaultPubsubTopic]) + await node.mountRelay(@[DefaultRelayShard]) let contractAddress = await uploadRLNContract(EthClient) let wakuRlnConfig = WakuRlnConfig( diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim index 639ea3983d..d71f186cac 100644 --- a/tests/test_peer_manager.nim +++ b/tests/test_peer_manager.nim @@ -418,7 +418,8 @@ procSuite "Peer Manager": generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), port, - pubsubTopics = @["/waku/2/rs/3/0"], + clusterId = 3, + shards = @[uint16(0)], ) # same network @@ -426,13 +427,15 @@ procSuite "Peer Manager": generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), port, - pubsubTopics = @["/waku/2/rs/4/0"], + clusterId = 4, + shards = @[uint16(0)], ) node3 = newTestWakuNode( generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), port, - pubsubTopics = @["/waku/2/rs/4/0"], + clusterId = 4, + shards = @[uint16(0)], ) node1.mountMetadata(3).expect("Mounted Waku Metadata") diff --git a/tests/test_relay_peer_exchange.nim b/tests/test_relay_peer_exchange.nim index be27365a63..0be3c91932 100644 --- a/tests/test_relay_peer_exchange.nim +++ b/tests/test_relay_peer_exchange.nim @@ -25,8 +25,8 @@ procSuite "Relay (GossipSub) Peer Exchange": newTestWakuNode(node2Key, listenAddress, port, sendSignedPeerRecord = true) # When both client and server mount relay without a handler - await node1.mountRelay(@[DefaultPubsubTopic]) - await node2.mountRelay(@[DefaultPubsubTopic], none(RoutingRecordsHandler)) + await node1.mountRelay(@[DefaultRelayShard]) + await node2.mountRelay(@[DefaultRelayShard], none(RoutingRecordsHandler)) # Then the relays are mounted without a handler check: @@ -75,9 +75,9 @@ procSuite "Relay (GossipSub) Peer Exchange": peerExchangeHandle: RoutingRecordsHandler = peerExchangeHandler # Givem the nodes mount relay with a peer exchange handler - await node1.mountRelay(@[DefaultPubsubTopic], some(emptyPeerExchangeHandle)) - await node2.mountRelay(@[DefaultPubsubTopic], some(emptyPeerExchangeHandle)) - await node3.mountRelay(@[DefaultPubsubTopic], some(peerExchangeHandle)) + await node1.mountRelay(@[DefaultRelayShard], some(emptyPeerExchangeHandle)) + await node2.mountRelay(@[DefaultRelayShard], some(emptyPeerExchangeHandle)) + await node3.mountRelay(@[DefaultRelayShard], some(peerExchangeHandle)) # Ensure that node1 prunes all peers after the first connection node1.wakuRelay.parameters.dHigh = 1 diff --git a/tests/test_waku_enr.nim b/tests/test_waku_enr.nim index bf657d43d1..f187fa3002 100644 --- a/tests/test_waku_enr.nim +++ b/tests/test_waku_enr.nim @@ -280,7 +280,7 @@ suite "Waku ENR - Relay static sharding": clusterId: uint16 = 22 shardId: uint16 = 1 - let shard = RelayShard.staticSharding(clusterId, shardId) + let shard = RelayShard(clusterId: clusterId, shardId: shardId) ## When let shardsTopics = RelayShards.init(clusterId, shardId).expect("Valid 
Shards") diff --git a/tests/test_wakunode.nim b/tests/test_wakunode.nim index 4640d49f9b..975070465a 100644 --- a/tests/test_wakunode.nim +++ b/tests/test_wakunode.nim @@ -28,7 +28,7 @@ suite "WakuNode": node1 = newTestWakuNode(nodeKey1, parseIpAddress("0.0.0.0"), Port(61000)) nodeKey2 = generateSecp256k1Key() node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), Port(61002)) - pubSubTopic = "/waku/2/rs/0/0" + shard = DefaultRelayShard contentTopic = ContentTopic("/waku/2/default-content/proto") payload = "hello world".toBytes() message = WakuMessage(payload: payload, contentTopic: contentTopic) @@ -36,13 +36,13 @@ suite "WakuNode": # Setup node 1 with stable codec "/vac/waku/relay/2.0.0" await node1.start() - await node1.mountRelay(@[pubSubTopic]) + await node1.mountRelay(@[shard]) node1.wakuRelay.codec = "/vac/waku/relay/2.0.0" # Setup node 2 with beta codec "/vac/waku/relay/2.0.0-beta2" await node2.start() - await node2.mountRelay(@[pubSubTopic]) + await node2.mountRelay(@[shard]) node2.wakuRelay.codec = "/vac/waku/relay/2.0.0-beta2" check: @@ -58,15 +58,15 @@ suite "WakuNode": topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = check: - topic == pubSubTopic + topic == $shard msg.contentTopic == contentTopic msg.payload == payload completionFut.complete(true) - node2.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) + node2.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)) await sleepAsync(2000.millis) - var res = await node1.publish(some(pubSubTopic), message) + var res = await node1.publish(some($shard), message) assert res.isOk(), $res.error await sleepAsync(2000.millis) diff --git a/tests/test_wakunode_lightpush.nim b/tests/test_wakunode_lightpush.nim index 5f2c5e2df2..c680fb4685 100644 --- a/tests/test_wakunode_lightpush.nim +++ b/tests/test_wakunode_lightpush.nim @@ -19,8 +19,8 @@ suite "WakuNode - Lightpush": await allFutures(destNode.start(), bridgeNode.start(), lightNode.start()) - await destNode.mountRelay(@[DefaultPubsubTopic]) - await bridgeNode.mountRelay(@[DefaultPubsubTopic]) + await destNode.mountRelay(@[DefaultRelayShard]) + await bridgeNode.mountRelay(@[DefaultRelayShard]) await bridgeNode.mountLightPush() lightNode.mountLightPushClient() diff --git a/tests/testlib/wakunode.nim b/tests/testlib/wakunode.nim index eb679859b8..b6184e330f 100644 --- a/tests/testlib/wakunode.nim +++ b/tests/testlib/wakunode.nim @@ -37,8 +37,8 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf = nat: "any", maxConnections: 50, maxMessageSize: "1024 KiB", - clusterId: 0, - pubsubTopics: @["/waku/2/rs/0/0"], + clusterId: DefaultClusterId, + shards: @[DefaultShardId], relay: true, storeMessageDbUrl: "sqlite://store.sqlite3", ) @@ -63,8 +63,9 @@ proc newTestWakuNode*( dns4DomainName = none(string), discv5UdpPort = none(Port), agentString = none(string), - pubsubTopics: seq[string] = @["/waku/2/rs/1/0"], peerStoreCapacity = none(int), + clusterId = DefaultClusterId, + shards = @[DefaultShardId], ): WakuNode = var resolvedExtIp = extIp @@ -77,14 +78,8 @@ proc newTestWakuNode*( var conf = defaultTestWakuNodeConf() - let clusterId = - if pubsubTopics.len() > 0: - RelayShard.parse(pubsubTopics[0]).get().clusterId - else: - 1.uint16 - conf.clusterId = clusterId - conf.pubsubTopics = pubsubTopics + conf.shards = shards if dns4DomainName.isSome() and extIp.isNone(): # If there's an error resolving the IP, an exception is thrown and test fails @@ -95,7 +90,7 @@ proc newTestWakuNode*( let netConf = NetConfig.init( bindIp = bindIp, - clusterId = 
clusterId, + clusterId = conf.clusterId, bindPort = bindPort, extIp = resolvedExtIp, extPort = extPort, @@ -111,8 +106,10 @@ proc newTestWakuNode*( var enrBuilder = EnrBuilder.init(nodeKey) - enrBuilder.withShardedTopics(pubsubTopics).isOkOr: - raise newException(Defect, "Invalid record: " & error) + enrBuilder.withWakuRelaySharding( + RelayShards(clusterId: conf.clusterId, shardIds: conf.shards) + ).isOkOr: + raise newException(Defect, "Invalid record: " & $error) enrBuilder.withIpAddressAndPorts( ipAddr = netConf.enrIp, tcpPort = netConf.enrPort, udpPort = netConf.discv5UdpPort diff --git a/tests/waku_core/test_namespaced_topics.nim b/tests/waku_core/test_namespaced_topics.nim index ca69269363..8c3cee1fc2 100644 --- a/tests/waku_core/test_namespaced_topics.nim +++ b/tests/waku_core/test_namespaced_topics.nim @@ -136,7 +136,7 @@ suite "Waku Message - Content topics namespacing": suite "Waku Message - Pub-sub topics namespacing": test "Stringify static sharding pub-sub topic": ## Given - var shard = RelayShard.staticSharding(clusterId = 0, shardId = 2) + var shard = RelayShard(clusterId: 0, shardId: 2) ## When let topic = $shard diff --git a/tests/waku_core/topics/test_pubsub_topic.nim b/tests/waku_core/topics/test_pubsub_topic.nim index bbdc894553..4807d30d13 100644 --- a/tests/waku_core/topics/test_pubsub_topic.nim +++ b/tests/waku_core/topics/test_pubsub_topic.nim @@ -10,10 +10,10 @@ suite "Static Sharding Functionality": check: shard.clusterId == 0 shard.shardId == 1 - shard == RelayShard.staticSharding(0, 1) + shard == RelayShard(clusterId: 0, shardId: 1) test "Pubsub Topic Naming Compliance": - let shard = RelayShard.staticSharding(0, 1) + let shard = RelayShard(clusterId: 0, shardId: 1) check: shard.clusterId == 0 shard.shardId == 1 diff --git a/tests/waku_core/topics/test_sharding.nim b/tests/waku_core/topics/test_sharding.nim index db0ea6d979..33c38b4304 100644 --- a/tests/waku_core/topics/test_sharding.nim +++ b/tests/waku_core/topics/test_sharding.nim @@ -54,16 +54,16 @@ suite "Autosharding": # Then the generated shards are valid check: - shard1 == RelayShard.staticSharding(ClusterId, 3) - shard2 == RelayShard.staticSharding(ClusterId, 3) - shard3 == RelayShard.staticSharding(ClusterId, 6) - shard4 == RelayShard.staticSharding(ClusterId, 6) - shard5 == RelayShard.staticSharding(ClusterId, 3) - shard6 == RelayShard.staticSharding(ClusterId, 3) - shard7 == RelayShard.staticSharding(ClusterId, 3) - shard8 == RelayShard.staticSharding(ClusterId, 3) - shard9 == RelayShard.staticSharding(ClusterId, 7) - shard10 == RelayShard.staticSharding(ClusterId, 3) + shard1 == RelayShard(clusterId: ClusterId, shardId: 3) + shard2 == RelayShard(clusterId: ClusterId, shardId: 3) + shard3 == RelayShard(clusterId: ClusterId, shardId: 6) + shard4 == RelayShard(clusterId: ClusterId, shardId: 6) + shard5 == RelayShard(clusterId: ClusterId, shardId: 3) + shard6 == RelayShard(clusterId: ClusterId, shardId: 3) + shard7 == RelayShard(clusterId: ClusterId, shardId: 3) + shard8 == RelayShard(clusterId: ClusterId, shardId: 3) + shard9 == RelayShard(clusterId: ClusterId, shardId: 7) + shard10 == RelayShard(clusterId: ClusterId, shardId: 3) suite "getShard from NsContentTopic": test "Generate Gen0 Shard with topic.generation==none": @@ -75,7 +75,7 @@ suite "Autosharding": # Then the generated shard is valid check: - shard.value() == RelayShard.staticSharding(ClusterId, 3) + shard.value() == RelayShard(clusterId: ClusterId, shardId: 3) test "Generate Gen0 Shard with topic.generation==0": let sharding = @@ -85,7 
+85,7 @@ suite "Autosharding": # Then the generated shard is valid check: - shard.value() == RelayShard.staticSharding(ClusterId, 3) + shard.value() == RelayShard(clusterId: ClusterId, shardId: 3) test "Generate Gen0 Shard with topic.generation==other": let sharding = @@ -106,7 +106,7 @@ suite "Autosharding": # Then the generated shard is valid check: - shard.value() == RelayShard.staticSharding(ClusterId, 3) + shard.value() == RelayShard(clusterId: ClusterId, shardId: 3) test "Generate Gen0 Shard with topic.generation==0": let sharding = @@ -116,7 +116,7 @@ suite "Autosharding": # Then the generated shard is valid check: - shard.value() == RelayShard.staticSharding(ClusterId, 3) + shard.value() == RelayShard(clusterId: ClusterId, shardId: 3) test "Generate Gen0 Shard with topic.generation==other": let sharding = diff --git a/tests/waku_relay/test_wakunode_relay.nim b/tests/waku_relay/test_wakunode_relay.nim index 7c44ec90f1..b93875e831 100644 --- a/tests/waku_relay/test_wakunode_relay.nim +++ b/tests/waku_relay/test_wakunode_relay.nim @@ -63,19 +63,19 @@ suite "WakuNode - Relay": node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), Port(0)) nodeKey3 = generateSecp256k1Key() node3 = newTestWakuNode(nodeKey3, parseIpAddress("0.0.0.0"), Port(0)) - pubSubTopic = "test" + shard = DefaultRelayShard contentTopic = ContentTopic("/waku/2/default-content/proto") payload = "hello world".toBytes() message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - await node1.mountRelay(@[pubSubTopic]) + await node1.mountRelay(@[shard]) await node2.start() - await node2.mountRelay(@[pubSubTopic]) + await node2.mountRelay(@[shard]) await node3.start() - await node3.mountRelay(@[pubSubTopic]) + await node3.mountRelay(@[shard]) await allFutures( node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]), @@ -87,15 +87,15 @@ suite "WakuNode - Relay": topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = check: - topic == pubSubTopic + topic == $shard msg.contentTopic == contentTopic msg.payload == payload completionFut.complete(true) - node3.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) + node3.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)) await sleepAsync(500.millis) - var res = await node1.publish(some(pubSubTopic), message) + var res = await node1.publish(some($shard), message) assert res.isOk(), $res.error ## Then @@ -124,7 +124,7 @@ suite "WakuNode - Relay": nodeKey3 = generateSecp256k1Key() node3 = newTestWakuNode(nodeKey3, parseIpAddress("0.0.0.0"), Port(0)) - pubSubTopic = "test" + shard = DefaultRelayShard contentTopic1 = ContentTopic("/waku/2/default-content/proto") payload = "hello world".toBytes() message1 = WakuMessage(payload: payload, contentTopic: contentTopic1) @@ -135,13 +135,13 @@ suite "WakuNode - Relay": # start all the nodes await node1.start() - await node1.mountRelay(@[pubSubTopic]) + await node1.mountRelay(@[shard]) await node2.start() - await node2.mountRelay(@[pubSubTopic]) + await node2.mountRelay(@[shard]) await node3.start() - await node3.mountRelay(@[pubSubTopic]) + await node3.mountRelay(@[shard]) await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) @@ -155,7 +155,7 @@ suite "WakuNode - Relay": ): Future[ValidationResult] {.async.} = ## the validator that only allows messages with contentTopic1 to be relayed check: - topic == pubSubTopic + topic == $shard # only relay messages with 
contentTopic1 if msg.contentTopic != contentTopic1: @@ -172,22 +172,22 @@ suite "WakuNode - Relay": topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = check: - topic == pubSubTopic + topic == $shard # check that only messages with contentTopic1 is relayed (but not contentTopic2) msg.contentTopic == contentTopic1 # relay handler is called completionFut.complete(true) - node3.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) + node3.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)) await sleepAsync(500.millis) - var res = await node1.publish(some(pubSubTopic), message1) + var res = await node1.publish(some($shard), message1) assert res.isOk(), $res.error await sleepAsync(500.millis) # message2 never gets relayed because of the validator - res = await node1.publish(some(pubSubTopic), message2) + res = await node1.publish(some($shard), message2) assert res.isOk(), $res.error await sleepAsync(500.millis) @@ -258,16 +258,16 @@ suite "WakuNode - Relay": wsBindPort = Port(0), wsEnabled = true, ) - pubSubTopic = "test" + shard = DefaultRelayShard contentTopic = ContentTopic("/waku/2/default-content/proto") payload = "hello world".toBytes() message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - await node1.mountRelay(@[pubSubTopic]) + await node1.mountRelay(@[shard]) await node2.start() - await node2.mountRelay(@[pubSubTopic]) + await node2.mountRelay(@[shard]) await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) @@ -276,15 +276,15 @@ suite "WakuNode - Relay": topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = check: - topic == pubSubTopic + topic == $shard msg.contentTopic == contentTopic msg.payload == payload completionFut.complete(true) - node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) + node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)) await sleepAsync(500.millis) - let res = await node2.publish(some(pubSubTopic), message) + let res = await node2.publish(some($shard), message) assert res.isOk(), $res.error await sleepAsync(500.millis) @@ -306,16 +306,16 @@ suite "WakuNode - Relay": ) nodeKey2 = generateSecp256k1Key() node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), bindPort = Port(0)) - pubSubTopic = "test" + shard = DefaultRelayShard contentTopic = ContentTopic("/waku/2/default-content/proto") payload = "hello world".toBytes() message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - await node1.mountRelay(@[pubSubTopic]) + await node1.mountRelay(@[shard]) await node2.start() - await node2.mountRelay(@[pubSubTopic]) + await node2.mountRelay(@[shard]) await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) @@ -324,15 +324,15 @@ suite "WakuNode - Relay": topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = check: - topic == pubSubTopic + topic == $shard msg.contentTopic == contentTopic msg.payload == payload completionFut.complete(true) - node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) + node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)) await sleepAsync(500.millis) - let res = await node2.publish(some(pubSubTopic), message) + let res = await node2.publish(some($shard), message) assert res.isOk(), $res.error await sleepAsync(500.millis) @@ -354,16 +354,16 @@ suite "WakuNode - Relay": wsBindPort = Port(0), wsEnabled = true, ) - pubSubTopic = "test" + shard = DefaultRelayShard contentTopic = 
ContentTopic("/waku/2/default-content/proto") payload = "hello world".toBytes() message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - await node1.mountRelay(@[pubSubTopic]) + await node1.mountRelay(@[shard]) await node2.start() - await node2.mountRelay(@[pubSubTopic]) + await node2.mountRelay(@[shard]) #delete websocket peer address # TODO: a better way to find the index - this is too brittle @@ -376,15 +376,15 @@ suite "WakuNode - Relay": topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = check: - topic == pubSubTopic + topic == $shard msg.contentTopic == contentTopic msg.payload == payload completionFut.complete(true) - node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) + node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)) await sleepAsync(500.millis) - let res = await node2.publish(some(pubSubTopic), message) + let res = await node2.publish(some($shard), message) assert res.isOk(), $res.error await sleepAsync(500.millis) @@ -408,16 +408,16 @@ suite "WakuNode - Relay": ) nodeKey2 = generateSecp256k1Key() node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), bindPort = Port(0)) - pubSubTopic = "test" + shard = DefaultRelayShard contentTopic = ContentTopic("/waku/2/default-content/proto") payload = "hello world".toBytes() message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - await node1.mountRelay(@[pubSubTopic]) + await node1.mountRelay(@[shard]) await node2.start() - await node2.mountRelay(@[pubSubTopic]) + await node2.mountRelay(@[shard]) await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) @@ -426,15 +426,15 @@ suite "WakuNode - Relay": topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = check: - topic == pubSubTopic + topic == $shard msg.contentTopic == contentTopic msg.payload == payload completionFut.complete(true) - node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) + node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)) await sleepAsync(500.millis) - let res = await node2.publish(some(pubSubTopic), message) + let res = await node2.publish(some($shard), message) assert res.isOk(), $res.error await sleepAsync(500.millis) @@ -466,16 +466,16 @@ suite "WakuNode - Relay": ) let - pubSubTopic = "test" + shard = DefaultRelayShard contentTopic = ContentTopic("/waku/2/default-content/proto") payload = "hello world".toBytes() message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - await node1.mountRelay(@[pubSubTopic]) + await node1.mountRelay(@[shard]) await node2.start() - await node2.mountRelay(@[pubSubTopic]) + await node2.mountRelay(@[shard]) await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) @@ -484,15 +484,15 @@ suite "WakuNode - Relay": topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = check: - topic == pubSubTopic + topic == $shard msg.contentTopic == contentTopic msg.payload == payload completionFut.complete(true) - node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) + node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)) await sleepAsync(500.millis) - let res = await node2.publish(some(pubSubTopic), message) + let res = await node2.publish(some($shard), message) assert res.isOk(), $res.error await sleepAsync(500.millis) diff --git a/tests/waku_relay/utils.nim b/tests/waku_relay/utils.nim index f63a100827..c1a085b103 100644 --- 
a/tests/waku_relay/utils.nim +++ b/tests/waku_relay/utils.nim @@ -53,9 +53,9 @@ proc setupRln*(node: WakuNode, identifier: uint) {.async.} = ) proc setupRelayWithRln*( - node: WakuNode, identifier: uint, pubsubTopics: seq[string] + node: WakuNode, identifier: uint, shards: seq[RelayShard] ) {.async.} = - await node.mountRelay(pubsubTopics) + await node.mountRelay(shards) await setupRln(node, identifier) proc subscribeToContentTopicWithHandler*( diff --git a/tests/waku_rln_relay/test_wakunode_rln_relay.nim b/tests/waku_rln_relay/test_wakunode_rln_relay.nim index e227a0bb79..f837d34113 100644 --- a/tests/waku_rln_relay/test_wakunode_rln_relay.nim +++ b/tests/waku_rln_relay/test_wakunode_rln_relay.nim @@ -50,7 +50,7 @@ procSuite "WakuNode - RLN relay": # set up three nodes # node1 - await node1.mountRelay(@[DefaultPubsubTopic]) + await node1.mountRelay(@[DefaultRelayShard]) # mount rlnrelay in off-chain mode let wakuRlnConfig1 = WakuRlnConfig( @@ -66,7 +66,7 @@ procSuite "WakuNode - RLN relay": await node1.start() # node 2 - await node2.mountRelay(@[DefaultPubsubTopic]) + await node2.mountRelay(@[DefaultRelayShard]) # mount rlnrelay in off-chain mode let wakuRlnConfig2 = WakuRlnConfig( rlnRelayDynamic: false, @@ -81,7 +81,7 @@ procSuite "WakuNode - RLN relay": await node2.start() # node 3 - await node3.mountRelay(@[DefaultPubsubTopic]) + await node3.mountRelay(@[DefaultRelayShard]) let wakuRlnConfig3 = WakuRlnConfig( rlnRelayDynamic: false, @@ -131,18 +131,15 @@ procSuite "WakuNode - RLN relay": await node2.stop() await node3.stop() - asyncTest "testing rln-relay is applied in all rln pubsub/content topics": + asyncTest "testing rln-relay is applied in all rln shards/content topics": # create 3 nodes let nodes = toSeq(0 ..< 3).mapIt( newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) ) await allFutures(nodes.mapIt(it.start())) - let pubsubTopics = - @[ - PubsubTopic("/waku/2/pubsubtopic-a/proto"), - PubsubTopic("/waku/2/pubsubtopic-b/proto"), - ] + let shards = + @[RelayShard(clusterId: 0, shardId: 0), RelayShard(clusterId: 0, shardId: 1)] let contentTopics = @[ ContentTopic("/waku/2/content-topic-a/proto"), @@ -150,7 +147,7 @@ procSuite "WakuNode - RLN relay": ] # set up three nodes - await allFutures(nodes.mapIt(it.mountRelay(pubsubTopics))) + await allFutures(nodes.mapIt(it.mountRelay(shards))) # mount rlnrelay in off-chain mode for index, node in nodes: @@ -177,14 +174,14 @@ procSuite "WakuNode - RLN relay": topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = info "relayHandler. The received topic:", topic - if topic == pubsubTopics[0]: + if topic == $shards[0]: rxMessagesTopic1 = rxMessagesTopic1 + 1 - elif topic == pubsubTopics[1]: + elif topic == $shards[1]: rxMessagesTopic2 = rxMessagesTopic2 + 1 # mount the relay handlers - nodes[2].subscribe((kind: PubsubSub, topic: pubsubTopics[0]), some(relayHandler)) - nodes[2].subscribe((kind: PubsubSub, topic: pubsubTopics[1]), some(relayHandler)) + nodes[2].subscribe((kind: PubsubSub, topic: $shards[0]), some(relayHandler)) + nodes[2].subscribe((kind: PubsubSub, topic: $shards[1]), some(relayHandler)) await sleepAsync(1000.millis) # generate some messages with rln proofs first. 
generating @@ -214,9 +211,9 @@ procSuite "WakuNode - RLN relay": # publish 3 messages from node[0] (last 2 are spam, window is 10 secs) # publish 3 messages from node[1] (last 2 are spam, window is 10 secs) for msg in messages1: - discard await nodes[0].publish(some(pubsubTopics[0]), msg) + discard await nodes[0].publish(some($shards[0]), msg) for msg in messages2: - discard await nodes[1].publish(some(pubsubTopics[1]), msg) + discard await nodes[1].publish(some($shards[1]), msg) # wait for gossip to propagate await sleepAsync(5000.millis) @@ -245,7 +242,7 @@ procSuite "WakuNode - RLN relay": # set up three nodes # node1 - await node1.mountRelay(@[DefaultPubsubTopic]) + await node1.mountRelay(@[DefaultRelayShard]) # mount rlnrelay in off-chain mode let wakuRlnConfig1 = WakuRlnConfig( @@ -261,7 +258,7 @@ procSuite "WakuNode - RLN relay": await node1.start() # node 2 - await node2.mountRelay(@[DefaultPubsubTopic]) + await node2.mountRelay(@[DefaultRelayShard]) # mount rlnrelay in off-chain mode let wakuRlnConfig2 = WakuRlnConfig( rlnRelayDynamic: false, @@ -276,7 +273,7 @@ procSuite "WakuNode - RLN relay": await node2.start() # node 3 - await node3.mountRelay(@[DefaultPubsubTopic]) + await node3.mountRelay(@[DefaultRelayShard]) let wakuRlnConfig3 = WakuRlnConfig( rlnRelayDynamic: false, @@ -361,7 +358,7 @@ procSuite "WakuNode - RLN relay": # set up three nodes # node1 - await node1.mountRelay(@[DefaultPubsubTopic]) + await node1.mountRelay(@[DefaultRelayShard]) # mount rlnrelay in off-chain mode let wakuRlnConfig1 = WakuRlnConfig( @@ -377,7 +374,7 @@ procSuite "WakuNode - RLN relay": await node1.start() # node 2 - await node2.mountRelay(@[DefaultPubsubTopic]) + await node2.mountRelay(@[DefaultRelayShard]) # mount rlnrelay in off-chain mode let wakuRlnConfig2 = WakuRlnConfig( @@ -392,7 +389,7 @@ procSuite "WakuNode - RLN relay": await node2.start() # node 3 - await node3.mountRelay(@[DefaultPubsubTopic]) + await node3.mountRelay(@[DefaultRelayShard]) # mount rlnrelay in off-chain mode let wakuRlnConfig3 = WakuRlnConfig( @@ -485,7 +482,7 @@ procSuite "WakuNode - RLN relay": # Given two nodes let contentTopic = ContentTopic("/waku/2/default-content/proto") - pubsubTopicSeq = @[DefaultPubsubTopic] + shardSeq = @[DefaultRelayShard] nodeKey1 = generateSecp256k1Key() node1 = newTestWakuNode(nodeKey1, parseIpAddress("0.0.0.0"), Port(0)) nodeKey2 = generateSecp256k1Key() @@ -493,12 +490,12 @@ procSuite "WakuNode - RLN relay": epochSizeSec: uint64 = 5 # This means rlnMaxEpochGap = 4 # Given both nodes mount relay and rlnrelay - await node1.mountRelay(pubsubTopicSeq) + await node1.mountRelay(shardSeq) let wakuRlnConfig1 = buildWakuRlnConfig(1, epochSizeSec, "wakunode_10") await node1.mountRlnRelay(wakuRlnConfig1) # Mount rlnrelay in node2 in off-chain mode - await node2.mountRelay(@[DefaultPubsubTopic]) + await node2.mountRelay(@[DefaultRelayShard]) let wakuRlnConfig2 = buildWakuRlnConfig(2, epochSizeSec, "wakunode_11") await node2.mountRlnRelay(wakuRlnConfig2) @@ -613,7 +610,7 @@ procSuite "WakuNode - RLN relay": # Given two nodes let contentTopic = ContentTopic("/waku/2/default-content/proto") - pubsubTopicSeq = @[DefaultPubsubTopic] + shardSeq = @[DefaultRelayShard] nodeKey1 = generateSecp256k1Key() node1 = newTestWakuNode(nodeKey1, parseIpAddress("0.0.0.0"), Port(0)) nodeKey2 = generateSecp256k1Key() @@ -622,12 +619,12 @@ procSuite "WakuNode - RLN relay": # Given both nodes mount relay and rlnrelay # Mount rlnrelay in node1 in off-chain mode - await node1.mountRelay(pubsubTopicSeq) + await 
node1.mountRelay(shardSeq) let wakuRlnConfig1 = buildWakuRlnConfig(1, epochSizeSec, "wakunode_10") await node1.mountRlnRelay(wakuRlnConfig1) # Mount rlnrelay in node2 in off-chain mode - await node2.mountRelay(@[DefaultPubsubTopic]) + await node2.mountRelay(@[DefaultRelayShard]) let wakuRlnConfig2 = buildWakuRlnConfig(2, epochSizeSec, "wakunode_11") await node2.mountRlnRelay(wakuRlnConfig2) diff --git a/tests/waku_rln_relay/utils_static.nim b/tests/waku_rln_relay/utils_static.nim index 00f29c7048..d2a781fcdb 100644 --- a/tests/waku_rln_relay/utils_static.nim +++ b/tests/waku_rln_relay/utils_static.nim @@ -32,9 +32,9 @@ proc setupStaticRln*( ) proc setupRelayWithStaticRln*( - node: WakuNode, identifier: uint, pubsubTopics: seq[string] + node: WakuNode, identifier: uint, shards: seq[RelayShard] ) {.async.} = - await node.mountRelay(pubsubTopics) + await node.mountRelay(shards) await setupStaticRln(node, identifier) proc subscribeCompletionHandler*(node: WakuNode, pubsubTopic: string): Future[bool] = diff --git a/tests/wakunode_rest/test_rest_relay.nim b/tests/wakunode_rest/test_rest_relay.nim index c8a372984a..9732d114b6 100644 --- a/tests/wakunode_rest/test_rest_relay.nim +++ b/tests/wakunode_rest/test_rest_relay.nim @@ -54,16 +54,16 @@ suite "Waku v2 Rest API - Relay": installRelayApiHandlers(restServer.router, node, cache) restServer.start() - let pubSubTopics = - @[ - PubSubTopic("pubsub-topic-1"), - PubSubTopic("pubsub-topic-2"), - PubSubTopic("pubsub-topic-3"), - ] + let + shard0 = RelayShard(clusterId: DefaultClusterId, shardId: 0) + shard1 = RelayShard(clusterId: DefaultClusterId, shardId: 1) + shard2 = RelayShard(clusterId: DefaultClusterId, shardId: 2) + + let shards = @[$shard0, $shard1, $shard2] # When let client = newRestHttpClient(initTAddress(restAddress, restPort)) - let response = await client.relayPostSubscriptionsV1(pubSubTopics) + let response = await client.relayPostSubscriptionsV1(shards) # Then check: @@ -72,12 +72,12 @@ suite "Waku v2 Rest API - Relay": response.data == "OK" check: - cache.isPubsubSubscribed("pubsub-topic-1") - cache.isPubsubSubscribed("pubsub-topic-2") - cache.isPubsubSubscribed("pubsub-topic-3") + cache.isPubsubSubscribed($shard0) + cache.isPubsubSubscribed($shard1) + cache.isPubsubSubscribed($shard2) check: - toSeq(node.wakuRelay.subscribedTopics).len == pubSubTopics.len + toSeq(node.wakuRelay.subscribedTopics).len == shards.len await restServer.stop() await restServer.closeWait() @@ -87,9 +87,15 @@ suite "Waku v2 Rest API - Relay": # Given let node = testWakuNode() await node.start() - await node.mountRelay( - @["pubsub-topic-1", "pubsub-topic-2", "pubsub-topic-3", "pubsub-topic-x"] - ) + + let + shard0 = RelayShard(clusterId: DefaultClusterId, shardId: 0) + shard1 = RelayShard(clusterId: DefaultClusterId, shardId: 1) + shard2 = RelayShard(clusterId: DefaultClusterId, shardId: 2) + shard3 = RelayShard(clusterId: DefaultClusterId, shardId: 3) + shard4 = RelayShard(clusterId: DefaultClusterId, shardId: 4) + + await node.mountRelay(@[shard0, shard1, shard2, shard3]) var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") @@ -98,25 +104,19 @@ suite "Waku v2 Rest API - Relay": restPort = restServer.httpServer.address.port # update with bound port for client use let cache = MessageCache.init() - cache.pubsubSubscribe("pubsub-topic-1") - cache.pubsubSubscribe("pubsub-topic-2") - cache.pubsubSubscribe("pubsub-topic-3") - cache.pubsubSubscribe("pubsub-topic-x") + cache.pubsubSubscribe($shard0) + cache.pubsubSubscribe($shard1) + 
cache.pubsubSubscribe($shard2) + cache.pubsubSubscribe($shard3) installRelayApiHandlers(restServer.router, node, cache) restServer.start() - let pubSubTopics = - @[ - PubSubTopic("pubsub-topic-1"), - PubSubTopic("pubsub-topic-2"), - PubSubTopic("pubsub-topic-3"), - PubSubTopic("pubsub-topic-y"), - ] + let shards = @[$shard0, $shard1, $shard2, $shard4] # When let client = newRestHttpClient(initTAddress(restAddress, restPort)) - let response = await client.relayDeleteSubscriptionsV1(pubSubTopics) + let response = await client.relayDeleteSubscriptionsV1(shards) # Then check: @@ -125,16 +125,16 @@ suite "Waku v2 Rest API - Relay": response.data == "OK" check: - not cache.isPubsubSubscribed("pubsub-topic-1") - not node.wakuRelay.isSubscribed("pubsub-topic-1") - not cache.isPubsubSubscribed("pubsub-topic-2") - not node.wakuRelay.isSubscribed("pubsub-topic-2") - not cache.isPubsubSubscribed("pubsub-topic-3") - not node.wakuRelay.isSubscribed("pubsub-topic-3") - cache.isPubsubSubscribed("pubsub-topic-x") - node.wakuRelay.isSubscribed("pubsub-topic-x") - not cache.isPubsubSubscribed("pubsub-topic-y") - not node.wakuRelay.isSubscribed("pubsub-topic-y") + not cache.isPubsubSubscribed($shard0) + not node.wakuRelay.isSubscribed($shard0) + not cache.isPubsubSubscribed($shard1) + not node.wakuRelay.isSubscribed($shard1) + not cache.isPubsubSubscribed($shard2) + not node.wakuRelay.isSubscribed($shard2) + cache.isPubsubSubscribed($shard3) + node.wakuRelay.isSubscribed($shard3) + not cache.isPubsubSubscribed($shard4) + not node.wakuRelay.isSubscribed($shard4) await restServer.stop() await restServer.closeWait() diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index ae8010e8f6..532a657fd0 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -312,14 +312,23 @@ type WakuNodeConf* = object name: "keep-alive" .}: bool + # If numShardsInNetwork is not set, we use the number of shards configured as numShardsInNetwork + numShardsInNetwork* {. + desc: "Number of shards in the network", + defaultValue: 0, + name: "num-shards-in-network" + .}: uint32 + pubsubTopics* {. - desc: "Default pubsub topic to subscribe to. Argument may be repeated.", + desc: + "Deprecated. Default pubsub topic to subscribe to. Argument may be repeated.", defaultValue: @[], name: "pubsub-topic" .}: seq[string] shards* {. - desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.", + desc: + "Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. 
Argument may be repeated.", defaultValue: @[ uint16(0), diff --git a/waku/factory/internal_config.nim b/waku/factory/internal_config.nim index 0d2b6ef5b3..064a03acd9 100644 --- a/waku/factory/internal_config.nim +++ b/waku/factory/internal_config.nim @@ -28,29 +28,8 @@ proc enrConfiguration*( enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs) - var shards = newSeq[uint16]() - - let shardsOpt = topicsToRelayShards(conf.pubsubTopics).valueOr: - error "failed to parse pubsub topic, please format according to static shard specification", - error = $error - return err("failed to parse pubsub topic: " & $error) - - if shardsOpt.isSome(): - let relayShards = shardsOpt.get() - - if relayShards.clusterid != conf.clusterId: - error "pubsub topic corresponds to different shard than configured", - nodeCluster = conf.clusterId, pubsubCluster = relayShards.clusterid - return err("pubsub topic corresponds to different shard than configured") - - shards = relayShards.shardIds - elif conf.shards.len > 0: - shards = toSeq(conf.shards.mapIt(uint16(it))) - else: - info "no pubsub topics specified" - enrBuilder.withWakuRelaySharding( - RelayShards(clusterId: conf.clusterId, shardIds: shards) + RelayShards(clusterId: conf.clusterId, shardIds: conf.shards) ).isOkOr: return err("could not initialize ENR with shards") diff --git a/waku/factory/networks_config.nim b/waku/factory/networks_config.nim index ee3c4ef972..41678f5904 100644 --- a/waku/factory/networks_config.nim +++ b/waku/factory/networks_config.nim @@ -10,7 +10,7 @@ type ClusterConf* = object rlnRelayBandwidthThreshold*: int rlnEpochSizeSec*: uint64 rlnRelayUserMessageLimit*: uint64 - pubsubTopics*: seq[string] + numShardsInNetwork*: uint32 discv5Discovery*: bool discv5BootstrapNodes*: seq[string] @@ -28,11 +28,7 @@ proc TheWakuNetworkConf*(T: type ClusterConf): ClusterConf = rlnRelayBandwidthThreshold: 0, rlnEpochSizeSec: 600, rlnRelayUserMessageLimit: 100, - pubsubTopics: - @[ - "/waku/2/rs/1/0", "/waku/2/rs/1/1", "/waku/2/rs/1/2", "/waku/2/rs/1/3", - "/waku/2/rs/1/4", "/waku/2/rs/1/5", "/waku/2/rs/1/6", "/waku/2/rs/1/7", - ], + numShardsInNetwork: 8, discv5Discovery: true, discv5BootstrapNodes: @[ diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 2624b949e3..d6672f8a8a 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -113,6 +113,13 @@ proc initNode( ## Mount protocols +proc getNumShardsInNetwork*(conf: WakuNodeConf): uint32 = + if conf.numShardsInNetwork != 0: + return conf.numShardsInNetwork + # If conf.numShardsInNetwork is not set, use 1024 - the maximum possible as per the static sharding spec + # https://github.com/waku-org/specs/blob/master/standards/core/relay-sharding.md#static-sharding + return uint32(MaxShardIndex + 1) + proc setupProtocols( node: WakuNode, conf: WakuNodeConf, nodeKey: crypto.PrivateKey ): Future[Result[void, string]] {.async.} = @@ -127,7 +134,14 @@ proc setupProtocols( node.mountMetadata(conf.clusterId).isOkOr: return err("failed to mount waku metadata protocol: " & error) - node.mountSharding(conf.clusterId, uint32(conf.pubsubTopics.len)).isOkOr: + # If conf.numShardsInNetwork is not set, use the number of shards configured as numShardsInNetwork + let numShardsInNetwork = getNumShardsInNetwork(conf) + + if conf.numShardsInNetwork == 0: + warn "Number of shards in network not configured, setting it to", + numShardsInNetwork = $numShardsInNetwork + + node.mountSharding(conf.clusterId, numShardsInNetwork).isOkOr: return err("failed to mount waku sharding: " & 
error) # Mount relay on all nodes @@ -151,14 +165,20 @@ proc setupProtocols( peerExchangeHandler = some(handlePeerExchange) - let shards = - conf.contentTopics.mapIt(node.wakuSharding.getShard(it).expect("Valid Shard")) + var autoShards: seq[RelayShard] + for contentTopic in conf.contentTopics: + let shard = node.wakuSharding.getShard(contentTopic).valueOr: + return err("Could not parse content topic: " & error) + autoShards.add(shard) + debug "Shards created from content topics", - contentTopics = conf.contentTopics, shards = shards + contentTopics = conf.contentTopics, shards = autoShards - if conf.relay: - let pubsubTopics = conf.pubsubTopics & shards + let confShards = + conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it))) + let shards = confShards & autoShards + if conf.relay: let parsedMaxMsgSize = parseMsgSize(conf.maxMessageSize).valueOr: return err("failed to parse 'max-num-bytes-msg-size' param: " & $error) @@ -166,10 +186,7 @@ proc setupProtocols( try: await mountRelay( - node, - pubsubTopics, - peerExchangeHandler = peerExchangeHandler, - int(parsedMaxMsgSize), + node, shards, peerExchangeHandler = peerExchangeHandler, int(parsedMaxMsgSize) ) except CatchableError: return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg()) diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 28a6326b98..cdc795c30b 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -68,7 +68,7 @@ proc logConfig(conf: WakuNodeConf) = info "Configuration. Network", cluster = conf.clusterId, maxPeers = conf.maxRelayPeers - for shard in conf.pubsubTopics: + for shard in conf.shards: info "Configuration. Shards", shard = shard for i in conf.discv5BootstrapNodes: @@ -86,6 +86,19 @@ proc logConfig(conf: WakuNodeConf) = func version*(waku: Waku): string = waku.version +proc validateShards(conf: WakuNodeConf): Result[void, string] = + let numShardsInNetwork = getNumShardsInNetwork(conf) + + for shard in conf.shards: + if shard >= numShardsInNetwork: + let msg = + "validateShards invalid shard: " & $shard & " when numShardsInNetwork: " & + $numShardsInNetwork # fmt doesn't work + error "validateShards failed", error = msg + return err(msg) + + return ok() + ## Initialisation proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] = @@ -94,16 +107,35 @@ proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] = logging.setupLog(conf.logLevel, conf.logFormat) + # TODO: remove after pubsubtopic config gets removed + var shards = newSeq[uint16]() + if conf.pubsubTopics.len > 0: + let shardsRes = topicsToRelayShards(conf.pubsubTopics) + if shardsRes.isErr(): + error "failed to parse pubsub topic, please format according to static shard specification", + error = shardsRes.error + return err("failed to parse pubsub topic: " & $shardsRes.error) + + let shardsOpt = shardsRes.get() + + if shardsOpt.isSome(): + let relayShards = shardsOpt.get() + if relayShards.clusterId != conf.clusterId: + error "clusterId of the pubsub topic should match the node's cluster. e.g. --pubsub-topic=/waku/2/rs/22/1 and --cluster-id=22", + nodeCluster = conf.clusterId, pubsubCluster = relayShards.clusterId + return err( + "clusterId of the pubsub topic should match the node's cluster. e.g. 
--pubsub-topic=/waku/2/rs/22/1 and --cluster-id=22" + ) + + for shard in relayShards.shardIds: + shards.add(shard) + confCopy.shards = shards + case confCopy.clusterId # cluster-id=1 (aka The Waku Network) of 1: let twnClusterConf = ClusterConf.TheWakuNetworkConf() - if len(confCopy.shards) != 0: - confCopy.pubsubTopics = - confCopy.shards.mapIt(twnClusterConf.pubsubTopics[it.uint16]) - else: - confCopy.pubsubTopics = twnClusterConf.pubsubTopics # Override configuration confCopy.maxMessageSize = twnClusterConf.maxMessageSize @@ -117,6 +149,7 @@ proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] = confCopy.discv5BootstrapNodes & twnClusterConf.discv5BootstrapNodes confCopy.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec confCopy.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit + confCopy.numShardsInNetwork = twnClusterConf.numShardsInNetwork # Only set rlnRelay to true if relay is configured if confCopy.relay: @@ -127,6 +160,11 @@ proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] = info "Running nwaku node", version = git_version logConfig(confCopy) + let validateShardsRes = validateShards(confCopy) + if validateShardsRes.isErr(): + error "Failed validating shards", error = $validateShardsRes.error + return err("Failed validating shards: " & $validateShardsRes.error) + if not confCopy.nodekey.isSome(): let keyRes = crypto.PrivateKey.random(Secp256k1, rng[]) if keyRes.isErr(): diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index e347f38ac7..2bcb065940 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -321,7 +321,7 @@ proc subscribe*( error "Autosharding error", error = error return - (shard, some(subscription.topic)) + ($shard, some(subscription.topic)) of PubsubSub: (subscription.topic, none(ContentTopic)) else: @@ -356,7 +356,7 @@ proc unsubscribe*(node: WakuNode, subscription: SubscriptionEvent) = error "Autosharding error", error = error return - (shard, some(subscription.topic)) + ($shard, some(subscription.topic)) of PubsubUnsub: (subscription.topic, none(ContentTopic)) else: @@ -437,7 +437,7 @@ proc startRelay*(node: WakuNode) {.async.} = proc mountRelay*( node: WakuNode, - pubsubTopics: seq[string] = @[], + shards: seq[RelayShard] = @[], peerExchangeHandler = none(RoutingRecordsHandler), maxMessageSize = int(DefaultMaxWakuMessageSize), ) {.async, gcsafe.} = @@ -466,11 +466,11 @@ proc mountRelay*( node.switch.mount(node.wakuRelay, protocolMatcher(WakuRelayCodec)) - info "relay mounted successfully", pubsubTopics = pubsubTopics + info "relay mounted successfully", shards = shards - # Subscribe to topics - for pubsubTopic in pubsubTopics: - node.subscribe((kind: PubsubSub, topic: pubsubTopic)) + # Subscribe to shards + for shard in shards: + node.subscribe((kind: PubsubSub, topic: $shard)) ## Waku filter diff --git a/waku/waku_api/rest/builder.nim b/waku/waku_api/rest/builder.nim index 811e6cab05..325dcce06f 100644 --- a/waku/waku_api/rest/builder.nim +++ b/waku/waku_api/rest/builder.nim @@ -132,7 +132,8 @@ proc startRestServerProtocolSupport*( let handler = messageCacheHandler(cache) - for pubsubTopic in conf.pubsubTopics: + for shard in conf.shards: + let pubsubTopic = $RelayShard(clusterId: conf.clusterId, shardId: shard) cache.pubsubSubscribe(pubsubTopic) node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler)) diff --git a/waku/waku_api/rest/relay/handlers.nim b/waku/waku_api/rest/relay/handlers.nim index cbd65d653d..7ee0ee7e33 100644 --- a/waku/waku_api/rest/relay/handlers.nim +++ 
b/waku/waku_api/rest/relay/handlers.nim @@ -282,7 +282,7 @@ proc installRelayApiHandlers*( debug "Publishing message", contentTopic = message.contentTopic, rln = not node.wakuRlnRelay.isNil() - var publishFut = node.publish(some(pubsubTopic), message) + var publishFut = node.publish(some($pubsubTopic), message) if not await publishFut.withTimeout(futTimeout): return RestApiResponse.internalServerError("Failed to publish: timedout") diff --git a/waku/waku_core/topics/pubsub_topic.nim b/waku/waku_core/topics/pubsub_topic.nim index da9b493997..27ea271804 100644 --- a/waku/waku_core/topics/pubsub_topic.nim +++ b/waku/waku_core/topics/pubsub_topic.nim @@ -13,16 +13,16 @@ export parsing type PubsubTopic* = string -const DefaultPubsubTopic* = PubsubTopic("/waku/2/rs/0/0") - -## Namespaced pub-sub topic +## Relay Shard type RelayShard* = object clusterId*: uint16 shardId*: uint16 -proc staticSharding*(T: type RelayShard, clusterId, shardId: uint16): T = - return RelayShard(clusterId: clusterId, shardId: shardId) +const DefaultShardId* = uint16(0) +const DefaultClusterId* = uint16(0) +const DefaultRelayShard* = + RelayShard(clusterId: DefaultClusterId, shardId: DefaultShardId) # Serialization @@ -31,6 +31,8 @@ proc `$`*(topic: RelayShard): string = ## in the format `/waku/2/rs// return "/waku/2/rs/" & $topic.clusterId & "/" & $topic.shardId +const DefaultPubsubTopic* = $DefaultRelayShard + # Deserialization const @@ -67,7 +69,7 @@ proc parseStaticSharding*( ParsingError.invalidFormat($err) ) - ok(RelayShard.staticSharding(clusterId, shardId)) + ok(RelayShard(clusterId: clusterId, shardId: shardId)) proc parse*(T: type RelayShard, topic: PubsubTopic): ParsingResult[RelayShard] = ## Splits a namespaced topic string into its constituent parts. diff --git a/waku/waku_core/topics/sharding.nim b/waku/waku_core/topics/sharding.nim index 519e61da0c..4a4af4cb5d 100644 --- a/waku/waku_core/topics/sharding.nim +++ b/waku/waku_core/topics/sharding.nim @@ -27,7 +27,7 @@ proc getGenZeroShard*(s: Sharding, topic: NsContentTopic, count: int): RelayShar # This is equilavent to modulo shard count but faster let shard = hashValue and uint64((count - 1)) - RelayShard.staticSharding(s.clusterId, uint16(shard)) + RelayShard(clusterId: s.clusterId, shardId: uint16(shard)) proc getShard*(s: Sharding, topic: NsContentTopic): Result[RelayShard, string] = ## Compute the (pubsub topic) shard to use for this content topic. 
@@ -42,13 +42,13 @@ proc getShard*(s: Sharding, topic: NsContentTopic): Result[RelayShard, string] = else: return err("Generation > 0 are not supported yet") -proc getShard*(s: Sharding, topic: ContentTopic): Result[PubsubTopic, string] = +proc getShard*(s: Sharding, topic: ContentTopic): Result[RelayShard, string] = let parsedTopic = NsContentTopic.parse(topic).valueOr: return err($error) let shard = ?s.getShard(parsedTopic) - ok($shard) + ok(shard) proc parseSharding*( s: Sharding, @@ -130,7 +130,7 @@ proc parseSharding*( var list = newSeq[(RelayShard, float64)](shardCount) for (shard, weight) in shardsNWeights: - let pubsub = RelayShard.staticSharding(ClusterId, uint16(shard)) + let pubsub = RelayShard(clusterId: ClusterId, shardId: uint16(shard)) let clusterBytes = toBytesBE(uint16(ClusterId)) let shardBytes = toBytesBE(uint16(shard)) diff --git a/waku/waku_enr/sharding.nim b/waku/waku_enr/sharding.nim index 6e15a95d3b..d1beb2864b 100644 --- a/waku/waku_enr/sharding.nim +++ b/waku/waku_enr/sharding.nim @@ -13,7 +13,7 @@ import ../common/enr, ../waku_core logScope: topics = "waku enr sharding" -const MaxShardIndex: uint16 = 1023 +const MaxShardIndex*: uint16 = 1023 const ShardingIndicesListEnrField* = "rs" @@ -25,7 +25,7 @@ type RelayShards* = object shardIds*: seq[uint16] func topics*(rs: RelayShards): seq[RelayShard] = - rs.shardIds.mapIt(RelayShard.staticSharding(rs.clusterId, it)) + rs.shardIds.mapIt(RelayShard(clusterId: rs.clusterId, shardId: it)) func init*(T: type RelayShards, clusterId, shardId: uint16): Result[T, string] = if shardId > MaxShardIndex:
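The core change in this patch is that relay configuration moves from raw pubsub-topic strings of the form `/waku/2/rs/<cluster>/<shard>` to structured `RelayShard` values derived from `--cluster-id` and `--shard`. The standalone Nim sketch below mirrors the `RelayShard` type, its `$` stringification and the `DefaultRelayShard` constant introduced in `waku/waku_core/topics/pubsub_topic.nim`; the type and constants are re-declared locally for illustration, and the `mountRelay`/`subscribe` calls from the patch are only referenced in comments, so this is a sketch of the shape of the API rather than the nwaku module itself.

```nim
# Standalone sketch mirroring the RelayShard API this patch introduces; the type,
# constants and `$` proc are local stand-ins for waku/waku_core/topics/pubsub_topic.nim.
import std/sequtils

type RelayShard = object
  clusterId: uint16
  shardId: uint16

const
  DefaultClusterId = uint16(0)
  DefaultShardId = uint16(0)
  DefaultRelayShard = RelayShard(clusterId: DefaultClusterId, shardId: DefaultShardId)

proc `$`(shard: RelayShard): string =
  # The pubsub-topic wire format is unchanged: /waku/2/rs/<clusterId>/<shardId>
  "/waku/2/rs/" & $shard.clusterId & "/" & $shard.shardId

when isMainModule:
  # e.g. a node started with --cluster-id=1 --shard=0 --shard=5
  let
    clusterId = uint16(1)
    confShards = @[uint16(0), uint16(5)]
    shards = confShards.mapIt(RelayShard(clusterId: clusterId, shardId: it))

  # In the patch these values are what node.mountRelay(shards) receives, while
  # gossipsub subscriptions still use the stringified form ($shard).
  for shard in shards:
    echo $shard # -> /waku/2/rs/1/0 and /waku/2/rs/1/5

  doAssert $DefaultRelayShard == "/waku/2/rs/0/0"
```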
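Besides the `RelayShard` type, the patch replaces the implicit shard count (previously inferred from the number of configured pubsub topics) with an explicit `--num-shards-in-network` option: when unset it falls back to the static-sharding maximum of 1024 shards, and every configured `--shard` index is validated against it. The sketch below reproduces that logic under simplified assumptions: `Conf` is a stand-in for `WakuNodeConf`, and the plain bool return of `validateShards` replaces the `Result[void, string]` used in `waku/factory/waku.nim`, so the example stays stdlib-only.

```nim
# Sketch of the shard-count defaulting and validation added by this patch.
# Conf is a simplified stand-in for WakuNodeConf.
type Conf = object
  numShardsInNetwork: uint32
  shards: seq[uint16]

const MaxShardIndex = uint16(1023)

proc getNumShardsInNetwork(conf: Conf): uint32 =
  if conf.numShardsInNetwork != 0:
    return conf.numShardsInNetwork
  # 0 means "not configured": fall back to the static-sharding maximum of 1024 shards.
  return uint32(MaxShardIndex) + 1

proc validateShards(conf: Conf): bool =
  let numShardsInNetwork = getNumShardsInNetwork(conf)
  for shard in conf.shards:
    if uint32(shard) >= numShardsInNetwork:
      echo "invalid shard: ", shard, " when numShardsInNetwork: ", numShardsInNetwork
      return false
  return true

when isMainModule:
  # Cluster 1 (The Waku Network) overrides numShardsInNetwork to 8,
  # so indices 0..7 are accepted and 8 is rejected.
  doAssert validateShards(Conf(numShardsInNetwork: 8, shards: @[uint16(0), uint16(7)]))
  doAssert not validateShards(Conf(numShardsInNetwork: 8, shards: @[uint16(8)]))
  # When --num-shards-in-network is left unset, any index up to 1023 passes.
  doAssert validateShards(Conf(numShardsInNetwork: 0, shards: @[uint16(1023)]))
```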