Skip to content

Commit

Permalink
passing in the preferred execution node ids as an argument to the acc…
Browse files Browse the repository at this point in the history
…ess node to choose from
  • Loading branch information
vishalchangrani authored Mar 9, 2021
1 parent 10e92c8 commit c7b677d
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 23 deletions.
1 change: 1 addition & 0 deletions cmd/access/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func main() {
flags.StringVarP(&rpcConf.HistoricalAccessAddrs, "historical-access-addr", "", "", "comma separated rpc addresses for historical access nodes")
flags.DurationVar(&rpcConf.CollectionClientTimeout, "collection-client-timeout", 3*time.Second, "grpc client timeout for a collection node")
flags.DurationVar(&rpcConf.ExecutionClientTimeout, "execution-client-timeout", 3*time.Second, "grpc client timeout for an execution node")
flags.StringSliceVar(&rpcConf.PreferredExecutionNodeIDs, "preferred-execution-node-ids", nil, "comma separated list of execution nodes ids to choose from when making an upstream call e.g. b4a4dbdcd443d...,fb386a6a... etc.")
flags.BoolVar(&logTxTimeToFinalized, "log-tx-time-to-finalized", false, "log transaction time to finalized")
flags.BoolVar(&logTxTimeToExecuted, "log-tx-time-to-executed", false, "log transaction time to executed")
flags.BoolVar(&logTxTimeToFinalizedExecuted, "log-tx-time-to-finalized-executed", false, "log transaction time to finalized and executed")
Expand Down
3 changes: 3 additions & 0 deletions engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (suite *Suite) RunTest(
suite.metrics,
nil,
false,
nil,
suite.log,
)

Expand Down Expand Up @@ -284,6 +285,7 @@ func (suite *Suite) TestSendTransactionToRandomCollectionNode() {
metrics,
connFactory, // passing in the connection factory
false,
nil,
suite.log,
)

Expand Down Expand Up @@ -514,6 +516,7 @@ func (suite *Suite) TestExecuteScript() {
suite.metrics,
connFactory,
false,
nil,
suite.log,
)

Expand Down
26 changes: 13 additions & 13 deletions engine/access/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@ import (
// maxExecutionNodesCnt is the max number of execution nodes that will be contacted to complete an execution api request
const maxExecutionNodesCnt = 3

var validENIDs = []string{"9686399a8a5418a12e762cfaeff2ea348c2137f554560917760e0d47acf2cda4",
"160241f88cbfaa0f361cf64adb0a1c9fc19dec1daf4b96550cd67b7a9fb26cd9",
"4ab025ab974e7ad7f344fbd16e5fbcb17fb8769fc8849b9d241ae518787695bd",
"0ca407c1da940952ebcc02283b60cd97c9a008e111a48ea6cf1ce8f36f1e0153",
}

var validENMap map[flow.Identifier]bool

// Backends implements the Access API.
Expand Down Expand Up @@ -72,6 +66,7 @@ func New(
transactionMetrics module.TransactionMetrics,
connFactory ConnectionFactory,
retryEnabled bool,
preferredExecutionNodeIDs []string,
log zerolog.Logger,
) *Backend {
retry := newRetry()
Expand Down Expand Up @@ -139,11 +134,11 @@ func New(

retry.SetBackend(b)

validENMap = make(map[flow.Identifier]bool, len(validENIDs))
for _, idStr := range validENIDs {
validENMap = make(map[flow.Identifier]bool, len(preferredExecutionNodeIDs))
for _, idStr := range preferredExecutionNodeIDs {
id, err := flow.HexStringToIdentifier(idStr)
if err != nil {
panic(err)
log.Fatal().Err(err).Str("node_id", idStr).Msg("failed to convert node id string to Flow Identifier")
}
validENMap[id] = true
}
Expand Down Expand Up @@ -282,27 +277,32 @@ func executionNodesForBlockID(
return flow.IdentityList{}, nil
}

// for now only query Dapperlabs ENs
allENs, err := validENs(state)
// choose one of the preferred execution nodes
subsetENs, err := chooseExecutionNodes(state)
if err != nil {
return nil, fmt.Errorf("failed to retreive execution IDs for block ID %v: %w", blockID, err)
}

// find the node identities of these execution nodes
executionIdentities := allENs.Filter(filter.HasNodeID(executorIDs...))
executionIdentities := subsetENs.Filter(filter.HasNodeID(executorIDs...))

// randomly choose upto maxExecutionNodesCnt identities
executionIdentitiesRandom := executionIdentities.Sample(maxExecutionNodesCnt)

return executionIdentitiesRandom, nil
}

func validENs(state protocol.State) (flow.IdentityList, error) {
func chooseExecutionNodes(state protocol.State) (flow.IdentityList, error) {
allENs, err := state.Final().Identities(filter.HasRole(flow.RoleExecution))
if err != nil {
return nil, fmt.Errorf("failed to retreive all execution IDs: %w", err)
}

// if the preferred list of execution ids is not defined, choose from any of the execution node
if len(validENMap) == 0 {
return allENs, nil
}

filterFn := func(identity *flow.Identity) bool {
return validENMap[identity.ID()]
}
Expand Down
19 changes: 19 additions & 0 deletions engine/access/rpc/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func (suite *Suite) TestPing() {
metrics.NewNoopCollector(),
nil,
false,
nil,
suite.log,
)

Expand All @@ -109,6 +110,7 @@ func (suite *Suite) TestGetLatestFinalizedBlockHeader() {
metrics.NewNoopCollector(),
nil,
false,
nil,
suite.log,
)

Expand Down Expand Up @@ -137,6 +139,7 @@ func (suite *Suite) TestGetLatestSealedBlockHeader() {
metrics.NewNoopCollector(),
nil,
false,
nil,
suite.log,
)

Expand Down Expand Up @@ -170,6 +173,7 @@ func (suite *Suite) TestGetTransaction() {
metrics.NewNoopCollector(),
nil,
false,
nil,
suite.log,
)

Expand Down Expand Up @@ -199,6 +203,7 @@ func (suite *Suite) TestGetCollection() {
metrics.NewNoopCollector(),
nil,
false,
nil,
suite.log,
)

Expand Down Expand Up @@ -277,6 +282,7 @@ func (suite *Suite) TestTransactionStatusTransition() {
metrics.NewNoopCollector(),
connFactory,
false,
nil,
suite.log,
)

Expand Down Expand Up @@ -389,6 +395,7 @@ func (suite *Suite) TestTransactionExpiredStatusTransition() {
metrics.NewNoopCollector(),
nil,
false,
nil,
suite.log,
)

Expand Down Expand Up @@ -470,6 +477,7 @@ func (suite *Suite) TestTransactionResultUnknown() {
metrics.NewNoopCollector(),
nil,
false,
nil,
suite.log,
)

Expand Down Expand Up @@ -505,6 +513,7 @@ func (suite *Suite) TestGetLatestFinalizedBlock() {
metrics.NewNoopCollector(),
nil,
false,
nil,
suite.log,
)

Expand Down Expand Up @@ -624,6 +633,7 @@ func (suite *Suite) TestGetEventsForBlockIDs() {
metrics.NewNoopCollector(),
nil,
false,
nil,
suite.log,
)

Expand All @@ -648,6 +658,7 @@ func (suite *Suite) TestGetEventsForBlockIDs() {
metrics.NewNoopCollector(),
connFactory, // the connection factory should be used to get the execution node client
false,
nil,
suite.log,
)

Expand All @@ -674,6 +685,7 @@ func (suite *Suite) TestGetEventsForBlockIDs() {
metrics.NewNoopCollector(),
connFactory, // the connection factory should be used to get the execution node client
false,
nil,
suite.log,
)

Expand Down Expand Up @@ -774,6 +786,7 @@ func (suite *Suite) TestGetEventsForHeightRange() {
metrics.NewNoopCollector(),
nil,
false,
nil,
suite.log,
)

Expand Down Expand Up @@ -805,6 +818,7 @@ func (suite *Suite) TestGetEventsForHeightRange() {
metrics.NewNoopCollector(),
nil,
false,
nil,
suite.log,
)

Expand Down Expand Up @@ -835,6 +849,7 @@ func (suite *Suite) TestGetEventsForHeightRange() {
metrics.NewNoopCollector(),
nil,
false,
nil,
suite.log,
)

Expand Down Expand Up @@ -867,6 +882,7 @@ func (suite *Suite) TestGetEventsForHeightRange() {
metrics.NewNoopCollector(),
nil,
false,
nil,
suite.log,
)

Expand Down Expand Up @@ -933,6 +949,7 @@ func (suite *Suite) TestGetAccount() {
metrics.NewNoopCollector(),
connFactory,
false,
nil,
suite.log,
)

Expand Down Expand Up @@ -998,6 +1015,7 @@ func (suite *Suite) TestGetAccountAtBlockHeight() {
metrics.NewNoopCollector(),
nil,
false,
nil,
suite.log,
)

Expand All @@ -1021,6 +1039,7 @@ func (suite *Suite) TestGetNetworkParameters() {
metrics.NewNoopCollector(),
nil,
false,
nil,
suite.log,
)

Expand Down
2 changes: 2 additions & 0 deletions engine/access/rpc/backend/historical_access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func (suite *Suite) TestHistoricalTransactionResult() {
metrics.NewNoopCollector(),
nil,
false,
nil,
suite.log,
)

Expand Down Expand Up @@ -105,6 +106,7 @@ func (suite *Suite) TestHistoricalTransaction() {
metrics.NewNoopCollector(),
nil,
false,
nil,
suite.log,
)

Expand Down
4 changes: 2 additions & 2 deletions engine/access/rpc/backend/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (suite *Suite) TestTransactionRetry() {
// Setup Handler + Retry
backend := New(suite.state, suite.execClient, suite.colClient, nil, suite.blocks, suite.headers,
suite.collections, suite.transactions, suite.receipts, suite.chainID, metrics.NewNoopCollector(), nil,
false, suite.log)
false, nil, suite.log)
retry := newRetry().SetBackend(backend).Activate()
backend.retry = retry

Expand Down Expand Up @@ -105,7 +105,7 @@ func (suite *Suite) TestSuccessfulTransactionsDontRetry() {
// Setup Handler + Retry
backend := New(suite.state, suite.execClient, suite.colClient, nil, suite.blocks, suite.headers,
suite.collections, suite.transactions, suite.receipts, suite.chainID, metrics.NewNoopCollector(), nil,
false, suite.log)
false, nil, suite.log)
retry := newRetry().SetBackend(backend).Activate()
backend.retry = retry

Expand Down
19 changes: 11 additions & 8 deletions engine/access/rpc/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ import (

// Config defines the configurable options for the access node server
type Config struct {
GRPCListenAddr string // the GRPC server address as ip:port
HTTPListenAddr string // the HTTP web proxy address as ip:port
ExecutionAddr string // the address of the upstream execution node
CollectionAddr string // the address of the upstream collection node
HistoricalAccessAddrs string // the list of all access nodes from previous spork
MaxMsgSize int // GRPC max message size
ExecutionClientTimeout time.Duration // execution API GRPC client timeout
CollectionClientTimeout time.Duration // collection API GRPC client timeout
GRPCListenAddr string // the GRPC server address as ip:port
HTTPListenAddr string // the HTTP web proxy address as ip:port
ExecutionAddr string // the address of the upstream execution node
CollectionAddr string // the address of the upstream collection node
HistoricalAccessAddrs string // the list of all access nodes from previous spork
MaxMsgSize int // GRPC max message size
ExecutionClientTimeout time.Duration // execution API GRPC client timeout
CollectionClientTimeout time.Duration // collection API GRPC client timeout
PreferredExecutionNodeIDs []string // preferred list of upstream execution node IDs

}

// Engine implements a gRPC server with a simplified version of the Observation API.
Expand Down Expand Up @@ -131,6 +133,7 @@ func New(log zerolog.Logger,
transactionMetrics,
connectionFactory,
retryEnabled,
config.PreferredExecutionNodeIDs,
log,
)

Expand Down

0 comments on commit c7b677d

Please sign in to comment.