diff --git a/tm2/pkg/p2p/switch.go b/tm2/pkg/p2p/switch.go index 5c1c37f7729..0dd087026dd 100644 --- a/tm2/pkg/p2p/switch.go +++ b/tm2/pkg/p2p/switch.go @@ -72,8 +72,9 @@ type MultiplexSwitch struct { privatePeers sync.Map // ID -> nothing; lookup table of peers who are not shared transport Transport - dialQueue *dial.Queue - events *events.Events + dialQueue *dial.Queue + dialNotify chan struct{} + events *events.Events } // NewMultiplexSwitch creates a new MultiplexSwitch with the given config. @@ -88,6 +89,7 @@ func NewMultiplexSwitch( peers: newSet(), transport: transport, dialQueue: dial.NewQueue(), + dialNotify: make(chan struct{}, 1), events: events.New(), maxInboundPeers: defaultCfg.MaxNumInboundPeers, maxOutboundPeers: defaultCfg.MaxNumOutboundPeers, @@ -262,13 +264,15 @@ func (sw *MultiplexSwitch) runDialLoop(ctx context.Context) { select { case <-ctx.Done(): sw.Logger.Debug("dial context canceled") - return + default: // Grab a dial item item := sw.dialQueue.Peek() if item == nil { - // Nothing to dial + // Nothing to dial, wait until something is + // added to the queue + sw.waitForPeersToDial(ctx) continue } @@ -565,6 +569,7 @@ func (sw *MultiplexSwitch) DialPeers(peerAddrs ...*types.NetAddress) { } sw.dialQueue.Push(item) + sw.notifyAddPeerToDial() } } @@ -588,6 +593,7 @@ func (sw *MultiplexSwitch) dialItems(dialItems ...dial.Item) { } sw.dialQueue.Push(dialItem) + sw.notifyAddPeerToDial() } } @@ -698,6 +704,20 @@ func (sw *MultiplexSwitch) addPeer(p PeerConn) error { return nil } +func (sw *MultiplexSwitch) notifyAddPeerToDial() { + select { + case sw.dialNotify <- struct{}{}: + default: + } +} + +func (sw *MultiplexSwitch) waitForPeersToDial(ctx context.Context) { + select { + case <-ctx.Done(): + case <-sw.dialNotify: + } +} + // logTelemetry logs the switch telemetry data // to global metrics funnels func (sw *MultiplexSwitch) logTelemetry() {