From 1ad628f19b9115912446a22963d2e78c04885f6d Mon Sep 17 00:00:00 2001 From: sukun Date: Thu, 21 Nov 2024 12:40:12 +0530 Subject: [PATCH] send error codes on transport conn and stream failures --- p2p/net/mock/mock_stream.go | 14 +++++++++++++- p2p/net/swarm/swarm.go | 8 ++++++-- p2p/net/swarm/swarm_conn.go | 6 +++++- p2p/net/upgrader/listener.go | 6 +++++- p2p/transport/quic/listener.go | 5 ++--- p2p/transport/quic/transport.go | 4 +--- p2p/transport/quic/virtuallistener.go | 3 ++- p2p/transport/quicreuse/listener.go | 3 ++- p2p/transport/webtransport/stream.go | 4 ---- 9 files changed, 36 insertions(+), 17 deletions(-) diff --git a/p2p/net/mock/mock_stream.go b/p2p/net/mock/mock_stream.go index 3ba29ddd80..d4381af096 100644 --- a/p2p/net/mock/mock_stream.go +++ b/p2p/net/mock/mock_stream.go @@ -145,7 +145,19 @@ func (s *stream) Reset() error { } func (s *stream) ResetWithError(errCode network.StreamErrorCode) error { - panic("not implemented") + // Cancel any pending reads/writes with an error. + // TODO: Should these be the other way round(remote=true)? + s.write.CloseWithError(&network.StreamError{Remote: false, ErrorCode: errCode}) + s.read.CloseWithError(&network.StreamError{Remote: false, ErrorCode: errCode}) + + select { + case s.reset <- struct{}{}: + default: + } + <-s.closed + + // No meaningful error case here. + return nil } func (s *stream) teardown() { diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 12e83d38ef..a0ccb5091f 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -385,8 +385,12 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, // If we do this in the Upgrader, we will not be able to do this. if s.gater != nil { if allow, _ := s.gater.InterceptUpgraded(c); !allow { - // TODO Send disconnect with reason here - err := tc.Close() + var err error + if tcc, ok := tc.(network.CloseWithErrorer); ok { + err = tcc.CloseWithError(network.ConnGated) + } else { + err = tc.Close() + } if err != nil { log.Warnf("failed to close connection with peer %s and addr %s; err: %s", p, addr, err) } diff --git a/p2p/net/swarm/swarm_conn.go b/p2p/net/swarm/swarm_conn.go index b7cc46fb71..80911d0ab5 100644 --- a/p2p/net/swarm/swarm_conn.go +++ b/p2p/net/swarm/swarm_conn.go @@ -138,7 +138,11 @@ func (c *Conn) start() { } scope, err := c.swarm.ResourceManager().OpenStream(c.RemotePeer(), network.DirInbound) if err != nil { - ts.Reset() + if tse, ok := ts.(network.ResetWithErrorer); ok { + tse.ResetWithError(network.StreamResourceLimitExceeded) + } else { + ts.Reset() + } continue } c.swarm.refs.Add(1) diff --git a/p2p/net/upgrader/listener.go b/p2p/net/upgrader/listener.go index c2e81d2e93..f2230e5619 100644 --- a/p2p/net/upgrader/listener.go +++ b/p2p/net/upgrader/listener.go @@ -162,7 +162,11 @@ func (l *listener) handleIncoming() { // if we stop accepting connections for some reason, // we'll eventually close all the open ones // instead of hanging onto them. - conn.Close() + if cc, ok := conn.(network.CloseWithErrorer); ok { + cc.CloseWithError(network.ConnRateLimited) + } else { + conn.Close() + } } }() } diff --git a/p2p/transport/quic/listener.go b/p2p/transport/quic/listener.go index f90bdf53f0..30868e49eb 100644 --- a/p2p/transport/quic/listener.go +++ b/p2p/transport/quic/listener.go @@ -11,7 +11,6 @@ import ( tpt "github.com/libp2p/go-libp2p/core/transport" p2ptls "github.com/libp2p/go-libp2p/p2p/security/tls" "github.com/libp2p/go-libp2p/p2p/transport/quicreuse" - ma "github.com/multiformats/go-multiaddr" "github.com/quic-go/quic-go" ) @@ -54,12 +53,12 @@ func (l *listener) Accept() (tpt.CapableConn, error) { c, err := l.wrapConn(qconn) if err != nil { log.Debugf("failed to setup connection: %s", err) - qconn.CloseWithError(1, "") + qconn.CloseWithError(quic.ApplicationErrorCode(network.ConnResourceLimitExceeded), "") continue } l.transport.addConn(qconn, c) if l.transport.gater != nil && !(l.transport.gater.InterceptAccept(c) && l.transport.gater.InterceptSecured(network.DirInbound, c.remotePeerID, c)) { - c.closeWithError(errorCodeConnectionGating, "connection gated") + c.closeWithError(quic.ApplicationErrorCode(network.ConnGated), "connection gated") continue } diff --git a/p2p/transport/quic/transport.go b/p2p/transport/quic/transport.go index 4d3d9e551d..62d31a8d2a 100644 --- a/p2p/transport/quic/transport.go +++ b/p2p/transport/quic/transport.go @@ -34,8 +34,6 @@ var ErrHolePunching = errors.New("hole punching attempted; no active dial") var HolePunchTimeout = 5 * time.Second -const errorCodeConnectionGating = 0x47415445 // GATE in ASCII - // The Transport implements the tpt.Transport interface for QUIC connections. type transport struct { privKey ic.PrivKey @@ -169,7 +167,7 @@ func (t *transport) dialWithScope(ctx context.Context, raddr ma.Multiaddr, p pee remoteMultiaddr: raddr, } if t.gater != nil && !t.gater.InterceptSecured(network.DirOutbound, p, c) { - pconn.CloseWithError(errorCodeConnectionGating, "connection gated") + pconn.CloseWithError(quic.ApplicationErrorCode(network.ConnGated), "connection gated") return nil, fmt.Errorf("secured connection gated") } t.addConn(pconn, c) diff --git a/p2p/transport/quic/virtuallistener.go b/p2p/transport/quic/virtuallistener.go index 7927225567..ceee530b7d 100644 --- a/p2p/transport/quic/virtuallistener.go +++ b/p2p/transport/quic/virtuallistener.go @@ -3,6 +3,7 @@ package libp2pquic import ( "sync" + "github.com/libp2p/go-libp2p/core/network" tpt "github.com/libp2p/go-libp2p/core/transport" "github.com/libp2p/go-libp2p/p2p/transport/quicreuse" @@ -142,8 +143,8 @@ func (r *acceptLoopRunner) innerAccept(l *listener, expectedVersion quic.Version select { case ch <- acceptVal{conn: conn}: default: + conn.(network.CloseWithErrorer).CloseWithError(network.ConnRateLimited) // accept queue filled up, drop the connection - conn.Close() log.Warn("Accept queue filled. Dropping connection.") } diff --git a/p2p/transport/quicreuse/listener.go b/p2p/transport/quicreuse/listener.go index 4ee20042d3..31daf59df4 100644 --- a/p2p/transport/quicreuse/listener.go +++ b/p2p/transport/quicreuse/listener.go @@ -10,6 +10,7 @@ import ( "strings" "sync" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/transport" ma "github.com/multiformats/go-multiaddr" "github.com/quic-go/quic-go" @@ -212,7 +213,7 @@ func (l *listener) Close() error { close(l.queue) // drain the queue for conn := range l.queue { - conn.CloseWithError(1, "closing") + conn.CloseWithError(quic.ApplicationErrorCode(network.ConnShutdown), "closing") } }) return nil diff --git a/p2p/transport/webtransport/stream.go b/p2p/transport/webtransport/stream.go index 583708edc2..0849fc9f38 100644 --- a/p2p/transport/webtransport/stream.go +++ b/p2p/transport/webtransport/stream.go @@ -56,10 +56,6 @@ func (s *stream) Reset() error { return nil } -func (s *stream) ResetWithError(errCode network.StreamErrorCode) error { - panic("not implemented") -} - func (s *stream) Close() error { s.Stream.CancelRead(reset) return s.Stream.Close()