From e417c72c3b1c00ad3783cdc5f9b9e01725d86544 Mon Sep 17 00:00:00 2001
From: Daniel Martí
Date: Thu, 29 Jul 2021 16:47:31 +0100
Subject: [PATCH] multirpc/subpub: fix potential goroutine deadlocks

When the connection to a peer is lost, broadcastHandler errors in its
SendMessage call, and the entire goroutine stops. No goroutine will
continue receiving on the write channel, so sooner or later, sends to
the write channel will start blocking.

This starts causing deadlocks further up in IPFSsync: SubPub.Subscribe
and SubPub.PeerStreamWrite can now block forever, which in turn can
mean that some IPFSsync goroutines hold onto mutexes forever.

On one hand, this chain of events can hang IPFSsync, stopping it from
doing anything useful until a restart. On the other hand, it causes
goroutine leaks: when more calls to IPFSsync.Handle come through, using
new goroutines via the router, those try to grab the deadlocked mutexes
and hang forever.

First, fix the root cause: peerSub now has a "peerClosed" channel,
which gets closed by peersManager when the peer is dropped. Its
goroutines, both for reading and writing messages, keep running until
that happens.

Second, make the symptom of the deadlock less severe: prevent channel
sends from blocking forever. Any send on the "write" channel now stops
on "peerClosed". And the send on BroadcastWriter, which could also
block forever, now has a fallback timeout of five minutes. A minimal
standalone sketch of this unblocking pattern is included after the
diff below.

Updates #243. Perhaps not a total fix, as there might be other leaks.
---
 multirpc/subpub/discovery.go                  |  2 ++
 multirpc/subpub/peers.go                      | 20 +++++++++++--
 multirpc/subpub/stream.go                     | 29 ++++++++++++++-----
 multirpc/transports/subpubtransport/subpub.go | 13 ++++++++-
 4 files changed, 52 insertions(+), 12 deletions(-)

diff --git a/multirpc/subpub/discovery.go b/multirpc/subpub/discovery.go
index bd354554e..1ccdb60e0 100644
--- a/multirpc/subpub/discovery.go
+++ b/multirpc/subpub/discovery.go
@@ -60,6 +60,7 @@ func (ps *SubPub) Subscribe(ctx context.Context) {
         case <-ps.close:
             return
         case msg := <-ps.BroadcastWriter:
+
             ps.PeersMu.Lock()
             for _, peer := range ps.Peers {
                 if peer.write == nil {
@@ -67,6 +68,7 @@
                 }
                 select {
                 case peer.write <- msg:
+                case <-peer.peerClosed:
                 default:
                     log.Infof("dropping broadcast message for peer %s", peer.id)
                 }
diff --git a/multirpc/subpub/peers.go b/multirpc/subpub/peers.go
index 2c612935d..e8f8566d1 100644
--- a/multirpc/subpub/peers.go
+++ b/multirpc/subpub/peers.go
@@ -14,8 +14,14 @@ import (
 )
 
 type peerSub struct {
-    id    libpeer.ID
+    id libpeer.ID
+
     write chan []byte
+
+    // peerClosed signals that we've lost the connection with the peer, or
+    // it has been removed by peersManager.
+    // When closed, its goroutines stop.
+    peerClosed chan bool
 }
 
 // PeerStreamWrite looks for an existing connection with peerID and calls the callback function with the writer channel as parameter
@@ -32,8 +38,14 @@ func (ps *SubPub) PeerStreamWrite(peerID string, msg []byte) error {
     if peerIdx < 0 {
         return fmt.Errorf("no connection with peer %s, cannot open stream", peerID)
     }
-    ps.Peers[peerIdx].write <- msg
-    return nil
+
+    peer := ps.Peers[peerIdx]
+    select {
+    case peer.write <- msg:
+        return nil
+    case <-peer.peerClosed:
+        return nil
+    }
 }
 
 // FindTopic opens one or multiple new streams with the peers announcing the namespace.
@@ -103,6 +115,8 @@ func (ps *SubPub) peersManager() {
             if len(ps.Host.Network().ConnsToPeer(peer.id)) > 0 {
                 continue
             }
+            close(peer.peerClosed)
+
             // Remove peer if no active connection
             ps.Peers[i] = ps.Peers[len(ps.Peers)-1]
             ps.Peers = ps.Peers[:len(ps.Peers)-1]
diff --git a/multirpc/subpub/stream.go b/multirpc/subpub/stream.go
index dbd2b526a..a4ec4b8bc 100644
--- a/multirpc/subpub/stream.go
+++ b/multirpc/subpub/stream.go
@@ -10,9 +10,11 @@ import (
 )
 
 func (ps *SubPub) handleStream(stream network.Stream) {
+    peerClosed := make(chan bool)
+
     // First, ensure that any messages read from the stream are sent to the
     // SubPub.Reader channel.
-    go ps.readHandler(stream)
+    go ps.readHandler(peerClosed, stream)
 
     // Second, ensure that, from now on, any broadcast message is sent to
     // this stream as well.
@@ -27,38 +29,50 @@
     pid := stream.Conn().RemotePeer()
     ps.PeersMu.Lock()
     defer ps.PeersMu.Unlock()
-    ps.Peers = append(ps.Peers, peerSub{pid, write}) // TO-DO this should be a map
+    ps.Peers = append(ps.Peers, peerSub{ // TO-DO this should be a map
+        id:         pid,
+        peerClosed: peerClosed,
+        write:      write,
+    })
     if fn := ps.onPeerAdd; fn != nil {
         fn(pid)
     }
     log.Infof("connected to peer %s: %+v", pid, stream.Conn().RemoteMultiaddr())
 
-    go ps.broadcastHandler(write, bufio.NewWriter(stream))
+    go ps.broadcastHandler(peerClosed, write, bufio.NewWriter(stream))
 }
 
-func (ps *SubPub) broadcastHandler(write <-chan []byte, w *bufio.Writer) {
+func (ps *SubPub) broadcastHandler(peerClosed <-chan bool, write <-chan []byte, w *bufio.Writer) {
     for {
         select {
         case <-ps.close:
             return
+        case <-peerClosed:
+            return
         case msg := <-write:
             if err := ps.SendMessage(w, msg); err != nil {
                 log.Debugf("error writing to buffer: (%s)", err)
-                return
+                continue
             }
             if err := w.Flush(); err != nil {
                 log.Debugf("error flushing write buffer: (%s)", err)
-                return
+                continue
             }
         }
     }
 }
 
-func (ps *SubPub) readHandler(stream network.Stream) {
+func (ps *SubPub) readHandler(peerClosed <-chan bool, stream network.Stream) {
     r := bufio.NewReader(stream)
+
+    // Ensure that we always close the stream.
+    defer stream.Close()
+
     for {
         select {
         case <-ps.close:
             return
+        case <-peerClosed:
+            return
         default: // continues below
         }
@@ -67,7 +81,6 @@
         bare.MaxUnmarshalBytes(bareMaxUnmarshalBytes)
         if err := bare.UnmarshalReader(io.Reader(r), message); err != nil {
             log.Debugf("error reading stream buffer %s: %v", stream.Conn().RemotePeer().Pretty(), err)
-            stream.Close()
             return
         } else if len(message.Data) == 0 {
             log.Debugf("no data could be read from stream: %s (%+v)", stream.Conn().RemotePeer().Pretty(), stream.Stat())
diff --git a/multirpc/transports/subpubtransport/subpub.go b/multirpc/transports/subpubtransport/subpub.go
index 1353e11f8..f86b621ac 100644
--- a/multirpc/transports/subpubtransport/subpub.go
+++ b/multirpc/transports/subpubtransport/subpub.go
@@ -86,7 +86,18 @@ func (s *SubPubHandle) ConnectionType() string {
 
 func (s *SubPubHandle) Send(msg transports.Message) error {
     log.Debugf("sending %d bytes to broadcast channel", len(msg.Data))
-    s.SubPub.BroadcastWriter <- msg.Data
+
+    // Use a fallback timeout of five minutes, to prevent blocking forever
+    // or leaking goroutines.
+    // TODO(mvdan): turn this fallback timeout into a ctx parameter
+    ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Minute)
+    defer cancel()
+
+    select {
+    case s.SubPub.BroadcastWriter <- msg.Data:
+    case <-ctx.Done():
+        return ctx.Err()
+    }
    return nil
 }
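
Not part of the patch: a minimal, self-contained Go sketch of the
unblocking pattern the commit message describes. The names here (peer,
send, the timings in main) are illustrative only; the sketch just shows
how a blocked send can finish in one of three ways: the message is
accepted, the manager closes the peer's channel, or a fallback timeout
fires.

    // Illustrative sketch only; not part of the SubPub code.
    package main

    import (
        "context"
        "fmt"
        "time"
    )

    type peer struct {
        write  chan []byte
        closed chan bool // closed by the manager when the peer is dropped
    }

    // send blocks until the message is accepted, the peer is dropped, or
    // the fallback timeout expires; it can no longer block forever.
    func send(ctx context.Context, p *peer, msg []byte) error {
        select {
        case p.write <- msg:
            return nil
        case <-p.closed:
            return fmt.Errorf("peer dropped, message discarded")
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    func main() {
        p := &peer{write: make(chan []byte), closed: make(chan bool)}

        // Nobody receives on p.write, just like after broadcastHandler has
        // stopped; a bare "p.write <- msg" would block forever.
        go func() {
            time.Sleep(10 * time.Millisecond) // simulate peersManager dropping the peer
            close(p.closed)
        }()

        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
        defer cancel()

        fmt.Println(send(ctx, p, []byte("hello"))) // prints: peer dropped, message discarded
    }

Closing the channel, rather than sending on it, is the key: a receive
from a closed channel never blocks, so every pending and future select
unblocks at once, however many goroutines are waiting.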