From c7b677d94db45668d8a425ca1280017c0aebe1db Mon Sep 17 00:00:00 2001 From: Vishal <1117327+vishalchangrani@users.noreply.github.com> Date: Mon, 8 Mar 2021 21:59:40 -0800 Subject: [PATCH] passing in the preferred execution node ids as an argument to the access node to choose from --- cmd/access/main.go | 1 + engine/access/access_test.go | 3 +++ engine/access/rpc/backend/backend.go | 26 +++++++++---------- engine/access/rpc/backend/backend_test.go | 19 ++++++++++++++ .../rpc/backend/historical_access_test.go | 2 ++ engine/access/rpc/backend/retry_test.go | 4 +-- engine/access/rpc/engine.go | 19 ++++++++------ 7 files changed, 51 insertions(+), 23 deletions(-) diff --git a/cmd/access/main.go b/cmd/access/main.go index 5276c3cf936..89029b68a96 100644 --- a/cmd/access/main.go +++ b/cmd/access/main.go @@ -91,6 +91,7 @@ func main() { flags.StringVarP(&rpcConf.HistoricalAccessAddrs, "historical-access-addr", "", "", "comma separated rpc addresses for historical access nodes") flags.DurationVar(&rpcConf.CollectionClientTimeout, "collection-client-timeout", 3*time.Second, "grpc client timeout for a collection node") flags.DurationVar(&rpcConf.ExecutionClientTimeout, "execution-client-timeout", 3*time.Second, "grpc client timeout for an execution node") + flags.StringSliceVar(&rpcConf.PreferredExecutionNodeIDs, "preferred-execution-node-ids", nil, "comma separated list of execution nodes ids to choose from when making an upstream call e.g. b4a4dbdcd443d...,fb386a6a... etc.") flags.BoolVar(&logTxTimeToFinalized, "log-tx-time-to-finalized", false, "log transaction time to finalized") flags.BoolVar(&logTxTimeToExecuted, "log-tx-time-to-executed", false, "log transaction time to executed") flags.BoolVar(&logTxTimeToFinalizedExecuted, "log-tx-time-to-finalized-executed", false, "log transaction time to finalized and executed") diff --git a/engine/access/access_test.go b/engine/access/access_test.go index ba3459108a5..3b374c3211b 100644 --- a/engine/access/access_test.go +++ b/engine/access/access_test.go @@ -111,6 +111,7 @@ func (suite *Suite) RunTest( suite.metrics, nil, false, + nil, suite.log, ) @@ -284,6 +285,7 @@ func (suite *Suite) TestSendTransactionToRandomCollectionNode() { metrics, connFactory, // passing in the connection factory false, + nil, suite.log, ) @@ -514,6 +516,7 @@ func (suite *Suite) TestExecuteScript() { suite.metrics, connFactory, false, + nil, suite.log, ) diff --git a/engine/access/rpc/backend/backend.go b/engine/access/rpc/backend/backend.go index 276442d761d..fbf3c13d3ee 100644 --- a/engine/access/rpc/backend/backend.go +++ b/engine/access/rpc/backend/backend.go @@ -22,12 +22,6 @@ import ( // maxExecutionNodesCnt is the max number of execution nodes that will be contacted to complete an execution api request const maxExecutionNodesCnt = 3 -var validENIDs = []string{"9686399a8a5418a12e762cfaeff2ea348c2137f554560917760e0d47acf2cda4", - "160241f88cbfaa0f361cf64adb0a1c9fc19dec1daf4b96550cd67b7a9fb26cd9", - "4ab025ab974e7ad7f344fbd16e5fbcb17fb8769fc8849b9d241ae518787695bd", - "0ca407c1da940952ebcc02283b60cd97c9a008e111a48ea6cf1ce8f36f1e0153", -} - var validENMap map[flow.Identifier]bool // Backends implements the Access API. @@ -72,6 +66,7 @@ func New( transactionMetrics module.TransactionMetrics, connFactory ConnectionFactory, retryEnabled bool, + preferredExecutionNodeIDs []string, log zerolog.Logger, ) *Backend { retry := newRetry() @@ -139,11 +134,11 @@ func New( retry.SetBackend(b) - validENMap = make(map[flow.Identifier]bool, len(validENIDs)) - for _, idStr := range validENIDs { + validENMap = make(map[flow.Identifier]bool, len(preferredExecutionNodeIDs)) + for _, idStr := range preferredExecutionNodeIDs { id, err := flow.HexStringToIdentifier(idStr) if err != nil { - panic(err) + log.Fatal().Err(err).Str("node_id", idStr).Msg("failed to convert node id string to Flow Identifier") } validENMap[id] = true } @@ -282,14 +277,14 @@ func executionNodesForBlockID( return flow.IdentityList{}, nil } - // for now only query Dapperlabs ENs - allENs, err := validENs(state) + // choose one of the preferred execution nodes + subsetENs, err := chooseExecutionNodes(state) if err != nil { return nil, fmt.Errorf("failed to retreive execution IDs for block ID %v: %w", blockID, err) } // find the node identities of these execution nodes - executionIdentities := allENs.Filter(filter.HasNodeID(executorIDs...)) + executionIdentities := subsetENs.Filter(filter.HasNodeID(executorIDs...)) // randomly choose upto maxExecutionNodesCnt identities executionIdentitiesRandom := executionIdentities.Sample(maxExecutionNodesCnt) @@ -297,12 +292,17 @@ func executionNodesForBlockID( return executionIdentitiesRandom, nil } -func validENs(state protocol.State) (flow.IdentityList, error) { +func chooseExecutionNodes(state protocol.State) (flow.IdentityList, error) { allENs, err := state.Final().Identities(filter.HasRole(flow.RoleExecution)) if err != nil { return nil, fmt.Errorf("failed to retreive all execution IDs: %w", err) } + // if the preferred list of execution ids is not defined, choose from any of the execution node + if len(validENMap) == 0 { + return allENs, nil + } + filterFn := func(identity *flow.Identity) bool { return validENMap[identity.ID()] } diff --git a/engine/access/rpc/backend/backend_test.go b/engine/access/rpc/backend/backend_test.go index 57b0ebb6145..c0e10a39616 100644 --- a/engine/access/rpc/backend/backend_test.go +++ b/engine/access/rpc/backend/backend_test.go @@ -87,6 +87,7 @@ func (suite *Suite) TestPing() { metrics.NewNoopCollector(), nil, false, + nil, suite.log, ) @@ -109,6 +110,7 @@ func (suite *Suite) TestGetLatestFinalizedBlockHeader() { metrics.NewNoopCollector(), nil, false, + nil, suite.log, ) @@ -137,6 +139,7 @@ func (suite *Suite) TestGetLatestSealedBlockHeader() { metrics.NewNoopCollector(), nil, false, + nil, suite.log, ) @@ -170,6 +173,7 @@ func (suite *Suite) TestGetTransaction() { metrics.NewNoopCollector(), nil, false, + nil, suite.log, ) @@ -199,6 +203,7 @@ func (suite *Suite) TestGetCollection() { metrics.NewNoopCollector(), nil, false, + nil, suite.log, ) @@ -277,6 +282,7 @@ func (suite *Suite) TestTransactionStatusTransition() { metrics.NewNoopCollector(), connFactory, false, + nil, suite.log, ) @@ -389,6 +395,7 @@ func (suite *Suite) TestTransactionExpiredStatusTransition() { metrics.NewNoopCollector(), nil, false, + nil, suite.log, ) @@ -470,6 +477,7 @@ func (suite *Suite) TestTransactionResultUnknown() { metrics.NewNoopCollector(), nil, false, + nil, suite.log, ) @@ -505,6 +513,7 @@ func (suite *Suite) TestGetLatestFinalizedBlock() { metrics.NewNoopCollector(), nil, false, + nil, suite.log, ) @@ -624,6 +633,7 @@ func (suite *Suite) TestGetEventsForBlockIDs() { metrics.NewNoopCollector(), nil, false, + nil, suite.log, ) @@ -648,6 +658,7 @@ func (suite *Suite) TestGetEventsForBlockIDs() { metrics.NewNoopCollector(), connFactory, // the connection factory should be used to get the execution node client false, + nil, suite.log, ) @@ -674,6 +685,7 @@ func (suite *Suite) TestGetEventsForBlockIDs() { metrics.NewNoopCollector(), connFactory, // the connection factory should be used to get the execution node client false, + nil, suite.log, ) @@ -774,6 +786,7 @@ func (suite *Suite) TestGetEventsForHeightRange() { metrics.NewNoopCollector(), nil, false, + nil, suite.log, ) @@ -805,6 +818,7 @@ func (suite *Suite) TestGetEventsForHeightRange() { metrics.NewNoopCollector(), nil, false, + nil, suite.log, ) @@ -835,6 +849,7 @@ func (suite *Suite) TestGetEventsForHeightRange() { metrics.NewNoopCollector(), nil, false, + nil, suite.log, ) @@ -867,6 +882,7 @@ func (suite *Suite) TestGetEventsForHeightRange() { metrics.NewNoopCollector(), nil, false, + nil, suite.log, ) @@ -933,6 +949,7 @@ func (suite *Suite) TestGetAccount() { metrics.NewNoopCollector(), connFactory, false, + nil, suite.log, ) @@ -998,6 +1015,7 @@ func (suite *Suite) TestGetAccountAtBlockHeight() { metrics.NewNoopCollector(), nil, false, + nil, suite.log, ) @@ -1021,6 +1039,7 @@ func (suite *Suite) TestGetNetworkParameters() { metrics.NewNoopCollector(), nil, false, + nil, suite.log, ) diff --git a/engine/access/rpc/backend/historical_access_test.go b/engine/access/rpc/backend/historical_access_test.go index c489a83818b..b154763891d 100644 --- a/engine/access/rpc/backend/historical_access_test.go +++ b/engine/access/rpc/backend/historical_access_test.go @@ -51,6 +51,7 @@ func (suite *Suite) TestHistoricalTransactionResult() { metrics.NewNoopCollector(), nil, false, + nil, suite.log, ) @@ -105,6 +106,7 @@ func (suite *Suite) TestHistoricalTransaction() { metrics.NewNoopCollector(), nil, false, + nil, suite.log, ) diff --git a/engine/access/rpc/backend/retry_test.go b/engine/access/rpc/backend/retry_test.go index 698069de6ef..4188ab26483 100644 --- a/engine/access/rpc/backend/retry_test.go +++ b/engine/access/rpc/backend/retry_test.go @@ -42,7 +42,7 @@ func (suite *Suite) TestTransactionRetry() { // Setup Handler + Retry backend := New(suite.state, suite.execClient, suite.colClient, nil, suite.blocks, suite.headers, suite.collections, suite.transactions, suite.receipts, suite.chainID, metrics.NewNoopCollector(), nil, - false, suite.log) + false, nil, suite.log) retry := newRetry().SetBackend(backend).Activate() backend.retry = retry @@ -105,7 +105,7 @@ func (suite *Suite) TestSuccessfulTransactionsDontRetry() { // Setup Handler + Retry backend := New(suite.state, suite.execClient, suite.colClient, nil, suite.blocks, suite.headers, suite.collections, suite.transactions, suite.receipts, suite.chainID, metrics.NewNoopCollector(), nil, - false, suite.log) + false, nil, suite.log) retry := newRetry().SetBackend(backend).Activate() backend.retry = retry diff --git a/engine/access/rpc/engine.go b/engine/access/rpc/engine.go index 54375e5a1e8..fa2a5887361 100644 --- a/engine/access/rpc/engine.go +++ b/engine/access/rpc/engine.go @@ -28,14 +28,16 @@ import ( // Config defines the configurable options for the access node server type Config struct { - GRPCListenAddr string // the GRPC server address as ip:port - HTTPListenAddr string // the HTTP web proxy address as ip:port - ExecutionAddr string // the address of the upstream execution node - CollectionAddr string // the address of the upstream collection node - HistoricalAccessAddrs string // the list of all access nodes from previous spork - MaxMsgSize int // GRPC max message size - ExecutionClientTimeout time.Duration // execution API GRPC client timeout - CollectionClientTimeout time.Duration // collection API GRPC client timeout + GRPCListenAddr string // the GRPC server address as ip:port + HTTPListenAddr string // the HTTP web proxy address as ip:port + ExecutionAddr string // the address of the upstream execution node + CollectionAddr string // the address of the upstream collection node + HistoricalAccessAddrs string // the list of all access nodes from previous spork + MaxMsgSize int // GRPC max message size + ExecutionClientTimeout time.Duration // execution API GRPC client timeout + CollectionClientTimeout time.Duration // collection API GRPC client timeout + PreferredExecutionNodeIDs []string // preferred list of upstream execution node IDs + } // Engine implements a gRPC server with a simplified version of the Observation API. @@ -131,6 +133,7 @@ func New(log zerolog.Logger, transactionMetrics, connectionFactory, retryEnabled, + config.PreferredExecutionNodeIDs, log, )