fix: bitswap lock contention under high load #817

Merged (22 commits) on Jan 31, 2025
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -22,6 +22,9 @@ The following emojis are used to highlight certain changes:

### Fixed

`bitswap/client`: Fix runaway goroutine creation under high load. Under high load, goroutines are created faster than they can complete, and the more goroutines accumulate, the slower they complete. This creates a positive feedback cycle that ends in OOM. The fix dynamically adjusts message send scheduling to avoid the runaway condition. [#817](https://github.com/ipfs/boxo/pull/817)


### Security


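The changelog entry above summarizes the change: instead of debouncing every work signal, the message queue now scales its send delay with the number of active peer queues and clamps it to a fixed range. The following is a minimal, self-contained sketch of that scaling rule (not the boxo code itself), reusing the constant values from the diff below and the Go 1.21+ `min`/`max` builtins:

```go
package main

import (
	"fmt"
	"time"
)

// Values mirroring the constants added to messagequeue.go in this PR.
const (
	perPeerDelay        = time.Millisecond / 8 // delay contributed by each active peer queue
	minSendMessageDelay = 20 * time.Millisecond
	maxSendMessageDelay = time.Second
)

// sendDelay sketches how the message queue now decides how long to let wants
// accumulate before the next send: proportional to the number of peers,
// clamped to [minSendMessageDelay, maxSendMessageDelay].
func sendDelay(peerCount int64) time.Duration {
	delay := time.Duration(peerCount) * perPeerDelay
	return max(minSendMessageDelay, min(maxSendMessageDelay, delay))
}

func main() {
	for _, n := range []int64{10, 160, 1000, 8000, 20000} {
		fmt.Printf("%6d peers -> %v\n", n, sendDelay(n))
	}
}
```

With these defaults, anything up to 160 peers stays at the 20ms floor, and the delay saturates at 1s once roughly 8000 peers are connected, which bounds how quickly send work can be scheduled per peer.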
15 changes: 14 additions & 1 deletion bitswap/client/client.go
@@ -76,6 +76,15 @@
}
}

// WithPerPeerSendDelay determines how long to wait, based on the number of
// peers, for wants to accumulate before sending a bitswap message to peers. A
// value of 0 uses the bitswap messagequeue default.
func WithPerPeerSendDelay(delay time.Duration) Option {
return func(bs *Client) {
bs.perPeerSendDelay = delay
}

Codecov warning: added lines bitswap/client/client.go#L82-L85 were not covered by tests.
}

// Configures the Client to use given tracer.
// This provides methods to access all messages sent and received by the Client.
// This interface can be used to implement various statistics (this is original intent).
@@ -178,7 +187,9 @@
}
}
peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue {
return bsmq.New(ctx, p, network, onDontHaveTimeout, bsmq.WithDontHaveTimeoutConfig(bs.dontHaveTimeoutConfig))
return bsmq.New(ctx, p, network, onDontHaveTimeout,
bsmq.WithDontHaveTimeoutConfig(bs.dontHaveTimeoutConfig),
bsmq.WithPerPeerSendDelay(bs.perPeerSendDelay))
}
bs.dontHaveTimeoutConfig = nil

@@ -292,6 +303,8 @@

// dupMetric will stay at 0
skipDuplicatedBlocksStats bool

perPeerSendDelay time.Duration
}

type counters struct {
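For context, here is a hedged usage sketch of the new client option. The surrounding constructor call is elided, since `client.New` takes additional arguments (context, network, blockstore, ...) that depend on the boxo version in use:

```go
package example

import (
	"time"

	"github.com/ipfs/boxo/bitswap/client"
)

// bitswapOptions builds the option list passed as the trailing arguments of
// client.New; the other constructor arguments are omitted here.
func bitswapOptions() []client.Option {
	return []client.Option{
		// Scale the send delay by 250µs per connected peer; the message queue
		// clamps the result to [20ms, 1s]. Passing 0 keeps the default of
		// time.Millisecond / 8 per peer.
		client.WithPerPeerSendDelay(250 * time.Microsecond),
	}
}
```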
160 changes: 84 additions & 76 deletions bitswap/client/internal/messagequeue/messagequeue.go
@@ -4,6 +4,7 @@
"context"
"math"
"sync"
"sync/atomic"
"time"

"github.com/filecoin-project/go-clock"
@@ -42,14 +43,16 @@
sendErrorBackoff = 100 * time.Millisecond
// when we reach sendMessageCutoff wants/cancels, we'll send the message immediately.
sendMessageCutoff = 256
// sendMessageDebounce is the debounce duration when calling sendMessage()
sendMessageDebounce = time.Millisecond
// when we debounce for more than sendMessageMaxDelay, we'll send the
// message immediately.
sendMessageMaxDelay = 20 * time.Millisecond
sendTimeout = 30 * time.Second
// wait this long before sending next message
sendTimeout = 30 * time.Second

defaultPerPeerDelay = time.Millisecond / 8
maxSendMessageDelay = time.Second
minSendMessageDelay = 20 * time.Millisecond
)

var peerCount atomic.Int64

// MessageNetwork is any network that can connect peers and generate a message
// sender.
type MessageNetwork interface {
@@ -81,7 +84,7 @@
maxValidLatency time.Duration

// Signals that there are outgoing wants / cancels ready to be processed
outgoingWork chan time.Time
outgoingWork chan struct{}

// Channel of CIDs of blocks / HAVEs / DONT_HAVEs received from the peer
responses chan []cid.Cid
@@ -105,6 +108,8 @@

// Used to track things that happen asynchronously -- used only in test
events chan<- messageEvent

perPeerDelay time.Duration
}

// recallWantlist keeps a list of pending wants and a list of sent wants
@@ -232,7 +237,8 @@
}

type optsConfig struct {
dhtConfig *DontHaveTimeoutConfig
dhtConfig *DontHaveTimeoutConfig
perPeerDelay time.Duration
}

type option func(*optsConfig)
@@ -243,11 +249,22 @@
}
}

func WithPerPeerSendDelay(perPeerDelay time.Duration) option {
return func(cfg *optsConfig) {
if perPeerDelay == 0 {
perPeerDelay = defaultPerPeerDelay
}
cfg.perPeerDelay = perPeerDelay
}
}

// New creates a new MessageQueue.
//
// If onDontHaveTimeout is nil, then the dontHaveTimeoutMgr is disabled.
func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeout OnDontHaveTimeout, options ...option) *MessageQueue {
var opts optsConfig
opts := optsConfig{
perPeerDelay: defaultPerPeerDelay,
}
for _, o := range options {
o(&opts)
}
@@ -261,7 +278,9 @@
}
dhTimeoutMgr = newDontHaveTimeoutMgr(newPeerConnection(p, network), onTimeout, opts.dhtConfig)
}
return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, maxValidLatency, dhTimeoutMgr, nil, nil)
mq := newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, maxValidLatency, dhTimeoutMgr, nil, nil)
mq.perPeerDelay = opts.perPeerDelay
return mq
}

type messageEvent int
@@ -298,7 +317,7 @@
bcstWants: newRecallWantList(),
peerWants: newRecallWantList(),
cancels: cid.NewSet(),
outgoingWork: make(chan time.Time, 1),
outgoingWork: make(chan struct{}, 1),
responses: make(chan []cid.Cid, 8),
rebroadcastNow: make(chan struct{}),
sendErrorBackoff: sendErrorBackoff,
@@ -377,9 +396,9 @@
mq.dhTimeoutMgr.CancelPending(cancelKs)
}

mq.wllock.Lock()
var workReady bool

workReady := false
mq.wllock.Lock()

// Remove keys from broadcast and peer wants, and add to cancels
for _, c := range cancelKs {
@@ -456,6 +475,9 @@
func (mq *MessageQueue) runQueue() {
const runRebroadcastsInterval = rebroadcastInterval / 2

peerCount.Add(1)
defer peerCount.Add(-1)

defer mq.onShutdown()

// Create a timer for debouncing scheduled work.
@@ -466,10 +488,12 @@
<-scheduleWork.C
}

perPeerDelay := mq.perPeerDelay
hasWorkChan := mq.outgoingWork

rebroadcastTimer := mq.clock.Timer(runRebroadcastsInterval)
defer rebroadcastTimer.Stop()

var workScheduled time.Time
for {
select {
case now := <-rebroadcastTimer.C:
@@ -479,37 +503,20 @@
case <-mq.rebroadcastNow:
mq.rebroadcastWantlist(mq.clock.Now(), 0)

case when := <-mq.outgoingWork:
// If we have work scheduled, cancel the timer. If we
// don't, record when the work was scheduled.
// We send the time on the channel so we accurately
// track delay.
if workScheduled.IsZero() {
workScheduled = when
} else if !scheduleWork.Stop() {
// Need to drain the timer if Stop() returns false
<-scheduleWork.C
case <-hasWorkChan:
if mq.events != nil {
mq.events <- messageQueued
}

// If we have too many updates and/or we've waited too
// long, send immediately.
if mq.pendingWorkCount() > sendMessageCutoff ||
mq.clock.Since(workScheduled) >= sendMessageMaxDelay {
mq.sendIfReady()
workScheduled = time.Time{}
} else {
// Otherwise, extend the timer.
scheduleWork.Reset(sendMessageDebounce)
if mq.events != nil {
mq.events <- messageQueued
}
}
mq.sendMessage()
hasWorkChan = nil

delay := time.Duration(peerCount.Load()) * perPeerDelay
delay = max(minSendMessageDelay, min(maxSendMessageDelay, delay))
scheduleWork.Reset(delay)

case <-scheduleWork.C:
// We have work scheduled and haven't seen any updates
// in sendMessageDebounce. Send immediately.
workScheduled = time.Time{}
mq.sendIfReady()
hasWorkChan = mq.outgoingWork

case res := <-mq.responses:
// We received a response from the peer, calculate latency
@@ -538,17 +545,11 @@

func (mq *MessageQueue) signalWorkReady() {
select {
case mq.outgoingWork <- mq.clock.Now():
case mq.outgoingWork <- struct{}{}:
default:
}
}

func (mq *MessageQueue) sendIfReady() {
if mq.hasPendingWork() {
mq.sendMessage()
}
}

func (mq *MessageQueue) sendMessage() {
sender, err := mq.initializeSender()
if err != nil {
@@ -565,38 +566,50 @@
mq.dhTimeoutMgr.Start()
}

// Convert want lists to a Bitswap Message
message, onSent := mq.extractOutgoingMessage(mq.sender.SupportsHave())
supportsHave := mq.sender.SupportsHave()

// After processing the message, clear out its fields to save memory
defer mq.msg.Reset(false)

if message.Empty() {
return
}
var wantlist []bsmsg.Entry

wantlist := message.Wantlist()
mq.logOutgoingMessage(wantlist)
for {
// Convert want lists to a Bitswap Message
message, onSent := mq.extractOutgoingMessage(supportsHave)
if message.Empty() {
return
}

if err := sender.SendMsg(mq.ctx, message); err != nil {
// If the message couldn't be sent, the networking layer will
// emit a Disconnect event and the MessageQueue will get cleaned up
log.Infof("Could not send message to peer %s: %s", mq.p, err)
mq.Shutdown()
return
}
wantlist = message.FillWantlist(wantlist)
mq.logOutgoingMessage(wantlist)

// Record sent time so as to calculate message latency
onSent()
if err = sender.SendMsg(mq.ctx, message); err != nil {
// If the message couldn't be sent, the networking layer will
// emit a Disconnect event and the MessageQueue will get cleaned up
log.Infof("Could not send message to peer %s: %s", mq.p, err)
mq.Shutdown()
return
}

Codecov warning: added lines bitswap/client/internal/messagequeue/messagequeue.go#L587-L592 were not covered by tests.

// Set a timer to wait for responses
mq.simulateDontHaveWithTimeout(wantlist)
// Record sent time so as to calculate message latency
onSent()

// If the message was too big and only a subset of wants could be
// sent, schedule sending the rest of the wants in the next
// iteration of the event loop.
if mq.hasPendingWork() {
mq.signalWorkReady()
// Set a timer to wait for responses
mq.simulateDontHaveWithTimeout(wantlist)

// If the message was too big and only a subset of wants could be sent,
// send more if the work count is above the cutoff. Otherwise,
// schedule sending the rest of the wants in the next iteration of the
// event loop.
pendingWork := mq.pendingWorkCount()
if pendingWork < sendMessageCutoff {
if pendingWork > 0 {
mq.signalWorkReady()
}
return
}

mq.msg.Reset(false)

Codecov warning: added line bitswap/client/internal/messagequeue/messagequeue.go#L612 was not covered by tests.
}
}

@@ -721,11 +734,6 @@
}
}

// Whether there is work to be processed
func (mq *MessageQueue) hasPendingWork() bool {
return mq.pendingWorkCount() > 0
}

// The amount of work that is waiting to be processed
func (mq *MessageQueue) pendingWorkCount() int {
mq.wllock.Lock()
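The reworked `runQueue` above throttles sends by nil-ing its work channel after each send and only restoring it once the peer-count-scaled timer fires, instead of debouncing every signal. Below is a standalone sketch of that select/nil-channel pattern, with a plain `time.Timer` and a fixed 50ms delay standing in for the clock abstraction and the `peerCount * perPeerDelay` computation in the real code:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	work := make(chan struct{}, 1)
	throttle := time.NewTimer(0)
	<-throttle.C // start with a drained timer, as runQueue does with scheduleWork

	active := work // setting this to nil disables the work case in the select below

	// Simulate a burst of work signals from other goroutines.
	go func() {
		for i := 0; i < 5; i++ {
			select {
			case work <- struct{}{}: // coalesce signals, like signalWorkReady
			default:
			}
			time.Sleep(10 * time.Millisecond)
		}
	}()

	for i := 0; i < 3; i++ {
		select {
		case <-active:
			fmt.Println("send message batch")
			active = nil                          // ignore further work signals for now
			throttle.Reset(50 * time.Millisecond) // stand-in for the peer-count-scaled delay
		case <-throttle.C:
			active = work // throttle expired: accept work again
			fmt.Println("accepting work again")
		}
	}
}
```

The same shape appears in the diff: `hasWorkChan = nil` after `mq.sendMessage()`, then `hasWorkChan = mq.outgoingWork` once `scheduleWork.C` fires.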