diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index 791973f6478..f083aaed0fd 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -162,6 +162,12 @@ func DefaultAccessNodeConfig() *AccessNodeConfig { PreferredExecutionNodeIDs: nil, FixedExecutionNodeIDs: nil, ArchiveAddressList: nil, + CircuitBreakerConfig: rpcConnection.CircuitBreakerConfig{ + Enabled: false, + RestoreTimeout: 60 * time.Second, + MaxFailures: 5, + MaxRequests: 1, + }, }, MaxMsgSize: grpcutils.DefaultMaxMsgSize, }, @@ -690,7 +696,10 @@ func (builder *FlowAccessNodeBuilder) extraFlags() { flags.StringToIntVar(&builder.apiBurstlimits, "api-burst-limits", defaultConfig.apiBurstlimits, "burst limits for Access API methods e.g. Ping=100,GetTransaction=100 etc.") flags.BoolVar(&builder.supportsObserver, "supports-observer", defaultConfig.supportsObserver, "true if this staked access node supports observer or follower connections") flags.StringVar(&builder.PublicNetworkConfig.BindAddress, "public-network-address", defaultConfig.PublicNetworkConfig.BindAddress, "staked access node's public network bind address") - + flags.BoolVar(&builder.rpcConf.BackendConfig.CircuitBreakerConfig.Enabled, "circuit-breaker-enabled", defaultConfig.rpcConf.BackendConfig.CircuitBreakerConfig.Enabled, "specifies whether the circuit breaker is enabled for collection and execution API clients.") + flags.DurationVar(&builder.rpcConf.BackendConfig.CircuitBreakerConfig.RestoreTimeout, "circuit-breaker-restore-timeout", defaultConfig.rpcConf.BackendConfig.CircuitBreakerConfig.RestoreTimeout, "duration the circuit breaker waits, after closing the connection due to failures, before attempting to restore it. Default value is 60s") + flags.Uint32Var(&builder.rpcConf.BackendConfig.CircuitBreakerConfig.MaxFailures, "circuit-breaker-max-failures", defaultConfig.rpcConf.BackendConfig.CircuitBreakerConfig.MaxFailures, "maximum number of failed calls to the client that will cause the circuit breaker to close the connection. Default value is 5") + flags.Uint32Var(&builder.rpcConf.BackendConfig.CircuitBreakerConfig.MaxRequests, "circuit-breaker-max-requests", defaultConfig.rpcConf.BackendConfig.CircuitBreakerConfig.MaxRequests, "maximum number of probe requests allowed through, after the restore timeout, to check whether the connection has recovered. 
Default value is 1") // ExecutionDataRequester config flags.BoolVar(&builder.executionDataSyncEnabled, "execution-data-sync-enabled", defaultConfig.executionDataSyncEnabled, "whether to enable the execution data sync protocol") flags.StringVar(&builder.executionDataDir, "execution-data-dir", defaultConfig.executionDataDir, "directory to use for Execution Data database") @@ -754,6 +763,17 @@ func (builder *FlowAccessNodeBuilder) extraFlags() { return errors.New("state-stream-response-limit must be greater than or equal to 0") } } + if builder.rpcConf.BackendConfig.CircuitBreakerConfig.Enabled { + if builder.rpcConf.BackendConfig.CircuitBreakerConfig.MaxFailures == 0 { + return errors.New("circuit-breaker-max-failures must be greater than 0") + } + if builder.rpcConf.BackendConfig.CircuitBreakerConfig.MaxRequests == 0 { + return errors.New("circuit-breaker-max-requests must be greater than 0") + } + if builder.rpcConf.BackendConfig.CircuitBreakerConfig.RestoreTimeout <= 0 { + return errors.New("circuit-breaker-restore-timeout must be greater than 0") + } + } return nil }) @@ -1065,6 +1085,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { node.Logger, accessMetrics, config.MaxMsgSize, + backendConfig.CircuitBreakerConfig, ), } @@ -1087,7 +1108,8 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { backendConfig.FixedExecutionNodeIDs, node.Logger, backend.DefaultSnapshotHistoryLimit, - backendConfig.ArchiveAddressList) + backendConfig.ArchiveAddressList, + backendConfig.CircuitBreakerConfig.Enabled) engineBuilder, err := rpc.NewBuilder( node.Logger, diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index 151df430ad3..1e7687578c2 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -922,6 +922,7 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { node.Logger, accessMetrics, config.MaxMsgSize, + backendConfig.CircuitBreakerConfig, ), } @@ -944,7 +945,8 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { backendConfig.FixedExecutionNodeIDs, node.Logger, backend.DefaultSnapshotHistoryLimit, - backendConfig.ArchiveAddressList) + backendConfig.ArchiveAddressList, + backendConfig.CircuitBreakerConfig.Enabled) observerCollector := metrics.NewObserverCollector() restHandler, err := restapiproxy.NewRestProxyHandler( diff --git a/cmd/util/cmd/execution-state-extract/export_report.json b/cmd/util/cmd/execution-state-extract/export_report.json index 478522206f2..f33cbf40cb9 100644 --- a/cmd/util/cmd/execution-state-extract/export_report.json +++ b/cmd/util/cmd/execution-state-extract/export_report.json @@ -1,6 +1,6 @@ { - "EpochCounter": 0, - "PreviousStateCommitment": "1c9f9d343cb8d4610e0b2c1eb74d6ea2f2f8aef2d666281dc22870e3efaa607b", - "CurrentStateCommitment": "1c9f9d343cb8d4610e0b2c1eb74d6ea2f2f8aef2d666281dc22870e3efaa607b", - "ReportSucceeded": true + "EpochCounter": 0, + "PreviousStateCommitment": "1c9f9d343cb8d4610e0b2c1eb74d6ea2f2f8aef2d666281dc22870e3efaa607b", + "CurrentStateCommitment": "1c9f9d343cb8d4610e0b2c1eb74d6ea2f2f8aef2d666281dc22870e3efaa607b", + "ReportSucceeded": true } diff --git a/engine/access/access_test.go b/engine/access/access_test.go index 63d7af2d76c..b6d4051e9ba 100644 --- a/engine/access/access_test.go +++ b/engine/access/access_test.go @@ -157,6 +157,7 @@ func (suite *Suite) RunTest( suite.log, backend.DefaultSnapshotHistoryLimit, nil, + false, ) handler := access.NewHandler(suite.backend, 
suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me, access.WithBlockSignerDecoder(suite.signerIndicesDecoder)) f(handler, db, all) @@ -329,6 +330,7 @@ func (suite *Suite) TestSendTransactionToRandomCollectionNode() { suite.log, backend.DefaultSnapshotHistoryLimit, nil, + false, ) handler := access.NewHandler(backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me) @@ -655,6 +657,7 @@ func (suite *Suite) TestGetSealedTransaction() { suite.log, backend.DefaultSnapshotHistoryLimit, nil, + false, ) handler := access.NewHandler(backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me) @@ -794,6 +797,7 @@ func (suite *Suite) TestGetTransactionResult() { suite.log, backend.DefaultSnapshotHistoryLimit, nil, + false, ) handler := access.NewHandler(backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me) @@ -985,6 +989,7 @@ func (suite *Suite) TestExecuteScript() { suite.log, backend.DefaultSnapshotHistoryLimit, nil, + false, ) handler := access.NewHandler(suite.backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me) diff --git a/engine/access/integration_unsecure_grpc_server_test.go b/engine/access/integration_unsecure_grpc_server_test.go index af658b390fd..e2cf78ade5a 100644 --- a/engine/access/integration_unsecure_grpc_server_test.go +++ b/engine/access/integration_unsecure_grpc_server_test.go @@ -188,7 +188,8 @@ func (suite *SameGRPCPortTestSuite) SetupTest() { nil, suite.log, 0, - nil) + nil, + false) // create rpc engine builder rpcEngBuilder, err := rpc.NewBuilder( diff --git a/engine/access/rest_api_test.go b/engine/access/rest_api_test.go index 24ecf554627..091c5e2e3ad 100644 --- a/engine/access/rest_api_test.go +++ b/engine/access/rest_api_test.go @@ -169,7 +169,8 @@ func (suite *RestAPITestSuite) SetupTest() { nil, suite.log, 0, - nil) + nil, + false) rpcEngBuilder, err := rpc.NewBuilder( suite.log, diff --git a/engine/access/rpc/backend/backend.go b/engine/access/rpc/backend/backend.go index 37277c4dc1f..b6720242717 100644 --- a/engine/access/rpc/backend/backend.go +++ b/engine/access/rpc/backend/backend.go @@ -7,11 +7,10 @@ import ( "strconv" "time" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - lru "github.com/hashicorp/golang-lru" "github.com/rs/zerolog" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/onflow/flow-go/access" "github.com/onflow/flow-go/cmd/build" @@ -27,9 +26,6 @@ import ( accessproto "github.com/onflow/flow/protobuf/go/flow/access" ) -// maxExecutionNodesCnt is the max number of execution nodes that will be contacted to complete an execution api request -const maxExecutionNodesCnt = 3 - // minExecutionNodesCnt is the minimum number of execution nodes expected to have sent the execution receipt for a block const minExecutionNodesCnt = 2 @@ -84,13 +80,14 @@ type Backend struct { // Config defines the configurable options for creating Backend type Config struct { - ExecutionClientTimeout time.Duration // execution API GRPC client timeout - CollectionClientTimeout time.Duration // collection API GRPC client timeout - ConnectionPoolSize uint // size of the cache for storing collection and execution connections - MaxHeightRange uint // max size of height range requests - PreferredExecutionNodeIDs []string // preferred list of upstream execution node IDs - FixedExecutionNodeIDs []string // fixed list of execution node IDs to choose from if no node ID can be chosen from the PreferredExecutionNodeIDs - ArchiveAddressList []string // the archive node address list to 
send script executions. when configured, script executions will be all sent to the archive node + ExecutionClientTimeout time.Duration // execution API GRPC client timeout + CollectionClientTimeout time.Duration // collection API GRPC client timeout + ConnectionPoolSize uint // size of the cache for storing collection and execution connections + MaxHeightRange uint // max size of height range requests + PreferredExecutionNodeIDs []string // preferred list of upstream execution node IDs + FixedExecutionNodeIDs []string // fixed list of execution node IDs to choose from if no node ID can be chosen from the PreferredExecutionNodeIDs + ArchiveAddressList []string // the archive node address list to send script executions to. When configured, all script executions will be sent to the archive node + CircuitBreakerConfig connection.CircuitBreakerConfig // circuit breaker configuration for the collection and execution API clients } func New( @@ -113,6 +110,7 @@ func New( log zerolog.Logger, snapshotHistoryLimit int, archiveAddressList []string, + circuitBreakerEnabled bool, ) *Backend { retry := newRetry() if retryEnabled { @@ -133,6 +131,9 @@ func New( archivePorts[idx] = port } + // create a node communicator that the sub-backends use to iterate over upstream nodes when making API calls + nodeCommunicator := NewNodeCommunicator(circuitBreakerEnabled) + b := &Backend{ state: state, // create the sub-backends @@ -146,6 +147,7 @@ func New( loggedScripts: loggedScripts, archiveAddressList: archiveAddressList, archivePorts: archivePorts, + nodeCommunicator: nodeCommunicator, }, backendTransactions: backendTransactions{ staticCollectionRPC: collectionRPC, @@ -161,6 +163,7 @@ func New( connFactory: connFactory, previousAccessNodes: historicalAccessNodes, log: log, + nodeCommunicator: nodeCommunicator, }, backendEvents: backendEvents{ state: state, @@ -169,6 +172,7 @@ func New( connFactory: connFactory, log: log, maxHeightRange: maxHeightRange, + nodeCommunicator: nodeCommunicator, }, backendBlockHeaders: backendBlockHeaders{ headers: headers, @@ -184,6 +188,7 @@ func New( executionReceipts: executionReceipts, connFactory: connFactory, log: log, + nodeCommunicator: nodeCommunicator, }, backendExecutionResults: backendExecutionResults{ executionResults: executionResults, @@ -338,7 +343,7 @@ func (b *Backend) GetLatestProtocolStateSnapshot(_ context.Context) ([]byte, err return convert.SnapshotToBytes(validSnapshot) } -// executionNodesForBlockID returns upto maxExecutionNodesCnt number of randomly chosen execution node identities +// executionNodesForBlockID returns identities of all execution nodes // which have executed the given block ID. // If no such execution node is found, an InsufficientExecutionReceipts error is returned. 
func executionNodesForBlockID( @@ -409,17 +414,11 @@ func executionNodesForBlockID( return nil, fmt.Errorf("failed to retreive execution IDs for block ID %v: %w", blockID, err) } - // randomly choose upto maxExecutionNodesCnt identities - executionIdentitiesRandom, err := subsetENs.Sample(maxExecutionNodesCnt) - if err != nil { - return nil, fmt.Errorf("sampling failed: %w", err) - } - - if len(executionIdentitiesRandom) == 0 { + if len(subsetENs) == 0 { return nil, fmt.Errorf("no matching execution node found for block ID %v", blockID) } - return executionIdentitiesRandom, nil + return subsetENs, nil } // findAllExecutionNodes find all the execution nodes ids from the execution receipts that have been received for the diff --git a/engine/access/rpc/backend/backend_accounts.go b/engine/access/rpc/backend/backend_accounts.go index a3cf47bb00d..35f8f0bf4df 100644 --- a/engine/access/rpc/backend/backend_accounts.go +++ b/engine/access/rpc/backend/backend_accounts.go @@ -4,7 +4,6 @@ import ( "context" "time" - "github.com/hashicorp/go-multierror" "github.com/rs/zerolog" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -25,6 +24,7 @@ type backendAccounts struct { executionReceipts storage.ExecutionReceipts connFactory connection.ConnectionFactory log zerolog.Logger + nodeCommunicator *NodeCommunicator } func (b *backendAccounts) GetAccount(ctx context.Context, address flow.Address) (*flow.Account, error) { @@ -109,34 +109,39 @@ func (b *backendAccounts) getAccountAtBlockID( // other ENs are logged and swallowed. If all ENs fail to return a valid response, then an // error aggregating all failures is returned. func (b *backendAccounts) getAccountFromAnyExeNode(ctx context.Context, execNodes flow.IdentityList, req *execproto.GetAccountAtBlockIDRequest) (*execproto.GetAccountAtBlockIDResponse, error) { - var errors *multierror.Error - for _, execNode := range execNodes { - // TODO: use the GRPC Client interceptor - start := time.Now() - - resp, err := b.tryGetAccount(ctx, execNode, req) - duration := time.Since(start) - if err == nil { - // return if any execution node replied successfully - b.log.Debug(). - Str("execution_node", execNode.String()). + var resp *execproto.GetAccountAtBlockIDResponse + errToReturn := b.nodeCommunicator.CallAvailableNode( + execNodes, + func(node *flow.Identity) error { + var err error + // TODO: use the GRPC Client interceptor + start := time.Now() + + resp, err = b.tryGetAccount(ctx, node, req) + duration := time.Since(start) + if err == nil { + // return if any execution node replied successfully + b.log.Debug(). + Str("execution_node", node.String()). + Hex("block_id", req.GetBlockId()). + Hex("address", req.GetAddress()). + Int64("rtt_ms", duration.Milliseconds()). + Msg("Successfully got account info") + return nil + } + b.log.Error(). + Str("execution_node", node.String()). Hex("block_id", req.GetBlockId()). Hex("address", req.GetAddress()). Int64("rtt_ms", duration.Milliseconds()). - Msg("Successfully got account info") - return resp, nil - } - b.log.Error(). - Str("execution_node", execNode.String()). - Hex("block_id", req.GetBlockId()). - Hex("address", req.GetAddress()). - Int64("rtt_ms", duration.Milliseconds()). - Err(err). - Msg("failed to execute GetAccount") - errors = multierror.Append(errors, err) - } - - return nil, errors.ErrorOrNil() + Err(err). 
+ Msg("failed to execute GetAccount") + return err + }, + nil, + ) + + return resp, errToReturn } func (b *backendAccounts) tryGetAccount(ctx context.Context, execNode *flow.Identity, req *execproto.GetAccountAtBlockIDRequest) (*execproto.GetAccountAtBlockIDResponse, error) { @@ -148,9 +153,6 @@ func (b *backendAccounts) tryGetAccount(ctx context.Context, execNode *flow.Iden resp, err := execRPCClient.GetAccountAtBlockID(ctx, req) if err != nil { - if status.Code(err) == codes.Unavailable { - b.connFactory.InvalidateExecutionAPIClient(execNode.Address) - } return nil, err } return resp, nil diff --git a/engine/access/rpc/backend/backend_events.go b/engine/access/rpc/backend/backend_events.go index 2745d92bf00..d0b52820ee4 100644 --- a/engine/access/rpc/backend/backend_events.go +++ b/engine/access/rpc/backend/backend_events.go @@ -7,7 +7,6 @@ import ( "fmt" "time" - "github.com/hashicorp/go-multierror" execproto "github.com/onflow/flow/protobuf/go/flow/execution" "github.com/rs/zerolog" "google.golang.org/grpc/codes" @@ -28,6 +27,7 @@ type backendEvents struct { connFactory connection.ConnectionFactory log zerolog.Logger maxHeightRange uint + nodeCommunicator *NodeCommunicator } // GetEventsForHeightRange retrieves events for all sealed blocks between the start block height and @@ -210,31 +210,37 @@ func verifyAndConvertToAccessEvents( func (b *backendEvents) getEventsFromAnyExeNode(ctx context.Context, execNodes flow.IdentityList, req *execproto.GetEventsForBlockIDsRequest) (*execproto.GetEventsForBlockIDsResponse, *flow.Identity, error) { - var errors *multierror.Error - // try to get events from one of the execution nodes - for _, execNode := range execNodes { - start := time.Now() - resp, err := b.tryGetEvents(ctx, execNode, req) - duration := time.Since(start) - - logger := b.log.With(). - Str("execution_node", execNode.String()). - Str("event", req.GetType()). - Int("blocks", len(req.BlockIds)). - Int64("rtt_ms", duration.Milliseconds()). - Logger() - - if err == nil { - // return if any execution node replied successfully - logger.Debug().Msg("Successfully got events") - return resp, execNode, nil - } - - logger.Err(err).Msg("failed to execute GetEvents") - - errors = multierror.Append(errors, err) - } - return nil, nil, errors.ErrorOrNil() + var resp *execproto.GetEventsForBlockIDsResponse + var execNode *flow.Identity + errToReturn := b.nodeCommunicator.CallAvailableNode( + execNodes, + func(node *flow.Identity) error { + var err error + start := time.Now() + resp, err = b.tryGetEvents(ctx, node, req) + duration := time.Since(start) + + logger := b.log.With(). + Str("execution_node", node.String()). + Str("event", req.GetType()). + Int("blocks", len(req.BlockIds)). + Int64("rtt_ms", duration.Milliseconds()). 
+ Logger() + + if err == nil { + // return if any execution node replied successfully + logger.Debug().Msg("Successfully got events") + execNode = node + return nil + } + + logger.Err(err).Msg("failed to execute GetEvents") + return err + }, + nil, + ) + + return resp, execNode, errToReturn } func (b *backendEvents) tryGetEvents(ctx context.Context, @@ -248,9 +254,6 @@ func (b *backendEvents) tryGetEvents(ctx context.Context, resp, err := execRPCClient.GetEventsForBlockIDs(ctx, req) if err != nil { - if status.Code(err) == codes.Unavailable { - b.connFactory.InvalidateExecutionAPIClient(execNode.Address) - } return nil, err } return resp, nil diff --git a/engine/access/rpc/backend/backend_scripts.go b/engine/access/rpc/backend/backend_scripts.go index 6ac4d2085f6..62d32c56211 100644 --- a/engine/access/rpc/backend/backend_scripts.go +++ b/engine/access/rpc/backend/backend_scripts.go @@ -9,7 +9,6 @@ import ( lru "github.com/hashicorp/golang-lru" "github.com/onflow/flow/protobuf/go/flow/access" - "github.com/hashicorp/go-multierror" execproto "github.com/onflow/flow/protobuf/go/flow/execution" "github.com/rs/zerolog" "google.golang.org/grpc/codes" @@ -36,6 +35,7 @@ type backendScripts struct { loggedScripts *lru.Cache archiveAddressList []string archivePorts []uint + nodeCommunicator *NodeCommunicator } func (b *backendScripts) ExecuteScriptAtLatestBlock( @@ -86,21 +86,6 @@ func (b *backendScripts) ExecuteScriptAtBlockHeight( return b.executeScriptOnExecutor(ctx, blockID, script, arguments) } -func (b *backendScripts) findScriptExecutors( - ctx context.Context, - blockID flow.Identifier, -) ([]string, error) { - executors, err := executionNodesForBlockID(ctx, blockID, b.executionReceipts, b.state, b.log) - if err != nil { - return nil, err - } - executorAddrs := make([]string, 0, len(executors)) - for _, executor := range executors { - executorAddrs = append(executorAddrs, executor.Address) - } - return executorAddrs, nil -} - // executeScriptOnExecutionNode forwards the request to the execution node using the execution node // grpc client and converts the response back to the access node api response format func (b *backendScripts) executeScriptOnExecutor( @@ -110,7 +95,7 @@ func (b *backendScripts) executeScriptOnExecutor( arguments [][]byte, ) ([]byte, error) { // find few execution nodes which have executed the block earlier and provided an execution receipt for it - scriptExecutors, err := b.findScriptExecutors(ctx, blockID) + executors, err := executionNodesForBlockID(ctx, blockID, b.executionReceipts, b.state, b.log) if err != nil { return nil, status.Errorf(codes.Internal, "failed to find script executors at blockId %v: %v", blockID.String(), err) } @@ -119,7 +104,7 @@ func (b *backendScripts) executeScriptOnExecutor( // *DO NOT* use this hash for any protocol-related or cryptographic functions. 
insecureScriptHash := md5.Sum(script) //nolint:gosec - // try execution on Archive nodes first + // try execution on Archive nodes if len(b.archiveAddressList) > 0 { startTime := time.Now() for idx, rnAddr := range b.archiveAddressList { @@ -154,54 +139,65 @@ func (b *backendScripts) executeScriptOnExecutor( } } } - // try execution nodes if the script wasn't executed - var errors *multierror.Error + // try to execute the script on one of the execution nodes found - for _, executorAddress := range scriptExecutors { - execStartTime := time.Now() // record start time - result, err := b.tryExecuteScriptOnExecutionNode(ctx, executorAddress, blockID, script, arguments) - if err == nil { - if b.log.GetLevel() == zerolog.DebugLevel { - executionTime := time.Now() - if b.shouldLogScript(executionTime, insecureScriptHash) { - b.log.Debug(). - Str("script_executor_addr", executorAddress). - Hex("block_id", blockID[:]). - Hex("script_hash", insecureScriptHash[:]). - Str("script", string(script)). - Msg("Successfully executed script") - b.loggedScripts.Add(insecureScriptHash, executionTime) + var result []byte + hasInvalidArgument := false + errToReturn := b.nodeCommunicator.CallAvailableNode( + executors, + func(node *flow.Identity) error { + execStartTime := time.Now() + result, err = b.tryExecuteScriptOnExecutionNode(ctx, node.Address, blockID, script, arguments) + if err == nil { + if b.log.GetLevel() == zerolog.DebugLevel { + executionTime := time.Now() + if b.shouldLogScript(executionTime, insecureScriptHash) { + b.log.Debug(). + Str("script_executor_addr", node.Address). + Hex("block_id", blockID[:]). + Hex("script_hash", insecureScriptHash[:]). + Str("script", string(script)). + Msg("Successfully executed script") + b.loggedScripts.Add(insecureScriptHash, executionTime) + } } + + // log execution time + b.metrics.ScriptExecuted( + time.Since(execStartTime), + len(script), + ) + + return nil } - // log execution time - b.metrics.ScriptExecuted( - time.Since(execStartTime), - len(script), - ) + return err + }, + func(node *flow.Identity, err error) bool { + hasInvalidArgument = status.Code(err) == codes.InvalidArgument + if hasInvalidArgument { + b.log.Debug().Err(err). + Str("script_executor_addr", node.Address). + Hex("block_id", blockID[:]). + Hex("script_hash", insecureScriptHash[:]). + Str("script", string(script)). + Msg("script failed to execute on the execution node") + } + return hasInvalidArgument + }, + ) - return result, nil - } - // return if it's just a script failure as opposed to an EN/RN failure and skip trying other ENs/RNs - if status.Code(err) == codes.InvalidArgument { - b.log.Debug().Err(err). - Str("script_executor_addr", executorAddress). - Hex("block_id", blockID[:]). - Hex("script_hash", insecureScriptHash[:]). - Str("script", string(script)). 
- Msg("script failed to execute on the execution node") - return nil, err - } - errors = multierror.Append(errors, err) + if hasInvalidArgument { + return nil, errToReturn } - errToReturn := errors.ErrorOrNil() - if errToReturn != nil { + if errToReturn == nil { + return result, nil + } else { b.metrics.ScriptExecutionErrorOnExecutionNode() b.log.Error().Err(err).Msg("script execution failed for execution node internal reasons") + return nil, rpc.ConvertError(errToReturn, "failed to execute script on execution nodes", codes.Internal) } - - return nil, rpc.ConvertMultiError(errors, "failed to execute script on execution nodes", codes.Internal) } // shouldLogScript checks if the script hash is unique in the time window @@ -242,11 +238,7 @@ func (b *backendScripts) tryExecuteScriptOnExecutionNode( execResp, err := execRPCClient.ExecuteScriptAtBlockID(ctx, req) if err != nil { - if status.Code(err) == codes.Unavailable { - b.connFactory.InvalidateExecutionAPIClient(executorAddress) - } - return nil, status.Errorf(status.Code(err), "failed to execute the script on the execution node %s: %v", - executorAddress, err) + return nil, status.Errorf(status.Code(err), "failed to execute the script on the execution node %s: %v", executorAddress, err) } return execResp.GetValue(), nil } diff --git a/engine/access/rpc/backend/backend_test.go b/engine/access/rpc/backend/backend_test.go index 99969a93fc1..d40ff45890e 100644 --- a/engine/access/rpc/backend/backend_test.go +++ b/engine/access/rpc/backend/backend_test.go @@ -110,6 +110,7 @@ func (suite *Suite) TestPing() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) err := backend.Ping(context.Background()) @@ -145,6 +146,7 @@ func (suite *Suite) TestGetLatestFinalizedBlockHeader() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) // query the handler for the latest finalized block @@ -210,6 +212,7 @@ func (suite *Suite) TestGetLatestProtocolStateSnapshot_NoTransitionSpan() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) // query the handler for the latest finalized snapshot @@ -282,6 +285,7 @@ func (suite *Suite) TestGetLatestProtocolStateSnapshot_TransitionSpans() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) // query the handler for the latest finalized snapshot @@ -347,6 +351,7 @@ func (suite *Suite) TestGetLatestProtocolStateSnapshot_PhaseTransitionSpan() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) // query the handler for the latest finalized snapshot @@ -423,6 +428,7 @@ func (suite *Suite) TestGetLatestProtocolStateSnapshot_EpochTransitionSpan() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) // query the handler for the latest finalized snapshot @@ -483,6 +489,7 @@ func (suite *Suite) TestGetLatestProtocolStateSnapshot_HistoryLimit() { suite.log, snapshotHistoryLimit, nil, + false, ) // the handler should return a snapshot history limit error @@ -521,6 +528,7 @@ func (suite *Suite) TestGetLatestSealedBlockHeader() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) // query the handler for the latest sealed block @@ -567,6 +575,7 @@ func (suite *Suite) TestGetTransaction() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) actual, err := backend.GetTransaction(context.Background(), transaction.ID()) @@ -607,6 +616,7 @@ func (suite *Suite) TestGetCollection() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) actual, err := backend.GetCollectionByID(context.Background(), expected.ID()) @@ -670,6 +680,7 @@ func (suite *Suite) TestGetTransactionResultByIndex() { 
suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) suite.execClient. On("GetTransactionResultByIndex", ctx, exeEventReq). @@ -733,6 +744,7 @@ func (suite *Suite) TestGetTransactionResultsByBlockID() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) suite.execClient. On("GetTransactionResultsByBlockID", ctx, exeEventReq). @@ -824,6 +836,7 @@ func (suite *Suite) TestTransactionStatusTransition() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) // Successfully return empty event list @@ -944,6 +957,7 @@ func (suite *Suite) TestTransactionExpiredStatusTransition() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) // should return pending status when we have not observed an expiry block @@ -1111,6 +1125,7 @@ func (suite *Suite) TestTransactionPendingToFinalizedStatusTransition() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) preferredENIdentifiers = flow.IdentifierList{receipts[0].ExecutorID} @@ -1169,6 +1184,7 @@ func (suite *Suite) TestTransactionResultUnknown() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) // first call - when block under test is greater height than the sealed head, but execution node does not know about Tx @@ -1223,6 +1239,7 @@ func (suite *Suite) TestGetLatestFinalizedBlock() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) // query the handler for the latest finalized header @@ -1353,6 +1370,7 @@ func (suite *Suite) TestGetEventsForBlockIDs() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) // execute request @@ -1385,6 +1403,7 @@ func (suite *Suite) TestGetEventsForBlockIDs() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) // execute request with an empty block id list and expect an empty list of events and no error @@ -1444,6 +1463,7 @@ func (suite *Suite) TestGetExecutionResultByID() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) // execute request @@ -1474,6 +1494,7 @@ func (suite *Suite) TestGetExecutionResultByID() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) // execute request @@ -1537,6 +1558,7 @@ func (suite *Suite) TestGetExecutionResultByBlockID() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) // execute request @@ -1568,6 +1590,7 @@ func (suite *Suite) TestGetExecutionResultByBlockID() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) // execute request @@ -1718,6 +1741,7 @@ func (suite *Suite) TestGetEventsForHeightRange() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) _, err := backend.GetEventsForHeightRange(ctx, string(flow.EventAccountCreated), maxHeight, minHeight) @@ -1757,6 +1781,7 @@ func (suite *Suite) TestGetEventsForHeightRange() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) // execute request @@ -1795,6 +1820,7 @@ func (suite *Suite) TestGetEventsForHeightRange() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) actualResp, err := backend.GetEventsForHeightRange(ctx, string(flow.EventAccountCreated), minHeight, maxHeight) @@ -1832,6 +1858,7 @@ func (suite *Suite) TestGetEventsForHeightRange() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) _, err := backend.GetEventsForHeightRange(ctx, string(flow.EventAccountCreated), minHeight, minHeight+1) @@ -1869,6 +1896,7 @@ func (suite *Suite) TestGetEventsForHeightRange() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) _, err := backend.GetEventsForHeightRange(ctx, string(flow.EventAccountCreated), minHeight, maxHeight) @@ -1946,6 +1974,7 @@ func (suite *Suite) TestGetAccount() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) 
preferredENIdentifiers = flow.IdentifierList{receipts[0].ExecutorID} @@ -2027,6 +2056,7 @@ func (suite *Suite) TestGetAccountAtBlockHeight() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) preferredENIdentifiers = flow.IdentifierList{receipts[0].ExecutorID} @@ -2066,6 +2096,7 @@ func (suite *Suite) TestGetNetworkParameters() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) params := backend.GetNetworkParameters(context.Background()) @@ -2134,12 +2165,24 @@ func (suite *Suite) TestExecutionNodesForBlockID() { if fixedENs != nil { fixedENIdentifiers = fixedENs.NodeIDs() } - actualList, err := executionNodesForBlockID(context.Background(), block.ID(), suite.receipts, suite.state, suite.log) - require.NoError(suite.T(), err) + if expectedENs == nil { expectedENs = flow.IdentityList{} } - if len(expectedENs) > maxExecutionNodesCnt { + + allExecNodes, err := executionNodesForBlockID(context.Background(), block.ID(), suite.receipts, suite.state, suite.log) + require.NoError(suite.T(), err) + + execNodeSelectorFactory := NodeSelectorFactory{circuitBreakerEnabled: false} + execSelector, err := execNodeSelectorFactory.SelectNodes(allExecNodes) + require.NoError(suite.T(), err) + + actualList := flow.IdentityList{} + for actual := execSelector.Next(); actual != nil; actual = execSelector.Next() { + actualList = append(actualList, actual) + } + + if len(expectedENs) > maxNodesCnt { for _, actual := range actualList { require.Contains(suite.T(), expectedENs, actual) } @@ -2154,9 +2197,20 @@ func (suite *Suite) TestExecutionNodesForBlockID() { attempt2Receipts = flow.ExecutionReceiptList{} attempt3Receipts = flow.ExecutionReceiptList{} suite.state.On("AtBlockID", mock.Anything).Return(suite.snapshot) - actualList, err := executionNodesForBlockID(context.Background(), block.ID(), suite.receipts, suite.state, suite.log) + + allExecNodes, err := executionNodesForBlockID(context.Background(), block.ID(), suite.receipts, suite.state, suite.log) + require.NoError(suite.T(), err) + + execNodeSelectorFactory := NodeSelectorFactory{circuitBreakerEnabled: false} + execSelector, err := execNodeSelectorFactory.SelectNodes(allExecNodes) require.NoError(suite.T(), err) - require.Equal(suite.T(), len(actualList), maxExecutionNodesCnt) + + actualList := flow.IdentityList{} + for actual := execSelector.Next(); actual != nil; actual = execSelector.Next() { + actualList = append(actualList, actual) + } + + require.Equal(suite.T(), len(actualList), maxNodesCnt) }) // if no preferred or fixed ENs are specified, the ExecutionNodesForBlockID function should @@ -2245,6 +2299,7 @@ func (suite *Suite) TestExecuteScriptOnExecutionNode() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) // mock parameters @@ -2321,6 +2376,7 @@ func (suite *Suite) TestExecuteScriptOnArchiveNode() { suite.log, DefaultSnapshotHistoryLimit, []string{fullArchiveAddress}, + false, ) // mock parameters diff --git a/engine/access/rpc/backend/backend_transactions.go b/engine/access/rpc/backend/backend_transactions.go index 3c0ac616697..79579d420e6 100644 --- a/engine/access/rpc/backend/backend_transactions.go +++ b/engine/access/rpc/backend/backend_transactions.go @@ -6,7 +6,6 @@ import ( "fmt" "time" - "github.com/hashicorp/go-multierror" accessproto "github.com/onflow/flow/protobuf/go/flow/access" "github.com/onflow/flow/protobuf/go/flow/entities" execproto "github.com/onflow/flow/protobuf/go/flow/execution" @@ -25,8 +24,6 @@ import ( "github.com/onflow/flow-go/storage" ) -const collectionNodesToTry uint = 3 - type 
backendTransactions struct { staticCollectionRPC accessproto.AccessAPIClient // rpc client tied to a fixed collection node transactions storage.Transactions @@ -42,6 +39,7 @@ type backendTransactions struct { previousAccessNodes []accessproto.AccessAPIClient log zerolog.Logger + nodeCommunicator *NodeCommunicator } // SendTransaction forwards the transaction to the collection node @@ -86,36 +84,39 @@ func (b *backendTransactions) trySendTransaction(ctx context.Context, tx *flow.T return b.grpcTxSend(ctx, b.staticCollectionRPC, tx) } - // otherwise choose a random set of collections nodes to try - collAddrs, err := b.chooseCollectionNodes(tx, collectionNodesToTry) + // otherwise choose all collection nodes to try + collNodes, err := b.chooseCollectionNodes(tx) if err != nil { return fmt.Errorf("failed to determine collection node for tx %x: %w", tx, err) } - var sendErrors *multierror.Error + var sendError error logAnyError := func() { - err = sendErrors.ErrorOrNil() - if err != nil { + if sendError != nil { b.log.Info().Err(err).Msg("failed to send transactions to collector nodes") } } defer logAnyError() // try sending the transaction to one of the chosen collection nodes - for _, addr := range collAddrs { - err = b.sendTransactionToCollector(ctx, tx, addr) - if err == nil { + sendError = b.nodeCommunicator.CallAvailableNode( + collNodes, + func(node *flow.Identity) error { + err = b.sendTransactionToCollector(ctx, tx, node.Address) + if err != nil { + return err + } return nil - } - sendErrors = multierror.Append(sendErrors, err) - } + }, + nil, + ) - return sendErrors.ErrorOrNil() + return sendError } // chooseCollectionNodes finds a random subset of size sampleSize of collection node addresses from the // collection node cluster responsible for the given tx -func (b *backendTransactions) chooseCollectionNodes(tx *flow.TransactionBody, sampleSize uint) ([]string, error) { +func (b *backendTransactions) chooseCollectionNodes(tx *flow.TransactionBody) (flow.IdentityList, error) { // retrieve the set of collector clusters clusters, err := b.state.Final().Epochs().Current().Clustering() @@ -124,24 +125,12 @@ func (b *backendTransactions) chooseCollectionNodes(tx *flow.TransactionBody, sa } // get the cluster responsible for the transaction - txCluster, ok := clusters.ByTxID(tx.ID()) + targetNodes, ok := clusters.ByTxID(tx.ID()) if !ok { return nil, fmt.Errorf("could not get local cluster by txID: %x", tx.ID()) } - // select a random subset of collection nodes from the cluster to be tried in order - targetNodes, err := txCluster.Sample(sampleSize) - if err != nil { - return nil, fmt.Errorf("sampling failed: %w", err) - } - - // collect the addresses of all the chosen collection nodes - var targetAddrs = make([]string, len(targetNodes)) - for i, id := range targetNodes { - targetAddrs[i] = id.Address - } - - return targetAddrs, nil + return targetNodes, nil } // sendTransactionToCollection sends the transaction to the given collection node via grpc @@ -157,9 +146,6 @@ func (b *backendTransactions) sendTransactionToCollector(ctx context.Context, err = b.grpcTxSend(ctx, collectionRPC, tx) if err != nil { - if status.Code(err) == codes.Unavailable { - b.connFactory.InvalidateAccessAPIClient(collectionNodeAddr) - } return fmt.Errorf("failed to send transaction to collection node at %s: %w", collectionNodeAddr, err) } return nil @@ -391,6 +377,7 @@ func (b *backendTransactions) GetTransactionResultsByBlockID( req := &execproto.GetTransactionsByBlockIDRequest{ BlockId: blockID[:], } + execNodes, 
err := executionNodesForBlockID(ctx, blockID, b.executionReceipts, b.state, b.log) if err != nil { if IsInsufficientExecutionReceipts(err) { @@ -522,6 +509,7 @@ func (b *backendTransactions) GetTransactionResultByIndex( BlockId: blockID[:], Index: index, } + execNodes, err := executionNodesForBlockID(ctx, blockID, b.executionReceipts, b.state, b.log) if err != nil { if IsInsufficientExecutionReceipts(err) { @@ -782,32 +770,36 @@ func (b *backendTransactions) getTransactionResultFromAnyExeNode( execNodes flow.IdentityList, req *execproto.GetTransactionResultRequest, ) (*execproto.GetTransactionResultResponse, error) { - var errs *multierror.Error - logAnyError := func() { - errToReturn := errs.ErrorOrNil() + var errToReturn error + + defer func() { if errToReturn != nil { b.log.Info().Err(errToReturn).Msg("failed to get transaction result from execution nodes") } - } - defer logAnyError() - // try to execute the script on one of the execution nodes - for _, execNode := range execNodes { - resp, err := b.tryGetTransactionResult(ctx, execNode, req) - if err == nil { - b.log.Debug(). - Str("execution_node", execNode.String()). - Hex("block_id", req.GetBlockId()). - Hex("transaction_id", req.GetTransactionId()). - Msg("Successfully got transaction results from any node") - return resp, nil - } - if status.Code(err) == codes.NotFound { - return nil, err - } - errs = multierror.Append(errs, err) - } + }() - return nil, errs.ErrorOrNil() + var resp *execproto.GetTransactionResultResponse + errToReturn = b.nodeCommunicator.CallAvailableNode( + execNodes, + func(node *flow.Identity) error { + var err error + resp, err = b.tryGetTransactionResult(ctx, node, req) + if err == nil { + b.log.Debug(). + Str("execution_node", node.String()). + Hex("block_id", req.GetBlockId()). + Hex("transaction_id", req.GetTransactionId()). + Msg("Successfully got transaction results from any node") + return nil + } + return err + }, + func(_ *flow.Identity, err error) bool { + return status.Code(err) == codes.NotFound + }, + ) + + return resp, errToReturn } func (b *backendTransactions) tryGetTransactionResult( @@ -823,9 +815,6 @@ func (b *backendTransactions) tryGetTransactionResult( resp, err := execRPCClient.GetTransactionResult(ctx, req) if err != nil { - if status.Code(err) == codes.Unavailable { - b.connFactory.InvalidateExecutionAPIClient(execNode.Address) - } return nil, err } @@ -837,12 +826,12 @@ func (b *backendTransactions) getTransactionResultsByBlockIDFromAnyExeNode( execNodes flow.IdentityList, req *execproto.GetTransactionsByBlockIDRequest, ) (*execproto.GetTransactionResultsResponse, error) { - var errs *multierror.Error + var errToReturn error defer func() { // log the errors - if err := errs.ErrorOrNil(); err != nil { - b.log.Err(errs).Msg("failed to get transaction results from execution nodes") + if errToReturn != nil { + b.log.Err(errToReturn).Msg("failed to get transaction results from execution nodes") } }() @@ -851,22 +840,27 @@ func (b *backendTransactions) getTransactionResultsByBlockIDFromAnyExeNode( return nil, errors.New("zero execution nodes") } - for _, execNode := range execNodes { - resp, err := b.tryGetTransactionResultsByBlockID(ctx, execNode, req) - if err == nil { - b.log.Debug(). - Str("execution_node", execNode.String()). - Hex("block_id", req.GetBlockId()). 
- Msg("Successfully got transaction results from any node") - return resp, nil - } - if status.Code(err) == codes.NotFound { - return nil, err - } - errs = multierror.Append(errs, err) - } + var resp *execproto.GetTransactionResultsResponse + errToReturn = b.nodeCommunicator.CallAvailableNode( + execNodes, + func(node *flow.Identity) error { + var err error + resp, err = b.tryGetTransactionResultsByBlockID(ctx, node, req) + if err == nil { + b.log.Debug(). + Str("execution_node", node.String()). + Hex("block_id", req.GetBlockId()). + Msg("Successfully got transaction results from any node") + return nil + } + return err + }, + func(_ *flow.Identity, err error) bool { + return status.Code(err) == codes.NotFound + }, + ) - return nil, errs.ErrorOrNil() + return resp, errToReturn } func (b *backendTransactions) tryGetTransactionResultsByBlockID( @@ -882,9 +876,6 @@ func (b *backendTransactions) tryGetTransactionResultsByBlockID( resp, err := execRPCClient.GetTransactionResultsByBlockID(ctx, req) if err != nil { - if status.Code(err) == codes.Unavailable { - b.connFactory.InvalidateExecutionAPIClient(execNode.Address) - } return nil, err } @@ -896,37 +887,39 @@ func (b *backendTransactions) getTransactionResultByIndexFromAnyExeNode( execNodes flow.IdentityList, req *execproto.GetTransactionByIndexRequest, ) (*execproto.GetTransactionResultResponse, error) { - var errs *multierror.Error - logAnyError := func() { - errToReturn := errs.ErrorOrNil() + var errToReturn error + defer func() { if errToReturn != nil { b.log.Info().Err(errToReturn).Msg("failed to get transaction result from execution nodes") } - } - defer logAnyError() + }() if len(execNodes) == 0 { return nil, errors.New("zero execution nodes provided") } - // try to execute the script on one of the execution nodes - for _, execNode := range execNodes { - resp, err := b.tryGetTransactionResultByIndex(ctx, execNode, req) - if err == nil { - b.log.Debug(). - Str("execution_node", execNode.String()). - Hex("block_id", req.GetBlockId()). - Uint32("index", req.GetIndex()). - Msg("Successfully got transaction results from any node") - return resp, nil - } - if status.Code(err) == codes.NotFound { - return nil, err - } - errs = multierror.Append(errs, err) - } + var resp *execproto.GetTransactionResultResponse + errToReturn = b.nodeCommunicator.CallAvailableNode( + execNodes, + func(node *flow.Identity) error { + var err error + resp, err = b.tryGetTransactionResultByIndex(ctx, node, req) + if err == nil { + b.log.Debug(). + Str("execution_node", node.String()). + Hex("block_id", req.GetBlockId()). + Uint32("index", req.GetIndex()). 
+ Msg("Successfully got transaction results from any node") + return nil + } + return err + }, + func(_ *flow.Identity, err error) bool { + return status.Code(err) == codes.NotFound + }, + ) - return nil, errs.ErrorOrNil() + return resp, errToReturn } func (b *backendTransactions) tryGetTransactionResultByIndex( @@ -942,9 +935,6 @@ func (b *backendTransactions) tryGetTransactionResultByIndex( resp, err := execRPCClient.GetTransactionResultByIndex(ctx, req) if err != nil { - if status.Code(err) == codes.Unavailable { - b.connFactory.InvalidateExecutionAPIClient(execNode.Address) - } return nil, err } diff --git a/engine/access/rpc/backend/historical_access_test.go b/engine/access/rpc/backend/historical_access_test.go index b66904f6604..42dd829dbbc 100644 --- a/engine/access/rpc/backend/historical_access_test.go +++ b/engine/access/rpc/backend/historical_access_test.go @@ -56,6 +56,7 @@ func (suite *Suite) TestHistoricalTransactionResult() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) // Successfully return the transaction from the historical node @@ -114,6 +115,7 @@ func (suite *Suite) TestHistoricalTransaction() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) // Successfully return the transaction from the historical node diff --git a/engine/access/rpc/backend/node_communicator.go b/engine/access/rpc/backend/node_communicator.go new file mode 100644 index 00000000000..d75432b0b29 --- /dev/null +++ b/engine/access/rpc/backend/node_communicator.go @@ -0,0 +1,75 @@ +package backend + +import ( + "github.com/hashicorp/go-multierror" + "github.com/sony/gobreaker" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/onflow/flow-go/model/flow" +) + +// maxFailedRequestCount represents the maximum number of failed requests before returning errors. +const maxFailedRequestCount = 3 + +// NodeAction is a callback function type that represents an action to be performed on a node. +// It takes a node as input and returns an error indicating the result of the action. +type NodeAction func(node *flow.Identity) error + +// ErrorTerminator is a callback function that determines whether an error should terminate further execution. +// It takes an error as input and returns a boolean value indicating whether the error should be considered terminal. +type ErrorTerminator func(node *flow.Identity, err error) bool + +// NodeCommunicator is responsible for calling available nodes in the backend. +type NodeCommunicator struct { + nodeSelectorFactory NodeSelectorFactory +} + +// NewNodeCommunicator creates a new instance of NodeCommunicator. +func NewNodeCommunicator(circuitBreakerEnabled bool) *NodeCommunicator { + return &NodeCommunicator{ + nodeSelectorFactory: NodeSelectorFactory{circuitBreakerEnabled: circuitBreakerEnabled}, + } +} + +// CallAvailableNode calls the provided function on the available nodes. +// It iterates through the nodes and executes the function. +// If an error occurs, it applies the custom error terminator (if provided) and keeps track of the errors. +// If the error occurs in circuit breaker, it continues to the next node. +// If the maximum failed request count is reached, it returns the accumulated errors. 
+func (b *NodeCommunicator) CallAvailableNode( + nodes flow.IdentityList, + call NodeAction, + shouldTerminateOnError ErrorTerminator, +) error { + var errs *multierror.Error + nodeSelector, err := b.nodeSelectorFactory.SelectNodes(nodes) + if err != nil { + return err + } + + for node := nodeSelector.Next(); node != nil; node = nodeSelector.Next() { + err := call(node) + if err == nil { + return nil + } + + if shouldTerminateOnError != nil && shouldTerminateOnError(node, err) { + return err + } + + if err == gobreaker.ErrOpenState { + if !nodeSelector.HasNext() && len(errs.Errors) == 0 { + errs = multierror.Append(errs, status.Error(codes.Unavailable, "there are no available nodes")) + } + continue + } + + errs = multierror.Append(errs, err) + if len(errs.Errors) >= maxFailedRequestCount { + return errs.ErrorOrNil() + } + } + + return errs.ErrorOrNil() +} diff --git a/engine/access/rpc/backend/node_selector.go b/engine/access/rpc/backend/node_selector.go new file mode 100644 index 00000000000..f90f8271b2d --- /dev/null +++ b/engine/access/rpc/backend/node_selector.go @@ -0,0 +1,70 @@ +package backend + +import ( + "fmt" + + "github.com/onflow/flow-go/model/flow" +) + +// maxNodesCnt is the maximum number of nodes that will be contacted to complete an API request. +const maxNodesCnt = 3 + +// NodeSelector is an interface that represents the ability to select node identities that the access node is trying to reach. +// It encapsulates the internal logic of node selection and provides a way to change implementations for different types +// of nodes. Implementations of this interface should define the Next method, which returns the next node identity to be +// selected; HasNext reports whether a next node is available. +type NodeSelector interface { + Next() *flow.Identity + HasNext() bool +} + +// NodeSelectorFactory is a factory for creating node selectors based on factory configuration and node type. +// Supported configurations: +// circuitBreakerEnabled = true - nodes will be picked from the proposed list in-order without any changes. +// circuitBreakerEnabled = false - nodes will be pseudo-randomly sampled down to maxNodesCnt and picked in-order. +type NodeSelectorFactory struct { + circuitBreakerEnabled bool +} + +// SelectNodes selects the configured number of node identities from the provided list of nodes +// and returns the node selector to iterate through them. +func (n *NodeSelectorFactory) SelectNodes(nodes flow.IdentityList) (NodeSelector, error) { + var err error + // If the circuit breaker is disabled, the legacy logic should be used, which selects only a specified number of nodes. + if !n.circuitBreakerEnabled { + nodes, err = nodes.Sample(maxNodesCnt) + if err != nil { + return nil, fmt.Errorf("sampling failed: %w", err) + } + } + + return NewMainNodeSelector(nodes), nil +} + +// MainNodeSelector is a specific implementation of the node selector +// which performs in-order node selection over a fixed list of pre-defined nodes. +type MainNodeSelector struct { + nodes flow.IdentityList + index int +} + +var _ NodeSelector = (*MainNodeSelector)(nil) + +func NewMainNodeSelector(nodes flow.IdentityList) *MainNodeSelector { + return &MainNodeSelector{nodes: nodes, index: 0} +} + +// HasNext returns true if a next node is available. +func (e *MainNodeSelector) HasNext() bool { + return e.index < len(e.nodes) +} + +// Next returns the next node in the selector. 
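A minimal sketch of how the factory and selector compose, mirroring the Next() iteration pattern the updated backend_test.go hunks use (unittest.IdentityListFixture is assumed here purely to fabricate identities):

	// With circuitBreakerEnabled: false the factory first pseudo-randomly samples
	// maxNodesCnt (3) identities, preserving the legacy behavior; with true it
	// yields the full, unsampled list in order.
	factory := NodeSelectorFactory{circuitBreakerEnabled: false}
	selector, err := factory.SelectNodes(unittest.IdentityListFixture(5))
	if err != nil {
		return err
	}
	for node := selector.Next(); node != nil; node = selector.Next() {
		fmt.Println(node.Address) // contact candidates one by one until a call succeeds
	}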
+func (e *MainNodeSelector) Next() *flow.Identity { + if e.index < len(e.nodes) { + next := e.nodes[e.index] + e.index++ + return next + } + return nil +} diff --git a/engine/access/rpc/backend/retry_test.go b/engine/access/rpc/backend/retry_test.go index c10b66bbbc0..2189223118a 100644 --- a/engine/access/rpc/backend/retry_test.go +++ b/engine/access/rpc/backend/retry_test.go @@ -61,6 +61,7 @@ func (suite *Suite) TestTransactionRetry() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) retry := newRetry().SetBackend(backend).Activate() backend.retry = retry @@ -150,6 +151,7 @@ func (suite *Suite) TestSuccessfulTransactionsDontRetry() { suite.log, DefaultSnapshotHistoryLimit, nil, + false, ) retry := newRetry().SetBackend(backend).Activate() backend.retry = retry diff --git a/engine/access/rpc/connection/connection.go b/engine/access/rpc/connection/connection.go index f1ac3270654..de81319276f 100644 --- a/engine/access/rpc/connection/connection.go +++ b/engine/access/rpc/connection/connection.go @@ -59,7 +59,7 @@ func (cf *ConnectionFactoryImpl) GetAccessAPIClientWithPort(address string, port return nil, nil, err } - conn, closer, err := cf.Manager.GetConnection(grpcAddress, cf.CollectionNodeGRPCTimeout) + conn, closer, err := cf.Manager.GetConnection(grpcAddress, cf.CollectionNodeGRPCTimeout, AccessClient) if err != nil { return nil, nil, err } @@ -85,7 +85,7 @@ func (cf *ConnectionFactoryImpl) GetExecutionAPIClient(address string) (executio return nil, nil, err } - conn, closer, err := cf.Manager.GetConnection(grpcAddress, cf.ExecutionNodeGRPCTimeout) + conn, closer, err := cf.Manager.GetConnection(grpcAddress, cf.ExecutionNodeGRPCTimeout, ExecutionClient) if err != nil { return nil, nil, err } diff --git a/engine/access/rpc/connection/connection_test.go b/engine/access/rpc/connection/connection_test.go index 75239a075ae..a961816605e 100644 --- a/engine/access/rpc/connection/connection_test.go +++ b/engine/access/rpc/connection/connection_test.go @@ -10,6 +10,8 @@ import ( "testing" "time" + "github.com/sony/gobreaker" + "go.uber.org/atomic" "pgregory.net/rapid" @@ -50,6 +52,7 @@ func TestProxyAccessAPI(t *testing.T) { unittest.Logger(), connectionFactory.AccessMetrics, 0, + CircuitBreakerConfig{}, ) proxyConnectionFactory := ProxyConnectionFactory{ @@ -91,6 +94,7 @@ func TestProxyExecutionAPI(t *testing.T) { unittest.Logger(), connectionFactory.AccessMetrics, 0, + CircuitBreakerConfig{}, ) proxyConnectionFactory := ProxyConnectionFactory{ @@ -137,6 +141,7 @@ func TestProxyAccessAPIConnectionReuse(t *testing.T) { unittest.Logger(), connectionFactory.AccessMetrics, 0, + CircuitBreakerConfig{}, ) proxyConnectionFactory := ProxyConnectionFactory{ @@ -190,6 +195,7 @@ func TestProxyExecutionAPIConnectionReuse(t *testing.T) { unittest.Logger(), connectionFactory.AccessMetrics, 0, + CircuitBreakerConfig{}, ) proxyConnectionFactory := ProxyConnectionFactory{ @@ -250,11 +256,12 @@ func TestExecutionNodeClientTimeout(t *testing.T) { unittest.Logger(), connectionFactory.AccessMetrics, 0, + CircuitBreakerConfig{}, ) // create the execution API client client, _, err := connectionFactory.GetExecutionAPIClient(en.listener.Addr().String()) - assert.NoError(t, err) + require.NoError(t, err) ctx := context.Background() // make the call to the execution node @@ -298,6 +305,7 @@ func TestCollectionNodeClientTimeout(t *testing.T) { unittest.Logger(), connectionFactory.AccessMetrics, 0, + CircuitBreakerConfig{}, ) // create the collection API client @@ -346,6 +354,7 @@ func TestConnectionPoolFull(t 
*testing.T) { unittest.Logger(), connectionFactory.AccessMetrics, 0, + CircuitBreakerConfig{}, ) cn1Address := "foo1:123" @@ -421,6 +430,7 @@ func TestConnectionPoolStale(t *testing.T) { unittest.Logger(), connectionFactory.AccessMetrics, 0, + CircuitBreakerConfig{}, ) proxyConnectionFactory := ProxyConnectionFactory{ @@ -509,6 +519,7 @@ func TestExecutionNodeClientClosedGracefully(t *testing.T) { unittest.Logger(), connectionFactory.AccessMetrics, 0, + CircuitBreakerConfig{}, ) clientAddress := en.listener.Addr().String() @@ -592,6 +603,7 @@ func TestExecutionEvictingCacheClients(t *testing.T) { unittest.Logger(), connectionFactory.AccessMetrics, 0, + CircuitBreakerConfig{}, ) clientAddress := cn.listener.Addr().String() @@ -633,6 +645,174 @@ func TestExecutionEvictingCacheClients(t *testing.T) { assert.Equal(t, 0, cache.Len()) } +// TestCircuitBreakerExecutionNode tests the circuit breaker state changes for execution nodes. +func TestCircuitBreakerExecutionNode(t *testing.T) { + requestTimeout := 500 * time.Millisecond + circuitBreakerRestoreTimeout := 1500 * time.Millisecond + + // Create an execution node for testing. + en := new(executionNode) + en.start(t) + defer en.stop(t) + + // Set up the handler mock to not respond within the requestTimeout. + req := &execution.PingRequest{} + resp := &execution.PingResponse{} + en.handler.On("Ping", testifymock.Anything, req).After(2*requestTimeout).Return(resp, nil) + + // Create the connection factory. + connectionFactory := new(ConnectionFactoryImpl) + + // Set the execution gRPC port. + connectionFactory.ExecutionGRPCPort = en.port + + // Set the execution gRPC client requestTimeout. + connectionFactory.ExecutionNodeGRPCTimeout = requestTimeout + + // Set the connection pool cache size. + cacheSize := 1 + connectionCache, _ := lru.New(cacheSize) + + connectionFactory.Manager = NewManager( + NewCache(connectionCache, cacheSize), + unittest.Logger(), + connectionFactory.AccessMetrics, + 0, + CircuitBreakerConfig{ + Enabled: true, + MaxFailures: 1, + MaxRequests: 1, + RestoreTimeout: circuitBreakerRestoreTimeout, + }, + ) + + // Set metrics reporting. + connectionFactory.AccessMetrics = metrics.NewNoopCollector() + + // Create the execution API client. + client, _, err := connectionFactory.GetExecutionAPIClient(en.listener.Addr().String()) + require.NoError(t, err) + + ctx := context.Background() + + // Helper function to make the Ping call to the execution node and measure the duration. + callAndMeasurePingDuration := func() (time.Duration, error) { + start := time.Now() + + // Make the call to the execution node. + _, err = client.Ping(ctx, req) + en.handler.AssertCalled(t, "Ping", testifymock.Anything, req) + + return time.Since(start), err + } + + // Call and measure the duration for the first invocation. + duration, err := callAndMeasurePingDuration() + assert.Equal(t, codes.DeadlineExceeded, status.Code(err)) + assert.LessOrEqual(t, requestTimeout, duration) + + // Call and measure the duration for the second invocation (circuit breaker state is now "Open"). + duration, err = callAndMeasurePingDuration() + assert.Equal(t, gobreaker.ErrOpenState, err) + assert.Greater(t, requestTimeout, duration) + + // Reset the mock Ping for the next invocation to return response without delay + en.handler.On("Ping", testifymock.Anything, req).Unset() + en.handler.On("Ping", testifymock.Anything, req).Return(resp, nil) + + // Wait until the circuit breaker transitions to the "HalfOpen" state. 
+ time.Sleep(circuitBreakerRestoreTimeout + (500 * time.Millisecond)) + + // Call and measure the duration for the third invocation (circuit breaker state is now "HalfOpen"). + duration, err = callAndMeasurePingDuration() + assert.Greater(t, requestTimeout, duration) + assert.NoError(t, err) +} + +// TestCircuitBreakerCollectionNode tests the circuit breaker state changes for collection nodes. +func TestCircuitBreakerCollectionNode(t *testing.T) { + requestTimeout := 500 * time.Millisecond + circuitBreakerRestoreTimeout := 1500 * time.Millisecond + + // Create a collection node for testing. + cn := new(collectionNode) + cn.start(t) + defer cn.stop(t) + + // Set up the handler mock to not respond within the requestTimeout. + req := &access.PingRequest{} + resp := &access.PingResponse{} + cn.handler.On("Ping", testifymock.Anything, req).After(2*requestTimeout).Return(resp, nil) + + // Create the connection factory. + connectionFactory := new(ConnectionFactoryImpl) + + // Set the collection gRPC port. + connectionFactory.CollectionGRPCPort = cn.port + + // Set the collection gRPC client requestTimeout. + connectionFactory.CollectionNodeGRPCTimeout = requestTimeout + + // Set the connection pool cache size. + cacheSize := 1 + connectionCache, _ := lru.New(cacheSize) + + connectionFactory.Manager = NewManager( + NewCache(connectionCache, cacheSize), + unittest.Logger(), + connectionFactory.AccessMetrics, + 0, + CircuitBreakerConfig{ + Enabled: true, + MaxFailures: 1, + MaxRequests: 1, + RestoreTimeout: circuitBreakerRestoreTimeout, + }, + ) + + // Set metrics reporting. + connectionFactory.AccessMetrics = metrics.NewNoopCollector() + + // Create the collection API client. + client, _, err := connectionFactory.GetAccessAPIClient(cn.listener.Addr().String()) + assert.NoError(t, err) + + ctx := context.Background() + + // Helper function to make the Ping call to the collection node and measure the duration. + callAndMeasurePingDuration := func() (time.Duration, error) { + start := time.Now() + + // Make the call to the collection node. + _, err = client.Ping(ctx, req) + cn.handler.AssertCalled(t, "Ping", testifymock.Anything, req) + + return time.Since(start), err + } + + // Call and measure the duration for the first invocation. + duration, err := callAndMeasurePingDuration() + assert.Equal(t, codes.DeadlineExceeded, status.Code(err)) + assert.LessOrEqual(t, requestTimeout, duration) + + // Call and measure the duration for the second invocation (circuit breaker state is now "Open"). + duration, err = callAndMeasurePingDuration() + assert.Equal(t, gobreaker.ErrOpenState, err) + assert.Greater(t, requestTimeout, duration) + + // Reset the mock Ping for the next invocation to return a response without delay. + cn.handler.On("Ping", testifymock.Anything, req).Unset() + cn.handler.On("Ping", testifymock.Anything, req).Return(resp, nil) + + // Wait until the circuit breaker transitions to the "HalfOpen" state. + time.Sleep(circuitBreakerRestoreTimeout + (500 * time.Millisecond)) + + // Call and measure the duration for the third invocation (circuit breaker state is now "HalfOpen"). 
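+ // A successful probe in the "HalfOpen" state closes the circuit again, so this call should complete quickly and without error.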
+ duration, err = callAndMeasurePingDuration() + assert.Greater(t, requestTimeout, duration) + assert.NoError(t, err) +} + // node mocks a flow node that runs a GRPC server type node struct { server *grpc.Server diff --git a/engine/access/rpc/connection/manager.go b/engine/access/rpc/connection/manager.go index 8fac0108139..018b08743c3 100644 --- a/engine/access/rpc/connection/manager.go +++ b/engine/access/rpc/connection/manager.go @@ -6,6 +6,8 @@ import ( "io" "time" + "github.com/sony/gobreaker" + "github.com/rs/zerolog" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -20,6 +22,14 @@ import ( // DefaultClientTimeout is used when making a GRPC request to a collection node or an execution node. const DefaultClientTimeout = 3 * time.Second +// clientType is an enumeration type used to differentiate between different types of gRPC clients. +type clientType int + +const ( + AccessClient clientType = iota + ExecutionClient +) + type noopCloser struct{} func (c *noopCloser) Close() error { @@ -28,10 +38,25 @@ func (c *noopCloser) Close() error { // Manager provides methods for getting and managing gRPC client connections. type Manager struct { - cache *Cache - logger zerolog.Logger - metrics module.AccessMetrics - maxMsgSize uint + cache *Cache + logger zerolog.Logger + metrics module.AccessMetrics + maxMsgSize uint + circuitBreakerConfig CircuitBreakerConfig +} + +// CircuitBreakerConfig is a configuration struct for the circuit breaker. +type CircuitBreakerConfig struct { + // Enabled specifies whether the circuit breaker is enabled for collection and execution API clients. + Enabled bool + // RestoreTimeout specifies the duration after which the circuit breaker will restore the connection to the client + // after closing it due to failures. + RestoreTimeout time.Duration + // MaxFailures specifies the number of consecutive failed calls to the client after which the circuit breaker + // opens and stops forwarding requests. + MaxFailures uint32 + // MaxRequests specifies the maximum number of requests allowed to pass through while the circuit breaker is in + // the half-open state, used to probe whether the connection has been restored. + MaxRequests uint32 } // NewManager creates a new Manager with the specified parameters. @@ -40,28 +65,30 @@ func NewManager( logger zerolog.Logger, metrics module.AccessMetrics, maxMsgSize uint, + circuitBreakerConfig CircuitBreakerConfig, ) Manager { return Manager{ - cache: cache, - logger: logger, - metrics: metrics, - maxMsgSize: maxMsgSize, + cache: cache, + logger: logger, + metrics: metrics, + maxMsgSize: maxMsgSize, + circuitBreakerConfig: circuitBreakerConfig, } } // GetConnection returns a gRPC client connection for the given grpcAddress and timeout. // If a cache is used, it retrieves a cached connection, otherwise creates a new connection. // It returns the client connection and an io.Closer to close the connection when done. 
-func (m *Manager) GetConnection(grpcAddress string, timeout time.Duration) (*grpc.ClientConn, io.Closer, error) { +func (m *Manager) GetConnection(grpcAddress string, timeout time.Duration, clientType clientType) (*grpc.ClientConn, io.Closer, error) { if m.cache != nil { - conn, err := m.retrieveConnection(grpcAddress, timeout) + conn, err := m.retrieveConnection(grpcAddress, timeout, clientType) if err != nil { return nil, nil, err } return conn, &noopCloser{}, err } - conn, err := m.createConnection(grpcAddress, timeout, nil) + conn, err := m.createConnection(grpcAddress, timeout, nil, clientType) if err != nil { return nil, nil, err } @@ -102,7 +129,7 @@ func (m *Manager) HasCache() bool { // retrieveConnection retrieves the CachedClient for the given grpcAddress from the cache or adds a new one if not present. // If the connection is already cached, it waits for the lock and returns the connection from the cache. // Otherwise, it creates a new connection and caches it. -func (m *Manager) retrieveConnection(grpcAddress string, timeout time.Duration) (*grpc.ClientConn, error) { +func (m *Manager) retrieveConnection(grpcAddress string, timeout time.Duration, clientType clientType) (*grpc.ClientConn, error) { client, ok := m.cache.GetOrAdd(grpcAddress, timeout) if ok { // The client was retrieved from the cache, wait for the lock @@ -124,7 +151,7 @@ func (m *Manager) retrieveConnection(grpcAddress string, timeout time.Duration) } // The connection is not cached or is closed, create a new connection and cache it - conn, err := m.createConnection(grpcAddress, timeout, client) + conn, err := m.createConnection(grpcAddress, timeout, client, clientType) if err != nil { return nil, err } @@ -141,7 +168,7 @@ func (m *Manager) retrieveConnection(grpcAddress string, timeout time.Duration) // createConnection creates a new gRPC connection to the remote node at the given address with the specified timeout. // If the cachedClient is not nil, it means a new entry in the cache is being created, so it's locked to give priority // to the caller working with the new client, allowing it to create the underlying connection. -func (m *Manager) createConnection(address string, timeout time.Duration, cachedClient *CachedClient) (*grpc.ClientConn, error) { +func (m *Manager) createConnection(address string, timeout time.Duration, cachedClient *CachedClient, clientType clientType) (*grpc.ClientConn, error) { if timeout == 0 { timeout = DefaultClientTimeout } @@ -151,16 +178,27 @@ func (m *Manager) createConnection(address string, timeout time.Duration, cached Timeout: timeout, // How long the client will wait for a response from the keepalive before closing. } + // The order in which interceptors are added to the `connInterceptors` slice is important since they will be called + // in the opposite order during gRPC requests. See documentation for more info: + // https://grpc.io/blog/grpc-web-interceptor/#binding-interceptors var connInterceptors []grpc.UnaryClientInterceptor - // The order in which interceptors are added to the connInterceptors slice is important as they will be called in - // the same order during gRPC requests. It is crucial to ensure that the request watcher interceptor is added first. 
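+ // With the circuit breaker disabled, unhealthy upstream nodes are handled by evicting their cached clients instead.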
+ if !m.circuitBreakerConfig.Enabled { + connInterceptors = append(connInterceptors, m.createClientInvalidationInterceptor(address, clientType)) + } + + connInterceptors = append(connInterceptors, createClientTimeoutInterceptor(timeout)) + // This interceptor monitors ongoing requests before passing control to subsequent interceptors. if cachedClient != nil { connInterceptors = append(connInterceptors, createRequestWatcherInterceptor(cachedClient)) } - connInterceptors = append(connInterceptors, createClientTimeoutInterceptor(timeout)) + if m.circuitBreakerConfig.Enabled { + // If the circuit breaker interceptor is enabled, it should always be called first before passing control to + // subsequent interceptors. + connInterceptors = append(connInterceptors, m.createCircuitBreakerInterceptor()) + } // ClientConn's default KeepAlive on connections is indefinite, assuming the timeout isn't reached // The connections should be safe to be persisted and reused. @@ -179,11 +217,6 @@ func (m *Manager) createConnection(address string, timeout time.Duration, cached return conn, nil } -// WithClientTimeoutOption is a helper function to create a GRPC dial option with the specified client timeout interceptor. -func WithClientTimeoutOption(timeout time.Duration) grpc.DialOption { - return grpc.WithUnaryInterceptor(createClientTimeoutInterceptor(timeout)) -} - // createRequestWatcherInterceptor creates a request watcher interceptor to wait for unfinished requests before closing. func createRequestWatcherInterceptor(cachedClient *CachedClient) grpc.UnaryClientInterceptor { requestWatcherInterceptor := func( @@ -211,6 +244,12 @@ func createRequestWatcherInterceptor(cachedClient *CachedClient) grpc.UnaryClien return requestWatcherInterceptor } +// WithClientTimeoutOption is a helper function to create a GRPC dial option +// with the specified client timeout interceptor. +func WithClientTimeoutOption(timeout time.Duration) grpc.DialOption { + return grpc.WithUnaryInterceptor(createClientTimeoutInterceptor(timeout)) +} + // createClientTimeoutInterceptor creates a client interceptor with a context that expires after the timeout. func createClientTimeoutInterceptor(timeout time.Duration) grpc.UnaryClientInterceptor { clientTimeoutInterceptor := func( @@ -234,3 +273,115 @@ func createClientTimeoutInterceptor(timeout time.Duration) grpc.UnaryClientInter return clientTimeoutInterceptor } + +// createClientInvalidationInterceptor creates a client interceptor for client invalidation. It should only be created +// if the circuit breaker is disabled. If the response from the server indicates an unavailable status, it invalidates +// the corresponding client. +func (m *Manager) createClientInvalidationInterceptor( + address string, + clientType clientType, +) grpc.UnaryClientInterceptor { + if !m.circuitBreakerConfig.Enabled { + clientInvalidationInterceptor := func( + ctx context.Context, + method string, + req interface{}, + reply interface{}, + cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, + opts ...grpc.CallOption, + ) error { + err := invoker(ctx, method, req, reply, cc, opts...) 
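+ // Only codes.Unavailable signals that the upstream node itself is unreachable; other error codes leave the cached client in place.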
+ if status.Code(err) == codes.Unavailable { + switch clientType { + case AccessClient: + if m.Remove(address) { + m.logger.Debug().Str("cached_access_client_invalidated", address).Msg("invalidating cached access client") + if m.metrics != nil { + m.metrics.ConnectionFromPoolInvalidated() + } + } + case ExecutionClient: + if m.Remove(address) { + m.logger.Debug().Str("cached_execution_client_invalidated", address).Msg("invalidating cached execution client") + if m.metrics != nil { + m.metrics.ConnectionFromPoolInvalidated() + } + } + default: + m.logger.Info().Str("client_invalidation_interceptor", address).Msg(fmt.Sprintf("unexpected client type: %d", clientType)) + } + } + + return err + } + + return clientInvalidationInterceptor + } + + return nil +} + +// A simplified representation and description of the circuit breaker pattern used to handle node connectivity: + +// Circuit Open --> Circuit Half-Open --> Circuit Closed +// ^ | +// | | +// +--------------------------------------+ +// +// The "Circuit Open" state represents the circuit being open, indicating that the node is not available. +// This state is entered when the number of consecutive failures reaches the maximum allowed failures. +// +// The "Circuit Half-Open" state represents the circuit transitioning from the open state to the half-open +// state after the configured restore timeout. In this state, the circuit allows a limited number of requests +// to test whether the node has recovered. +// +// The "Circuit Closed" state represents the circuit being closed, indicating that the node is available. +// This is the initial state; it is also re-entered when the test requests in the half-open state succeed. + +// createCircuitBreakerInterceptor creates a client interceptor for circuit breaker functionality. It should only be +// created if the circuit breaker is enabled. All invocations will go through the circuit breaker to be tracked for +// success or failure of the call. +func (m *Manager) createCircuitBreakerInterceptor() grpc.UnaryClientInterceptor { + if m.circuitBreakerConfig.Enabled { + circuitBreaker := gobreaker.NewCircuitBreaker(gobreaker.Settings{ + // Timeout defines how long the circuit breaker will remain open before transitioning to the HalfOpen state. + Timeout: m.circuitBreakerConfig.RestoreTimeout, + // ReadyToTrip returns true when the circuit breaker should trip and transition to the Open state. + ReadyToTrip: func(counts gobreaker.Counts) bool { + // Trip to the Open state once the number of consecutive failures reaches the configured maximum. + return counts.ConsecutiveFailures >= m.circuitBreakerConfig.MaxFailures + }, + // MaxRequests defines the max number of concurrent requests while the circuit breaker is in the HalfOpen + // state. + MaxRequests: m.circuitBreakerConfig.MaxRequests, + }) + + circuitBreakerInterceptor := func( + ctx context.Context, + method string, + req interface{}, + reply interface{}, + cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, + opts ...grpc.CallOption, + ) error { + // The circuit breaker integration occurs here, where all invoked calls to the node pass through the + // CircuitBreaker.Execute method. This method counts successful and failed invocations, and switches to + // "StateOpen" when the maximum failure threshold is reached. While the circuit breaker is in "StateOpen", + // it immediately rejects calls and returns without waiting for the call timeout. 
After the + // "RestoreTimeout" period elapses, the circuit breaker transitions to the "StateHalfOpen" and attempts the + // invocation again. If the invocation fails, it returns to the "StateOpen"; otherwise, it transitions to + // the "StateClosed" and handles invocations as usual. + _, err := circuitBreaker.Execute(func() (interface{}, error) { + err := invoker(ctx, method, req, reply, cc, opts...) + return nil, err + }) + return err + } + + return circuitBreakerInterceptor + } + + return nil +} diff --git a/engine/access/rpc/rate_limit_test.go b/engine/access/rpc/rate_limit_test.go index 2d210fc358a..8ff5695c3c6 100644 --- a/engine/access/rpc/rate_limit_test.go +++ b/engine/access/rpc/rate_limit_test.go @@ -167,7 +167,8 @@ func (suite *RateLimitTestSuite) SetupTest() { nil, suite.log, 0, - nil) + nil, + false) rpcEngBuilder, err := NewBuilder( suite.log, diff --git a/engine/access/secure_grpcr_test.go b/engine/access/secure_grpcr_test.go index ca403ef6391..783ed0d3110 100644 --- a/engine/access/secure_grpcr_test.go +++ b/engine/access/secure_grpcr_test.go @@ -150,7 +150,8 @@ func (suite *SecureGRPCTestSuite) SetupTest() { nil, suite.log, 0, - nil) + nil, + false) rpcEngBuilder, err := rpc.NewBuilder( suite.log, diff --git a/go.mod b/go.mod index ed9abda52b2..d5890cfa995 100644 --- a/go.mod +++ b/go.mod @@ -104,6 +104,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 github.com/onflow/wal v0.0.0-20230529184820-bc9f8244608d github.com/slok/go-http-metrics v0.10.0 + github.com/sony/gobreaker v0.5.0 ) require ( diff --git a/go.sum b/go.sum index 97ca856b74d..73eadfddbf8 100644 --- a/go.sum +++ b/go.sum @@ -1457,6 +1457,8 @@ github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3 github.com/smola/gocompat v0.2.0/go.mod h1:1B0MlxbmoZNo3h8guHp8HztB3BSYR5itql9qtVc0ypY= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= +github.com/sony/gobreaker v0.5.0 h1:dRCvqm0P490vZPmy7ppEk2qCnCieBooFJ+YoXGYB+yg= +github.com/sony/gobreaker v0.5.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:UdhH50NIW0fCiwBSr0co2m7BnFLdv4fQTgdqdJTHFeE= github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA= github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a/go.mod h1:7AyxJNCJ7SBZ1MfVQCWD6Uqo2oubI2Eq2y2eqf+A5r0= diff --git a/insecure/go.mod b/insecure/go.mod index 9a64f440592..b727c8ecf6c 100644 --- a/insecure/go.mod +++ b/insecure/go.mod @@ -217,6 +217,7 @@ require ( github.com/sethvargo/go-retry v0.2.3 // indirect github.com/shirou/gopsutil/v3 v3.22.2 // indirect github.com/slok/go-http-metrics v0.10.0 // indirect + github.com/sony/gobreaker v0.5.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/afero v1.9.3 // indirect github.com/spf13/cast v1.5.0 // indirect diff --git a/insecure/go.sum b/insecure/go.sum index 25f044da73e..308ac3ab9f3 100644 --- a/insecure/go.sum +++ b/insecure/go.sum @@ -1429,6 +1429,8 @@ github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3 github.com/smola/gocompat v0.2.0/go.mod h1:1B0MlxbmoZNo3h8guHp8HztB3BSYR5itql9qtVc0ypY= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= 
+github.com/sony/gobreaker v0.5.0 h1:dRCvqm0P490vZPmy7ppEk2qCnCieBooFJ+YoXGYB+yg= +github.com/sony/gobreaker v0.5.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:UdhH50NIW0fCiwBSr0co2m7BnFLdv4fQTgdqdJTHFeE= github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA= github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a/go.mod h1:7AyxJNCJ7SBZ1MfVQCWD6Uqo2oubI2Eq2y2eqf+A5r0= diff --git a/integration/testnet/client.go b/integration/testnet/client.go index ab2eb0b751e..51026702085 100644 --- a/integration/testnet/client.go +++ b/integration/testnet/client.go @@ -88,7 +88,7 @@ func NewClient(addr string, chain flow.Chain) (*Client, error) { //if err != nil { // return nil, fmt.Errorf("cannot marshal key json: %w", err) //} - + // //fmt.Printf("New client with private key: \n%s\n", json) //fmt.Printf("and public key: \n%s\n", publicJson) diff --git a/integration/tests/access/access_circuit_breaker_test.go b/integration/tests/access/access_circuit_breaker_test.go new file mode 100644 index 00000000000..569119c8469 --- /dev/null +++ b/integration/tests/access/access_circuit_breaker_test.go @@ -0,0 +1,184 @@ +package access + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + sdk "github.com/onflow/flow-go-sdk" + sdkcrypto "github.com/onflow/flow-go-sdk/crypto" + "github.com/onflow/flow-go-sdk/templates" + "github.com/onflow/flow-go/integration/testnet" + "github.com/onflow/flow-go/integration/tests/lib" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/utils/unittest" +) + +func TestAccessCircuitBreaker(t *testing.T) { + suite.Run(t, new(AccessCircuitBreakerSuite)) +} + +type AccessCircuitBreakerSuite struct { + suite.Suite + + log zerolog.Logger + + // root context for the current test + ctx context.Context + cancel context.CancelFunc + + net *testnet.FlowNetwork +} + +var requestTimeout = 1500 * time.Millisecond +var cbRestoreTimeout = 6 * time.Second + +func (s *AccessCircuitBreakerSuite) TearDownTest() { + s.log.Info().Msg("================> Start TearDownTest") + s.net.Remove() + s.cancel() + s.log.Info().Msg("================> Finish TearDownTest") +} + +func (s *AccessCircuitBreakerSuite) SetupTest() { + s.log = unittest.LoggerForTest(s.Suite.T(), zerolog.InfoLevel) + s.log.Info().Msg("================> SetupTest") + defer func() { + s.log.Info().Msg("================> Finish SetupTest") + }() + + // need one access node with enabled circuit breaker + nodeConfigs := []testnet.NodeConfig{ + testnet.NewNodeConfig( + flow.RoleAccess, + testnet.WithLogLevel(zerolog.InfoLevel), + testnet.WithAdditionalFlag("--circuit-breaker-enabled=true"), + testnet.WithAdditionalFlag(fmt.Sprintf("--circuit-breaker-restore-timeout=%s", cbRestoreTimeout.String())), + testnet.WithAdditionalFlag("--circuit-breaker-max-requests=1"), + testnet.WithAdditionalFlag("--circuit-breaker-max-failures=1"), + testnet.WithAdditionalFlag(fmt.Sprintf("--collection-client-timeout=%s", requestTimeout.String())), + ), + } + // need one execution node + exeConfig := testnet.NewNodeConfig(flow.RoleExecution, testnet.WithLogLevel(zerolog.FatalLevel)) + nodeConfigs = append(nodeConfigs, exeConfig) + + // need one dummy verification node 
(unused ghost) + verConfig := testnet.NewNodeConfig(flow.RoleVerification, testnet.WithLogLevel(zerolog.FatalLevel), testnet.AsGhost()) + nodeConfigs = append(nodeConfigs, verConfig) + + // need one controllable collection node + collConfig := testnet.NewNodeConfig(flow.RoleCollection, testnet.WithLogLevel(zerolog.FatalLevel), testnet.WithAdditionalFlag("--hotstuff-proposal-duration=100ms")) + nodeConfigs = append(nodeConfigs, collConfig) + + // need three consensus nodes (unused ghosts) + for n := 0; n < 3; n++ { + conID := unittest.IdentifierFixture() + nodeConfig := testnet.NewNodeConfig(flow.RoleConsensus, + testnet.WithLogLevel(zerolog.FatalLevel), + testnet.WithID(conID), + testnet.AsGhost()) + nodeConfigs = append(nodeConfigs, nodeConfig) + } + + conf := testnet.NewNetworkConfig("access_api_test", nodeConfigs) + s.net = testnet.PrepareFlowNetwork(s.T(), conf, flow.Localnet) + + // start the network + s.T().Logf("starting flow network with docker containers") + s.ctx, s.cancel = context.WithCancel(context.Background()) + + s.net.Start(s.ctx) +} + +// TestCircuitBreaker tests the behavior of the circuit breaker. It verifies the circuit breaker's ability to open, +// prevent further requests, and restore after a timeout. It is done in a few steps: +// 1. Disconnect the collection node from the network, which activates the circuit breaker. +// 2. Try to send a transaction multiple times to observe the decrease in waiting time for a failed response. +// 3. Reconnect the collection node to the network and wait for the circuit breaker restore timeout. +// 4. Successfully send a transaction. +func (s *AccessCircuitBreakerSuite) TestCircuitBreaker() { + // 1. Get the collection node + collectionContainer := s.net.ContainerByName("collection_1") + + // 2. Get the Access Node container and client + accessContainer := s.net.ContainerByName(testnet.PrimaryAN) + + // Check that the access node was started with the circuit breaker flags + require.True(s.T(), accessContainer.IsFlagSet("circuit-breaker-enabled")) + require.True(s.T(), accessContainer.IsFlagSet("circuit-breaker-restore-timeout")) + require.True(s.T(), accessContainer.IsFlagSet("circuit-breaker-max-requests")) + require.True(s.T(), accessContainer.IsFlagSet("circuit-breaker-max-failures")) + + accessClient, err := accessContainer.TestnetClient() + require.NoError(s.T(), err, "failed to get access node client") + require.NotNil(s.T(), accessClient, "failed to get access node client") + + latestBlockID, err := accessClient.GetLatestBlockID(s.ctx) + require.NoError(s.T(), err) + + // Create a new account to deploy Counter to + accountPrivateKey := lib.RandomPrivateKey() + + accountKey := sdk.NewAccountKey(). + FromPrivateKey(accountPrivateKey). + SetHashAlgo(sdkcrypto.SHA3_256). + SetWeight(sdk.AccountKeyWeightThreshold) + + serviceAddress := sdk.Address(accessClient.Chain.ServiceAddress()) + + // Generate the account creation transaction + createAccountTx, err := templates.CreateAccount( + []*sdk.AccountKey{accountKey}, + []templates.Contract{ + { + Name: lib.CounterContract.Name, + Source: lib.CounterContract.ToCadence(), + }, + }, serviceAddress) + require.NoError(s.T(), err) + + createAccountTx. + SetReferenceBlockID(sdk.Identifier(latestBlockID)). + SetProposalKey(serviceAddress, 0, accessClient.GetSeqNumber()). + SetPayer(serviceAddress). + SetGasLimit(9999) + + // Sign the transaction + signedTx, err := accessClient.SignTransaction(createAccountTx) + require.NoError(s.T(), err) + 
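+ // The same signed transaction is reused for every attempt below, so each retry exercises an identical request in a different circuit breaker state.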
+ // 3. Disconnect the collection node from the network to activate the circuit breaker + err = collectionContainer.Disconnect() + require.NoError(s.T(), err, "failed to disconnect collection node") + + // 4. Send a couple of transactions to test if the circuit breaker opens correctly + // Try to send the transaction for the first time. It should wait at least the request timeout and return an Unavailable error + err = accessClient.SendTransaction(s.ctx, signedTx) + assert.Equal(s.T(), codes.Unavailable, status.Code(err)) + + // Try to send the transaction for the second time. It should wait less than a second because the circuit breaker + // is configured to open after the first failure + err = accessClient.SendTransaction(s.ctx, signedTx) + // Expect codes.Unknown here, as this is the code the circuit breaker returns while its state is Open. + assert.Equal(s.T(), codes.Unknown, status.Code(err)) + + // Reconnect the collection node + err = collectionContainer.Connect() + require.NoError(s.T(), err, "failed to reconnect collection node") + + // Wait for the circuit breaker to restore + time.Sleep(cbRestoreTimeout) + + // Try to send the transaction for the third time. The transaction should be sent successfully + err = accessClient.SendTransaction(s.ctx, signedTx) + require.NoError(s.T(), err, "transaction should be sent") +}
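To make the state machine documented in engine/access/rpc/connection/manager.go concrete, here is a minimal, self-contained sketch of the same gobreaker configuration. It is illustrative only and not part of the diff above: the breaker name "example" and the scaled-down thresholds are stand-ins for the PR's MaxFailures, MaxRequests, and RestoreTimeout settings.

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/sony/gobreaker"
)

func main() {
	cb := gobreaker.NewCircuitBreaker(gobreaker.Settings{
		Name:        "example",
		MaxRequests: 1,                      // probes allowed while half-open (cf. MaxRequests)
		Timeout:     200 * time.Millisecond, // open -> half-open delay (cf. RestoreTimeout)
		ReadyToTrip: func(c gobreaker.Counts) bool {
			return c.ConsecutiveFailures >= 2 // cf. MaxFailures
		},
		OnStateChange: func(name string, from, to gobreaker.State) {
			fmt.Printf("%s: %s -> %s\n", name, from, to)
		},
	})

	fail := func() (interface{}, error) { return nil, errors.New("node unavailable") }
	succeed := func() (interface{}, error) { return "pong", nil }

	// Two consecutive failures trip the breaker: StateClosed -> StateOpen.
	_, _ = cb.Execute(fail)
	_, _ = cb.Execute(fail)

	// While open, calls are rejected immediately with ErrOpenState,
	// without waiting for any upstream timeout.
	if _, err := cb.Execute(succeed); errors.Is(err, gobreaker.ErrOpenState) {
		fmt.Println("rejected while open:", err)
	}

	// After Timeout elapses, the breaker allows MaxRequests probe calls
	// (StateHalfOpen); a successful probe closes the circuit again.
	time.Sleep(250 * time.Millisecond)
	if _, err := cb.Execute(succeed); err == nil {
		fmt.Println("probe succeeded, state:", cb.State()) // StateClosed
	}
}

Run directly, this prints the closed -> open transition, the immediate gobreaker.ErrOpenState rejection while open, and the half-open probe that closes the circuit again; this is the same sequence the integration test above observes through gRPC status codes (codes.Unavailable, then codes.Unknown, then success).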