diff --git a/network/p2p/client.go b/network/p2p/client.go index 1c3c9bee01da..e4418cc87234 100644 --- a/network/p2p/client.go +++ b/network/p2p/client.go @@ -10,6 +10,7 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/utils/metric" "github.com/ava-labs/avalanchego/utils/set" ) @@ -39,11 +40,19 @@ type CrossChainAppResponseCallback func( err error, ) +type clientMetrics struct { + appRequestFailedTime metric.Averager + appResponseTime metric.Averager + crossChainAppRequestFailedTime metric.Averager + crossChainAppResponseTime metric.Averager +} + type Client struct { handlerID uint64 handlerPrefix []byte router *router sender common.AppSender + clientMetrics *clientMetrics options *clientOptions } @@ -96,8 +105,8 @@ func (c *Client) AppRequest( } c.router.pendingAppRequests[requestID] = pendingAppRequest{ - AppResponseCallback: onResponse, - metrics: c.router.handlers[c.handlerID].metrics, + callback: onResponse, + metrics: c.clientMetrics, } c.router.requestID += 2 } @@ -159,8 +168,8 @@ func (c *Client) CrossChainAppRequest( } c.router.pendingCrossChainAppRequests[requestID] = pendingCrossChainAppRequest{ - CrossChainAppResponseCallback: onResponse, - metrics: c.router.handlers[c.handlerID].metrics, + callback: onResponse, + metrics: c.clientMetrics, } c.router.requestID += 2 diff --git a/network/p2p/gossip/gossip_test.go b/network/p2p/gossip/gossip_test.go index d30fac0008e7..6585e09d0310 100644 --- a/network/p2p/gossip/gossip_test.go +++ b/network/p2p/gossip/gossip_test.go @@ -130,8 +130,7 @@ func TestGossiperGossip(t *testing.T) { handler, err := NewHandler[*testTx](responseSet, tt.config, prometheus.NewRegistry()) require.NoError(err) - _, err = responseNetwork.NewAppProtocol(0x0, handler) - require.NoError(err) + require.NoError(responseNetwork.AddHandler(0x0, handler)) requestSender := &common.SenderTest{ SendAppRequestF: func(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, request []byte) error { @@ -162,7 +161,7 @@ func TestGossiperGossip(t *testing.T) { require.NoError(requestSet.Add(item)) } - requestClient, err := requestNetwork.NewAppProtocol(0x0, nil) + requestClient, err := requestNetwork.NewClient(0x0) require.NoError(err) config := Config{ diff --git a/network/p2p/network.go b/network/p2p/network.go index 444c2e4b9408..8c230e2a2abe 100644 --- a/network/p2p/network.go +++ b/network/p2p/network.go @@ -6,6 +6,7 @@ package p2p import ( "context" "encoding/binary" + "fmt" "sync" "time" @@ -15,6 +16,7 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/metric" "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/version" ) @@ -117,12 +119,47 @@ func (n *Network) Disconnected(_ context.Context, nodeID ids.NodeID) error { return nil } -// NewAppProtocol reserves an identifier for an application protocol handler and -// returns a Client that can be used to send messages for the corresponding -// protocol. -func (n *Network) NewAppProtocol(handlerID uint64, handler Handler, options ...ClientOption) (*Client, error) { - if err := n.router.addHandler(handlerID, handler); err != nil { - return nil, err +// NewClient returns a Client that can be used to send messages for the +// corresponding protocol. +func (n *Network) NewClient(handlerID uint64, options ...ClientOption) (*Client, error) { + appRequestFailedTime, err := metric.NewAverager( + n.namespace, + fmt.Sprintf("handler_%d_app_request_failed", handlerID), + "app request failed time (ns)", + n.metrics, + ) + if err != nil { + return nil, fmt.Errorf("failed to register app request failed metric for client %d: %w", handlerID, err) + } + + appResponseTime, err := metric.NewAverager( + n.namespace, + fmt.Sprintf("handler_%d_app_response", handlerID), + "app response time (ns)", + n.metrics, + ) + if err != nil { + return nil, fmt.Errorf("failed to register app response metric for client %d: %w", handlerID, err) + } + + crossChainAppRequestFailedTime, err := metric.NewAverager( + n.namespace, + fmt.Sprintf("handler_%d_cross_chain_app_request_failed", handlerID), + "app request failed time (ns)", + n.metrics, + ) + if err != nil { + return nil, fmt.Errorf("failed to register cross-chain app request failed metric for client %d: %w", handlerID, err) + } + + crossChainAppResponseTime, err := metric.NewAverager( + n.namespace, + fmt.Sprintf("handler_%d_cross_chain_app_response", handlerID), + "cross chain app response time (ns)", + n.metrics, + ) + if err != nil { + return nil, fmt.Errorf("failed to register cross-chain app response metric for client %d: %w", handlerID, err) } client := &Client{ @@ -135,6 +172,12 @@ func (n *Network) NewAppProtocol(handlerID uint64, handler Handler, options ...C peers: n.Peers, }, }, + clientMetrics: &clientMetrics{ + appRequestFailedTime: appRequestFailedTime, + appResponseTime: appResponseTime, + crossChainAppRequestFailedTime: crossChainAppRequestFailedTime, + crossChainAppResponseTime: crossChainAppResponseTime, + }, } for _, option := range options { @@ -144,6 +187,11 @@ func (n *Network) NewAppProtocol(handlerID uint64, handler Handler, options ...C return client, nil } +// AddHandler reserves an identifier for an application protocol +func (n *Network) AddHandler(handlerID uint64, handler Handler) error { + return n.router.addHandler(handlerID, handler) +} + // Peers contains metadata about the current set of connected peers type Peers struct { lock sync.RWMutex diff --git a/network/p2p/network_test.go b/network/p2p/network_test.go index 590858a0c467..e3c9ffe71b22 100644 --- a/network/p2p/network_test.go +++ b/network/p2p/network_test.go @@ -209,7 +209,8 @@ func TestAppRequestResponse(t *testing.T) { handler := mocks.NewMockHandler(ctrl) n := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "") require.NoError(n.Connected(context.Background(), nodeID, nil)) - client, err := n.NewAppProtocol(handlerID, handler) + require.NoError(n.AddHandler(handlerID, handler)) + client, err := n.NewClient(handlerID) require.NoError(err) wg := &sync.WaitGroup{} @@ -350,7 +351,8 @@ func TestAppRequestDuplicateRequestIDs(t *testing.T) { }).AnyTimes() require.NoError(network.Connected(context.Background(), nodeID, nil)) - client, err := network.NewAppProtocol(0x1, handler) + require.NoError(network.AddHandler(0x1, handler)) + client, err := network.NewClient(0x1) require.NoError(err) onResponse := func(ctx context.Context, nodeID ids.NodeID, got []byte, err error) { @@ -493,7 +495,7 @@ func TestAppRequestAnyNodeSelection(t *testing.T) { require.NoError(n.Connected(context.Background(), peer, &version.Application{})) } - client, err := n.NewAppProtocol(1, nil) + client, err := n.NewClient(1) require.NoError(err) err = client.AppRequestAny(context.Background(), []byte("foobar"), nil) @@ -582,7 +584,7 @@ func TestNodeSamplerClientOption(t *testing.T) { require.NoError(network.Connected(ctx, peer, nil)) } - client, err := network.NewAppProtocol(0x0, nil, tt.option(t, network)) + client, err := network.NewClient(0, tt.option(t, network)) require.NoError(err) if err = client.AppRequestAny(ctx, []byte("request"), nil); err != nil { diff --git a/network/p2p/router.go b/network/p2p/router.go index 110e9b6de627..137a344fb805 100644 --- a/network/p2p/router.go +++ b/network/p2p/router.go @@ -29,30 +29,26 @@ var ( _ common.AppHandler = (*router)(nil) ) -type metrics struct { - appRequestTime metric.Averager - appRequestFailedTime metric.Averager - appResponseTime metric.Averager - appGossipTime metric.Averager - crossChainAppRequestTime metric.Averager - crossChainAppRequestFailedTime metric.Averager - crossChainAppResponseTime metric.Averager +type serverMetrics struct { + appRequestTime metric.Averager + appGossipTime metric.Averager + crossChainAppRequestTime metric.Averager } type pendingAppRequest struct { - *metrics - AppResponseCallback + callback AppResponseCallback + metrics *clientMetrics } type pendingCrossChainAppRequest struct { - *metrics - CrossChainAppResponseCallback + callback CrossChainAppResponseCallback + metrics *clientMetrics } // meteredHandler emits metrics for a Handler type meteredHandler struct { *responder - *metrics + *serverMetrics } // router routes incoming application messages to the corresponding registered @@ -109,26 +105,6 @@ func (r *router) addHandler(handlerID uint64, handler Handler) error { return fmt.Errorf("failed to register app request metric for handler_%d: %w", handlerID, err) } - appRequestFailedTime, err := metric.NewAverager( - r.namespace, - fmt.Sprintf("handler_%d_app_request_failed", handlerID), - "app request failed time (ns)", - r.metrics, - ) - if err != nil { - return fmt.Errorf("failed to register app request failed metric for handler_%d: %w", handlerID, err) - } - - appResponseTime, err := metric.NewAverager( - r.namespace, - fmt.Sprintf("handler_%d_app_response", handlerID), - "app response time (ns)", - r.metrics, - ) - if err != nil { - return fmt.Errorf("failed to register app response metric for handler_%d: %w", handlerID, err) - } - appGossipTime, err := metric.NewAverager( r.namespace, fmt.Sprintf("handler_%d_app_gossip", handlerID), @@ -149,26 +125,6 @@ func (r *router) addHandler(handlerID uint64, handler Handler) error { return fmt.Errorf("failed to register cross-chain app request metric for handler_%d: %w", handlerID, err) } - crossChainAppRequestFailedTime, err := metric.NewAverager( - r.namespace, - fmt.Sprintf("handler_%d_cross_chain_app_request_failed", handlerID), - "app request failed time (ns)", - r.metrics, - ) - if err != nil { - return fmt.Errorf("failed to register cross-chain app request failed metric for handler_%d: %w", handlerID, err) - } - - crossChainAppResponseTime, err := metric.NewAverager( - r.namespace, - fmt.Sprintf("handler_%d_cross_chain_app_response", handlerID), - "cross chain app response time (ns)", - r.metrics, - ) - if err != nil { - return fmt.Errorf("failed to register cross-chain app response metric for handler_%d: %w", handlerID, err) - } - r.handlers[handlerID] = &meteredHandler{ responder: &responder{ Handler: handler, @@ -176,14 +132,10 @@ func (r *router) addHandler(handlerID uint64, handler Handler) error { log: r.log, sender: r.sender, }, - metrics: &metrics{ - appRequestTime: appRequestTime, - appRequestFailedTime: appRequestFailedTime, - appResponseTime: appResponseTime, - appGossipTime: appGossipTime, - crossChainAppRequestTime: crossChainAppRequestTime, - crossChainAppRequestFailedTime: crossChainAppRequestFailedTime, - crossChainAppResponseTime: crossChainAppResponseTime, + serverMetrics: &serverMetrics{ + appRequestTime: appRequestTime, + appGossipTime: appGossipTime, + crossChainAppRequestTime: crossChainAppRequestTime, }, } @@ -214,7 +166,7 @@ func (r *router) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID ui return err } - handler.metrics.appRequestTime.Observe(float64(time.Since(start))) + handler.serverMetrics.appRequestTime.Observe(float64(time.Since(start))) return nil } @@ -231,8 +183,8 @@ func (r *router) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, reques return ErrUnrequestedResponse } - pending.AppResponseCallback(ctx, nodeID, nil, ErrAppRequestFailed) - pending.appRequestFailedTime.Observe(float64(time.Since(start))) + pending.callback(ctx, nodeID, nil, ErrAppRequestFailed) + pending.metrics.appRequestFailedTime.Observe(float64(time.Since(start))) return nil } @@ -249,8 +201,8 @@ func (r *router) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID u return ErrUnrequestedResponse } - pending.AppResponseCallback(ctx, nodeID, response, nil) - pending.appResponseTime.Observe(float64(time.Since(start))) + pending.callback(ctx, nodeID, response, nil) + pending.metrics.appResponseTime.Observe(float64(time.Since(start))) return nil } @@ -273,7 +225,7 @@ func (r *router) AppGossip(ctx context.Context, nodeID ids.NodeID, gossip []byte handler.AppGossip(ctx, nodeID, parsedMsg) - handler.metrics.appGossipTime.Observe(float64(time.Since(start))) + handler.serverMetrics.appGossipTime.Observe(float64(time.Since(start))) return nil } @@ -307,7 +259,7 @@ func (r *router) CrossChainAppRequest( return err } - handler.metrics.crossChainAppRequestTime.Observe(float64(time.Since(start))) + handler.serverMetrics.crossChainAppRequestTime.Observe(float64(time.Since(start))) return nil } @@ -324,8 +276,8 @@ func (r *router) CrossChainAppRequestFailed(ctx context.Context, chainID ids.ID, return ErrUnrequestedResponse } - pending.CrossChainAppResponseCallback(ctx, chainID, nil, ErrAppRequestFailed) - pending.crossChainAppRequestFailedTime.Observe(float64(time.Since(start))) + pending.callback(ctx, chainID, nil, ErrAppRequestFailed) + pending.metrics.crossChainAppRequestFailedTime.Observe(float64(time.Since(start))) return nil } @@ -342,8 +294,8 @@ func (r *router) CrossChainAppResponse(ctx context.Context, chainID ids.ID, requ return ErrUnrequestedResponse } - pending.CrossChainAppResponseCallback(ctx, chainID, response, nil) - pending.crossChainAppResponseTime.Observe(float64(time.Since(start))) + pending.callback(ctx, chainID, response, nil) + pending.metrics.crossChainAppResponseTime.Observe(float64(time.Since(start))) return nil }