Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: deprecating pubsub topic #2997

Merged
merged 33 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
af84f0f
initial changes
gabrielmer Aug 26, 2024
d4432e4
fixing some compilation errors
gabrielmer Aug 26, 2024
86a1481
fixing compilation error
gabrielmer Aug 26, 2024
cbcb93b
chat2 fix and code to temporarily convert from pubsub topics to shards
gabrielmer Aug 26, 2024
7e69272
fixes to testlib
gabrielmer Aug 26, 2024
20c6a24
adding defaults and removing redundant staticSharding proc
gabrielmer Aug 26, 2024
10d5a2e
fixing compilation errors
gabrielmer Aug 26, 2024
0db34d2
adding default pubsub topic
gabrielmer Aug 26, 2024
57f5ef0
fix syntax error
gabrielmer Aug 26, 2024
be46c5e
fix type
gabrielmer Aug 26, 2024
80e1ba1
fix typo
gabrielmer Aug 26, 2024
f932b78
fix more compilation errors
gabrielmer Aug 26, 2024
0fccc84
fix variable name
gabrielmer Aug 26, 2024
1b0ce73
fixing chat2 compilation
gabrielmer Aug 27, 2024
9d2134b
fixing tests
gabrielmer Aug 27, 2024
3913762
fix compilation error
gabrielmer Aug 27, 2024
0e91c27
more compilation errors
gabrielmer Aug 27, 2024
f42865e
more fixes
gabrielmer Aug 27, 2024
5bb390e
fix type
gabrielmer Aug 27, 2024
dd85361
fix
gabrielmer Aug 27, 2024
ecb771f
more test fixes
gabrielmer Aug 27, 2024
c64ecc5
fix types
gabrielmer Aug 27, 2024
98e0681
fix name
gabrielmer Aug 27, 2024
528a7c9
fix naming
gabrielmer Aug 27, 2024
f0ecd1e
fix networkmonitor compilation
gabrielmer Aug 28, 2024
ef22c8c
adding temporary support to pubsub topics
gabrielmer Aug 28, 2024
f232070
fixing shards validation
gabrielmer Aug 28, 2024
8688ec8
removing unnecessary chat2bridge config and improving default node do…
gabrielmer Aug 28, 2024
dbcbf13
improving logs and using defaults
gabrielmer Aug 29, 2024
8029406
changing networkShards to numShardsInNetwork
gabrielmer Aug 29, 2024
a86bb90
apply suggestions
gabrielmer Aug 29, 2024
3f78bfe
log warning when automagically setting numShardsInNetwork
gabrielmer Aug 29, 2024
c6e7332
setting 1024 shards as default if users don't set it
gabrielmer Sep 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions apps/chat2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ when not (compileOption("threads")):

{.push raises: [].}

import std/[strformat, strutils, times, options, random]
import std/[strformat, strutils, times, options, random, sequtils]
import
confutils,
chronicles,
Expand Down Expand Up @@ -379,7 +379,9 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
raise newException(ConfigurationError, "rln-relay-cred-path MUST be passed")

if conf.relay:
await node.mountRelay(conf.topics.split(" "))
let shards =
conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it)))
await node.mountRelay(shards)

await node.mountLibp2pPing()

Expand Down
18 changes: 13 additions & 5 deletions apps/chat2/config_chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,19 @@ type
name: "keep-alive"
.}: bool

topics* {.
desc: "Default topics to subscribe to (space separated list).",
defaultValue: "/waku/2/rs/0/0",
name: "topics"
.}: string
clusterId* {.
desc:
"Cluster id that the node is running in. Node in a different cluster id is disconnected.",
defaultValue: 0,
name: "cluster-id"
.}: uint16

shards* {.
desc:
"Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.",
defaultValue: @[uint16(0)],
name: "shard"
.}: seq[uint16]

## Store config
store* {.
Expand Down
6 changes: 0 additions & 6 deletions apps/chat2bridge/config_chat2bridge.nim
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,6 @@ type Chat2MatterbridgeConf* = object
name: "nodekey"
.}: crypto.PrivateKey

topics* {.
desc: "Default topics to subscribe to (space separated list)",
defaultValue: "/waku/2/rs/0/0",
name: "topics"
.}: string

store* {.
desc: "Flag whether to start store protocol", defaultValue: true, name: "store"
.}: bool
Expand Down
2 changes: 1 addition & 1 deletion apps/liteprotocoltester/tester_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type LiteProtocolTesterConf* = object

## TODO: extend lite protocol tester configuration based on testing needs
# shards* {.
# desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.",
# desc: "Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.",
# defaultValue: @[],
# name: "shard"
# .}: seq[uint16]
Expand Down
23 changes: 15 additions & 8 deletions apps/networkmonitor/networkmonitor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -441,10 +441,12 @@ proc initAndStartApp(
ipAddr = some(extIp), tcpPort = some(nodeTcpPort), udpPort = some(nodeUdpPort)
)
builder.withWakuCapabilities(flags)
let addShardedTopics = builder.withShardedTopics(conf.pubsubTopics)
if addShardedTopics.isErr():
error "failed to add sharded topics to ENR", error = addShardedTopics.error
return err($addShardedTopics.error)

builder.withWakuRelaySharding(
RelayShards(clusterId: conf.clusterId, shardIds: conf.shards)
).isOkOr:
error "failed to add sharded topics to ENR", error = error
return err("failed to add sharded topics to ENR: " & $error)

let recordRes = builder.build()
let record =
Expand Down Expand Up @@ -561,11 +563,14 @@ when isMainModule:
let twnClusterConf = ClusterConf.TheWakuNetworkConf()

conf.bootstrapNodes = twnClusterConf.discv5BootstrapNodes
conf.pubsubTopics = twnClusterConf.pubsubTopics
conf.rlnRelayDynamic = twnClusterConf.rlnRelayDynamic
conf.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress
conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec
conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit
conf.numShardsInNetwork = twnClusterConf.numShardsInNetwork

if conf.shards.len == 0:
conf.shards = toSeq(uint16(0) .. uint16(twnClusterConf.numShardsInNetwork - 1))

if conf.logLevel != LogLevel.NONE:
setLogLevel(conf.logLevel)
Expand Down Expand Up @@ -631,9 +636,11 @@ when isMainModule:
error "failed to mount waku metadata protocol: ", err = error
quit 1

for pubsubTopic in conf.pubsubTopics:
# Subscribe the node to the default pubsubtopic, to count messages
subscribeAndHandleMessages(node, pubsubTopic, msgPerContentTopic)
for shard in conf.shards:
# Subscribe the node to the shards, to count messages
subscribeAndHandleMessages(
node, $RelayShard(shardId: shard, clusterId: conf.clusterId), msgPerContentTopic
)

# spawn the routine that crawls the network
# TODO: split into 3 routines (discovery, connections, ip2location)
Expand Down
15 changes: 10 additions & 5 deletions apps/networkmonitor/networkmonitor_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,15 @@ type NetworkMonitorConf* = object
name: "dns-discovery-url"
.}: string

pubsubTopics* {.
desc: "Default pubsub topic to subscribe to. Argument may be repeated.",
name: "pubsub-topic"
.}: seq[string]
shards* {.
desc:
"Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.",
name: "shard"
.}: seq[uint16]

numShardsInNetwork* {.
desc: "Number of shards in the network", name: "num-shards-in-network"
.}: uint32

refreshInterval* {.
desc: "How often new peers are discovered and connected to (in seconds)",
Expand All @@ -55,7 +60,7 @@ type NetworkMonitorConf* = object
"Cluster id that the node is running in. Node in a different cluster id is disconnected.",
defaultValue: 1,
name: "cluster-id"
.}: uint32
.}: uint16

rlnRelay* {.
desc: "Enable spam protection through rln-relay: true|false",
Expand Down
3 changes: 2 additions & 1 deletion apps/wakucanary/wakucanary.nim
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ type WakuCanaryConf* = object
.}: bool

shards* {.
desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.",
desc:
"Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.",
defaultValue: @[],
name: "shard",
abbr: "s"
Expand Down
2 changes: 1 addition & 1 deletion docs/operators/how-to/run.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ By default a nwaku node will:
See [this tutorial](./configure-key.md) if you want to generate and configure a persistent private key.
- listen for incoming libp2p connections on the default TCP port (`60000`)
- enable `relay` protocol
- subscribe to the default pubsub topic, namely `/waku/2/rs/0/0`
- subscribe to the default clusterId (0) and shard (0)
- enable `store` protocol, but only as a client.
This implies that the nwaku node will not persist any historical messages itself,
but can query `store` service peers who do so.
Expand Down
16 changes: 6 additions & 10 deletions tests/node/peer_manager/test_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,22 @@ suite "Peer Manager":
serverKey {.threadvar.}: PrivateKey
clientKey {.threadvar.}: PrivateKey
clusterId {.threadvar.}: uint64
shardTopic0 {.threadvar.}: string
shardTopic1 {.threadvar.}: string

asyncSetup:
listenPort = Port(0)
listenAddress = ValidIpAddress.init("0.0.0.0")
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()
clusterId = 1
shardTopic0 = "/waku/2/rs/" & $clusterId & "/0"
shardTopic1 = "/waku/2/rs/" & $clusterId & "/1"

asyncTest "light client is not disconnected":
# Given two nodes with the same shardId
let
server = newTestWakuNode(
serverKey, listenAddress, listenPort, pubsubTopics = @[shardTopic0]
serverKey, listenAddress, listenPort, clusterId = clusterId, shards = @[0]
)
client = newTestWakuNode(
clientKey, listenAddress, listenPort, pubsubTopics = @[shardTopic1]
clientKey, listenAddress, listenPort, clusterId = clusterId, shards = @[1]
)

# And both mount metadata and filter
Expand Down Expand Up @@ -71,10 +67,10 @@ suite "Peer Manager":
# Given two nodes with the same shardId
let
server = newTestWakuNode(
serverKey, listenAddress, listenPort, pubsubTopics = @[shardTopic0]
serverKey, listenAddress, listenPort, clusterId = clusterId, shards = @[0]
)
client = newTestWakuNode(
clientKey, listenAddress, listenPort, pubsubTopics = @[shardTopic0]
clientKey, listenAddress, listenPort, clusterId = clusterId, shards = @[1]
)

# And both mount metadata and relay
Expand Down Expand Up @@ -104,10 +100,10 @@ suite "Peer Manager":
# Given two nodes with different shardIds
let
server = newTestWakuNode(
serverKey, listenAddress, listenPort, pubsubTopics = @[shardTopic0]
serverKey, listenAddress, listenPort, clusterId = clusterId, shards = @[0]
)
client = newTestWakuNode(
clientKey, listenAddress, listenPort, pubsubTopics = @[shardTopic1]
clientKey, listenAddress, listenPort, clusterId = clusterId, shards = @[1]
)

# And both mount metadata and relay
Expand Down
6 changes: 3 additions & 3 deletions tests/node/test_wakunode_relay_rln.nim
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ proc getWakuRlnConfigOnChain*(
)

proc setupRelayWithOnChainRln*(
node: WakuNode, pubsubTopics: seq[string], wakuRlnConfig: WakuRlnConfig
node: WakuNode, shards: seq[RelayShard], wakuRlnConfig: WakuRlnConfig
) {.async.} =
await node.mountRelay(pubsubTopics)
await node.mountRelay(shards)
await node.mountRlnRelay(wakuRlnConfig)

suite "Waku RlnRelay - End to End - Static":
Expand Down Expand Up @@ -223,7 +223,7 @@ suite "Waku RlnRelay - End to End - Static":
nodekey = generateSecp256k1Key()
node = newTestWakuNode(nodekey, parseIpAddress("0.0.0.0"), Port(0))

await node.mountRelay(@[DefaultPubsubTopic])
await node.mountRelay(@[DefaultRelayShard])

let contractAddress = await uploadRLNContract(EthClient)
let wakuRlnConfig = WakuRlnConfig(
Expand Down
9 changes: 6 additions & 3 deletions tests/test_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -418,21 +418,24 @@ procSuite "Peer Manager":
generateSecp256k1Key(),
ValidIpAddress.init("0.0.0.0"),
port,
pubsubTopics = @["/waku/2/rs/3/0"],
clusterId = 3,
shards = @[uint16(0)],
)

# same network
node2 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("0.0.0.0"),
port,
pubsubTopics = @["/waku/2/rs/4/0"],
clusterId = 4,
shards = @[uint16(0)],
)
node3 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("0.0.0.0"),
port,
pubsubTopics = @["/waku/2/rs/4/0"],
clusterId = 4,
shards = @[uint16(0)],
)

node1.mountMetadata(3).expect("Mounted Waku Metadata")
Expand Down
10 changes: 5 additions & 5 deletions tests/test_relay_peer_exchange.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ procSuite "Relay (GossipSub) Peer Exchange":
newTestWakuNode(node2Key, listenAddress, port, sendSignedPeerRecord = true)

# When both client and server mount relay without a handler
await node1.mountRelay(@[DefaultPubsubTopic])
await node2.mountRelay(@[DefaultPubsubTopic], none(RoutingRecordsHandler))
await node1.mountRelay(@[DefaultRelayShard])
await node2.mountRelay(@[DefaultRelayShard], none(RoutingRecordsHandler))

# Then the relays are mounted without a handler
check:
Expand Down Expand Up @@ -75,9 +75,9 @@ procSuite "Relay (GossipSub) Peer Exchange":
peerExchangeHandle: RoutingRecordsHandler = peerExchangeHandler

# Givem the nodes mount relay with a peer exchange handler
await node1.mountRelay(@[DefaultPubsubTopic], some(emptyPeerExchangeHandle))
await node2.mountRelay(@[DefaultPubsubTopic], some(emptyPeerExchangeHandle))
await node3.mountRelay(@[DefaultPubsubTopic], some(peerExchangeHandle))
await node1.mountRelay(@[DefaultRelayShard], some(emptyPeerExchangeHandle))
await node2.mountRelay(@[DefaultRelayShard], some(emptyPeerExchangeHandle))
await node3.mountRelay(@[DefaultRelayShard], some(peerExchangeHandle))

# Ensure that node1 prunes all peers after the first connection
node1.wakuRelay.parameters.dHigh = 1
Expand Down
2 changes: 1 addition & 1 deletion tests/test_waku_enr.nim
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ suite "Waku ENR - Relay static sharding":
clusterId: uint16 = 22
shardId: uint16 = 1

let shard = RelayShard.staticSharding(clusterId, shardId)
let shard = RelayShard(clusterId: clusterId, shardId: shardId)

## When
let shardsTopics = RelayShards.init(clusterId, shardId).expect("Valid Shards")
Expand Down
12 changes: 6 additions & 6 deletions tests/test_wakunode.nim
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,21 @@ suite "WakuNode":
node1 = newTestWakuNode(nodeKey1, parseIpAddress("0.0.0.0"), Port(61000))
nodeKey2 = generateSecp256k1Key()
node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), Port(61002))
pubSubTopic = "/waku/2/rs/0/0"
shard = DefaultRelayShard
contentTopic = ContentTopic("/waku/2/default-content/proto")
payload = "hello world".toBytes()
message = WakuMessage(payload: payload, contentTopic: contentTopic)

# Setup node 1 with stable codec "/vac/waku/relay/2.0.0"

await node1.start()
await node1.mountRelay(@[pubSubTopic])
await node1.mountRelay(@[shard])
node1.wakuRelay.codec = "/vac/waku/relay/2.0.0"

# Setup node 2 with beta codec "/vac/waku/relay/2.0.0-beta2"

await node2.start()
await node2.mountRelay(@[pubSubTopic])
await node2.mountRelay(@[shard])
node2.wakuRelay.codec = "/vac/waku/relay/2.0.0-beta2"

check:
Expand All @@ -58,15 +58,15 @@ suite "WakuNode":
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
topic == $shard
msg.contentTopic == contentTopic
msg.payload == payload
completionFut.complete(true)

node2.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
node2.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
await sleepAsync(2000.millis)

var res = await node1.publish(some(pubSubTopic), message)
var res = await node1.publish(some($shard), message)
assert res.isOk(), $res.error

await sleepAsync(2000.millis)
Expand Down
4 changes: 2 additions & 2 deletions tests/test_wakunode_lightpush.nim
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ suite "WakuNode - Lightpush":

await allFutures(destNode.start(), bridgeNode.start(), lightNode.start())

await destNode.mountRelay(@[DefaultPubsubTopic])
await bridgeNode.mountRelay(@[DefaultPubsubTopic])
await destNode.mountRelay(@[DefaultRelayShard])
await bridgeNode.mountRelay(@[DefaultRelayShard])
await bridgeNode.mountLightPush()
lightNode.mountLightPushClient()

Expand Down
Loading
Loading