Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: test peer connection management #3049

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 31 additions & 55 deletions tests/node/test_wakunode_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import
chronos,
# chronos/timer,
chronicles,
times,
libp2p/[peerstore, crypto/crypto, multiaddress]

from times import getTime, toUnix
Expand Down Expand Up @@ -62,9 +63,9 @@ suite "Peer Manager":
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()

server = newTestWakuNode(serverKey, listenIp, listenPort)
server = newTestWakuNode(serverKey, listenIp, Port(3000))
serverPeerStore = server.peerManager.peerStore
client = newTestWakuNode(clientKey, listenIp, listenPort)
client = newTestWakuNode(clientKey, listenIp, Port(3001))
clientPeerStore = client.peerManager.peerStore

await allFutures(server.start(), client.start())
Expand Down Expand Up @@ -577,77 +578,52 @@ suite "Peer Manager":
Connectedness.CannotConnect

suite "Automatic Reconnection":
xasyncTest "Automatic Reconnection Implementation":
asyncTest "Automatic Reconnection Implementation":
# Given two correctly initialised nodes, that are available for reconnection
await server.mountRelay()
await client.mountRelay()
await client.connectToNodes(@[serverRemotePeerInfo])
await server.switch.stop()
await client.switch.stop()
check:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect
serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect

waitActive:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected and
serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected

await client.disconnectNode(serverRemotePeerInfo)

waitActive:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect and
serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect

# When triggering the reconnection
var beforeReconnect = getTime().toUnixFloat()
await client.peerManager.reconnectPeers(WakuRelayCodec)
let reconnectDuration = getTime().toUnixFloat() - beforeReconnect

# Then both peers should be marked as Connected
check:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected
serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected
waitActive:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected and
serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected

xasyncTest "Automatic Reconnection Implementation (With Backoff)":
# Given two correctly initialised nodes, that are available for reconnection
await server.mountRelay()
await client.mountRelay()
await client.connectToNodes(@[serverRemotePeerInfo])
waitFor allFutures(server.switch.stop(), client.switch.stop())
waitFor allFutures(server.switch.start(), client.switch.start())
check:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect
serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect
## Now let's do the same but with backoff period
await client.disconnectNode(serverRemotePeerInfo)

# When triggering a reconnection with a backoff period
let
backoffPeriod = 10.seconds
halfBackoffPeriod = 5.seconds
waitActive:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect and
serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect

# When triggering a reconnection with a backoff period
let backoffPeriod = chronos.seconds(1)
beforeReconnect = getTime().toUnixFloat()
await client.peerManager.reconnectPeers(WakuRelayCodec, backoffPeriod)
await sleepAsync(halfBackoffPeriod)

# If the backoff period is not over, then the peers should still be marked as CanConnect
check:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect
serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect

# When waiting for the backoff period to be over
await sleepAsync(halfBackoffPeriod)

# Then both peers should be marked as Connected
check:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected
serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected

xasyncTest "Automatic Reconnection Implementation (After client restart)":
# Given two correctly initialised nodes, that are available for reconnection
await server.mountRelay()
await client.mountRelay()
await client.connectToNodes(@[serverRemotePeerInfo])
await server.switch.stop()
await client.switch.stop()
check:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect
serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect

# When triggering the reconnection, and some time for the reconnection to happen
waitFor allFutures(client.stop(), server.stop())
await allFutures(server.start(), client.start())
await sleepAsync(FUTURE_TIMEOUT_LONG)
let reconnectDurationWithBackoffPeriod =
getTime().toUnixFloat() - beforeReconnect

# Then both peers should be marked as Connected
check:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected
serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected
reconnectDurationWithBackoffPeriod >
(reconnectDuration + backoffPeriod.seconds.float)
Comment on lines +625 to +626
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand why this is true. Shouldn't they be theoretically equal, but in practice any of them be longer than the other one?

Or in other words, why reconnectDuration + backoffPeriod.seconds.float can't be greater than reconnectDurationWithBackoffPeriod?

Copy link
Collaborator Author

@Ivansete-status Ivansete-status Sep 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand why this is true. Shouldn't they be theoretically equal, but in practice any of them be longer than the other one?

Or in other words, why reconnectDuration + backoffPeriod.seconds.float can't be greater than reconnectDurationWithBackoffPeriod?

Actually, in both cases we are performing the same operation, but when the assigned backoff time is != 0, we add this extra block:

    if backoffTime > ZeroDuration:
      debug "Backing off before reconnect",
        peerId = peerInfo.peerId, backoffTime = backoffTime
      # We disconnected recently and still need to wait for a backoff period before connecting
      await sleepAsync(backoffTime)

... where we are adding an extra backoffTime plus the time spent in the debug call plus the if statement. Hence, the > condition should happen.


suite "Handling Connections on Different Networks":
# TODO: Implement after discv5 and peer manager's interaction is understood
Expand Down
10 changes: 9 additions & 1 deletion tests/testlib/testutils.nim
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import testutils/unittests
import testutils/unittests, chronos

template xsuite*(name: string, body: untyped) =
discard
Expand Down Expand Up @@ -27,3 +27,11 @@ template xasyncTest*(name: string, body: untyped) =
template asyncTestx*(name: string, body: untyped) =
test name:
skip()

template waitActive*(condition: bool) =
  ## Polls `condition` until it becomes true, yielding to the chronos event
  ## loop between attempts. Because this is a template, `condition` is
  ## substituted as an expression and re-evaluated on every check.
  ## Gives up after 200 attempts of 10 ms each (~2 s) and asserts, failing
  ## the surrounding test.
  for _ in 0 ..< 200:
    if condition:
      break
    # Use an explicit Duration: the bare-int sleepAsync(ms) overload is
    # deprecated in chronos.
    await sleepAsync(10.milliseconds)

  assert condition
70 changes: 29 additions & 41 deletions waku/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ proc connectRelay*(

return false

proc disconnectNode*(pm: PeerManager, peer: RemotePeerInfo) {.async.} =
  ## Tears down the switch-level connection to `peer`, identified by its
  ## peer id. Thin wrapper around `Switch.disconnect`.
  await pm.switch.disconnect(peer.peerId)

# Dialing should be used for just protocols that require a stream to write and read
# This shall not be used to dial Relay protocols, since that would create
# unnecessary unused streams.
Expand Down Expand Up @@ -560,46 +564,6 @@ proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: str

pm.addPeer(remotePeerInfo)

proc reconnectPeers*(
pm: PeerManager, proto: string, backoff: chronos.Duration = chronos.seconds(0)
) {.async.} =
## Reconnect to peers registered for this protocol. This will update connectedness.
## Especially useful to resume connections from persistent storage after a restart.

trace "Reconnecting peers", proto = proto

# Proto is not persisted, we need to iterate over all peers.
for peerInfo in pm.peerStore.peers(protocolMatcher(proto)):
# Check that the peer can be connected
if peerInfo.connectedness == CannotConnect:
error "Not reconnecting to unreachable or non-existing peer",
peerId = peerInfo.peerId
continue

# Respect optional backoff period where applicable.
let
# TODO: Add method to peerStore (eg isBackoffExpired())
disconnectTime = Moment.init(peerInfo.disconnectTime, Second) # Convert
currentTime = Moment.init(getTime().toUnix, Second)
# Current time comparable to persisted value
backoffTime = disconnectTime + backoff - currentTime
# Consider time elapsed since last disconnect

trace "Respecting backoff",
backoff = backoff,
disconnectTime = disconnectTime,
currentTime = currentTime,
backoffTime = backoffTime

# TODO: This blocks the whole function. Try to connect to another peer in the meantime.
if backoffTime > ZeroDuration:
trace "Backing off before reconnect...",
peerId = peerInfo.peerId, backoffTime = backoffTime
# We disconnected recently and still need to wait for a backoff period before connecting
await sleepAsync(backoffTime)

discard await pm.connectRelay(peerInfo)

####################
# Dialer interface #
####################
Expand Down Expand Up @@ -647,7 +611,7 @@ proc connectToNodes*(
if nodes.len == 0:
return

info "Dialing multiple peers", numOfPeers = nodes.len
info "Dialing multiple peers", numOfPeers = nodes.len, nodes = $nodes

var futConns: seq[Future[bool]]
var connectedPeers: seq[RemotePeerInfo]
Expand Down Expand Up @@ -685,6 +649,30 @@ proc connectToNodes*(
# later.
await sleepAsync(chronos.seconds(5))

proc reconnectPeers*(
    pm: PeerManager, proto: string, backoffTime: chronos.Duration = chronos.seconds(0)
) {.async.} =
  ## Reconnect to peers registered for this protocol. This will update connectedness.
  ## Especially useful to resume connections from persistent storage after a restart.
  ##
  ## `backoffTime` is an optional delay honoured once before any reconnection
  ## attempt is made (default: no delay).

  debug "Reconnecting peers", proto = proto

  # The backoff is constant for the whole call, so wait once up front rather
  # than once per peer — sleeping inside the loop would multiply the total
  # delay by the number of eligible peers.
  if backoffTime > ZeroDuration:
    debug "Backing off before reconnect", backoffTime = backoffTime
    # We disconnected recently and still need to wait for a backoff period
    # before connecting again.
    await sleepAsync(backoffTime)

  # Proto is not persisted, we need to iterate over all peers.
  for peerInfo in pm.peerStore.peers(protocolMatcher(proto)):
    # Skip peers already known to be unreachable or non-existent.
    if peerInfo.connectedness == CannotConnect:
      error "Not reconnecting to unreachable or non-existing peer",
        peerId = peerInfo.peerId
      continue

    await pm.connectToNodes(@[peerInfo])

proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) =
## Returns the peerIds of physical connections (in and out)
## containing at least one stream with the given protocol.
Expand Down
3 changes: 3 additions & 0 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ proc connectToNodes*(
# NOTE Connects to the node without a give protocol, which automatically creates streams for relay
await peer_manager.connectToNodes(node.peerManager, nodes, source = source)

proc disconnectNode*(node: WakuNode, remotePeer: RemotePeerInfo) {.async.} =
  ## Disconnects this node from `remotePeer` by delegating to the node's
  ## peer manager.
  await peer_manager.disconnectNode(node.peerManager, remotePeer)

## Waku Sync

proc mountWakuSync*(
Expand Down
Loading