diff --git a/blockchain/v0/reactor.go b/blockchain/v0/reactor.go index 090cd6f760..ed6fa079f3 100644 --- a/blockchain/v0/reactor.go +++ b/blockchain/v0/reactor.go @@ -202,7 +202,7 @@ func (bcR *BlockchainReactor) ReceiveEnvelope(e p2p.Envelope) { return } - bcR.Logger.Debug("Receive", "e.Src", e.Src, "chID", e.ChannelID, "msg", e.Message) + //bcR.Logger.Debug("Receive", "e.Src", e.Src, "chID", e.ChannelID, "msg", e.Message) switch msg := e.Message.(type) { case *bcproto.BlockRequest: diff --git a/blockchain/v1/reactor.go b/blockchain/v1/reactor.go index cc342a3224..ae14aaf8d0 100644 --- a/blockchain/v1/reactor.go +++ b/blockchain/v1/reactor.go @@ -250,7 +250,7 @@ func (bcR *BlockchainReactor) ReceiveEnvelope(e p2p.Envelope) { return } - bcR.Logger.Debug("Receive", "src", e.Src, "chID", e.ChannelID, "msg", e.Message) + //bcR.Logger.Debug("Receive", "src", e.Src, "chID", e.ChannelID, "msg", e.Message) switch msg := e.Message.(type) { case *bcproto.BlockRequest: diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index 7c3330a528..349877e414 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -462,7 +462,7 @@ func (r *BlockchainReactor) ReceiveEnvelope(e p2p.Envelope) { return } - r.logger.Debug("Receive", "src", e.Src.ID(), "chID", e.ChannelID, "msg", e.Message) + //r.logger.Debug("Receive", "src", e.Src.ID(), "chID", e.ChannelID, "msg", e.Message) switch msg := e.Message.(type) { case *bcproto.StatusRequest: diff --git a/consensus/reactor.go b/consensus/reactor.go index 2e09c3ced4..37feec35ed 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -257,7 +257,7 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) { return } - conR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", msg) + //conR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", msg) // Get peer states ps, ok := e.Src.Get(types.PeerStateKey).(*PeerState) @@ -1015,7 +1015,7 @@ OUTER_LOOP: if rs.Height == prs.Height { if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok { - if p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck + if p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck ChannelID: StateChannel, Message: &cmtcons.VoteSetMaj23{ Height: prs.Height, @@ -1044,7 +1044,7 @@ OUTER_LOOP: prs := ps.GetRoundState() if rs.Height == prs.Height { if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok { - if p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck + if p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck ChannelID: StateChannel, Message: &cmtcons.VoteSetMaj23{ Height: prs.Height, @@ -1074,7 +1074,7 @@ OUTER_LOOP: if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 { if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok { - if p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck + if p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck ChannelID: StateChannel, Message: &cmtcons.VoteSetMaj23{ Height: prs.Height, @@ -1106,7 +1106,7 @@ OUTER_LOOP: if prs.CatchupCommitRound != -1 && prs.Height > 0 && prs.Height <= conR.conS.blockStore.Height() && prs.Height >= conR.conS.blockStore.Base() { if commit := conR.conS.LoadCommit(prs.Height); commit != nil { - if p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck + if p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck ChannelID: StateChannel, Message: &cmtcons.VoteSetMaj23{ Height: prs.Height, diff --git a/go.mod b/go.mod index 
bdc1316c36..ff3dfba90a 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,8 @@ require ( github.com/minio/highwayhash v1.0.2 github.com/ory/dockertest v3.3.5+incompatible github.com/pkg/errors v0.9.1 - github.com/prometheus/client_golang v1.14.0 + github.com/prometheus/client_golang v1.19.1 + github.com/quic-go/quic-go v0.46.0 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 github.com/rs/cors v1.8.3 github.com/sasha-s/go-deadlock v0.3.1 @@ -137,6 +138,7 @@ require ( github.com/go-git/go-billy/v5 v5.5.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/go-toolsmith/astcast v1.1.0 // indirect github.com/go-toolsmith/astcopy v1.1.0 // indirect github.com/go-toolsmith/astequal v1.2.0 // indirect @@ -208,7 +210,6 @@ require ( github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect - github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mgechev/revive v1.3.9 // indirect github.com/mimoo/StrobeGo v0.0.0-20210601165009-122bf33a46e0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect @@ -221,6 +222,7 @@ require ( github.com/nishanths/predeclared v0.2.2 // indirect github.com/nunnatsa/ginkgolinter v0.16.2 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect + github.com/onsi/ginkgo/v2 v2.20.2 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0-rc2 // indirect github.com/opencontainers/runc v1.1.3 // indirect @@ -231,14 +233,15 @@ require ( github.com/pkg/profile v1.7.0 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/polyfloyd/go-errorlint v1.6.0 // indirect - github.com/prometheus/client_model v0.3.0 // indirect - github.com/prometheus/common v0.42.0 // indirect + github.com/prometheus/client_model v0.5.0 // indirect + github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/quasilyte/go-ruleguard v0.4.3-0.20240823090925-0fe6f58b47b1 // indirect github.com/quasilyte/go-ruleguard/dsl v0.3.22 // indirect github.com/quasilyte/gogrep v0.5.0 // indirect github.com/quasilyte/regex/syntax v0.0.0-20210819130434-b3f0c404a727 // indirect github.com/quasilyte/stdinfo v0.0.0-20220114132959-f7386bf02567 // indirect + github.com/quic-go/qpack v0.4.0 // indirect github.com/rs/zerolog v1.29.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/ryancurrah/gomodguard v1.3.5 // indirect @@ -288,6 +291,7 @@ require ( go.opentelemetry.io/otel/trace v1.30.0 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/automaxprocs v1.5.3 // indirect + go.uber.org/mock v0.4.0 // indirect go.uber.org/multierr v1.10.0 // indirect go.uber.org/zap v1.24.0 // indirect golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e // indirect diff --git a/go.sum b/go.sum index 63a5828fe5..d5f80cb990 100644 --- a/go.sum +++ b/go.sum @@ -332,7 +332,6 @@ github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= 
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= @@ -530,8 +529,6 @@ github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/Qd github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-sqlite3 v1.14.9 h1:10HX2Td0ocZpYEjhilsuo6WWtUqttj2Kb0KtD86/KYA= github.com/mattn/go-sqlite3 v1.14.9/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= -github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= -github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/mgechev/revive v1.3.9 h1:18Y3R4a2USSBF+QZKFQwVkBROUda7uoBlkEuBD+YD1A= github.com/mgechev/revive v1.3.9/go.mod h1:+uxEIr5UH0TjXWHTno3xh4u7eg6jDpXKzQccA9UGhHU= github.com/mimoo/StrobeGo v0.0.0-20181016162300-f8f6d4d2b643/go.mod h1:43+3pMjjKimDBf5Kr4ZFNGbLql1zKkbImw+fZbw3geM= @@ -620,12 +617,12 @@ github.com/polyfloyd/go-errorlint v1.6.0 h1:tftWV9DE7txiFzPpztTAwyoRLKNj9gpVm2cg github.com/polyfloyd/go-errorlint v1.6.0/go.mod h1:HR7u8wuP1kb1NeN1zqTd1ZMlqUKPPHF+Id4vIPvDqVw= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= -github.com/prometheus/client_golang v1.14.0 h1:nJdhIvne2eSX/XRAFV9PcvFFRbrjbcTUj0VP62TMhnw= -github.com/prometheus/client_golang v1.14.0/go.mod h1:8vpkKitgIVNcqrRBWh1C4TIUQgYNtG/XQE4E/Zae36Y= -github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4= -github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w= -github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM= -github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= +github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= +github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= +github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= +github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= +github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= +github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/quasilyte/go-ruleguard v0.4.3-0.20240823090925-0fe6f58b47b1 h1:+Wl/0aFp0hpuHM3H//KMft64WQ1yX9LdJY64Qm/gFCo= @@ -638,6 +635,10 @@ github.com/quasilyte/regex/syntax v0.0.0-20210819130434-b3f0c404a727 h1:TCg2WBOl github.com/quasilyte/regex/syntax v0.0.0-20210819130434-b3f0c404a727/go.mod h1:rlzQ04UMyJXu/aOvhd8qT+hvDrFpiwqp8MRXDY9szc0= github.com/quasilyte/stdinfo v0.0.0-20220114132959-f7386bf02567 h1:M8mH9eK4OUR4lu7Gd+PU1fV2/qnDNfzT635KRSObncs= github.com/quasilyte/stdinfo v0.0.0-20220114132959-f7386bf02567/go.mod h1:DWNGW8A4Y+GyBgPuaQJuWiy0XYftx4Xm/y5Jqk9I6VQ= +github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo= +github.com/quic-go/qpack v0.4.0/go.mod 
h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A= +github.com/quic-go/quic-go v0.46.0 h1:uuwLClEEyk1DNvchH8uCByQVjo3yKL9opKulExNDs7Y= +github.com/quic-go/quic-go v0.46.0/go.mod h1:1dLehS7TIR64+vxGR70GDcatWTOtMX2PUtnKsjbTurI= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= @@ -835,6 +836,8 @@ go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8= go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= +go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= @@ -896,7 +899,6 @@ golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/mempool/v0/reactor.go b/mempool/v0/reactor.go index 85b5298518..45b1f099c8 100644 --- a/mempool/v0/reactor.go +++ b/mempool/v0/reactor.go @@ -158,7 +158,7 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { // Receive implements Reactor. // It adds any received transactions to the mempool. func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) { - memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message) + //memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message) switch msg := e.Message.(type) { case *protomem.Txs: protoTxs := msg.GetTxs() @@ -176,7 +176,7 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) { ntx := types.Tx(tx) err = memR.mempool.CheckTx(ntx, nil, txInfo) if errors.Is(err, mempool.ErrTxInCache) { - memR.Logger.Debug("Tx already exists in cache", "tx", ntx.String()) + memR.Logger.Debug("Tx already exists in cache", "tx", ntx.Hash()) } else if err != nil { memR.Logger.Info("Could not check tx", "tx", ntx.String(), "err", err) } diff --git a/mempool/v1/reactor.go b/mempool/v1/reactor.go index 7e0b22bcf5..dd65ddede2 100644 --- a/mempool/v1/reactor.go +++ b/mempool/v1/reactor.go @@ -177,7 +177,7 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { // Receive implements Reactor. // It adds any received transactions to the mempool. 
func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) { - memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message) + //memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message) switch msg := e.Message.(type) { case *protomem.Txs: protoTxs := msg.GetTxs() @@ -202,7 +202,7 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) { ) err = memR.mempool.CheckTx(ntx, nil, txInfo) if errors.Is(err, mempool.ErrTxInCache) { - memR.Logger.Debug("Tx already exists in cache", "tx", ntx.String()) + memR.Logger.Debug("Tx already exists in cache", "tx", ntx.Hash()) } else if err != nil { memR.Logger.Info("Could not check tx", "tx", ntx.String(), "err", err) } diff --git a/node/node.go b/node/node.go index e2c749fb1f..c8bfceb79b 100644 --- a/node/node.go +++ b/node/node.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "github.com/quic-go/quic-go" "net" "net/http" "strings" @@ -555,8 +556,7 @@ func createTransport( []p2p.PeerFilterFunc, ) { var ( - mConnConfig = p2p.MConnConfig(config.P2P) - transport = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, mConnConfig, tracer) + transport = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, tracer) connFilters = []p2p.ConnFilterFunc{} peerFilters = []p2p.PeerFilterFunc{} ) @@ -571,7 +571,7 @@ func createTransport( connFilters = append( connFilters, // ABCI query for address filtering. - func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error { + func(_ p2p.ConnSet, c quic.Connection, _ []net.IP) error { res, err := proxyApp.Query().QuerySync(abci.RequestQuery{ Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()), }) diff --git a/p2p/certificate.go b/p2p/certificate.go new file mode 100644 index 0000000000..9fe4ff94ea --- /dev/null +++ b/p2p/certificate.go @@ -0,0 +1,209 @@ +package p2p + +import ( + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "encoding/asn1" + "errors" + "fmt" + "github.com/gogo/protobuf/proto" + "github.com/tendermint/tendermint/crypto" + "github.com/tendermint/tendermint/crypto/encoding" + crypto2 "github.com/tendermint/tendermint/proto/tendermint/crypto" + "math/big" + "time" +) + +// TODO(rach-id): mention this code is adapted from libp2p p2p/security/tls/crypto.go + +const certValidityPeriod = 24 * 39 * time.Hour +const certificatePrefix = "tendermint-tls:" + +// TODO(rach-id): update the OID prefix to reflect Celestia/Tendermint +var extensionPrefix = []int{1, 3, 6, 1, 4, 1, 53594} + +// getPrefixedExtensionID returns an Object Identifier +// that can be used in x509 Certificates. +func getPrefixedExtensionID(suffix []int) []int { + return append(extensionPrefix, suffix...) +} + +var extensionID = getPrefixedExtensionID([]int{1, 1}) +var extensionCritical bool // so we can mark the extension critical in tests + +// extensionIDEqual compares two extension IDs. +func extensionIDEqual(a, b []int) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} + +type signedKey struct { + PubKey []byte + Signature []byte +} + +// NewTLSConfig creates a new TLS configuration +func NewTLSConfig(privKey crypto.PrivKey) (*tls.Config, error) { + template, err := certTemplate() + if err != nil { + return nil, err + } + cert, err := keyToCertificate(privKey, template) + if err != nil { + return nil, err + } + return &tls.Config{ + MinVersion: tls.VersionTLS13, + InsecureSkipVerify: true, // This is not insecure here. We will verify the cert chain ourselves. 
+		ClientAuth:         tls.RequireAnyClientCert,
+		Certificates:       []tls.Certificate{*cert},
+		VerifyPeerCertificate: func(_ [][]byte, _ [][]*x509.Certificate) error {
+			panic("tls config not specialized for peer")
+		},
+	}, nil
+}
+
+// VerifyCertificate verifies the certificate chain and extracts the remote's public key.
+func VerifyCertificate(cert *x509.Certificate) (crypto.PubKey, error) {
+	pool := x509.NewCertPool()
+	pool.AddCert(cert)
+	var found bool
+	var keyExt pkix.Extension
+	// find the tendermint key extension, skipping all unknown extensions
+	for _, ext := range cert.Extensions {
+		if extensionIDEqual(ext.Id, extensionID) {
+			keyExt = ext
+			found = true
+			for i, oident := range cert.UnhandledCriticalExtensions {
+				if oident.Equal(ext.Id) {
+					// delete the extension from UnhandledCriticalExtensions
+					cert.UnhandledCriticalExtensions = append(cert.UnhandledCriticalExtensions[:i], cert.UnhandledCriticalExtensions[i+1:]...)
+					break
+				}
+			}
+			break
+		}
+	}
+	if !found {
+		return nil, errors.New("expected certificate to contain the key extension")
+	}
+	if _, err := cert.Verify(x509.VerifyOptions{Roots: pool}); err != nil {
+		// If we return an x509 error here, it will be sent on the wire.
+		// Wrap the error to avoid that.
+		return nil, fmt.Errorf("certificate verification failed: %s", err)
+	}
+
+	var sk signedKey
+	if _, err := asn1.Unmarshal(keyExt.Value, &sk); err != nil {
+		return nil, fmt.Errorf("unmarshalling signed certificate failed: %s", err)
+	}
+	protoPubKey := crypto2.PublicKey{}
+	err := proto.Unmarshal(sk.PubKey, &protoPubKey)
+	if err != nil {
+		return nil, fmt.Errorf("unmarshalling public key failed: %s", err)
+	}
+	certKeyPub, err := x509.MarshalPKIXPublicKey(cert.PublicKey)
+	if err != nil {
+		return nil, err
+	}
+	pubKey, err := encoding.PubKeyFromProto(protoPubKey)
+	if err != nil {
+		return nil, fmt.Errorf("converting public key failed: %s", err)
+	}
+	valid := pubKey.VerifySignature(append([]byte(certificatePrefix), certKeyPub...), sk.Signature)
+	if !valid {
+		return nil, errors.New("signature invalid")
+	}
+	return pubKey, nil
+}
+
+// GenerateSignedExtension uses the provided private key to sign the public key, and returns the
+// signature within a pkix.Extension.
+// This extension is included in a certificate to cryptographically tie it to the node's private key.
+func GenerateSignedExtension(nodePrivateKey crypto.PrivKey, certificatePublicKey *ecdsa.PublicKey) (pkix.Extension, error) {
+	protoPubKey, err := encoding.PubKeyToProto(nodePrivateKey.PubKey())
+	if err != nil {
+		return pkix.Extension{}, err
+	}
+	keyBytes, err := proto.Marshal(&protoPubKey)
+	if err != nil {
+		return pkix.Extension{}, err
+	}
+	certKeyPub, err := x509.MarshalPKIXPublicKey(certificatePublicKey)
+	if err != nil {
+		return pkix.Extension{}, err
+	}
+	signature, err := nodePrivateKey.Sign(append([]byte(certificatePrefix), certKeyPub...))
+	if err != nil {
+		return pkix.Extension{}, err
+	}
+	value, err := asn1.Marshal(signedKey{
+		PubKey:    keyBytes,
+		Signature: signature,
+	})
+	if err != nil {
+		return pkix.Extension{}, err
+	}
+
+	return pkix.Extension{Id: extensionID, Critical: extensionCritical, Value: value}, nil
+}
+
+// keyToCertificate generates a new ECDSA private key and corresponding x509 certificate.
+// The certificate includes an extension that cryptographically ties it to the provided node
+// private key to authenticate TLS connections.
+func keyToCertificate(sk crypto.PrivKey, certTmpl *x509.Certificate) (*tls.Certificate, error) { + certKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + if err != nil { + return nil, err + } + + // after calling CreateCertificate, these will end up in Certificate.Extensions + extension, err := GenerateSignedExtension(sk, &certKey.PublicKey) + if err != nil { + return nil, err + } + certTmpl.ExtraExtensions = append(certTmpl.ExtraExtensions, extension) + + certDER, err := x509.CreateCertificate(rand.Reader, certTmpl, certTmpl, certKey.Public(), certKey) + if err != nil { + return nil, err + } + return &tls.Certificate{ + Certificate: [][]byte{certDER}, + PrivateKey: certKey, + }, nil +} + +// certTemplate returns the template for generating an Identity's TLS certificates. +func certTemplate() (*x509.Certificate, error) { + bigNum := big.NewInt(1 << 62) + sn, err := rand.Int(rand.Reader, bigNum) + if err != nil { + return nil, err + } + + subjectSN, err := rand.Int(rand.Reader, bigNum) + if err != nil { + return nil, err + } + + return &x509.Certificate{ + SerialNumber: sn, + NotBefore: time.Now().Add(-time.Hour), + NotAfter: time.Now().Add(certValidityPeriod), + // According to RFC 3280, the issuer field must be set, + // see https://datatracker.ietf.org/doc/html/rfc3280#section-4.1.2.4. + Subject: pkix.Name{SerialNumber: subjectSN.String()}, + }, nil +} diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 87ab2ed28a..6e166983ca 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -1,26 +1,11 @@ package conn import ( - "bufio" - "errors" "fmt" - "io" - "math" - "net" - "reflect" - "runtime/debug" - "sync/atomic" "time" "github.com/gogo/protobuf/proto" - flow "github.com/tendermint/tendermint/libs/flowrate" - "github.com/tendermint/tendermint/libs/log" - cmtmath "github.com/tendermint/tendermint/libs/math" - "github.com/tendermint/tendermint/libs/protoio" - "github.com/tendermint/tendermint/libs/service" - cmtsync "github.com/tendermint/tendermint/libs/sync" - "github.com/tendermint/tendermint/libs/timer" tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p" ) @@ -50,672 +35,6 @@ const ( type receiveCbFunc func(chID byte, msgBytes []byte) type errorCbFunc func(interface{}) -/* -Each peer has one `MConnection` (multiplex connection) instance. - -__multiplex__ *noun* a system or signal involving simultaneous transmission of -several messages along a single channel of communication. - -Each `MConnection` handles message transmission on multiple abstract communication -`Channel`s. Each channel has a globally unique byte id. -The byte id and the relative priorities of each `Channel` are configured upon -initialization of the connection. - -There are two methods for sending messages: - - func (m MConnection) Send(chID byte, msgBytes []byte) bool {} - func (m MConnection) TrySend(chID byte, msgBytes []byte}) bool {} - -`Send(chID, msgBytes)` is a blocking call that waits until `msg` is -successfully queued for the channel with the given id byte `chID`, or until the -request times out. The message `msg` is serialized using Protobuf. - -`TrySend(chID, msgBytes)` is a nonblocking call that returns false if the -channel's queue is full. - -Inbound message bytes are handled with an onReceive callback function. 
-*/ -type MConnection struct { - service.BaseService - - conn net.Conn - bufConnReader *bufio.Reader - bufConnWriter *bufio.Writer - sendMonitor *flow.Monitor - recvMonitor *flow.Monitor - send chan struct{} - pong chan struct{} - channels []*Channel - channelsIdx map[byte]*Channel - onReceive receiveCbFunc - onError errorCbFunc - errored uint32 - config MConnConfig - - // Closing quitSendRoutine will cause the sendRoutine to eventually quit. - // doneSendRoutine is closed when the sendRoutine actually quits. - quitSendRoutine chan struct{} - doneSendRoutine chan struct{} - - // Closing quitRecvRouting will cause the recvRouting to eventually quit. - quitRecvRoutine chan struct{} - - // used to ensure FlushStop and OnStop - // are safe to call concurrently. - stopMtx cmtsync.Mutex - - flushTimer *timer.ThrottleTimer // flush writes as necessary but throttled. - pingTimer *time.Ticker // send pings periodically - - // close conn if pong is not received in pongTimeout - pongTimer *time.Timer - pongTimeoutCh chan bool // true - timeout, false - peer sent pong - - chStatsTimer *time.Ticker // update channel stats periodically - - created time.Time // time of creation - - _maxPacketMsgSize int -} - -// MConnConfig is a MConnection configuration. -type MConnConfig struct { - SendRate int64 `mapstructure:"send_rate"` - RecvRate int64 `mapstructure:"recv_rate"` - - // Maximum payload size - MaxPacketMsgPayloadSize int `mapstructure:"max_packet_msg_payload_size"` - - // Interval to flush writes (throttled) - FlushThrottle time.Duration `mapstructure:"flush_throttle"` - - // Interval to send pings - PingInterval time.Duration `mapstructure:"ping_interval"` - - // Maximum wait time for pongs - PongTimeout time.Duration `mapstructure:"pong_timeout"` -} - -// DefaultMConnConfig returns the default config. 
-func DefaultMConnConfig() MConnConfig { - return MConnConfig{ - SendRate: defaultSendRate, - RecvRate: defaultRecvRate, - MaxPacketMsgPayloadSize: defaultMaxPacketMsgPayloadSize, - FlushThrottle: defaultFlushThrottle, - PingInterval: defaultPingInterval, - PongTimeout: defaultPongTimeout, - } -} - -// NewMConnection wraps net.Conn and creates multiplex connection -func NewMConnection( - conn net.Conn, - chDescs []*ChannelDescriptor, - onReceive receiveCbFunc, - onError errorCbFunc, -) *MConnection { - return NewMConnectionWithConfig( - conn, - chDescs, - onReceive, - onError, - DefaultMConnConfig()) -} - -// NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config -func NewMConnectionWithConfig( - conn net.Conn, - chDescs []*ChannelDescriptor, - onReceive receiveCbFunc, - onError errorCbFunc, - config MConnConfig, -) *MConnection { - if config.PongTimeout >= config.PingInterval { - panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)") - } - - mconn := &MConnection{ - conn: conn, - bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize), - bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize), - sendMonitor: flow.New(0, 0), - recvMonitor: flow.New(0, 0), - send: make(chan struct{}, 1), - pong: make(chan struct{}, 1), - onReceive: onReceive, - onError: onError, - config: config, - created: time.Now(), - } - - // Create channels - var channelsIdx = map[byte]*Channel{} - var channels = []*Channel{} - - for _, desc := range chDescs { - channel := newChannel(mconn, *desc) - channelsIdx[channel.desc.ID] = channel - channels = append(channels, channel) - } - mconn.channels = channels - mconn.channelsIdx = channelsIdx - - mconn.BaseService = *service.NewBaseService(nil, "MConnection", mconn) - - // maxPacketMsgSize() is a bit heavy, so call just once - mconn._maxPacketMsgSize = mconn.maxPacketMsgSize() - - return mconn -} - -func (c *MConnection) SetLogger(l log.Logger) { - c.BaseService.SetLogger(l) - for _, ch := range c.channels { - ch.SetLogger(l) - } -} - -// OnStart implements BaseService -func (c *MConnection) OnStart() error { - if err := c.BaseService.OnStart(); err != nil { - return err - } - c.flushTimer = timer.NewThrottleTimer("flush", c.config.FlushThrottle) - c.pingTimer = time.NewTicker(c.config.PingInterval) - c.pongTimeoutCh = make(chan bool, 1) - c.chStatsTimer = time.NewTicker(updateStats) - c.quitSendRoutine = make(chan struct{}) - c.doneSendRoutine = make(chan struct{}) - c.quitRecvRoutine = make(chan struct{}) - go c.sendRoutine() - go c.recvRoutine() - return nil -} - -// stopServices stops the BaseService and timers and closes the quitSendRoutine. -// if the quitSendRoutine was already closed, it returns true, otherwise it returns false. -// It uses the stopMtx to ensure only one of FlushStop and OnStop can do this at a time. -func (c *MConnection) stopServices() (alreadyStopped bool) { - c.stopMtx.Lock() - defer c.stopMtx.Unlock() - - select { - case <-c.quitSendRoutine: - // already quit - return true - default: - } - - select { - case <-c.quitRecvRoutine: - // already quit - return true - default: - } - - c.BaseService.OnStop() - c.flushTimer.Stop() - c.pingTimer.Stop() - c.chStatsTimer.Stop() - - // inform the recvRouting that we are shutting down - close(c.quitRecvRoutine) - close(c.quitSendRoutine) - return false -} - -// FlushStop replicates the logic of OnStop. -// It additionally ensures that all successful -// .Send() calls will get flushed before closing -// the connection. 
-func (c *MConnection) FlushStop() { - if c.stopServices() { - return - } - - // this block is unique to FlushStop - { - // wait until the sendRoutine exits - // so we dont race on calling sendSomePacketMsgs - <-c.doneSendRoutine - - // Send and flush all pending msgs. - // Since sendRoutine has exited, we can call this - // safely - eof := c.sendSomePacketMsgs() - for !eof { - eof = c.sendSomePacketMsgs() - } - c.flush() - - // Now we can close the connection - } - - c.conn.Close() - - // We can't close pong safely here because - // recvRoutine may write to it after we've stopped. - // Though it doesn't need to get closed at all, - // we close it @ recvRoutine. - - // c.Stop() -} - -// OnStop implements BaseService -func (c *MConnection) OnStop() { - if c.stopServices() { - return - } - - c.conn.Close() - - // We can't close pong safely here because - // recvRoutine may write to it after we've stopped. - // Though it doesn't need to get closed at all, - // we close it @ recvRoutine. -} - -func (c *MConnection) String() string { - return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr()) -} - -func (c *MConnection) flush() { - c.Logger.Debug("Flush", "conn", c) - err := c.bufConnWriter.Flush() - if err != nil { - c.Logger.Debug("MConnection flush failed", "err", err) - } -} - -// Catch panics, usually caused by remote disconnects. -func (c *MConnection) _recover() { - if r := recover(); r != nil { - c.Logger.Error("MConnection panicked", "err", r, "stack", string(debug.Stack())) - c.stopForError(fmt.Errorf("recovered from panic: %v", r)) - } -} - -func (c *MConnection) stopForError(r interface{}) { - if err := c.Stop(); err != nil { - c.Logger.Error("Error stopping connection", "err", err) - } - if atomic.CompareAndSwapUint32(&c.errored, 0, 1) { - if c.onError != nil { - c.onError(r) - } - } -} - -// Queues a message to be sent to channel. -func (c *MConnection) Send(chID byte, msgBytes []byte) bool { - if !c.IsRunning() { - return false - } - - c.Logger.Debug("Send", "channel", chID, "conn", c, "msgBytes", log.NewLazySprintf("%X", msgBytes)) - - // Send message to channel. - channel, ok := c.channelsIdx[chID] - if !ok { - c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID)) - return false - } - - success := channel.sendBytes(msgBytes) - if success { - // Wake up sendRoutine if necessary - select { - case c.send <- struct{}{}: - default: - } - } else { - c.Logger.Debug("Send failed", "channel", chID, "conn", c, "msgBytes", log.NewLazySprintf("%X", msgBytes)) - } - return success -} - -// Queues a message to be sent to channel. -// Nonblocking, returns true if successful. -func (c *MConnection) TrySend(chID byte, msgBytes []byte) bool { - if !c.IsRunning() { - return false - } - - c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msgBytes", log.NewLazySprintf("%X", msgBytes)) - - // Send message to channel. - channel, ok := c.channelsIdx[chID] - if !ok { - c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID)) - return false - } - - ok = channel.trySendBytes(msgBytes) - if ok { - // Wake up sendRoutine if necessary - select { - case c.send <- struct{}{}: - default: - } - } - - return ok -} - -// CanSend returns true if you can send more data onto the chID, false -// otherwise. Use only as a heuristic. 
-func (c *MConnection) CanSend(chID byte) bool { - if !c.IsRunning() { - return false - } - - channel, ok := c.channelsIdx[chID] - if !ok { - c.Logger.Error(fmt.Sprintf("Unknown channel %X", chID)) - return false - } - return channel.canSend() -} - -// sendRoutine polls for packets to send from channels. -func (c *MConnection) sendRoutine() { - defer c._recover() - - protoWriter := protoio.NewDelimitedWriter(c.bufConnWriter) - -FOR_LOOP: - for { - var _n int - var err error - SELECTION: - select { - case <-c.flushTimer.Ch: - // NOTE: flushTimer.Set() must be called every time - // something is written to .bufConnWriter. - c.flush() - case <-c.chStatsTimer.C: - for _, channel := range c.channels { - channel.updateStats() - } - case <-c.pingTimer.C: - c.Logger.Debug("Send Ping") - _n, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{})) - if err != nil { - c.Logger.Error("Failed to send PacketPing", "err", err) - break SELECTION - } - c.sendMonitor.Update(_n) - c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout) - c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() { - select { - case c.pongTimeoutCh <- true: - default: - } - }) - c.flush() - case timeout := <-c.pongTimeoutCh: - if timeout { - c.Logger.Debug("Pong timeout") - err = errors.New("pong timeout") - } else { - c.stopPongTimer() - } - case <-c.pong: - c.Logger.Debug("Send Pong") - _n, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{})) - if err != nil { - c.Logger.Error("Failed to send PacketPong", "err", err) - break SELECTION - } - c.sendMonitor.Update(_n) - c.flush() - case <-c.quitSendRoutine: - break FOR_LOOP - case <-c.send: - // Send some PacketMsgs - eof := c.sendSomePacketMsgs() - if !eof { - // Keep sendRoutine awake. - select { - case c.send <- struct{}{}: - default: - } - } - } - - if !c.IsRunning() { - break FOR_LOOP - } - if err != nil { - c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err) - c.stopForError(err) - break FOR_LOOP - } - } - - // Cleanup - c.stopPongTimer() - close(c.doneSendRoutine) -} - -// Returns true if messages from channels were exhausted. -// Blocks in accordance to .sendMonitor throttling. -func (c *MConnection) sendSomePacketMsgs() bool { - // Block until .sendMonitor says we can write. - // Once we're ready we send more than we asked for, - // but amortized it should even out. - c.sendMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.SendRate), true) - - // Now send some PacketMsgs. - for i := 0; i < numBatchPacketMsgs; i++ { - if c.sendPacketMsg() { - return true - } - } - return false -} - -// Returns true if messages from channels were exhausted. -func (c *MConnection) sendPacketMsg() bool { - // Choose a channel to create a PacketMsg from. - // The chosen channel will be the one whose recentlySent/priority is the least. - var leastRatio float32 = math.MaxFloat32 - var leastChannel *Channel - for _, channel := range c.channels { - // If nothing to send, skip this channel - if !channel.isSendPending() { - continue - } - // Get ratio, and keep track of lowest ratio. - ratio := float32(channel.recentlySent) / float32(channel.desc.Priority) - if ratio < leastRatio { - leastRatio = ratio - leastChannel = channel - } - } - - // Nothing to send? 
- if leastChannel == nil { - return true - } - // c.Logger.Info("Found a msgPacket to send") - - // Make & send a PacketMsg from this channel - _n, err := leastChannel.writePacketMsgTo(c.bufConnWriter) - if err != nil { - c.Logger.Error("Failed to write PacketMsg", "err", err) - c.stopForError(err) - return true - } - c.sendMonitor.Update(_n) - c.flushTimer.Set() - return false -} - -// recvRoutine reads PacketMsgs and reconstructs the message using the channels' "recving" buffer. -// After a whole message has been assembled, it's pushed to onReceive(). -// Blocks depending on how the connection is throttled. -// Otherwise, it never blocks. -func (c *MConnection) recvRoutine() { - defer c._recover() - - protoReader := protoio.NewDelimitedReader(c.bufConnReader, c._maxPacketMsgSize) - -FOR_LOOP: - for { - // Block until .recvMonitor says we can read. - c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true) - - // Peek into bufConnReader for debugging - /* - if numBytes := c.bufConnReader.Buffered(); numBytes > 0 { - bz, err := c.bufConnReader.Peek(cmtmath.MinInt(numBytes, 100)) - if err == nil { - // return - } else { - c.Logger.Debug("Error peeking connection buffer", "err", err) - // return nil - } - c.Logger.Info("Peek connection buffer", "numBytes", numBytes, "bz", bz) - } - */ - - // Read packet type - var packet tmp2p.Packet - - _n, err := protoReader.ReadMsg(&packet) - c.recvMonitor.Update(_n) - if err != nil { - // stopServices was invoked and we are shutting down - // receiving is excpected to fail since we will close the connection - select { - case <-c.quitRecvRoutine: - break FOR_LOOP - default: - } - - if c.IsRunning() { - if err == io.EOF { - c.Logger.Info("Connection is closed @ recvRoutine (likely by the other side)", "conn", c) - } else { - c.Logger.Debug("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err) - } - c.stopForError(err) - } - break FOR_LOOP - } - - // Read more depending on packet type. - switch pkt := packet.Sum.(type) { - case *tmp2p.Packet_PacketPing: - // TODO: prevent abuse, as they cause flush()'s. 
- // https://github.com/tendermint/tendermint/issues/1190 - c.Logger.Debug("Receive Ping") - select { - case c.pong <- struct{}{}: - default: - // never block - } - case *tmp2p.Packet_PacketPong: - c.Logger.Debug("Receive Pong") - select { - case c.pongTimeoutCh <- false: - default: - // never block - } - case *tmp2p.Packet_PacketMsg: - channelID := byte(pkt.PacketMsg.ChannelID) - channel, ok := c.channelsIdx[channelID] - if pkt.PacketMsg.ChannelID < 0 || pkt.PacketMsg.ChannelID > math.MaxUint8 || !ok || channel == nil { - err := fmt.Errorf("unknown channel %X", pkt.PacketMsg.ChannelID) - c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err) - c.stopForError(err) - break FOR_LOOP - } - - msgBytes, err := channel.recvPacketMsg(*pkt.PacketMsg) - if err != nil { - if c.IsRunning() { - c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err) - c.stopForError(err) - } - break FOR_LOOP - } - if msgBytes != nil { - c.Logger.Debug("Received bytes", "chID", channelID, "msgBytes", msgBytes) - // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine - c.onReceive(channelID, msgBytes) - } - default: - err := fmt.Errorf("unknown message type %v", reflect.TypeOf(packet)) - c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err) - c.stopForError(err) - break FOR_LOOP - } - } - - // Cleanup - close(c.pong) - for range c.pong { - // Drain - } -} - -// not goroutine-safe -func (c *MConnection) stopPongTimer() { - if c.pongTimer != nil { - _ = c.pongTimer.Stop() - c.pongTimer = nil - } -} - -// maxPacketMsgSize returns a maximum size of PacketMsg -func (c *MConnection) maxPacketMsgSize() int { - bz, err := proto.Marshal(mustWrapPacket(&tmp2p.PacketMsg{ - ChannelID: 0x01, - EOF: true, - Data: make([]byte, c.config.MaxPacketMsgPayloadSize), - })) - if err != nil { - panic(err) - } - return len(bz) -} - -type ConnectionStatus struct { - Duration time.Duration - SendMonitor flow.Status - RecvMonitor flow.Status - Channels []ChannelStatus -} - -type ChannelStatus struct { - ID byte - SendQueueCapacity int - SendQueueSize int - Priority int - RecentlySent int64 -} - -func (c *MConnection) Status() ConnectionStatus { - var status ConnectionStatus - status.Duration = time.Since(c.created) - status.SendMonitor = c.sendMonitor.Status() - status.RecvMonitor = c.recvMonitor.Status() - status.Channels = make([]ChannelStatus, len(c.channels)) - for i, channel := range c.channels { - status.Channels[i] = ChannelStatus{ - ID: channel.desc.ID, - SendQueueCapacity: cap(channel.sendQueue), - SendQueueSize: int(atomic.LoadInt32(&channel.sendQueueSize)), - Priority: channel.desc.Priority, - RecentlySent: atomic.LoadInt64(&channel.recentlySent), - } - } - return status -} - //----------------------------------------------------------------------------- type ChannelDescriptor struct { @@ -741,147 +60,6 @@ func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) { return } -// TODO: lowercase. -// NOTE: not goroutine-safe. -type Channel struct { - conn *MConnection - desc ChannelDescriptor - sendQueue chan []byte - sendQueueSize int32 // atomic. 
- recving []byte - sending []byte - recentlySent int64 // exponential moving average - - maxPacketMsgPayloadSize int - - Logger log.Logger -} - -func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel { - desc = desc.FillDefaults() - if desc.Priority <= 0 { - panic("Channel default priority must be a positive integer") - } - return &Channel{ - conn: conn, - desc: desc, - sendQueue: make(chan []byte, desc.SendQueueCapacity), - recving: make([]byte, 0, desc.RecvBufferCapacity), - maxPacketMsgPayloadSize: conn.config.MaxPacketMsgPayloadSize, - } -} - -func (ch *Channel) SetLogger(l log.Logger) { - ch.Logger = l -} - -// Queues message to send to this channel. -// Goroutine-safe -// Times out (and returns false) after defaultSendTimeout -func (ch *Channel) sendBytes(bytes []byte) bool { - select { - case ch.sendQueue <- bytes: - atomic.AddInt32(&ch.sendQueueSize, 1) - return true - case <-time.After(defaultSendTimeout): - return false - } -} - -// Queues message to send to this channel. -// Nonblocking, returns true if successful. -// Goroutine-safe -func (ch *Channel) trySendBytes(bytes []byte) bool { - select { - case ch.sendQueue <- bytes: - atomic.AddInt32(&ch.sendQueueSize, 1) - return true - default: - return false - } -} - -// Goroutine-safe -func (ch *Channel) loadSendQueueSize() (size int) { - return int(atomic.LoadInt32(&ch.sendQueueSize)) -} - -// Goroutine-safe -// Use only as a heuristic. -func (ch *Channel) canSend() bool { - return ch.loadSendQueueSize() < defaultSendQueueCapacity -} - -// Returns true if any PacketMsgs are pending to be sent. -// Call before calling nextPacketMsg() -// Goroutine-safe -func (ch *Channel) isSendPending() bool { - if len(ch.sending) == 0 { - if len(ch.sendQueue) == 0 { - return false - } - ch.sending = <-ch.sendQueue - } - return true -} - -// Creates a new PacketMsg to send. -// Not goroutine-safe -func (ch *Channel) nextPacketMsg() tmp2p.PacketMsg { - packet := tmp2p.PacketMsg{ChannelID: int32(ch.desc.ID)} - maxSize := ch.maxPacketMsgPayloadSize - packet.Data = ch.sending[:cmtmath.MinInt(maxSize, len(ch.sending))] - if len(ch.sending) <= maxSize { - packet.EOF = true - ch.sending = nil - atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize - } else { - packet.EOF = false - ch.sending = ch.sending[cmtmath.MinInt(maxSize, len(ch.sending)):] - } - return packet -} - -// Writes next PacketMsg to w and updates c.recentlySent. -// Not goroutine-safe -func (ch *Channel) writePacketMsgTo(w io.Writer) (n int, err error) { - packet := ch.nextPacketMsg() - n, err = protoio.NewDelimitedWriter(w).WriteMsg(mustWrapPacket(&packet)) - atomic.AddInt64(&ch.recentlySent, int64(n)) - return -} - -// Handles incoming PacketMsgs. It returns a message bytes if message is -// complete. NOTE message bytes may change on next call to recvPacketMsg. -// Not goroutine-safe -func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) { - ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet) - var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Data) - if recvCap < recvReceived { - return nil, fmt.Errorf("received message exceeds available capacity: %v < %v", recvCap, recvReceived) - } - ch.recving = append(ch.recving, packet.Data...) - if packet.EOF { - msgBytes := ch.recving - - // clear the slice without re-allocating. 
- // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go - // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes, - // at which point the recving slice stops being used and should be garbage collected - ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity) - return msgBytes, nil - } - return nil, nil -} - -// Call this periodically to update stats for throttling purposes. -// Not goroutine-safe -func (ch *Channel) updateStats() { - // Exponential decay of stats. - // TODO: optimize. - atomic.StoreInt64(&ch.recentlySent, int64(float64(atomic.LoadInt64(&ch.recentlySent))*0.8)) -} - //---------------------------------------- // Packet diff --git a/p2p/conn_set.go b/p2p/conn_set.go index 0a568c37f6..1fa5547540 100644 --- a/p2p/conn_set.go +++ b/p2p/conn_set.go @@ -1,6 +1,7 @@ package p2p import ( + "github.com/quic-go/quic-go" "net" cmtsync "github.com/tendermint/tendermint/libs/sync" @@ -8,15 +9,15 @@ import ( // ConnSet is a lookup table for connections and all their ips. type ConnSet interface { - Has(net.Conn) bool + Has(quic.Connection) bool HasIP(net.IP) bool - Set(net.Conn, []net.IP) - Remove(net.Conn) + Set(quic.Connection, []net.IP) + Remove(quic.Connection) RemoveAddr(net.Addr) } type connSetItem struct { - conn net.Conn + conn quic.Connection ips []net.IP } @@ -33,7 +34,7 @@ func NewConnSet() ConnSet { } } -func (cs *connSet) Has(c net.Conn) bool { +func (cs *connSet) Has(c quic.Connection) bool { cs.RLock() defer cs.RUnlock() @@ -57,7 +58,7 @@ func (cs *connSet) HasIP(ip net.IP) bool { return false } -func (cs *connSet) Remove(c net.Conn) { +func (cs *connSet) Remove(c quic.Connection) { cs.Lock() defer cs.Unlock() @@ -71,7 +72,7 @@ func (cs *connSet) RemoveAddr(addr net.Addr) { delete(cs.conns, addr.String()) } -func (cs *connSet) Set(c net.Conn, ips []net.IP) { +func (cs *connSet) Set(c quic.Connection, ips []net.IP) { cs.Lock() defer cs.Unlock() diff --git a/p2p/errors.go b/p2p/errors.go index 4fc915292f..07377fc675 100644 --- a/p2p/errors.go +++ b/p2p/errors.go @@ -2,6 +2,7 @@ package p2p import ( "fmt" + "github.com/quic-go/quic-go" "net" ) @@ -16,7 +17,7 @@ func (e ErrFilterTimeout) Error() string { // information as to the reason. type ErrRejected struct { addr NetAddress - conn net.Conn + conn quic.Connection err error id ID isAuthFailure bool diff --git a/p2p/netaddress.go b/p2p/netaddress.go index dec3317589..c5c3bde6c2 100644 --- a/p2p/netaddress.go +++ b/p2p/netaddress.go @@ -5,10 +5,14 @@ package p2p import ( + "context" + "crypto/tls" "encoding/hex" "errors" "flag" "fmt" + "github.com/quic-go/quic-go" + "github.com/quic-go/quic-go/logging" "net" "strconv" "strings" @@ -64,6 +68,29 @@ func NewNetAddress(id ID, addr net.Addr) *NetAddress { return na } +func NewUDPNetAddress(id ID, addr net.Addr) *NetAddress { + tcpAddr, ok := addr.(*net.UDPAddr) + if !ok { + if flag.Lookup("test.v") == nil { // normal run + panic(fmt.Sprintf("Only UDPAddrs are supported. Got: %v", addr)) + } else { // in testing + netAddr := NewNetAddressIPPort(net.IP("127.0.0.1"), 0) + netAddr.ID = id + return netAddr + } + } + + if err := validateID(id); err != nil { + panic(fmt.Sprintf("Invalid ID %v: %v (addr: %v)", id, err, addr)) + } + + ip := tcpAddr.IP + port := uint16(tcpAddr.Port) + na := NewNetAddressIPPort(ip, port) + na.ID = id + return na +} + // NewNetAddressString returns a new NetAddress using the provided address in // the form of "ID@IP:Port". 
// Also resolves the host if host is not an IP. @@ -234,17 +261,42 @@ func (na *NetAddress) DialString() string { } // Dial calls net.Dial on the address. -func (na *NetAddress) Dial() (net.Conn, error) { - conn, err := net.Dial("tcp", na.DialString()) - if err != nil { - return nil, err - } - return conn, nil -} +// TODO: add TLS stuff. +// Note: this one is not used in the code, the DialTimeout is used instead. +//func (na *NetAddress) Dial(ctx context.Context) (quic.Connection, error) { +// tlsConfig := tls.Config{ +// MinVersion: tls.VersionTLS13, +// InsecureSkipVerify: true, +// } +// quickConfig := quic.Config{ +// MaxIdleTimeout: 10 * time.Minute, +// MaxIncomingStreams: 10000, +// MaxIncomingUniStreams: 10000, +// KeepAlivePeriod: 2 * time.Minute, +// EnableDatagrams: true, +// } +// conn, err := quic.DialAddr(ctx, na.DialString(), &tlsConfig, &quickConfig) +// if err != nil { +// return nil, err +// } +// return conn, nil +//} // DialTimeout calls net.DialTimeout on the address. -func (na *NetAddress) DialTimeout(timeout time.Duration) (net.Conn, error) { - conn, err := net.DialTimeout("tcp", na.DialString(), timeout) +func (na *NetAddress) DialTimeout(timeout time.Duration, tlsConf *tls.Config) (quic.Connection, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + quickConfig := quic.Config{ + MaxIdleTimeout: time.Minute, + MaxIncomingStreams: 10000, + MaxIncomingUniStreams: 10000, + KeepAlivePeriod: 10 * time.Second, + EnableDatagrams: true, + Tracer: func(ctx context.Context, perspective logging.Perspective, id quic.ConnectionID) *logging.ConnectionTracer { + return logging.NewMultiplexedConnectionTracer(GetNewTracer()) + }, + } + conn, err := quic.DialAddr(ctx, na.DialString(), tlsConf, &quickConfig) if err != nil { return nil, err } diff --git a/p2p/peer.go b/p2p/peer.go index bb04cd36c7..637b3906f1 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -1,9 +1,17 @@ package p2p import ( + "context" + "encoding/binary" + "encoding/hex" "fmt" + "github.com/quic-go/quic-go" + "github.com/tendermint/tendermint/libs/protoio" + "github.com/tendermint/tendermint/pkg/trace/schema" + "github.com/tendermint/tendermint/proto/tendermint/p2p" + "math" "net" - "reflect" + "sync" "time" "github.com/gogo/protobuf/proto" @@ -12,9 +20,6 @@ import ( "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/pkg/trace" - "github.com/tendermint/tendermint/pkg/trace/schema" - - cmtconn "github.com/tendermint/tendermint/p2p/conn" ) //go:generate ../scripts/mockery_generate.sh Peer @@ -37,7 +42,7 @@ type Peer interface { CloseConn() error // close original connection NodeInfo() NodeInfo // peer's info - Status() cmtconn.ConnectionStatus + Status() ConnectionStatus SocketAddr() *NetAddress // actual address of the socket // Deprecated: entities looking to act as peers should implement SendEnvelope instead. 
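
Note (illustrative, not part of the patch): the hunks above replace the TCP net.DialTimeout path with a QUIC dial that takes a *tls.Config, and p2p/certificate.go supplies that config plus certificate verification. The sketch below shows one plausible way a caller could wire NewTLSConfig, DialTimeout and VerifyCertificate together; the node key handling, timeout value and VerifyPeerCertificate closure are assumptions for illustration only.

// Illustrative only; not code from this changeset.
package example

import (
	"crypto/x509"
	"errors"
	"time"

	"github.com/tendermint/tendermint/crypto/ed25519"
	"github.com/tendermint/tendermint/p2p"
)

func dialPeer(addr *p2p.NetAddress) error {
	nodeKey := ed25519.GenPrivKey() // assumption: the node's identity key

	// Build a certificate that embeds a signature by the node key (p2p/certificate.go).
	tlsConf, err := p2p.NewTLSConfig(nodeKey)
	if err != nil {
		return err
	}
	// Specialize the config: authenticate the remote certificate against the
	// embedded node key instead of the default panic placeholder.
	tlsConf.VerifyPeerCertificate = func(rawCerts [][]byte, _ [][]*x509.Certificate) error {
		if len(rawCerts) == 0 {
			return errors.New("peer presented no certificate")
		}
		cert, err := x509.ParseCertificate(rawCerts[0])
		if err != nil {
			return err
		}
		_, err = p2p.VerifyCertificate(cert)
		return err
	}

	// QUIC dial replacing the old net.DialTimeout TCP path.
	conn, err := addr.DialTimeout(5*time.Second, tlsConf)
	if err != nil {
		return err
	}
	defer conn.CloseWithError(0, "done")
	return nil
}
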
@@ -53,6 +58,8 @@ type Peer interface { SetRemovalFailed() GetRemovalFailed() bool + + GetConnectionContext() context.Context } type EnvelopeSender interface { @@ -79,7 +86,8 @@ func SendEnvelopeShim(p Peer, e Envelope, lg log.Logger) bool { lg.Error("marshaling message to send", "error", err) return false } - return p.Send(e.ChannelID, msgBytes) + p.Send(e.ChannelID, msgBytes) + return true } // EnvelopeTrySendShim implements a shim to allow the legacy peer type that @@ -110,17 +118,17 @@ func TrySendEnvelopeShim(p Peer, e Envelope, lg log.Logger) bool { type peerConn struct { outbound bool persistent bool - conn net.Conn // source connection + conn quic.Connection // source connection socketAddr *NetAddress - + created time.Time // time of creation // cached RemoteIP() ip net.IP } func newPeerConn( outbound, persistent bool, - conn net.Conn, + conn quic.Connection, socketAddr *NetAddress, ) peerConn { @@ -129,13 +137,14 @@ func newPeerConn( persistent: persistent, conn: conn, socketAddr: socketAddr, + created: time.Now(), } } // ID only exists for SecretConnection. -// NOTE: Will panic if conn is not *SecretConnection. +// TODO(rach-id): fix ID here func (pc peerConn) ID() ID { - return PubKeyToID(pc.conn.(*cmtconn.SecretConnection).RemotePubKey()) + return ID(pc.conn.RemoteAddr().String()) } // Return the IP from the connection RemoteAddr @@ -163,11 +172,13 @@ func (pc peerConn) RemoteIP() net.IP { // // Before using a peer, you will need to perform a handshake on connection. type peer struct { + sync.Mutex service.BaseService // raw peerConn and the multiplex connection peerConn - mconn *cmtconn.MConnection + + onReceive func(chID byte, msgBytes []byte) // peer's node info and the channel it knows about // channels = nodeInfo.Channels @@ -175,6 +186,13 @@ type peer struct { nodeInfo NodeInfo channels []byte + streams map[byte]quic.Stream + blockPartStreams []quic.Stream + mempoolStreams []quic.Stream + blockchainStreams []quic.Stream + snapshotStreams []quic.Stream + chunkStreams []quic.Stream + // User data Data *cmap.CMap @@ -197,50 +215,99 @@ func WithPeerTracer(t trace.Tracer) PeerOption { func newPeer( pc peerConn, - mConfig cmtconn.MConnConfig, nodeInfo NodeInfo, reactorsByCh map[byte]Reactor, msgTypeByChID map[byte]proto.Message, - chDescs []*cmtconn.ChannelDescriptor, - onPeerError func(Peer, interface{}), mlc *metricsLabelCache, + onPeerError func(Peer, interface{}), options ...PeerOption, ) *peer { p := &peer{ - peerConn: pc, - nodeInfo: nodeInfo, - channels: nodeInfo.(DefaultNodeInfo).Channels, - Data: cmap.NewCMap(), - metricsTicker: time.NewTicker(metricsTickerDuration), - metrics: NopMetrics(), - mlc: mlc, - traceClient: trace.NoOpTracer(), - } - - p.mconn = createMConnection( - pc.conn, - p, - reactorsByCh, - msgTypeByChID, - chDescs, - onPeerError, - mConfig, - ) + peerConn: pc, + nodeInfo: nodeInfo, + channels: nodeInfo.(DefaultNodeInfo).Channels, + Data: cmap.NewCMap(), + metricsTicker: time.NewTicker(metricsTickerDuration), + metrics: NopMetrics(), + mlc: mlc, + traceClient: trace.NoOpTracer(), + streams: make(map[byte]quic.Stream), + mempoolStreams: make([]quic.Stream, 0), + blockPartStreams: make([]quic.Stream, 0), + blockchainStreams: make([]quic.Stream, 0), + snapshotStreams: make([]quic.Stream, 0), + chunkStreams: make([]quic.Stream, 0), + } + + p.onReceive = func(chID byte, msgBytes []byte) { + reactor := reactorsByCh[chID] + if reactor == nil { + // Note that its ok to panic here as it's caught in the conn._recover, + // which does onPeerError. 
+ panic(fmt.Sprintf("Unknown channel %X", chID)) + } + mt := msgTypeByChID[chID] + msg := proto.Clone(mt) + err := proto.Unmarshal(msgBytes, msg) + if err != nil { + p.Logger.Error("before panic", "msg", msg, "channel", chID, "type", mt, "bytes", hex.EncodeToString(msgBytes), "raw_bytes", msgBytes) + return + } + + if w, ok := msg.(Unwrapper); ok { + msg, err = w.Unwrap() + if err != nil { + panic(fmt.Errorf("unwrapping message: %s", err)) + } + } + + labels := []string{ + "peer_id", string(p.ID()), + "chID", fmt.Sprintf("%#x", chID), + } + + p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes))) + p.metrics.MessageReceiveBytesTotal.With(append(labels, "message_type", p.mlc.ValueToMetricLabel(msg))...).Add(float64(len(msgBytes))) + schema.WriteReceivedBytes(p.traceClient, string(p.ID()), chID, len(msgBytes)) + if nr, ok := reactor.(EnvelopeReceiver); ok { + nr.ReceiveEnvelope(Envelope{ + ChannelID: chID, + Src: p, + Message: msg, + }) + } else { + reactor.Receive(chID, p, msgBytes) + } + } + + go func() { + err := p.StartReceiving() + if err != nil { + p.Logger.Error("error when receiving data from peer", "err", err.Error()) + onPeerError(p, err) + } + }() + p.BaseService = *service.NewBaseService(nil, "Peer", p) for _, option := range options { option(p) } + err := p.initializeAboveStreams() + if err != nil { + p.Logger.Error("error initializing mempool and block part channels", "err", err.Error()) + } + return p } // String representation. func (p *peer) String() string { if p.outbound { - return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.ID()) + return fmt.Sprintf("Peer{%v %v out}", p.conn.RemoteAddr().String(), p.ID()) } - return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID()) + return fmt.Sprintf("Peer{%v %v in}", p.conn.RemoteAddr().String(), p.ID()) } //--------------------------------------------------- @@ -249,7 +316,6 @@ func (p *peer) String() string { // SetLogger implements BaseService. func (p *peer) SetLogger(l log.Logger) { p.Logger = l - p.mconn.SetLogger(l) } // OnStart implements BaseService. @@ -258,10 +324,6 @@ func (p *peer) OnStart() error { return err } - if err := p.mconn.Start(); err != nil { - return err - } - go p.metricsReporter() return nil } @@ -272,14 +334,20 @@ func (p *peer) OnStart() error { func (p *peer) FlushStop() { p.metricsTicker.Stop() p.BaseService.OnStop() - p.mconn.FlushStop() // stop everything and close the conn + for _, stream := range p.streams { + stream.CancelRead(quic.StreamErrorCode(quic.NoError)) // stop everything and close the conn + stream.CancelWrite(quic.StreamErrorCode(quic.NoError)) // stop everything and close the conn + } + if err := p.conn.CloseWithError(quic.ApplicationErrorCode(quic.NoError), "stopping peer connection onStop"); err != nil { // stop everything and close the conn + p.Logger.Debug("Error while stopping peer", "err", err) + } } // OnStop implements BaseService. func (p *peer) OnStop() { p.metricsTicker.Stop() p.BaseService.OnStop() - if err := p.mconn.Stop(); err != nil { // stop everything and close the conn + if err := p.conn.CloseWithError(quic.ApplicationErrorCode(quic.NoError), "stopping peer connection onStop"); err != nil { // stop everything and close the conn p.Logger.Debug("Error while stopping peer", "err", err) } } @@ -324,12 +392,16 @@ func (p *peer) NodeInfo() NodeInfo { // For inbound peers, it's the address returned by the underlying connection // (not what's reported in the peer's NodeInfo). 
func (p *peer) SocketAddr() *NetAddress { - return p.peerConn.socketAddr + return p.socketAddr } // Status returns the peer's ConnectionStatus. -func (p *peer) Status() cmtconn.ConnectionStatus { - return p.mconn.Status() +func (p *peer) Status() ConnectionStatus { + return ConnectionStatus{ + Duration: time.Since(p.created), + // TODO(rach-id): register ecdsa.PublicKey protobuf definition + ConnectionState: p.conn.ConnectionState(), + } } // SendEnvelope sends the message in the envelope on the channel specified by the @@ -365,58 +437,23 @@ func (p *peer) SendEnvelope(e Envelope) bool { return res } -// Send msg bytes to the channel identified by chID byte. Returns false if the -// send queue is full after timeout, specified by MConnection. -// SendEnvelope replaces Send which will be deprecated in a future release. -func (p *peer) Send(chID byte, msgBytes []byte) bool { - if !p.IsRunning() { - return false - } else if !p.hasChannel(chID) { - return false - } - res := p.mconn.Send(chID, msgBytes) - if res { - labels := []string{ - "peer_id", string(p.ID()), - "chID", fmt.Sprintf("%#x", chID), - } - p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes))) - } - return res +func (p *peer) addStream(stream quic.Stream, chID byte) { + p.Mutex.Lock() + defer p.Mutex.Unlock() + p.streams[chID] = stream } -// TrySendEnvelope attempts to sends the message in the envelope on the channel specified by the -// envelope. Returns false immediately if the connection's internal queue is full -// Using TrySendEnvelope allows for tracking the message bytes sent and received by message type -// as a metric which TrySend cannot support. -func (p *peer) TrySendEnvelope(e Envelope) bool { - if !p.IsRunning() { - // see Switch#Broadcast, where we fetch the list of peers and loop over - // them - while we're looping, one peer may be removed and stopped. - return false - } else if !p.hasChannel(e.ChannelID) { - return false - } - msg := e.Message - metricLabelValue := p.mlc.ValueToMetricLabel(msg) - if w, ok := msg.(Wrapper); ok { - msg = w.Wrap() - } - msgBytes, err := proto.Marshal(msg) - if err != nil { - p.Logger.Error("marshaling message to send", "error", err) - return false - } - res := p.TrySend(e.ChannelID, msgBytes) - if res { - labels := []string{ - "message_type", metricLabelValue, - "chID", fmt.Sprintf("%#x", e.ChannelID), - "peer_id", string(p.ID()), - } - p.metrics.MessageSendBytesTotal.With(labels...).Add(float64(len(msgBytes))) - } - return res +func (p *peer) removeStream(chID byte) { + p.Mutex.Lock() + defer p.Mutex.Unlock() + delete(p.streams, chID) +} + +func (p *peer) getStream(chID byte) (quic.Stream, bool) { + p.Mutex.Lock() + defer p.Mutex.Unlock() + stream, has := p.streams[chID] + return stream, has } // TrySend msg bytes to the channel identified by chID byte. Immediately returns @@ -428,7 +465,7 @@ func (p *peer) TrySend(chID byte, msgBytes []byte) bool { } else if !p.hasChannel(chID) { return false } - res := p.mconn.TrySend(chID, msgBytes) + res := p.Send(chID, msgBytes) if res { labels := []string{ "peer_id", string(p.ID()), @@ -471,7 +508,7 @@ func (p *peer) hasChannel(chID byte) bool { // CloseConn closes original connection. Used for cleaning up in cases where the peer had not been started at all. 
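Editorial note on the ID TODO above: the peer ID is currently derived from the unauthenticated remote address. Since Status already exposes the QUIC ConnectionState, a hedged sketch of deriving the ID from the TLS certificate presented during the QUIC handshake instead, assuming VerifyCertificate (used later in p2p/transport.go) returns the node's public key and that the certificate is available in ConnectionState().TLS:

// certificateID is an editorial sketch, not part of this diff: it recovers the
// peer ID from the handshake certificate, mirroring what the transport's
// VerifyPeerCertificate callbacks already do when dialing.
func (pc peerConn) certificateID() (ID, error) {
	tlsState := pc.conn.ConnectionState().TLS
	if len(tlsState.PeerCertificates) == 0 {
		return "", errors.New("no peer certificate on QUIC connection")
	}
	pubKey, err := VerifyCertificate(tlsState.PeerCertificates[0])
	if err != nil {
		return "", err
	}
	return PubKeyToID(pubKey), nil
}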
func (p *peer) CloseConn() error { - return p.peerConn.conn.Close() + return p.conn.CloseWithError(quic.ApplicationErrorCode(quic.NoError), "closed peer connection closeCon") } func (p *peer) SetRemovalFailed() { @@ -488,12 +525,12 @@ func (p *peer) GetRemovalFailed() bool { // CloseConn closes the underlying connection func (pc *peerConn) CloseConn() { - pc.conn.Close() + pc.conn.CloseWithError(quic.ApplicationErrorCode(quic.NoError), "closed peer connection closeCon") } // RemoteAddr returns peer's remote network address. func (p *peer) RemoteAddr() net.Addr { - return p.peerConn.conn.RemoteAddr() + return p.conn.RemoteAddr() } // CanSend returns true if the send queue is not full, false otherwise. @@ -501,7 +538,7 @@ func (p *peer) CanSend(chID byte) bool { if !p.IsRunning() { return false } - return p.mconn.CanSend(chID) + return true } //--------------------------------------------------- @@ -513,87 +550,345 @@ func PeerMetrics(metrics *Metrics) PeerOption { } func (p *peer) metricsReporter() { - for { - select { - case <-p.metricsTicker.C: - status := p.mconn.Status() - var sendQueueSize float64 - queues := make(map[byte]int, len(status.Channels)) - for _, chStatus := range status.Channels { - sendQueueSize += float64(chStatus.SendQueueSize) - queues[chStatus.ID] = chStatus.SendQueueSize - } + //for { + // select { + // case <-p.metricsTicker.C: + // status := p.mconn.Status() + // var sendQueueSize float64 + // queues := make(map[byte]int, len(status.Channels)) + // for _, chStatus := range status.Channels { + // sendQueueSize += float64(chStatus.SendQueueSize) + // queues[chStatus.ID] = chStatus.SendQueueSize + // } + // + // p.metrics.PeerPendingSendBytes.With("peer_id", string(p.ID())).Set(sendQueueSize) + // schema.WritePendingBytes(p.traceClient, string(p.ID()), queues) + // case <-p.Quit(): + // return + // } + //} +} + +const totalStream = 40 + +func (p *peer) initializeAboveStreams() error { + p.Mutex.Lock() + defer p.Mutex.Unlock() + for i := 0; i < totalStream; i++ { + stream1, err := p.conn.OpenStreamSync(context.Background()) + if err != nil { + return err + } + err = binary.Write(stream1, binary.BigEndian, DataChannel) + if err != nil { + p.Logger.Error("error sending channel ID", "err", err.Error()) + return err + } + p.blockPartStreams = append(p.blockPartStreams, stream1) - p.metrics.PeerPendingSendBytes.With("peer_id", string(p.ID())).Set(sendQueueSize) - schema.WritePendingBytes(p.traceClient, string(p.ID()), queues) - case <-p.Quit(): - return + stream2, err := p.conn.OpenStreamSync(context.Background()) + if err != nil { + return err } - } -} + err = binary.Write(stream2, binary.BigEndian, MempoolChannel) + if err != nil { + p.Logger.Error("error sending channel ID", "err", err.Error()) + return err + } + p.mempoolStreams = append(p.mempoolStreams, stream2) -//------------------------------------------------------------------ -// helper funcs + stream3, err := p.conn.OpenStreamSync(context.Background()) + if err != nil { + return err + } + err = binary.Write(stream3, binary.BigEndian, BlockchainChannel) + if err != nil { + p.Logger.Error("error sending channel ID", "err", err.Error()) + return err + } + p.blockchainStreams = append(p.blockchainStreams, stream3) -func createMConnection( - conn net.Conn, - p *peer, - reactorsByCh map[byte]Reactor, - msgTypeByChID map[byte]proto.Message, - chDescs []*cmtconn.ChannelDescriptor, - onPeerError func(Peer, interface{}), - config cmtconn.MConnConfig, -) *cmtconn.MConnection { + stream4, err := 
p.conn.OpenStreamSync(context.Background()) + if err != nil { + return err + } + err = binary.Write(stream4, binary.BigEndian, SnapshotChannel) + if err != nil { + p.Logger.Error("error sending channel ID", "err", err.Error()) + return err + } + p.snapshotStreams = append(p.snapshotStreams, stream4) - onReceive := func(chID byte, msgBytes []byte) { - reactor := reactorsByCh[chID] - if reactor == nil { - // Note that its ok to panic here as it's caught in the conn._recover, - // which does onPeerError. - panic(fmt.Sprintf("Unknown channel %X", chID)) + stream5, err := p.conn.OpenStreamSync(context.Background()) + if err != nil { + return err } - mt := msgTypeByChID[chID] - msg := proto.Clone(mt) - err := proto.Unmarshal(msgBytes, msg) + err = binary.Write(stream5, binary.BigEndian, ChunkChannel) + if err != nil { + p.Logger.Error("error sending channel ID", "err", err.Error()) + return err + } + p.chunkStreams = append(p.chunkStreams, stream5) + } + return nil +} + +// Send msg bytes to the channel identified by chID byte. Returns false if the +// send queue is full after timeout, specified by MConnection. +// SendEnvelope replaces Send which will be deprecated in a future release. +func (p *peer) Send(chID byte, msgBytes []byte) bool { + if !p.IsRunning() { + return false + } else if !p.hasChannel(chID) { + return false + } + if chID == MempoolChannel || + chID == DataChannel || + chID == BlockchainChannel || + chID == SnapshotChannel || + chID == ChunkChannel { + return p.sendOther(chID, msgBytes) + } + stream, has := p.getStream(chID) + if !has { + newStream, err := p.conn.OpenStreamSync(context.Background()) + if err != nil { + p.Logger.Error("error opening quic stream", "err", err.Error()) + return false + } + p.addStream(newStream, chID) + stream = newStream + err = binary.Write(stream, binary.BigEndian, chID) if err != nil { - panic(fmt.Errorf("unmarshaling message: %s into type: %s", err, reflect.TypeOf(mt))) + p.Logger.Error("error sending channel ID", "err", err.Error()) + return false } + } - if w, ok := msg.(Unwrapper); ok { - msg, err = w.Unwrap() + packet := p2p.Packet{ + Sum: &p2p.Packet_PacketMsg{ + PacketMsg: &p2p.PacketMsg{ + ChannelID: int32(chID), + EOF: true, + Data: msgBytes, + }, + }, + } + _, err := protoio.NewDelimitedWriter(stream).WriteMsg(&packet) + if err != nil { + p.Logger.Debug("Send failed", "channel", "stream_id", stream.StreamID(), "msgBytes", log.NewLazySprintf("%X", msgBytes)) + return false + } + labels := []string{ + "peer_id", string(p.ID()), + "chID", fmt.Sprintf("%#x", chID), + } + p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes))) + + return true +} + +const ( + BlockchainChannel = byte(0x40) + SnapshotChannel = byte(0x60) + ChunkChannel = byte(0x61) + MempoolChannel = byte(0x30) + DataChannel = byte(0x21) +) + +var ( + blockchainCount = int64(0) + snapshotCount = int64(0) + chunkCount = int64(0) + memCount = int64(0) + partsCount = int64(0) +) + +func (p *peer) sendOther(id byte, bytes []byte) bool { + if len(bytes) == 0 { + return true + } + var send func([]byte) bool + if id == BlockchainChannel { + send = func(bytes []byte) bool { + defer func() { + blockchainCount++ + }() + p.Mutex.Lock() + stream := p.blockchainStreams[blockchainCount%totalStream] + p.Mutex.Unlock() + packet := p2p.Packet{ + Sum: &p2p.Packet_PacketMsg{ + PacketMsg: &p2p.PacketMsg{ + ChannelID: int32(id), + EOF: true, + Data: bytes, + }, + }, + } + _, err := protoio.NewDelimitedWriter(stream).WriteMsg(&packet) if err != nil { - 
panic(fmt.Errorf("unwrapping message: %s", err)) + p.Logger.Debug("Send failed", "channel", "stream_id", stream.StreamID(), "index", bytes[len(bytes)/2]%10, "msgBytes", log.NewLazySprintf("%X", bytes)) + return false } + return true } - - labels := []string{ - "peer_id", string(p.ID()), - "chID", fmt.Sprintf("%#x", chID), + } else if id == SnapshotChannel { + send = func(bytes []byte) bool { + defer func() { + snapshotCount++ + }() + p.Mutex.Lock() + stream := p.snapshotStreams[snapshotCount%totalStream] + p.Mutex.Unlock() + packet := p2p.Packet{ + Sum: &p2p.Packet_PacketMsg{ + PacketMsg: &p2p.PacketMsg{ + ChannelID: int32(id), + EOF: true, + Data: bytes, + }, + }, + } + _, err := protoio.NewDelimitedWriter(stream).WriteMsg(&packet) + if err != nil { + p.Logger.Debug("Send failed", "channel", "stream_id", stream.StreamID(), "index", bytes[len(bytes)/2]%10, "msgBytes", log.NewLazySprintf("%X", bytes)) + return false + } + return true + } + } else if id == ChunkChannel { + send = func(bytes []byte) bool { + defer func() { + chunkCount++ + }() + p.Mutex.Lock() + stream := p.chunkStreams[chunkCount%totalStream] + p.Mutex.Unlock() + packet := p2p.Packet{ + Sum: &p2p.Packet_PacketMsg{ + PacketMsg: &p2p.PacketMsg{ + ChannelID: int32(id), + EOF: true, + Data: bytes, + }, + }, + } + _, err := protoio.NewDelimitedWriter(stream).WriteMsg(&packet) + if err != nil { + p.Logger.Debug("Send failed", "channel", "stream_id", stream.StreamID(), "index", bytes[len(bytes)/2]%10, "msgBytes", log.NewLazySprintf("%X", bytes)) + return false + } + return true + } + } else if id == MempoolChannel { + send = func(bytes []byte) bool { + defer func() { + memCount++ + }() + p.Mutex.Lock() + stream := p.mempoolStreams[memCount%totalStream] + p.Mutex.Unlock() + packet := p2p.Packet{ + Sum: &p2p.Packet_PacketMsg{ + PacketMsg: &p2p.PacketMsg{ + ChannelID: int32(id), + EOF: true, + Data: bytes, + }, + }, + } + _, err := protoio.NewDelimitedWriter(stream).WriteMsg(&packet) + if err != nil { + p.Logger.Debug("Send failed", "channel", "stream_id", stream.StreamID(), "index", bytes[len(bytes)/2]%10, "msgBytes", log.NewLazySprintf("%X", bytes)) + return false + } + return true + } + } else { + send = func(bytes []byte) bool { + defer func() { + partsCount++ + }() + p.Mutex.Lock() + stream := p.blockPartStreams[partsCount%totalStream] + p.Mutex.Unlock() + packet := p2p.Packet{ + Sum: &p2p.Packet_PacketMsg{ + PacketMsg: &p2p.PacketMsg{ + ChannelID: int32(id), + EOF: true, + Data: bytes, + }, + }, + } + _, err := protoio.NewDelimitedWriter(stream).WriteMsg(&packet) + if err != nil { + p.Logger.Debug("Send failed", "channel", "stream_id", stream.StreamID(), "index", bytes[len(bytes)/2]%10, "msgBytes", log.NewLazySprintf("%X", bytes)) + return false + } + return true } + } + return send(bytes) +} - p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes))) - p.metrics.MessageReceiveBytesTotal.With(append(labels, "message_type", p.mlc.ValueToMetricLabel(msg))...).Add(float64(len(msgBytes))) - schema.WriteReceivedBytes(p.traceClient, string(p.ID()), chID, len(msgBytes)) - if nr, ok := reactor.(EnvelopeReceiver); ok { - nr.ReceiveEnvelope(Envelope{ - ChannelID: chID, - Src: p, - Message: msg, - }) - } else { - reactor.Receive(chID, p, msgBytes) +func (p *peer) StartReceiving() error { + for { + stream, err := p.conn.AcceptStream(context.Background()) + if err != nil { + p.Logger.Debug("failed to accept stream", "err", err.Error()) + return err + } + var chID byte + err = binary.Read(stream, binary.BigEndian, &chID) + 
if err != nil { + p.Logger.Debug("failed to read channel ID", "err", err.Error()) + return err } + // start accepting data + go func() { + for { + var packet p2p.Packet + _, err := protoio.NewDelimitedReader(stream, math.MaxInt32).ReadMsg(&packet) + if err != nil { + p.Logger.Debug("failed to read data from stream", "err", err.Error()) + return + } + + dd := packet.Sum.(*p2p.Packet_PacketMsg) + if dd.PacketMsg.ChannelID != int32(chID) { + p.Logger.Error("received message on wrong channel", "expected channel id", chID, "received message channel id", dd.PacketMsg.ChannelID) + } + p.onReceive(chID, dd.PacketMsg.Data) + } + }() } +} - onError := func(r interface{}) { - onPeerError(p, r) +func someLogData(data []byte) []byte { + beginning := data + numBytes := 10 + if len(data) > numBytes { + beginning = data[:numBytes] } - return cmtconn.NewMConnectionWithConfig( - conn, - chDescs, - onReceive, - onError, - config, - ) + // Get the last `numBytes` bytes from the end + end := data + if len(data) > numBytes { + end = data[len(data)-numBytes:] + } + return append(beginning, end...) +} + +func (p *peer) SendDatagram(bytes []byte) error { + return p.conn.SendDatagram(bytes) +} + +func (p *peer) ReceiveDatagram(ctx context.Context) ([]byte, error) { + return p.conn.ReceiveDatagram(ctx) +} + +func (p *peer) GetConnectionContext() context.Context { + return p.conn.Context() } diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 16fc653630..526992a729 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -94,7 +94,7 @@ func createOutboundPeerAndPerformHandshake( } timeout := 1 * time.Second ourNodeInfo := testNodeInfo(addr.ID, "host_peer") - peerNodeInfo, err := handshake(pc.conn, timeout, ourNodeInfo) + peerNodeInfo, err := exchangeNodeInfo(pc.conn, timeout, ourNodeInfo) if err != nil { return nil, err } @@ -195,7 +195,7 @@ func (rp *remotePeer) Dial(addr *NetAddress) (net.Conn, error) { if err != nil { return nil, err } - _, err = handshake(pc.conn, time.Second, rp.nodeInfo()) + _, err = exchangeNodeInfo(pc.conn, time.Second, rp.nodeInfo()) if err != nil { return nil, err } @@ -220,7 +220,7 @@ func (rp *remotePeer) accept() { golog.Fatalf("Failed to create a peer: %+v", err) } - _, err = handshake(pc.conn, time.Second, rp.nodeInfo()) + _, err = exchangeNodeInfo(pc.conn, time.Second, rp.nodeInfo()) if err != nil { golog.Fatalf("Failed to perform handshake: %+v", err) } diff --git a/p2p/switch.go b/p2p/switch.go index 8bdaa96175..ae4ffaeaa9 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -32,17 +32,6 @@ const ( reconnectBackOffBaseSeconds = 3 ) -// MConnConfig returns an MConnConfig with fields updated -// from the P2PConfig. -func MConnConfig(cfg *config.P2PConfig) conn.MConnConfig { - mConfig := conn.DefaultMConnConfig() - mConfig.FlushThrottle = cfg.FlushThrottleTimeout - mConfig.SendRate = cfg.SendRate - mConfig.RecvRate = cfg.RecvRate - mConfig.MaxPacketMsgPayloadSize = cfg.MaxPacketMsgPayloadSize - return mConfig -} - //----------------------------------------------------------------------------- // An AddrBook represents an address book from the pex package, which is used @@ -132,6 +121,15 @@ func NewSwitch( traceClient: trace.NoOpTracer(), } + go func() { + for { + fmt.Println("=================================") + fmt.Println(sw.peers.list) + fmt.Println("=================================") + time.Sleep(5 * time.Second) + } + }() + // Ensure we have a completely undeterministic PRNG. 
sw.rng = rand.NewRand() @@ -377,7 +375,6 @@ func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) { return } - sw.Logger.Error("Stopping peer for error", "peer", peer, "err", reason) sw.stopAndRemovePeer(peer, reason) if peer.IsPersistent() { @@ -417,7 +414,7 @@ func (sw *Switch) getPeerAddress(peer Peer) (*NetAddress, error) { // StopPeerGracefully disconnects from a peer gracefully. // TODO: handle graceful disconnects. func (sw *Switch) StopPeerGracefully(peer Peer) { - sw.Logger.Info("Stopping peer gracefully") + sw.Logger.Info("Stopping peer gracefully", "peer", peer.ID()) sw.stopAndRemovePeer(peer, nil) } @@ -911,6 +908,8 @@ func (sw *Switch) addPeer(p Peer) error { sw.metrics.Peers.Add(float64(1)) schema.WritePeerUpdate(sw.traceClient, string(p.ID()), schema.PeerJoin, "") + addr, _ := p.NodeInfo().NetAddress() + fmt.Println(addr) // Start all the reactor protocols on the peer. for _, reactor := range sw.reactors { reactor.AddPeer(p) @@ -918,5 +917,13 @@ func (sw *Switch) addPeer(p Peer) error { sw.Logger.Debug("Added peer", "peer", p) + go func() { + select { + case <-p.GetConnectionContext().Done(): + sw.Logger.Debug("counterparty connection closed", "peer_id", p.ID(), "remote_address", p.RemoteAddr().String()) + sw.StopPeerForError(p, p.GetConnectionContext().Err()) + } + }() + return nil } diff --git a/p2p/test_util.go b/p2p/test_util.go index c527863054..ae247ea030 100644 --- a/p2p/test_util.go +++ b/p2p/test_util.go @@ -1,3 +1,5 @@ +//go:build exclude + package p2p import ( @@ -142,7 +144,7 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error { return err } - ni, err := handshake(conn, time.Second, sw.nodeInfo) + ni, err := exchangeNodeInfo(conn, time.Second, sw.nodeInfo) if err != nil { if err := conn.Close(); err != nil { sw.Logger.Error("Error closing connection", "err", err) @@ -150,16 +152,7 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error { return err } - p := newPeer( - pc, - MConnConfig(sw.config), - ni, - sw.reactorsByCh, - sw.msgTypeByChID, - sw.chDescs, - sw.StopPeerForError, - sw.mlc, - ) + p := newPeer(pc, ni, sw.reactorsByCh, sw.msgTypeByChID, sw.mlc) if err = sw.addPeer(p); err != nil { pc.CloseConn() diff --git a/p2p/transport.go b/p2p/transport.go index 5c82f1f757..d3e8c1dbcb 100644 --- a/p2p/transport.go +++ b/p2p/transport.go @@ -2,18 +2,22 @@ package p2p import ( "context" + "crypto/x509" + "errors" "fmt" + "github.com/quic-go/quic-go" + "github.com/quic-go/quic-go/http3" + "github.com/quic-go/quic-go/logging" + "github.com/tendermint/tendermint/libs/protoio" + tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p" "net" + "os" + "runtime/debug" "time" - "golang.org/x/net/netutil" - "github.com/gogo/protobuf/proto" - "github.com/tendermint/tendermint/crypto" - "github.com/tendermint/tendermint/libs/protoio" "github.com/tendermint/tendermint/p2p/conn" "github.com/tendermint/tendermint/pkg/trace" - tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p" ) const ( @@ -31,7 +35,7 @@ type IPResolver interface { // asynchronously running routine to the Accept method. type accept struct { netAddr *NetAddress - conn net.Conn + conn quic.Connection nodeInfo NodeInfo err error } @@ -83,12 +87,12 @@ type transportLifecycle interface { // ConnFilterFunc to be implemented by filter hooks after a new connection has // been established. The set of exisiting connections is passed along together // with all resolved IPs for the new connection. 
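Editorial note on sendOther above: its five branches differ only in which stream slice and counter they use. A sketch of one way to collapse them, assuming the proto package is imported as p2p (as in the diff) and that the caller passes the per-channel counter (blockchainCount, memCount, and so on):

// sendOnStreams is an editorial sketch, not part of this diff: it round-robins a
// delimited Packet over a fixed slice of streams, guarded by the peer mutex.
func (p *peer) sendOnStreams(chID byte, streams []quic.Stream, counter *int64, msgBytes []byte) bool {
	p.Mutex.Lock()
	stream := streams[*counter%int64(len(streams))]
	*counter++
	p.Mutex.Unlock()

	packet := p2p.Packet{
		Sum: &p2p.Packet_PacketMsg{
			PacketMsg: &p2p.PacketMsg{
				ChannelID: int32(chID),
				EOF:       true,
				Data:      msgBytes,
			},
		},
	}
	if _, err := protoio.NewDelimitedWriter(stream).WriteMsg(&packet); err != nil {
		p.Logger.Debug("Send failed", "chID", fmt.Sprintf("%#x", chID), "stream_id", stream.StreamID(), "err", err)
		return false
	}
	return true
}

sendOther would then reduce to picking the slice and counter for the given channel ID.

Separately, the goroutine added in Switch.addPeer waits only on the peer's connection context, so it lingers until the remote connection closes even after the switch itself has stopped. A sketch of a variant that also observes the switch's quit channel (assuming the BaseService Quit method used elsewhere in the package):

// watchPeerConn is an editorial sketch, not part of this diff.
func (sw *Switch) watchPeerConn(p Peer) {
	select {
	case <-p.GetConnectionContext().Done():
		sw.Logger.Debug("counterparty connection closed",
			"peer_id", p.ID(), "remote_address", p.RemoteAddr().String())
		sw.StopPeerForError(p, p.GetConnectionContext().Err())
	case <-sw.Quit():
		// The switch is shutting down; peers are stopped elsewhere.
	}
}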
-type ConnFilterFunc func(ConnSet, net.Conn, []net.IP) error +type ConnFilterFunc func(ConnSet, quic.Connection, []net.IP) error // ConnDuplicateIPFilter resolves and keeps all ips for an incoming connection // and refuses new ones if they come from a known ip. func ConnDuplicateIPFilter() ConnFilterFunc { - return func(cs ConnSet, c net.Conn, ips []net.IP) error { + return func(cs ConnSet, c quic.Connection, ips []net.IP) error { for _, ip := range ips { if cs.HasIP(ip) { return ErrRejected{ @@ -138,7 +142,7 @@ func MultiplexTransportMaxIncomingConnections(n int) MultiplexTransportOption { // multiplexed peers. type MultiplexTransport struct { netAddr NetAddress - listener net.Listener + listener *quic.Listener maxIncomingConnections int // see MaxIncomingConnections acceptc chan accept @@ -155,11 +159,6 @@ type MultiplexTransport struct { nodeKey NodeKey resolver IPResolver - // TODO(xla): This config is still needed as we parameterise peerConn and - // peer currently. All relevant configuration should be refactored into options - // with sane defaults. - mConfig conn.MConnConfig - // the tracer is passed to peers for collecting trace data tracer trace.Tracer } @@ -172,7 +171,6 @@ var _ transportLifecycle = (*MultiplexTransport)(nil) func NewMultiplexTransport( nodeInfo NodeInfo, nodeKey NodeKey, - mConfig conn.MConnConfig, tracer trace.Tracer, ) *MultiplexTransport { return &MultiplexTransport{ @@ -181,7 +179,6 @@ func NewMultiplexTransport( dialTimeout: defaultDialTimeout, filterTimeout: defaultFilterTimeout, handshakeTimeout: defaultHandshakeTimeout, - mConfig: mConfig, nodeInfo: nodeInfo, nodeKey: nodeKey, conns: NewConnSet(), @@ -218,7 +215,38 @@ func (mt *MultiplexTransport) Dial( addr NetAddress, cfg peerConfig, ) (Peer, error) { - c, err := addr.DialTimeout(mt.dialTimeout) + tlsConfig, err := NewTLSConfig(mt.nodeKey.PrivKey) + if err != nil { + return nil, err + } + tlsConfig.VerifyPeerCertificate = func(rawCerts [][]byte, _ [][]*x509.Certificate) (err error) { + defer func() { + if rerr := recover(); rerr != nil { + fmt.Fprintf(os.Stderr, "panic when processing peer certificate in TLS exchangeNodeInfo: %s\n%s\n", rerr, debug.Stack()) + err = fmt.Errorf("panic when processing peer certificate in TLS exchangeNodeInfo: %s", rerr) + } + }() + + if len(rawCerts) != 1 { + return errors.New("expected one certificates in the chain") + } + cert, err := x509.ParseCertificate(rawCerts[0]) + if err != nil { + return err + } + + pubKey, err := VerifyCertificate(cert) + if err != nil { + return err + } + remoteID := PubKeyToID(pubKey) + if remoteID != addr.ID { + return fmt.Errorf("mismatch peer ID") + } + return nil + } + + c, err := addr.DialTimeout(mt.dialTimeout, tlsConfig) if err != nil { return nil, err } @@ -228,14 +256,14 @@ func (mt *MultiplexTransport) Dial( return nil, err } - secretConn, nodeInfo, err := mt.upgrade(c, &addr) + _, nodeInfo, err := mt.getNodeInfo(c) if err != nil { return nil, err } cfg.outbound = true - p := mt.wrapPeer(secretConn, nodeInfo, cfg, &addr) + p := mt.wrapPeer(c, nodeInfo, cfg, &addr) return p, nil } @@ -253,23 +281,83 @@ func (mt *MultiplexTransport) Close() error { // Listen implements transportLifecycle. 
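Editorial note on Dial above and Listen below: both install nearly identical VerifyPeerCertificate callbacks, with Dial additionally pinning the remote ID to the dialed address. A sketch of a shared helper, omitting the panic-recover wrapper for brevity; an empty expectedID stands for the inbound case where no dialed ID is known:

// peerIDVerifier is an editorial sketch, not part of this diff.
func peerIDVerifier(expectedID ID) func(rawCerts [][]byte, _ [][]*x509.Certificate) error {
	return func(rawCerts [][]byte, _ [][]*x509.Certificate) error {
		if len(rawCerts) != 1 {
			return errors.New("expected exactly one certificate in the chain")
		}
		cert, err := x509.ParseCertificate(rawCerts[0])
		if err != nil {
			return err
		}
		pubKey, err := VerifyCertificate(cert)
		if err != nil {
			return err
		}
		if expectedID != "" && PubKeyToID(pubKey) != expectedID {
			return fmt.Errorf("peer ID mismatch: expected %v, got %v", expectedID, PubKeyToID(pubKey))
		}
		return nil
	}
}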
func (mt *MultiplexTransport) Listen(addr NetAddress) error { - ln, err := net.Listen("tcp", addr.DialString()) + tlsConfig, err := NewTLSConfig(mt.nodeKey.PrivKey) if err != nil { return err } + tlsConfig.VerifyPeerCertificate = func(rawCerts [][]byte, _ [][]*x509.Certificate) (err error) { + defer func() { + if rerr := recover(); rerr != nil { + fmt.Fprintf(os.Stderr, "panic when processing peer certificate in TLS exchangeNodeInfo: %s\n%s\n", rerr, debug.Stack()) + err = fmt.Errorf("panic when processing peer certificate in TLS exchangeNodeInfo: %s", rerr) + } + }() - if mt.maxIncomingConnections > 0 { - ln = netutil.LimitListener(ln, mt.maxIncomingConnections) - } + if len(rawCerts) != 1 { + return errors.New("expected one certificates in the chain") + } + cert, err := x509.ParseCertificate(rawCerts[0]) + if err != nil { + return err + } + _, err = VerifyCertificate(cert) + if err != nil { + return err + } + return nil + } + quickConfig := quic.Config{ + // TODO(rach-id): do we want to enable 0RTT? are the replay risks fine? + Allow0RTT: false, + MaxIdleTimeout: time.Minute, + MaxIncomingStreams: 10000, + MaxIncomingUniStreams: 10000, + KeepAlivePeriod: 10 * time.Second, + EnableDatagrams: true, + Tracer: func(ctx context.Context, perspective logging.Perspective, id quic.ConnectionID) *logging.ConnectionTracer { + return logging.NewMultiplexedConnectionTracer(GetNewTracer()) + }, + } + listener, err := quic.ListenAddr(addr.DialString(), tlsConfig, &quickConfig) + if err != nil { + return err + } + mt.listener = listener mt.netAddr = addr - mt.listener = ln - go mt.acceptPeers() + // TODO(rach-id): use a better context + go mt.acceptPeers(context.Background()) return nil } +func GetNewTracer() *logging.ConnectionTracer { + return &logging.ConnectionTracer{ + StartedConnection: func(local, remote net.Addr, srcConnID, destConnID quic.ConnectionID) { + fmt.Println(fmt.Sprintf("StartedConnection: local=%v, remote=%v, srcConnID=%v, destConnID=%v", local, remote, srcConnID, destConnID)) + }, + ClosedConnection: func(err error) { + fmt.Println(fmt.Sprintf("ClosedConnection: error=%v", err)) + }, + ReceivedRetry: func(header *logging.Header) { + //fmt.Println(fmt.Sprintf("ReceivedRetry: header=%v", header)) + }, + DroppedPacket: func(packetType logging.PacketType, packetNum logging.PacketNumber, byteCount logging.ByteCount, reason logging.PacketDropReason) { + //fmt.Println(fmt.Sprintf("DroppedPacket: packetType=%v, packetNum=%v, byteCount=%v, reason=%v", packetType, packetNum, byteCount, reason)) + }, + LostPacket: func(encLevel logging.EncryptionLevel, packetNum logging.PacketNumber, reason logging.PacketLossReason) { + //fmt.Println(fmt.Sprintf("LostPacket: encLevel=%v, packetNum=%v, reason=%v", encLevel, packetNum, reason)) + }, + Close: func() { + fmt.Println(fmt.Sprintf("ClosedConnection")) + }, + Debug: func(name, msg string) { + //fmt.Println(fmt.Sprintf("Debug: name=%v, msg=%v", name, msg)) + }, + } +} + // AddChannel registers a channel to nodeInfo. // NOTE: NodeInfo must be of type DefaultNodeInfo else channels won't be updated // This is a bit messy at the moment but is cleaned up in the following version @@ -283,9 +371,9 @@ func (mt *MultiplexTransport) AddChannel(chID byte) { } } -func (mt *MultiplexTransport) acceptPeers() { +func (mt *MultiplexTransport) acceptPeers(ctx context.Context) { for { - c, err := mt.listener.Accept() + c, err := mt.listener.Accept(ctx) if err != nil { // If Close() has been called, silently exit. 
select { @@ -306,7 +394,7 @@ func (mt *MultiplexTransport) acceptPeers() { // Reference: https://github.com/tendermint/tendermint/issues/2047 // // [0] https://en.wikipedia.org/wiki/Head-of-line_blocking - go func(c net.Conn) { + go func(c quic.Connection) { defer func() { if r := recover(); r != nil { err := ErrRejected{ @@ -318,34 +406,32 @@ func (mt *MultiplexTransport) acceptPeers() { case mt.acceptc <- accept{err: err}: case <-mt.closec: // Give up if the transport was closed. - _ = c.Close() + _ = c.CloseWithError(quic.ApplicationErrorCode(http3.ErrCodeConnectError), err.Error()) return } } }() var ( - nodeInfo NodeInfo - secretConn *conn.SecretConnection - netAddr *NetAddress + nodeInfo NodeInfo + netAddr *NetAddress ) err := mt.filterConn(c) if err == nil { - secretConn, nodeInfo, err = mt.upgrade(c, nil) + _, nodeInfo, err = mt.getNodeInfo(c) if err == nil { addr := c.RemoteAddr() - id := PubKeyToID(secretConn.RemotePubKey()) - netAddr = NewNetAddress(id, addr) + netAddr = NewUDPNetAddress(nodeInfo.ID(), addr) } } select { - case mt.acceptc <- accept{netAddr, secretConn, nodeInfo, err}: + case mt.acceptc <- accept{netAddr, c, nodeInfo, err}: // Make the upgraded peer available. case <-mt.closec: // Give up if the transport was closed. - _ = c.Close() + _ = c.CloseWithError(quic.ApplicationErrorCode(http3.ErrCodeConnectError), "closes transport") return } }(c) @@ -359,16 +445,17 @@ func (mt *MultiplexTransport) Cleanup(p Peer) { _ = p.CloseConn() } -func (mt *MultiplexTransport) cleanup(c net.Conn) error { +func (mt *MultiplexTransport) cleanup(c quic.Connection) error { mt.conns.Remove(c) - return c.Close() + // TODO(rach-id): valid error + return c.CloseWithError(quic.ApplicationErrorCode(quic.NoError), "closing for cleanup") } -func (mt *MultiplexTransport) filterConn(c net.Conn) (err error) { +func (mt *MultiplexTransport) filterConn(c quic.Connection) (err error) { defer func() { if err != nil { - _ = c.Close() + _ = c.CloseWithError(quic.ApplicationErrorCode(http3.ErrCodeConnectError), err.Error()) } }() @@ -386,7 +473,7 @@ func (mt *MultiplexTransport) filterConn(c net.Conn) (err error) { errc := make(chan error, len(mt.connFilters)) for _, f := range mt.connFilters { - go func(f ConnFilterFunc, c net.Conn, ips []net.IP, errc chan<- error) { + go func(f ConnFilterFunc, c quic.Connection, ips []net.IP, errc chan<- error) { errc <- f(mt.conns, c, ips) }(f, c, ips, errc) } @@ -408,52 +495,25 @@ func (mt *MultiplexTransport) filterConn(c net.Conn) (err error) { return nil } -func (mt *MultiplexTransport) upgrade( - c net.Conn, - dialedAddr *NetAddress, -) (secretConn *conn.SecretConnection, nodeInfo NodeInfo, err error) { +func (mt *MultiplexTransport) getNodeInfo(c quic.Connection) (conn quic.Connection, remoteNodeInfo NodeInfo, err error) { defer func() { if err != nil { _ = mt.cleanup(c) } }() - secretConn, err = upgradeSecretConn(c, mt.handshakeTimeout, mt.nodeKey.PrivKey) + remoteNodeInfo, err = exchangeNodeInfo(c, mt.handshakeTimeout, mt.nodeInfo) if err != nil { return nil, nil, ErrRejected{ conn: c, - err: fmt.Errorf("secret conn failed: %v", err), + err: fmt.Errorf("exchangeNodeInfo failed: %v", err), isAuthFailure: true, } } + fmt.Println("remote node info") + fmt.Println(remoteNodeInfo) - // For outgoing conns, ensure connection key matches dialed key. 
- connID := PubKeyToID(secretConn.RemotePubKey()) - if dialedAddr != nil { - if dialedID := dialedAddr.ID; connID != dialedID { - return nil, nil, ErrRejected{ - conn: c, - id: connID, - err: fmt.Errorf( - "conn.ID (%v) dialed ID (%v) mismatch", - connID, - dialedID, - ), - isAuthFailure: true, - } - } - } - - nodeInfo, err = handshake(secretConn, mt.handshakeTimeout, mt.nodeInfo) - if err != nil { - return nil, nil, ErrRejected{ - conn: c, - err: fmt.Errorf("handshake failed: %v", err), - isAuthFailure: true, - } - } - - if err := nodeInfo.Validate(); err != nil { + if err := remoteNodeInfo.Validate(); err != nil { return nil, nil, ErrRejected{ conn: c, err: err, @@ -461,44 +521,86 @@ func (mt *MultiplexTransport) upgrade( } } - // Ensure connection key matches self reported key. - if connID != nodeInfo.ID() { - return nil, nil, ErrRejected{ - conn: c, - id: connID, - err: fmt.Errorf( - "conn.ID (%v) NodeInfo.ID (%v) mismatch", - connID, - nodeInfo.ID(), - ), - isAuthFailure: true, - } - } - // Reject self. - if mt.nodeInfo.ID() == nodeInfo.ID() { + if mt.nodeInfo.ID() == remoteNodeInfo.ID() { return nil, nil, ErrRejected{ - addr: *NewNetAddress(nodeInfo.ID(), c.RemoteAddr()), + addr: *NewUDPNetAddress(remoteNodeInfo.ID(), c.RemoteAddr()), conn: c, - id: nodeInfo.ID(), + id: remoteNodeInfo.ID(), isSelf: true, } } - if err := mt.nodeInfo.CompatibleWith(nodeInfo); err != nil { + if err := mt.nodeInfo.CompatibleWith(remoteNodeInfo); err != nil { return nil, nil, ErrRejected{ conn: c, err: err, - id: nodeInfo.ID(), + id: remoteNodeInfo.ID(), isIncompatible: true, } } - return secretConn, nodeInfo, nil + return c, remoteNodeInfo, nil +} + +func exchangeNodeInfo( + c quic.Connection, + timeout time.Duration, + nodeInfo NodeInfo, +) (NodeInfo, error) { + var ( + errc = make(chan error, 2) + + pbpeerNodeInfo tmp2p.DefaultNodeInfo + peerNodeInfo DefaultNodeInfo + ourNodeInfo = nodeInfo.(DefaultNodeInfo) + ) + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + fmt.Println("our node info") + fmt.Println(ourNodeInfo) + + go func(errc chan<- error, c quic.Connection) { + stream, err := c.OpenStreamSync(ctx) + if err != nil { + errc <- err + return + } + _, err = protoio.NewDelimitedWriter(stream).WriteMsg(ourNodeInfo.ToProto()) + errc <- err + }(errc, c) + go func(errc chan<- error, c quic.Connection) { + stream, err := c.AcceptStream(ctx) + if err != nil { + errc <- err + return + } + protoReader := protoio.NewDelimitedReader(stream, MaxNodeInfoSize()) + _, err = protoReader.ReadMsg(&pbpeerNodeInfo) + errc <- err + }(errc, c) + + for i := 0; i < cap(errc); i++ { + err := <-errc + if err != nil { + return nil, err + } + } + + peerNodeInfo, err := DefaultNodeInfoFromToProto(&pbpeerNodeInfo) + if err != nil { + return nil, err + } + + fmt.Println("peer node info") + fmt.Println(peerNodeInfo) + + return peerNodeInfo, nil } func (mt *MultiplexTransport) wrapPeer( - c net.Conn, + c quic.Connection, ni NodeInfo, cfg peerConfig, socketAddr *NetAddress, @@ -525,13 +627,11 @@ func (mt *MultiplexTransport) wrapPeer( p := newPeer( peerConn, - mt.mConfig, ni, cfg.reactorsByCh, cfg.msgTypeByChID, - cfg.chDescs, - cfg.onPeerError, cfg.mlc, + cfg.onPeerError, PeerMetrics(cfg.metrics), WithPeerTracer(mt.tracer), ) @@ -539,66 +639,7 @@ func (mt *MultiplexTransport) wrapPeer( return p } -func handshake( - c net.Conn, - timeout time.Duration, - nodeInfo NodeInfo, -) (NodeInfo, error) { - if err := c.SetDeadline(time.Now().Add(timeout)); err != nil { - return nil, err - } - - var ( - 
errc = make(chan error, 2) - - pbpeerNodeInfo tmp2p.DefaultNodeInfo - peerNodeInfo DefaultNodeInfo - ourNodeInfo = nodeInfo.(DefaultNodeInfo) - ) - - go func(errc chan<- error, c net.Conn) { - _, err := protoio.NewDelimitedWriter(c).WriteMsg(ourNodeInfo.ToProto()) - errc <- err - }(errc, c) - go func(errc chan<- error, c net.Conn) { - protoReader := protoio.NewDelimitedReader(c, MaxNodeInfoSize()) - _, err := protoReader.ReadMsg(&pbpeerNodeInfo) - errc <- err - }(errc, c) - - for i := 0; i < cap(errc); i++ { - err := <-errc - if err != nil { - return nil, err - } - } - - peerNodeInfo, err := DefaultNodeInfoFromToProto(&pbpeerNodeInfo) - if err != nil { - return nil, err - } - - return peerNodeInfo, c.SetDeadline(time.Time{}) -} - -func upgradeSecretConn( - c net.Conn, - timeout time.Duration, - privKey crypto.PrivKey, -) (*conn.SecretConnection, error) { - if err := c.SetDeadline(time.Now().Add(timeout)); err != nil { - return nil, err - } - - sc, err := conn.MakeSecretConnection(c, privKey) - if err != nil { - return nil, err - } - - return sc, sc.SetDeadline(time.Time{}) -} - -func resolveIPs(resolver IPResolver, c net.Conn) ([]net.IP, error) { +func resolveIPs(resolver IPResolver, c quic.Connection) ([]net.IP, error) { host, _, err := net.SplitHostPort(c.RemoteAddr().String()) if err != nil { return nil, err diff --git a/p2p/transport_test.go b/p2p/transport_test.go index 151ac7edf7..430da19d93 100644 --- a/p2p/transport_test.go +++ b/p2p/transport_test.go @@ -306,7 +306,7 @@ func TestTransportMultiplexAcceptNonBlocking(t *testing.T) { return } - _, err = handshake(sc, 200*time.Millisecond, + _, err = exchangeNodeInfo(sc, 200*time.Millisecond, testNodeInfo( PubKeyToID(ed25519.GenPrivKey().PubKey()), "slow_peer", @@ -612,7 +612,7 @@ func TestTransportHandshake(t *testing.T) { t.Fatal(err) } - ni, err := handshake(c, 20*time.Millisecond, emptyNodeInfo()) + ni, err := exchangeNodeInfo(c, 20*time.Millisecond, emptyNodeInfo()) if err != nil { t.Fatal(err) } diff --git a/p2p/types.go b/p2p/types.go index 7a741bd295..d4fb170fe1 100644 --- a/p2p/types.go +++ b/p2p/types.go @@ -2,12 +2,18 @@ package p2p import ( "github.com/gogo/protobuf/proto" + "github.com/quic-go/quic-go" "github.com/tendermint/tendermint/p2p/conn" tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p" + "time" ) type ChannelDescriptor = conn.ChannelDescriptor -type ConnectionStatus = conn.ConnectionStatus + +type ConnectionStatus struct { + Duration time.Duration + ConnectionState quic.ConnectionState +} // Envelope contains a message with sender routing info. 
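Editorial note on the new ConnectionStatus in p2p/types.go (below): it replaces MConnection's queue statistics with the connection uptime and the raw QUIC connection state. A sketch of how a consumer such as the RPC net_info handler might render it, assuming the quic-go ConnectionState fields Version and Used0RTT:

// formatConnectionStatus is an editorial sketch, not part of this diff.
func formatConnectionStatus(st p2p.ConnectionStatus) string {
	return fmt.Sprintf("up for %s, QUIC version %d, 0-RTT used: %t",
		st.Duration, st.ConnectionState.Version, st.ConnectionState.Used0RTT)
}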
type Envelope struct {
diff --git a/test/maverick/node/node.go b/test/maverick/node/node.go
index bcc198c668..bdf80eb295 100644
--- a/test/maverick/node/node.go
+++ b/test/maverick/node/node.go
@@ -5,6 +5,8 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"github.com/quic-go/quic-go"
+	abci "github.com/tendermint/tendermint/abci/types"
 	"net"
 	"net/http"
 	_ "net/http/pprof" //nolint: gosec // securely exposed on separate, optional port
@@ -18,7 +20,6 @@ import (

 	dbm "github.com/cometbft/cometbft-db"

-	abci "github.com/tendermint/tendermint/abci/types"
 	bcv0 "github.com/tendermint/tendermint/blockchain/v0"
 	bcv1 "github.com/tendermint/tendermint/blockchain/v1"
 	bcv2 "github.com/tendermint/tendermint/blockchain/v2"
@@ -550,8 +551,7 @@ func createTransport(
 	[]p2p.PeerFilterFunc,
 ) {
 	var (
-		mConnConfig = p2p.MConnConfig(config.P2P)
-		transport   = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, mConnConfig, trace.NoOpTracer())
+		transport   = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, trace.NoOpTracer())
 		connFilters = []p2p.ConnFilterFunc{}
 		peerFilters = []p2p.PeerFilterFunc{}
 	)
@@ -566,7 +566,7 @@ func createTransport(
 		connFilters = append(
 			connFilters,
 			// ABCI query for address filtering.
-			func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error {
+			func(_ p2p.ConnSet, c quic.Connection, _ []net.IP) error {
 				res, err := proxyApp.Query().QuerySync(abci.RequestQuery{
 					Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()),
 				})
diff --git a/types/params.go b/types/params.go
index 2fc704f115..d58906155e 100644
--- a/types/params.go
+++ b/types/params.go
@@ -12,7 +12,7 @@ import (

 const (
 	// MaxBlockSizeBytes is the maximum permitted size of the blocks.
-	MaxBlockSizeBytes = 104857600 // 100MB
+	MaxBlockSizeBytes = 1048576000 // 1000MB

 	// BlockPartSizeBytes is the size of one block part.
 	BlockPartSizeBytes uint32 = 65536 // 64kB
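Editorial note on the constant change above, with the arithmetic restated for reference (illustrative only, not part of the diff): the previous limit was 100 MiB and the new value is ten times larger.

const (
	mebibyte             = 1 << 20         // 1_048_576 bytes
	oldMaxBlockSizeBytes = 100 * mebibyte  // 104_857_600
	newMaxBlockSizeBytes = 1000 * mebibyte // 1_048_576_000
)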