From 125df48bf100255b133c6ec0688e19c09254c5ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lex=20Cabeza=20Romero?= Date: Tue, 19 Mar 2024 17:57:31 +0100 Subject: [PATCH 1/6] Make some useful consts public, add some utils. --- tests/node/peer_manager/peer_store/utils.nim | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/tests/node/peer_manager/peer_store/utils.nim b/tests/node/peer_manager/peer_store/utils.nim index 1d5dc6e227..82a7daa0c3 100644 --- a/tests/node/peer_manager/peer_store/utils.nim +++ b/tests/node/peer_manager/peer_store/utils.nim @@ -1,12 +1,9 @@ -import std/options, stew/results, libp2p/peerstore +import stew/results import - waku/node/peer_manager/[waku_peer_store, peer_store/waku_peer_storage], + ../../../../waku/node/peer_manager/peer_store/waku_peer_storage, ../../../waku_archive/archive_utils -proc newTestWakuPeerStorage*(path: Option[string] = string.none()): WakuPeerStorage = - let db = newSqliteDatabase(path) +proc newTestWakuPeerStorage*(): WakuPeerStorage = + let db = newSqliteDatabase() WakuPeerStorage.new(db).value() - -proc peerExists*(peerStore: PeerStore, peerId: PeerId): bool = - return peerStore[AddressBook].contains(peerId) From 032bfcf2418a1d79d8fcb1ee87f08bf6a4b94d6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lex=20Cabeza=20Romero?= Date: Wed, 3 Apr 2024 17:25:00 +0200 Subject: [PATCH 2/6] Implement various utilities. --- tests/node/peer_manager/peer_store/utils.nim | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/node/peer_manager/peer_store/utils.nim b/tests/node/peer_manager/peer_store/utils.nim index 82a7daa0c3..8c631b5080 100644 --- a/tests/node/peer_manager/peer_store/utils.nim +++ b/tests/node/peer_manager/peer_store/utils.nim @@ -1,9 +1,12 @@ -import stew/results +import std/options, stew/results, libp2p/peerstore import - ../../../../waku/node/peer_manager/peer_store/waku_peer_storage, + ../../../../waku/node/peer_manager/[waku_peer_store, peer_store/waku_peer_storage], ../../../waku_archive/archive_utils -proc newTestWakuPeerStorage*(): WakuPeerStorage = - let db = newSqliteDatabase() +proc newTestWakuPeerStorage*(path: Option[string] = string.none()): WakuPeerStorage = + let db = newSqliteDatabase(path) WakuPeerStorage.new(db).value() + +proc peerExists*(peerStore: PeerStore, peerId: PeerId): bool = + return peerStore[AddressBook].contains(peerId) From cf198b189d3e6e1dc1f8c078d15a447696e23c3d Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Mon, 23 Sep 2024 14:17:26 +0200 Subject: [PATCH 3/6] peer_manager reconnectPeers enhancements --- tests/node/test_wakunode_peer_manager.nim | 86 ++++++++--------------- tests/testlib/testutils.nim | 10 ++- waku/node/peer_manager/peer_manager.nim | 70 ++++++++---------- waku/node/waku_node.nim | 3 + 4 files changed, 72 insertions(+), 97 deletions(-) diff --git a/tests/node/test_wakunode_peer_manager.nim b/tests/node/test_wakunode_peer_manager.nim index 4e10556f73..d7551286a6 100644 --- a/tests/node/test_wakunode_peer_manager.nim +++ b/tests/node/test_wakunode_peer_manager.nim @@ -8,6 +8,7 @@ import chronos, # chronos/timer, chronicles, + times, libp2p/[peerstore, crypto/crypto, multiaddress] from times import getTime, toUnix @@ -62,9 +63,9 @@ suite "Peer Manager": serverKey = generateSecp256k1Key() clientKey = generateSecp256k1Key() - server = newTestWakuNode(serverKey, listenIp, listenPort) + server = newTestWakuNode(serverKey, listenIp, Port(3000)) serverPeerStore = server.peerManager.peerStore - client = newTestWakuNode(clientKey, listenIp, listenPort) + client = newTestWakuNode(clientKey, listenIp, Port(3001)) clientPeerStore = client.peerManager.peerStore await allFutures(server.start(), client.start()) @@ -577,77 +578,52 @@ suite "Peer Manager": Connectedness.CannotConnect suite "Automatic Reconnection": - xasyncTest "Automatic Reconnection Implementation": + asyncTest "Automatic Reconnection Implementation": # Given two correctly initialised nodes, that are available for reconnection await server.mountRelay() await client.mountRelay() await client.connectToNodes(@[serverRemotePeerInfo]) - await server.switch.stop() - await client.switch.stop() - check: - clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect - serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect + + waitActive: + clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected and + serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected + + await client.disconnectNode(serverRemotePeerInfo) + + waitActive: + clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect and + serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect # When triggering the reconnection + var beforeReconnect = getTime().toUnixFloat() await client.peerManager.reconnectPeers(WakuRelayCodec) + let reconnectDuration = getTime().toUnixFloat() - beforeReconnect # Then both peers should be marked as Connected - check: - clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected - serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected + waitActive: + clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected and + serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected - xasyncTest "Automatic Reconnection Implementation (With Backoff)": - # Given two correctly initialised nodes, that are available for reconnection - await server.mountRelay() - await client.mountRelay() - await client.connectToNodes(@[serverRemotePeerInfo]) - waitFor allFutures(server.switch.stop(), client.switch.stop()) - waitFor allFutures(server.switch.start(), client.switch.start()) - check: - clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect - serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect + ## Now let's do the same but with backoff period + await client.disconnectNode(serverRemotePeerInfo) - # When triggering a reconnection with a backoff period - let - backoffPeriod = 10.seconds - halfBackoffPeriod = 5.seconds + waitActive: + clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect and + serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect + # When triggering a reconnection with a backoff period + let backoffPeriod = chronos.seconds(1) + beforeReconnect = getTime().toUnixFloat() await client.peerManager.reconnectPeers(WakuRelayCodec, backoffPeriod) - await sleepAsync(halfBackoffPeriod) - - # If the backoff period is not over, then the peers should still be marked as CanConnect - check: - clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect - serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect - - # When waiting for the backoff period to be over - await sleepAsync(halfBackoffPeriod) - - # Then both peers should be marked as Connected - check: - clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected - serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected - - xasyncTest "Automatic Reconnection Implementation (After client restart)": - # Given two correctly initialised nodes, that are available for reconnection - await server.mountRelay() - await client.mountRelay() - await client.connectToNodes(@[serverRemotePeerInfo]) - await server.switch.stop() - await client.switch.stop() - check: - clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect - serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect - - # When triggering the reconnection, and some time for the reconnection to happen - waitFor allFutures(client.stop(), server.stop()) - await allFutures(server.start(), client.start()) - await sleepAsync(FUTURE_TIMEOUT_LONG) + let reconnectDurationWithBackoffPeriod = + getTime().toUnixFloat() - beforeReconnect # Then both peers should be marked as Connected check: clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected + reconnectDurationWithBackoffPeriod > + (reconnectDuration + backoffPeriod.seconds.float) suite "Handling Connections on Different Networks": # TODO: Implement after discv5 and peer manager's interaction is understood diff --git a/tests/testlib/testutils.nim b/tests/testlib/testutils.nim index febebe36ba..b436c6ac4a 100644 --- a/tests/testlib/testutils.nim +++ b/tests/testlib/testutils.nim @@ -1,4 +1,4 @@ -import testutils/unittests +import testutils/unittests, chronos template xsuite*(name: string, body: untyped) = discard @@ -27,3 +27,11 @@ template xasyncTest*(name: string, body: untyped) = template asyncTestx*(name: string, body: untyped) = test name: skip() + +template waitActive*(condition: bool) = + for i in 0 ..< 200: + if condition: + break + await sleepAsync(10) + + assert condition diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 12c6a084f7..87506e7a4c 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -226,6 +226,10 @@ proc connectRelay*( return false +proc disconnectNode*(pm: PeerManager, peer: RemotePeerInfo) {.async.} = + let peerId = peer.peerId + await pm.switch.disconnect(peerId) + # Dialing should be used for just protocols that require a stream to write and read # This shall not be used to dial Relay protocols, since that would create # unneccesary unused streams. @@ -560,46 +564,6 @@ proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: str pm.addPeer(remotePeerInfo) -proc reconnectPeers*( - pm: PeerManager, proto: string, backoff: chronos.Duration = chronos.seconds(0) -) {.async.} = - ## Reconnect to peers registered for this protocol. This will update connectedness. - ## Especially useful to resume connections from persistent storage after a restart. - - trace "Reconnecting peers", proto = proto - - # Proto is not persisted, we need to iterate over all peers. - for peerInfo in pm.peerStore.peers(protocolMatcher(proto)): - # Check that the peer can be connected - if peerInfo.connectedness == CannotConnect: - error "Not reconnecting to unreachable or non-existing peer", - peerId = peerInfo.peerId - continue - - # Respect optional backoff period where applicable. - let - # TODO: Add method to peerStore (eg isBackoffExpired()) - disconnectTime = Moment.init(peerInfo.disconnectTime, Second) # Convert - currentTime = Moment.init(getTime().toUnix, Second) - # Current time comparable to persisted value - backoffTime = disconnectTime + backoff - currentTime - # Consider time elapsed since last disconnect - - trace "Respecting backoff", - backoff = backoff, - disconnectTime = disconnectTime, - currentTime = currentTime, - backoffTime = backoffTime - - # TODO: This blocks the whole function. Try to connect to another peer in the meantime. - if backoffTime > ZeroDuration: - trace "Backing off before reconnect...", - peerId = peerInfo.peerId, backoffTime = backoffTime - # We disconnected recently and still need to wait for a backoff period before connecting - await sleepAsync(backoffTime) - - discard await pm.connectRelay(peerInfo) - #################### # Dialer interface # #################### @@ -647,7 +611,7 @@ proc connectToNodes*( if nodes.len == 0: return - info "Dialing multiple peers", numOfPeers = nodes.len + info "Dialing multiple peers", numOfPeers = nodes.len, nodes = $nodes var futConns: seq[Future[bool]] var connectedPeers: seq[RemotePeerInfo] @@ -685,6 +649,30 @@ proc connectToNodes*( # later. await sleepAsync(chronos.seconds(5)) +proc reconnectPeers*( + pm: PeerManager, proto: string, backoffTime: chronos.Duration = chronos.seconds(0) +) {.async.} = + ## Reconnect to peers registered for this protocol. This will update connectedness. + ## Especially useful to resume connections from persistent storage after a restart. + + debug "Reconnecting peers", proto = proto + + # Proto is not persisted, we need to iterate over all peers. + for peerInfo in pm.peerStore.peers(protocolMatcher(proto)): + # Check that the peer can be connected + if peerInfo.connectedness == CannotConnect: + error "Not reconnecting to unreachable or non-existing peer", + peerId = peerInfo.peerId + continue + + if backoffTime > ZeroDuration: + debug "Backing off before reconnect", + peerId = peerInfo.peerId, backoffTime = backoffTime + # We disconnected recently and still need to wait for a backoff period before connecting + await sleepAsync(backoffTime) + + await pm.connectToNodes(@[peerInfo]) + proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) = ## Returns the peerIds of physical connections (in and out) ## containing at least one stream with the given protocol. diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index e137f3ed07..ca10ca799b 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -198,6 +198,9 @@ proc connectToNodes*( # NOTE Connects to the node without a give protocol, which automatically creates streams for relay await peer_manager.connectToNodes(node.peerManager, nodes, source = source) +proc disconnectNode*(node: WakuNode, remotePeer: RemotePeerInfo) {.async.} = + await peer_manager.disconnectNode(node.peerManager, remotePeer) + ## Waku Sync proc mountWakuSync*( From 0613f000711d41029af33a64034b20cb9e08f614 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Mon, 23 Sep 2024 14:23:37 +0200 Subject: [PATCH 4/6] revert unnecesary change --- tests/node/peer_manager/peer_store/utils.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/node/peer_manager/peer_store/utils.nim b/tests/node/peer_manager/peer_store/utils.nim index 8c631b5080..1d5dc6e227 100644 --- a/tests/node/peer_manager/peer_store/utils.nim +++ b/tests/node/peer_manager/peer_store/utils.nim @@ -1,7 +1,7 @@ import std/options, stew/results, libp2p/peerstore import - ../../../../waku/node/peer_manager/[waku_peer_store, peer_store/waku_peer_storage], + waku/node/peer_manager/[waku_peer_store, peer_store/waku_peer_storage], ../../../waku_archive/archive_utils proc newTestWakuPeerStorage*(path: Option[string] = string.none()): WakuPeerStorage = From 772131ebafe2be1a7465f90087a960c13bf3f0f9 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Tue, 24 Sep 2024 09:36:40 +0200 Subject: [PATCH 5/6] simplify test_wakunode_peer_manager --- tests/node/test_wakunode_peer_manager.nim | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/node/test_wakunode_peer_manager.nim b/tests/node/test_wakunode_peer_manager.nim index d7551286a6..bbfeee4ef1 100644 --- a/tests/node/test_wakunode_peer_manager.nim +++ b/tests/node/test_wakunode_peer_manager.nim @@ -595,9 +595,7 @@ suite "Peer Manager": serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect # When triggering the reconnection - var beforeReconnect = getTime().toUnixFloat() await client.peerManager.reconnectPeers(WakuRelayCodec) - let reconnectDuration = getTime().toUnixFloat() - beforeReconnect # Then both peers should be marked as Connected waitActive: @@ -622,8 +620,7 @@ suite "Peer Manager": check: clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected - reconnectDurationWithBackoffPeriod > - (reconnectDuration + backoffPeriod.seconds.float) + reconnectDurationWithBackoffPeriod > backoffPeriod.seconds.float suite "Handling Connections on Different Networks": # TODO: Implement after discv5 and peer manager's interaction is understood From fe29066147f738f7a7c295090be546a73a3d3074 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Tue, 24 Sep 2024 17:36:47 +0200 Subject: [PATCH 6/6] test_wakunode_peer_manager: I missed a let keyword --- tests/node/test_wakunode_peer_manager.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/node/test_wakunode_peer_manager.nim b/tests/node/test_wakunode_peer_manager.nim index bbfeee4ef1..104baa6ef4 100644 --- a/tests/node/test_wakunode_peer_manager.nim +++ b/tests/node/test_wakunode_peer_manager.nim @@ -611,7 +611,7 @@ suite "Peer Manager": # When triggering a reconnection with a backoff period let backoffPeriod = chronos.seconds(1) - beforeReconnect = getTime().toUnixFloat() + let beforeReconnect = getTime().toUnixFloat() await client.peerManager.reconnectPeers(WakuRelayCodec, backoffPeriod) let reconnectDurationWithBackoffPeriod = getTime().toUnixFloat() - beforeReconnect