diff --git a/tests/node/test_wakunode_peer_manager.nim b/tests/node/test_wakunode_peer_manager.nim index 4e10556f73..104baa6ef4 100644 --- a/tests/node/test_wakunode_peer_manager.nim +++ b/tests/node/test_wakunode_peer_manager.nim @@ -8,6 +8,7 @@ import chronos, # chronos/timer, chronicles, + times, libp2p/[peerstore, crypto/crypto, multiaddress] from times import getTime, toUnix @@ -62,9 +63,9 @@ suite "Peer Manager": serverKey = generateSecp256k1Key() clientKey = generateSecp256k1Key() - server = newTestWakuNode(serverKey, listenIp, listenPort) + server = newTestWakuNode(serverKey, listenIp, Port(3000)) serverPeerStore = server.peerManager.peerStore - client = newTestWakuNode(clientKey, listenIp, listenPort) + client = newTestWakuNode(clientKey, listenIp, Port(3001)) clientPeerStore = client.peerManager.peerStore await allFutures(server.start(), client.start()) @@ -577,77 +578,49 @@ suite "Peer Manager": Connectedness.CannotConnect suite "Automatic Reconnection": - xasyncTest "Automatic Reconnection Implementation": + asyncTest "Automatic Reconnection Implementation": # Given two correctly initialised nodes, that are available for reconnection await server.mountRelay() await client.mountRelay() await client.connectToNodes(@[serverRemotePeerInfo]) - await server.switch.stop() - await client.switch.stop() - check: - clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect - serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect + + waitActive: + clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected and + serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected + + await client.disconnectNode(serverRemotePeerInfo) + + waitActive: + clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect and + serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect # When triggering the reconnection await client.peerManager.reconnectPeers(WakuRelayCodec) # Then both peers should be marked as Connected - check: - clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected - serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected + waitActive: + clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected and + serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected - xasyncTest "Automatic Reconnection Implementation (With Backoff)": - # Given two correctly initialised nodes, that are available for reconnection - await server.mountRelay() - await client.mountRelay() - await client.connectToNodes(@[serverRemotePeerInfo]) - waitFor allFutures(server.switch.stop(), client.switch.stop()) - waitFor allFutures(server.switch.start(), client.switch.start()) - check: - clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect - serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect + ## Now let's do the same but with backoff period + await client.disconnectNode(serverRemotePeerInfo) - # When triggering a reconnection with a backoff period - let - backoffPeriod = 10.seconds - halfBackoffPeriod = 5.seconds + waitActive: + clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect and + serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect + # When triggering a reconnection with a backoff period + let backoffPeriod = chronos.seconds(1) + let beforeReconnect = getTime().toUnixFloat() await client.peerManager.reconnectPeers(WakuRelayCodec, backoffPeriod) - await sleepAsync(halfBackoffPeriod) - - # If the backoff period is not over, then the peers should still be marked as CanConnect - check: - clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect - serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect - - # When waiting for the backoff period to be over - await sleepAsync(halfBackoffPeriod) - - # Then both peers should be marked as Connected - check: - clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected - serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected - - xasyncTest "Automatic Reconnection Implementation (After client restart)": - # Given two correctly initialised nodes, that are available for reconnection - await server.mountRelay() - await client.mountRelay() - await client.connectToNodes(@[serverRemotePeerInfo]) - await server.switch.stop() - await client.switch.stop() - check: - clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect - serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect - - # When triggering the reconnection, and some time for the reconnection to happen - waitFor allFutures(client.stop(), server.stop()) - await allFutures(server.start(), client.start()) - await sleepAsync(FUTURE_TIMEOUT_LONG) + let reconnectDurationWithBackoffPeriod = + getTime().toUnixFloat() - beforeReconnect # Then both peers should be marked as Connected check: clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected + reconnectDurationWithBackoffPeriod > backoffPeriod.seconds.float suite "Handling Connections on Different Networks": # TODO: Implement after discv5 and peer manager's interaction is understood diff --git a/tests/testlib/testutils.nim b/tests/testlib/testutils.nim index febebe36ba..b436c6ac4a 100644 --- a/tests/testlib/testutils.nim +++ b/tests/testlib/testutils.nim @@ -1,4 +1,4 @@ -import testutils/unittests +import testutils/unittests, chronos template xsuite*(name: string, body: untyped) = discard @@ -27,3 +27,11 @@ template xasyncTest*(name: string, body: untyped) = template asyncTestx*(name: string, body: untyped) = test name: skip() + +template waitActive*(condition: bool) = + for i in 0 ..< 200: + if condition: + break + await sleepAsync(10) + + assert condition diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 12c6a084f7..87506e7a4c 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -226,6 +226,10 @@ proc connectRelay*( return false +proc disconnectNode*(pm: PeerManager, peer: RemotePeerInfo) {.async.} = + let peerId = peer.peerId + await pm.switch.disconnect(peerId) + # Dialing should be used for just protocols that require a stream to write and read # This shall not be used to dial Relay protocols, since that would create # unneccesary unused streams. @@ -560,46 +564,6 @@ proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: str pm.addPeer(remotePeerInfo) -proc reconnectPeers*( - pm: PeerManager, proto: string, backoff: chronos.Duration = chronos.seconds(0) -) {.async.} = - ## Reconnect to peers registered for this protocol. This will update connectedness. - ## Especially useful to resume connections from persistent storage after a restart. - - trace "Reconnecting peers", proto = proto - - # Proto is not persisted, we need to iterate over all peers. - for peerInfo in pm.peerStore.peers(protocolMatcher(proto)): - # Check that the peer can be connected - if peerInfo.connectedness == CannotConnect: - error "Not reconnecting to unreachable or non-existing peer", - peerId = peerInfo.peerId - continue - - # Respect optional backoff period where applicable. - let - # TODO: Add method to peerStore (eg isBackoffExpired()) - disconnectTime = Moment.init(peerInfo.disconnectTime, Second) # Convert - currentTime = Moment.init(getTime().toUnix, Second) - # Current time comparable to persisted value - backoffTime = disconnectTime + backoff - currentTime - # Consider time elapsed since last disconnect - - trace "Respecting backoff", - backoff = backoff, - disconnectTime = disconnectTime, - currentTime = currentTime, - backoffTime = backoffTime - - # TODO: This blocks the whole function. Try to connect to another peer in the meantime. - if backoffTime > ZeroDuration: - trace "Backing off before reconnect...", - peerId = peerInfo.peerId, backoffTime = backoffTime - # We disconnected recently and still need to wait for a backoff period before connecting - await sleepAsync(backoffTime) - - discard await pm.connectRelay(peerInfo) - #################### # Dialer interface # #################### @@ -647,7 +611,7 @@ proc connectToNodes*( if nodes.len == 0: return - info "Dialing multiple peers", numOfPeers = nodes.len + info "Dialing multiple peers", numOfPeers = nodes.len, nodes = $nodes var futConns: seq[Future[bool]] var connectedPeers: seq[RemotePeerInfo] @@ -685,6 +649,30 @@ proc connectToNodes*( # later. await sleepAsync(chronos.seconds(5)) +proc reconnectPeers*( + pm: PeerManager, proto: string, backoffTime: chronos.Duration = chronos.seconds(0) +) {.async.} = + ## Reconnect to peers registered for this protocol. This will update connectedness. + ## Especially useful to resume connections from persistent storage after a restart. + + debug "Reconnecting peers", proto = proto + + # Proto is not persisted, we need to iterate over all peers. + for peerInfo in pm.peerStore.peers(protocolMatcher(proto)): + # Check that the peer can be connected + if peerInfo.connectedness == CannotConnect: + error "Not reconnecting to unreachable or non-existing peer", + peerId = peerInfo.peerId + continue + + if backoffTime > ZeroDuration: + debug "Backing off before reconnect", + peerId = peerInfo.peerId, backoffTime = backoffTime + # We disconnected recently and still need to wait for a backoff period before connecting + await sleepAsync(backoffTime) + + await pm.connectToNodes(@[peerInfo]) + proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) = ## Returns the peerIds of physical connections (in and out) ## containing at least one stream with the given protocol. diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index e137f3ed07..ca10ca799b 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -198,6 +198,9 @@ proc connectToNodes*( # NOTE Connects to the node without a give protocol, which automatically creates streams for relay await peer_manager.connectToNodes(node.peerManager, nodes, source = source) +proc disconnectNode*(node: WakuNode, remotePeer: RemotePeerInfo) {.async.} = + await peer_manager.disconnectNode(node.peerManager, remotePeer) + ## Waku Sync proc mountWakuSync*(