Skip to content

Commit

Permalink
fix(p2p): infinity loop if no peer in queue (#3593)
Browse files Browse the repository at this point in the history
  • Loading branch information
gfanton authored Jan 23, 2025
1 parent fd24486 commit 8e1c532
Showing 1 changed file with 24 additions and 4 deletions.
28 changes: 24 additions & 4 deletions tm2/pkg/p2p/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ type MultiplexSwitch struct {
privatePeers sync.Map // ID -> nothing; lookup table of peers who are not shared
transport Transport

dialQueue *dial.Queue
events *events.Events
dialQueue *dial.Queue
dialNotify chan struct{}
events *events.Events
}

// NewMultiplexSwitch creates a new MultiplexSwitch with the given config.
Expand All @@ -88,6 +89,7 @@ func NewMultiplexSwitch(
peers: newSet(),
transport: transport,
dialQueue: dial.NewQueue(),
dialNotify: make(chan struct{}, 1),
events: events.New(),
maxInboundPeers: defaultCfg.MaxNumInboundPeers,
maxOutboundPeers: defaultCfg.MaxNumOutboundPeers,
Expand Down Expand Up @@ -262,13 +264,15 @@ func (sw *MultiplexSwitch) runDialLoop(ctx context.Context) {
select {
case <-ctx.Done():
sw.Logger.Debug("dial context canceled")

return

default:
// Grab a dial item
item := sw.dialQueue.Peek()
if item == nil {
// Nothing to dial
// Nothing to dial, wait until something is
// added to the queue
sw.waitForPeersToDial(ctx)
continue
}

Expand Down Expand Up @@ -565,6 +569,7 @@ func (sw *MultiplexSwitch) DialPeers(peerAddrs ...*types.NetAddress) {
}

sw.dialQueue.Push(item)
sw.notifyAddPeerToDial()
}
}

Expand All @@ -588,6 +593,7 @@ func (sw *MultiplexSwitch) dialItems(dialItems ...dial.Item) {
}

sw.dialQueue.Push(dialItem)
sw.notifyAddPeerToDial()
}
}

Expand Down Expand Up @@ -698,6 +704,20 @@ func (sw *MultiplexSwitch) addPeer(p PeerConn) error {
return nil
}

func (sw *MultiplexSwitch) notifyAddPeerToDial() {
select {
case sw.dialNotify <- struct{}{}:
default:
}
}

func (sw *MultiplexSwitch) waitForPeersToDial(ctx context.Context) {
select {
case <-ctx.Done():
case <-sw.dialNotify:
}
}

// logTelemetry logs the switch telemetry data
// to global metrics funnels
func (sw *MultiplexSwitch) logTelemetry() {
Expand Down

0 comments on commit 8e1c532

Please sign in to comment.