Skip to content

Commit

Permalink
Merge onflow#4501
Browse files Browse the repository at this point in the history
4501: Add graceful stop to stop control r=pattyshack a=pattyshack

Allow ingestion engine to gracefully stop before crashing.

Co-authored-by: Patrick Lee <[email protected]>
  • Loading branch information
bors[bot] and pattyshack authored Jun 23, 2023
2 parents b8659cf + 3015796 commit f733f47
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 2 deletions.
4 changes: 4 additions & 0 deletions admin/commands/execution/stop_at_height_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package execution
import (
"context"
"testing"
"time"

"github.com/rs/zerolog"
"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/admin"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/execution/ingestion/stop"
"github.com/onflow/flow-go/model/flow"
)
Expand Down Expand Up @@ -90,6 +92,8 @@ func TestCommandParsing(t *testing.T) {
func TestCommandsSetsValues(t *testing.T) {

stopControl := stop.NewStopControl(
engine.NewUnit(),
time.Second,
zerolog.Nop(),
nil,
nil,
Expand Down
7 changes: 7 additions & 0 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/onflow/flow-go/consensus/hotstuff/validator"
"github.com/onflow/flow-go/consensus/hotstuff/verification"
recovery "github.com/onflow/flow-go/consensus/recovery/protocol"
"github.com/onflow/flow-go/engine"
followereng "github.com/onflow/flow-go/engine/common/follower"
"github.com/onflow/flow-go/engine/common/provider"
"github.com/onflow/flow-go/engine/common/requester"
Expand Down Expand Up @@ -111,6 +112,8 @@ type ExecutionNode struct {
builder *FlowNodeBuilder // This is needed for accessing the ShutdownFunc
exeConf *ExecutionConfig

ingestionUnit *engine.Unit

collector module.ExecutionMetrics
executionState state.ExecutionState
followerState protocol.FollowerState
Expand Down Expand Up @@ -152,6 +155,7 @@ func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() {
builder: builder.FlowNodeBuilder,
exeConf: builder.exeConf,
toTriggerCheckpoint: atomic.NewBool(false),
ingestionUnit: engine.NewUnit(),
}

builder.FlowNodeBuilder.
Expand Down Expand Up @@ -679,6 +683,8 @@ func (exeNode *ExecutionNode) LoadStopControl(
}

stopControl := stop.NewStopControl(
exeNode.ingestionUnit,
exeNode.exeConf.maxGracefulStopDuration,
exeNode.builder.Logger,
exeNode.executionState,
node.Storage.Headers,
Expand Down Expand Up @@ -820,6 +826,7 @@ func (exeNode *ExecutionNode) LoadIngestionEngine(
}

exeNode.ingestionEng, err = ingestion.New(
exeNode.ingestionUnit,
node.Logger,
node.Network,
node.Me,
Expand Down
3 changes: 3 additions & 0 deletions cmd/execution_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/onflow/flow-go/utils/grpcutils"

"github.com/onflow/flow-go/engine/execution/computation"
"github.com/onflow/flow-go/engine/execution/ingestion/stop"
"github.com/onflow/flow-go/engine/execution/rpc"
"github.com/onflow/flow-go/fvm/storage/derived"
storage "github.com/onflow/flow-go/storage/badger"
Expand Down Expand Up @@ -49,6 +50,7 @@ type ExecutionConfig struct {
blobstoreRateLimit int
blobstoreBurstLimit int
chunkDataPackRequestWorkers uint
maxGracefulStopDuration time.Duration

computationConfig computation.ComputationConfig
receiptRequestWorkers uint // common provider engine workers
Expand Down Expand Up @@ -98,6 +100,7 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) {
flags.StringToIntVar(&exeConf.apiBurstlimits, "api-burst-limits", map[string]int{}, "burst limits for gRPC API methods e.g. Ping=100,ExecuteScriptAtBlockID=100 etc. note limits apply globally to all clients.")
flags.IntVar(&exeConf.blobstoreRateLimit, "blobstore-rate-limit", 0, "per second outgoing rate limit for Execution Data blobstore")
flags.IntVar(&exeConf.blobstoreBurstLimit, "blobstore-burst-limit", 0, "outgoing burst limit for Execution Data blobstore")
flags.DurationVar(&exeConf.maxGracefulStopDuration, "max-graceful-stop-duration", stop.DefaultMaxGracefulStopDuration, "the maximum amount of time stop control will wait for ingestion engine to gracefully shutdown before crashing")
}

func (exeConf *ExecutionConfig) ValidateFlags() error {
Expand Down
3 changes: 2 additions & 1 deletion engine/execution/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type Engine struct {
}

func New(
unit *engine.Unit,
logger zerolog.Logger,
net network.Network,
me module.Local,
Expand Down Expand Up @@ -90,7 +91,7 @@ func New(
mempool := newMempool()

eng := Engine{
unit: engine.NewUnit(),
unit: unit,
log: log,
me: me,
request: request,
Expand Down
9 changes: 9 additions & 0 deletions engine/execution/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/onflow/flow-go/crypto"

enginePkg "github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/execution"
computation "github.com/onflow/flow-go/engine/execution/computation/mock"
"github.com/onflow/flow-go/engine/execution/ingestion/stop"
Expand Down Expand Up @@ -202,7 +203,10 @@ func runWithEngine(t *testing.T, f func(testingContext)) {
return stateProtocol.IsNodeAuthorizedAt(protocolState.AtBlockID(blockID), myIdentity.NodeID)
}

unit := enginePkg.NewUnit()
stopControl := stop.NewStopControl(
unit,
time.Second,
zerolog.Nop(),
executionState,
headers,
Expand All @@ -216,6 +220,7 @@ func runWithEngine(t *testing.T, f func(testingContext)) {
uploadMgr := uploader.NewManager(trace.NewNoopTracer())

engine, err = New(
unit,
log,
net,
me,
Expand Down Expand Up @@ -1492,7 +1497,9 @@ func newIngestionEngine(t *testing.T, ps *mocks.ProtocolState, es *mockExecution
return stateProtocol.IsNodeAuthorizedAt(ps.AtBlockID(blockID), myIdentity.NodeID)
}

unit := enginePkg.NewUnit()
engine, err = New(
unit,
log,
net,
me,
Expand All @@ -1514,6 +1521,8 @@ func newIngestionEngine(t *testing.T, ps *mocks.ProtocolState, es *mockExecution
nil,
nil,
stop.NewStopControl(
unit,
time.Second,
zerolog.Nop(),
nil,
headers,
Expand Down
26 changes: 25 additions & 1 deletion engine/execution/ingestion/stop/stop_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"math"
"strings"
"sync"
"time"

"github.com/coreos/go-semver/semver"
"github.com/rs/zerolog"

"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/execution/state"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/component"
Expand All @@ -19,6 +21,11 @@ import (
"github.com/onflow/flow-go/storage"
)

const (
// DefaultMaxGracefulStopDuration is the default upper bound on how long stop
// control waits for the ingestion engine to stop gracefully before crashing.
// It is used as the default value of the "max-graceful-stop-duration" flag.
//
// TODO: figure out an appropriate graceful stop time (is 10 min. enough?)
DefaultMaxGracefulStopDuration = 10 * time.Minute
)

// StopControl is a specialized component used by ingestion.Engine to encapsulate
// control of stopping blocks execution.
// It is intended to work tightly with the Engine, not as a general mechanism or interface.
Expand All @@ -39,6 +46,9 @@ import (
// This means version boundaries were edited. The resulting stop
// height is the new one.
type StopControl struct {
unit *engine.Unit
maxGracefulStopDuration time.Duration

// Stop control needs to consume BlockFinalized events.
// adding psEvents.Noop makes it a protocol.Consumer
psEvents.Noop
Expand Down Expand Up @@ -147,6 +157,8 @@ type StopControlHeaders interface {
// See build.SemverV2 for more details. That is why nil is a valid input for node version.
// Without a node version, the stop control can still be used for manual stopping.
func NewStopControl(
unit *engine.Unit,
maxGracefulStopDuration time.Duration,
log zerolog.Logger,
exeState state.ReadOnlyExecutionState,
headers StopControlHeaders,
Expand All @@ -161,6 +173,8 @@ func NewStopControl(
blockFinalizedChan := make(chan *flow.Header, 1000)

sc := &StopControl{
unit: unit,
maxGracefulStopDuration: maxGracefulStopDuration,
log: log.With().
Str("component", "stop_control").
Logger(),
Expand Down Expand Up @@ -539,7 +553,17 @@ func (s *StopControl) stopExecution() {
log.Warn().Msg("Stopping as finalization reached requested stop")

if s.stopBoundary.ShouldCrash {
// TODO: crash more gracefully or at least in a more explicit way
log.Info().
Dur("max-graceful-stop-duration", s.maxGracefulStopDuration).
Msg("Attempting graceful stop as finalization reached requested stop")
doneChan := s.unit.Done()
select {
case <-doneChan:
log.Info().Msg("Engine gracefully stopped")
case <-time.After(s.maxGracefulStopDuration):
log.Info().
Msg("Engine did not stop within max graceful stop duration")
}
log.Fatal().Msg("Crashing as finalization reached requested stop")
return
}
Expand Down
31 changes: 31 additions & 0 deletions engine/execution/ingestion/stop/stop_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
testifyMock "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/execution/state/mock"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/irrecoverable"
Expand All @@ -24,6 +25,8 @@ func TestCannotSetNewValuesAfterStoppingCommenced(t *testing.T) {

t.Run("when processing block at stop height", func(t *testing.T) {
sc := NewStopControl(
engine.NewUnit(),
time.Second,
unittest.Logger(),
nil,
nil,
Expand Down Expand Up @@ -68,6 +71,8 @@ func TestCannotSetNewValuesAfterStoppingCommenced(t *testing.T) {
execState := mock.NewExecutionState(t)

sc := NewStopControl(
engine.NewUnit(),
time.Second,
unittest.Logger(),
execState,
nil,
Expand Down Expand Up @@ -122,6 +127,8 @@ func TestExecutionFallingBehind(t *testing.T) {
headerD := unittest.BlockHeaderWithParentFixture(headerC) // 23

sc := NewStopControl(
engine.NewUnit(),
time.Second,
unittest.Logger(),
execState,
nil,
Expand Down Expand Up @@ -184,6 +191,8 @@ func TestAddStopForPastBlocks(t *testing.T) {
}

sc := NewStopControl(
engine.NewUnit(),
time.Second,
unittest.Logger(),
execState,
headers,
Expand Down Expand Up @@ -240,6 +249,8 @@ func TestAddStopForPastBlocksExecutionFallingBehind(t *testing.T) {
}

sc := NewStopControl(
engine.NewUnit(),
time.Second,
unittest.Logger(),
execState,
headers,
Expand Down Expand Up @@ -293,6 +304,8 @@ func TestStopControlWithVersionControl(t *testing.T) {
}

sc := NewStopControl(
engine.NewUnit(),
time.Second,
unittest.Logger(),
execState,
headers,
Expand Down Expand Up @@ -395,6 +408,8 @@ func TestStopControlWithVersionControl(t *testing.T) {
}

sc := NewStopControl(
engine.NewUnit(),
time.Second,
unittest.Logger(),
execState,
headers,
Expand Down Expand Up @@ -471,6 +486,8 @@ func TestStopControlWithVersionControl(t *testing.T) {
}

sc := NewStopControl(
engine.NewUnit(),
time.Second,
unittest.Logger(),
execState,
headers,
Expand Down Expand Up @@ -547,6 +564,8 @@ func TestStopControlWithVersionControl(t *testing.T) {
}

sc := NewStopControl(
engine.NewUnit(),
time.Second,
unittest.Logger(),
execState,
headers,
Expand Down Expand Up @@ -599,6 +618,8 @@ func TestStopControlWithVersionControl(t *testing.T) {
func TestStartingStopped(t *testing.T) {

sc := NewStopControl(
engine.NewUnit(),
time.Second,
unittest.Logger(),
nil,
nil,
Expand All @@ -618,6 +639,8 @@ func TestStoppedStateRejectsAllBlocksAndChanged(t *testing.T) {
execState := mock.NewExecutionState(t)

sc := NewStopControl(
engine.NewUnit(),
time.Second,
unittest.Logger(),
execState,
nil,
Expand Down Expand Up @@ -646,6 +669,8 @@ func Test_StopControlWorkers(t *testing.T) {
t.Run("start and stop, stopped = true", func(t *testing.T) {

sc := NewStopControl(
engine.NewUnit(),
time.Second,
unittest.Logger(),
nil,
nil,
Expand All @@ -671,6 +696,8 @@ func Test_StopControlWorkers(t *testing.T) {
t.Run("start and stop, stopped = false", func(t *testing.T) {

sc := NewStopControl(
engine.NewUnit(),
time.Second,
unittest.Logger(),
nil,
nil,
Expand Down Expand Up @@ -732,6 +759,8 @@ func Test_StopControlWorkers(t *testing.T) {
// boundary but was restarted without being upgraded to the new version.
// In this case, the node should start as stopped.
sc := NewStopControl(
engine.NewUnit(),
time.Second,
unittest.Logger(),
execState,
headers,
Expand Down Expand Up @@ -805,6 +834,8 @@ func Test_StopControlWorkers(t *testing.T) {

// The stop is set by a previous version beacon and is in one block's time.
sc := NewStopControl(
engine.NewUnit(),
time.Second,
unittest.Logger(),
execState,
headers,
Expand Down
5 changes: 5 additions & 0 deletions engine/testutil/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/onflow/flow-go/consensus/hotstuff/notifications"
"github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub"
"github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/collection/epochmgr"
"github.com/onflow/flow-go/engine/collection/epochmgr/factories"
collectioningest "github.com/onflow/flow-go/engine/collection/ingest"
Expand Down Expand Up @@ -692,7 +693,10 @@ func ExecutionNode(t *testing.T, hub *stub.Hub, identity *flow.Identity, identit
latestFinalizedBlock, err := node.State.Final().Head()
require.NoError(t, err)

unit := engine.NewUnit()
stopControl := stop.NewStopControl(
unit,
time.Second,
node.Log,
execState,
node.Headers,
Expand All @@ -705,6 +709,7 @@ func ExecutionNode(t *testing.T, hub *stub.Hub, identity *flow.Identity, identit

rootHead, rootQC := getRoot(t, &node)
ingestionEngine, err := ingestion.New(
unit,
node.Log,
node.Net,
node.Me,
Expand Down

0 comments on commit f733f47

Please sign in to comment.