diff --git a/go.mod b/go.mod index e9afffd7d9e7..15052ca3f366 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/DataDog/zstd v1.5.2 github.com/Microsoft/go-winio v0.5.2 github.com/NYTimes/gziphandler v1.1.1 - github.com/ava-labs/coreth v0.12.9-rc.9.0.20231212220437-f9ec2ecc2714 + github.com/ava-labs/coreth v0.12.9-rc.9.0.20231213002358-53424dd5480c github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34 github.com/btcsuite/btcd/btcutil v1.1.3 github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811 diff --git a/go.sum b/go.sum index b7d1d9c26982..b74c5dbbb979 100644 --- a/go.sum +++ b/go.sum @@ -66,8 +66,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= -github.com/ava-labs/coreth v0.12.9-rc.9.0.20231212220437-f9ec2ecc2714 h1:dhuxCYjB+4isvJMHViHkS50NgkQ/dHY2ZZmk8ESAyxw= -github.com/ava-labs/coreth v0.12.9-rc.9.0.20231212220437-f9ec2ecc2714/go.mod h1:LwQhIuKmd8JPemahE1f7TvsE3WRzCFdjvNWBPxXSaNo= +github.com/ava-labs/coreth v0.12.9-rc.9.0.20231213002358-53424dd5480c h1:bWPdqoi+J6ztfVhEl7iexFSaKyaFlMpIltIMVTpXDQY= +github.com/ava-labs/coreth v0.12.9-rc.9.0.20231213002358-53424dd5480c/go.mod h1:v8pqR8wC9VuyPTEbI6/wmflXPIAmUr6SUwEKP+hi9iU= github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34 h1:mg9Uw6oZFJKytJxgxnl3uxZOs/SB8CVHg6Io4Tf99Zc= github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34/go.mod h1:pJxaT9bUgeRNVmNRgtCHb7sFDIRKy7CzTQVi8gGNT6g= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= diff --git a/network/p2p/client.go 
b/network/p2p/client.go index 6f2e35c26896..982b7c50f1f0 100644 --- a/network/p2p/client.go +++ b/network/p2p/client.go @@ -40,6 +40,7 @@ type CrossChainAppResponseCallback func( type Client struct { handlerID uint64 + handlerIDStr string handlerPrefix []byte router *router sender common.AppSender @@ -95,8 +96,8 @@ func (c *Client) AppRequest( } c.router.pendingAppRequests[requestID] = pendingAppRequest{ - AppResponseCallback: onResponse, - metrics: c.router.handlers[c.handlerID].metrics, + handlerID: c.handlerIDStr, + callback: onResponse, } c.router.requestID += 2 } @@ -158,8 +159,8 @@ func (c *Client) CrossChainAppRequest( } c.router.pendingCrossChainAppRequests[requestID] = pendingCrossChainAppRequest{ - CrossChainAppResponseCallback: onResponse, - metrics: c.router.handlers[c.handlerID].metrics, + handlerID: c.handlerIDStr, + callback: onResponse, } c.router.requestID += 2 diff --git a/network/p2p/gossip/gossip_test.go b/network/p2p/gossip/gossip_test.go index f2150e8f6afd..a55cec20da59 100644 --- a/network/p2p/gossip/gossip_test.go +++ b/network/p2p/gossip/gossip_test.go @@ -120,7 +120,9 @@ func TestGossiperGossip(t *testing.T) { responseSender := &common.FakeSender{ SentAppResponse: make(chan []byte, 1), } - responseNetwork := p2p.NewNetwork(logging.NoLog{}, responseSender, prometheus.NewRegistry(), "") + responseNetwork, err := p2p.NewNetwork(logging.NoLog{}, responseSender, prometheus.NewRegistry(), "") + require.NoError(err) + responseBloom, err := NewBloomFilter(1000, 0.01) require.NoError(err) responseSet := testSet{ @@ -133,14 +135,14 @@ func TestGossiperGossip(t *testing.T) { handler, err := NewHandler[*testTx](responseSet, tt.config, prometheus.NewRegistry()) require.NoError(err) - _, err = responseNetwork.NewAppProtocol(0x0, handler) - require.NoError(err) + require.NoError(responseNetwork.AddHandler(0x0, handler)) requestSender := &common.FakeSender{ SentAppRequest: make(chan []byte, 1), } - requestNetwork := p2p.NewNetwork(logging.NoLog{}, 
requestSender, prometheus.NewRegistry(), "") + requestNetwork, err := p2p.NewNetwork(logging.NoLog{}, requestSender, prometheus.NewRegistry(), "") + require.NoError(err) require.NoError(requestNetwork.Connected(context.Background(), ids.EmptyNodeID, nil)) bloom, err := NewBloomFilter(1000, 0.01) @@ -153,7 +155,7 @@ func TestGossiperGossip(t *testing.T) { require.NoError(requestSet.Add(item)) } - requestClient, err := requestNetwork.NewAppProtocol(0x0, nil) + requestClient := requestNetwork.NewClient(0x0) require.NoError(err) config := Config{ diff --git a/network/p2p/handler.go b/network/p2p/handler.go index b85195a5255c..27c220565a04 100644 --- a/network/p2p/handler.go +++ b/network/p2p/handler.go @@ -63,28 +63,48 @@ func (NoOpHandler) CrossChainAppRequest(context.Context, ids.ID, time.Time, []by return nil, nil } +func NewValidatorHandler( + handler Handler, + validatorSet ValidatorSet, + log logging.Logger, +) *ValidatorHandler { + return &ValidatorHandler{ + handler: handler, + validatorSet: validatorSet, + log: log, + } +} + // ValidatorHandler drops messages from non-validators type ValidatorHandler struct { - Handler - ValidatorSet ValidatorSet - Log logging.Logger + handler Handler + validatorSet ValidatorSet + log logging.Logger } func (v ValidatorHandler) AppGossip(ctx context.Context, nodeID ids.NodeID, gossipBytes []byte) { - if !v.ValidatorSet.Has(ctx, nodeID) { - v.Log.Debug("dropping message", zap.Stringer("nodeID", nodeID)) + if !v.validatorSet.Has(ctx, nodeID) { + v.log.Debug( + "dropping message", + zap.Stringer("nodeID", nodeID), + zap.String("reason", "not a validator"), + ) return } - v.Handler.AppGossip(ctx, nodeID, gossipBytes) + v.handler.AppGossip(ctx, nodeID, gossipBytes) } func (v ValidatorHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, error) { - if !v.ValidatorSet.Has(ctx, nodeID) { + if !v.validatorSet.Has(ctx, nodeID) { return nil, ErrNotValidator } - return 
v.Handler.AppRequest(ctx, nodeID, deadline, requestBytes) + return v.handler.AppRequest(ctx, nodeID, deadline, requestBytes) +} + +func (v ValidatorHandler) CrossChainAppRequest(ctx context.Context, chainID ids.ID, deadline time.Time, requestBytes []byte) ([]byte, error) { + return v.handler.CrossChainAppRequest(ctx, chainID, deadline, requestBytes) } // responder automatically sends the response for a given request diff --git a/network/p2p/handler_test.go b/network/p2p/handler_test.go index b7a1b6bec899..3bbb6bc46711 100644 --- a/network/p2p/handler_test.go +++ b/network/p2p/handler_test.go @@ -55,15 +55,15 @@ func TestValidatorHandlerAppGossip(t *testing.T) { require := require.New(t) called := false - handler := ValidatorHandler{ - Handler: testHandler{ + handler := NewValidatorHandler( + &testHandler{ appGossipF: func(context.Context, ids.NodeID, []byte) { called = true }, }, - ValidatorSet: tt.validatorSet, - Log: logging.NoLog{}, - } + tt.validatorSet, + logging.NoLog{}, + ) handler.AppGossip(context.Background(), tt.nodeID, []byte("foobar")) require.Equal(tt.expected, called) @@ -100,11 +100,11 @@ func TestValidatorHandlerAppRequest(t *testing.T) { t.Run(tt.name, func(t *testing.T) { require := require.New(t) - handler := ValidatorHandler{ - Handler: NoOpHandler{}, - ValidatorSet: tt.validatorSet, - Log: logging.NoLog{}, - } + handler := NewValidatorHandler( + NoOpHandler{}, + tt.validatorSet, + logging.NoLog{}, + ) _, err := handler.AppRequest(context.Background(), tt.nodeID, time.Time{}, []byte("foobar")) require.ErrorIs(err, tt.expected) diff --git a/network/p2p/network.go b/network/p2p/network.go index bed562b1204d..76b5c13e76ad 100644 --- a/network/p2p/network.go +++ b/network/p2p/network.go @@ -6,6 +6,7 @@ package p2p import ( "context" "encoding/binary" + "strconv" "sync" "time" @@ -14,6 +15,7 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/validators" +
"github.com/ava-labs/avalanchego/utils" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/version" @@ -23,6 +25,9 @@ var ( _ validators.Connector = (*Network)(nil) _ common.AppHandler = (*Network)(nil) _ NodeSampler = (*peerSampler)(nil) + + handlerLabel = "handlerID" + labelNames = []string{handlerLabel} ) // ClientOption configures Client @@ -53,17 +58,108 @@ type clientOptions struct { func NewNetwork( log logging.Logger, sender common.AppSender, - metrics prometheus.Registerer, + registerer prometheus.Registerer, namespace string, -) *Network { - return &Network{ - Peers: &Peers{}, - log: log, - sender: sender, - metrics: metrics, - namespace: namespace, - router: newRouter(log, sender, metrics, namespace), +) (*Network, error) { + metrics := metrics{ + appRequestTime: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "app_request_time", + Help: "app request time (ns)", + }, labelNames), + appRequestCount: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "app_request_count", + Help: "app request count (n)", + }, labelNames), + appResponseTime: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "app_response_time", + Help: "app response time (ns)", + }, labelNames), + appResponseCount: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "app_response_count", + Help: "app response count (n)", + }, labelNames), + appRequestFailedTime: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "app_request_failed_time", + Help: "app request failed time (ns)", + }, labelNames), + appRequestFailedCount: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "app_request_failed_count", + Help: "app request failed count (n)", + }, labelNames), + appGossipTime: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace:
namespace, + Name: "app_gossip_time", + Help: "app gossip time (ns)", + }, labelNames), + appGossipCount: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "app_gossip_count", + Help: "app gossip count (n)", + }, labelNames), + crossChainAppRequestTime: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "cross_chain_app_request_time", + Help: "cross chain app request time (ns)", + }, labelNames), + crossChainAppRequestCount: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "cross_chain_app_request_count", + Help: "cross chain app request count (n)", + }, labelNames), + crossChainAppResponseTime: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "cross_chain_app_response_time", + Help: "cross chain app response time (ns)", + }, labelNames), + crossChainAppResponseCount: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "cross_chain_app_response_count", + Help: "cross chain app response count (n)", + }, labelNames), + crossChainAppRequestFailedTime: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "cross_chain_app_request_failed_time", + Help: "cross chain app request failed time (ns)", + }, labelNames), + crossChainAppRequestFailedCount: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "cross_chain_app_request_failed_count", + Help: "cross chain app request failed count (n)", + }, labelNames), + } + + err := utils.Err( + registerer.Register(metrics.appRequestTime), + registerer.Register(metrics.appRequestCount), + registerer.Register(metrics.appResponseTime), + registerer.Register(metrics.appResponseCount), + registerer.Register(metrics.appRequestFailedTime), + registerer.Register(metrics.appRequestFailedCount), + registerer.Register(metrics.appGossipTime), + registerer.Register(metrics.appGossipCount), + 
registerer.Register(metrics.crossChainAppRequestTime), + registerer.Register(metrics.crossChainAppRequestCount), + registerer.Register(metrics.crossChainAppResponseTime), + registerer.Register(metrics.crossChainAppResponseCount), + registerer.Register(metrics.crossChainAppRequestFailedTime), + registerer.Register(metrics.crossChainAppRequestFailedCount), + ) + if err != nil { + return nil, err } + + return &Network{ + Peers: &Peers{}, + log: log, + sender: sender, + router: newRouter(log, sender, metrics), + }, nil } // Network exposes networking state and supports building p2p application @@ -71,10 +167,8 @@ func NewNetwork( type Network struct { Peers *Peers - log logging.Logger - sender common.AppSender - metrics prometheus.Registerer - namespace string + log logging.Logger + sender common.AppSender router *router } @@ -117,16 +211,12 @@ func (n *Network) Disconnected(_ context.Context, nodeID ids.NodeID) error { return nil } -// NewAppProtocol reserves an identifier for an application protocol handler and -// returns a Client that can be used to send messages for the corresponding -// protocol. -func (n *Network) NewAppProtocol(handlerID uint64, handler Handler, options ...ClientOption) (*Client, error) { - if err := n.router.addHandler(handlerID, handler); err != nil { - return nil, err - } - +// NewClient returns a Client that can be used to send messages for the +// corresponding protocol. 
+func (n *Network) NewClient(handlerID uint64, options ...ClientOption) *Client { client := &Client{ handlerID: handlerID, + handlerIDStr: strconv.FormatUint(handlerID, 10), handlerPrefix: binary.AppendUvarint(nil, handlerID), sender: n.sender, router: n.router, @@ -141,7 +231,12 @@ func (n *Network) NewAppProtocol(handlerID uint64, handler Handler, options ...C option.apply(client.options) } - return client, nil + return client +} + +// AddHandler reserves handlerID for an application protocol and registers handler to serve it. +func (n *Network) AddHandler(handlerID uint64, handler Handler) error { + return n.router.addHandler(handlerID, handler) } // Peers contains metadata about the current set of connected peers diff --git a/network/p2p/network_test.go b/network/p2p/network_test.go index 1adb25f81371..b8775c112d9c 100644 --- a/network/p2p/network_test.go +++ b/network/p2p/network_test.go @@ -63,9 +63,11 @@ func TestMessageRouting(t *testing.T) { SentAppRequest: make(chan []byte, 1), SentCrossChainAppRequest: make(chan []byte, 1), } - network := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "") - client, err := network.NewAppProtocol(1, testHandler) + + network, err := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "") require.NoError(err) + require.NoError(network.AddHandler(1, testHandler)) + client := network.NewClient(1) require.NoError(client.AppGossip(ctx, wantMsg)) require.NoError(network.AppGossip(ctx, wantNodeID, <-sender.SentAppGossip)) @@ -91,11 +93,11 @@ func TestClientPrefixesMessages(t *testing.T) { SentAppGossipSpecific: make(chan []byte, 1), SentCrossChainAppRequest: make(chan []byte, 1), } - network := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "") - require.NoError(network.Connected(ctx, ids.EmptyNodeID, nil)) - client, err := network.NewAppProtocol(handlerID, &NoOpHandler{}) + network, err := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "") require.NoError(err) + require.NoError(network.Connected(ctx, 
ids.EmptyNodeID, nil)) + client := network.NewClient(handlerID) want := []byte("message") @@ -147,10 +149,9 @@ func TestAppRequestResponse(t *testing.T) { sender := common.FakeSender{ SentAppRequest: make(chan []byte, 1), } - network := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "") - - client, err := network.NewAppProtocol(handlerID, &NoOpHandler{}) + network, err := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "") require.NoError(err) + client := network.NewClient(handlerID) wantResponse := []byte("response") wantNodeID := ids.GenerateTestNodeID() @@ -179,10 +180,9 @@ func TestAppRequestFailed(t *testing.T) { sender := common.FakeSender{ SentAppRequest: make(chan []byte, 1), } - network := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "") - - client, err := network.NewAppProtocol(handlerID, &NoOpHandler{}) + network, err := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "") require.NoError(err) + client := network.NewClient(handlerID) wantNodeID := ids.GenerateTestNodeID() done := make(chan struct{}) @@ -210,10 +210,9 @@ func TestCrossChainAppRequestResponse(t *testing.T) { sender := common.FakeSender{ SentCrossChainAppRequest: make(chan []byte, 1), } - network := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "") - - client, err := network.NewAppProtocol(handlerID, &NoOpHandler{}) + network, err := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "") require.NoError(err) + client := network.NewClient(handlerID) wantChainID := ids.GenerateTestID() wantResponse := []byte("response") @@ -242,10 +241,9 @@ func TestCrossChainAppRequestFailed(t *testing.T) { sender := common.FakeSender{ SentCrossChainAppRequest: make(chan []byte, 1), } - network := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "") - - client, err := network.NewAppProtocol(handlerID, &NoOpHandler{}) + network, err := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "") 
require.NoError(err) + client := network.NewClient(handlerID) wantChainID := ids.GenerateTestID() done := make(chan struct{}) @@ -302,9 +300,9 @@ func TestMessageForUnregisteredHandler(t *testing.T) { return nil, nil }, } - network := NewNetwork(logging.NoLog{}, nil, prometheus.NewRegistry(), "") - _, err := network.NewAppProtocol(handlerID, handler) + network, err := NewNetwork(logging.NoLog{}, nil, prometheus.NewRegistry(), "") require.NoError(err) + require.NoError(network.AddHandler(handlerID, handler)) require.Nil(network.AppRequest(ctx, ids.EmptyNodeID, 0, time.Time{}, tt.msg)) require.Nil(network.AppGossip(ctx, ids.EmptyNodeID, tt.msg)) @@ -350,17 +348,18 @@ func TestResponseForUnrequestedRequest(t *testing.T) { return nil, nil }, } - network := NewNetwork(logging.NoLog{}, nil, prometheus.NewRegistry(), "") - _, err := network.NewAppProtocol(handlerID, handler) + network, err := NewNetwork(logging.NoLog{}, nil, prometheus.NewRegistry(), "") require.NoError(err) + require.NoError(network.AddHandler(handlerID, handler)) err = network.AppResponse(ctx, ids.EmptyNodeID, 0, []byte("foobar")) require.ErrorIs(err, ErrUnrequestedResponse) - err = network.AppRequestFailed(ctx, ids.EmptyNodeID, 0, errFoo) + err = network.AppRequestFailed(ctx, ids.EmptyNodeID, 0, common.ErrTimeout) require.ErrorIs(err, ErrUnrequestedResponse) err = network.CrossChainAppResponse(ctx, ids.Empty, 0, []byte("foobar")) require.ErrorIs(err, ErrUnrequestedResponse) - err = network.CrossChainAppRequestFailed(ctx, ids.Empty, 0, errFoo) + err = network.CrossChainAppRequestFailed(ctx, ids.Empty, 0, common.ErrTimeout) + require.ErrorIs(err, ErrUnrequestedResponse) }) } @@ -377,12 +376,13 @@ func TestAppRequestDuplicateRequestIDs(t *testing.T) { SentAppRequest: make(chan []byte, 1), } - network := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "") - client, err := network.NewAppProtocol(0x1, &NoOpHandler{}) + network, err := NewNetwork(logging.NoLog{}, sender, 
prometheus.NewRegistry(), "") require.NoError(err) + client := network.NewClient(0x1) noOpCallback := func(context.Context, ids.NodeID, []byte, error) {} // create a request that never gets a response + network.router.requestID = 1 require.NoError(client.AppRequest(ctx, set.Of(ids.EmptyNodeID), []byte{}, noOpCallback)) <-sender.SentAppRequest @@ -458,7 +458,8 @@ func TestPeersSample(t *testing.T) { t.Run(tt.name, func(t *testing.T) { require := require.New(t) - network := NewNetwork(logging.NoLog{}, &common.FakeSender{}, prometheus.NewRegistry(), "") + network, err := NewNetwork(logging.NoLog{}, &common.FakeSender{}, prometheus.NewRegistry(), "") + require.NoError(err) for connected := range tt.connected { require.NoError(network.Connected(context.Background(), connected, nil)) @@ -507,13 +508,13 @@ func TestAppRequestAnyNodeSelection(t *testing.T) { }, } - n := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "") + n, err := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "") + require.NoError(err) for _, peer := range tt.peers { require.NoError(n.Connected(context.Background(), peer, &version.Application{})) } - client, err := n.NewAppProtocol(1, nil) - require.NoError(err) + client := n.NewClient(1) err = client.AppRequestAny(context.Background(), []byte("foobar"), nil) require.ErrorIs(err, tt.expected) @@ -596,14 +597,14 @@ func TestNodeSamplerClientOption(t *testing.T) { return nil }, } - network := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "") + network, err := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "") + require.NoError(err) ctx := context.Background() for _, peer := range tt.peers { require.NoError(network.Connected(ctx, peer, nil)) } - client, err := network.NewAppProtocol(0x0, nil, tt.option(t, network)) - require.NoError(err) + client := network.NewClient(0, tt.option(t, network)) if err = client.AppRequestAny(ctx, []byte("request"), nil); err != nil { close(done) @@ -614,3 +615,13 @@ 
func TestNodeSamplerClientOption(t *testing.T) { }) } } + +// Tests that a given protocol can have more than one client +func TestMultipleClients(t *testing.T) { + require := require.New(t) + + n, err := NewNetwork(logging.NoLog{}, &common.SenderTest{}, prometheus.NewRegistry(), "") + require.NoError(err) + _ = n.NewClient(0) + _ = n.NewClient(0) +} diff --git a/network/p2p/router.go b/network/p2p/router.go index 0b4eff4eb576..5c52b3d1aaa2 100644 --- a/network/p2p/router.go +++ b/network/p2p/router.go @@ -8,6 +8,7 @@ import ( "encoding/binary" "errors" "fmt" + "strconv" "sync" "time" @@ -19,7 +20,6 @@ import ( "github.com/ava-labs/avalanchego/message" "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/utils/logging" - "github.com/ava-labs/avalanchego/utils/metric" ) var ( @@ -29,40 +29,46 @@ var ( _ common.AppHandler = (*router)(nil) ) -type metrics struct { - appRequestTime metric.Averager - appRequestFailedTime metric.Averager - appResponseTime metric.Averager - appGossipTime metric.Averager - crossChainAppRequestTime metric.Averager - crossChainAppRequestFailedTime metric.Averager - crossChainAppResponseTime metric.Averager -} - type pendingAppRequest struct { - *metrics - AppResponseCallback + handlerID string + callback AppResponseCallback } type pendingCrossChainAppRequest struct { - *metrics - CrossChainAppResponseCallback + handlerID string + callback CrossChainAppResponseCallback } // meteredHandler emits metrics for a Handler type meteredHandler struct { *responder - *metrics + metrics +} + +type metrics struct { + appRequestTime *prometheus.CounterVec + appRequestCount *prometheus.CounterVec + appResponseTime *prometheus.CounterVec + appResponseCount *prometheus.CounterVec + appRequestFailedTime *prometheus.CounterVec + appRequestFailedCount *prometheus.CounterVec + appGossipTime *prometheus.CounterVec + appGossipCount *prometheus.CounterVec + crossChainAppRequestTime *prometheus.CounterVec + crossChainAppRequestCount 
*prometheus.CounterVec + crossChainAppResponseTime *prometheus.CounterVec + crossChainAppResponseCount *prometheus.CounterVec + crossChainAppRequestFailedTime *prometheus.CounterVec + crossChainAppRequestFailedCount *prometheus.CounterVec } // router routes incoming application messages to the corresponding registered // app handler. App messages must be made using the registered handler's // corresponding Client. type router struct { - log logging.Logger - sender common.AppSender - metrics prometheus.Registerer - namespace string + log logging.Logger + sender common.AppSender + metrics metrics lock sync.RWMutex handlers map[uint64]*meteredHandler @@ -75,14 +81,12 @@ type router struct { func newRouter( log logging.Logger, sender common.AppSender, - metrics prometheus.Registerer, - namespace string, + metrics metrics, ) *router { return &router{ log: log, sender: sender, metrics: metrics, - namespace: namespace, handlers: make(map[uint64]*meteredHandler), pendingAppRequests: make(map[uint32]pendingAppRequest), pendingCrossChainAppRequests: make(map[uint32]pendingCrossChainAppRequest), @@ -99,76 +103,6 @@ func (r *router) addHandler(handlerID uint64, handler Handler) error { return fmt.Errorf("failed to register handler id %d: %w", handlerID, ErrExistingAppProtocol) } - appRequestTime, err := metric.NewAverager( - r.namespace, - fmt.Sprintf("handler_%d_app_request", handlerID), - "app request time (ns)", - r.metrics, - ) - if err != nil { - return fmt.Errorf("failed to register app request metric for handler_%d: %w", handlerID, err) - } - - appRequestFailedTime, err := metric.NewAverager( - r.namespace, - fmt.Sprintf("handler_%d_app_request_failed", handlerID), - "app request failed time (ns)", - r.metrics, - ) - if err != nil { - return fmt.Errorf("failed to register app request failed metric for handler_%d: %w", handlerID, err) - } - - appResponseTime, err := metric.NewAverager( - r.namespace, - fmt.Sprintf("handler_%d_app_response", handlerID), - "app response 
time (ns)", - r.metrics, - ) - if err != nil { - return fmt.Errorf("failed to register app response metric for handler_%d: %w", handlerID, err) - } - - appGossipTime, err := metric.NewAverager( - r.namespace, - fmt.Sprintf("handler_%d_app_gossip", handlerID), - "app gossip time (ns)", - r.metrics, - ) - if err != nil { - return fmt.Errorf("failed to register app gossip metric for handler_%d: %w", handlerID, err) - } - - crossChainAppRequestTime, err := metric.NewAverager( - r.namespace, - fmt.Sprintf("handler_%d_cross_chain_app_request", handlerID), - "cross chain app request time (ns)", - r.metrics, - ) - if err != nil { - return fmt.Errorf("failed to register cross-chain app request metric for handler_%d: %w", handlerID, err) - } - - crossChainAppRequestFailedTime, err := metric.NewAverager( - r.namespace, - fmt.Sprintf("handler_%d_cross_chain_app_request_failed", handlerID), - "app request failed time (ns)", - r.metrics, - ) - if err != nil { - return fmt.Errorf("failed to register cross-chain app request failed metric for handler_%d: %w", handlerID, err) - } - - crossChainAppResponseTime, err := metric.NewAverager( - r.namespace, - fmt.Sprintf("handler_%d_cross_chain_app_response", handlerID), - "cross chain app response time (ns)", - r.metrics, - ) - if err != nil { - return fmt.Errorf("failed to register cross-chain app response metric for handler_%d: %w", handlerID, err) - } - r.handlers[handlerID] = &meteredHandler{ responder: &responder{ Handler: handler, @@ -176,15 +110,7 @@ func (r *router) addHandler(handlerID uint64, handler Handler) error { log: r.log, sender: r.sender, }, - metrics: &metrics{ - appRequestTime: appRequestTime, - appRequestFailedTime: appRequestFailedTime, - appResponseTime: appResponseTime, - appGossipTime: appGossipTime, - crossChainAppRequestTime: crossChainAppRequestTime, - crossChainAppRequestFailedTime: crossChainAppRequestFailedTime, - crossChainAppResponseTime: crossChainAppResponseTime, - }, + metrics: r.metrics, } return nil 
@@ -197,7 +123,7 @@ func (r *router) addHandler(handlerID uint64, handler Handler) error { // considered fatal func (r *router) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, deadline time.Time, request []byte) error { start := time.Now() - parsedMsg, handler, ok := r.parse(request) + parsedMsg, handler, handlerID, ok := r.parse(request) if !ok { r.log.Debug("failed to process message", zap.Stringer("messageOp", message.AppRequestOp), @@ -214,7 +140,23 @@ func (r *router) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID ui return err } - handler.metrics.appRequestTime.Observe(float64(time.Since(start))) + labels := prometheus.Labels{ + handlerLabel: handlerID, + } + + metricCount, err := r.metrics.appRequestCount.GetMetricWith(labels) + if err != nil { + return err + } + + metricTime, err := r.metrics.appRequestTime.GetMetricWith(labels) + if err != nil { + return err + } + + metricCount.Inc() + metricTime.Add(float64(time.Since(start))) + return nil } @@ -231,8 +173,25 @@ func (r *router) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, reques return ErrUnrequestedResponse } - pending.AppResponseCallback(ctx, nodeID, nil, appErr) - pending.appRequestFailedTime.Observe(float64(time.Since(start))) + pending.callback(ctx, nodeID, nil, appErr) + + labels := prometheus.Labels{ + handlerLabel: pending.handlerID, + } + + metricCount, err := r.metrics.appRequestFailedCount.GetMetricWith(labels) + if err != nil { + return err + } + + metricTime, err := r.metrics.appRequestFailedTime.GetMetricWith(labels) + if err != nil { + return err + } + + metricCount.Inc() + metricTime.Add(float64(time.Since(start))) + return nil } @@ -249,8 +208,25 @@ func (r *router) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID u return ErrUnrequestedResponse } - pending.AppResponseCallback(ctx, nodeID, response, nil) - pending.appResponseTime.Observe(float64(time.Since(start))) + pending.callback(ctx, nodeID, response, nil) + + labels := 
prometheus.Labels{ + handlerLabel: pending.handlerID, + } + + metricCount, err := r.metrics.appResponseCount.GetMetricWith(labels) + if err != nil { + return err + } + + metricTime, err := r.metrics.appResponseTime.GetMetricWith(labels) + if err != nil { + return err + } + + metricCount.Inc() + metricTime.Add(float64(time.Since(start))) + return nil } @@ -261,7 +237,7 @@ func (r *router) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID u // considered fatal func (r *router) AppGossip(ctx context.Context, nodeID ids.NodeID, gossip []byte) error { start := time.Now() - parsedMsg, handler, ok := r.parse(gossip) + parsedMsg, handler, handlerID, ok := r.parse(gossip) if !ok { r.log.Debug("failed to process message", zap.Stringer("messageOp", message.AppGossipOp), @@ -273,7 +249,23 @@ func (r *router) AppGossip(ctx context.Context, nodeID ids.NodeID, gossip []byte handler.AppGossip(ctx, nodeID, parsedMsg) - handler.metrics.appGossipTime.Observe(float64(time.Since(start))) + labels := prometheus.Labels{ + handlerLabel: handlerID, + } + + metricCount, err := r.metrics.appGossipCount.GetMetricWith(labels) + if err != nil { + return err + } + + metricTime, err := r.metrics.appGossipTime.GetMetricWith(labels) + if err != nil { + return err + } + + metricCount.Inc() + metricTime.Add(float64(time.Since(start))) + return nil } @@ -291,7 +283,7 @@ func (r *router) CrossChainAppRequest( msg []byte, ) error { start := time.Now() - parsedMsg, handler, ok := r.parse(msg) + parsedMsg, handler, handlerID, ok := r.parse(msg) if !ok { r.log.Debug("failed to process message", zap.Stringer("messageOp", message.CrossChainAppRequestOp), @@ -307,7 +299,23 @@ func (r *router) CrossChainAppRequest( return err } - handler.metrics.crossChainAppRequestTime.Observe(float64(time.Since(start))) + labels := prometheus.Labels{ + handlerLabel: handlerID, + } + + metricCount, err := r.metrics.crossChainAppRequestCount.GetMetricWith(labels) + if err != nil { + return err + } + + metricTime, 
err := r.metrics.crossChainAppRequestTime.GetMetricWith(labels) + if err != nil { + return err + } + + metricCount.Inc() + metricTime.Add(float64(time.Since(start))) + return nil } @@ -324,8 +332,25 @@ func (r *router) CrossChainAppRequestFailed(ctx context.Context, chainID ids.ID, return ErrUnrequestedResponse } - pending.CrossChainAppResponseCallback(ctx, chainID, nil, appErr) - pending.crossChainAppRequestFailedTime.Observe(float64(time.Since(start))) + pending.callback(ctx, chainID, nil, appErr) + + labels := prometheus.Labels{ + handlerLabel: pending.handlerID, + } + + metricCount, err := r.metrics.crossChainAppRequestFailedCount.GetMetricWith(labels) + if err != nil { + return err + } + + metricTime, err := r.metrics.crossChainAppRequestFailedTime.GetMetricWith(labels) + if err != nil { + return err + } + + metricCount.Inc() + metricTime.Add(float64(time.Since(start))) + return nil } @@ -342,8 +367,25 @@ func (r *router) CrossChainAppResponse(ctx context.Context, chainID ids.ID, requ return ErrUnrequestedResponse } - pending.CrossChainAppResponseCallback(ctx, chainID, response, nil) - pending.crossChainAppResponseTime.Observe(float64(time.Since(start))) + pending.callback(ctx, chainID, response, nil) + + labels := prometheus.Labels{ + handlerLabel: pending.handlerID, + } + + metricCount, err := r.metrics.crossChainAppResponseCount.GetMetricWith(labels) + if err != nil { + return err + } + + metricTime, err := r.metrics.crossChainAppResponseTime.GetMetricWith(labels) + if err != nil { + return err + } + + metricCount.Inc() + metricTime.Add(float64(time.Since(start))) + return nil } @@ -353,20 +395,22 @@ func (r *router) CrossChainAppResponse(ctx context.Context, chainID ids.ID, requ // Returns: // - The unprefixed protocol message. // - The protocol responder. +// - The protocol metric name. // - A boolean indicating that parsing succeeded. // // Invariant: Assumes [r.lock] isn't held. 
-func (r *router) parse(msg []byte) ([]byte, *meteredHandler, bool) { +func (r *router) parse(msg []byte) ([]byte, *meteredHandler, string, bool) { handlerID, bytesRead := binary.Uvarint(msg) if bytesRead <= 0 { - return nil, nil, false + return nil, nil, "", false } r.lock.RLock() defer r.lock.RUnlock() + handlerStr := strconv.FormatUint(handlerID, 10) handler, ok := r.handlers[handlerID] - return msg[bytesRead:], handler, ok + return msg[bytesRead:], handler, handlerStr, ok } // Invariant: Assumes [r.lock] isn't held. diff --git a/network/p2p/throttler_handler.go b/network/p2p/throttler_handler.go index 4dd142c400c1..aee5bee70795 100644 --- a/network/p2p/throttler_handler.go +++ b/network/p2p/throttler_handler.go @@ -20,25 +20,41 @@ var ( _ Handler = (*ThrottlerHandler)(nil) ) +func NewThrottlerHandler(handler Handler, throttler Throttler, log logging.Logger) *ThrottlerHandler { + return &ThrottlerHandler{ + handler: handler, + throttler: throttler, + log: log, + } +} + type ThrottlerHandler struct { - Handler - Throttler Throttler - Log logging.Logger + handler Handler + throttler Throttler + log logging.Logger } func (t ThrottlerHandler) AppGossip(ctx context.Context, nodeID ids.NodeID, gossipBytes []byte) { - if !t.Throttler.Handle(nodeID) { - t.Log.Debug("dropping message", zap.Stringer("nodeID", nodeID)) + if !t.throttler.Handle(nodeID) { + t.log.Debug( + "dropping message", + zap.Stringer("nodeID", nodeID), + zap.String("reason", "throttled"), + ) return } - t.Handler.AppGossip(ctx, nodeID, gossipBytes) + t.handler.AppGossip(ctx, nodeID, gossipBytes) } func (t ThrottlerHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, error) { - if !t.Throttler.Handle(nodeID) { + if !t.throttler.Handle(nodeID) { return nil, fmt.Errorf("dropping message from %s: %w", nodeID, ErrThrottled) } - return t.Handler.AppRequest(ctx, nodeID, deadline, requestBytes) + return t.handler.AppRequest(ctx, nodeID, deadline, 
requestBytes) +} + +func (t ThrottlerHandler) CrossChainAppRequest(ctx context.Context, chainID ids.ID, deadline time.Time, requestBytes []byte) ([]byte, error) { + return t.handler.CrossChainAppRequest(ctx, chainID, deadline, requestBytes) } diff --git a/network/p2p/throttler_handler_test.go b/network/p2p/throttler_handler_test.go index 1e4ed9578bf6..79b1dc88665d 100644 --- a/network/p2p/throttler_handler_test.go +++ b/network/p2p/throttler_handler_test.go @@ -37,15 +37,15 @@ func TestThrottlerHandlerAppGossip(t *testing.T) { require := require.New(t) called := false - handler := ThrottlerHandler{ - Handler: testHandler{ + handler := NewThrottlerHandler( + testHandler{ appGossipF: func(context.Context, ids.NodeID, []byte) { called = true }, }, - Throttler: tt.Throttler, - Log: logging.NoLog{}, - } + tt.Throttler, + logging.NoLog{}, + ) handler.AppGossip(context.Background(), ids.GenerateTestNodeID(), []byte("foobar")) require.Equal(tt.expected, called) @@ -73,11 +73,11 @@ func TestThrottlerHandlerAppRequest(t *testing.T) { t.Run(tt.name, func(t *testing.T) { require := require.New(t) - handler := ThrottlerHandler{ - Handler: NoOpHandler{}, - Throttler: tt.Throttler, - Log: logging.NoLog{}, - } + handler := NewThrottlerHandler( + NoOpHandler{}, + tt.Throttler, + logging.NoLog{}, + ) _, err := handler.AppRequest(context.Background(), ids.GenerateTestNodeID(), time.Time{}, []byte("foobar")) require.ErrorIs(err, tt.expectedErr) }) diff --git a/network/p2p/validators_test.go b/network/p2p/validators_test.go index d1ce9dd1735e..9e90242e201d 100644 --- a/network/p2p/validators_test.go +++ b/network/p2p/validators_test.go @@ -179,7 +179,9 @@ func TestValidatorsSample(t *testing.T) { } gomock.InOrder(calls...) 
- network := NewNetwork(logging.NoLog{}, &common.FakeSender{}, prometheus.NewRegistry(), "") + network, err := NewNetwork(logging.NoLog{}, &common.FakeSender{}, prometheus.NewRegistry(), "") + require.NoError(err) + ctx := context.Background() require.NoError(network.Connected(ctx, nodeID1, nil)) require.NoError(network.Connected(ctx, nodeID2, nil)) @@ -187,7 +189,7 @@ func TestValidatorsSample(t *testing.T) { v := NewValidators(network.Peers, network.log, subnetID, mockValidators, tt.maxStaleness) for _, call := range tt.calls { v.lastUpdated = call.time - sampled := v.Sample(context.Background(), call.limit) + sampled := v.Sample(ctx, call.limit) require.LessOrEqual(len(sampled), call.limit) require.Subset(call.expected, sampled) }