Skip to content

Commit

Permalink
add specific ttl and peer selection
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed Aug 29, 2024
1 parent be0e331 commit e227d05
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 111 deletions.
1 change: 1 addition & 0 deletions vendor/negentropy
Submodule negentropy added at f15207
1 change: 1 addition & 0 deletions vendor/nph
Submodule nph added at de5cd4
14 changes: 8 additions & 6 deletions waku/factory/node_factory.nim
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,14 @@ proc setupProtocols(
protectedTopic = topicKey.topic, publicKey = topicKey.key
node.wakuRelay.addSignedTopicsValidator(subscribedProtectedTopics)

# Only relay nodes can be rendezvous points.
if conf.rendezvous:
try:
await mountRendezvous(node)
except CatchableError:
return
err("failed to mount waku rendezvous protocol: " & getCurrentExceptionMsg())

# Keepalive mounted on all nodes
try:
await mountLibp2pPing(node)
Expand Down Expand Up @@ -346,12 +354,6 @@ proc setupProtocols(
return
err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error)

# Enable Rendezvous Discovery protocol
try:
await mountRendezvous(node, conf.rendezvous)
except CatchableError:
return err("failed to mount waku rendezvous protocol: " & getCurrentExceptionMsg())

return ok()

## Start node
Expand Down
8 changes: 5 additions & 3 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1214,11 +1214,10 @@ proc startKeepalive*(node: WakuNode) =

asyncSpawn node.keepaliveLoop(defaultKeepalive)

proc mountRendezvous*(node: WakuNode, enabled: bool) {.async: (raises: []).} =
proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} =
info "mounting rendezvous discovery protocol"

node.wakuRendezvous =
WakuRendezVous.new(node.switch, node.peerManager, node.enr, enabled)
node.wakuRendezvous = WakuRendezVous.new(node.switch, node.peerManager, node.enr)

if node.started:
try:
Expand All @@ -1231,6 +1230,9 @@ proc mountRendezvous*(node: WakuNode, enabled: bool) {.async: (raises: []).} =
except LPError:
error "failed to mount rendezvous", error = getCurrentExceptionMsg()

# Always start discovering peers at startup
await node.wakuRendezvous.initialRequestAll()

proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool =
let inputStr = $inputMultiAdd
if inputStr.contains("0.0.0.0/tcp/0") or inputStr.contains("127.0.0.1/tcp/0"):
Expand Down
3 changes: 2 additions & 1 deletion waku/waku_rendezvous/common.nim
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import chronos
import ../waku_enr/capabilities

const DiscoverLimit* = 1000
const DefaultRegistrationInterval* = 1.minutes
const DefaultRegistrationTTL* = 60.seconds
const DefaultRegistrationInterval* = 10.seconds

proc computeNamespace*(clusterId: uint16, shard: uint16): string =
var namespace = "rs/"
Expand Down
144 changes: 43 additions & 101 deletions waku/waku_rendezvous/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ import
../common/enr,
../waku_enr/capabilities,
../waku_enr/sharding,
../waku_core/peers as peers,
../waku_core/peers,
../waku_core/topics,
./common

logScope:
topics = "waku rendez vous"

declarePublicCounter peerFound, "number of peers found via rendezvous"
declarePublicCounter peerFoundTotal, "total number of peers found via rendezvous"

type WakuRendezVous* = ref object of RendezVous
peerManager: PeerManager
Expand All @@ -30,65 +31,46 @@ type WakuRendezVous* = ref object of RendezVous

periodicRegistrationFut: Future[void]

enabled: bool

proc advertise(
self: WakuRendezVous, namespace: string, ttl: Duration = MinimumDuration
proc batchAdvertise*(
self: WakuRendezVous,
namespace: string,
ttl: Duration = MinimumDuration,
peers: seq[PeerId],
): Future[Result[void, string]] {.async.} =
## Register with all rendez vous peers under a namespace

let catchable = catch:
await procCall RendezVous(self).advertise(namespace, ttl)
await procCall RendezVous(self).batchAdvertise(namespace, ttl, peers)

if catchable.isErr():
return err(catchable.error.msg)

return ok()

proc request(
self: WakuRendezVous, namespace: string, count: int = DiscoverLimit
proc batchRequest*(
self: WakuRendezVous,
namespace: string,
count: int = DiscoverLimit,
peers: seq[PeerId],
): Future[Result[seq[PeerRecord], string]] {.async.} =
## Request all records from all rendez vous peers with matching a namespace

let catchable = catch:
await RendezVous(self).request(namespace, count)
await RendezVous(self).batchRequest(namespace, count, peers)

if catchable.isErr():
return err(catchable.error.msg)

return ok(catchable.get())

proc requestLocally(self: WakuRendezVous, namespace: string): seq[PeerRecord] =
RendezVous(self).requestLocally(namespace)

proc unsubscribeLocally(self: WakuRendezVous, namespace: string) =
RendezVous(self).unsubscribeLocally(namespace)

proc unsubscribe(
self: WakuRendezVous, namespace: string
): Future[Result[void, string]] {.async.} =
## Unsubscribe from all rendez vous peers including locally

let catchable = catch:
await RendezVous(self).unsubscribe(namespace)

if catchable.isErr():
return err(catchable.error.msg)

return ok()

proc getRelayShards(enr: enr.Record): Option[RelayShards] =
let typedRecord = enr.toTyped().valueOr:
return none(RelayShards)

return typedRecord.relaySharding()

proc new*(
T: type WakuRendezVous,
switch: Switch,
peerManager: PeerManager,
enr: Record,
enabled: bool,
T: type WakuRendezVous, switch: Switch, peerManager: PeerManager, enr: Record
): T =
let relayshard = getRelayShards(enr).valueOr:
warn "Using default cluster id 0"
Expand All @@ -97,10 +79,7 @@ proc new*(
let capabilities = enr.getCapabilities()

let wrv = WakuRendezVous(
peerManager: peerManager,
relayshard: relayshard,
capabilities: capabilities,
enabled: enabled,
peerManager: peerManager, relayshard: relayshard, capabilities: capabilities
)

RendezVous(wrv).setup(switch)
Expand All @@ -112,112 +91,75 @@ proc new*(

return wrv

proc advertisementNamespaces(self: WakuRendezVous): seq[string] =
let namespaces = collect(newSeq):
for shard in self.relayShard.shardIds:
for cap in self.capabilities:
computeNamespace(self.relayShard.clusterId, shard, cap)

return namespaces

proc requestNamespaces(self: WakuRendezVous): seq[string] =
let namespaces = collect(newSeq):
for shard in self.relayShard.shardIds:
for cap in Capabilities:
computeNamespace(self.relayShard.clusterId, shard, cap)

return namespaces
proc advertiseAll(self: WakuRendezVous) {.async.} =
let pubsubTopics = self.relayShard.topics()

proc shardOnlyNamespaces(self: WakuRendezVous): seq[string] =
let namespaces = collect(newSeq):
for shard in self.relayShard.shardIds:
computeNamespace(self.relayShard.clusterId, shard)

return namespaces
let futs = collect(newSeq):
for pubsubTopic in pubsubTopics:
let namespace = computeNamespace(pubsubTopic.clusterId, pubsubTopic.shardId)

proc advertiseAll*(self: WakuRendezVous) {.async.} =
let namespaces = self.shardOnlyNamespaces()
# Get a random RDV peer for that shard
let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr:
continue

let futs = collect(newSeq):
for namespace in namespaces:
self.advertise(namespace, 1.minutes)
# Advertise yourself on that peer
self.batchAdvertise(namespace, DefaultRegistrationTTL, @[rpi.peerId])

let handles = await allFinished(futs)

for fut in handles:
let res = fut.read

if res.isErr():
warn "failed to advertise", error = res.error
warn "rendezvous advertise failed", error = res.error

debug "waku rendezvous advertisements finished", adverCount = handles.len

proc requestAll*(self: WakuRendezVous) {.async.} =
let namespaces = self.shardOnlyNamespaces()
proc initialRequestAll*(self: WakuRendezVous) {.async.} =
let pubsubTopics = self.relayShard.topics()

let futs = collect(newSeq):
for namespace in namespaces:
self.request(namespace)
for pubsubTopic in pubsubTopics:
let namespace = computeNamespace(pubsubTopic.clusterId, pubsubTopic.shardId)

# Get a random RDV peer for that shard
let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr:
continue

# Ask for 12 peer records for that shard
self.batchRequest(namespace, 12, @[rpi.peerId])

let handles = await allFinished(futs)

for fut in handles:
let res = fut.read

if res.isErr():
warn "failed to request", error = res.error
warn "rendezvous request failed", error = res.error
else:
for peer in res.get():
peerFound.inc()
peerFoundTotal.inc()
self.peerManager.addPeer(peer)

debug "waku rendezvous requests finished", requestCount = handles.len

proc unsubcribeAll*(self: WakuRendezVous) {.async.} =
let namespaces = self.shardOnlyNamespaces()

let futs = collect(newSeq):
for namespace in namespaces:
self.unsubscribe(namespace)

let handles = await allFinished(futs)

for fut in handles:
let res = fut.read

if res.isErr():
warn "failed to unsubcribe", error = res.error

debug "waku rendezvous unsubscriptions finished", unsubCount = handles.len

return

proc periodicRegistration(self: WakuRendezVous) {.async.} =
debug "waku rendezvous periodic registration started",
interval = DefaultRegistrationInterval

# infinite loop
while true:
# default ttl of registration is the same as default interval
await self.advertiseAll()

await sleepAsync(DefaultRegistrationInterval)

proc start*(self: WakuRendezVous) {.async.} =
await self.requestAll()

if not self.enabled:
return
await self.advertiseAll()

proc start*(self: WakuRendezVous) {.async.} =
debug "starting waku rendezvous discovery"

# start registering forever
self.periodicRegistrationFut = self.periodicRegistration()

proc stopWait*(self: WakuRendezVous) {.async.} =
if not self.enabled:
return

debug "stopping waku rendezvous discovery"

if not self.periodicRegistrationFut.isNil():
Expand Down

0 comments on commit e227d05

Please sign in to comment.