Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement error codes spec #2927

Merged
merged 13 commits into from
Feb 9, 2025
50 changes: 50 additions & 0 deletions core/network/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,60 @@ package network

import (
"context"
"fmt"
"io"

ic "github.com/libp2p/go-libp2p/core/crypto"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"

ma "github.com/multiformats/go-multiaddr"
)

type ConnErrorCode uint32

type ConnError struct {
Remote bool
ErrorCode ConnErrorCode
TransportError error
}

func (c *ConnError) Error() string {
side := "local"
if c.Remote {
side = "remote"
}
if c.TransportError != nil {
return fmt.Sprintf("connection closed (%s): code: 0x%x: transport error: %s", side, c.ErrorCode, c.TransportError)
}
return fmt.Sprintf("connection closed (%s): code: 0x%x", side, c.ErrorCode)
}

func (c *ConnError) Is(target error) bool {
if tce, ok := target.(*ConnError); ok {
return tce.ErrorCode == c.ErrorCode && tce.Remote == c.Remote
}
return false
}

func (c *ConnError) Unwrap() []error {
return []error{ErrReset, c.TransportError}
}

const (
ConnNoError ConnErrorCode = 0
ConnProtocolNegotiationFailed ConnErrorCode = 0x1000
ConnResourceLimitExceeded ConnErrorCode = 0x1001
ConnRateLimited ConnErrorCode = 0x1002
ConnProtocolViolation ConnErrorCode = 0x1003
ConnSupplanted ConnErrorCode = 0x1004
ConnGarbageCollected ConnErrorCode = 0x1005
ConnShutdown ConnErrorCode = 0x1006
ConnGated ConnErrorCode = 0x1007
ConnCodeOutOfRange ConnErrorCode = 0x1008
)

// Conn is a connection to a remote peer. It multiplexes streams.
// Usually there is no need to use a Conn directly, but it may
// be useful to get information about the peer on the other side:
Expand All @@ -24,6 +69,11 @@ type Conn interface {
ConnStat
ConnScoper

// CloseWithError closes the connection with errCode. The errCode is sent to the
// peer on a best effort basis. For transports that do not support sending error
// codes on connection close, the behavior is identical to calling Close.
CloseWithError(errCode ConnErrorCode) error

// ID returns an identifier that uniquely identifies this Conn within this
// host, during this run. Connection IDs may repeat across restarts.
ID() string
Expand Down
53 changes: 53 additions & 0 deletions core/network/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package network
import (
"context"
"errors"
"fmt"
"io"
"net"
"time"
Expand All @@ -11,6 +12,49 @@ import (
// ErrReset is returned when reading or writing on a reset stream.
var ErrReset = errors.New("stream reset")

type StreamErrorCode uint32

type StreamError struct {
ErrorCode StreamErrorCode
Remote bool
TransportError error
}

func (s *StreamError) Error() string {
side := "local"
if s.Remote {
side = "remote"
}
if s.TransportError != nil {
return fmt.Sprintf("stream reset (%s): code: 0x%x: transport error: %s", side, s.ErrorCode, s.TransportError)
}
return fmt.Sprintf("stream reset (%s): code: 0x%x", side, s.ErrorCode)
}

func (s *StreamError) Is(target error) bool {
if tse, ok := target.(*StreamError); ok {
return tse.ErrorCode == s.ErrorCode && tse.Remote == s.Remote
}
return false
}

func (s *StreamError) Unwrap() []error {
return []error{ErrReset, s.TransportError}
}

const (
StreamNoError StreamErrorCode = 0
StreamProtocolNegotiationFailed StreamErrorCode = 0x1001
StreamResourceLimitExceeded StreamErrorCode = 0x1002
StreamRateLimited StreamErrorCode = 0x1003
StreamProtocolViolation StreamErrorCode = 0x1004
StreamSupplanted StreamErrorCode = 0x1005
StreamGarbageCollected StreamErrorCode = 0x1006
StreamShutdown StreamErrorCode = 0x1007
StreamGated StreamErrorCode = 0x1008
StreamCodeOutOfRange StreamErrorCode = 0x1009
)

// MuxedStream is a bidirectional io pipe within a connection.
type MuxedStream interface {
io.Reader
Expand Down Expand Up @@ -56,6 +100,11 @@ type MuxedStream interface {
// side to hang up and go away.
Reset() error

// ResetWithError aborts both ends of the stream with `errCode`. `errCode` is sent
// to the peer on a best effort basis. For transports that do not support sending
// error codes to remote peer, the behavior is identical to calling Reset
ResetWithError(errCode StreamErrorCode) error

SetDeadline(time.Time) error
SetReadDeadline(time.Time) error
SetWriteDeadline(time.Time) error
Expand All @@ -75,6 +124,10 @@ type MuxedConn interface {
// Close closes the stream muxer and the the underlying net.Conn.
io.Closer

// CloseWithError closes the connection with errCode. The errCode is sent
// to the peer.
CloseWithError(errCode ConnErrorCode) error

// IsClosed returns whether a connection is fully closed, so it can
// be garbage collected.
IsClosed() bool
Expand Down
4 changes: 4 additions & 0 deletions core/network/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,8 @@ type Stream interface {

// Scope returns the user's view of this stream's resource scope
Scope() StreamScope

// ResetWithError closes both ends of the stream with errCode. The errCode is sent
// to the peer.
ResetWithError(errCode StreamErrorCode) error
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about CloseWithError as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need that? Close implies non error reliable close, no? For instance, we also flush all writes, and retransmit if necessary, when calling Close.

}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/libp2p/go-nat v0.2.0
github.com/libp2p/go-netroute v0.2.2
github.com/libp2p/go-reuseport v0.4.0
github.com/libp2p/go-yamux/v4 v4.0.2
github.com/libp2p/go-yamux/v5 v5.0.0
github.com/libp2p/zeroconf/v2 v2.2.0
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ github.com/libp2p/go-netroute v0.2.2 h1:Dejd8cQ47Qx2kRABg6lPwknU7+nBnFRpko45/fFP
github.com/libp2p/go-netroute v0.2.2/go.mod h1:Rntq6jUAH0l9Gg17w5bFGhcC9a+vk4KNXs6s7IljKYE=
github.com/libp2p/go-reuseport v0.4.0 h1:nR5KU7hD0WxXCJbmw7r2rhRYruNRl2koHw8fQscQm2s=
github.com/libp2p/go-reuseport v0.4.0/go.mod h1:ZtI03j/wO5hZVDFo2jKywN6bYKWLOy8Se6DrI2E1cLU=
github.com/libp2p/go-yamux/v4 v4.0.2 h1:nrLh89LN/LEiqcFiqdKDRHjGstN300C1269K/EX0CPU=
github.com/libp2p/go-yamux/v4 v4.0.2/go.mod h1:C808cCRgOs1iBwY4S71T5oxgMxgLmqUw56qh4AeBW2o=
github.com/libp2p/go-yamux/v5 v5.0.0 h1:2djUh96d3Jiac/JpGkKs4TO49YhsfLopAoryfPmf+Po=
github.com/libp2p/go-yamux/v5 v5.0.0/go.mod h1:en+3cdX51U0ZslwRdRLrvQsdayFt3TSUKvBGErzpWbU=
github.com/libp2p/zeroconf/v2 v2.2.0 h1:Cup06Jv6u81HLhIj1KasuNM/RHHrJ8T7wOTS4+Tv53Q=
github.com/libp2p/zeroconf/v2 v2.2.0/go.mod h1:fuJqLnUwZTshS3U/bMRJ3+ow/v9oid1n0DmyYyNO1Xs=
github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=
Expand Down
9 changes: 5 additions & 4 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ func (h *BasicHost) newStreamHandler(s network.Stream) {
} else {
log.Debugf("protocol mux failed: %s (took %s, id:%s, remote peer:%s, remote addr:%v)", err, took, s.ID(), s.Conn().RemotePeer(), s.Conn().RemoteMultiaddr())
}
s.Reset()
s.ResetWithError(network.StreamProtocolNegotiationFailed)
return
}

Expand All @@ -478,7 +478,7 @@ func (h *BasicHost) newStreamHandler(s network.Stream) {

if err := s.SetProtocol(protoID); err != nil {
log.Debugf("error setting stream protocol: %s", err)
s.Reset()
s.ResetWithError(network.StreamResourceLimitExceeded)
return
}

Expand Down Expand Up @@ -717,7 +717,7 @@ func (h *BasicHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.I
}
defer func() {
if strErr != nil && s != nil {
s.Reset()
s.ResetWithError(network.StreamProtocolNegotiationFailed)
}
}()

Expand Down Expand Up @@ -761,13 +761,14 @@ func (h *BasicHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.I
return nil, fmt.Errorf("failed to negotiate protocol: %w", err)
}
case <-ctx.Done():
s.Reset()
s.ResetWithError(network.StreamProtocolNegotiationFailed)
// wait for `SelectOneOf` to error out because of resetting the stream.
<-errCh
return nil, fmt.Errorf("failed to negotiate protocol: %w", ctx.Err())
}

if err := s.SetProtocol(selected); err != nil {
s.ResetWithError(network.StreamResourceLimitExceeded)
return nil, err
}
_ = h.Peerstore().AddProtocols(p, selected) // adding the protocol to the peerstore isn't critical
Expand Down
3 changes: 2 additions & 1 deletion p2p/muxer/testsuite/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
crand "crypto/rand"
"errors"
"fmt"
"io"
mrand "math/rand"
Expand Down Expand Up @@ -462,7 +463,7 @@ func SubtestStreamReset(t *testing.T, tr network.Multiplexer) {
time.Sleep(time.Millisecond * 50)

_, err = s.Write([]byte("foo"))
if err != network.ErrReset {
if !errors.Is(err, network.ErrReset) {
t.Error("should have been stream reset")
}
s.Close()
Expand Down
10 changes: 7 additions & 3 deletions p2p/muxer/yamux/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

"github.com/libp2p/go-libp2p/core/network"

"github.com/libp2p/go-yamux/v4"
"github.com/libp2p/go-yamux/v5"
)

// conn implements mux.MuxedConn over yamux.Session.
Expand All @@ -23,6 +23,10 @@ func (c *conn) Close() error {
return c.yamux().Close()
}

func (c *conn) CloseWithError(errCode network.ConnErrorCode) error {
return c.yamux().CloseWithError(uint32(errCode))
}

// IsClosed checks if yamux.Session is in closed state.
func (c *conn) IsClosed() bool {
return c.yamux().IsClosed()
Expand All @@ -32,7 +36,7 @@ func (c *conn) IsClosed() bool {
func (c *conn) OpenStream(ctx context.Context) (network.MuxedStream, error) {
s, err := c.yamux().OpenStream(ctx)
if err != nil {
return nil, err
return nil, parseError(err)
}

return (*stream)(s), nil
Expand All @@ -41,7 +45,7 @@ func (c *conn) OpenStream(ctx context.Context) (network.MuxedStream, error) {
// AcceptStream accepts a stream opened by the other side.
func (c *conn) AcceptStream() (network.MuxedStream, error) {
s, err := c.yamux().AcceptStream()
return (*stream)(s), err
return (*stream)(s), parseError(err)
}

func (c *conn) yamux() *yamux.Session {
Expand Down
38 changes: 27 additions & 11 deletions p2p/muxer/yamux/stream.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,46 @@
package yamux

import (
"errors"
"fmt"
"time"

"github.com/libp2p/go-libp2p/core/network"

"github.com/libp2p/go-yamux/v4"
"github.com/libp2p/go-yamux/v5"
)

// stream implements mux.MuxedStream over yamux.Stream.
type stream yamux.Stream

var _ network.MuxedStream = &stream{}

func (s *stream) Read(b []byte) (n int, err error) {
n, err = s.yamux().Read(b)
if err == yamux.ErrStreamReset {
err = network.ErrReset
func parseError(err error) error {
if err == nil {
return err
}
se := &yamux.StreamError{}
if errors.As(err, &se) {
return &network.StreamError{Remote: se.Remote, ErrorCode: network.StreamErrorCode(se.ErrorCode), TransportError: err}
}
ce := &yamux.GoAwayError{}
if errors.As(err, &ce) {
return &network.ConnError{Remote: ce.Remote, ErrorCode: network.ConnErrorCode(ce.ErrorCode), TransportError: err}
}
if errors.Is(err, yamux.ErrStreamReset) {
return fmt.Errorf("%w: %w", network.ErrReset, err)
}
return err
}

return n, err
func (s *stream) Read(b []byte) (n int, err error) {
n, err = s.yamux().Read(b)
return n, parseError(err)
}

func (s *stream) Write(b []byte) (n int, err error) {
n, err = s.yamux().Write(b)
if err == yamux.ErrStreamReset {
err = network.ErrReset
}

return n, err
return n, parseError(err)
}

func (s *stream) Close() error {
Expand All @@ -39,6 +51,10 @@ func (s *stream) Reset() error {
return s.yamux().Reset()
}

func (s *stream) ResetWithError(errCode network.StreamErrorCode) error {
return s.yamux().ResetWithError(uint32(errCode))
}

func (s *stream) CloseRead() error {
return s.yamux().CloseRead()
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/muxer/yamux/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/libp2p/go-libp2p/core/network"

"github.com/libp2p/go-yamux/v4"
"github.com/libp2p/go-yamux/v5"
)

var DefaultTransport *Transport
Expand Down
5 changes: 3 additions & 2 deletions p2p/net/connmgr/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ func (cm *BasicConnMgr) memoryEmergency() {
// Trim connections without paying attention to the silence period.
for _, c := range cm.getConnsToCloseEmergency(target) {
log.Infow("low on memory. closing conn", "peer", c.RemotePeer())
c.Close()

c.CloseWithError(network.ConnGarbageCollected)
}

// finally, update the last trim time.
Expand Down Expand Up @@ -388,7 +389,7 @@ func (cm *BasicConnMgr) trim() {
// do the actual trim.
for _, c := range cm.getConnsToClose() {
log.Debugw("closing conn", "peer", c.RemotePeer())
c.Close()
c.CloseWithError(network.ConnGarbageCollected)
}
}

Expand Down
Loading