Skip to content

Commit

Permalink
refactor client
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Kim <[email protected]>
  • Loading branch information
joshua-kim committed Dec 9, 2023
1 parent dd2c6ef commit b867a6a
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 89 deletions.
17 changes: 13 additions & 4 deletions network/p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/metric"
"github.com/ava-labs/avalanchego/utils/set"
)

Expand Down Expand Up @@ -39,11 +40,19 @@ type CrossChainAppResponseCallback func(
err error,
)

type clientMetrics struct {
appRequestFailedTime metric.Averager
appResponseTime metric.Averager
crossChainAppRequestFailedTime metric.Averager
crossChainAppResponseTime metric.Averager
}

type Client struct {
handlerID uint64
handlerPrefix []byte
router *router
sender common.AppSender
clientMetrics *clientMetrics
options *clientOptions
}

Expand Down Expand Up @@ -96,8 +105,8 @@ func (c *Client) AppRequest(
}

c.router.pendingAppRequests[requestID] = pendingAppRequest{
AppResponseCallback: onResponse,
metrics: c.router.handlers[c.handlerID].metrics,
callback: onResponse,
metrics: c.clientMetrics,
}
c.router.requestID += 2
}
Expand Down Expand Up @@ -159,8 +168,8 @@ func (c *Client) CrossChainAppRequest(
}

c.router.pendingCrossChainAppRequests[requestID] = pendingCrossChainAppRequest{
CrossChainAppResponseCallback: onResponse,
metrics: c.router.handlers[c.handlerID].metrics,
callback: onResponse,
metrics: c.clientMetrics,
}
c.router.requestID += 2

Expand Down
5 changes: 2 additions & 3 deletions network/p2p/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ func TestGossiperGossip(t *testing.T) {

handler, err := NewHandler[*testTx](responseSet, tt.config, prometheus.NewRegistry())
require.NoError(err)
_, err = responseNetwork.NewAppProtocol(0x0, handler)
require.NoError(err)
require.NoError(responseNetwork.AddHandler(0x0, handler))

requestSender := &common.SenderTest{
SendAppRequestF: func(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, request []byte) error {
Expand Down Expand Up @@ -162,7 +161,7 @@ func TestGossiperGossip(t *testing.T) {
require.NoError(requestSet.Add(item))
}

requestClient, err := requestNetwork.NewAppProtocol(0x0, nil)
requestClient, err := requestNetwork.NewClient(0x0)
require.NoError(err)

config := Config{
Expand Down
60 changes: 54 additions & 6 deletions network/p2p/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package p2p
import (
"context"
"encoding/binary"
"fmt"
"sync"
"time"

Expand All @@ -15,6 +16,7 @@ import (
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/metric"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/version"
)
Expand Down Expand Up @@ -117,12 +119,47 @@ func (n *Network) Disconnected(_ context.Context, nodeID ids.NodeID) error {
return nil
}

// NewAppProtocol reserves an identifier for an application protocol handler and
// returns a Client that can be used to send messages for the corresponding
// protocol.
func (n *Network) NewAppProtocol(handlerID uint64, handler Handler, options ...ClientOption) (*Client, error) {
if err := n.router.addHandler(handlerID, handler); err != nil {
return nil, err
// NewClient returns a Client that can be used to send messages for the
// corresponding protocol.
func (n *Network) NewClient(handlerID uint64, options ...ClientOption) (*Client, error) {
appRequestFailedTime, err := metric.NewAverager(
n.namespace,
fmt.Sprintf("handler_%d_app_request_failed", handlerID),
"app request failed time (ns)",
n.metrics,
)
if err != nil {
return nil, fmt.Errorf("failed to register app request failed metric for client %d: %w", handlerID, err)
}

appResponseTime, err := metric.NewAverager(
n.namespace,
fmt.Sprintf("handler_%d_app_response", handlerID),
"app response time (ns)",
n.metrics,
)
if err != nil {
return nil, fmt.Errorf("failed to register app response metric for client %d: %w", handlerID, err)
}

crossChainAppRequestFailedTime, err := metric.NewAverager(
n.namespace,
fmt.Sprintf("handler_%d_cross_chain_app_request_failed", handlerID),
"app request failed time (ns)",
n.metrics,
)
if err != nil {
return nil, fmt.Errorf("failed to register cross-chain app request failed metric for client %d: %w", handlerID, err)
}

crossChainAppResponseTime, err := metric.NewAverager(
n.namespace,
fmt.Sprintf("handler_%d_cross_chain_app_response", handlerID),
"cross chain app response time (ns)",
n.metrics,
)
if err != nil {
return nil, fmt.Errorf("failed to register cross-chain app response metric for client %d: %w", handlerID, err)
}

client := &Client{
Expand All @@ -135,6 +172,12 @@ func (n *Network) NewAppProtocol(handlerID uint64, handler Handler, options ...C
peers: n.Peers,
},
},
clientMetrics: &clientMetrics{
appRequestFailedTime: appRequestFailedTime,
appResponseTime: appResponseTime,
crossChainAppRequestFailedTime: crossChainAppRequestFailedTime,
crossChainAppResponseTime: crossChainAppResponseTime,
},
}

for _, option := range options {
Expand All @@ -144,6 +187,11 @@ func (n *Network) NewAppProtocol(handlerID uint64, handler Handler, options ...C
return client, nil
}

// AddHandler reserves an identifier for an application protocol
func (n *Network) AddHandler(handlerID uint64, handler Handler) error {
return n.router.addHandler(handlerID, handler)
}

// Peers contains metadata about the current set of connected peers
type Peers struct {
lock sync.RWMutex
Expand Down
10 changes: 6 additions & 4 deletions network/p2p/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ func TestAppRequestResponse(t *testing.T) {
handler := mocks.NewMockHandler(ctrl)
n := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "")
require.NoError(n.Connected(context.Background(), nodeID, nil))
client, err := n.NewAppProtocol(handlerID, handler)
require.NoError(n.AddHandler(handlerID, handler))
client, err := n.NewClient(handlerID)
require.NoError(err)

wg := &sync.WaitGroup{}
Expand Down Expand Up @@ -350,7 +351,8 @@ func TestAppRequestDuplicateRequestIDs(t *testing.T) {
}).AnyTimes()

require.NoError(network.Connected(context.Background(), nodeID, nil))
client, err := network.NewAppProtocol(0x1, handler)
require.NoError(network.AddHandler(0x1, handler))
client, err := network.NewClient(0x1)
require.NoError(err)

onResponse := func(ctx context.Context, nodeID ids.NodeID, got []byte, err error) {
Expand Down Expand Up @@ -493,7 +495,7 @@ func TestAppRequestAnyNodeSelection(t *testing.T) {
require.NoError(n.Connected(context.Background(), peer, &version.Application{}))
}

client, err := n.NewAppProtocol(1, nil)
client, err := n.NewClient(1)
require.NoError(err)

err = client.AppRequestAny(context.Background(), []byte("foobar"), nil)
Expand Down Expand Up @@ -582,7 +584,7 @@ func TestNodeSamplerClientOption(t *testing.T) {
require.NoError(network.Connected(ctx, peer, nil))
}

client, err := network.NewAppProtocol(0x0, nil, tt.option(t, network))
client, err := network.NewClient(0, tt.option(t, network))
require.NoError(err)

if err = client.AppRequestAny(ctx, []byte("request"), nil); err != nil {
Expand Down
96 changes: 24 additions & 72 deletions network/p2p/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,26 @@ var (
_ common.AppHandler = (*router)(nil)
)

type metrics struct {
appRequestTime metric.Averager
appRequestFailedTime metric.Averager
appResponseTime metric.Averager
appGossipTime metric.Averager
crossChainAppRequestTime metric.Averager
crossChainAppRequestFailedTime metric.Averager
crossChainAppResponseTime metric.Averager
type serverMetrics struct {
appRequestTime metric.Averager
appGossipTime metric.Averager
crossChainAppRequestTime metric.Averager
}

type pendingAppRequest struct {
*metrics
AppResponseCallback
callback AppResponseCallback
metrics *clientMetrics
}

type pendingCrossChainAppRequest struct {
*metrics
CrossChainAppResponseCallback
callback CrossChainAppResponseCallback
metrics *clientMetrics
}

// meteredHandler emits metrics for a Handler
type meteredHandler struct {
*responder
*metrics
*serverMetrics
}

// router routes incoming application messages to the corresponding registered
Expand Down Expand Up @@ -109,26 +105,6 @@ func (r *router) addHandler(handlerID uint64, handler Handler) error {
return fmt.Errorf("failed to register app request metric for handler_%d: %w", handlerID, err)
}

appRequestFailedTime, err := metric.NewAverager(
r.namespace,
fmt.Sprintf("handler_%d_app_request_failed", handlerID),
"app request failed time (ns)",
r.metrics,
)
if err != nil {
return fmt.Errorf("failed to register app request failed metric for handler_%d: %w", handlerID, err)
}

appResponseTime, err := metric.NewAverager(
r.namespace,
fmt.Sprintf("handler_%d_app_response", handlerID),
"app response time (ns)",
r.metrics,
)
if err != nil {
return fmt.Errorf("failed to register app response metric for handler_%d: %w", handlerID, err)
}

appGossipTime, err := metric.NewAverager(
r.namespace,
fmt.Sprintf("handler_%d_app_gossip", handlerID),
Expand All @@ -149,41 +125,17 @@ func (r *router) addHandler(handlerID uint64, handler Handler) error {
return fmt.Errorf("failed to register cross-chain app request metric for handler_%d: %w", handlerID, err)
}

crossChainAppRequestFailedTime, err := metric.NewAverager(
r.namespace,
fmt.Sprintf("handler_%d_cross_chain_app_request_failed", handlerID),
"app request failed time (ns)",
r.metrics,
)
if err != nil {
return fmt.Errorf("failed to register cross-chain app request failed metric for handler_%d: %w", handlerID, err)
}

crossChainAppResponseTime, err := metric.NewAverager(
r.namespace,
fmt.Sprintf("handler_%d_cross_chain_app_response", handlerID),
"cross chain app response time (ns)",
r.metrics,
)
if err != nil {
return fmt.Errorf("failed to register cross-chain app response metric for handler_%d: %w", handlerID, err)
}

r.handlers[handlerID] = &meteredHandler{
responder: &responder{
Handler: handler,
handlerID: handlerID,
log: r.log,
sender: r.sender,
},
metrics: &metrics{
appRequestTime: appRequestTime,
appRequestFailedTime: appRequestFailedTime,
appResponseTime: appResponseTime,
appGossipTime: appGossipTime,
crossChainAppRequestTime: crossChainAppRequestTime,
crossChainAppRequestFailedTime: crossChainAppRequestFailedTime,
crossChainAppResponseTime: crossChainAppResponseTime,
serverMetrics: &serverMetrics{
appRequestTime: appRequestTime,
appGossipTime: appGossipTime,
crossChainAppRequestTime: crossChainAppRequestTime,
},
}

Expand Down Expand Up @@ -214,7 +166,7 @@ func (r *router) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID ui
return err
}

handler.metrics.appRequestTime.Observe(float64(time.Since(start)))
handler.serverMetrics.appRequestTime.Observe(float64(time.Since(start)))
return nil
}

Expand All @@ -231,8 +183,8 @@ func (r *router) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, reques
return ErrUnrequestedResponse
}

pending.AppResponseCallback(ctx, nodeID, nil, ErrAppRequestFailed)
pending.appRequestFailedTime.Observe(float64(time.Since(start)))
pending.callback(ctx, nodeID, nil, ErrAppRequestFailed)
pending.metrics.appRequestFailedTime.Observe(float64(time.Since(start)))
return nil
}

Expand All @@ -249,8 +201,8 @@ func (r *router) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID u
return ErrUnrequestedResponse
}

pending.AppResponseCallback(ctx, nodeID, response, nil)
pending.appResponseTime.Observe(float64(time.Since(start)))
pending.callback(ctx, nodeID, response, nil)
pending.metrics.appResponseTime.Observe(float64(time.Since(start)))
return nil
}

Expand All @@ -273,7 +225,7 @@ func (r *router) AppGossip(ctx context.Context, nodeID ids.NodeID, gossip []byte

handler.AppGossip(ctx, nodeID, parsedMsg)

handler.metrics.appGossipTime.Observe(float64(time.Since(start)))
handler.serverMetrics.appGossipTime.Observe(float64(time.Since(start)))
return nil
}

Expand Down Expand Up @@ -307,7 +259,7 @@ func (r *router) CrossChainAppRequest(
return err
}

handler.metrics.crossChainAppRequestTime.Observe(float64(time.Since(start)))
handler.serverMetrics.crossChainAppRequestTime.Observe(float64(time.Since(start)))
return nil
}

Expand All @@ -324,8 +276,8 @@ func (r *router) CrossChainAppRequestFailed(ctx context.Context, chainID ids.ID,
return ErrUnrequestedResponse
}

pending.CrossChainAppResponseCallback(ctx, chainID, nil, ErrAppRequestFailed)
pending.crossChainAppRequestFailedTime.Observe(float64(time.Since(start)))
pending.callback(ctx, chainID, nil, ErrAppRequestFailed)
pending.metrics.crossChainAppRequestFailedTime.Observe(float64(time.Since(start)))
return nil
}

Expand All @@ -342,8 +294,8 @@ func (r *router) CrossChainAppResponse(ctx context.Context, chainID ids.ID, requ
return ErrUnrequestedResponse
}

pending.CrossChainAppResponseCallback(ctx, chainID, response, nil)
pending.crossChainAppResponseTime.Observe(float64(time.Since(start)))
pending.callback(ctx, chainID, response, nil)
pending.metrics.crossChainAppResponseTime.Observe(float64(time.Since(start)))
return nil
}

Expand Down

0 comments on commit b867a6a

Please sign in to comment.