From 3c26d85f63c96531a8f65f34388cd2e615679dbd Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Sat, 9 Dec 2023 04:20:46 -0500 Subject: [PATCH] refactor client Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 +- network/p2p/client.go | 25 +++++---- network/p2p/gossip/gossip_test.go | 5 +- network/p2p/network.go | 58 +++++++++++++++++--- network/p2p/network_test.go | 10 ++-- network/p2p/router.go | 90 +++++++------------------------ 7 files changed, 98 insertions(+), 96 deletions(-) diff --git a/go.mod b/go.mod index b793394d8d24..a5344b7e101a 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/DataDog/zstd v1.5.2 github.com/Microsoft/go-winio v0.5.2 github.com/NYTimes/gziphandler v1.1.1 - github.com/ava-labs/coreth v0.12.10-rc.0 + github.com/ava-labs/coreth v0.12.9-rc.9.0.20231211031827-b15e16a50e48 github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34 github.com/btcsuite/btcd/btcutil v1.1.3 github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811 diff --git a/go.sum b/go.sum index 7b9a76de9663..f46170e93542 100644 --- a/go.sum +++ b/go.sum @@ -66,8 +66,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= -github.com/ava-labs/coreth v0.12.10-rc.0 h1:qmuom7rtH5hc1E3lnqrMFNLFL1TMnEVa/2O8poB1YLU= -github.com/ava-labs/coreth v0.12.10-rc.0/go.mod h1:plFm/xzvWmx1+qJ3JQSTzF8+FdaA2xu7GgY/AdaZDfk= +github.com/ava-labs/coreth v0.12.9-rc.9.0.20231211031827-b15e16a50e48 h1:43Tldt5Jk1zyEEkiShGl+qQsIs5qxroPAKyT+sNMDQ0= +github.com/ava-labs/coreth v0.12.9-rc.9.0.20231211031827-b15e16a50e48/go.mod h1:MYAGEjAJo8dy3LYITGdSwwlp71ZtBiXByCXjTr/q2eU= github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34 h1:mg9Uw6oZFJKytJxgxnl3uxZOs/SB8CVHg6Io4Tf99Zc= github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34/go.mod h1:pJxaT9bUgeRNVmNRgtCHb7sFDIRKy7CzTQVi8gGNT6g= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= diff --git a/network/p2p/client.go b/network/p2p/client.go index 1c3c9bee01da..0cb9a6cc620d 100644 --- a/network/p2p/client.go +++ b/network/p2p/client.go @@ -10,6 +10,7 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/utils/metric" "github.com/ava-labs/avalanchego/utils/set" ) @@ -40,11 +41,15 @@ type CrossChainAppResponseCallback func( ) type Client struct { - handlerID uint64 - handlerPrefix []byte - router *router - sender common.AppSender - options *clientOptions + handlerID uint64 + handlerPrefix []byte + router *router + sender common.AppSender + appRequestFailedTime metric.Averager + appResponseTime metric.Averager + crossChainAppRequestFailedTime metric.Averager + crossChainAppResponseTime metric.Averager + options *clientOptions } // AppRequestAny issues an AppRequest to an arbitrary node decided by Client. @@ -96,8 +101,9 @@ func (c *Client) AppRequest( } c.router.pendingAppRequests[requestID] = pendingAppRequest{ - AppResponseCallback: onResponse, - metrics: c.router.handlers[c.handlerID].metrics, + callback: onResponse, + appRequestFailedTime: c.appRequestFailedTime, + appResponseTime: c.appResponseTime, } c.router.requestID += 2 } @@ -159,8 +165,9 @@ func (c *Client) CrossChainAppRequest( } c.router.pendingCrossChainAppRequests[requestID] = pendingCrossChainAppRequest{ - CrossChainAppResponseCallback: onResponse, - metrics: c.router.handlers[c.handlerID].metrics, + callback: onResponse, + crossChainAppRequestFailedTime: c.crossChainAppRequestFailedTime, + crossChainAppResponseTime: c.crossChainAppResponseTime, } c.router.requestID += 2 diff --git a/network/p2p/gossip/gossip_test.go b/network/p2p/gossip/gossip_test.go index d30fac0008e7..6585e09d0310 100644 --- a/network/p2p/gossip/gossip_test.go +++ b/network/p2p/gossip/gossip_test.go @@ -130,8 +130,7 @@ func TestGossiperGossip(t *testing.T) { handler, err := NewHandler[*testTx](responseSet, tt.config, prometheus.NewRegistry()) require.NoError(err) - _, err = responseNetwork.NewAppProtocol(0x0, handler) - require.NoError(err) + require.NoError(responseNetwork.AddHandler(0x0, handler)) requestSender := &common.SenderTest{ SendAppRequestF: func(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, request []byte) error { @@ -162,7 +161,7 @@ func TestGossiperGossip(t *testing.T) { require.NoError(requestSet.Add(item)) } - requestClient, err := requestNetwork.NewAppProtocol(0x0, nil) + requestClient, err := requestNetwork.NewClient(0x0) require.NoError(err) config := Config{ diff --git a/network/p2p/network.go b/network/p2p/network.go index 444c2e4b9408..a17501ebb63d 100644 --- a/network/p2p/network.go +++ b/network/p2p/network.go @@ -6,6 +6,7 @@ package p2p import ( "context" "encoding/binary" + "fmt" "sync" "time" @@ -15,6 +16,7 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/metric" "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/version" ) @@ -117,12 +119,47 @@ func (n *Network) Disconnected(_ context.Context, nodeID ids.NodeID) error { return nil } -// NewAppProtocol reserves an identifier for an application protocol handler and -// returns a Client that can be used to send messages for the corresponding -// protocol. -func (n *Network) NewAppProtocol(handlerID uint64, handler Handler, options ...ClientOption) (*Client, error) { - if err := n.router.addHandler(handlerID, handler); err != nil { - return nil, err +// NewClient returns a Client that can be used to send messages for the +// corresponding protocol. +func (n *Network) NewClient(handlerID uint64, options ...ClientOption) (*Client, error) { + appRequestFailedTime, err := metric.NewAverager( + n.namespace, + fmt.Sprintf("client_%d_app_request_failed_time", handlerID), + "app request failed time (ns)", + n.metrics, + ) + if err != nil { + return nil, fmt.Errorf("failed to register app request failed metric for client %d: %w", handlerID, err) + } + + appResponseTime, err := metric.NewAverager( + n.namespace, + fmt.Sprintf("client_%d_app_response_time", handlerID), + "app response time (ns)", + n.metrics, + ) + if err != nil { + return nil, fmt.Errorf("failed to register app response metric for client %d: %w", handlerID, err) + } + + crossChainAppRequestFailedTime, err := metric.NewAverager( + n.namespace, + fmt.Sprintf("client_%d_cross_chain_app_request_failed_time", handlerID), + "app request failed time (ns)", + n.metrics, + ) + if err != nil { + return nil, fmt.Errorf("failed to register cross-chain app request failed metric for client %d: %w", handlerID, err) + } + + crossChainAppResponseTime, err := metric.NewAverager( + n.namespace, + fmt.Sprintf("client_%d_cross_chain_app_response_time", handlerID), + "cross chain app response time (ns)", + n.metrics, + ) + if err != nil { + return nil, fmt.Errorf("failed to register cross-chain app response metric for client %d: %w", handlerID, err) } client := &Client{ @@ -135,6 +172,10 @@ func (n *Network) NewAppProtocol(handlerID uint64, handler Handler, options ...C peers: n.Peers, }, }, + appRequestFailedTime: appRequestFailedTime, + appResponseTime: appResponseTime, + crossChainAppRequestFailedTime: crossChainAppRequestFailedTime, + crossChainAppResponseTime: crossChainAppResponseTime, } for _, option := range options { @@ -144,6 +185,11 @@ func (n *Network) NewAppProtocol(handlerID uint64, handler Handler, options ...C return client, nil } +// AddHandler reserves an identifier for an application protocol +func (n *Network) AddHandler(handlerID uint64, handler Handler) error { + return n.router.addHandler(handlerID, handler) +} + // Peers contains metadata about the current set of connected peers type Peers struct { lock sync.RWMutex diff --git a/network/p2p/network_test.go b/network/p2p/network_test.go index 590858a0c467..e3c9ffe71b22 100644 --- a/network/p2p/network_test.go +++ b/network/p2p/network_test.go @@ -209,7 +209,8 @@ func TestAppRequestResponse(t *testing.T) { handler := mocks.NewMockHandler(ctrl) n := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "") require.NoError(n.Connected(context.Background(), nodeID, nil)) - client, err := n.NewAppProtocol(handlerID, handler) + require.NoError(n.AddHandler(handlerID, handler)) + client, err := n.NewClient(handlerID) require.NoError(err) wg := &sync.WaitGroup{} @@ -350,7 +351,8 @@ func TestAppRequestDuplicateRequestIDs(t *testing.T) { }).AnyTimes() require.NoError(network.Connected(context.Background(), nodeID, nil)) - client, err := network.NewAppProtocol(0x1, handler) + require.NoError(network.AddHandler(0x1, handler)) + client, err := network.NewClient(0x1) require.NoError(err) onResponse := func(ctx context.Context, nodeID ids.NodeID, got []byte, err error) { @@ -493,7 +495,7 @@ func TestAppRequestAnyNodeSelection(t *testing.T) { require.NoError(n.Connected(context.Background(), peer, &version.Application{})) } - client, err := n.NewAppProtocol(1, nil) + client, err := n.NewClient(1) require.NoError(err) err = client.AppRequestAny(context.Background(), []byte("foobar"), nil) @@ -582,7 +584,7 @@ func TestNodeSamplerClientOption(t *testing.T) { require.NoError(network.Connected(ctx, peer, nil)) } - client, err := network.NewAppProtocol(0x0, nil, tt.option(t, network)) + client, err := network.NewClient(0, tt.option(t, network)) require.NoError(err) if err = client.AppRequestAny(ctx, []byte("request"), nil); err != nil { diff --git a/network/p2p/router.go b/network/p2p/router.go index 110e9b6de627..ec0ecea830d1 100644 --- a/network/p2p/router.go +++ b/network/p2p/router.go @@ -29,30 +29,24 @@ var ( _ common.AppHandler = (*router)(nil) ) -type metrics struct { - appRequestTime metric.Averager - appRequestFailedTime metric.Averager - appResponseTime metric.Averager - appGossipTime metric.Averager - crossChainAppRequestTime metric.Averager - crossChainAppRequestFailedTime metric.Averager - crossChainAppResponseTime metric.Averager -} - type pendingAppRequest struct { - *metrics - AppResponseCallback + callback AppResponseCallback + appRequestFailedTime metric.Averager + appResponseTime metric.Averager } type pendingCrossChainAppRequest struct { - *metrics - CrossChainAppResponseCallback + callback CrossChainAppResponseCallback + crossChainAppRequestFailedTime metric.Averager + crossChainAppResponseTime metric.Averager } // meteredHandler emits metrics for a Handler type meteredHandler struct { *responder - *metrics + appRequestTime metric.Averager + appGossipTime metric.Averager + crossChainAppRequestTime metric.Averager } // router routes incoming application messages to the corresponding registered @@ -109,26 +103,6 @@ func (r *router) addHandler(handlerID uint64, handler Handler) error { return fmt.Errorf("failed to register app request metric for handler_%d: %w", handlerID, err) } - appRequestFailedTime, err := metric.NewAverager( - r.namespace, - fmt.Sprintf("handler_%d_app_request_failed", handlerID), - "app request failed time (ns)", - r.metrics, - ) - if err != nil { - return fmt.Errorf("failed to register app request failed metric for handler_%d: %w", handlerID, err) - } - - appResponseTime, err := metric.NewAverager( - r.namespace, - fmt.Sprintf("handler_%d_app_response", handlerID), - "app response time (ns)", - r.metrics, - ) - if err != nil { - return fmt.Errorf("failed to register app response metric for handler_%d: %w", handlerID, err) - } - appGossipTime, err := metric.NewAverager( r.namespace, fmt.Sprintf("handler_%d_app_gossip", handlerID), @@ -149,26 +123,6 @@ func (r *router) addHandler(handlerID uint64, handler Handler) error { return fmt.Errorf("failed to register cross-chain app request metric for handler_%d: %w", handlerID, err) } - crossChainAppRequestFailedTime, err := metric.NewAverager( - r.namespace, - fmt.Sprintf("handler_%d_cross_chain_app_request_failed", handlerID), - "app request failed time (ns)", - r.metrics, - ) - if err != nil { - return fmt.Errorf("failed to register cross-chain app request failed metric for handler_%d: %w", handlerID, err) - } - - crossChainAppResponseTime, err := metric.NewAverager( - r.namespace, - fmt.Sprintf("handler_%d_cross_chain_app_response", handlerID), - "cross chain app response time (ns)", - r.metrics, - ) - if err != nil { - return fmt.Errorf("failed to register cross-chain app response metric for handler_%d: %w", handlerID, err) - } - r.handlers[handlerID] = &meteredHandler{ responder: &responder{ Handler: handler, @@ -176,15 +130,9 @@ func (r *router) addHandler(handlerID uint64, handler Handler) error { log: r.log, sender: r.sender, }, - metrics: &metrics{ - appRequestTime: appRequestTime, - appRequestFailedTime: appRequestFailedTime, - appResponseTime: appResponseTime, - appGossipTime: appGossipTime, - crossChainAppRequestTime: crossChainAppRequestTime, - crossChainAppRequestFailedTime: crossChainAppRequestFailedTime, - crossChainAppResponseTime: crossChainAppResponseTime, - }, + appRequestTime: appRequestTime, + appGossipTime: appGossipTime, + crossChainAppRequestTime: crossChainAppRequestTime, } return nil @@ -214,7 +162,7 @@ func (r *router) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID ui return err } - handler.metrics.appRequestTime.Observe(float64(time.Since(start))) + handler.appRequestTime.Observe(float64(time.Since(start))) return nil } @@ -231,7 +179,7 @@ func (r *router) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, reques return ErrUnrequestedResponse } - pending.AppResponseCallback(ctx, nodeID, nil, ErrAppRequestFailed) + pending.callback(ctx, nodeID, nil, ErrAppRequestFailed) pending.appRequestFailedTime.Observe(float64(time.Since(start))) return nil } @@ -249,7 +197,7 @@ func (r *router) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID u return ErrUnrequestedResponse } - pending.AppResponseCallback(ctx, nodeID, response, nil) + pending.callback(ctx, nodeID, response, nil) pending.appResponseTime.Observe(float64(time.Since(start))) return nil } @@ -273,7 +221,7 @@ func (r *router) AppGossip(ctx context.Context, nodeID ids.NodeID, gossip []byte handler.AppGossip(ctx, nodeID, parsedMsg) - handler.metrics.appGossipTime.Observe(float64(time.Since(start))) + handler.appGossipTime.Observe(float64(time.Since(start))) return nil } @@ -307,7 +255,7 @@ func (r *router) CrossChainAppRequest( return err } - handler.metrics.crossChainAppRequestTime.Observe(float64(time.Since(start))) + handler.crossChainAppRequestTime.Observe(float64(time.Since(start))) return nil } @@ -324,7 +272,7 @@ func (r *router) CrossChainAppRequestFailed(ctx context.Context, chainID ids.ID, return ErrUnrequestedResponse } - pending.CrossChainAppResponseCallback(ctx, chainID, nil, ErrAppRequestFailed) + pending.callback(ctx, chainID, nil, ErrAppRequestFailed) pending.crossChainAppRequestFailedTime.Observe(float64(time.Since(start))) return nil } @@ -342,7 +290,7 @@ func (r *router) CrossChainAppResponse(ctx context.Context, chainID ids.ID, requ return ErrUnrequestedResponse } - pending.CrossChainAppResponseCallback(ctx, chainID, response, nil) + pending.callback(ctx, chainID, response, nil) pending.crossChainAppResponseTime.Observe(float64(time.Since(start))) return nil }