feat: drop msgs to be relayed waiting for too long in the queue #1017

Draft: wants to merge 17 commits into master
51 changes: 32 additions & 19 deletions libp2p/protocols/pubsub/gossipsub.nim
@@ -83,7 +83,8 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
enablePX: false,
bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps
overheadRateLimit: Opt.none(tuple[bytes: int, interval: Duration]),
disconnectPeerAboveRateLimit: false
disconnectPeerAboveRateLimit: false,
maxDurationInNonPriorityQueue: Opt.none(Duration),
)

proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
@@ -220,6 +221,8 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) =
for topic, info in stats[].topicInfos.mpairs:
info.firstMessageDeliveries = 0

pubSubPeer.stopSendNonPriorityTask()

procCall FloodSub(g).unsubscribePeer(peer)

proc handleSubscribe*(g: GossipSub,
@@ -279,31 +282,40 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
respControl.prune.add(g.handleGraft(peer, control.graft))
let messages = g.handleIWant(peer, control.iwant)

if
respControl.prune.len > 0 or
respControl.iwant.len > 0 or
messages.len > 0:
# iwant and prunes from here, also messages
let
isPruneNotEmpty = respControl.prune.len > 0
isIWantNotEmpty = respControl.iwant.len > 0

if isPruneNotEmpty or isIWantNotEmpty:

if isIWantNotEmpty:
libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)

if isPruneNotEmpty:
for prune in respControl.prune:
if g.knownTopics.contains(prune.topicId):
libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId])
else:
libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])

trace "sending control message", msg = shortLog(respControl), peer
g.send(
peer,
RPCMsg(control: some(respControl)), true)

if messages.len > 0:
for smsg in messages:
for topic in smsg.topicIds:
if g.knownTopics.contains(topic):
libp2p_pubsub_broadcast_messages.inc(labelValues = [topic])
else:
libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])

libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)

for prune in respControl.prune:
if g.knownTopics.contains(prune.topicId):
libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId])
else:
libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])

trace "sending control message", msg = shortLog(respControl), peer
# iwant replies have lower priority
trace "sending iwant reply messages", peer
g.send(
peer,
RPCMsg(control: some(respControl), messages: messages))
RPCMsg(messages: messages), false)

proc validateAndRelay(g: GossipSub,
msg: Message,
@@ -370,7 +382,7 @@ proc validateAndRelay(g: GossipSub,

# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), false)
trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
for topic in msg.topicIds:
if topic notin g.topics: continue
@@ -441,7 +453,7 @@ method rpcHandler*(g: GossipSub,
peer.recvObservers(rpcMsg)

if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0:
g.send(peer, RPCMsg(pong: rpcMsg.ping))
g.send(peer, RPCMsg(pong: rpcMsg.ping), true)
peer.pingBudget.dec
for i in 0..<min(g.topicsHigh, rpcMsg.subscriptions.len):
template sub: untyped = rpcMsg.subscriptions[i]
@@ -655,7 +667,7 @@ method publish*(g: GossipSub,

g.mcache.put(msgId, msg)

g.broadcast(peers, RPCMsg(messages: @[msg]))
g.broadcast(peers, RPCMsg(messages: @[msg]), true)

if g.knownTopics.contains(topic):
libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic])
@@ -740,4 +752,5 @@ method getOrCreatePeer*(
let peer = procCall PubSub(g).getOrCreatePeer(peerId, protos)
g.parameters.overheadRateLimit.withValue(overheadRateLimit):
peer.overheadRateLimitOpt = Opt.some(TokenBucket.new(overheadRateLimit.bytes, overheadRateLimit.interval))
peer.rpcmessagequeue.maxDurationInNonPriorityQueue = g.parameters.maxDurationInNonPriorityQueue
return peer
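Taken together, the changes in this file establish the calling convention for the new isHighPriority flag (added to send/broadcast in pubsub.nim below). The lines that follow are excerpted from the hunks above and grouped for readability; they are not new code:

g.send(peer, RPCMsg(control: some(respControl)), true)    # control messages: high priority
g.send(peer, RPCMsg(pong: rpcMsg.ping), true)             # pongs: high priority
g.broadcast(peers, RPCMsg(messages: @[msg]), true)         # own published messages: high priority

g.send(peer, RPCMsg(messages: messages), false)            # IWANT replies: low priority
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), false)  # relayed messages: low priority, may be dropped if stale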
4 changes: 4 additions & 0 deletions libp2p/protocols/pubsub/gossipsub/types.nim
@@ -147,6 +147,10 @@ type
overheadRateLimit*: Opt[tuple[bytes: int, interval: Duration]]
disconnectPeerAboveRateLimit*: bool

# The maximum duration a message can stay in the non-priority queue. If it exceeds this duration, it will be discarded
# as soon as it is dequeued, instead of being sent to the remote peer. The default value is none, i.e., no maximum duration.
maxDurationInNonPriorityQueue*: Opt[Duration]

BackoffTable* = Table[string, Table[PeerId, Moment]]
ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]]
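The new parameter is opt-in. Below is a minimal configuration sketch, assuming the results/stew Opt API and chronos' milliseconds already used throughout this codebase; the import path is illustrative and not part of this diff:

import chronos, results
import libp2p/protocols/pubsub/gossipsub   # adjust to how your project vendors nim-libp2p

var params = GossipSubParams.init()
# Relayed messages and IWANT replies that wait longer than 500 ms in the
# non-priority queue are dropped on dequeue instead of being sent late.
params.maxDurationInNonPriorityQueue = Opt.some(500.milliseconds)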

11 changes: 6 additions & 5 deletions libp2p/protocols/pubsub/pubsub.nim
@@ -138,17 +138,18 @@ method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} =

libp2p_pubsub_peers.set(p.peers.len.int64)

proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) {.raises: [].} =
proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool = false) {.raises: [].} =
## Attempt to send `msg` to remote peer
##

trace "sending pubsub message to peer", peer, msg = shortLog(msg)
peer.send(msg, p.anonymize)
asyncSpawn peer.send(msg, p.anonymize, isHighPriority)

proc broadcast*(
p: PubSub,
sendPeers: auto, # Iterable[PubSubPeer]
msg: RPCMsg) {.raises: [].} =
msg: RPCMsg,
isHighPriority: bool = false) {.raises: [].} =
## Attempt to send `msg` to the given peers

let npeers = sendPeers.len.int64
@@ -195,12 +196,12 @@ proc broadcast*(

if anyIt(sendPeers, it.hasObservers):
for peer in sendPeers:
p.send(peer, msg)
p.send(peer, msg, isHighPriority)
else:
# Fast path that only encodes message once
let encoded = encodeRpcMsg(msg, p.anonymize)
for peer in sendPeers:
asyncSpawn peer.sendEncoded(encoded)
asyncSpawn peer.sendEncoded(encoded, isHighPriority)

proc sendSubs*(p: PubSub,
peer: PubSubPeer,
138 changes: 113 additions & 25 deletions libp2p/protocols/pubsub/pubsubpeer.nim
@@ -31,6 +31,12 @@
declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"])
declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])

declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id"])
declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id"])

declareCounter(libp2p_gossipsub_non_priority_msgs_dropped, "the number of dropped messages in the non-priority queue", labels = ["id"])


type
PeerRateLimitError* = object of CatchableError

@@ -49,6 +55,20 @@
DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init
OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].}

QueuedMessage* = object
msg*: seq[byte]
addedAt*: Moment

RpcMessageQueue* = ref object
# Tracks async tasks for sending high-priority peer-published messages.
sendPriorityQueue: Deque[Future[void]]
# Queue for lower-priority messages, like "IWANT" replies and relay messages.
nonPriorityQueue: AsyncQueue[QueuedMessage]
# Task for processing non-priority message queue.
sendNonPriorityTask: Future[void]
# The maximum duration a message to be relayed may wait in the non-priority queue before it is dropped. The default is none, i.e. no limit.
maxDurationInNonPriorityQueue*: Opt[Duration]

PubSubPeer* = ref object of RootObj
getConn*: GetConn # callback to establish a new send connection
onEvent*: OnEvent # Connectivity updates for peer
@@ -70,6 +90,8 @@
behaviourPenalty*: float64 # the eventual penalty score
overheadRateLimitOpt*: Opt[TokenBucket]

rpcmessagequeue*: RpcMessageQueue

RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void]
{.gcsafe, raises: [].}

@@ -82,6 +104,16 @@
#so we have to read the parents short agent..
p.sendConn.getWrapped().shortAgent

proc getAgent*(peer: PubSubPeer): string =
return
when defined(libp2p_agents_metrics):
if peer.shortAgent.len > 0:
peer.shortAgent
else:
"unknown"
else:
"unknown"

func hash*(p: PubSubPeer): Hash =
p.peerId.hash

@@ -227,17 +259,13 @@
# metrics
libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])

proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} =
doAssert(not isNil(p), "pubsubpeer nil!")

if msg.len <= 0:
debug "empty message, skipping", p, msg = shortLog(msg)
return

if msg.len > p.maxMessageSize:
info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
return
proc clearSendPriorityQueue(p: PubSubPeer) =
while p.rpcmessagequeue.sendPriorityQueue.len > 0 and p.rpcmessagequeue.sendPriorityQueue[0].finished:
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId])
discard p.rpcmessagequeue.sendPriorityQueue.popFirst()

proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} =
if p.sendConn == nil:
# Wait for a send conn to be setup. `connectOnce` will
# complete this even if the sendConn setup failed
@@ -262,6 +290,30 @@

await conn.close() # This will clean up the send connection

proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {.async.} =
doAssert(not isNil(p), "pubsubpeer nil!")

if msg.len <= 0:
debug "empty message, skipping", p, msg = shortLog(msg)
return

if msg.len > p.maxMessageSize:
info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
return

if isHighPriority:
p.clearSendPriorityQueue()
let f = p.sendMsg(msg)
if not f.finished:
p.rpcmessagequeue.sendPriorityQueue.addLast(f)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId])
else:
await p.rpcmessagequeue.nonPriorityQueue.addLast(QueuedMessage(msg: msg, addedAt: Moment.now()))
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
trace "message queued", p, msg = shortLog(msg)

iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
## Each new `RPCMsg` accumulates Messages until reaching the specified `maxSize`. If a single Message
@@ -297,7 +349,7 @@
else:
trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)

proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool = false) {.async.} =
# When sending messages, we take care to re-encode them with the right
# anonymization flag to ensure that we're not penalized for sending invalid
# or malicious data on the wire - in particular, re-encoding protects against
@@ -317,11 +369,11 @@

if encoded.len > p.maxMessageSize and msg.messages.len > 1:
for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize):
asyncSpawn p.sendEncoded(encodedSplitMsg)
await p.sendEncoded(encodedSplitMsg, isHighPriority)
else:
# If the message size is within limits, send it as is
trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
asyncSpawn p.sendEncoded(encoded)
await p.sendEncoded(encoded, isHighPriority)

proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
for sentIHave in p.sentIHaves.mitems():
@@ -330,14 +382,58 @@
return true
return false

proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
while true:
# we send non-priority messages only if there are no pending priority messages
let queuedMsg = await p.rpcmessagequeue.nonPriorityQueue.popFirst()
while p.rpcmessagequeue.sendPriorityQueue.len > 0:
p.clearSendPriorityQueue()
# This minimizes the number of waits (each wait has a performance cost): we never wait
# on an already-finished future, and awaiting the last one guarantees that all futures
# before it have finished too, since sends are processed in order.
if p.rpcmessagequeue.sendPriorityQueue.len > 0:
await p.rpcmessagequeue.sendPriorityQueue[^1]
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
p.rpcmessagequeue.maxDurationInNonPriorityQueue.withValue(maxDurationInNonPriorityQueue):
if Moment.now() - queuedMsg.addedAt >= maxDurationInNonPriorityQueue:
when defined(libp2p_expensive_metrics):

libp2p_gossipsub_non_priority_msgs_dropped.inc(labelValues = [$p.peerId])
continue
await p.sendMsg(queuedMsg.msg)

proc startSendNonPriorityTask(p: PubSubPeer) =
debug "starting sendNonPriorityTask", p
if p.rpcmessagequeue.sendNonPriorityTask.isNil:
p.rpcmessagequeue.sendNonPriorityTask = p.sendNonPriorityTask()

proc stopSendNonPriorityTask*(p: PubSubPeer) =
if not p.rpcmessagequeue.sendNonPriorityTask.isNil:
debug "stopping sendNonPriorityTask", p
p.rpcmessagequeue.sendNonPriorityTask.cancel()
p.rpcmessagequeue.sendNonPriorityTask = nil
p.rpcmessagequeue.sendPriorityQueue.clear()
p.rpcmessagequeue.nonPriorityQueue.clear()
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)

proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = Opt.none(Duration)): T =
return T(
sendPriorityQueue: initDeque[Future[void]](),
nonPriorityQueue: newAsyncQueue[QueuedMessage](),
maxDurationInNonPriorityQueue: maxDurationInNonPriorityQueue,
)

proc new*(
T: typedesc[PubSubPeer],
peerId: PeerId,
getConn: GetConn,
onEvent: OnEvent,
codec: string,
maxMessageSize: int,
overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket)): T =
overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket),
maxDurationInNonPriorityQueue = Opt.none(Duration)): T =

result = T(
getConn: getConn,
@@ -346,17 +442,9 @@
peerId: peerId,
connectedFut: newFuture[void](),
maxMessageSize: maxMessageSize,
overheadRateLimitOpt: overheadRateLimitOpt
overheadRateLimitOpt: overheadRateLimitOpt,
rpcmessagequeue: RpcMessageQueue.new(maxDurationInNonPriorityQueue),
)
result.sentIHaves.addFirst(default(HashSet[MessageId]))
result.heDontWants.addFirst(default(HashSet[MessageId]))

proc getAgent*(peer: PubSubPeer): string =
return
when defined(libp2p_agents_metrics):
if peer.shortAgent.len > 0:
peer.shortAgent
else:
"unknown"
else:
"unknown"
result.startSendNonPriorityTask()
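To summarize the queueing discipline added above: high-priority sends (own publishes, control, pongs) are started immediately and only tracked in sendPriorityQueue, while non-priority messages (relays, IWANT replies) wait in nonPriorityQueue until the priority queue has drained, and are dropped at dequeue time if they exceed maxDurationInNonPriorityQueue. A self-contained sketch of just the staleness check, using hypothetical names (QueuedMsg, isStale) rather than the library types:

import chronos

type QueuedMsg = object      # mirrors QueuedMessage above
  msg: seq[byte]
  addedAt: Moment

proc isStale(q: QueuedMsg, maxAge: Duration): bool =
  # A queued message is discarded at dequeue time once it has waited longer than maxAge.
  Moment.now() - q.addedAt >= maxAge

when isMainModule:
  let entry = QueuedMsg(msg: @[1'u8, 2, 3], addedAt: Moment.now())
  # ... later, when the non-priority task dequeues it:
  if isStale(entry, 500.milliseconds):
    discard   # drop and bump the dropped-messages counter instead of sending
  else:
    discard   # send as usual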
31 changes: 24 additions & 7 deletions tests/pubsub/testgossipinternal.nim
@@ -9,7 +9,7 @@

{.used.}

import std/[options, deques, sequtils, enumerate, algorithm]
import std/[options, deques, sequtils, enumerate, algorithm, os]
import stew/byteutils
import ../../libp2p/builders
import ../../libp2p/errors
@@ -718,15 +718,19 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()

proc setupTest(): Future[tuple[gossip0: GossipSub, gossip1: GossipSub, receivedMessages: ref HashSet[seq[byte]]]] {.async.} =
proc setupTest(maxDurationInNonPriorityQueue1: Opt[Duration] = Opt.none(Duration)):
Future[tuple[gossip0: GossipSub, gossip1: GossipSub, receivedMessages: ref HashSet[seq[byte]]]] {.async.} =
let
nodes = generateNodes(2, gossip = true, verifySignature = false)
discard await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start()
)
var gossip0: GossipSub = GossipSub(nodes[0])
var gossip1: GossipSub = GossipSub(nodes[1])

await nodes[1].switch.connect(nodes[0].switch.peerInfo.peerId, nodes[0].switch.peerInfo.addrs)
gossip1.parameters.maxDurationInNonPriorityQueue = maxDurationInNonPriorityQueue1
await gossip1.switch.connect(gossip0.switch.peerInfo.peerId, gossip0.switch.peerInfo.addrs)

var receivedMessages = new(HashSet[seq[byte]])

@@ -736,12 +740,10 @@ suite "GossipSub internal":
proc handlerB(topic: string, data: seq[byte]) {.async.} =
discard

nodes[0].subscribe("foobar", handlerA)
nodes[1].subscribe("foobar", handlerB)
gossip0.subscribe("foobar", handlerA)
gossip1.subscribe("foobar", handlerB)
await waitSubGraph(nodes, "foobar")

var gossip0: GossipSub = GossipSub(nodes[0])
var gossip1: GossipSub = GossipSub(nodes[1])

return (gossip0, gossip1, receivedMessages)

@@ -844,3 +846,18 @@ suite "GossipSub internal":
check receivedMessages[].len == 1

await teardownTest(gossip0, gossip1)

asyncTest "e2e - drop msg if it is in the non-priority queue for too long":
# This test checks that a message which waits in the non-priority queue longer than
# maxDurationInNonPriorityQueue is dropped on dequeue instead of being sent.
# Expected: only the second (fresh) message is received.
let maxDurationInNonPriorityQueueGossip1 = 100.millis
let (gossip0, gossip1, receivedMessages) = await setupTest(Opt.some(maxDurationInNonPriorityQueueGossip1))

let topic = "foobar"
gossip1.broadcast(gossip1.mesh[topic], RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](35))]), false)
sleep(100) # blocking sleep pauses all tasks so the first message stays in the non-priority queue longer than maxDurationInNonPriorityQueueGossip1
gossip1.broadcast(gossip1.mesh[topic], RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](36))]), false)
await sleepAsync(100.milliseconds) # wait for the messages to be processed
check: receivedMessages[].len == 1 # only the second message should be received

await teardownTest(gossip0, gossip1)