diff --git a/admin/commands/execution/stop_at_height_test.go b/admin/commands/execution/stop_at_height_test.go index f4b8c1d1832..c2ebe4cc93d 100644 --- a/admin/commands/execution/stop_at_height_test.go +++ b/admin/commands/execution/stop_at_height_test.go @@ -3,11 +3,13 @@ package execution import ( "context" "testing" + "time" "github.com/rs/zerolog" "github.com/stretchr/testify/require" "github.com/onflow/flow-go/admin" + "github.com/onflow/flow-go/engine" "github.com/onflow/flow-go/engine/execution/ingestion/stop" "github.com/onflow/flow-go/model/flow" ) @@ -90,6 +92,8 @@ func TestCommandParsing(t *testing.T) { func TestCommandsSetsValues(t *testing.T) { stopControl := stop.NewStopControl( + engine.NewUnit(), + time.Second, zerolog.Nop(), nil, nil, diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index 52092b80541..63463fd2d45 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -38,6 +38,7 @@ import ( "github.com/onflow/flow-go/consensus/hotstuff/validator" "github.com/onflow/flow-go/consensus/hotstuff/verification" recovery "github.com/onflow/flow-go/consensus/recovery/protocol" + "github.com/onflow/flow-go/engine" followereng "github.com/onflow/flow-go/engine/common/follower" "github.com/onflow/flow-go/engine/common/provider" "github.com/onflow/flow-go/engine/common/requester" @@ -111,6 +112,8 @@ type ExecutionNode struct { builder *FlowNodeBuilder // This is needed for accessing the ShutdownFunc exeConf *ExecutionConfig + ingestionUnit *engine.Unit + collector module.ExecutionMetrics executionState state.ExecutionState followerState protocol.FollowerState @@ -152,6 +155,7 @@ func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() { builder: builder.FlowNodeBuilder, exeConf: builder.exeConf, toTriggerCheckpoint: atomic.NewBool(false), + ingestionUnit: engine.NewUnit(), } builder.FlowNodeBuilder. @@ -679,6 +683,8 @@ func (exeNode *ExecutionNode) LoadStopControl( } stopControl := stop.NewStopControl( + exeNode.ingestionUnit, + exeNode.exeConf.maxGracefulStopDuration, exeNode.builder.Logger, exeNode.executionState, node.Storage.Headers, @@ -820,6 +826,7 @@ func (exeNode *ExecutionNode) LoadIngestionEngine( } exeNode.ingestionEng, err = ingestion.New( + exeNode.ingestionUnit, node.Logger, node.Network, node.Me, diff --git a/cmd/execution_config.go b/cmd/execution_config.go index 83e697dff76..996d06c9913 100644 --- a/cmd/execution_config.go +++ b/cmd/execution_config.go @@ -17,6 +17,7 @@ import ( "github.com/onflow/flow-go/utils/grpcutils" "github.com/onflow/flow-go/engine/execution/computation" + "github.com/onflow/flow-go/engine/execution/ingestion/stop" "github.com/onflow/flow-go/engine/execution/rpc" "github.com/onflow/flow-go/fvm/storage/derived" storage "github.com/onflow/flow-go/storage/badger" @@ -49,6 +50,7 @@ type ExecutionConfig struct { blobstoreRateLimit int blobstoreBurstLimit int chunkDataPackRequestWorkers uint + maxGracefulStopDuration time.Duration computationConfig computation.ComputationConfig receiptRequestWorkers uint // common provider engine workers @@ -98,6 +100,7 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) { flags.StringToIntVar(&exeConf.apiBurstlimits, "api-burst-limits", map[string]int{}, "burst limits for gRPC API methods e.g. Ping=100,ExecuteScriptAtBlockID=100 etc. note limits apply globally to all clients.") flags.IntVar(&exeConf.blobstoreRateLimit, "blobstore-rate-limit", 0, "per second outgoing rate limit for Execution Data blobstore") flags.IntVar(&exeConf.blobstoreBurstLimit, "blobstore-burst-limit", 0, "outgoing burst limit for Execution Data blobstore") + flags.DurationVar(&exeConf.maxGracefulStopDuration, "max-graceful-stop-duration", stop.DefaultMaxGracefulStopDuration, "the maximum amount of time stop control will wait for ingestion engine to gracefully shutdown before crashing") } func (exeConf *ExecutionConfig) ValidateFlags() error { diff --git a/engine/execution/ingestion/engine.go b/engine/execution/ingestion/engine.go index ac0d714a570..1ccbb73dd93 100644 --- a/engine/execution/ingestion/engine.go +++ b/engine/execution/ingestion/engine.go @@ -63,6 +63,7 @@ type Engine struct { } func New( + unit *engine.Unit, logger zerolog.Logger, net network.Network, me module.Local, @@ -90,7 +91,7 @@ func New( mempool := newMempool() eng := Engine{ - unit: engine.NewUnit(), + unit: unit, log: log, me: me, request: request, diff --git a/engine/execution/ingestion/engine_test.go b/engine/execution/ingestion/engine_test.go index 3baba30b0c5..7ec37a1eaed 100644 --- a/engine/execution/ingestion/engine_test.go +++ b/engine/execution/ingestion/engine_test.go @@ -18,6 +18,7 @@ import ( "github.com/onflow/flow-go/crypto" + enginePkg "github.com/onflow/flow-go/engine" "github.com/onflow/flow-go/engine/execution" computation "github.com/onflow/flow-go/engine/execution/computation/mock" "github.com/onflow/flow-go/engine/execution/ingestion/stop" @@ -202,7 +203,10 @@ func runWithEngine(t *testing.T, f func(testingContext)) { return stateProtocol.IsNodeAuthorizedAt(protocolState.AtBlockID(blockID), myIdentity.NodeID) } + unit := enginePkg.NewUnit() stopControl := stop.NewStopControl( + unit, + time.Second, zerolog.Nop(), executionState, headers, @@ -216,6 +220,7 @@ func runWithEngine(t *testing.T, f func(testingContext)) { uploadMgr := uploader.NewManager(trace.NewNoopTracer()) engine, err = New( + unit, log, net, me, @@ -1492,7 +1497,9 @@ func newIngestionEngine(t *testing.T, ps *mocks.ProtocolState, es *mockExecution return stateProtocol.IsNodeAuthorizedAt(ps.AtBlockID(blockID), myIdentity.NodeID) } + unit := enginePkg.NewUnit() engine, err = New( + unit, log, net, me, @@ -1514,6 +1521,8 @@ func newIngestionEngine(t *testing.T, ps *mocks.ProtocolState, es *mockExecution nil, nil, stop.NewStopControl( + unit, + time.Second, zerolog.Nop(), nil, headers, diff --git a/engine/execution/ingestion/stop/stop_control.go b/engine/execution/ingestion/stop/stop_control.go index 4e8794aac20..10e2fc35120 100644 --- a/engine/execution/ingestion/stop/stop_control.go +++ b/engine/execution/ingestion/stop/stop_control.go @@ -6,10 +6,12 @@ import ( "math" "strings" "sync" + "time" "github.com/coreos/go-semver/semver" "github.com/rs/zerolog" + "github.com/onflow/flow-go/engine" "github.com/onflow/flow-go/engine/execution/state" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/component" @@ -19,6 +21,11 @@ import ( "github.com/onflow/flow-go/storage" ) +const ( + // TODO: figure out an appropriate graceful stop time (is 10 min. enough?) + DefaultMaxGracefulStopDuration = 10 * time.Minute +) + // StopControl is a specialized component used by ingestion.Engine to encapsulate // control of stopping blocks execution. // It is intended to work tightly with the Engine, not as a general mechanism or interface. @@ -39,6 +46,9 @@ import ( // This means version boundaries were edited. The resulting stop // height is the new one. type StopControl struct { + unit *engine.Unit + maxGracefulStopDuration time.Duration + // Stop control needs to consume BlockFinalized events. // adding psEvents.Noop makes it a protocol.Consumer psEvents.Noop @@ -147,6 +157,8 @@ type StopControlHeaders interface { // See build.SemverV2 for more details. That is why nil is a valid input for node version // without a node version, the stop control can still be used for manual stopping. func NewStopControl( + unit *engine.Unit, + maxGracefulStopDuration time.Duration, log zerolog.Logger, exeState state.ReadOnlyExecutionState, headers StopControlHeaders, @@ -161,6 +173,8 @@ func NewStopControl( blockFinalizedChan := make(chan *flow.Header, 1000) sc := &StopControl{ + unit: unit, + maxGracefulStopDuration: maxGracefulStopDuration, log: log.With(). Str("component", "stop_control"). Logger(), @@ -539,7 +553,17 @@ func (s *StopControl) stopExecution() { log.Warn().Msg("Stopping as finalization reached requested stop") if s.stopBoundary.ShouldCrash { - // TODO: crash more gracefully or at least in a more explicit way + log.Info(). + Dur("max-graceful-stop-duration", s.maxGracefulStopDuration). + Msg("Attempting graceful stop as finalization reached requested stop") + doneChan := s.unit.Done() + select { + case <-doneChan: + log.Info().Msg("Engine gracefully stopped") + case <-time.After(s.maxGracefulStopDuration): + log.Info(). + Msg("Engine did not stop within max graceful stop duration") + } log.Fatal().Msg("Crashing as finalization reached requested stop") return } diff --git a/engine/execution/ingestion/stop/stop_control_test.go b/engine/execution/ingestion/stop/stop_control_test.go index eaa3ff39d84..b4fb326c061 100644 --- a/engine/execution/ingestion/stop/stop_control_test.go +++ b/engine/execution/ingestion/stop/stop_control_test.go @@ -10,6 +10,7 @@ import ( testifyMock "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/onflow/flow-go/engine" "github.com/onflow/flow-go/engine/execution/state/mock" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/irrecoverable" @@ -24,6 +25,8 @@ func TestCannotSetNewValuesAfterStoppingCommenced(t *testing.T) { t.Run("when processing block at stop height", func(t *testing.T) { sc := NewStopControl( + engine.NewUnit(), + time.Second, unittest.Logger(), nil, nil, @@ -68,6 +71,8 @@ func TestCannotSetNewValuesAfterStoppingCommenced(t *testing.T) { execState := mock.NewExecutionState(t) sc := NewStopControl( + engine.NewUnit(), + time.Second, unittest.Logger(), execState, nil, @@ -122,6 +127,8 @@ func TestExecutionFallingBehind(t *testing.T) { headerD := unittest.BlockHeaderWithParentFixture(headerC) // 23 sc := NewStopControl( + engine.NewUnit(), + time.Second, unittest.Logger(), execState, nil, @@ -184,6 +191,8 @@ func TestAddStopForPastBlocks(t *testing.T) { } sc := NewStopControl( + engine.NewUnit(), + time.Second, unittest.Logger(), execState, headers, @@ -240,6 +249,8 @@ func TestAddStopForPastBlocksExecutionFallingBehind(t *testing.T) { } sc := NewStopControl( + engine.NewUnit(), + time.Second, unittest.Logger(), execState, headers, @@ -293,6 +304,8 @@ func TestStopControlWithVersionControl(t *testing.T) { } sc := NewStopControl( + engine.NewUnit(), + time.Second, unittest.Logger(), execState, headers, @@ -395,6 +408,8 @@ func TestStopControlWithVersionControl(t *testing.T) { } sc := NewStopControl( + engine.NewUnit(), + time.Second, unittest.Logger(), execState, headers, @@ -471,6 +486,8 @@ func TestStopControlWithVersionControl(t *testing.T) { } sc := NewStopControl( + engine.NewUnit(), + time.Second, unittest.Logger(), execState, headers, @@ -547,6 +564,8 @@ func TestStopControlWithVersionControl(t *testing.T) { } sc := NewStopControl( + engine.NewUnit(), + time.Second, unittest.Logger(), execState, headers, @@ -599,6 +618,8 @@ func TestStopControlWithVersionControl(t *testing.T) { func TestStartingStopped(t *testing.T) { sc := NewStopControl( + engine.NewUnit(), + time.Second, unittest.Logger(), nil, nil, @@ -618,6 +639,8 @@ func TestStoppedStateRejectsAllBlocksAndChanged(t *testing.T) { execState := mock.NewExecutionState(t) sc := NewStopControl( + engine.NewUnit(), + time.Second, unittest.Logger(), execState, nil, @@ -646,6 +669,8 @@ func Test_StopControlWorkers(t *testing.T) { t.Run("start and stop, stopped = true", func(t *testing.T) { sc := NewStopControl( + engine.NewUnit(), + time.Second, unittest.Logger(), nil, nil, @@ -671,6 +696,8 @@ func Test_StopControlWorkers(t *testing.T) { t.Run("start and stop, stopped = false", func(t *testing.T) { sc := NewStopControl( + engine.NewUnit(), + time.Second, unittest.Logger(), nil, nil, @@ -732,6 +759,8 @@ func Test_StopControlWorkers(t *testing.T) { // boundary but was restarted without being upgraded to the new version. // In this case, the node should start as stopped. sc := NewStopControl( + engine.NewUnit(), + time.Second, unittest.Logger(), execState, headers, @@ -805,6 +834,8 @@ func Test_StopControlWorkers(t *testing.T) { // The stop is set by a previous version beacon and is in one blocks time. sc := NewStopControl( + engine.NewUnit(), + time.Second, unittest.Logger(), execState, headers, diff --git a/engine/testutil/nodes.go b/engine/testutil/nodes.go index 5f871916540..98f2d39b1c7 100644 --- a/engine/testutil/nodes.go +++ b/engine/testutil/nodes.go @@ -26,6 +26,7 @@ import ( "github.com/onflow/flow-go/consensus/hotstuff/notifications" "github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub" "github.com/onflow/flow-go/crypto" + "github.com/onflow/flow-go/engine" "github.com/onflow/flow-go/engine/collection/epochmgr" "github.com/onflow/flow-go/engine/collection/epochmgr/factories" collectioningest "github.com/onflow/flow-go/engine/collection/ingest" @@ -692,7 +693,10 @@ func ExecutionNode(t *testing.T, hub *stub.Hub, identity *flow.Identity, identit latestFinalizedBlock, err := node.State.Final().Head() require.NoError(t, err) + unit := engine.NewUnit() stopControl := stop.NewStopControl( + unit, + time.Second, node.Log, execState, node.Headers, @@ -705,6 +709,7 @@ func ExecutionNode(t *testing.T, hub *stub.Hub, identity *flow.Identity, identit rootHead, rootQC := getRoot(t, &node) ingestionEngine, err := ingestion.New( + unit, node.Log, node.Net, node.Me,