From af84f0fb0926378a77f93047b15f7bdc2a119006 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 26 Aug 2024 13:13:55 -0600 Subject: [PATCH 01/33] initial changes --- apps/networkmonitor/networkmonitor.nim | 12 +++++-- apps/networkmonitor/networkmonitor_config.nim | 13 +++++--- waku/factory/external_config.nim | 7 ++-- waku/factory/networks_config.nim | 8 ++--- waku/factory/node_factory.nim | 32 +++++++++++++------ waku/factory/waku.nim | 20 +++++++++++- 6 files changed, 62 insertions(+), 30 deletions(-) diff --git a/apps/networkmonitor/networkmonitor.nim b/apps/networkmonitor/networkmonitor.nim index 40e42010a9..4fe53a66f6 100644 --- a/apps/networkmonitor/networkmonitor.nim +++ b/apps/networkmonitor/networkmonitor.nim @@ -566,6 +566,10 @@ when isMainModule: conf.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit + conf.networkShards = twnClusterConf.networkShards + + if conf.shards.len == 0: + conf.shards = toSeq(uint16(0) .. uint16(twnClusterConf.networkShards - 1)) if conf.logLevel != LogLevel.NONE: setLogLevel(conf.logLevel) @@ -631,9 +635,11 @@ when isMainModule: error "failed to mount waku metadata protocol: ", err = error quit 1 - for pubsubTopic in conf.pubsubTopics: - # Subscribe the node to the default pubsubtopic, to count messages - subscribeAndHandleMessages(node, pubsubTopic, msgPerContentTopic) + for shard in conf.shards: + # Subscribe the node to the shards, to count messages + subscribeAndHandleMessages( + node, $RelayShard(shardId: shard, clusterId: conf.clusterId), msgPerContentTopic + ) # spawn the routine that crawls the network # TODO: split into 3 routines (discovery, connections, ip2location) diff --git a/apps/networkmonitor/networkmonitor_config.nim b/apps/networkmonitor/networkmonitor_config.nim index 22b74b5865..912780b589 100644 --- a/apps/networkmonitor/networkmonitor_config.nim +++ b/apps/networkmonitor/networkmonitor_config.nim @@ -38,10 +38,13 @@ type NetworkMonitorConf* = object name: "dns-discovery-url" .}: string - pubsubTopics* {. - desc: "Default pubsub topic to subscribe to. Argument may be repeated.", - name: "pubsub-topic" - .}: seq[string] + shards* {. + desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.", + name: "shard" + .}: seq[uint16] + + networkShards* {.desc: "Number of shards in the network", name: "network-shards".}: + uint16 refreshInterval* {. desc: "How often new peers are discovered and connected to (in seconds)", @@ -55,7 +58,7 @@ type NetworkMonitorConf* = object "Cluster id that the node is running in. Node in a different cluster id is disconnected.", defaultValue: 1, name: "cluster-id" - .}: uint32 + .}: uint16 rlnRelay* {. desc: "Enable spam protection through rln-relay: true|false", diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index ae8010e8f6..1d6af3e38c 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -312,11 +312,8 @@ type WakuNodeConf* = object name: "keep-alive" .}: bool - pubsubTopics* {. - desc: "Default pubsub topic to subscribe to. Argument may be repeated.", - defaultValue: @[], - name: "pubsub-topic" - .}: seq[string] + networkShards* {.desc: "Number of shards in the network", name: "network-shards".}: + uint32 shards* {. desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.", diff --git a/waku/factory/networks_config.nim b/waku/factory/networks_config.nim index ee3c4ef972..b337cce4ec 100644 --- a/waku/factory/networks_config.nim +++ b/waku/factory/networks_config.nim @@ -10,7 +10,7 @@ type ClusterConf* = object rlnRelayBandwidthThreshold*: int rlnEpochSizeSec*: uint64 rlnRelayUserMessageLimit*: uint64 - pubsubTopics*: seq[string] + networkShards*: uint32 discv5Discovery*: bool discv5BootstrapNodes*: seq[string] @@ -28,11 +28,7 @@ proc TheWakuNetworkConf*(T: type ClusterConf): ClusterConf = rlnRelayBandwidthThreshold: 0, rlnEpochSizeSec: 600, rlnRelayUserMessageLimit: 100, - pubsubTopics: - @[ - "/waku/2/rs/1/0", "/waku/2/rs/1/1", "/waku/2/rs/1/2", "/waku/2/rs/1/3", - "/waku/2/rs/1/4", "/waku/2/rs/1/5", "/waku/2/rs/1/6", "/waku/2/rs/1/7", - ], + networkShards: 8, discv5Discovery: true, discv5BootstrapNodes: @[ diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 2624b949e3..aa57620478 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -113,6 +113,12 @@ proc initNode( ## Mount protocols +proc getNetworkShards*(conf: WakuNodeConf): uint32 = + if conf.networkShards != 0: + return conf.networkShards + # If conf.networkShards is not set, use the number of shards configured as networkShards + return uint32(conf.shards.len) + proc setupProtocols( node: WakuNode, conf: WakuNodeConf, nodeKey: crypto.PrivateKey ): Future[Result[void, string]] {.async.} = @@ -127,7 +133,10 @@ proc setupProtocols( node.mountMetadata(conf.clusterId).isOkOr: return err("failed to mount waku metadata protocol: " & error) - node.mountSharding(conf.clusterId, uint32(conf.pubsubTopics.len)).isOkOr: + # If conf.networkShards is not set, use the number of shards configured as networkShards + let networkShards = getNetworkShards(conf) + + node.mountSharding(conf.clusterId, networkShards).isOkOr: return err("failed to mount waku sharding: " & error) # Mount relay on all nodes @@ -151,14 +160,20 @@ proc setupProtocols( peerExchangeHandler = some(handlePeerExchange) - let shards = - conf.contentTopics.mapIt(node.wakuSharding.getShard(it).expect("Valid Shard")) + var autoShards: seq[RelayShard] = @[] + for contentTopic in conf.contentTopics: + let shard = node.wakuSharding.getShard(contentTopic).valueOr: + return err("Could not parse content topic: " & error) + autoShards.add(shard) + debug "Shards created from content topics", - contentTopics = conf.contentTopics, shards = shards + contentTopics = conf.contentTopics, shards = autoShards - if conf.relay: - let pubsubTopics = conf.pubsubTopics & shards + let confShards = + conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it))) + let shards = confShards & autoShards + if conf.relay: let parsedMaxMsgSize = parseMsgSize(conf.maxMessageSize).valueOr: return err("failed to parse 'max-num-bytes-msg-size' param: " & $error) @@ -166,10 +181,7 @@ proc setupProtocols( try: await mountRelay( - node, - pubsubTopics, - peerExchangeHandler = peerExchangeHandler, - int(parsedMaxMsgSize), + node, shards, peerExchangeHandler = peerExchangeHandler, int(parsedMaxMsgSize) ) except CatchableError: return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg()) diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 28a6326b98..dea4af4df2 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -68,7 +68,7 @@ proc logConfig(conf: WakuNodeConf) = info "Configuration. Network", cluster = conf.clusterId, maxPeers = conf.maxRelayPeers - for shard in conf.pubsubTopics: + for shard in conf.shards: info "Configuration. Shards", shard = shard for i in conf.discv5BootstrapNodes: @@ -86,6 +86,18 @@ proc logConfig(conf: WakuNodeConf) = func version*(waku: Waku): string = waku.version +proc validateShards(conf: WakuNodeConf): Result[void, string] = + let networkShards = getNetworkShards(conf) + + for shard in conf.shards: + if shard >= networkShards: + let msg = "Invalid shard: " & $shard & " when networkShards: " & $networkShards + # fmt doesn't work + error "validateShards failed", error = msg + return err(msg) + + return ok() + ## Initialisation proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] = @@ -117,6 +129,7 @@ proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] = confCopy.discv5BootstrapNodes & twnClusterConf.discv5BootstrapNodes confCopy.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec confCopy.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit + confCopy.networkShards = twnClusterConf.networkShards # Only set rlnRelay to true if relay is configured if confCopy.relay: @@ -127,6 +140,11 @@ proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] = info "Running nwaku node", version = git_version logConfig(confCopy) + let validateShardsRes = validateShards(conf) + if validateShardsRes.isErr(): + error "Failed validating shards", error = $validateShardsRes.error + return err("Failed validating shards: " & $validateShardsRes.error) + if not confCopy.nodekey.isSome(): let keyRes = crypto.PrivateKey.random(Secp256k1, rng[]) if keyRes.isErr(): From d4432e4a45c9fea5346d255d819a479f5551d7a5 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 26 Aug 2024 14:11:12 -0600 Subject: [PATCH 02/33] fixing some compilation errors --- waku/factory/internal_config.nim | 8 +++++++- waku/factory/node_factory.nim | 2 +- waku/node/waku_node.nim | 14 +++++++------- waku/waku_api/rest/builder.nim | 3 ++- waku/waku_api/rest/relay/handlers.nim | 2 +- waku/waku_core/topics/sharding.nim | 4 ++-- 6 files changed, 20 insertions(+), 13 deletions(-) diff --git a/waku/factory/internal_config.nim b/waku/factory/internal_config.nim index 0d2b6ef5b3..355ac40c62 100644 --- a/waku/factory/internal_config.nim +++ b/waku/factory/internal_config.nim @@ -30,11 +30,17 @@ proc enrConfiguration*( var shards = newSeq[uint16]() - let shardsOpt = topicsToRelayShards(conf.pubsubTopics).valueOr: + # Only for dev purposes. After everything compiles, remove this block and uncomment the one below + let shardsOpt = topicsToRelayShards(@["/waku/2/rs/0/0"]).valueOr: error "failed to parse pubsub topic, please format according to static shard specification", error = $error return err("failed to parse pubsub topic: " & $error) + #[ let shardsOpt = topicsToRelayShards(conf.pubsubTopics).valueOr: + error "failed to parse pubsub topic, please format according to static shard specification", + error = $error + return err("failed to parse pubsub topic: " & $error) ]# + if shardsOpt.isSome(): let relayShards = shardsOpt.get() diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index aa57620478..0fad26dedf 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -160,7 +160,7 @@ proc setupProtocols( peerExchangeHandler = some(handlePeerExchange) - var autoShards: seq[RelayShard] = @[] + var autoShards: seq[RelayShard] for contentTopic in conf.contentTopics: let shard = node.wakuSharding.getShard(contentTopic).valueOr: return err("Could not parse content topic: " & error) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index e347f38ac7..2bcb065940 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -321,7 +321,7 @@ proc subscribe*( error "Autosharding error", error = error return - (shard, some(subscription.topic)) + ($shard, some(subscription.topic)) of PubsubSub: (subscription.topic, none(ContentTopic)) else: @@ -356,7 +356,7 @@ proc unsubscribe*(node: WakuNode, subscription: SubscriptionEvent) = error "Autosharding error", error = error return - (shard, some(subscription.topic)) + ($shard, some(subscription.topic)) of PubsubUnsub: (subscription.topic, none(ContentTopic)) else: @@ -437,7 +437,7 @@ proc startRelay*(node: WakuNode) {.async.} = proc mountRelay*( node: WakuNode, - pubsubTopics: seq[string] = @[], + shards: seq[RelayShard] = @[], peerExchangeHandler = none(RoutingRecordsHandler), maxMessageSize = int(DefaultMaxWakuMessageSize), ) {.async, gcsafe.} = @@ -466,11 +466,11 @@ proc mountRelay*( node.switch.mount(node.wakuRelay, protocolMatcher(WakuRelayCodec)) - info "relay mounted successfully", pubsubTopics = pubsubTopics + info "relay mounted successfully", shards = shards - # Subscribe to topics - for pubsubTopic in pubsubTopics: - node.subscribe((kind: PubsubSub, topic: pubsubTopic)) + # Subscribe to shards + for shard in shards: + node.subscribe((kind: PubsubSub, topic: $shard)) ## Waku filter diff --git a/waku/waku_api/rest/builder.nim b/waku/waku_api/rest/builder.nim index 811e6cab05..325dcce06f 100644 --- a/waku/waku_api/rest/builder.nim +++ b/waku/waku_api/rest/builder.nim @@ -132,7 +132,8 @@ proc startRestServerProtocolSupport*( let handler = messageCacheHandler(cache) - for pubsubTopic in conf.pubsubTopics: + for shard in conf.shards: + let pubsubTopic = $RelayShard(clusterId: conf.clusterId, shardId: shard) cache.pubsubSubscribe(pubsubTopic) node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler)) diff --git a/waku/waku_api/rest/relay/handlers.nim b/waku/waku_api/rest/relay/handlers.nim index cbd65d653d..7ee0ee7e33 100644 --- a/waku/waku_api/rest/relay/handlers.nim +++ b/waku/waku_api/rest/relay/handlers.nim @@ -282,7 +282,7 @@ proc installRelayApiHandlers*( debug "Publishing message", contentTopic = message.contentTopic, rln = not node.wakuRlnRelay.isNil() - var publishFut = node.publish(some(pubsubTopic), message) + var publishFut = node.publish(some($pubsubTopic), message) if not await publishFut.withTimeout(futTimeout): return RestApiResponse.internalServerError("Failed to publish: timedout") diff --git a/waku/waku_core/topics/sharding.nim b/waku/waku_core/topics/sharding.nim index 519e61da0c..a091e78739 100644 --- a/waku/waku_core/topics/sharding.nim +++ b/waku/waku_core/topics/sharding.nim @@ -42,13 +42,13 @@ proc getShard*(s: Sharding, topic: NsContentTopic): Result[RelayShard, string] = else: return err("Generation > 0 are not supported yet") -proc getShard*(s: Sharding, topic: ContentTopic): Result[PubsubTopic, string] = +proc getShard*(s: Sharding, topic: ContentTopic): Result[RelayShard, string] = let parsedTopic = NsContentTopic.parse(topic).valueOr: return err($error) let shard = ?s.getShard(parsedTopic) - ok($shard) + ok(shard) proc parseSharding*( s: Sharding, From 86a14810968e498533895b6a708db74d231b08b0 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 26 Aug 2024 14:45:35 -0600 Subject: [PATCH 03/33] fixing compilation error --- waku/factory/waku.nim | 5 ----- 1 file changed, 5 deletions(-) diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index dea4af4df2..a2e889a22b 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -111,11 +111,6 @@ proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] = # cluster-id=1 (aka The Waku Network) of 1: let twnClusterConf = ClusterConf.TheWakuNetworkConf() - if len(confCopy.shards) != 0: - confCopy.pubsubTopics = - confCopy.shards.mapIt(twnClusterConf.pubsubTopics[it.uint16]) - else: - confCopy.pubsubTopics = twnClusterConf.pubsubTopics # Override configuration confCopy.maxMessageSize = twnClusterConf.maxMessageSize From cbcb93bdcba28066a99be23fbaae6ba6faf553fc Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 26 Aug 2024 15:07:55 -0600 Subject: [PATCH 04/33] chat2 fix and code to temporarily convert from pubsub topics to shards --- apps/chat2/chat2.nim | 6 ++++-- waku/factory/waku.nim | 14 ++++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index dd2694cf77..f844deada7 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -6,7 +6,7 @@ when not (compileOption("threads")): {.push raises: [].} -import std/[strformat, strutils, times, options, random] +import std/[strformat, strutils, times, options, random, sequtils] import confutils, chronicles, @@ -379,7 +379,9 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = raise newException(ConfigurationError, "rln-relay-cred-path MUST be passed") if conf.relay: - await node.mountRelay(conf.topics.split(" ")) + let shards = + conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it))) + await node.mountRelay(shards) await node.mountLibp2pPing() diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index a2e889a22b..fe8255caf1 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -106,6 +106,20 @@ proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] = logging.setupLog(conf.logLevel, conf.logFormat) + # TODO: remove after pubsubtopic config gets removed + #[ let shards = newSeq[uint16]() + if conf.pubsubTopics.length > 0: + let shardsOpt = topicsToRelayShards(conf.pubsubTopics).valueOr: + error "failed to parse pubsub topic, please format according to static shard specification", + error = $error + return err("failed to parse pubsub topic: " & $error) + + if shardsOpt.isSome(): + let relayShards = shardsOpt.get() + for shard in relayShards: + shards.add(shard.shardId) + confCopy.shards = shards ]# + case confCopy.clusterId # cluster-id=1 (aka The Waku Network) From 7e69272e5772d78f1d68145c3c1ad6c30350c832 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 26 Aug 2024 15:19:55 -0600 Subject: [PATCH 05/33] fixes to testlib --- tests/testlib/wakunode.nim | 21 +++++++++------------ waku/waku_core/topics/pubsub_topic.nim | 3 ++- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/tests/testlib/wakunode.nim b/tests/testlib/wakunode.nim index eb679859b8..96c2deeab7 100644 --- a/tests/testlib/wakunode.nim +++ b/tests/testlib/wakunode.nim @@ -38,7 +38,7 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf = maxConnections: 50, maxMessageSize: "1024 KiB", clusterId: 0, - pubsubTopics: @["/waku/2/rs/0/0"], + shards: @[uint16(0)], relay: true, storeMessageDbUrl: "sqlite://store.sqlite3", ) @@ -63,8 +63,9 @@ proc newTestWakuNode*( dns4DomainName = none(string), discv5UdpPort = none(Port), agentString = none(string), - pubsubTopics: seq[string] = @["/waku/2/rs/1/0"], peerStoreCapacity = none(int), + clusterId = DefaultClusterId, + shards = @[DefaultShardId], ): WakuNode = var resolvedExtIp = extIp @@ -77,14 +78,8 @@ proc newTestWakuNode*( var conf = defaultTestWakuNodeConf() - let clusterId = - if pubsubTopics.len() > 0: - RelayShard.parse(pubsubTopics[0]).get().clusterId - else: - 1.uint16 - conf.clusterId = clusterId - conf.pubsubTopics = pubsubTopics + conf.shards = shards if dns4DomainName.isSome() and extIp.isNone(): # If there's an error resolving the IP, an exception is thrown and test fails @@ -95,7 +90,7 @@ proc newTestWakuNode*( let netConf = NetConfig.init( bindIp = bindIp, - clusterId = clusterId, + clusterId = conf.clusterId, bindPort = bindPort, extIp = resolvedExtIp, extPort = extPort, @@ -111,8 +106,10 @@ proc newTestWakuNode*( var enrBuilder = EnrBuilder.init(nodeKey) - enrBuilder.withShardedTopics(pubsubTopics).isOkOr: - raise newException(Defect, "Invalid record: " & error) + enrBuilder.withWakuRelaySharding( + RelayShards(clusterId: conf.clusterId, shardIds: conf.shards) + ).isOkOr: + raise newException(Defect, "Invalid record: " & $error) enrBuilder.withIpAddressAndPorts( ipAddr = netConf.enrIp, tcpPort = netConf.enrPort, udpPort = netConf.discv5UdpPort diff --git a/waku/waku_core/topics/pubsub_topic.nim b/waku/waku_core/topics/pubsub_topic.nim index da9b493997..8955560447 100644 --- a/waku/waku_core/topics/pubsub_topic.nim +++ b/waku/waku_core/topics/pubsub_topic.nim @@ -13,7 +13,8 @@ export parsing type PubsubTopic* = string -const DefaultPubsubTopic* = PubsubTopic("/waku/2/rs/0/0") +const DefaultShardId* = uint16(0) +const DefaultClusterId* = uint16(0) ## Namespaced pub-sub topic From 20c6a24835dceb41976f81ffc80f7085406b2106 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 26 Aug 2024 15:30:08 -0600 Subject: [PATCH 06/33] adding defaults and removing redundant staticSharding proc --- tests/test_waku_enr.nim | 2 +- tests/waku_core/test_namespaced_topics.nim | 2 +- tests/waku_core/topics/test_pubsub_topic.nim | 4 +-- tests/waku_core/topics/test_sharding.nim | 28 ++++++++++---------- waku/waku_core/topics/pubsub_topic.nim | 13 +++++---- waku/waku_core/topics/sharding.nim | 4 +-- waku/waku_enr/sharding.nim | 2 +- 7 files changed, 27 insertions(+), 28 deletions(-) diff --git a/tests/test_waku_enr.nim b/tests/test_waku_enr.nim index bf657d43d1..7180302526 100644 --- a/tests/test_waku_enr.nim +++ b/tests/test_waku_enr.nim @@ -280,7 +280,7 @@ suite "Waku ENR - Relay static sharding": clusterId: uint16 = 22 shardId: uint16 = 1 - let shard = RelayShard.staticSharding(clusterId, shardId) + let shard = RelayShard(clusterId, shardId) ## When let shardsTopics = RelayShards.init(clusterId, shardId).expect("Valid Shards") diff --git a/tests/waku_core/test_namespaced_topics.nim b/tests/waku_core/test_namespaced_topics.nim index ca69269363..de938a03a0 100644 --- a/tests/waku_core/test_namespaced_topics.nim +++ b/tests/waku_core/test_namespaced_topics.nim @@ -136,7 +136,7 @@ suite "Waku Message - Content topics namespacing": suite "Waku Message - Pub-sub topics namespacing": test "Stringify static sharding pub-sub topic": ## Given - var shard = RelayShard.staticSharding(clusterId = 0, shardId = 2) + var shard = RelayShard(clusterId = 0, shardId = 2) ## When let topic = $shard diff --git a/tests/waku_core/topics/test_pubsub_topic.nim b/tests/waku_core/topics/test_pubsub_topic.nim index bbdc894553..98fe6b7c23 100644 --- a/tests/waku_core/topics/test_pubsub_topic.nim +++ b/tests/waku_core/topics/test_pubsub_topic.nim @@ -10,10 +10,10 @@ suite "Static Sharding Functionality": check: shard.clusterId == 0 shard.shardId == 1 - shard == RelayShard.staticSharding(0, 1) + shard == RelayShard(0, 1) test "Pubsub Topic Naming Compliance": - let shard = RelayShard.staticSharding(0, 1) + let shard = RelayShard(0, 1) check: shard.clusterId == 0 shard.shardId == 1 diff --git a/tests/waku_core/topics/test_sharding.nim b/tests/waku_core/topics/test_sharding.nim index db0ea6d979..faf454876c 100644 --- a/tests/waku_core/topics/test_sharding.nim +++ b/tests/waku_core/topics/test_sharding.nim @@ -54,16 +54,16 @@ suite "Autosharding": # Then the generated shards are valid check: - shard1 == RelayShard.staticSharding(ClusterId, 3) - shard2 == RelayShard.staticSharding(ClusterId, 3) - shard3 == RelayShard.staticSharding(ClusterId, 6) - shard4 == RelayShard.staticSharding(ClusterId, 6) - shard5 == RelayShard.staticSharding(ClusterId, 3) - shard6 == RelayShard.staticSharding(ClusterId, 3) - shard7 == RelayShard.staticSharding(ClusterId, 3) - shard8 == RelayShard.staticSharding(ClusterId, 3) - shard9 == RelayShard.staticSharding(ClusterId, 7) - shard10 == RelayShard.staticSharding(ClusterId, 3) + shard1 == RelayShard(ClusterId, 3) + shard2 == RelayShard(ClusterId, 3) + shard3 == RelayShard(ClusterId, 6) + shard4 == RelayShard(ClusterId, 6) + shard5 == RelayShard(ClusterId, 3) + shard6 == RelayShard(ClusterId, 3) + shard7 == RelayShard(ClusterId, 3) + shard8 == RelayShard(ClusterId, 3) + shard9 == RelayShard(ClusterId, 7) + shard10 == RelayShard(ClusterId, 3) suite "getShard from NsContentTopic": test "Generate Gen0 Shard with topic.generation==none": @@ -75,7 +75,7 @@ suite "Autosharding": # Then the generated shard is valid check: - shard.value() == RelayShard.staticSharding(ClusterId, 3) + shard.value() == RelayShard(ClusterId, 3) test "Generate Gen0 Shard with topic.generation==0": let sharding = @@ -85,7 +85,7 @@ suite "Autosharding": # Then the generated shard is valid check: - shard.value() == RelayShard.staticSharding(ClusterId, 3) + shard.value() == RelayShard(ClusterId, 3) test "Generate Gen0 Shard with topic.generation==other": let sharding = @@ -106,7 +106,7 @@ suite "Autosharding": # Then the generated shard is valid check: - shard.value() == RelayShard.staticSharding(ClusterId, 3) + shard.value() == RelayShard(ClusterId, 3) test "Generate Gen0 Shard with topic.generation==0": let sharding = @@ -116,7 +116,7 @@ suite "Autosharding": # Then the generated shard is valid check: - shard.value() == RelayShard.staticSharding(ClusterId, 3) + shard.value() == RelayShard(ClusterId, 3) test "Generate Gen0 Shard with topic.generation==other": let sharding = diff --git a/waku/waku_core/topics/pubsub_topic.nim b/waku/waku_core/topics/pubsub_topic.nim index 8955560447..e82217e946 100644 --- a/waku/waku_core/topics/pubsub_topic.nim +++ b/waku/waku_core/topics/pubsub_topic.nim @@ -13,17 +13,16 @@ export parsing type PubsubTopic* = string -const DefaultShardId* = uint16(0) -const DefaultClusterId* = uint16(0) - -## Namespaced pub-sub topic +## Relay Shard type RelayShard* = object clusterId*: uint16 shardId*: uint16 -proc staticSharding*(T: type RelayShard, clusterId, shardId: uint16): T = - return RelayShard(clusterId: clusterId, shardId: shardId) +const DefaultShardId* = uint16(0) +const DefaultClusterId* = uint16(0) +const DefaultRelayShard* = + RelayShard(clusterId: DefaultClusterId, shardId: DefaultShardId) # Serialization @@ -68,7 +67,7 @@ proc parseStaticSharding*( ParsingError.invalidFormat($err) ) - ok(RelayShard.staticSharding(clusterId, shardId)) + ok(RelayShard(clusterId, shardId)) proc parse*(T: type RelayShard, topic: PubsubTopic): ParsingResult[RelayShard] = ## Splits a namespaced topic string into its constituent parts. diff --git a/waku/waku_core/topics/sharding.nim b/waku/waku_core/topics/sharding.nim index a091e78739..32c3533e6f 100644 --- a/waku/waku_core/topics/sharding.nim +++ b/waku/waku_core/topics/sharding.nim @@ -27,7 +27,7 @@ proc getGenZeroShard*(s: Sharding, topic: NsContentTopic, count: int): RelayShar # This is equilavent to modulo shard count but faster let shard = hashValue and uint64((count - 1)) - RelayShard.staticSharding(s.clusterId, uint16(shard)) + RelayShard(s.clusterId, uint16(shard)) proc getShard*(s: Sharding, topic: NsContentTopic): Result[RelayShard, string] = ## Compute the (pubsub topic) shard to use for this content topic. @@ -130,7 +130,7 @@ proc parseSharding*( var list = newSeq[(RelayShard, float64)](shardCount) for (shard, weight) in shardsNWeights: - let pubsub = RelayShard.staticSharding(ClusterId, uint16(shard)) + let pubsub = RelayShard(ClusterId, uint16(shard)) let clusterBytes = toBytesBE(uint16(ClusterId)) let shardBytes = toBytesBE(uint16(shard)) diff --git a/waku/waku_enr/sharding.nim b/waku/waku_enr/sharding.nim index 6e15a95d3b..c4ce9cb342 100644 --- a/waku/waku_enr/sharding.nim +++ b/waku/waku_enr/sharding.nim @@ -25,7 +25,7 @@ type RelayShards* = object shardIds*: seq[uint16] func topics*(rs: RelayShards): seq[RelayShard] = - rs.shardIds.mapIt(RelayShard.staticSharding(rs.clusterId, it)) + rs.shardIds.mapIt(RelayShard(rs.clusterId, it)) func init*(T: type RelayShards, clusterId, shardId: uint16): Result[T, string] = if shardId > MaxShardIndex: From 10d5a2e014dd4e2b4f08eba0ccbd27416349fe07 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 26 Aug 2024 15:43:29 -0600 Subject: [PATCH 07/33] fixing compilation errors --- tests/waku_core/topics/test_pubsub_topic.nim | 4 +-- tests/waku_core/topics/test_sharding.nim | 28 ++++++++++---------- waku/waku_core/topics/pubsub_topic.nim | 2 +- waku/waku_core/topics/sharding.nim | 4 +-- waku/waku_enr/sharding.nim | 2 +- 5 files changed, 20 insertions(+), 20 deletions(-) diff --git a/tests/waku_core/topics/test_pubsub_topic.nim b/tests/waku_core/topics/test_pubsub_topic.nim index 98fe6b7c23..4807d30d13 100644 --- a/tests/waku_core/topics/test_pubsub_topic.nim +++ b/tests/waku_core/topics/test_pubsub_topic.nim @@ -10,10 +10,10 @@ suite "Static Sharding Functionality": check: shard.clusterId == 0 shard.shardId == 1 - shard == RelayShard(0, 1) + shard == RelayShard(clusterId: 0, shardId: 1) test "Pubsub Topic Naming Compliance": - let shard = RelayShard(0, 1) + let shard = RelayShard(clusterId: 0, shardId: 1) check: shard.clusterId == 0 shard.shardId == 1 diff --git a/tests/waku_core/topics/test_sharding.nim b/tests/waku_core/topics/test_sharding.nim index faf454876c..33c38b4304 100644 --- a/tests/waku_core/topics/test_sharding.nim +++ b/tests/waku_core/topics/test_sharding.nim @@ -54,16 +54,16 @@ suite "Autosharding": # Then the generated shards are valid check: - shard1 == RelayShard(ClusterId, 3) - shard2 == RelayShard(ClusterId, 3) - shard3 == RelayShard(ClusterId, 6) - shard4 == RelayShard(ClusterId, 6) - shard5 == RelayShard(ClusterId, 3) - shard6 == RelayShard(ClusterId, 3) - shard7 == RelayShard(ClusterId, 3) - shard8 == RelayShard(ClusterId, 3) - shard9 == RelayShard(ClusterId, 7) - shard10 == RelayShard(ClusterId, 3) + shard1 == RelayShard(clusterId: ClusterId, shardId: 3) + shard2 == RelayShard(clusterId: ClusterId, shardId: 3) + shard3 == RelayShard(clusterId: ClusterId, shardId: 6) + shard4 == RelayShard(clusterId: ClusterId, shardId: 6) + shard5 == RelayShard(clusterId: ClusterId, shardId: 3) + shard6 == RelayShard(clusterId: ClusterId, shardId: 3) + shard7 == RelayShard(clusterId: ClusterId, shardId: 3) + shard8 == RelayShard(clusterId: ClusterId, shardId: 3) + shard9 == RelayShard(clusterId: ClusterId, shardId: 7) + shard10 == RelayShard(clusterId: ClusterId, shardId: 3) suite "getShard from NsContentTopic": test "Generate Gen0 Shard with topic.generation==none": @@ -75,7 +75,7 @@ suite "Autosharding": # Then the generated shard is valid check: - shard.value() == RelayShard(ClusterId, 3) + shard.value() == RelayShard(clusterId: ClusterId, shardId: 3) test "Generate Gen0 Shard with topic.generation==0": let sharding = @@ -85,7 +85,7 @@ suite "Autosharding": # Then the generated shard is valid check: - shard.value() == RelayShard(ClusterId, 3) + shard.value() == RelayShard(clusterId: ClusterId, shardId: 3) test "Generate Gen0 Shard with topic.generation==other": let sharding = @@ -106,7 +106,7 @@ suite "Autosharding": # Then the generated shard is valid check: - shard.value() == RelayShard(ClusterId, 3) + shard.value() == RelayShard(clusterId: ClusterId, shardId: 3) test "Generate Gen0 Shard with topic.generation==0": let sharding = @@ -116,7 +116,7 @@ suite "Autosharding": # Then the generated shard is valid check: - shard.value() == RelayShard(ClusterId, 3) + shard.value() == RelayShard(clusterId: ClusterId, shardId: 3) test "Generate Gen0 Shard with topic.generation==other": let sharding = diff --git a/waku/waku_core/topics/pubsub_topic.nim b/waku/waku_core/topics/pubsub_topic.nim index e82217e946..ad89070f39 100644 --- a/waku/waku_core/topics/pubsub_topic.nim +++ b/waku/waku_core/topics/pubsub_topic.nim @@ -67,7 +67,7 @@ proc parseStaticSharding*( ParsingError.invalidFormat($err) ) - ok(RelayShard(clusterId, shardId)) + ok(RelayShard(clusterId: clusterId, shardId: shardId)) proc parse*(T: type RelayShard, topic: PubsubTopic): ParsingResult[RelayShard] = ## Splits a namespaced topic string into its constituent parts. diff --git a/waku/waku_core/topics/sharding.nim b/waku/waku_core/topics/sharding.nim index 32c3533e6f..4a4af4cb5d 100644 --- a/waku/waku_core/topics/sharding.nim +++ b/waku/waku_core/topics/sharding.nim @@ -27,7 +27,7 @@ proc getGenZeroShard*(s: Sharding, topic: NsContentTopic, count: int): RelayShar # This is equilavent to modulo shard count but faster let shard = hashValue and uint64((count - 1)) - RelayShard(s.clusterId, uint16(shard)) + RelayShard(clusterId: s.clusterId, shardId: uint16(shard)) proc getShard*(s: Sharding, topic: NsContentTopic): Result[RelayShard, string] = ## Compute the (pubsub topic) shard to use for this content topic. @@ -130,7 +130,7 @@ proc parseSharding*( var list = newSeq[(RelayShard, float64)](shardCount) for (shard, weight) in shardsNWeights: - let pubsub = RelayShard(ClusterId, uint16(shard)) + let pubsub = RelayShard(clusterId: ClusterId, shardId: uint16(shard)) let clusterBytes = toBytesBE(uint16(ClusterId)) let shardBytes = toBytesBE(uint16(shard)) diff --git a/waku/waku_enr/sharding.nim b/waku/waku_enr/sharding.nim index c4ce9cb342..3b7cdae149 100644 --- a/waku/waku_enr/sharding.nim +++ b/waku/waku_enr/sharding.nim @@ -25,7 +25,7 @@ type RelayShards* = object shardIds*: seq[uint16] func topics*(rs: RelayShards): seq[RelayShard] = - rs.shardIds.mapIt(RelayShard(rs.clusterId, it)) + rs.shardIds.mapIt(RelayShard(clusterId: rs.clusterId, shardId: it)) func init*(T: type RelayShards, clusterId, shardId: uint16): Result[T, string] = if shardId > MaxShardIndex: From 0db34d2ce83e4b6ed5dd1bbad4f61d0c0bd0083f Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 26 Aug 2024 16:02:01 -0600 Subject: [PATCH 08/33] adding default pubsub topic --- waku/waku_core/topics/pubsub_topic.nim | 2 ++ 1 file changed, 2 insertions(+) diff --git a/waku/waku_core/topics/pubsub_topic.nim b/waku/waku_core/topics/pubsub_topic.nim index ad89070f39..27ea271804 100644 --- a/waku/waku_core/topics/pubsub_topic.nim +++ b/waku/waku_core/topics/pubsub_topic.nim @@ -31,6 +31,8 @@ proc `$`*(topic: RelayShard): string = ## in the format `/waku/2/rs// return "/waku/2/rs/" & $topic.clusterId & "/" & $topic.shardId +const DefaultPubsubTopic* = $DefaultRelayShard + # Deserialization const From 57f5ef0e60bec15b20d89166b4fc42170f096389 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 26 Aug 2024 16:06:08 -0600 Subject: [PATCH 09/33] fix syntax error --- tests/waku_core/test_namespaced_topics.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/waku_core/test_namespaced_topics.nim b/tests/waku_core/test_namespaced_topics.nim index de938a03a0..8c3cee1fc2 100644 --- a/tests/waku_core/test_namespaced_topics.nim +++ b/tests/waku_core/test_namespaced_topics.nim @@ -136,7 +136,7 @@ suite "Waku Message - Content topics namespacing": suite "Waku Message - Pub-sub topics namespacing": test "Stringify static sharding pub-sub topic": ## Given - var shard = RelayShard(clusterId = 0, shardId = 2) + var shard = RelayShard(clusterId: 0, shardId: 2) ## When let topic = $shard From be46c5eeeb9114359b677ccff8478044da8bafa2 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 26 Aug 2024 16:14:38 -0600 Subject: [PATCH 10/33] fix type --- tests/waku_relay/utils.nim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/waku_relay/utils.nim b/tests/waku_relay/utils.nim index f63a100827..306af6d0f2 100644 --- a/tests/waku_relay/utils.nim +++ b/tests/waku_relay/utils.nim @@ -53,9 +53,9 @@ proc setupRln*(node: WakuNode, identifier: uint) {.async.} = ) proc setupRelayWithRln*( - node: WakuNode, identifier: uint, pubsubTopics: seq[string] + node: WakuNode, identifier: uint, shards: seq[RelayShards] ) {.async.} = - await node.mountRelay(pubsubTopics) + await node.mountRelay(shards) await setupRln(node, identifier) proc subscribeToContentTopicWithHandler*( From 80e1ba1d2071f2d4350bcb2eb00461a0b9112ca1 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 26 Aug 2024 16:23:19 -0600 Subject: [PATCH 11/33] fix typo --- tests/waku_relay/utils.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/waku_relay/utils.nim b/tests/waku_relay/utils.nim index 306af6d0f2..c1a085b103 100644 --- a/tests/waku_relay/utils.nim +++ b/tests/waku_relay/utils.nim @@ -53,7 +53,7 @@ proc setupRln*(node: WakuNode, identifier: uint) {.async.} = ) proc setupRelayWithRln*( - node: WakuNode, identifier: uint, shards: seq[RelayShards] + node: WakuNode, identifier: uint, shards: seq[RelayShard] ) {.async.} = await node.mountRelay(shards) await setupRln(node, identifier) From f932b780c1a90316ac22427bbcab3e0a60b5dd5a Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 26 Aug 2024 16:35:29 -0600 Subject: [PATCH 12/33] fix more compilation errors --- tests/waku_relay/test_wakunode_relay.nim | 92 ++++++++++++------------ 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/tests/waku_relay/test_wakunode_relay.nim b/tests/waku_relay/test_wakunode_relay.nim index 7c44ec90f1..00c5449dae 100644 --- a/tests/waku_relay/test_wakunode_relay.nim +++ b/tests/waku_relay/test_wakunode_relay.nim @@ -63,19 +63,19 @@ suite "WakuNode - Relay": node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), Port(0)) nodeKey3 = generateSecp256k1Key() node3 = newTestWakuNode(nodeKey3, parseIpAddress("0.0.0.0"), Port(0)) - pubSubTopic = "test" + shard = DefaultRelayShard contentTopic = ContentTopic("/waku/2/default-content/proto") payload = "hello world".toBytes() message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - await node1.mountRelay(@[pubSubTopic]) + await node1.mountRelay(@[shard]) await node2.start() - await node2.mountRelay(@[pubSubTopic]) + await node2.mountRelay(@[shard]) await node3.start() - await node3.mountRelay(@[pubSubTopic]) + await node3.mountRelay(@[shard]) await allFutures( node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]), @@ -87,15 +87,15 @@ suite "WakuNode - Relay": topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = check: - topic == pubSubTopic + topic == $shard msg.contentTopic == contentTopic msg.payload == payload completionFut.complete(true) - node3.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) + node3.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)) await sleepAsync(500.millis) - var res = await node1.publish(some(pubSubTopic), message) + var res = await node1.publish(some($shard), message) assert res.isOk(), $res.error ## Then @@ -124,7 +124,7 @@ suite "WakuNode - Relay": nodeKey3 = generateSecp256k1Key() node3 = newTestWakuNode(nodeKey3, parseIpAddress("0.0.0.0"), Port(0)) - pubSubTopic = "test" + shard = DefaultRelayShard contentTopic1 = ContentTopic("/waku/2/default-content/proto") payload = "hello world".toBytes() message1 = WakuMessage(payload: payload, contentTopic: contentTopic1) @@ -135,13 +135,13 @@ suite "WakuNode - Relay": # start all the nodes await node1.start() - await node1.mountRelay(@[pubSubTopic]) + await node1.mountRelay(@[shard]) await node2.start() - await node2.mountRelay(@[pubSubTopic]) + await node2.mountRelay(@[shard]) await node3.start() - await node3.mountRelay(@[pubSubTopic]) + await node3.mountRelay(@[shard]) await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) @@ -155,7 +155,7 @@ suite "WakuNode - Relay": ): Future[ValidationResult] {.async.} = ## the validator that only allows messages with contentTopic1 to be relayed check: - topic == pubSubTopic + topic == $shard # only relay messages with contentTopic1 if msg.contentTopic != contentTopic1: @@ -172,22 +172,22 @@ suite "WakuNode - Relay": topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = check: - topic == pubSubTopic + topic == $shard # check that only messages with contentTopic1 is relayed (but not contentTopic2) msg.contentTopic == contentTopic1 # relay handler is called completionFut.complete(true) - node3.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) + node3.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)) await sleepAsync(500.millis) - var res = await node1.publish(some(pubSubTopic), message1) + var res = await node1.publish(some($shard), message1) assert res.isOk(), $res.error await sleepAsync(500.millis) # message2 never gets relayed because of the validator - res = await node1.publish(some(pubSubTopic), message2) + res = await node1.publish(some($shard), message2) assert res.isOk(), $res.error await sleepAsync(500.millis) @@ -258,16 +258,16 @@ suite "WakuNode - Relay": wsBindPort = Port(0), wsEnabled = true, ) - pubSubTopic = "test" + pubSubTopic = DefaultRelayShard contentTopic = ContentTopic("/waku/2/default-content/proto") payload = "hello world".toBytes() message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - await node1.mountRelay(@[pubSubTopic]) + await node1.mountRelay(@[shard]) await node2.start() - await node2.mountRelay(@[pubSubTopic]) + await node2.mountRelay(@[shard]) await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) @@ -276,15 +276,15 @@ suite "WakuNode - Relay": topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = check: - topic == pubSubTopic + topic == $shard msg.contentTopic == contentTopic msg.payload == payload completionFut.complete(true) - node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) + node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)) await sleepAsync(500.millis) - let res = await node2.publish(some(pubSubTopic), message) + let res = await node2.publish(some($shard), message) assert res.isOk(), $res.error await sleepAsync(500.millis) @@ -306,16 +306,16 @@ suite "WakuNode - Relay": ) nodeKey2 = generateSecp256k1Key() node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), bindPort = Port(0)) - pubSubTopic = "test" + shard = DefaultRelayShard contentTopic = ContentTopic("/waku/2/default-content/proto") payload = "hello world".toBytes() message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - await node1.mountRelay(@[pubSubTopic]) + await node1.mountRelay(@[shard]) await node2.start() - await node2.mountRelay(@[pubSubTopic]) + await node2.mountRelay(@[shard]) await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) @@ -324,15 +324,15 @@ suite "WakuNode - Relay": topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = check: - topic == pubSubTopic + topic == $shard msg.contentTopic == contentTopic msg.payload == payload completionFut.complete(true) - node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) + node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)) await sleepAsync(500.millis) - let res = await node2.publish(some(pubSubTopic), message) + let res = await node2.publish(some($shard), message) assert res.isOk(), $res.error await sleepAsync(500.millis) @@ -354,16 +354,16 @@ suite "WakuNode - Relay": wsBindPort = Port(0), wsEnabled = true, ) - pubSubTopic = "test" + shard = DefaultRelayShard contentTopic = ContentTopic("/waku/2/default-content/proto") payload = "hello world".toBytes() message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - await node1.mountRelay(@[pubSubTopic]) + await node1.mountRelay(@[shard]) await node2.start() - await node2.mountRelay(@[pubSubTopic]) + await node2.mountRelay(@[shard]) #delete websocket peer address # TODO: a better way to find the index - this is too brittle @@ -376,15 +376,15 @@ suite "WakuNode - Relay": topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = check: - topic == pubSubTopic + topic == $shard msg.contentTopic == contentTopic msg.payload == payload completionFut.complete(true) - node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) + node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)) await sleepAsync(500.millis) - let res = await node2.publish(some(pubSubTopic), message) + let res = await node2.publish(some($shard), message) assert res.isOk(), $res.error await sleepAsync(500.millis) @@ -408,16 +408,16 @@ suite "WakuNode - Relay": ) nodeKey2 = generateSecp256k1Key() node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), bindPort = Port(0)) - pubSubTopic = "test" + shard = DefaultRelayShard contentTopic = ContentTopic("/waku/2/default-content/proto") payload = "hello world".toBytes() message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - await node1.mountRelay(@[pubSubTopic]) + await node1.mountRelay(@[shard]) await node2.start() - await node2.mountRelay(@[pubSubTopic]) + await node2.mountRelay(@[shard]) await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) @@ -426,15 +426,15 @@ suite "WakuNode - Relay": topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = check: - topic == pubSubTopic + topic == $shard msg.contentTopic == contentTopic msg.payload == payload completionFut.complete(true) - node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) + node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)) await sleepAsync(500.millis) - let res = await node2.publish(some(pubSubTopic), message) + let res = await node2.publish(some($shard), message) assert res.isOk(), $res.error await sleepAsync(500.millis) @@ -466,16 +466,16 @@ suite "WakuNode - Relay": ) let - pubSubTopic = "test" + shard = DefaultRelayShard contentTopic = ContentTopic("/waku/2/default-content/proto") payload = "hello world".toBytes() message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - await node1.mountRelay(@[pubSubTopic]) + await node1.mountRelay(@[shard]) await node2.start() - await node2.mountRelay(@[pubSubTopic]) + await node2.mountRelay(@[shard]) await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) @@ -484,15 +484,15 @@ suite "WakuNode - Relay": topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = check: - topic == pubSubTopic + topic == $shard msg.contentTopic == contentTopic msg.payload == payload completionFut.complete(true) - node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) + node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)) await sleepAsync(500.millis) - let res = await node2.publish(some(pubSubTopic), message) + let res = await node2.publish(some($shard), message) assert res.isOk(), $res.error await sleepAsync(500.millis) From 0fccc840cec02636350a6b13a10d048b721363d2 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 26 Aug 2024 16:45:14 -0600 Subject: [PATCH 13/33] fix variable name --- tests/waku_relay/test_wakunode_relay.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/waku_relay/test_wakunode_relay.nim b/tests/waku_relay/test_wakunode_relay.nim index 00c5449dae..b93875e831 100644 --- a/tests/waku_relay/test_wakunode_relay.nim +++ b/tests/waku_relay/test_wakunode_relay.nim @@ -258,7 +258,7 @@ suite "WakuNode - Relay": wsBindPort = Port(0), wsEnabled = true, ) - pubSubTopic = DefaultRelayShard + shard = DefaultRelayShard contentTopic = ContentTopic("/waku/2/default-content/proto") payload = "hello world".toBytes() message = WakuMessage(payload: payload, contentTopic: contentTopic) From 1b0ce73cc28f9d0e093e1b86b19400186702c727 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 27 Aug 2024 10:26:52 -0600 Subject: [PATCH 14/33] fixing chat2 compilation --- apps/chat2/config_chat2.nim | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/apps/chat2/config_chat2.nim b/apps/chat2/config_chat2.nim index 417d298a3f..d329e0c570 100644 --- a/apps/chat2/config_chat2.nim +++ b/apps/chat2/config_chat2.nim @@ -83,11 +83,18 @@ type name: "keep-alive" .}: bool - topics* {. - desc: "Default topics to subscribe to (space separated list).", - defaultValue: "/waku/2/rs/0/0", - name: "topics" - .}: string + clusterId* {. + desc: + "Cluster id that the node is running in. Node in a different cluster id is disconnected.", + defaultValue: 0, + name: "cluster-id" + .}: uint16 + + shards* {. + desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.", + defaultValue: @[uint16(0)], + name: "shard" + .}: seq[uint16] ## Store config store* {. From 9d2134b0c3cdf0db527eebe95b6204604276b223 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 27 Aug 2024 10:45:08 -0600 Subject: [PATCH 15/33] fixing tests --- tests/node/test_wakunode_relay_rln.nim | 6 +-- tests/test_relay_peer_exchange.nim | 10 ++--- tests/test_wakunode.nim | 6 +-- tests/test_wakunode_lightpush.nim | 4 +- .../test_wakunode_rln_relay.nim | 41 +++++++++---------- tests/waku_rln_relay/utils_static.nim | 4 +- 6 files changed, 34 insertions(+), 37 deletions(-) diff --git a/tests/node/test_wakunode_relay_rln.nim b/tests/node/test_wakunode_relay_rln.nim index e1bdb7c6a7..1304575c7d 100644 --- a/tests/node/test_wakunode_relay_rln.nim +++ b/tests/node/test_wakunode_relay_rln.nim @@ -96,9 +96,9 @@ proc getWakuRlnConfigOnChain*( ) proc setupRelayWithOnChainRln*( - node: WakuNode, pubsubTopics: seq[string], wakuRlnConfig: WakuRlnConfig + node: WakuNode, shards: seq[RelayShard], wakuRlnConfig: WakuRlnConfig ) {.async.} = - await node.mountRelay(pubsubTopics) + await node.mountRelay(shards) await node.mountRlnRelay(wakuRlnConfig) suite "Waku RlnRelay - End to End - Static": @@ -223,7 +223,7 @@ suite "Waku RlnRelay - End to End - Static": nodekey = generateSecp256k1Key() node = newTestWakuNode(nodekey, parseIpAddress("0.0.0.0"), Port(0)) - await node.mountRelay(@[DefaultPubsubTopic]) + await node.mountRelay(@[DefaultRelayShard]) let contractAddress = await uploadRLNContract(EthClient) let wakuRlnConfig = WakuRlnConfig( diff --git a/tests/test_relay_peer_exchange.nim b/tests/test_relay_peer_exchange.nim index be27365a63..0be3c91932 100644 --- a/tests/test_relay_peer_exchange.nim +++ b/tests/test_relay_peer_exchange.nim @@ -25,8 +25,8 @@ procSuite "Relay (GossipSub) Peer Exchange": newTestWakuNode(node2Key, listenAddress, port, sendSignedPeerRecord = true) # When both client and server mount relay without a handler - await node1.mountRelay(@[DefaultPubsubTopic]) - await node2.mountRelay(@[DefaultPubsubTopic], none(RoutingRecordsHandler)) + await node1.mountRelay(@[DefaultRelayShard]) + await node2.mountRelay(@[DefaultRelayShard], none(RoutingRecordsHandler)) # Then the relays are mounted without a handler check: @@ -75,9 +75,9 @@ procSuite "Relay (GossipSub) Peer Exchange": peerExchangeHandle: RoutingRecordsHandler = peerExchangeHandler # Givem the nodes mount relay with a peer exchange handler - await node1.mountRelay(@[DefaultPubsubTopic], some(emptyPeerExchangeHandle)) - await node2.mountRelay(@[DefaultPubsubTopic], some(emptyPeerExchangeHandle)) - await node3.mountRelay(@[DefaultPubsubTopic], some(peerExchangeHandle)) + await node1.mountRelay(@[DefaultRelayShard], some(emptyPeerExchangeHandle)) + await node2.mountRelay(@[DefaultRelayShard], some(emptyPeerExchangeHandle)) + await node3.mountRelay(@[DefaultRelayShard], some(peerExchangeHandle)) # Ensure that node1 prunes all peers after the first connection node1.wakuRelay.parameters.dHigh = 1 diff --git a/tests/test_wakunode.nim b/tests/test_wakunode.nim index 4640d49f9b..e3be3e252e 100644 --- a/tests/test_wakunode.nim +++ b/tests/test_wakunode.nim @@ -28,7 +28,7 @@ suite "WakuNode": node1 = newTestWakuNode(nodeKey1, parseIpAddress("0.0.0.0"), Port(61000)) nodeKey2 = generateSecp256k1Key() node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), Port(61002)) - pubSubTopic = "/waku/2/rs/0/0" + shard = DefaultRelayShard contentTopic = ContentTopic("/waku/2/default-content/proto") payload = "hello world".toBytes() message = WakuMessage(payload: payload, contentTopic: contentTopic) @@ -36,13 +36,13 @@ suite "WakuNode": # Setup node 1 with stable codec "/vac/waku/relay/2.0.0" await node1.start() - await node1.mountRelay(@[pubSubTopic]) + await node1.mountRelay(@[shard]) node1.wakuRelay.codec = "/vac/waku/relay/2.0.0" # Setup node 2 with beta codec "/vac/waku/relay/2.0.0-beta2" await node2.start() - await node2.mountRelay(@[pubSubTopic]) + await node2.mountRelay(@[shard]) node2.wakuRelay.codec = "/vac/waku/relay/2.0.0-beta2" check: diff --git a/tests/test_wakunode_lightpush.nim b/tests/test_wakunode_lightpush.nim index 5f2c5e2df2..c680fb4685 100644 --- a/tests/test_wakunode_lightpush.nim +++ b/tests/test_wakunode_lightpush.nim @@ -19,8 +19,8 @@ suite "WakuNode - Lightpush": await allFutures(destNode.start(), bridgeNode.start(), lightNode.start()) - await destNode.mountRelay(@[DefaultPubsubTopic]) - await bridgeNode.mountRelay(@[DefaultPubsubTopic]) + await destNode.mountRelay(@[DefaultRelayShard]) + await bridgeNode.mountRelay(@[DefaultRelayShard]) await bridgeNode.mountLightPush() lightNode.mountLightPushClient() diff --git a/tests/waku_rln_relay/test_wakunode_rln_relay.nim b/tests/waku_rln_relay/test_wakunode_rln_relay.nim index e227a0bb79..bc7e7a1504 100644 --- a/tests/waku_rln_relay/test_wakunode_rln_relay.nim +++ b/tests/waku_rln_relay/test_wakunode_rln_relay.nim @@ -50,7 +50,7 @@ procSuite "WakuNode - RLN relay": # set up three nodes # node1 - await node1.mountRelay(@[DefaultPubsubTopic]) + await node1.mountRelay(@[DefaultRelayShard]) # mount rlnrelay in off-chain mode let wakuRlnConfig1 = WakuRlnConfig( @@ -66,7 +66,7 @@ procSuite "WakuNode - RLN relay": await node1.start() # node 2 - await node2.mountRelay(@[DefaultPubsubTopic]) + await node2.mountRelay(@[DefaultRelayShard]) # mount rlnrelay in off-chain mode let wakuRlnConfig2 = WakuRlnConfig( rlnRelayDynamic: false, @@ -81,7 +81,7 @@ procSuite "WakuNode - RLN relay": await node2.start() # node 3 - await node3.mountRelay(@[DefaultPubsubTopic]) + await node3.mountRelay(@[DefaultRelayShard]) let wakuRlnConfig3 = WakuRlnConfig( rlnRelayDynamic: false, @@ -131,18 +131,15 @@ procSuite "WakuNode - RLN relay": await node2.stop() await node3.stop() - asyncTest "testing rln-relay is applied in all rln pubsub/content topics": + asyncTest "testing rln-relay is applied in all rln shards/content topics": # create 3 nodes let nodes = toSeq(0 ..< 3).mapIt( newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) ) await allFutures(nodes.mapIt(it.start())) - let pubsubTopics = - @[ - PubsubTopic("/waku/2/pubsubtopic-a/proto"), - PubsubTopic("/waku/2/pubsubtopic-b/proto"), - ] + let shards = + @[RelayShard(clusterId: 0, shard: 0), RelayShard(clusterId: 0, shard: 1)] let contentTopics = @[ ContentTopic("/waku/2/content-topic-a/proto"), @@ -150,7 +147,7 @@ procSuite "WakuNode - RLN relay": ] # set up three nodes - await allFutures(nodes.mapIt(it.mountRelay(pubsubTopics))) + await allFutures(nodes.mapIt(it.mountRelay(shards))) # mount rlnrelay in off-chain mode for index, node in nodes: @@ -245,7 +242,7 @@ procSuite "WakuNode - RLN relay": # set up three nodes # node1 - await node1.mountRelay(@[DefaultPubsubTopic]) + await node1.mountRelay(@[DefaultRelayShard]) # mount rlnrelay in off-chain mode let wakuRlnConfig1 = WakuRlnConfig( @@ -261,7 +258,7 @@ procSuite "WakuNode - RLN relay": await node1.start() # node 2 - await node2.mountRelay(@[DefaultPubsubTopic]) + await node2.mountRelay(@[DefaultRelayShard]) # mount rlnrelay in off-chain mode let wakuRlnConfig2 = WakuRlnConfig( rlnRelayDynamic: false, @@ -276,7 +273,7 @@ procSuite "WakuNode - RLN relay": await node2.start() # node 3 - await node3.mountRelay(@[DefaultPubsubTopic]) + await node3.mountRelay(@[DefaultRelayShard]) let wakuRlnConfig3 = WakuRlnConfig( rlnRelayDynamic: false, @@ -361,7 +358,7 @@ procSuite "WakuNode - RLN relay": # set up three nodes # node1 - await node1.mountRelay(@[DefaultPubsubTopic]) + await node1.mountRelay(@[DefaultRelayShard]) # mount rlnrelay in off-chain mode let wakuRlnConfig1 = WakuRlnConfig( @@ -377,7 +374,7 @@ procSuite "WakuNode - RLN relay": await node1.start() # node 2 - await node2.mountRelay(@[DefaultPubsubTopic]) + await node2.mountRelay(@[DefaultRelayShard]) # mount rlnrelay in off-chain mode let wakuRlnConfig2 = WakuRlnConfig( @@ -392,7 +389,7 @@ procSuite "WakuNode - RLN relay": await node2.start() # node 3 - await node3.mountRelay(@[DefaultPubsubTopic]) + await node3.mountRelay(@[DefaultRelayShard]) # mount rlnrelay in off-chain mode let wakuRlnConfig3 = WakuRlnConfig( @@ -485,7 +482,7 @@ procSuite "WakuNode - RLN relay": # Given two nodes let contentTopic = ContentTopic("/waku/2/default-content/proto") - pubsubTopicSeq = @[DefaultPubsubTopic] + shardSeq = @[DefaultRelayShard] nodeKey1 = generateSecp256k1Key() node1 = newTestWakuNode(nodeKey1, parseIpAddress("0.0.0.0"), Port(0)) nodeKey2 = generateSecp256k1Key() @@ -493,12 +490,12 @@ procSuite "WakuNode - RLN relay": epochSizeSec: uint64 = 5 # This means rlnMaxEpochGap = 4 # Given both nodes mount relay and rlnrelay - await node1.mountRelay(pubsubTopicSeq) + await node1.mountRelay(shardSeq) let wakuRlnConfig1 = buildWakuRlnConfig(1, epochSizeSec, "wakunode_10") await node1.mountRlnRelay(wakuRlnConfig1) # Mount rlnrelay in node2 in off-chain mode - await node2.mountRelay(@[DefaultPubsubTopic]) + await node2.mountRelay(@[DefaultRelayShard]) let wakuRlnConfig2 = buildWakuRlnConfig(2, epochSizeSec, "wakunode_11") await node2.mountRlnRelay(wakuRlnConfig2) @@ -613,7 +610,7 @@ procSuite "WakuNode - RLN relay": # Given two nodes let contentTopic = ContentTopic("/waku/2/default-content/proto") - pubsubTopicSeq = @[DefaultPubsubTopic] + shardSeq = @[DefaultRelayShard] nodeKey1 = generateSecp256k1Key() node1 = newTestWakuNode(nodeKey1, parseIpAddress("0.0.0.0"), Port(0)) nodeKey2 = generateSecp256k1Key() @@ -622,12 +619,12 @@ procSuite "WakuNode - RLN relay": # Given both nodes mount relay and rlnrelay # Mount rlnrelay in node1 in off-chain mode - await node1.mountRelay(pubsubTopicSeq) + await node1.mountRelay(shardSeq) let wakuRlnConfig1 = buildWakuRlnConfig(1, epochSizeSec, "wakunode_10") await node1.mountRlnRelay(wakuRlnConfig1) # Mount rlnrelay in node2 in off-chain mode - await node2.mountRelay(@[DefaultPubsubTopic]) + await node2.mountRelay(@[DefaultRelayShard]) let wakuRlnConfig2 = buildWakuRlnConfig(2, epochSizeSec, "wakunode_11") await node2.mountRlnRelay(wakuRlnConfig2) diff --git a/tests/waku_rln_relay/utils_static.nim b/tests/waku_rln_relay/utils_static.nim index 00f29c7048..d2a781fcdb 100644 --- a/tests/waku_rln_relay/utils_static.nim +++ b/tests/waku_rln_relay/utils_static.nim @@ -32,9 +32,9 @@ proc setupStaticRln*( ) proc setupRelayWithStaticRln*( - node: WakuNode, identifier: uint, pubsubTopics: seq[string] + node: WakuNode, identifier: uint, shards: seq[RelayShard] ) {.async.} = - await node.mountRelay(pubsubTopics) + await node.mountRelay(shards) await setupStaticRln(node, identifier) proc subscribeCompletionHandler*(node: WakuNode, pubsubTopic: string): Future[bool] = From 3913762f3a71cc6580e3bd0ef61af901860ba438 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 27 Aug 2024 10:55:19 -0600 Subject: [PATCH 16/33] fix compilation error --- tests/test_wakunode.nim | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_wakunode.nim b/tests/test_wakunode.nim index e3be3e252e..975070465a 100644 --- a/tests/test_wakunode.nim +++ b/tests/test_wakunode.nim @@ -58,15 +58,15 @@ suite "WakuNode": topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = check: - topic == pubSubTopic + topic == $shard msg.contentTopic == contentTopic msg.payload == payload completionFut.complete(true) - node2.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) + node2.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)) await sleepAsync(2000.millis) - var res = await node1.publish(some(pubSubTopic), message) + var res = await node1.publish(some($shard), message) assert res.isOk(), $res.error await sleepAsync(2000.millis) From 0e91c276273ab2838d7658525f7b2bfd52871d6a Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 27 Aug 2024 11:08:37 -0600 Subject: [PATCH 17/33] more compilation errors --- tests/node/peer_manager/test_peer_manager.nim | 34 ++++++++----------- tests/test_peer_manager.nim | 6 ++-- 2 files changed, 17 insertions(+), 23 deletions(-) diff --git a/tests/node/peer_manager/test_peer_manager.nim b/tests/node/peer_manager/test_peer_manager.nim index 3d0cb08c2c..940d7d115f 100644 --- a/tests/node/peer_manager/test_peer_manager.nim +++ b/tests/node/peer_manager/test_peer_manager.nim @@ -29,18 +29,16 @@ suite "Peer Manager": serverKey = generateSecp256k1Key() clientKey = generateSecp256k1Key() clusterId = 1 - shardTopic0 = "/waku/2/rs/" & $clusterId & "/0" - shardTopic1 = "/waku/2/rs/" & $clusterId & "/1" + shard0 = RelayShard(clusterId: clusterId, shardId: 0) + shard1 = RelayShard(clusterId: clusterId, shardId: 1) asyncTest "light client is not disconnected": # Given two nodes with the same shardId let - server = newTestWakuNode( - serverKey, listenAddress, listenPort, pubsubTopics = @[shardTopic0] - ) - client = newTestWakuNode( - clientKey, listenAddress, listenPort, pubsubTopics = @[shardTopic1] - ) + server = + newTestWakuNode(serverKey, listenAddress, listenPort, shards = @[shard0]) + client = + newTestWakuNode(clientKey, listenAddress, listenPort, shards = @[shard1]) # And both mount metadata and filter discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic @@ -70,12 +68,10 @@ suite "Peer Manager": asyncTest "relay with same shardId is not disconnected": # Given two nodes with the same shardId let - server = newTestWakuNode( - serverKey, listenAddress, listenPort, pubsubTopics = @[shardTopic0] - ) - client = newTestWakuNode( - clientKey, listenAddress, listenPort, pubsubTopics = @[shardTopic0] - ) + server = + newTestWakuNode(serverKey, listenAddress, listenPort, shards = @[shard0]) + client = + newTestWakuNode(clientKey, listenAddress, listenPort, shards = @[shard1]) # And both mount metadata and relay discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic @@ -103,12 +99,10 @@ suite "Peer Manager": asyncTest "relay with different shardId is disconnected": # Given two nodes with different shardIds let - server = newTestWakuNode( - serverKey, listenAddress, listenPort, pubsubTopics = @[shardTopic0] - ) - client = newTestWakuNode( - clientKey, listenAddress, listenPort, pubsubTopics = @[shardTopic1] - ) + server = + newTestWakuNode(serverKey, listenAddress, listenPort, shards = @[shard0]) + client = + newTestWakuNode(clientKey, listenAddress, listenPort, shards = @[shard1]) # And both mount metadata and relay discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim index 639ea3983d..8e882b8351 100644 --- a/tests/test_peer_manager.nim +++ b/tests/test_peer_manager.nim @@ -418,7 +418,7 @@ procSuite "Peer Manager": generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), port, - pubsubTopics = @["/waku/2/rs/3/0"], + shards = @[RelayShard(clusterId: 3, shardId: 0)], ) # same network @@ -426,13 +426,13 @@ procSuite "Peer Manager": generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), port, - pubsubTopics = @["/waku/2/rs/4/0"], + shards = @[RelayShard(clusterId: 4, shardId: 0)], ) node3 = newTestWakuNode( generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), port, - pubsubTopics = @["/waku/2/rs/4/0"], + shards = @[RelayShard(clusterId: 4, shardId: 0)], ) node1.mountMetadata(3).expect("Mounted Waku Metadata") From f42865e04c9e9cc21291bf91f75959b38d31b17f Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 27 Aug 2024 11:19:42 -0600 Subject: [PATCH 18/33] more fixes --- tests/node/peer_manager/test_peer_manager.nim | 34 ++++++++++--------- tests/test_peer_manager.nim | 9 +++-- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/tests/node/peer_manager/test_peer_manager.nim b/tests/node/peer_manager/test_peer_manager.nim index 940d7d115f..57acf13df4 100644 --- a/tests/node/peer_manager/test_peer_manager.nim +++ b/tests/node/peer_manager/test_peer_manager.nim @@ -20,8 +20,6 @@ suite "Peer Manager": serverKey {.threadvar.}: PrivateKey clientKey {.threadvar.}: PrivateKey clusterId {.threadvar.}: uint64 - shardTopic0 {.threadvar.}: string - shardTopic1 {.threadvar.}: string asyncSetup: listenPort = Port(0) @@ -29,16 +27,16 @@ suite "Peer Manager": serverKey = generateSecp256k1Key() clientKey = generateSecp256k1Key() clusterId = 1 - shard0 = RelayShard(clusterId: clusterId, shardId: 0) - shard1 = RelayShard(clusterId: clusterId, shardId: 1) asyncTest "light client is not disconnected": # Given two nodes with the same shardId let - server = - newTestWakuNode(serverKey, listenAddress, listenPort, shards = @[shard0]) - client = - newTestWakuNode(clientKey, listenAddress, listenPort, shards = @[shard1]) + server = newTestWakuNode( + serverKey, listenAddress, listenPort, clusterId = clusterId, shards = @[0] + ) + client = newTestWakuNode( + clientKey, listenAddress, listenPort, clusterId = clusterId, shards = @[1] + ) # And both mount metadata and filter discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic @@ -68,10 +66,12 @@ suite "Peer Manager": asyncTest "relay with same shardId is not disconnected": # Given two nodes with the same shardId let - server = - newTestWakuNode(serverKey, listenAddress, listenPort, shards = @[shard0]) - client = - newTestWakuNode(clientKey, listenAddress, listenPort, shards = @[shard1]) + server = newTestWakuNode( + serverKey, listenAddress, listenPort, clusterId = clusterId, shards = @[0] + ) + client = newTestWakuNode( + clientKey, listenAddress, listenPort, clusterId = clusterId, shards = @[1] + ) # And both mount metadata and relay discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic @@ -99,10 +99,12 @@ suite "Peer Manager": asyncTest "relay with different shardId is disconnected": # Given two nodes with different shardIds let - server = - newTestWakuNode(serverKey, listenAddress, listenPort, shards = @[shard0]) - client = - newTestWakuNode(clientKey, listenAddress, listenPort, shards = @[shard1]) + server = newTestWakuNode( + serverKey, listenAddress, listenPort, clusterId = clusterId, shards = @[0] + ) + client = newTestWakuNode( + clientKey, listenAddress, listenPort, clusterId = clusterId, shards = @[1] + ) # And both mount metadata and relay discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim index 8e882b8351..3dbcf44275 100644 --- a/tests/test_peer_manager.nim +++ b/tests/test_peer_manager.nim @@ -418,7 +418,8 @@ procSuite "Peer Manager": generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), port, - shards = @[RelayShard(clusterId: 3, shardId: 0)], + clusterId = 3, + shards = @[0], ) # same network @@ -426,13 +427,15 @@ procSuite "Peer Manager": generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), port, - shards = @[RelayShard(clusterId: 4, shardId: 0)], + clusterId = 4, + shards = @[0], ) node3 = newTestWakuNode( generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), port, - shards = @[RelayShard(clusterId: 4, shardId: 0)], + clusterId = 4, + shards = @[0], ) node1.mountMetadata(3).expect("Mounted Waku Metadata") From 5bb390ead1043a3fb08efe73150125c0a5cd9eef Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 27 Aug 2024 11:27:36 -0600 Subject: [PATCH 19/33] fix type --- tests/test_peer_manager.nim | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim index 3dbcf44275..d71f186cac 100644 --- a/tests/test_peer_manager.nim +++ b/tests/test_peer_manager.nim @@ -419,7 +419,7 @@ procSuite "Peer Manager": ValidIpAddress.init("0.0.0.0"), port, clusterId = 3, - shards = @[0], + shards = @[uint16(0)], ) # same network @@ -428,14 +428,14 @@ procSuite "Peer Manager": ValidIpAddress.init("0.0.0.0"), port, clusterId = 4, - shards = @[0], + shards = @[uint16(0)], ) node3 = newTestWakuNode( generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), port, clusterId = 4, - shards = @[0], + shards = @[uint16(0)], ) node1.mountMetadata(3).expect("Mounted Waku Metadata") From dd85361ef95038c190ec3476c422d740d87b9b1c Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 27 Aug 2024 11:36:13 -0600 Subject: [PATCH 20/33] fix --- tests/test_waku_enr.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_waku_enr.nim b/tests/test_waku_enr.nim index 7180302526..f187fa3002 100644 --- a/tests/test_waku_enr.nim +++ b/tests/test_waku_enr.nim @@ -280,7 +280,7 @@ suite "Waku ENR - Relay static sharding": clusterId: uint16 = 22 shardId: uint16 = 1 - let shard = RelayShard(clusterId, shardId) + let shard = RelayShard(clusterId: clusterId, shardId: shardId) ## When let shardsTopics = RelayShards.init(clusterId, shardId).expect("Valid Shards") From ecb771f72dc0542f061c4aa00104c23672b83cef Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 27 Aug 2024 17:35:42 -0600 Subject: [PATCH 21/33] more test fixes --- tests/wakunode_rest/test_rest_relay.nim | 72 ++++++++++++------------- 1 file changed, 36 insertions(+), 36 deletions(-) diff --git a/tests/wakunode_rest/test_rest_relay.nim b/tests/wakunode_rest/test_rest_relay.nim index c8a372984a..665eb2ace6 100644 --- a/tests/wakunode_rest/test_rest_relay.nim +++ b/tests/wakunode_rest/test_rest_relay.nim @@ -54,16 +54,16 @@ suite "Waku v2 Rest API - Relay": installRelayApiHandlers(restServer.router, node, cache) restServer.start() - let pubSubTopics = - @[ - PubSubTopic("pubsub-topic-1"), - PubSubTopic("pubsub-topic-2"), - PubSubTopic("pubsub-topic-3"), - ] + let + shard0 = RelayShard(clusterId: DefaultClusterId, shardId: 0) + shard1 = RelayShard(clusterId: DefaultClusterId, shardId: 1) + shard2 = RelayShard(clusterId: DefaultClusterId, shardId: 2) + + let shards = @[$shard0, $shard1, $shard2] # When let client = newRestHttpClient(initTAddress(restAddress, restPort)) - let response = await client.relayPostSubscriptionsV1(pubSubTopics) + let response = await client.relayPostSubscriptionsV1(shards) # Then check: @@ -72,12 +72,12 @@ suite "Waku v2 Rest API - Relay": response.data == "OK" check: - cache.isPubsubSubscribed("pubsub-topic-1") - cache.isPubsubSubscribed("pubsub-topic-2") - cache.isPubsubSubscribed("pubsub-topic-3") + cache.isPubsubSubscribed($shard0) + cache.isPubsubSubscribed($shard1) + cache.isPubsubSubscribed($shard2) check: - toSeq(node.wakuRelay.subscribedTopics).len == pubSubTopics.len + toSeq(node.wakuRelay.subscribedTopics).len == shards.len await restServer.stop() await restServer.closeWait() @@ -87,9 +87,15 @@ suite "Waku v2 Rest API - Relay": # Given let node = testWakuNode() await node.start() - await node.mountRelay( - @["pubsub-topic-1", "pubsub-topic-2", "pubsub-topic-3", "pubsub-topic-x"] - ) + + let + shard0 = RelayShard(clusterId: DefaultClusterId, shardId: 0) + shard1 = RelayShard(clusterId: DefaultClusterId, shardId: 1) + shard2 = RelayShard(clusterId: DefaultClusterId, shardId: 2) + shard3 = RelayShard(clusterId: DefaultClusterId, shardId: 3) + shard4 = RelayShard(clusterId: DefaultClusterId, shardId: 4) + + await node.mountRelay(@[$shard0, $shard1, $shard2, $shard3]) var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") @@ -98,25 +104,19 @@ suite "Waku v2 Rest API - Relay": restPort = restServer.httpServer.address.port # update with bound port for client use let cache = MessageCache.init() - cache.pubsubSubscribe("pubsub-topic-1") - cache.pubsubSubscribe("pubsub-topic-2") - cache.pubsubSubscribe("pubsub-topic-3") - cache.pubsubSubscribe("pubsub-topic-x") + cache.pubsubSubscribe($shard0) + cache.pubsubSubscribe($shard1) + cache.pubsubSubscribe($shard2) + cache.pubsubSubscribe($shard3) installRelayApiHandlers(restServer.router, node, cache) restServer.start() - let pubSubTopics = - @[ - PubSubTopic("pubsub-topic-1"), - PubSubTopic("pubsub-topic-2"), - PubSubTopic("pubsub-topic-3"), - PubSubTopic("pubsub-topic-y"), - ] + let shards = @[$shard0, $shard1, $shard2, $shard4] # When let client = newRestHttpClient(initTAddress(restAddress, restPort)) - let response = await client.relayDeleteSubscriptionsV1(pubSubTopics) + let response = await client.relayDeleteSubscriptionsV1(shards) # Then check: @@ -125,16 +125,16 @@ suite "Waku v2 Rest API - Relay": response.data == "OK" check: - not cache.isPubsubSubscribed("pubsub-topic-1") - not node.wakuRelay.isSubscribed("pubsub-topic-1") - not cache.isPubsubSubscribed("pubsub-topic-2") - not node.wakuRelay.isSubscribed("pubsub-topic-2") - not cache.isPubsubSubscribed("pubsub-topic-3") - not node.wakuRelay.isSubscribed("pubsub-topic-3") - cache.isPubsubSubscribed("pubsub-topic-x") - node.wakuRelay.isSubscribed("pubsub-topic-x") - not cache.isPubsubSubscribed("pubsub-topic-y") - not node.wakuRelay.isSubscribed("pubsub-topic-y") + not cache.isPubsubSubscribed($shard0) + not node.wakuRelay.isSubscribed($shard0) + not cache.isPubsubSubscribed($shard1) + not node.wakuRelay.isSubscribed($shard1) + not cache.isPubsubSubscribed($shard2) + not node.wakuRelay.isSubscribed($shard2) + cache.isPubsubSubscribed($shard3) + node.wakuRelay.isSubscribed($shard3) + not cache.isPubsubSubscribed($shard4) + not node.wakuRelay.isSubscribed($shard4) await restServer.stop() await restServer.closeWait() From c64ecc5e7079f16fa4739a33d4251383defba524 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 27 Aug 2024 17:40:58 -0600 Subject: [PATCH 22/33] fix types --- tests/wakunode_rest/test_rest_relay.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/wakunode_rest/test_rest_relay.nim b/tests/wakunode_rest/test_rest_relay.nim index 665eb2ace6..9732d114b6 100644 --- a/tests/wakunode_rest/test_rest_relay.nim +++ b/tests/wakunode_rest/test_rest_relay.nim @@ -95,7 +95,7 @@ suite "Waku v2 Rest API - Relay": shard3 = RelayShard(clusterId: DefaultClusterId, shardId: 3) shard4 = RelayShard(clusterId: DefaultClusterId, shardId: 4) - await node.mountRelay(@[$shard0, $shard1, $shard2, $shard3]) + await node.mountRelay(@[shard0, shard1, shard2, shard3]) var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") From 98e0681cb5fb4f70c946f1b02bffc96cfb1cd7c1 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 27 Aug 2024 17:46:30 -0600 Subject: [PATCH 23/33] fix name --- tests/waku_rln_relay/test_wakunode_rln_relay.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/waku_rln_relay/test_wakunode_rln_relay.nim b/tests/waku_rln_relay/test_wakunode_rln_relay.nim index bc7e7a1504..042a955647 100644 --- a/tests/waku_rln_relay/test_wakunode_rln_relay.nim +++ b/tests/waku_rln_relay/test_wakunode_rln_relay.nim @@ -139,7 +139,7 @@ procSuite "WakuNode - RLN relay": await allFutures(nodes.mapIt(it.start())) let shards = - @[RelayShard(clusterId: 0, shard: 0), RelayShard(clusterId: 0, shard: 1)] + @[RelayShard(clusterId: 0, shardId: 0), RelayShard(clusterId: 0, shardId: 1)] let contentTopics = @[ ContentTopic("/waku/2/content-topic-a/proto"), From 528a7c90f5679f514e689cce79bf09ed77b823b0 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 27 Aug 2024 17:52:49 -0600 Subject: [PATCH 24/33] fix naming --- tests/waku_rln_relay/test_wakunode_rln_relay.nim | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/waku_rln_relay/test_wakunode_rln_relay.nim b/tests/waku_rln_relay/test_wakunode_rln_relay.nim index 042a955647..f837d34113 100644 --- a/tests/waku_rln_relay/test_wakunode_rln_relay.nim +++ b/tests/waku_rln_relay/test_wakunode_rln_relay.nim @@ -174,14 +174,14 @@ procSuite "WakuNode - RLN relay": topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = info "relayHandler. The received topic:", topic - if topic == pubsubTopics[0]: + if topic == $shards[0]: rxMessagesTopic1 = rxMessagesTopic1 + 1 - elif topic == pubsubTopics[1]: + elif topic == $shards[1]: rxMessagesTopic2 = rxMessagesTopic2 + 1 # mount the relay handlers - nodes[2].subscribe((kind: PubsubSub, topic: pubsubTopics[0]), some(relayHandler)) - nodes[2].subscribe((kind: PubsubSub, topic: pubsubTopics[1]), some(relayHandler)) + nodes[2].subscribe((kind: PubsubSub, topic: $shards[0]), some(relayHandler)) + nodes[2].subscribe((kind: PubsubSub, topic: $shards[1]), some(relayHandler)) await sleepAsync(1000.millis) # generate some messages with rln proofs first. generating @@ -211,9 +211,9 @@ procSuite "WakuNode - RLN relay": # publish 3 messages from node[0] (last 2 are spam, window is 10 secs) # publish 3 messages from node[1] (last 2 are spam, window is 10 secs) for msg in messages1: - discard await nodes[0].publish(some(pubsubTopics[0]), msg) + discard await nodes[0].publish(some($shards[0]), msg) for msg in messages2: - discard await nodes[1].publish(some(pubsubTopics[1]), msg) + discard await nodes[1].publish(some($shards[1]), msg) # wait for gossip to propagate await sleepAsync(5000.millis) From f0ecd1e7206d8cd6eb6f6f3d3993bf14658f7677 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 27 Aug 2024 18:12:55 -0600 Subject: [PATCH 25/33] fix networkmonitor compilation --- apps/networkmonitor/networkmonitor.nim | 11 ++++++----- apps/networkmonitor/networkmonitor_config.nim | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/apps/networkmonitor/networkmonitor.nim b/apps/networkmonitor/networkmonitor.nim index 4fe53a66f6..25e6a5ca09 100644 --- a/apps/networkmonitor/networkmonitor.nim +++ b/apps/networkmonitor/networkmonitor.nim @@ -441,10 +441,12 @@ proc initAndStartApp( ipAddr = some(extIp), tcpPort = some(nodeTcpPort), udpPort = some(nodeUdpPort) ) builder.withWakuCapabilities(flags) - let addShardedTopics = builder.withShardedTopics(conf.pubsubTopics) - if addShardedTopics.isErr(): - error "failed to add sharded topics to ENR", error = addShardedTopics.error - return err($addShardedTopics.error) + + builder.withWakuRelaySharding( + RelayShards(clusterId: conf.clusterId, shardIds: conf.shards) + ).isOkOr: + error "failed to add sharded topics to ENR", error = error + return err("failed to add sharded topics to ENR: " & $error) let recordRes = builder.build() let record = @@ -561,7 +563,6 @@ when isMainModule: let twnClusterConf = ClusterConf.TheWakuNetworkConf() conf.bootstrapNodes = twnClusterConf.discv5BootstrapNodes - conf.pubsubTopics = twnClusterConf.pubsubTopics conf.rlnRelayDynamic = twnClusterConf.rlnRelayDynamic conf.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec diff --git a/apps/networkmonitor/networkmonitor_config.nim b/apps/networkmonitor/networkmonitor_config.nim index 912780b589..4934b16b2a 100644 --- a/apps/networkmonitor/networkmonitor_config.nim +++ b/apps/networkmonitor/networkmonitor_config.nim @@ -44,7 +44,7 @@ type NetworkMonitorConf* = object .}: seq[uint16] networkShards* {.desc: "Number of shards in the network", name: "network-shards".}: - uint16 + uint32 refreshInterval* {. desc: "How often new peers are discovered and connected to (in seconds)", From ef22c8c3e02e1843db51139ea5ad90dedc0432c9 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 28 Aug 2024 14:16:38 -0600 Subject: [PATCH 26/33] adding temporary support to pubsub topics --- waku/factory/external_config.nim | 7 +++++++ waku/factory/internal_config.nim | 29 +---------------------------- waku/factory/waku.nim | 19 +++++++++++-------- 3 files changed, 19 insertions(+), 36 deletions(-) diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index 1d6af3e38c..69cdf78940 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -315,6 +315,13 @@ type WakuNodeConf* = object networkShards* {.desc: "Number of shards in the network", name: "network-shards".}: uint32 + pubsubTopics* {. + desc: + "Deprecated. Default pubsub topic to subscribe to. Argument may be repeated.", + defaultValue: @[], + name: "pubsub-topic" + .}: seq[string] + shards* {. desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.", defaultValue: diff --git a/waku/factory/internal_config.nim b/waku/factory/internal_config.nim index 355ac40c62..064a03acd9 100644 --- a/waku/factory/internal_config.nim +++ b/waku/factory/internal_config.nim @@ -28,35 +28,8 @@ proc enrConfiguration*( enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs) - var shards = newSeq[uint16]() - - # Only for dev purposes. After everything compiles, remove this block and uncomment the one below - let shardsOpt = topicsToRelayShards(@["/waku/2/rs/0/0"]).valueOr: - error "failed to parse pubsub topic, please format according to static shard specification", - error = $error - return err("failed to parse pubsub topic: " & $error) - - #[ let shardsOpt = topicsToRelayShards(conf.pubsubTopics).valueOr: - error "failed to parse pubsub topic, please format according to static shard specification", - error = $error - return err("failed to parse pubsub topic: " & $error) ]# - - if shardsOpt.isSome(): - let relayShards = shardsOpt.get() - - if relayShards.clusterid != conf.clusterId: - error "pubsub topic corresponds to different shard than configured", - nodeCluster = conf.clusterId, pubsubCluster = relayShards.clusterid - return err("pubsub topic corresponds to different shard than configured") - - shards = relayShards.shardIds - elif conf.shards.len > 0: - shards = toSeq(conf.shards.mapIt(uint16(it))) - else: - info "no pubsub topics specified" - enrBuilder.withWakuRelaySharding( - RelayShards(clusterId: conf.clusterId, shardIds: shards) + RelayShards(clusterId: conf.clusterId, shardIds: conf.shards) ).isOkOr: return err("could not initialize ENR with shards") diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index fe8255caf1..1dc41cfafa 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -107,18 +107,21 @@ proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] = logging.setupLog(conf.logLevel, conf.logFormat) # TODO: remove after pubsubtopic config gets removed - #[ let shards = newSeq[uint16]() - if conf.pubsubTopics.length > 0: - let shardsOpt = topicsToRelayShards(conf.pubsubTopics).valueOr: + var shards = newSeq[uint16]() + if conf.pubsubTopics.len > 0: + let shardsRes = topicsToRelayShards(conf.pubsubTopics) + if shardsRes.isErr(): error "failed to parse pubsub topic, please format according to static shard specification", - error = $error - return err("failed to parse pubsub topic: " & $error) + error = shardsRes.error + return err("failed to parse pubsub topic: " & $shardsRes.error) + + let shardsOpt = shardsRes.get() if shardsOpt.isSome(): let relayShards = shardsOpt.get() - for shard in relayShards: - shards.add(shard.shardId) - confCopy.shards = shards ]# + for shard in relayShards.shardIds: + shards.add(shard) + confCopy.shards = shards case confCopy.clusterId From f2320703144a38c0058dd423ad588264f12213b5 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 28 Aug 2024 14:47:58 -0600 Subject: [PATCH 27/33] fixing shards validation --- waku/factory/external_config.nim | 5 +++-- waku/factory/node_factory.nim | 2 +- waku/factory/waku.nim | 7 ++++++- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index 69cdf78940..d62304a114 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -312,8 +312,9 @@ type WakuNodeConf* = object name: "keep-alive" .}: bool - networkShards* {.desc: "Number of shards in the network", name: "network-shards".}: - uint32 + networkShards* {. + desc: "Number of shards in the network", defaultValue: 0, name: "network-shards" + .}: uint32 pubsubTopics* {. desc: diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 0fad26dedf..64d86a9082 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -117,7 +117,7 @@ proc getNetworkShards*(conf: WakuNodeConf): uint32 = if conf.networkShards != 0: return conf.networkShards # If conf.networkShards is not set, use the number of shards configured as networkShards - return uint32(conf.shards.len) + return uint32(max(conf.shards) + 1) proc setupProtocols( node: WakuNode, conf: WakuNodeConf, nodeKey: crypto.PrivateKey diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 1dc41cfafa..669799c431 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -119,6 +119,11 @@ proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] = if shardsOpt.isSome(): let relayShards = shardsOpt.get() + if relayShards.clusterId != conf.clusterId: + error "clusterId of the pubsub topic should match the node's cluster", + nodeCluster = conf.clusterId, pubsubCluster = relayShards.clusterId + return err("clusterId of the pubsub topic should match the node's cluster") + for shard in relayShards.shardIds: shards.add(shard) confCopy.shards = shards @@ -152,7 +157,7 @@ proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] = info "Running nwaku node", version = git_version logConfig(confCopy) - let validateShardsRes = validateShards(conf) + let validateShardsRes = validateShards(confCopy) if validateShardsRes.isErr(): error "Failed validating shards", error = $validateShardsRes.error return err("Failed validating shards: " & $validateShardsRes.error) From 8688ec83d91d429cf8e3f356fab54e79015220ff Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 28 Aug 2024 16:25:10 -0600 Subject: [PATCH 28/33] removing unnecessary chat2bridge config and improving default node documentation --- apps/chat2bridge/config_chat2bridge.nim | 6 ------ docs/operators/how-to/run.md | 2 +- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/apps/chat2bridge/config_chat2bridge.nim b/apps/chat2bridge/config_chat2bridge.nim index c858fdb3b2..c7d8bb56a8 100644 --- a/apps/chat2bridge/config_chat2bridge.nim +++ b/apps/chat2bridge/config_chat2bridge.nim @@ -67,12 +67,6 @@ type Chat2MatterbridgeConf* = object name: "nodekey" .}: crypto.PrivateKey - topics* {. - desc: "Default topics to subscribe to (space separated list)", - defaultValue: "/waku/2/rs/0/0", - name: "topics" - .}: string - store* {. desc: "Flag whether to start store protocol", defaultValue: true, name: "store" .}: bool diff --git a/docs/operators/how-to/run.md b/docs/operators/how-to/run.md index b85efe1bb6..bc90394792 100644 --- a/docs/operators/how-to/run.md +++ b/docs/operators/how-to/run.md @@ -18,7 +18,7 @@ By default a nwaku node will: See [this tutorial](./configure-key.md) if you want to generate and configure a persistent private key. - listen for incoming libp2p connections on the default TCP port (`60000`) - enable `relay` protocol -- subscribe to the default pubsub topic, namely `/waku/2/rs/0/0` +- subscribe to the default clusterId (0) and shard (0) - enable `store` protocol, but only as a client. This implies that the nwaku node will not persist any historical messages itself, but can query `store` service peers who do so. From dbcbf1311d684e87203949c19efccd4fec7650d7 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Thu, 29 Aug 2024 11:44:25 -0600 Subject: [PATCH 29/33] improving logs and using defaults --- tests/testlib/wakunode.nim | 4 ++-- waku/factory/waku.nim | 11 +++++++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/tests/testlib/wakunode.nim b/tests/testlib/wakunode.nim index 96c2deeab7..b6184e330f 100644 --- a/tests/testlib/wakunode.nim +++ b/tests/testlib/wakunode.nim @@ -37,8 +37,8 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf = nat: "any", maxConnections: 50, maxMessageSize: "1024 KiB", - clusterId: 0, - shards: @[uint16(0)], + clusterId: DefaultClusterId, + shards: @[DefaultShardId], relay: true, storeMessageDbUrl: "sqlite://store.sqlite3", ) diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 669799c431..9a6c6bec7c 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -91,8 +91,9 @@ proc validateShards(conf: WakuNodeConf): Result[void, string] = for shard in conf.shards: if shard >= networkShards: - let msg = "Invalid shard: " & $shard & " when networkShards: " & $networkShards - # fmt doesn't work + let msg = + "validateShards invalid shard: " & $shard & " when networkShards: " & + $networkShards # fmt doesn't work error "validateShards failed", error = msg return err(msg) @@ -120,9 +121,11 @@ proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] = if shardsOpt.isSome(): let relayShards = shardsOpt.get() if relayShards.clusterId != conf.clusterId: - error "clusterId of the pubsub topic should match the node's cluster", + error "clusterId of the pubsub topic should match the node's cluster. e.g. --pubsub-topic=/waku/2/rs/22/1 and --cluster-id=22", nodeCluster = conf.clusterId, pubsubCluster = relayShards.clusterId - return err("clusterId of the pubsub topic should match the node's cluster") + return err( + "clusterId of the pubsub topic should match the node's cluster. e.g. --pubsub-topic=/waku/2/rs/22/1 and --cluster-id=22" + ) for shard in relayShards.shardIds: shards.add(shard) From 80294063fbc27c7d4f27f36234c4ae0b1884191a Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Thu, 29 Aug 2024 11:52:27 -0600 Subject: [PATCH 30/33] changing networkShards to numShardsInNetwork --- apps/networkmonitor/networkmonitor.nim | 4 ++-- apps/networkmonitor/networkmonitor_config.nim | 5 +++-- waku/factory/external_config.nim | 7 +++++-- waku/factory/networks_config.nim | 4 ++-- waku/factory/node_factory.nim | 14 +++++++------- waku/factory/waku.nim | 10 +++++----- 6 files changed, 24 insertions(+), 20 deletions(-) diff --git a/apps/networkmonitor/networkmonitor.nim b/apps/networkmonitor/networkmonitor.nim index 25e6a5ca09..09a64c8d54 100644 --- a/apps/networkmonitor/networkmonitor.nim +++ b/apps/networkmonitor/networkmonitor.nim @@ -567,10 +567,10 @@ when isMainModule: conf.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit - conf.networkShards = twnClusterConf.networkShards + conf.numShardsInNetwork = twnClusterConf.numShardsInNetwork if conf.shards.len == 0: - conf.shards = toSeq(uint16(0) .. uint16(twnClusterConf.networkShards - 1)) + conf.shards = toSeq(uint16(0) .. uint16(twnClusterConf.numShardsInNetwork - 1)) if conf.logLevel != LogLevel.NONE: setLogLevel(conf.logLevel) diff --git a/apps/networkmonitor/networkmonitor_config.nim b/apps/networkmonitor/networkmonitor_config.nim index 4934b16b2a..48c5ab999d 100644 --- a/apps/networkmonitor/networkmonitor_config.nim +++ b/apps/networkmonitor/networkmonitor_config.nim @@ -43,8 +43,9 @@ type NetworkMonitorConf* = object name: "shard" .}: seq[uint16] - networkShards* {.desc: "Number of shards in the network", name: "network-shards".}: - uint32 + numShardsInNetwork* {. + desc: "Number of shards in the network", name: "num-shards-in-network" + .}: uint32 refreshInterval* {. desc: "How often new peers are discovered and connected to (in seconds)", diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index d62304a114..6cfc867d8a 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -312,8 +312,11 @@ type WakuNodeConf* = object name: "keep-alive" .}: bool - networkShards* {. - desc: "Number of shards in the network", defaultValue: 0, name: "network-shards" + # If numShardsInNetwork is not set, we use the number of shards configured as numShardsInNetwork + numShardsInNetwork* {. + desc: "Number of shards in the network", + defaultValue: 0, + name: "num-shards-in-network" .}: uint32 pubsubTopics* {. diff --git a/waku/factory/networks_config.nim b/waku/factory/networks_config.nim index b337cce4ec..41678f5904 100644 --- a/waku/factory/networks_config.nim +++ b/waku/factory/networks_config.nim @@ -10,7 +10,7 @@ type ClusterConf* = object rlnRelayBandwidthThreshold*: int rlnEpochSizeSec*: uint64 rlnRelayUserMessageLimit*: uint64 - networkShards*: uint32 + numShardsInNetwork*: uint32 discv5Discovery*: bool discv5BootstrapNodes*: seq[string] @@ -28,7 +28,7 @@ proc TheWakuNetworkConf*(T: type ClusterConf): ClusterConf = rlnRelayBandwidthThreshold: 0, rlnEpochSizeSec: 600, rlnRelayUserMessageLimit: 100, - networkShards: 8, + numShardsInNetwork: 8, discv5Discovery: true, discv5BootstrapNodes: @[ diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 64d86a9082..d96ea3423e 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -113,10 +113,10 @@ proc initNode( ## Mount protocols -proc getNetworkShards*(conf: WakuNodeConf): uint32 = - if conf.networkShards != 0: - return conf.networkShards - # If conf.networkShards is not set, use the number of shards configured as networkShards +proc getNumShardsInNetwork*(conf: WakuNodeConf): uint32 = + if conf.numShardsInNetwork != 0: + return conf.numShardsInNetwork + # If conf.numShardsInNetwork is not set, use the number of shards configured as numShardsInNetwork return uint32(max(conf.shards) + 1) proc setupProtocols( @@ -133,10 +133,10 @@ proc setupProtocols( node.mountMetadata(conf.clusterId).isOkOr: return err("failed to mount waku metadata protocol: " & error) - # If conf.networkShards is not set, use the number of shards configured as networkShards - let networkShards = getNetworkShards(conf) + # If conf.numShardsInNetwork is not set, use the number of shards configured as numShardsInNetwork + let numShardsInNetwork = getNumShardsInNetwork(conf) - node.mountSharding(conf.clusterId, networkShards).isOkOr: + node.mountSharding(conf.clusterId, numShardsInNetwork).isOkOr: return err("failed to mount waku sharding: " & error) # Mount relay on all nodes diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 9a6c6bec7c..cdc795c30b 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -87,13 +87,13 @@ func version*(waku: Waku): string = waku.version proc validateShards(conf: WakuNodeConf): Result[void, string] = - let networkShards = getNetworkShards(conf) + let numShardsInNetwork = getNumShardsInNetwork(conf) for shard in conf.shards: - if shard >= networkShards: + if shard >= numShardsInNetwork: let msg = - "validateShards invalid shard: " & $shard & " when networkShards: " & - $networkShards # fmt doesn't work + "validateShards invalid shard: " & $shard & " when numShardsInNetwork: " & + $numShardsInNetwork # fmt doesn't work error "validateShards failed", error = msg return err(msg) @@ -149,7 +149,7 @@ proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] = confCopy.discv5BootstrapNodes & twnClusterConf.discv5BootstrapNodes confCopy.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec confCopy.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit - confCopy.networkShards = twnClusterConf.networkShards + confCopy.numShardsInNetwork = twnClusterConf.numShardsInNetwork # Only set rlnRelay to true if relay is configured if confCopy.relay: From a86bb90acaa4dc027264805cd17f53c45619b105 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Thu, 29 Aug 2024 12:02:24 -0600 Subject: [PATCH 31/33] apply suggestions --- apps/chat2/config_chat2.nim | 3 ++- apps/liteprotocoltester/tester_config.nim | 2 +- apps/networkmonitor/networkmonitor_config.nim | 3 ++- apps/wakucanary/wakucanary.nim | 3 ++- waku/factory/external_config.nim | 3 ++- 5 files changed, 9 insertions(+), 5 deletions(-) diff --git a/apps/chat2/config_chat2.nim b/apps/chat2/config_chat2.nim index d329e0c570..5086aef51b 100644 --- a/apps/chat2/config_chat2.nim +++ b/apps/chat2/config_chat2.nim @@ -91,7 +91,8 @@ type .}: uint16 shards* {. - desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.", + desc: + "Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.", defaultValue: @[uint16(0)], name: "shard" .}: seq[uint16] diff --git a/apps/liteprotocoltester/tester_config.nim b/apps/liteprotocoltester/tester_config.nim index 5683b544f5..35f214f8d0 100644 --- a/apps/liteprotocoltester/tester_config.nim +++ b/apps/liteprotocoltester/tester_config.nim @@ -98,7 +98,7 @@ type LiteProtocolTesterConf* = object ## TODO: extend lite protocol tester configuration based on testing needs # shards* {. - # desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.", + # desc: "Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.", # defaultValue: @[], # name: "shard" # .}: seq[uint16] diff --git a/apps/networkmonitor/networkmonitor_config.nim b/apps/networkmonitor/networkmonitor_config.nim index 48c5ab999d..291144c5ba 100644 --- a/apps/networkmonitor/networkmonitor_config.nim +++ b/apps/networkmonitor/networkmonitor_config.nim @@ -39,7 +39,8 @@ type NetworkMonitorConf* = object .}: string shards* {. - desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.", + desc: + "Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.", name: "shard" .}: seq[uint16] diff --git a/apps/wakucanary/wakucanary.nim b/apps/wakucanary/wakucanary.nim index a67534f44d..9d7f5450e1 100644 --- a/apps/wakucanary/wakucanary.nim +++ b/apps/wakucanary/wakucanary.nim @@ -81,7 +81,8 @@ type WakuCanaryConf* = object .}: bool shards* {. - desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.", + desc: + "Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.", defaultValue: @[], name: "shard", abbr: "s" diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index 6cfc867d8a..532a657fd0 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -327,7 +327,8 @@ type WakuNodeConf* = object .}: seq[string] shards* {. - desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.", + desc: + "Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.", defaultValue: @[ uint16(0), From 3f78bfe4c43b397669857d8cf601021cefbe9263 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Thu, 29 Aug 2024 17:54:38 -0600 Subject: [PATCH 32/33] log warning when automagically setting numShardsInNetwork --- waku/factory/node_factory.nim | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index d96ea3423e..d5452ef030 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -136,6 +136,10 @@ proc setupProtocols( # If conf.numShardsInNetwork is not set, use the number of shards configured as numShardsInNetwork let numShardsInNetwork = getNumShardsInNetwork(conf) + if conf.numShardsInNetwork == 0: + warn "Number of shards in network not configured, setting it to", + numShardsInNetwork = $numShardsInNetwork + node.mountSharding(conf.clusterId, numShardsInNetwork).isOkOr: return err("failed to mount waku sharding: " & error) From c6e7332ab45e5546740eedcbddc927aa69af4d03 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 10 Sep 2024 09:18:38 -0600 Subject: [PATCH 33/33] setting 1024 shards as default if users don't set it --- waku/factory/node_factory.nim | 5 +++-- waku/waku_enr/sharding.nim | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index d5452ef030..d6672f8a8a 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -116,8 +116,9 @@ proc initNode( proc getNumShardsInNetwork*(conf: WakuNodeConf): uint32 = if conf.numShardsInNetwork != 0: return conf.numShardsInNetwork - # If conf.numShardsInNetwork is not set, use the number of shards configured as numShardsInNetwork - return uint32(max(conf.shards) + 1) + # If conf.numShardsInNetwork is not set, use 1024 - the maximum possible as per the static sharding spec + # https://github.com/waku-org/specs/blob/master/standards/core/relay-sharding.md#static-sharding + return uint32(MaxShardIndex + 1) proc setupProtocols( node: WakuNode, conf: WakuNodeConf, nodeKey: crypto.PrivateKey diff --git a/waku/waku_enr/sharding.nim b/waku/waku_enr/sharding.nim index 3b7cdae149..d1beb2864b 100644 --- a/waku/waku_enr/sharding.nim +++ b/waku/waku_enr/sharding.nim @@ -13,7 +13,7 @@ import ../common/enr, ../waku_core logScope: topics = "waku enr sharding" -const MaxShardIndex: uint16 = 1023 +const MaxShardIndex*: uint16 = 1023 const ShardingIndicesListEnrField* = "rs"