Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: peermanager for stale peers management. #3051

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 25 additions & 23 deletions waku/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ const
# TODO: Make configurable
DefaultDialTimeout* = chronos.seconds(10)

# Max attempts before removing the peer
MaxFailedAttempts = 5

# Time to wait before attempting to dial again is calculated as:
# initialBackoffInSec*(backoffFactor^(failedAttempts-1))
# 120s, 480s, 1920, 7680s
Expand All @@ -71,13 +68,14 @@ const
# Max peers that we allow from the same IP
DefaultColocationLimit* = 5

Threshold = chronos.hours(2)

type PeerManager* = ref object of RootObj
switch*: Switch
peerStore*: PeerStore
wakuMetadata*: WakuMetadata
initialBackoffInSec*: int
backoffFactor*: int
maxFailedAttempts*: int
storage*: PeerStorage
serviceSlots*: Table[string, RemotePeerInfo]
maxRelayPeers*: int
Expand Down Expand Up @@ -184,9 +182,8 @@ proc connectRelay*(
if not pm.peerStore.hasPeer(peerId, WakuRelayCodec):
pm.addPeer(peer)

let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]
trace "Connecting to relay peer",
wireAddr = peer.addrs, peerId = peerId, failedAttempts = failedAttempts
wireAddr = peer.addrs, peerId = peerId

var deadline = sleepAsync(dialTimeout)
let workfut = pm.switch.connect(peerId, peer.addrs)
Expand All @@ -208,20 +205,21 @@ proc connectRelay*(
waku_peers_dials.inc(labelValues = ["successful"])
waku_node_conns_initiated.inc(labelValues = [source])

pm.peerStore[NumberFailedConnBook][peerId] = 0

if pm.peerStore[FirstFailedConnBook].contains(peerId):
discard pm.peerStore[FirstFailedConnBook].del(peerId)

return true

# Dial failed
pm.peerStore[NumberFailedConnBook][peerId] =
pm.peerStore[NumberFailedConnBook][peerId] + 1
pm.peerStore[LastFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second)
pm.peerStore[ConnectionBook][peerId] = CannotConnect
if not pm.peerStore[FirstFailedConnBook].contains(peerId):
pm.peerStore[FirstFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second)

trace "Connecting relay peer failed",
peerId = peerId,
reason = reasonFailed,
failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]
reason = reasonFailed

waku_peers_dials.inc(labelValues = [reasonFailed])

return false
Expand Down Expand Up @@ -311,22 +309,25 @@ proc canBeConnected*(pm: PeerManager, peerId: PeerId): bool =
# Returns if we can try to connect to this peer, based on past failed attempts
# It uses an exponential backoff. Each connection attempt makes us
# wait more before trying again.
let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]

# if it never errored, we can try to connect
if failedAttempts == 0:
if not pm.peerStore[FirstFailedConnBook].contains(peerId):
return true

# if there are too many failed attempts, do not reconnect
if failedAttempts >= pm.maxFailedAttempts:

# if it's break threshold then do not reconnect
let
disconnectTime = pm.peerStore[FirstFailedConnBook][peerId]
currentTime = Moment.init(getTime().toUnix, Second)

if (currentTime - disconnectTime) > Threshold:
return false

# If it errored we wait an exponential backoff from last connection
# the more failed attempts, the greater the backoff since last attempt
let now = Moment.init(getTime().toUnix, Second)
let lastFailed = pm.peerStore[LastFailedConnBook][peerId]
let backoff =
calculateBackoff(pm.initialBackoffInSec, pm.backoffFactor, failedAttempts)
calculateBackoff(pm.initialBackoffInSec, pm.backoffFactor, 5)

return now >= (lastFailed + backoff)

Expand Down Expand Up @@ -457,7 +458,6 @@ proc new*(
storage: PeerStorage = nil,
initialBackoffInSec = InitialBackoffInSec,
backoffFactor = BackoffFactor,
maxFailedAttempts = MaxFailedAttempts,
colocationLimit = DefaultColocationLimit,
shardedPeerManagement = false,
): PeerManager {.gcsafe.} =
Expand Down Expand Up @@ -489,7 +489,7 @@ proc new*(
maxRelayPeersValue = maxConnections - (maxConnections div 5)

# attempt to calculate max backoff to prevent potential overflows or unreasonably high values
let backoff = calculateBackoff(initialBackoffInSec, backoffFactor, maxFailedAttempts)
let backoff = calculateBackoff(initialBackoffInSec, backoffFactor, 5)
if backoff.weeks() > 1:
error "Max backoff time can't be over 1 week", maxBackoff = backoff
raise newException(Defect, "Max backoff time can't be over 1 week")
Expand All @@ -506,7 +506,6 @@ proc new*(
outRelayPeersTarget: outRelayPeersTarget,
inRelayPeersTarget: maxRelayPeersValue - outRelayPeersTarget,
maxRelayPeers: maxRelayPeersValue,
maxFailedAttempts: maxFailedAttempts,
colocationLimit: colocationLimit,
shardedPeerManagement: shardedPeerManagement,
)
Expand Down Expand Up @@ -849,8 +848,11 @@ proc prunePeerStore*(pm: PeerManager) =
var peersToPrune: HashSet[PeerId]

# prune failed connections
for peerId, count in pm.peerStore[NumberFailedConnBook].book.pairs:
if count < pm.maxFailedAttempts:
for peerId in pm.peerStore[FirstFailedConnBook].book.keys:
let
disconnectTime = pm.peerStore[FirstFailedConnBook][peerId]
currentTime = Moment.init(getTime().toUnix, Second)
if (currentTime - disconnectTime) < Threshold:
continue

if peersToPrune.len >= pruningCount:
Expand Down
6 changes: 3 additions & 3 deletions waku/node/peer_manager/waku_peer_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ type
# Last failed connection attemp timestamp
LastFailedConnBook* = ref object of PeerBook[Moment]

# Failed connection attempts
NumberFailedConnBook* = ref object of PeerBook[int]
# First failed connection attemp timestamp
FirstFailedConnBook* = ref object of PeerBook[Moment]

# Keeps track of when peers were disconnected in Unix timestamps
DisconnectBook* = ref object of PeerBook[int64]
Expand Down Expand Up @@ -67,7 +67,7 @@ proc get*(peerStore: PeerStore, peerId: PeerID): RemotePeerInfo =
origin: peerStore[SourceBook][peerId],
direction: peerStore[DirectionBook][peerId],
lastFailedConn: peerStore[LastFailedConnBook][peerId],
numberFailedConn: peerStore[NumberFailedConnBook][peerId],
firstFailedConn: peerStore[FirstFailedConnBook][peerId],
)

proc getWakuProtos*(peerStore: PeerStore): seq[string] =
Expand Down
5 changes: 3 additions & 2 deletions waku/waku_core/peers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ type RemotePeerInfo* = ref object
disconnectTime*: int64
origin*: PeerOrigin
direction*: PeerDirection
firstFailedConn*: Moment
lastFailedConn*: Moment
numberFailedConn*: int


func `$`*(remotePeerInfo: RemotePeerInfo): string =
$remotePeerInfo.peerId

Expand Down
Loading