Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: drop old msgs to be relayed
Browse files Browse the repository at this point in the history
diegomrsantos committed Jan 31, 2024

Verified

This commit was signed with the committer’s verified signature.
1 parent 9fe2ec6 commit 61a1300
Showing 1 changed file with 15 additions and 6 deletions.
21 changes: 15 additions & 6 deletions libp2p/protocols/pubsub/pubsubpeer.nim
Original file line number Diff line number Diff line change
@@ -53,10 +53,16 @@ type
DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init
OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].}

Ttlmessage* = object
msg*: seq[byte]
ttl*: Moment

RpcMessageQueue* = ref object
sendPriorityQueue: Deque[Future[void]]
nonPriorityQueue: AsyncQueue[seq[byte]]
nonPriorityQueue: AsyncQueue[Ttlmessage]
sendNonPriorityTask: Future[void]
# The max duration a message to be relayed can wait to be sent before it is dropped. The default is 500ms.
maxDurationInNonPriorityQueue: Duration

PubSubPeer* = ref object of RootObj
getConn*: GetConn # callback to establish a new send connection
@@ -289,7 +295,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId])
else:
await p.rpcmessagequeue.nonPriorityQueue.addLast(msg)
await p.rpcmessagequeue.nonPriorityQueue.addLast(Ttlmessage(msg: msg, ttl: Moment.now()))
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
trace "message queued", p, msg = shortLog(msg)
@@ -373,10 +379,12 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
while p.rpcmessagequeue.sendPriorityQueue.len > 0:
await p.rpcmessagequeue.sendPriorityQueue[0]
p.clearSendPriorityQueue()
let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst()
let ttlMsg = await p.rpcmessagequeue.nonPriorityQueue.popFirst()
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
await p.sendMsg(msg)
if Moment.now() - ttlMsg.ttl >= p.rpcmessagequeue.maxDurationInNonPriorityQueue:
continue
await p.sendMsg(ttlMsg.msg)

proc startSendNonPriorityTask(p: PubSubPeer) =
debug "starting sendNonPriorityTask", p
@@ -394,10 +402,11 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) =
libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)

proc new(T: typedesc[RpcMessageQueue]): T =
proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = 500.milliseconds): T =
return T(
sendPriorityQueue: initDeque[Future[void]](),
nonPriorityQueue: newAsyncQueue[seq[byte]](),
nonPriorityQueue: newAsyncQueue[Ttlmessage](),
maxDurationInNonPriorityQueue: maxDurationInNonPriorityQueue,
)

proc new*(

0 comments on commit 61a1300

Please sign in to comment.