From 0ec89477d36f5ac69a6ba02730c5a526ddf0db61 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Mon, 27 Jan 2025 11:16:04 -0800 Subject: [PATCH 01/31] add executor aggregator --- module/block_iterator/executor/aggregator.go | 28 ++++++++ .../executor/aggregator_test.go | 68 +++++++++++++++++++ 2 files changed, 96 insertions(+) create mode 100644 module/block_iterator/executor/aggregator.go create mode 100644 module/block_iterator/executor/aggregator_test.go diff --git a/module/block_iterator/executor/aggregator.go b/module/block_iterator/executor/aggregator.go new file mode 100644 index 00000000000..4b3d3fbbd23 --- /dev/null +++ b/module/block_iterator/executor/aggregator.go @@ -0,0 +1,28 @@ +package executor + +import ( + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/storage" +) + +type AggregatedExecutor struct { + executors []IterationExecutor +} + +var _ IterationExecutor = (*AggregatedExecutor)(nil) + +func NewAggregatedExecutor(executors []IterationExecutor) IterationExecutor { + return &AggregatedExecutor{ + executors: executors, + } +} + +func (a *AggregatedExecutor) ExecuteByBlockID(blockID flow.Identifier, batch storage.ReaderBatchWriter) (exception error) { + for _, executor := range a.executors { + exception = executor.ExecuteByBlockID(blockID, batch) + if exception != nil { + return exception + } + } + return nil +} diff --git a/module/block_iterator/executor/aggregator_test.go b/module/block_iterator/executor/aggregator_test.go new file mode 100644 index 00000000000..b30c7d87768 --- /dev/null +++ b/module/block_iterator/executor/aggregator_test.go @@ -0,0 +1,68 @@ +package executor + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/mock" + "github.com/onflow/flow-go/utils/unittest" +) + +func TestAggregator(t *testing.T) { + // Create mock executors + mockExecutor1 := &MockExecutor{} + mockExecutor2 := &MockExecutor{} + mockExecutor3 := &MockExecutor{} + + // Create AggregatedExecutor + aggregator := NewAggregatedExecutor([]IterationExecutor{mockExecutor1, mockExecutor2, mockExecutor3}) + + // Test case 1: All executors succeed + blockID := unittest.IdentifierFixture() + batch := mock.NewReaderBatchWriter(t) + + err := aggregator.ExecuteByBlockID(blockID, batch) + + require.NoError(t, err) + require.Equal(t, 1, mockExecutor1.CallCount) + require.Equal(t, 1, mockExecutor2.CallCount) + require.Equal(t, 1, mockExecutor3.CallCount) + + // Test case 2: Second executor fails + mockExecutor1.Reset() + mockExecutor2.Reset() + mockExecutor3.Reset() + mockExecutor2.ShouldFail = true + + err = aggregator.ExecuteByBlockID(blockID, batch) + + require.Error(t, err) + require.Equal(t, 1, mockExecutor1.CallCount) + require.Equal(t, 1, mockExecutor2.CallCount) + require.Equal(t, 0, mockExecutor3.CallCount) +} + +// MockExecutor is a mock implementation of IterationExecutor +type MockExecutor struct { + CallCount int + ShouldFail bool +} + +var _ IterationExecutor = (*MockExecutor)(nil) + +func (m *MockExecutor) ExecuteByBlockID(blockID flow.Identifier, batch storage.ReaderBatchWriter) error { + m.CallCount++ + if m.ShouldFail { + return fmt.Errorf("mock error") + } + return nil +} + +func (m *MockExecutor) Reset() { + m.CallCount = 0 + m.ShouldFail = false +} From 1e61b55160e4a369a9069a2189e9e5cd57e88000 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Mon, 27 Jan 2025 11:22:02 -0800 Subject: [PATCH 
02/31] refactor constructor --- module/block_iterator/executor/aggregator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/module/block_iterator/executor/aggregator.go b/module/block_iterator/executor/aggregator.go index 4b3d3fbbd23..ad3958a8553 100644 --- a/module/block_iterator/executor/aggregator.go +++ b/module/block_iterator/executor/aggregator.go @@ -11,7 +11,7 @@ type AggregatedExecutor struct { var _ IterationExecutor = (*AggregatedExecutor)(nil) -func NewAggregatedExecutor(executors []IterationExecutor) IterationExecutor { +func NewAggregatedExecutor(executors []IterationExecutor) *AggregatedExecutor { return &AggregatedExecutor{ executors: executors, } From 5aadc6c5bab16a62a3b5edefa7ae58c1b9e51732 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Mon, 27 Jan 2025 15:41:40 -0800 Subject: [PATCH 03/31] add prunable --- engine/execution/pruner/prunable.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 engine/execution/pruner/prunable.go diff --git a/engine/execution/pruner/prunable.go b/engine/execution/pruner/prunable.go new file mode 100644 index 00000000000..da2cc2582d1 --- /dev/null +++ b/engine/execution/pruner/prunable.go @@ -0,0 +1,18 @@ +package pruner + +import ( + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/block_iterator/latest" +) + +// LatestPrunable decides which blocks are prunable +// we don't want to prune all the sealed blocks, but keep +// a certain number of them so that the data is still available for querying +type LatestPrunable struct { + *latest.LatestSealedAndExecuted + threshold uint64 // the number of blocks below the latest block +} + +func (l *LatestPrunable) Latest() (*flow.Header, error) { + return l.LatestSealedAndExecuted.BelowLatest(l.threshold) +} From 3f36620ac8a0da7ce06b7fa1e71768dbc2242498 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Mon, 27 Jan 2025 15:41:53 -0800 Subject: [PATCH 04/31] add executor --- engine/execution/pruner/executor.go | 33 +++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 engine/execution/pruner/executor.go diff --git a/engine/execution/pruner/executor.go b/engine/execution/pruner/executor.go new file mode 100644 index 00000000000..bba7ac3c8f8 --- /dev/null +++ b/engine/execution/pruner/executor.go @@ -0,0 +1,33 @@ +package pruner + +import ( + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/block_iterator/executor" + "github.com/onflow/flow-go/storage" +) + +type ExecutionDBPruner struct { + *executor.AggregatedExecutor +} + +var _ executor.IterationExecutor = (*ExecutionDBPruner)(nil) + +func NewExecutionDataPruner() *ExecutionDBPruner { + chunkDataPackPruner := &ChunkDataPackPruner{} // TODO add dependencies + return &ExecutionDBPruner{ + executor.NewAggregatedExecutor([]executor.IterationExecutor{ + chunkDataPackPruner, + }), + } +} + +type ChunkDataPackPruner struct { + chunkDataPacks storage.ChunkDataPacks + results storage.ExecutionResults +} + +var _ executor.IterationExecutor = (*ChunkDataPackPruner)(nil) + +func (c *ChunkDataPackPruner) ExecuteByBlockID(blockID flow.Identifier, batch storage.ReaderBatchWriter) (exception error) { + return nil +} From 8bfd04119858ff171b3eefb11d70154c1aff6849 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Mon, 27 Jan 2025 15:41:59 -0800 Subject: [PATCH 05/31] add config --- engine/execution/pruner/config.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644
engine/execution/pruner/config.go diff --git a/engine/execution/pruner/config.go b/engine/execution/pruner/config.go new file mode 100644 index 00000000000..cd4701c390e --- /dev/null +++ b/engine/execution/pruner/config.go @@ -0,0 +1,17 @@ +package pruner + +import "time" + +type PruningConfig struct { + Threshold uint64 + BatchSize int + SleepAfterEachBatchCommit time.Duration + SleepAfterEachIteration time.Duration +} + +var DefaultConfig = &PruningConfig{ + Threshold: 30 * 60 * 60 * 24 * 1, // days * hours * minutes * seconds * block_per_second + BatchSize: 1000, + SleepAfterEachBatchCommit: 1 * time.Second, + SleepAfterEachIteration: 5 * time.Minute, +} From c9766bc499b893340899712c92566f6a64be0fe0 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Mon, 27 Jan 2025 15:42:08 -0800 Subject: [PATCH 06/31] add engine core --- engine/execution/pruner/core.go | 96 +++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 engine/execution/pruner/core.go diff --git a/engine/execution/pruner/core.go b/engine/execution/pruner/core.go new file mode 100644 index 00000000000..2f65def77b4 --- /dev/null +++ b/engine/execution/pruner/core.go @@ -0,0 +1,96 @@ +package pruner + +import ( + "context" + "fmt" + "time" + + "github.com/cockroachdb/pebble" + "github.com/dgraph-io/badger/v2" + + "github.com/onflow/flow-go/module/block_iterator" + "github.com/onflow/flow-go/module/block_iterator/executor" + "github.com/onflow/flow-go/module/block_iterator/latest" + "github.com/onflow/flow-go/state/protocol" + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation/pebbleimpl" + pebblestorage "github.com/onflow/flow-go/storage/pebble" +) + +const NextHeightForUnprunedExecutionDataPackKey = "NextHeightForUnprunedExecutionDataPackKey" + +func LoopPruneExecutionDataFromRootToLatestSealed( + ctx context.Context, + state protocol.State, + badgerDB *badger.DB, + headers storage.Headers, + chunkDataPacks storage.ChunkDataPacks, + results storage.ExecutionResults, + chunkDataPacksDB *pebble.DB, + config PruningConfig, + callbackWhenOneIterationFinished func(), +) error { + if config.BatchSize <= 0 { + return fmt.Errorf("batch size must be greater than 0, %v", config.BatchSize) + } + + isBatchFull := func(counter int) bool { + return counter >= config.BatchSize + } + + sleeper := func() { + time.Sleep(config.SleepAfterEachBatchCommit) + } + + root := state.Params().SealedRoot() + sealed := latest.NewLatestSealedAndExecuted( + root, + state, + badgerDB, + ) + + latest := &LatestPrunable{ + LatestSealedAndExecuted: sealed, + threshold: config.Threshold, + } + + progress := pebblestorage.NewConsumerProgress(chunkDataPacksDB, NextHeightForUnprunedExecutionDataPackKey) + + // TODO: add real pruner + prune := &ChunkDataPackPruner{ + chunkDataPacks: chunkDataPacks, + results: results, + } + + creator, err := block_iterator.NewHeightBasedCreator( + headers.BlockIDByHeight, + progress, + root, + latest.Latest, + ) + + if err != nil { + return fmt.Errorf("failed to create height based block iterator creator: %w", err) + } + + for { + select { + case <-ctx.Done(): + return nil + case <-time.After(config.SleepAfterEachIteration): + } + + iter, err := creator.Create() + if err != nil { + return fmt.Errorf("failed to create block iterator: %w", err) + } + + err = executor.IterateExecuteAndCommitInBatch(iter, prune, pebbleimpl.ToDB(chunkDataPacksDB), isBatchFull, sleeper) + if err != nil { + return fmt.Errorf("failed to iterate, execute, and commit in batch: %w", err) 
+ } + + // call the callback to report a completion of a pruning iteration + callbackWhenOneIterationFinished() + } +} From 869f16b062dd35d91906fce20493ef8c0175bd2f Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Mon, 27 Jan 2025 15:42:15 -0800 Subject: [PATCH 07/31] add engine --- engine/execution/pruner/engine.go | 35 +++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 engine/execution/pruner/engine.go diff --git a/engine/execution/pruner/engine.go b/engine/execution/pruner/engine.go new file mode 100644 index 00000000000..6274a818a4b --- /dev/null +++ b/engine/execution/pruner/engine.go @@ -0,0 +1,35 @@ +package pruner + +import ( + "github.com/dgraph-io/badger/v2" + "github.com/rs/zerolog" + + "github.com/onflow/flow-go/module/component" + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/state/protocol" + "github.com/onflow/flow-go/storage" +) + +func NewChunkDataPackPruningEngine( + log zerolog.Logger, + state protocol.State, + badgerDB *badger.DB, + headers storage.Headers, + chunkDataPacks storage.ChunkDataPacks, + results storage.ExecutionResults, + db storage.DB, + config PruningConfig, + callback func(), +) *component.ComponentManager { + return component.NewComponentManagerBuilder(). + AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { + ready() + + err := LoopPruneExecutionDataFromRootToLatestSealed( + ctx, state, badgerDB, headers, chunkDataPacks, results, db, config, callback) + if err != nil { + ctx.Throw(err) + } + }). + Build() +} From 39e04af8d1a78b3f072654c8cbc2d52857d71ee2 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Mon, 27 Jan 2025 15:42:28 -0800 Subject: [PATCH 08/31] add latest sealed and executed --- .../latest/sealed_and_executed.go | 80 +++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 module/block_iterator/latest/sealed_and_executed.go diff --git a/module/block_iterator/latest/sealed_and_executed.go b/module/block_iterator/latest/sealed_and_executed.go new file mode 100644 index 00000000000..19bf71c27c9 --- /dev/null +++ b/module/block_iterator/latest/sealed_and_executed.go @@ -0,0 +1,80 @@ +package latest + +import ( + "github.com/dgraph-io/badger/v2" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/state/protocol" + "github.com/onflow/flow-go/storage/badger/procedure" +) + +type LatestSealedAndExecuted struct { + root *flow.Header + state protocol.State + executedBlockDB *badger.DB +} + +func NewLatestSealedAndExecuted( + root *flow.Header, + state protocol.State, + executedBlockDB *badger.DB, +) *LatestSealedAndExecuted { + return &LatestSealedAndExecuted{ + root: root, + state: state, + executedBlockDB: executedBlockDB, + } +} + +// BelowLatest returns the header at the given threshold below the latest sealed and executed block. +func (l *LatestSealedAndExecuted) BelowLatest(threshold uint64) (*flow.Header, error) { + + minHeight := l.root.Height + threshold + latest, err := l.Latest() + if err != nil { + return nil, err + } + + if minHeight > latest.Height { + return l.root, nil + } + + height := latest.Height - threshold + return l.state.AtHeight(height).Head() +} + +// Latest returns the latest sealed and executed block. 
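Before the Latest implementation below, a worked example of the BelowLatest clamping above may help. This is a hedged sketch, not part of the patch series; belowLatest is a hypothetical pure-function rendering of the same height arithmetic:

// belowLatest mirrors BelowLatest: keep at least `threshold` blocks below
// the latest sealed-and-executed block, and never drop below the root.
func belowLatest(rootHeight, latestHeight, threshold uint64) uint64 {
	// until the chain has grown `threshold` blocks past the root,
	// nothing above the root is prunable (this also avoids uint64 underflow)
	if rootHeight+threshold > latestHeight {
		return rootHeight
	}
	return latestHeight - threshold
}

For example, with a root height of 1000 and a threshold of 100000: belowLatest(1000, 50000, 100000) returns 1000 (the chain is still too short to prune anything), while belowLatest(1000, 200000, 100000) returns 100000.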
+func (l *LatestSealedAndExecuted) Latest() (*flow.Header, error) { + height, err := LatestSealedAndExecutedHeight(l.state, l.executedBlockDB) + if err != nil { + return nil, err + } + + header, err := l.state.AtHeight(height).Head() + if err != nil { + return nil, err + } + + return header, nil +} + +// LatestSealedAndExecutedHeight returns the height of the latest sealed and executed block. +func LatestSealedAndExecutedHeight(state protocol.State, db *badger.DB) (uint64, error) { + lastSealed, err := state.Sealed().Head() + if err != nil { + return 0, err + } + + var blockID flow.Identifier + var lastExecuted uint64 + err = db.View(procedure.GetLastExecutedBlock(&lastExecuted, &blockID)) + if err != nil { + return 0, err + } + + // the last sealed executed is min(last_sealed, last_executed) + if lastExecuted < lastSealed.Height { + return lastExecuted, nil + } + return lastSealed.Height, nil +} From 2a8c2931b81922d77ced25fc1160d4c4d332de17 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 29 Jan 2025 12:37:33 -0800 Subject: [PATCH 09/31] add chunk data pack pruner engine --- cmd/execution_builder.go | 21 +++++ engine/execution/pruner/config.go | 14 +-- engine/execution/pruner/core.go | 89 +++++++++++++------ engine/execution/pruner/engine.go | 12 ++- engine/execution/pruner/executor.go | 23 ++--- module/block_iterator/executor/executor.go | 17 ++++ .../block_iterator/executor/executor_test.go | 7 +- 7 files changed, 127 insertions(+), 56 deletions(-) diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index 32ca366f5ea..ff42ef4eae0 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -13,6 +13,7 @@ import ( awsconfig "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/cockroachdb/pebble" "github.com/dgraph-io/badger/v2" "github.com/ipfs/boxo/bitswap" "github.com/ipfs/go-cid" @@ -55,6 +56,7 @@ import ( "github.com/onflow/flow-go/engine/execution/ingestion/stop" "github.com/onflow/flow-go/engine/execution/ingestion/uploader" exeprovider "github.com/onflow/flow-go/engine/execution/provider" + exepruner "github.com/onflow/flow-go/engine/execution/pruner" "github.com/onflow/flow-go/engine/execution/rpc" "github.com/onflow/flow-go/engine/execution/scripts" "github.com/onflow/flow-go/engine/execution/state" @@ -139,6 +141,8 @@ type ExecutionNode struct { txResults *storage.TransactionResults results *storage.ExecutionResults myReceipts *storage.MyExecutionReceipts + chunkDataPackDB *pebble.DB + chunkDataPacks storageerr.ChunkDataPacks providerEngine exeprovider.ProviderEngine checkerEng *checker.Engine syncCore *chainsync.Core @@ -226,6 +230,7 @@ func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() { // TODO: will re-visit this once storehouse has implemented new WAL for checkpoint file of // payloadless trie. // Component("execution data pruner", exeNode.LoadExecutionDataPruner). + Component("execution db pruner", exeNode.LoadExecutionDBPruner). Component("blob service", exeNode.LoadBlobService). Component("block data upload manager", exeNode.LoadBlockUploaderManager). Component("GCP block data uploader", exeNode.LoadGCPBlockDataUploader). 
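An aside on the LatestSealedAndExecutedHeight helper above: the pruning upper bound is the minimum of the last sealed height and the last executed height, because a block's data should only become prunable once the block is both sealed and executed. A minimal hedged sketch of that pick, with hypothetical names:

// lastSealedAndExecuted returns min(lastSealed, lastExecuted): whichever of
// sealing and execution is lagging bounds how far pruning may advance.
func lastSealedAndExecuted(lastSealed, lastExecuted uint64) uint64 {
	if lastExecuted < lastSealed {
		return lastExecuted
	}
	return lastSealed
}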
@@ -743,6 +748,9 @@ func (exeNode *ExecutionNode) LoadExecutionState( chunkDataPacks := store.NewChunkDataPacks(node.Metrics.Cache, pebbleimpl.ToDB(chunkDataPackDB), node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize) + exeNode.chunkDataPackDB = chunkDataPackDB + exeNode.chunkDataPacks = chunkDataPacks + // Needed for gRPC server, make sure to assign to main scoped vars exeNode.events = storage.NewEvents(node.Metrics.Cache, node.DB) exeNode.serviceEvents = storage.NewServiceEvents(node.Metrics.Cache, node.DB) @@ -987,6 +995,19 @@ func (exeNode *ExecutionNode) LoadExecutionDataPruner( return exeNode.executionDataPruner, err } +func (exeNode *ExecutionNode) LoadExecutionDBPruner(node *NodeConfig) (module.ReadyDoneAware, error) { + return exepruner.NewChunkDataPackPruningEngine( + node.Logger, + node.State, + node.DB, + node.Storage.Headers, + exeNode.chunkDataPacks, + exeNode.results, + exeNode.chunkDataPackDB, + exepruner.DefaultConfig, + ), nil +} + func (exeNode *ExecutionNode) LoadCheckerEngine( node *NodeConfig, ) ( diff --git a/engine/execution/pruner/config.go b/engine/execution/pruner/config.go index cd4701c390e..3fc7aa7489d 100644 --- a/engine/execution/pruner/config.go +++ b/engine/execution/pruner/config.go @@ -3,15 +3,15 @@ package pruner import "time" type PruningConfig struct { - Threshold uint64 - BatchSize int - SleepAfterEachBatchCommit time.Duration - SleepAfterEachIteration time.Duration + Threshold uint64 // The threshold is the number of blocks that we want to keep in the database. + BatchSize uint // The batch size is the number of blocks that we want to delete in one batch. + SleepAfterEachBatchCommit time.Duration // The sleep time after each batch commit. + SleepAfterEachIteration time.Duration // The sleep time after each iteration. } -var DefaultConfig = &PruningConfig{ - Threshold: 30 * 60 * 60 * 24 * 1, // days * hours * minutes * seconds * block_per_second +var DefaultConfig = PruningConfig{ + Threshold: 30 * 60 * 60 * 24 * 1.2, // (30 days of blocks) days * hours * minutes * seconds * block_per_second BatchSize: 1000, SleepAfterEachBatchCommit: 1 * time.Second, - SleepAfterEachIteration: 5 * time.Minute, + SleepAfterEachIteration: 500000 * time.Hour, // by default it's disabled so that we can slowly roll this feature out. } diff --git a/engine/execution/pruner/core.go b/engine/execution/pruner/core.go index 2f65def77b4..6f8880c4f49 100644 --- a/engine/execution/pruner/core.go +++ b/engine/execution/pruner/core.go @@ -8,6 +8,7 @@ import ( "github.com/cockroachdb/pebble" "github.com/dgraph-io/badger/v2" + "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/block_iterator" "github.com/onflow/flow-go/module/block_iterator/executor" "github.com/onflow/flow-go/module/block_iterator/latest" @@ -30,18 +31,53 @@ func LoopPruneExecutionDataFromRootToLatestSealed( config PruningConfig, callbackWhenOneIterationFinished func(), ) error { - if config.BatchSize <= 0 { - return fmt.Errorf("batch size must be greater than 0, %v", config.BatchSize) + // the creator can be reused to create new block iterator that can iterate from the last + // checkpoint to the new latest (sealed) block. + creator, err := makeBlockIteratorCreator(state, badgerDB, headers, chunkDataPacksDB, config) + if err != nil { + return err } - isBatchFull := func(counter int) bool { - return counter >= config.BatchSize - } + // the returned iterateAndPruneAll takes a block iterator and iterates through all the blocks + // and decides how to prune the chunk data packs. 
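A side note before the wiring below: the loop hands IterateExecuteAndCommitInBatch a single ChunkDataPackPruner, but the AggregatedExecutor from the first patch exists precisely so that additional per-block cleanup can later be composed into the same batch. A hedged sketch of that composition (resultsPruner is hypothetical):

// Composing several IterationExecutors into one; ExecuteByBlockID runs them
// in order against the same batch and returns the first error it hits.
combined := executor.NewAggregatedExecutor([]executor.IterationExecutor{
	NewChunKDataPackPruner(chunkDataPacks, results),
	resultsPruner, // hypothetical second executor
})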
+ iterateAndPruneAll := makeIterateAndPruneAll( + ctx, // for cancelling the iteration when the context is done + config, + chunkDataPacksDB, + NewChunKDataPackPruner(chunkDataPacks, results), + ) - sleeper := func() { - time.Sleep(config.SleepAfterEachBatchCommit) + for { + select { + case <-ctx.Done(): + return nil + // wait first so that we give the data pruning lower priority compare to other tasks. + // also we can disable this feature by setting the sleep time to a very large value. + case <-time.After(config.SleepAfterEachIteration): + } + + iter, err := creator.Create() + if err != nil { + return fmt.Errorf("failed to create block iterator: %w", err) + } + + err = iterateAndPruneAll(iter) + if err != nil { + return fmt.Errorf("failed to iterate, execute, and commit in batch: %w", err) + } + + // call the callback to report a completion of a pruning iteration + callbackWhenOneIterationFinished() } +} +func makeBlockIteratorCreator( + state protocol.State, + badgerDB *badger.DB, + headers storage.Headers, + chunkDataPacksDB *pebble.DB, + config PruningConfig, +) (module.IteratorCreator, error) { root := state.Params().SealedRoot() sealed := latest.NewLatestSealedAndExecuted( root, @@ -56,12 +92,6 @@ func LoopPruneExecutionDataFromRootToLatestSealed( progress := pebblestorage.NewConsumerProgress(chunkDataPacksDB, NextHeightForUnprunedExecutionDataPackKey) - // TODO: add real pruner - prune := &ChunkDataPackPruner{ - chunkDataPacks: chunkDataPacks, - results: results, - } - creator, err := block_iterator.NewHeightBasedCreator( headers.BlockIDByHeight, progress, @@ -70,27 +100,30 @@ func LoopPruneExecutionDataFromRootToLatestSealed( ) if err != nil { - return fmt.Errorf("failed to create height based block iterator creator: %w", err) + return nil, fmt.Errorf("failed to create height based block iterator creator: %w", err) } - for { - select { - case <-ctx.Done(): - return nil - case <-time.After(config.SleepAfterEachIteration): - } + return creator, nil +} - iter, err := creator.Create() - if err != nil { - return fmt.Errorf("failed to create block iterator: %w", err) - } +// makeIterateAndPruneAll takes config and chunk data packs db and pruner and returns a function that +// takes a block iterator and iterates through all the blocks and decides how to prune the chunk data packs. 
+func makeIterateAndPruneAll(ctx context.Context, config PruningConfig, chunkDataPacksDB *pebble.DB, prune *ChunkDataPackPruner) func(iter module.BlockIterator) error { + isBatchFull := func(counter int) bool { + return uint(counter) >= config.BatchSize + } - err = executor.IterateExecuteAndCommitInBatch(iter, prune, pebbleimpl.ToDB(chunkDataPacksDB), isBatchFull, sleeper) + sleeper := func() { + time.Sleep(config.SleepAfterEachBatchCommit) + } + + db := pebbleimpl.ToDB(chunkDataPacksDB) + + return func(iter module.BlockIterator) error { + err := executor.IterateExecuteAndCommitInBatch(ctx, iter, prune, db, isBatchFull, sleeper) if err != nil { return fmt.Errorf("failed to iterate, execute, and commit in batch: %w", err) } - - // call the callback to report a completion of a pruning iteration - callbackWhenOneIterationFinished() + return nil } } diff --git a/engine/execution/pruner/engine.go b/engine/execution/pruner/engine.go index 6274a818a4b..5f587eaa033 100644 --- a/engine/execution/pruner/engine.go +++ b/engine/execution/pruner/engine.go @@ -1,6 +1,7 @@ package pruner import ( + "github.com/cockroachdb/pebble" "github.com/dgraph-io/badger/v2" "github.com/rs/zerolog" @@ -10,6 +11,8 @@ import ( "github.com/onflow/flow-go/storage" ) +// NewChunkDataPackPruningEngine creates a component that prunes chunk data packs +// from root to the latest sealed block. func NewChunkDataPackPruningEngine( log zerolog.Logger, state protocol.State, @@ -17,16 +20,19 @@ func NewChunkDataPackPruningEngine( headers storage.Headers, chunkDataPacks storage.ChunkDataPacks, results storage.ExecutionResults, - db storage.DB, + chunkDataPacksDB *pebble.DB, config PruningConfig, - callback func(), ) *component.ComponentManager { return component.NewComponentManagerBuilder(). 
AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { ready() + callback := func() { + log.Info().Msgf("Pruning iteration finished") + } + err := LoopPruneExecutionDataFromRootToLatestSealed( - ctx, state, badgerDB, headers, chunkDataPacks, results, db, config, callback) + ctx, state, badgerDB, headers, chunkDataPacks, results, chunkDataPacksDB, config, callback) if err != nil { ctx.Throw(err) } diff --git a/engine/execution/pruner/executor.go b/engine/execution/pruner/executor.go index bba7ac3c8f8..201034c6ee2 100644 --- a/engine/execution/pruner/executor.go +++ b/engine/execution/pruner/executor.go @@ -6,21 +6,6 @@ import ( "github.com/onflow/flow-go/storage" ) -type ExecutionDBPruner struct { - *executor.AggregatedExecutor -} - -var _ executor.IterationExecutor = (*ExecutionDBPruner)(nil) - -func NewExecutionDataPruner() *ExecutionDBPruner { - chunkDataPackPruner := &ChunkDataPackPruner{} // TODO add depenedencies - return &ExecutionDBPruner{ - executor.NewAggregatedExecutor([]executor.IterationExecutor{ - chunkDataPackPruner, - }), - } -} - type ChunkDataPackPruner struct { chunkDataPacks storage.ChunkDataPacks results storage.ExecutionResults @@ -28,6 +13,14 @@ type ChunkDataPackPruner struct { var _ executor.IterationExecutor = (*ChunkDataPackPruner)(nil) +// TODO: replace when https://github.com/onflow/flow-go/pull/6919 is merged +func NewChunKDataPackPruner(chunkDataPacks storage.ChunkDataPacks, results storage.ExecutionResults) *ChunkDataPackPruner { + return &ChunkDataPackPruner{ + chunkDataPacks: chunkDataPacks, + results: results, + } +} + func (c *ChunkDataPackPruner) ExecuteByBlockID(blockID flow.Identifier, batch storage.ReaderBatchWriter) (exception error) { return nil } diff --git a/module/block_iterator/executor/executor.go b/module/block_iterator/executor/executor.go index 2ec1cf7cabe..6116c507b1c 100644 --- a/module/block_iterator/executor/executor.go +++ b/module/block_iterator/executor/executor.go @@ -1,6 +1,7 @@ package executor import ( + "context" "fmt" "github.com/onflow/flow-go/model/flow" @@ -30,6 +31,8 @@ type IsBatchFull func(iteratedCountInCurrentBatch int) bool // can be resumed after restart. // it sleeps after each batch is committed in order to minimizing the impact on the system. 
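An illustrative call site for the context-aware signature introduced below. This is a sketch only; iter, pruner, db and the two callbacks stand for values wired up elsewhere, much as core.go does it:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err := executor.IterateExecuteAndCommitInBatch(
	ctx,    // cancelling ctx commits the current partial batch, then returns nil
	iter,   // module.BlockIterator over the prunable height range
	pruner, // any IterationExecutor, e.g. a ChunkDataPackPruner
	db,     // the storage.DB that batches are committed into
	func(count int) bool { return count >= 1000 }, // IsBatchFull
	func() { time.Sleep(time.Second) },            // Sleeper between commits
)

Shutdown is therefore graceful: because every commit checkpoints the iterator, a later run resumes from the height where the cancelled run stopped.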
func IterateExecuteAndCommitInBatch( + // ctx is used for cancelling the iteration when the context is done + ctx context.Context, // iterator decides how to iterate over blocks iter module.BlockIterator, // executor decides what data in the storage will be updated for a certain block @@ -47,6 +50,20 @@ func IterateExecuteAndCommitInBatch( iteratedCountInCurrentBatch := 0 for { + select { + // when the context is done, commit the last batch and return + case <-ctx.Done(): + if iteratedCountInCurrentBatch > 0 { + // commit the last batch + err := commitAndCheckpoint(batch, iter) + if err != nil { + return err + } + } + return nil + default: + } + // iterate over each block until the end blockID, hasNext, err := iter.Next() if err != nil { diff --git a/module/block_iterator/executor/executor_test.go b/module/block_iterator/executor/executor_test.go index df2f76b3320..b993e87738b 100644 --- a/module/block_iterator/executor/executor_test.go +++ b/module/block_iterator/executor/executor_test.go @@ -1,6 +1,7 @@ package executor_test import ( + "context" "errors" "fmt" "testing" @@ -54,7 +55,7 @@ func TestExecute(t *testing.T) { // prune blocks batchSize := 3 - require.NoError(t, executor.IterateExecuteAndCommitInBatch(iter, pr, pdb, func(count int) bool { return count >= batchSize }, sleeper.Sleep)) + require.NoError(t, executor.IterateExecuteAndCommitInBatch(context.Background(), iter, pr, pdb, func(count int) bool { return count >= batchSize }, sleeper.Sleep)) // expect all blocks are pruned for _, b := range bs { @@ -113,7 +114,7 @@ func TestExecuteCanBeResumed(t *testing.T) { // prune blocks until interrupted at block 5 batchSize := 3 - err := executor.IterateExecuteAndCommitInBatch(iter, pruneUntilInterrupted, pdb, func(count int) bool { return count >= batchSize }, sleeper.Sleep) + err := executor.IterateExecuteAndCommitInBatch(context.Background(), iter, pruneUntilInterrupted, pdb, func(count int) bool { return count >= batchSize }, sleeper.Sleep) require.True(t, errors.Is(err, interrupted), fmt.Errorf("expected %v but got %v", interrupted, err)) // expect all blocks are pruned @@ -144,7 +145,7 @@ func TestExecuteCanBeResumed(t *testing.T) { }, } - require.NoError(t, executor.IterateExecuteAndCommitInBatch(iterToAll, pr, pdb, func(count int) bool { return count >= batchSize }, sleeper.Sleep)) + require.NoError(t, executor.IterateExecuteAndCommitInBatch(context.Background(), iterToAll, pr, pdb, func(count int) bool { return count >= batchSize }, sleeper.Sleep)) // verify all blocks are pruned for _, b := range bs { From cba65fe4bb16087e1742c5c93695178b674576a1 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 29 Jan 2025 12:41:48 -0800 Subject: [PATCH 10/31] add flag to control pruning sleep after iteration --- cmd/execution_builder.go | 5 ++++- cmd/execution_config.go | 3 +++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index ff42ef4eae0..413736483f1 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -996,6 +996,9 @@ func (exeNode *ExecutionNode) LoadExecutionDataPruner( } func (exeNode *ExecutionNode) LoadExecutionDBPruner(node *NodeConfig) (module.ReadyDoneAware, error) { + cfg := exepruner.DefaultConfig + cfg.SleepAfterEachIteration = exeNode.exeConf.pruningConfigSleepAfterIteration + return exepruner.NewChunkDataPackPruningEngine( node.Logger, node.State, @@ -1004,7 +1007,7 @@ func (exeNode *ExecutionNode) LoadExecutionDBPruner(node *NodeConfig) (module.Re 
exeNode.chunkDataPacks, exeNode.results, exeNode.chunkDataPackDB, - exepruner.DefaultConfig, + cfg, ), nil } diff --git a/cmd/execution_config.go b/cmd/execution_config.go index 5e5b6556d8a..6b61dbc2ef6 100644 --- a/cmd/execution_config.go +++ b/cmd/execution_config.go @@ -69,6 +69,8 @@ type ExecutionConfig struct { enableStorehouse bool enableChecker bool publicAccessID string + + pruningConfigSleepAfterIteration time.Duration } func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) { @@ -130,6 +132,7 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) { flags.BoolVar(&deprecatedEnableNewIngestionEngine, "enable-new-ingestion-engine", true, "enable new ingestion engine, default is true") flags.StringVar(&exeConf.publicAccessID, "public-access-id", "", "public access ID for the node") + flags.DurationVar(&exeConf.pruningConfigSleepAfterIteration, "pruning-sleep-after-iteration", 500000*time.Hour, "sleep time after each iteration") } func (exeConf *ExecutionConfig) ValidateFlags() error { From 74e83761609c5bb0c7362f9aad2b7c3c5f868042 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 29 Jan 2025 12:49:15 -0800 Subject: [PATCH 11/31] remove callback --- engine/execution/pruner/core.go | 6 ++---- engine/execution/pruner/engine.go | 6 +----- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/engine/execution/pruner/core.go b/engine/execution/pruner/core.go index 6f8880c4f49..8dc43790d3c 100644 --- a/engine/execution/pruner/core.go +++ b/engine/execution/pruner/core.go @@ -29,7 +29,6 @@ func LoopPruneExecutionDataFromRootToLatestSealed( results storage.ExecutionResults, chunkDataPacksDB *pebble.DB, config PruningConfig, - callbackWhenOneIterationFinished func(), ) error { // the creator can be reused to create new block iterator that can iterate from the last // checkpoint to the new latest (sealed) block. @@ -53,6 +52,8 @@ func LoopPruneExecutionDataFromRootToLatestSealed( return nil // wait first so that we give the data pruning lower priority compare to other tasks. // also we can disable this feature by setting the sleep time to a very large value. + // also allows the pruner to be more responsive to the context cancellation, meaning + // while the pruner is sleeping, it can be cancelled immediately. 
case <-time.After(config.SleepAfterEachIteration): } @@ -65,9 +66,6 @@ func LoopPruneExecutionDataFromRootToLatestSealed( if err != nil { return fmt.Errorf("failed to iterate, execute, and commit in batch: %w", err) } - - // call the callback to report a completion of a pruning iteration - callbackWhenOneIterationFinished() } } diff --git a/engine/execution/pruner/engine.go b/engine/execution/pruner/engine.go index 5f587eaa033..3a465800d05 100644 --- a/engine/execution/pruner/engine.go +++ b/engine/execution/pruner/engine.go @@ -27,12 +27,8 @@ func NewChunkDataPackPruningEngine( AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { ready() - callback := func() { - log.Info().Msgf("Pruning iteration finished") - } - err := LoopPruneExecutionDataFromRootToLatestSealed( - ctx, state, badgerDB, headers, chunkDataPacks, results, chunkDataPacksDB, config, callback) + ctx, state, badgerDB, headers, chunkDataPacks, results, chunkDataPacksDB, config) if err != nil { ctx.Throw(err) } From b6e6da3a3daea2dfc37cf758d5b93ffa60d5f48d Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 29 Jan 2025 13:03:37 -0800 Subject: [PATCH 12/31] add more flags --- cmd/execution_config.go | 9 ++++++++- engine/execution/pruner/core.go | 6 ++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/cmd/execution_config.go b/cmd/execution_config.go index 6b61dbc2ef6..5f3c46a00e1 100644 --- a/cmd/execution_config.go +++ b/cmd/execution_config.go @@ -11,6 +11,7 @@ import ( "github.com/onflow/flow-go/engine/common/provider" "github.com/onflow/flow-go/engine/execution/computation/query" exeprovider "github.com/onflow/flow-go/engine/execution/provider" + exepruner "github.com/onflow/flow-go/engine/execution/pruner" "github.com/onflow/flow-go/fvm" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/mempool" @@ -70,6 +71,9 @@ type ExecutionConfig struct { enableChecker bool publicAccessID string + pruningConfigThreshold uint64 + pruningConfigBatchSize uint + pruningConfigSleepAfterCommit time.Duration pruningConfigSleepAfterIteration time.Duration } @@ -132,7 +136,10 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) { flags.BoolVar(&deprecatedEnableNewIngestionEngine, "enable-new-ingestion-engine", true, "enable new ingestion engine, default is true") flags.StringVar(&exeConf.publicAccessID, "public-access-id", "", "public access ID for the node") - flags.DurationVar(&exeConf.pruningConfigSleepAfterIteration, "pruning-sleep-after-iteration", 500000*time.Hour, "sleep time after each iteration") + flags.Uint64Var(&exeConf.pruningConfigThreshold, "pruning-threshold", exepruner.DefaultConfig.Threshold, "the number of blocks that we want to keep in the database, default 30 days") + flags.UintVar(&exeConf.pruningConfigBatchSize, "pruning-batch-size", exepruner.DefaultConfig.BatchSize, "the batch size is the number of blocks that we want to delete in one batch, default 1000") + flags.DurationVar(&exeConf.pruningConfigSleepAfterCommit, "pruning-sleep-after-commit", exepruner.DefaultConfig.SleepAfterEachCommit, "sleep time after each batch commit, default 1s") + flags.DurationVar(&exeConf.pruningConfigSleepAfterIteration, "pruning-sleep-after-iteration", exepruner.DefaultConfig.SleepAfterEachIteration, "sleep time after each iteration, default 500000h") } func (exeConf *ExecutionConfig) ValidateFlags() error { diff --git a/engine/execution/pruner/core.go b/engine/execution/pruner/core.go index 8dc43790d3c..18972b9f94e 100644 --- 
a/engine/execution/pruner/core.go +++ b/engine/execution/pruner/core.go @@ -77,14 +77,16 @@ func makeBlockIteratorCreator( config PruningConfig, ) (module.IteratorCreator, error) { root := state.Params().SealedRoot() - sealed := latest.NewLatestSealedAndExecuted( + sealedAndExecuted := latest.NewLatestSealedAndExecuted( root, state, badgerDB, ) + // retrieves the latest sealed and executed block height. + // the threshold ensures that a certain number of blocks are retained for querying instead of being pruned. latest := &LatestPrunable{ - LatestSealedAndExecuted: sealed, + LatestSealedAndExecuted: sealedAndExecuted, threshold: config.Threshold, } From b3fddf88b809e11785e61a1cb1bffcb447127949 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 30 Jan 2025 12:47:57 -0800 Subject: [PATCH 13/31] fix lint --- cmd/execution_config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/execution_config.go b/cmd/execution_config.go index 5f3c46a00e1..ef97f45cd63 100644 --- a/cmd/execution_config.go +++ b/cmd/execution_config.go @@ -138,7 +138,7 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) { flags.Uint64Var(&exeConf.pruningConfigThreshold, "pruning-threshold", exepruner.DefaultConfig.Threshold, "the number of blocks that we want to keep in the database, default 30 days") flags.UintVar(&exeConf.pruningConfigBatchSize, "pruning-batch-size", exepruner.DefaultConfig.BatchSize, "the batch size is the number of blocks that we want to delete in one batch, default 1000") - flags.DurationVar(&exeConf.pruningConfigSleepAfterCommit, "pruning-sleep-after-commit", exepruner.DefaultConfig.SleepAfterEachCommit, "sleep time after each batch commit, default 1s") + flags.DurationVar(&exeConf.pruningConfigSleepAfterCommit, "pruning-sleep-after-commit", exepruner.DefaultConfig.SleepAfterEachBatchCommit, "sleep time after each batch commit, default 1s") flags.DurationVar(&exeConf.pruningConfigSleepAfterIteration, "pruning-sleep-after-iteration", exepruner.DefaultConfig.SleepAfterEachIteration, "sleep time after each iteration, default 500000h") } From 321f4bf62f9d1d9a648505c8526491fd13bddb0c Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 31 Jan 2025 10:00:20 -0800 Subject: [PATCH 14/31] handle no block to iterate --- engine/execution/pruner/core.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/engine/execution/pruner/core.go b/engine/execution/pruner/core.go index 18972b9f94e..56d29d5cbc0 100644 --- a/engine/execution/pruner/core.go +++ b/engine/execution/pruner/core.go @@ -57,11 +57,16 @@ func LoopPruneExecutionDataFromRootToLatestSealed( case <-time.After(config.SleepAfterEachIteration): } - iter, err := creator.Create() + iter, hasNext, err := creator.Create() if err != nil { return fmt.Errorf("failed to create block iterator: %w", err) } + if !hasNext { + // no more blocks to iterate, we are done. 
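+ // (continue rather than return: the loop goes back to its select, sleeps again, and later re-checks for newly sealed blocks to prune)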
+ continue + } + err = iterateAndPruneAll(iter) if err != nil { return fmt.Errorf("failed to iterate, execute, and commit in batch: %w", err) From c42f85337ff7503e7341ddea3c3b69661ee79539 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 31 Jan 2025 11:12:07 -0800 Subject: [PATCH 15/31] use the real chunk data pruner --- cmd/execution_builder.go | 8 ++++++-- engine/execution/pruner/core.go | 5 ++++- engine/execution/pruner/executor.go | 10 ++++------ module/block_iterator/executor/aggregator.go | 2 ++ 4 files changed, 16 insertions(+), 9 deletions(-) diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index 413736483f1..e6dd4142b0f 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -996,8 +996,12 @@ func (exeNode *ExecutionNode) LoadExecutionDataPruner( } func (exeNode *ExecutionNode) LoadExecutionDBPruner(node *NodeConfig) (module.ReadyDoneAware, error) { - cfg := exepruner.DefaultConfig - cfg.SleepAfterEachIteration = exeNode.exeConf.pruningConfigSleepAfterIteration + cfg := exepruner.PruningConfig{ + Threshold: exeNode.exeConf.pruningConfigThreshold, + BatchSize: exeNode.exeConf.pruningConfigBatchSize, + SleepAfterEachBatchCommit: exeNode.exeConf.pruningConfigSleepAfterCommit, + SleepAfterEachIteration: exeNode.exeConf.pruningConfigSleepAfterIteration, + } return exepruner.NewChunkDataPackPruningEngine( node.Logger, diff --git a/engine/execution/pruner/core.go b/engine/execution/pruner/core.go index 56d29d5cbc0..b12b5cc7eac 100644 --- a/engine/execution/pruner/core.go +++ b/engine/execution/pruner/core.go @@ -74,6 +74,7 @@ func LoopPruneExecutionDataFromRootToLatestSealed( } } +// makeBlockIteratorCreator create the block iterator creator func makeBlockIteratorCreator( state protocol.State, badgerDB *badger.DB, @@ -113,7 +114,9 @@ func makeBlockIteratorCreator( // makeIterateAndPruneAll takes config and chunk data packs db and pruner and returns a function that // takes a block iterator and iterates through all the blocks and decides how to prune the chunk data packs. 
-func makeIterateAndPruneAll(ctx context.Context, config PruningConfig, chunkDataPacksDB *pebble.DB, prune *ChunkDataPackPruner) func(iter module.BlockIterator) error { +func makeIterateAndPruneAll( + ctx context.Context, config PruningConfig, chunkDataPacksDB *pebble.DB, prune *ChunkDataPackPruner, +) func(iter module.BlockIterator) error { isBatchFull := func(counter int) bool { return uint(counter) >= config.BatchSize } diff --git a/engine/execution/pruner/executor.go b/engine/execution/pruner/executor.go index 201034c6ee2..754133c6739 100644 --- a/engine/execution/pruner/executor.go +++ b/engine/execution/pruner/executor.go @@ -3,24 +3,22 @@ package pruner import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/block_iterator/executor" + "github.com/onflow/flow-go/module/pruner/pruners" "github.com/onflow/flow-go/storage" ) type ChunkDataPackPruner struct { - chunkDataPacks storage.ChunkDataPacks - results storage.ExecutionResults + *pruners.ChunkDataPackPruner } var _ executor.IterationExecutor = (*ChunkDataPackPruner)(nil) -// TODO: replace when https://github.com/onflow/flow-go/pull/6919 is merged func NewChunKDataPackPruner(chunkDataPacks storage.ChunkDataPacks, results storage.ExecutionResults) *ChunkDataPackPruner { return &ChunkDataPackPruner{ - chunkDataPacks: chunkDataPacks, - results: results, + ChunkDataPackPruner: pruners.NewChunkDataPackPruner(chunkDataPacks, results), } } func (c *ChunkDataPackPruner) ExecuteByBlockID(blockID flow.Identifier, batch storage.ReaderBatchWriter) (exception error) { - return nil + return c.PruneByBlockID(blockID, batch) } diff --git a/module/block_iterator/executor/aggregator.go b/module/block_iterator/executor/aggregator.go index ad3958a8553..6892b67106b 100644 --- a/module/block_iterator/executor/aggregator.go +++ b/module/block_iterator/executor/aggregator.go @@ -5,6 +5,8 @@ import ( "github.com/onflow/flow-go/storage" ) +// AggregatedExecutor allows to aggregate multiple IterationExecutor instances into one +// so that they can be executed in a single call within the same batch. type AggregatedExecutor struct { executors []IterationExecutor } From 35d3a2bc228398e26fda0ec4a2b5f1d84c6efe10 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 31 Jan 2025 12:06:09 -0800 Subject: [PATCH 16/31] add logging --- engine/execution/pruner/core.go | 10 ++++-- engine/execution/pruner/engine.go | 1 + module/block_iterator/executor/executor.go | 34 +++++++++++++++---- .../block_iterator/executor/executor_test.go | 12 +++++-- 4 files changed, 45 insertions(+), 12 deletions(-) diff --git a/engine/execution/pruner/core.go b/engine/execution/pruner/core.go index b12b5cc7eac..b0a9a78a84d 100644 --- a/engine/execution/pruner/core.go +++ b/engine/execution/pruner/core.go @@ -7,6 +7,7 @@ import ( "github.com/cockroachdb/pebble" "github.com/dgraph-io/badger/v2" + "github.com/rs/zerolog" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/block_iterator" @@ -21,6 +22,7 @@ import ( const NextHeightForUnprunedExecutionDataPackKey = "NextHeightForUnprunedExecutionDataPackKey" func LoopPruneExecutionDataFromRootToLatestSealed( + log zerolog.Logger, ctx context.Context, state protocol.State, badgerDB *badger.DB, @@ -40,6 +42,7 @@ func LoopPruneExecutionDataFromRootToLatestSealed( // the returned iterateAndPruneAll takes a block iterator and iterates through all the blocks // and decides how to prune the chunk data packs. 
iterateAndPruneAll := makeIterateAndPruneAll( + log, ctx, // for cancelling the iteration when the context is done config, chunkDataPacksDB, @@ -47,6 +50,9 @@ func LoopPruneExecutionDataFromRootToLatestSealed( ) for { + log.Info().Msgf("execution data pruning will start in %s at %v", + config.SleepAfterEachIteration, time.Now().Add(config.SleepAfterEachIteration)) + select { case <-ctx.Done(): return nil @@ -115,7 +121,7 @@ func makeBlockIteratorCreator( // makeIterateAndPruneAll takes config and chunk data packs db and pruner and returns a function that // takes a block iterator and iterates through all the blocks and decides how to prune the chunk data packs. func makeIterateAndPruneAll( - ctx context.Context, config PruningConfig, chunkDataPacksDB *pebble.DB, prune *ChunkDataPackPruner, + log zerolog.Logger, ctx context.Context, config PruningConfig, chunkDataPacksDB *pebble.DB, prune *ChunkDataPackPruner, ) func(iter module.BlockIterator) error { isBatchFull := func(counter int) bool { return uint(counter) >= config.BatchSize @@ -128,7 +134,7 @@ func makeIterateAndPruneAll( db := pebbleimpl.ToDB(chunkDataPacksDB) return func(iter module.BlockIterator) error { - err := executor.IterateExecuteAndCommitInBatch(ctx, iter, prune, db, isBatchFull, sleeper) + err := executor.IterateExecuteAndCommitInBatch(log, ctx, iter, prune, db, isBatchFull, sleeper) if err != nil { return fmt.Errorf("failed to iterate, execute, and commit in batch: %w", err) } diff --git a/engine/execution/pruner/engine.go b/engine/execution/pruner/engine.go index 3a465800d05..a6fb2c4b058 100644 --- a/engine/execution/pruner/engine.go +++ b/engine/execution/pruner/engine.go @@ -28,6 +28,7 @@ func NewChunkDataPackPruningEngine( ready() err := LoopPruneExecutionDataFromRootToLatestSealed( + log.With().Str("component", "CDP-pruner").Logger(), ctx, state, badgerDB, headers, chunkDataPacks, results, chunkDataPacksDB, config) if err != nil { ctx.Throw(err) diff --git a/module/block_iterator/executor/executor.go b/module/block_iterator/executor/executor.go index 6116c507b1c..b6082d0e755 100644 --- a/module/block_iterator/executor/executor.go +++ b/module/block_iterator/executor/executor.go @@ -3,6 +3,9 @@ package executor import ( "context" "fmt" + "time" + + "github.com/rs/zerolog" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" @@ -31,6 +34,7 @@ type IsBatchFull func(iteratedCountInCurrentBatch int) bool // can be resumed after restart. // it sleeps after each batch is committed in order to minimizing the impact on the system. func IterateExecuteAndCommitInBatch( + log zerolog.Logger, // ctx is used for cancelling the iteration when the context is done ctx context.Context, // iterator decides how to iterate over blocks @@ -49,13 +53,21 @@ func IterateExecuteAndCommitInBatch( batch := db.NewBatch() iteratedCountInCurrentBatch := 0 + startTime := time.Now() + total := 0 + defer func() { + log.Info().Str("duration", time.Since(startTime).String()). + Int("total_block_pruned", total). 
+ Msg("pruning completed") + }() + for { select { // when the context is done, commit the last batch and return case <-ctx.Done(): if iteratedCountInCurrentBatch > 0 { // commit the last batch - err := commitAndCheckpoint(batch, iter) + err := commitAndCheckpoint(log, batch, iter) if err != nil { return err } @@ -71,10 +83,12 @@ func IterateExecuteAndCommitInBatch( } if !hasNext { - // commit last batch - err := commitAndCheckpoint(batch, iter) - if err != nil { - return err + if iteratedCountInCurrentBatch > 0 { + // commit last batch + err := commitAndCheckpoint(log, batch, iter) + if err != nil { + return err + } } break @@ -86,11 +100,12 @@ func IterateExecuteAndCommitInBatch( return fmt.Errorf("failed to prune by block ID %v: %w", blockID, err) } iteratedCountInCurrentBatch++ + total++ // if batch is full, commit and sleep if isBatchFull(iteratedCountInCurrentBatch) { // commit the batch and save the progress - err := commitAndCheckpoint(batch, iter) + err := commitAndCheckpoint(log, batch, iter) if err != nil { return err } @@ -109,7 +124,12 @@ func IterateExecuteAndCommitInBatch( // commitAndCheckpoint commits the batch and checkpoints the iterator // so that the iteration progress can be resumed after restart. -func commitAndCheckpoint(batch storage.Batch, iter module.BlockIterator) error { +func commitAndCheckpoint(log zerolog.Logger, batch storage.Batch, iter module.BlockIterator) error { + start := time.Now() + defer func() { + log.Info().Str("commit-duration", time.Since(start).String()).Msg("batch committed") + }() + err := batch.Commit() if err != nil { return fmt.Errorf("failed to commit batch: %w", err) diff --git a/module/block_iterator/executor/executor_test.go b/module/block_iterator/executor/executor_test.go index b993e87738b..a5ff0a7555e 100644 --- a/module/block_iterator/executor/executor_test.go +++ b/module/block_iterator/executor/executor_test.go @@ -55,7 +55,9 @@ func TestExecute(t *testing.T) { // prune blocks batchSize := 3 - require.NoError(t, executor.IterateExecuteAndCommitInBatch(context.Background(), iter, pr, pdb, func(count int) bool { return count >= batchSize }, sleeper.Sleep)) + require.NoError(t, executor.IterateExecuteAndCommitInBatch( + unittest.Logger(), + context.Background(), iter, pr, pdb, func(count int) bool { return count >= batchSize }, sleeper.Sleep)) // expect all blocks are pruned for _, b := range bs { @@ -114,7 +116,9 @@ func TestExecuteCanBeResumed(t *testing.T) { // prune blocks until interrupted at block 5 batchSize := 3 - err := executor.IterateExecuteAndCommitInBatch(context.Background(), iter, pruneUntilInterrupted, pdb, func(count int) bool { return count >= batchSize }, sleeper.Sleep) + err := executor.IterateExecuteAndCommitInBatch( + unittest.Logger(), + context.Background(), iter, pruneUntilInterrupted, pdb, func(count int) bool { return count >= batchSize }, sleeper.Sleep) require.True(t, errors.Is(err, interrupted), fmt.Errorf("expected %v but got %v", interrupted, err)) // expect all blocks are pruned @@ -145,7 +149,9 @@ func TestExecuteCanBeResumed(t *testing.T) { }, } - require.NoError(t, executor.IterateExecuteAndCommitInBatch(context.Background(), iterToAll, pr, pdb, func(count int) bool { return count >= batchSize }, sleeper.Sleep)) + require.NoError(t, executor.IterateExecuteAndCommitInBatch( + unittest.Logger(), + context.Background(), iterToAll, pr, pdb, func(count int) bool { return count >= batchSize }, sleeper.Sleep)) // verify all blocks are pruned for _, b := range bs { From 
1d369e1ec897217c9d31d9ba827f4da98cc64d0e Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 31 Jan 2025 12:22:08 -0800 Subject: [PATCH 17/31] update default value --- engine/execution/pruner/config.go | 9 ++++++--- engine/execution/pruner/core.go | 4 ++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/engine/execution/pruner/config.go b/engine/execution/pruner/config.go index 3fc7aa7489d..42bc77be518 100644 --- a/engine/execution/pruner/config.go +++ b/engine/execution/pruner/config.go @@ -10,8 +10,11 @@ type PruningConfig struct { } var DefaultConfig = PruningConfig{ - Threshold: 30 * 60 * 60 * 24 * 1.2, // (30 days of blocks) days * hours * minutes * seconds * block_per_second - BatchSize: 1000, - SleepAfterEachBatchCommit: 1 * time.Second, + Threshold: 30 * 60 * 60 * 24 * 1.2, // (30 days of blocks) days * hours * minutes * seconds * block_per_second + BatchSize: 1200, + // when choosing a value, consider the batch size and block time, for instance, + // the block time is 1.2 block/second, and the batch size is 1200, so the batch commit time is 1200 / 1.2 = 1000 seconds. + // so the sleep time should be smaller than 1000 seconds, otherwise, the pruner is not able to keep up with the block generation. + SleepAfterEachBatchCommit: 12 * time.Second, SleepAfterEachIteration: 500000 * time.Hour, // by default it's disabled so that we can slowly roll this feature out. } diff --git a/engine/execution/pruner/core.go b/engine/execution/pruner/core.go index b0a9a78a84d..32440672229 100644 --- a/engine/execution/pruner/core.go +++ b/engine/execution/pruner/core.go @@ -50,8 +50,8 @@ func LoopPruneExecutionDataFromRootToLatestSealed( ) for { - log.Info().Msgf("execution data pruning will start in %s at %v", - config.SleepAfterEachIteration, time.Now().Add(config.SleepAfterEachIteration)) + log.Info().Msgf("execution data pruning will start in %s at %s", + config.SleepAfterEachIteration, time.Now().Add(config.SleepAfterEachIteration).UTC()) select { case <-ctx.Done(): From 715c4e612addf2ec0e41d9ff593224827adb3b77 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 31 Jan 2025 12:52:58 -0800 Subject: [PATCH 18/31] add logging --- engine/execution/pruner/config.go | 9 ++++++--- engine/execution/pruner/core.go | 19 +++++++++++++------ integration/localnet/builder/bootstrap.go | 1 + 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/engine/execution/pruner/config.go b/engine/execution/pruner/config.go index 42bc77be518..c48200a8b0c 100644 --- a/engine/execution/pruner/config.go +++ b/engine/execution/pruner/config.go @@ -12,9 +12,12 @@ type PruningConfig struct { var DefaultConfig = PruningConfig{ Threshold: 30 * 60 * 60 * 24 * 1.2, // (30 days of blocks) days * hours * minutes * seconds * block_per_second BatchSize: 1200, - // when choosing a value, consider the batch size and block time, for instance, - // the block time is 1.2 block/second, and the batch size is 1200, so the batch commit time is 1200 / 1.2 = 1000 seconds. - // so the sleep time should be smaller than 1000 seconds, otherwise, the pruner is not able to keep up with the block generation. + // when choosing a value, consider the batch size and block time, + // for instance, + // the block time is 1.2 block/second, and the batch size is 1200, + // so the batch commit time is 1200 / 1.2 = 1000 seconds. + // the sleep time should be smaller than 1000 seconds, otherwise, + // the pruner is not able to keep up with the block generation. 
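+ // (at the default 12 second sleep, only ~14 new blocks are produced for every 1200-block batch deleted, so the pruner stays far ahead of block production)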
SleepAfterEachBatchCommit: 12 * time.Second, SleepAfterEachIteration: 500000 * time.Hour, // by default it's disabled so that we can slowly roll this feature out. } diff --git a/engine/execution/pruner/core.go b/engine/execution/pruner/core.go index 32440672229..fca280b10cd 100644 --- a/engine/execution/pruner/core.go +++ b/engine/execution/pruner/core.go @@ -34,7 +34,7 @@ func LoopPruneExecutionDataFromRootToLatestSealed( ) error { // the creator can be reused to create new block iterator that can iterate from the last // checkpoint to the new latest (sealed) block. - creator, err := makeBlockIteratorCreator(state, badgerDB, headers, chunkDataPacksDB, config) + creator, getLatest, err := makeBlockIteratorCreator(state, badgerDB, headers, chunkDataPacksDB, config) if err != nil { return err } @@ -50,8 +50,15 @@ func LoopPruneExecutionDataFromRootToLatestSealed( ) for { - log.Info().Msgf("execution data pruning will start in %s at %s", - config.SleepAfterEachIteration, time.Now().Add(config.SleepAfterEachIteration).UTC()) + latest, err := getLatest.Latest() + if err != nil { + return fmt.Errorf("failed to get latest sealed and executed block: %w", err) + } + + log.Info(). + Uint64("latest_height", latest.Height). + Msgf("execution data pruning will start in %s at %s", + config.SleepAfterEachIteration, time.Now().Add(config.SleepAfterEachIteration).UTC()) select { case <-ctx.Done(): @@ -87,7 +94,7 @@ func makeBlockIteratorCreator( headers storage.Headers, chunkDataPacksDB *pebble.DB, config PruningConfig, -) (module.IteratorCreator, error) { +) (module.IteratorCreator, *LatestPrunable, error) { root := state.Params().SealedRoot() sealedAndExecuted := latest.NewLatestSealedAndExecuted( root, @@ -112,10 +119,10 @@ func makeBlockIteratorCreator( ) if err != nil { - return nil, fmt.Errorf("failed to create height based block iterator creator: %w", err) + return nil, nil, fmt.Errorf("failed to create height based block iterator creator: %w", err) } - return creator, nil + return creator, latest, nil } // makeIterateAndPruneAll takes config and chunk data packs db and pruner and returns a function that diff --git a/integration/localnet/builder/bootstrap.go b/integration/localnet/builder/bootstrap.go index 08f649630f4..dcd264ecba4 100644 --- a/integration/localnet/builder/bootstrap.go +++ b/integration/localnet/builder/bootstrap.go @@ -403,6 +403,7 @@ func prepareExecutionService(container testnet.ContainerConfig, i int, n int) Se fmt.Sprintf("--extensive-tracing=%t", extesiveTracing), "--execution-data-dir=/data/execution-data", "--chunk-data-pack-dir=/data/chunk-data-pack", + "--pruning-sleep-after-iteration=30m", ) service.Volumes = append(service.Volumes, From 346a11c9f7c9a4e430b26f061982f34e47f58594 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 31 Jan 2025 13:05:54 -0800 Subject: [PATCH 19/31] update log --- engine/execution/pruner/core.go | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/engine/execution/pruner/core.go b/engine/execution/pruner/core.go index fca280b10cd..3ef72f90ac6 100644 --- a/engine/execution/pruner/core.go +++ b/engine/execution/pruner/core.go @@ -34,7 +34,7 @@ func LoopPruneExecutionDataFromRootToLatestSealed( ) error { // the creator can be reused to create new block iterator that can iterate from the last // checkpoint to the new latest (sealed) block. 
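A hedged sketch of the reuse described above, with errors unchecked for brevity: the creator persists the next unpruned height under NextHeightForUnprunedExecutionDataPackKey, so each Create call resumes from wherever the last committed batch stopped.

iter, hasNext, err := creator.Create() // first call: iterates from the root height
// ... batches are pruned and committed; the checkpoint advances with each commit ...
iter, hasNext, err = creator.Create() // a later call resumes from the saved height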
- creator, getLatest, err := makeBlockIteratorCreator(state, badgerDB, headers, chunkDataPacksDB, config) + creator, getNextAndLatest, err := makeBlockIteratorCreator(state, badgerDB, headers, chunkDataPacksDB, config) if err != nil { return err } @@ -50,13 +50,14 @@ func LoopPruneExecutionDataFromRootToLatestSealed( ) for { - latest, err := getLatest.Latest() + nextToPrune, latestToPrune, err := getNextAndLatest() if err != nil { - return fmt.Errorf("failed to get latest sealed and executed block: %w", err) + return fmt.Errorf("failed to get next and latest to prune: %w", err) } log.Info(). - Uint64("latest_height", latest.Height). + Uint64("nextToPrune", nextToPrune). + Uint64("latestToPrune", latestToPrune). Msgf("execution data pruning will start in %s at %s", config.SleepAfterEachIteration, time.Now().Add(config.SleepAfterEachIteration).UTC()) @@ -94,7 +95,7 @@ func makeBlockIteratorCreator( headers storage.Headers, chunkDataPacksDB *pebble.DB, config PruningConfig, -) (module.IteratorCreator, *LatestPrunable, error) { +) (module.IteratorCreator, func() (nextToPrune uint64, latestToPrune uint64, err error), error) { root := state.Params().SealedRoot() sealedAndExecuted := latest.NewLatestSealedAndExecuted( root, @@ -122,7 +123,19 @@ func makeBlockIteratorCreator( return nil, nil, fmt.Errorf("failed to create height based block iterator creator: %w", err) } - return creator, latest, nil + return creator, func() (uint64, uint64, error) { + next, err := progress.ProcessedIndex() + if err != nil { + return 0, 0, fmt.Errorf("failed to get next height to prune: %w", err) + } + + header, err := latest.Latest() + if err != nil { + return 0, 0, fmt.Errorf("failed to get latest prunable block: %w", err) + } + + return next, header.Height, nil + }, nil } // makeIterateAndPruneAll takes config and chunk data packs db and pruner and returns a function that From 45ec586b8bea3a95deb7dea4c9cfa1718aa7f98d Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 31 Jan 2025 13:09:12 -0800 Subject: [PATCH 20/31] update config --- cmd/execution_config.go | 8 ++++---- integration/localnet/builder/bootstrap.go | 3 ++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/cmd/execution_config.go b/cmd/execution_config.go index ef97f45cd63..e1638e4a152 100644 --- a/cmd/execution_config.go +++ b/cmd/execution_config.go @@ -136,10 +136,10 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) { flags.BoolVar(&deprecatedEnableNewIngestionEngine, "enable-new-ingestion-engine", true, "enable new ingestion engine, default is true") flags.StringVar(&exeConf.publicAccessID, "public-access-id", "", "public access ID for the node") - flags.Uint64Var(&exeConf.pruningConfigThreshold, "pruning-threshold", exepruner.DefaultConfig.Threshold, "the number of blocks that we want to keep in the database, default 30 days") - flags.UintVar(&exeConf.pruningConfigBatchSize, "pruning-batch-size", exepruner.DefaultConfig.BatchSize, "the batch size is the number of blocks that we want to delete in one batch, default 1000") - flags.DurationVar(&exeConf.pruningConfigSleepAfterCommit, "pruning-sleep-after-commit", exepruner.DefaultConfig.SleepAfterEachBatchCommit, "sleep time after each batch commit, default 1s") - flags.DurationVar(&exeConf.pruningConfigSleepAfterIteration, "pruning-sleep-after-iteration", exepruner.DefaultConfig.SleepAfterEachIteration, "sleep time after each iteration, default 500000h") + flags.Uint64Var(&exeConf.pruningConfigThreshold, "pruning-config-threshold", 
exepruner.DefaultConfig.Threshold, "the number of blocks that we want to keep in the database, default 30 days") + flags.UintVar(&exeConf.pruningConfigBatchSize, "pruning-config-batch-size", exepruner.DefaultConfig.BatchSize, "the batch size is the number of blocks that we want to delete in one batch, default 1000") + flags.DurationVar(&exeConf.pruningConfigSleepAfterCommit, "pruning-config-sleep-after-commit", exepruner.DefaultConfig.SleepAfterEachBatchCommit, "sleep time after each batch commit, default 1s") + flags.DurationVar(&exeConf.pruningConfigSleepAfterIteration, "pruning-config-sleep-after-iteration", exepruner.DefaultConfig.SleepAfterEachIteration, "sleep time after each iteration, default 500000h") } func (exeConf *ExecutionConfig) ValidateFlags() error { diff --git a/integration/localnet/builder/bootstrap.go b/integration/localnet/builder/bootstrap.go index dcd264ecba4..15f4f3c21db 100644 --- a/integration/localnet/builder/bootstrap.go +++ b/integration/localnet/builder/bootstrap.go @@ -403,7 +403,8 @@ func prepareExecutionService(container testnet.ContainerConfig, i int, n int) Se fmt.Sprintf("--extensive-tracing=%t", extesiveTracing), "--execution-data-dir=/data/execution-data", "--chunk-data-pack-dir=/data/chunk-data-pack", - "--pruning-sleep-after-iteration=30m", + "--pruning-config-threshold=20", + "--pruning-config-sleep-after-iteration=30m", ) service.Volumes = append(service.Volumes, From edba299c14b16a75d75d1388abff451b8def7653 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Mon, 3 Feb 2025 11:23:41 -0800 Subject: [PATCH 21/31] add metrics --- cmd/execution_builder.go | 1 + engine/execution/pruner/core.go | 5 ++ engine/execution/pruner/engine.go | 4 +- integration/localnet/builder/bootstrap.go | 2 +- module/metrics.go | 3 + module/metrics/execution.go | 13 ++++ module/metrics/noop.go | 2 + module/mock/block_iterator.go | 92 +++++++++++++++++++++++ module/mock/execution_metrics.go | 5 ++ module/mock/iterator_creator.go | 64 ++++++++++++++++ module/mock/iterator_state.go | 70 +++++++++++++++++ module/mock/iterator_state_reader.go | 52 +++++++++++++ module/mock/iterator_state_writer.go | 42 +++++++++++ 13 files changed, 353 insertions(+), 2 deletions(-) create mode 100644 module/mock/block_iterator.go create mode 100644 module/mock/iterator_creator.go create mode 100644 module/mock/iterator_state.go create mode 100644 module/mock/iterator_state_reader.go create mode 100644 module/mock/iterator_state_writer.go diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index e6dd4142b0f..d479ecd6754 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -1005,6 +1005,7 @@ func (exeNode *ExecutionNode) LoadExecutionDBPruner(node *NodeConfig) (module.Re return exepruner.NewChunkDataPackPruningEngine( node.Logger, + exeNode.collector, node.State, node.DB, node.Storage.Headers, diff --git a/engine/execution/pruner/core.go b/engine/execution/pruner/core.go index 3ef72f90ac6..8225709377a 100644 --- a/engine/execution/pruner/core.go +++ b/engine/execution/pruner/core.go @@ -23,6 +23,7 @@ const NextHeightForUnprunedExecutionDataPackKey = "NextHeightForUnprunedExecutio func LoopPruneExecutionDataFromRootToLatestSealed( log zerolog.Logger, + metrics module.ExecutionMetrics, ctx context.Context, state protocol.State, badgerDB *badger.DB, @@ -61,6 +62,10 @@ func LoopPruneExecutionDataFromRootToLatestSealed( Msgf("execution data pruning will start in %s at %s", config.SleepAfterEachIteration, time.Now().Add(config.SleepAfterEachIteration).UTC()) + 
// last pruned is nextToPrune - 1. + // it won't underflow, because nextToPrune starts from root + 1 + metrics.ExecutionLastChunkDataPackPrunedHeight(nextToPrune - 1) + select { case <-ctx.Done(): return nil diff --git a/engine/execution/pruner/engine.go b/engine/execution/pruner/engine.go index a6fb2c4b058..b871cfdcc4f 100644 --- a/engine/execution/pruner/engine.go +++ b/engine/execution/pruner/engine.go @@ -5,6 +5,7 @@ import ( "github.com/dgraph-io/badger/v2" "github.com/rs/zerolog" + "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/component" "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/state/protocol" @@ -15,6 +16,7 @@ import ( // from root to the latest sealed block. func NewChunkDataPackPruningEngine( log zerolog.Logger, + metrics module.ExecutionMetrics, state protocol.State, badgerDB *badger.DB, headers storage.Headers, @@ -28,7 +30,7 @@ func NewChunkDataPackPruningEngine( ready() err := LoopPruneExecutionDataFromRootToLatestSealed( - log.With().Str("component", "CDP-pruner").Logger(), + log.With().Str("component", "CDP-pruner").Logger(), metrics, ctx, state, badgerDB, headers, chunkDataPacks, results, chunkDataPacksDB, config) if err != nil { ctx.Throw(err) diff --git a/integration/localnet/builder/bootstrap.go b/integration/localnet/builder/bootstrap.go index 15f4f3c21db..f21fc3c4fcf 100644 --- a/integration/localnet/builder/bootstrap.go +++ b/integration/localnet/builder/bootstrap.go @@ -404,7 +404,7 @@ func prepareExecutionService(container testnet.ContainerConfig, i int, n int) Se "--execution-data-dir=/data/execution-data", "--chunk-data-pack-dir=/data/chunk-data-pack", "--pruning-config-threshold=20", - "--pruning-config-sleep-after-iteration=30m", + "--pruning-config-sleep-after-iteration=1m", ) service.Volumes = append(service.Volumes, diff --git a/module/metrics.go b/module/metrics.go index c1a2a2a24c1..74a0025856d 100644 --- a/module/metrics.go +++ b/module/metrics.go @@ -997,6 +997,9 @@ type ExecutionMetrics interface { // ExecutionLastFinalizedExecutedBlockHeight reports last finalized and executed block height ExecutionLastFinalizedExecutedBlockHeight(height uint64) + // ExecutionLastChunkDataPackPrunedHeight reports last chunk data pack pruned height + ExecutionLastChunkDataPackPrunedHeight(height uint64) + // ExecutionBlockExecuted reports the total time and computation spent on executing a block ExecutionBlockExecuted(dur time.Duration, stats BlockExecutionResultStats) diff --git a/module/metrics/execution.go b/module/metrics/execution.go index 4a06a5895c9..aea61f90bfe 100644 --- a/module/metrics/execution.go +++ b/module/metrics/execution.go @@ -20,6 +20,7 @@ type ExecutionCollector struct { totalFailedTransactionsCounter prometheus.Counter lastExecutedBlockHeightGauge prometheus.Gauge lastFinalizedExecutedBlockHeightGauge prometheus.Gauge + lastChunkDataPackPrunedHeightGauge prometheus.Gauge stateStorageDiskTotal prometheus.Gauge storageStateCommitment prometheus.Gauge checkpointSize prometheus.Gauge @@ -653,6 +654,13 @@ func NewExecutionCollector(tracer module.Tracer) *ExecutionCollector { Help: "the last height that was finalized and executed", }), + lastChunkDataPackPrunedHeightGauge: promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: namespaceExecution, + Subsystem: subsystemRuntime, + Name: "last_chunk_data_pack_pruned_height", + Help: "the last height that was pruned for chunk data pack", + }), + stateStorageDiskTotal: promauto.NewGauge(prometheus.GaugeOpts{ Namespace: namespaceExecution, Subsystem: 
subsystemStateStorage, @@ -892,6 +900,11 @@ func (ec *ExecutionCollector) ExecutionLastFinalizedExecutedBlockHeight(height u ec.lastFinalizedExecutedBlockHeightGauge.Set(float64(height)) } +// ExecutionLastChunkDataPackPrunedHeight reports last chunk data pack pruned height +func (ec *ExecutionCollector) ExecutionLastChunkDataPackPrunedHeight(height uint64) { + ec.lastChunkDataPackPrunedHeightGauge.Set(float64(height)) +} + // ForestApproxMemorySize records approximate memory usage of forest (all in-memory trees) func (ec *ExecutionCollector) ForestApproxMemorySize(bytes uint64) { ec.forestApproxMemorySize.Set(float64(bytes)) diff --git a/module/metrics/noop.go b/module/metrics/noop.go index dbc31d27a6b..ed6b6d9572d 100644 --- a/module/metrics/noop.go +++ b/module/metrics/noop.go @@ -168,6 +168,8 @@ func (nc *NoopCollector) ExecutionLastExecutedBlockHeight(height uint64) func (nc *NoopCollector) ExecutionLastFinalizedExecutedBlockHeight(height uint64) {} func (nc *NoopCollector) ExecutionBlockExecuted(_ time.Duration, _ module.BlockExecutionResultStats) { } +func (nc *NoopCollector) ExecutionLastChunkDataPackPrunedHeight(height uint64) {} + func (nc *NoopCollector) ExecutionCollectionExecuted(_ time.Duration, _ module.CollectionExecutionResultStats) { } func (nc *NoopCollector) ExecutionBlockExecutionEffortVectorComponent(_ string, _ uint) {} diff --git a/module/mock/block_iterator.go b/module/mock/block_iterator.go new file mode 100644 index 00000000000..5dc5b1d95b1 --- /dev/null +++ b/module/mock/block_iterator.go @@ -0,0 +1,92 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +package mock + +import ( + flow "github.com/onflow/flow-go/model/flow" + mock "github.com/stretchr/testify/mock" +) + +// BlockIterator is an autogenerated mock type for the BlockIterator type +type BlockIterator struct { + mock.Mock +} + +// Checkpoint provides a mock function with given fields: +func (_m *BlockIterator) Checkpoint() (uint64, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Checkpoint") + } + + var r0 uint64 + var r1 error + if rf, ok := ret.Get(0).(func() (uint64, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() uint64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint64) + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Next provides a mock function with given fields: +func (_m *BlockIterator) Next() (flow.Identifier, bool, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Next") + } + + var r0 flow.Identifier + var r1 bool + var r2 error + if rf, ok := ret.Get(0).(func() (flow.Identifier, bool, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() flow.Identifier); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(flow.Identifier) + } + } + + if rf, ok := ret.Get(1).(func() bool); ok { + r1 = rf() + } else { + r1 = ret.Get(1).(bool) + } + + if rf, ok := ret.Get(2).(func() error); ok { + r2 = rf() + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// NewBlockIterator creates a new instance of BlockIterator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. 
+func NewBlockIterator(t interface { + mock.TestingT + Cleanup(func()) +}) *BlockIterator { + mock := &BlockIterator{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/module/mock/execution_metrics.go b/module/mock/execution_metrics.go index 619fca3d60e..28af0a9d4fa 100644 --- a/module/mock/execution_metrics.go +++ b/module/mock/execution_metrics.go @@ -86,6 +86,11 @@ func (_m *ExecutionMetrics) ExecutionComputationResultUploaded() { _m.Called() } +// ExecutionLastChunkDataPackPrunedHeight provides a mock function with given fields: height +func (_m *ExecutionMetrics) ExecutionLastChunkDataPackPrunedHeight(height uint64) { + _m.Called(height) +} + // ExecutionLastExecutedBlockHeight provides a mock function with given fields: height func (_m *ExecutionMetrics) ExecutionLastExecutedBlockHeight(height uint64) { _m.Called(height) diff --git a/module/mock/iterator_creator.go b/module/mock/iterator_creator.go new file mode 100644 index 00000000000..f91f3821002 --- /dev/null +++ b/module/mock/iterator_creator.go @@ -0,0 +1,64 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +package mock + +import ( + module "github.com/onflow/flow-go/module" + mock "github.com/stretchr/testify/mock" +) + +// IteratorCreator is an autogenerated mock type for the IteratorCreator type +type IteratorCreator struct { + mock.Mock +} + +// Create provides a mock function with given fields: +func (_m *IteratorCreator) Create() (module.BlockIterator, bool, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Create") + } + + var r0 module.BlockIterator + var r1 bool + var r2 error + if rf, ok := ret.Get(0).(func() (module.BlockIterator, bool, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() module.BlockIterator); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(module.BlockIterator) + } + } + + if rf, ok := ret.Get(1).(func() bool); ok { + r1 = rf() + } else { + r1 = ret.Get(1).(bool) + } + + if rf, ok := ret.Get(2).(func() error); ok { + r2 = rf() + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// NewIteratorCreator creates a new instance of IteratorCreator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewIteratorCreator(t interface { + mock.TestingT + Cleanup(func()) +}) *IteratorCreator { + mock := &IteratorCreator{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/module/mock/iterator_state.go b/module/mock/iterator_state.go new file mode 100644 index 00000000000..36c9352607c --- /dev/null +++ b/module/mock/iterator_state.go @@ -0,0 +1,70 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. 
+ +package mock + +import mock "github.com/stretchr/testify/mock" + +// IteratorState is an autogenerated mock type for the IteratorState type +type IteratorState struct { + mock.Mock +} + +// LoadState provides a mock function with given fields: +func (_m *IteratorState) LoadState() (uint64, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for LoadState") + } + + var r0 uint64 + var r1 error + if rf, ok := ret.Get(0).(func() (uint64, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() uint64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint64) + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// SaveState provides a mock function with given fields: _a0 +func (_m *IteratorState) SaveState(_a0 uint64) error { + ret := _m.Called(_a0) + + if len(ret) == 0 { + panic("no return value specified for SaveState") + } + + var r0 error + if rf, ok := ret.Get(0).(func(uint64) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewIteratorState creates a new instance of IteratorState. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewIteratorState(t interface { + mock.TestingT + Cleanup(func()) +}) *IteratorState { + mock := &IteratorState{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/module/mock/iterator_state_reader.go b/module/mock/iterator_state_reader.go new file mode 100644 index 00000000000..524da8023f7 --- /dev/null +++ b/module/mock/iterator_state_reader.go @@ -0,0 +1,52 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +package mock + +import mock "github.com/stretchr/testify/mock" + +// IteratorStateReader is an autogenerated mock type for the IteratorStateReader type +type IteratorStateReader struct { + mock.Mock +} + +// LoadState provides a mock function with given fields: +func (_m *IteratorStateReader) LoadState() (uint64, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for LoadState") + } + + var r0 uint64 + var r1 error + if rf, ok := ret.Get(0).(func() (uint64, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() uint64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint64) + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewIteratorStateReader creates a new instance of IteratorStateReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewIteratorStateReader(t interface { + mock.TestingT + Cleanup(func()) +}) *IteratorStateReader { + mock := &IteratorStateReader{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/module/mock/iterator_state_writer.go b/module/mock/iterator_state_writer.go new file mode 100644 index 00000000000..e9506d5942d --- /dev/null +++ b/module/mock/iterator_state_writer.go @@ -0,0 +1,42 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. 
+ +package mock + +import mock "github.com/stretchr/testify/mock" + +// IteratorStateWriter is an autogenerated mock type for the IteratorStateWriter type +type IteratorStateWriter struct { + mock.Mock +} + +// SaveState provides a mock function with given fields: _a0 +func (_m *IteratorStateWriter) SaveState(_a0 uint64) error { + ret := _m.Called(_a0) + + if len(ret) == 0 { + panic("no return value specified for SaveState") + } + + var r0 error + if rf, ok := ret.Get(0).(func(uint64) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewIteratorStateWriter creates a new instance of IteratorStateWriter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewIteratorStateWriter(t interface { + mock.TestingT + Cleanup(func()) +}) *IteratorStateWriter { + mock := &IteratorStateWriter{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} From fea8d15f09cd08965a62f1b89fe148b270e990db Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Tue, 4 Feb 2025 13:48:57 -0800 Subject: [PATCH 22/31] add test cases to chunk data pack pruner core --- engine/execution/pruner/core_test.go | 123 ++++++++++++++++++++ engine/verification/verifier/engine_test.go | 4 +- module/pruner/pruners/chunk_data_pack.go | 4 - storage/badger/results.go | 2 +- utils/unittest/fixtures.go | 13 ++- utils/unittest/mocks/protocol_state.go | 40 ++++++- 6 files changed, 175 insertions(+), 11 deletions(-) create mode 100644 engine/execution/pruner/core_test.go diff --git a/engine/execution/pruner/core_test.go b/engine/execution/pruner/core_test.go new file mode 100644 index 00000000000..236f4864026 --- /dev/null +++ b/engine/execution/pruner/core_test.go @@ -0,0 +1,123 @@ +package pruner + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/cockroachdb/pebble" + "github.com/dgraph-io/badger/v2" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/model/verification" + "github.com/onflow/flow-go/module/metrics" + "github.com/onflow/flow-go/storage" + badgerstorage "github.com/onflow/flow-go/storage/badger" + "github.com/onflow/flow-go/storage/badger/operation" + "github.com/onflow/flow-go/storage/operation/pebbleimpl" + "github.com/onflow/flow-go/storage/store" + "github.com/onflow/flow-go/utils/unittest" + unittestMocks "github.com/onflow/flow-go/utils/unittest/mocks" +) + +func TestLoopPruneExecutionDataFromRootToLatestSealed(t *testing.T) { + unittest.RunWithBadgerDB(t, func(bdb *badger.DB) { + unittest.RunWithPebbleDB(t, func(pdb *pebble.DB) { + // create dependencies + ps := unittestMocks.NewProtocolState() + blocks, rootResult, rootSeal := unittest.ChainFixture(0) + genesis := blocks[0] + require.NoError(t, ps.Bootstrap(genesis, rootResult, rootSeal)) + + ctx, cancel := context.WithCancel(context.Background()) + metrics := metrics.NewNoopCollector() + headers := badgerstorage.NewHeaders(metrics, bdb) + results := badgerstorage.NewExecutionResults(metrics, bdb) + + transactions := badgerstorage.NewTransactions(metrics, bdb) + collections := badgerstorage.NewCollections(bdb, transactions) + chunkDataPacks := store.NewChunkDataPacks(metrics, pebbleimpl.ToDB(pdb), collections, 1000) + + lastSealedHeight := 30 + lastFinalizedHeight := lastSealedHeight + 2 // 2 finalized but unsealed + // indexed by height + chunks := make([]*verification.VerifiableChunkData, 
lastFinalizedHeight+2)
+			parentID := genesis.ID()
+			require.NoError(t, headers.Store(genesis.Header))
+			for i := 1; i <= lastFinalizedHeight; i++ {
+				chunk, block := unittest.VerifiableChunkDataFixture(0, func(header *flow.Header) {
+					header.Height = uint64(i)
+					header.ParentID = parentID
+				})
+				chunks[i] = chunk // index by height
+				require.NoError(t, headers.Store(chunk.Header))
+				require.NoError(t, bdb.Update(operation.IndexBlockHeight(chunk.Header.Height, chunk.Header.ID())))
+				require.NoError(t, results.Store(chunk.Result))
+				require.NoError(t, results.Index(chunk.Result.BlockID, chunk.Result.ID()))
+				require.NoError(t, chunkDataPacks.Store([]*flow.ChunkDataPack{chunk.ChunkDataPack}))
+				require.NoError(t, collections.Store(chunk.ChunkDataPack.Collection))
+				// verify that chunk data pack fixture can be found by the result
+				for _, c := range chunk.Result.Chunks {
+					chunkID := c.ID()
+					require.Equal(t, chunk.ChunkDataPack.ID(), chunkID)
+					_, err := chunkDataPacks.ByChunkID(chunkID)
+					require.NoError(t, err)
+				}
+				// verify the result can be found by block
+				_, err := results.ByBlockID(chunk.Header.ID())
+				require.NoError(t, err)
+
+				// Finalize block
+				require.NoError(t, ps.Extend(block))
+				require.NoError(t, ps.Finalize(block.ID()))
+				parentID = block.ID()
+			}
+
+			// the latest sealed and executed block is the last sealed block
+			require.NoError(t, bdb.Update(operation.InsertExecutedBlock(chunks[lastFinalizedHeight].Header.ID())))
+			lastSealed := chunks[lastSealedHeight].Header
+			require.NoError(t, ps.MakeSeal(lastSealed.ID()))
+
+			// create config
+			cfg := PruningConfig{
+				Threshold:                 10,
+				BatchSize:                 3,
+				SleepAfterEachBatchCommit: 1 * time.Millisecond,
+				SleepAfterEachIteration:   100 * time.Millisecond,
+			}
+
+			// wait long enough for the chunk data packs to be pruned
+			go (func(cancel func()) {
+				time.Sleep(1 * time.Second)
+				// cancel the context to stop the loop
+				cancel()
+			})(cancel)
+
+			require.NoError(t, LoopPruneExecutionDataFromRootToLatestSealed(
+				unittest.Logger(), metrics, ctx, ps, bdb, headers, chunkDataPacks, results, pdb, cfg,
+			))
+
+			// verify the chunk data packs beyond the threshold are pruned
+			// if lastSealedHeight is 2, threshold is 1, then block height 1 and 2 will be stored,
+			// and we only prune block 1, the last pruned height is 1 (block 2 is not pruned)
+			// so the lastPrunedHeight should be calculated as lastSealedHeight (2) - threshold(1) = 1
+			lastPrunedHeight := lastSealedHeight - int(cfg.Threshold) // 20
+			for i := 1; i <= lastPrunedHeight; i++ {
+				expected := chunks[i]
+				_, err := chunkDataPacks.ByChunkID(expected.ChunkDataPack.ID())
+				require.Error(t, err, fmt.Errorf("chunk data pack at height %v should be pruned, but was not", i))
+				require.ErrorIs(t, err, storage.ErrNotFound)
+			}
+
+			// verify the chunk data packs within the threshold are not pruned
+			for i := lastPrunedHeight + 1; i <= lastFinalizedHeight; i++ {
+				expected := chunks[i]
+				actual, err := chunkDataPacks.ByChunkID(expected.ChunkDataPack.ID())
+				require.NoError(t, err)
+				require.Equal(t, expected.ChunkDataPack, actual)
+			}
+		})
+	})
+}
diff --git a/engine/verification/verifier/engine_test.go b/engine/verification/verifier/engine_test.go
index ee24d16eba1..06c06410a6f 100644
--- a/engine/verification/verifier/engine_test.go
+++ b/engine/verification/verifier/engine_test.go
@@ -121,7 +121,7 @@ func (suite *VerifierEngineTestSuite) TestVerifyHappyPath() {
 	consensusNodes := unittest.IdentityListFixture(1, unittest.WithRole(flow.RoleConsensus))
 	suite.ss.On("Identities", testifymock.Anything).Return(consensusNodes,
nil) - vChunk := unittest.VerifiableChunkDataFixture(uint64(0)) + vChunk, _ := unittest.VerifiableChunkDataFixture(uint64(0)) tests := []struct { name string @@ -301,7 +301,7 @@ func (suite *VerifierEngineTestSuite) TestVerifyUnhappyPaths() { } for i, test := range tests { - vc := unittest.VerifiableChunkDataFixture(uint64(i)) + vc, _ := unittest.VerifiableChunkDataFixture(uint64(i)) expectedErr := test.errFn(vc) suite.chunkVerifier.On("Verify", vc).Return(nil, expectedErr).Once() diff --git a/module/pruner/pruners/chunk_data_pack.go b/module/pruner/pruners/chunk_data_pack.go index 71517a4522a..433bd41ef74 100644 --- a/module/pruner/pruners/chunk_data_pack.go +++ b/module/pruner/pruners/chunk_data_pack.go @@ -36,10 +36,6 @@ func (p *ChunkDataPackPruner) PruneByBlockID(blockID flow.Identifier, batchWrite chunkID := chunk.ID() // remove chunk data pack err := p.chunkDataPacks.BatchRemove(chunkID, batchWriter) - if errors.Is(err, storage.ErrNotFound) { - continue - } - if err != nil { return fmt.Errorf("could not remove chunk id %v for block id %v: %w", chunkID, blockID, err) } diff --git a/storage/badger/results.go b/storage/badger/results.go index d4d1a4525b0..6c3e0c08337 100644 --- a/storage/badger/results.go +++ b/storage/badger/results.go @@ -38,7 +38,7 @@ func NewExecutionResults(collector module.CacheMetrics, db *badger.DB) *Executio res := &ExecutionResults{ db: db, - cache: newCache[flow.Identifier, *flow.ExecutionResult](collector, metrics.ResourceResult, + cache: newCache(collector, metrics.ResourceResult, withLimit[flow.Identifier, *flow.ExecutionResult](flow.DefaultTransactionExpiry+100), withStore(store), withRetrieve(retrieve)), diff --git a/utils/unittest/fixtures.go b/utils/unittest/fixtures.go index 25435338910..5d280e746b6 100644 --- a/utils/unittest/fixtures.go +++ b/utils/unittest/fixtures.go @@ -1541,7 +1541,7 @@ func RegisterIDFixture() flow.RegisterID { // VerifiableChunkDataFixture returns a complete verifiable chunk with an // execution receipt referencing the block/collections. -func VerifiableChunkDataFixture(chunkIndex uint64) *verification.VerifiableChunkData { +func VerifiableChunkDataFixture(chunkIndex uint64, opts ...func(*flow.Header)) (*verification.VerifiableChunkData, *flow.Block) { guarantees := make([]*flow.CollectionGuarantee, 0) @@ -1558,6 +1558,9 @@ func VerifiableChunkDataFixture(chunkIndex uint64) *verification.VerifiableChunk Seals: nil, } header := BlockHeaderFixture() + for _, opt := range opts { + opt(header) + } header.PayloadHash = payload.Hash() block := flow.Block{ @@ -1597,13 +1600,17 @@ func VerifiableChunkDataFixture(chunkIndex uint64) *verification.VerifiableChunk endState = result.Chunks[index+1].StartState } + chunkDataPack := ChunkDataPackFixture(chunk.ID(), func(c *flow.ChunkDataPack) { + c.Collection = &col + }) + return &verification.VerifiableChunkData{ Chunk: &chunk, Header: block.Header, Result: &result, - ChunkDataPack: ChunkDataPackFixture(result.ID()), + ChunkDataPack: chunkDataPack, EndState: endState, - } + }, &block } // ChunkDataResponseMsgFixture creates a chunk data response message with a single-transaction collection, and random chunk ID. 
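The updated VerifiableChunkDataFixture now also returns the generated block, so callers can chain fixtures into a consistent ancestry the way the pruner test above does. A minimal sketch of the call pattern (illustrative only; it assumes nothing beyond the signature introduced above):

	// build two consecutive verifiable chunks whose headers form a chain
	chunkA, blockA := unittest.VerifiableChunkDataFixture(0, func(h *flow.Header) {
		h.Height = 1
	})
	chunkB, _ := unittest.VerifiableChunkDataFixture(0, func(h *flow.Header) {
		h.Height = 2
		h.ParentID = blockA.ID() // link to the previous fixture's block
	})
	_, _ = chunkA, chunkB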
diff --git a/utils/unittest/mocks/protocol_state.go b/utils/unittest/mocks/protocol_state.go index 79174c04f0b..6de071f1eba 100644 --- a/utils/unittest/mocks/protocol_state.go +++ b/utils/unittest/mocks/protocol_state.go @@ -25,11 +25,14 @@ type ProtocolState struct { children map[flow.Identifier][]flow.Identifier heights map[uint64]*flow.Block finalized uint64 + sealed uint64 root *flow.Block result *flow.ExecutionResult seal *flow.Seal } +var _ protocol.State = (*ProtocolState)(nil) + func NewProtocolState() *ProtocolState { return &ProtocolState{ blocks: make(map[flow.Identifier]*flow.Block), @@ -136,6 +139,20 @@ func (ps *ProtocolState) Final() protocol.Snapshot { return snapshot } +func (ps *ProtocolState) Sealed() protocol.Snapshot { + ps.Lock() + defer ps.Unlock() + + sealed, ok := ps.heights[ps.sealed] + if !ok { + return nil + } + + snapshot := new(protocolmock.Snapshot) + snapshot.On("Head").Return(sealed.Header, nil) + return snapshot +} + func pending(ps *ProtocolState, blockID flow.Identifier) []flow.Identifier { var pendingIDs []flow.Identifier pendingIDs, ok := ps.children[blockID] @@ -179,7 +196,7 @@ func (m *ProtocolState) Extend(block *flow.Block) error { } if _, ok := m.blocks[block.Header.ParentID]; !ok { - return fmt.Errorf("could not retrieve parent") + return fmt.Errorf("could not retrieve parent %v", block.Header.ParentID) } m.blocks[id] = block @@ -224,3 +241,24 @@ func (m *ProtocolState) Finalize(blockID flow.Identifier) error { return nil } + +func (m *ProtocolState) MakeSeal(blockID flow.Identifier) error { + m.Lock() + defer m.Unlock() + + block, ok := m.blocks[blockID] + if !ok { + return fmt.Errorf("could not retrieve final header") + } + + if block.Header.Height <= m.sealed { + return fmt.Errorf("could not seal old blocks") + } + + if block.Header.Height >= m.finalized { + return fmt.Errorf("incorrect sealed height sealed %v, finalized %v", block.Header.Height, m.finalized) + } + + m.sealed = block.Header.Height + return nil +} From bd0c75db8ea4cc3dc0b0e3d6090eb138a4e2b969 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Tue, 4 Feb 2025 16:10:49 -0800 Subject: [PATCH 23/31] add not found check --- module/pruner/pruners/chunk_data_pack.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/module/pruner/pruners/chunk_data_pack.go b/module/pruner/pruners/chunk_data_pack.go index 433bd41ef74..71517a4522a 100644 --- a/module/pruner/pruners/chunk_data_pack.go +++ b/module/pruner/pruners/chunk_data_pack.go @@ -36,6 +36,10 @@ func (p *ChunkDataPackPruner) PruneByBlockID(blockID flow.Identifier, batchWrite chunkID := chunk.ID() // remove chunk data pack err := p.chunkDataPacks.BatchRemove(chunkID, batchWriter) + if errors.Is(err, storage.ErrNotFound) { + continue + } + if err != nil { return fmt.Errorf("could not remove chunk id %v for block id %v: %w", chunkID, blockID, err) } From 955355ce7cad20b65600ebaa3fd7f925cd2a0c61 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 7 Feb 2025 12:52:53 -0800 Subject: [PATCH 24/31] add IteratorState to IteratorCreator --- engine/execution/pruner/core.go | 26 ++++++++++++++------------ module/block_iterator.go | 1 + module/block_iterator/creator.go | 4 ++++ 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/engine/execution/pruner/core.go b/engine/execution/pruner/core.go index 8225709377a..2cf792578a9 100644 --- a/engine/execution/pruner/core.go +++ b/engine/execution/pruner/core.go @@ -16,7 +16,7 @@ import ( "github.com/onflow/flow-go/state/protocol" 
"github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/operation/pebbleimpl" - pebblestorage "github.com/onflow/flow-go/storage/pebble" + "github.com/onflow/flow-go/storage/store" ) const NextHeightForUnprunedExecutionDataPackKey = "NextHeightForUnprunedExecutionDataPackKey" @@ -33,9 +33,11 @@ func LoopPruneExecutionDataFromRootToLatestSealed( chunkDataPacksDB *pebble.DB, config PruningConfig, ) error { + + chunksDB := pebbleimpl.ToDB(chunkDataPacksDB) // the creator can be reused to create new block iterator that can iterate from the last // checkpoint to the new latest (sealed) block. - creator, getNextAndLatest, err := makeBlockIteratorCreator(state, badgerDB, headers, chunkDataPacksDB, config) + creator, getNextAndLatest, err := makeBlockIteratorCreator(state, badgerDB, headers, chunksDB, config) if err != nil { return err } @@ -46,7 +48,7 @@ func LoopPruneExecutionDataFromRootToLatestSealed( log, ctx, // for cancelling the iteration when the context is done config, - chunkDataPacksDB, + chunksDB, NewChunKDataPackPruner(chunkDataPacks, results), ) @@ -98,7 +100,7 @@ func makeBlockIteratorCreator( state protocol.State, badgerDB *badger.DB, headers storage.Headers, - chunkDataPacksDB *pebble.DB, + chunkDataPacksDB storage.DB, config PruningConfig, ) (module.IteratorCreator, func() (nextToPrune uint64, latestToPrune uint64, err error), error) { root := state.Params().SealedRoot() @@ -115,11 +117,11 @@ func makeBlockIteratorCreator( threshold: config.Threshold, } - progress := pebblestorage.NewConsumerProgress(chunkDataPacksDB, NextHeightForUnprunedExecutionDataPackKey) + initializer := store.NewConsumerProgress(chunkDataPacksDB, NextHeightForUnprunedExecutionDataPackKey) creator, err := block_iterator.NewHeightBasedCreator( headers.BlockIDByHeight, - progress, + initializer, root, latest.Latest, ) @@ -128,8 +130,10 @@ func makeBlockIteratorCreator( return nil, nil, fmt.Errorf("failed to create height based block iterator creator: %w", err) } - return creator, func() (uint64, uint64, error) { - next, err := progress.ProcessedIndex() + stateReader := creator.IteratorState() + + return creator, func() (nextToPrune uint64, latestToPrune uint64, err error) { + next, err := stateReader.LoadState() if err != nil { return 0, 0, fmt.Errorf("failed to get next height to prune: %w", err) } @@ -146,7 +150,7 @@ func makeBlockIteratorCreator( // makeIterateAndPruneAll takes config and chunk data packs db and pruner and returns a function that // takes a block iterator and iterates through all the blocks and decides how to prune the chunk data packs. 
func makeIterateAndPruneAll( - log zerolog.Logger, ctx context.Context, config PruningConfig, chunkDataPacksDB *pebble.DB, prune *ChunkDataPackPruner, + log zerolog.Logger, ctx context.Context, config PruningConfig, chunkDataPacksDB storage.DB, prune *ChunkDataPackPruner, ) func(iter module.BlockIterator) error { isBatchFull := func(counter int) bool { return uint(counter) >= config.BatchSize @@ -156,10 +160,8 @@ func makeIterateAndPruneAll( time.Sleep(config.SleepAfterEachBatchCommit) } - db := pebbleimpl.ToDB(chunkDataPacksDB) - return func(iter module.BlockIterator) error { - err := executor.IterateExecuteAndCommitInBatch(log, ctx, iter, prune, db, isBatchFull, sleeper) + err := executor.IterateExecuteAndCommitInBatch(log, ctx, iter, prune, chunkDataPacksDB, isBatchFull, sleeper) if err != nil { return fmt.Errorf("failed to iterate, execute, and commit in batch: %w", err) } diff --git a/module/block_iterator.go b/module/block_iterator.go index dc6ba7e7c37..fc1dfa637e0 100644 --- a/module/block_iterator.go +++ b/module/block_iterator.go @@ -65,4 +65,5 @@ type BlockIterator interface { // any error returned are exception type IteratorCreator interface { Create() (fromSavedIndexToLatest BlockIterator, hasNext bool, exception error) + IteratorState() IteratorStateReader } diff --git a/module/block_iterator/creator.go b/module/block_iterator/creator.go index 05550831e07..746c95ffd4d 100644 --- a/module/block_iterator/creator.go +++ b/module/block_iterator/creator.go @@ -61,6 +61,10 @@ func (c *Creator) Create() (iter module.BlockIterator, hasNext bool, exception e return NewIndexedBlockIterator(c.getBlockIDByIndex, c.progress, iterRange), true, nil } +func (c *Creator) IteratorState() module.IteratorStateReader { + return c.progress +} + // NewHeightBasedCreator creates a block iterator that iterates through blocks // from root to the latest (either finalized or sealed) by height. func NewHeightBasedCreator( From c84f885535f27d543e8a7c685005ab891b0cab2c Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 7 Feb 2025 13:24:36 -0800 Subject: [PATCH 25/31] update mocks --- module/mock/iterator_creator.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/module/mock/iterator_creator.go b/module/mock/iterator_creator.go index f91f3821002..38fe0e78f22 100644 --- a/module/mock/iterator_creator.go +++ b/module/mock/iterator_creator.go @@ -49,6 +49,26 @@ func (_m *IteratorCreator) Create() (module.BlockIterator, bool, error) { return r0, r1, r2 } +// IteratorState provides a mock function with given fields: +func (_m *IteratorCreator) IteratorState() module.IteratorStateReader { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for IteratorState") + } + + var r0 module.IteratorStateReader + if rf, ok := ret.Get(0).(func() module.IteratorStateReader); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(module.IteratorStateReader) + } + } + + return r0 +} + // NewIteratorCreator creates a new instance of IteratorCreator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. 
func NewIteratorCreator(t interface { From f49d87a94cdaf99c30844c3057f424b2f7e740ee Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 7 Feb 2025 16:07:37 -0800 Subject: [PATCH 26/31] address review comments --- engine/execution/pruner/core.go | 4 ++-- engine/execution/pruner/core_test.go | 2 +- engine/execution/pruner/engine.go | 4 ++-- engine/execution/pruner/executor.go | 2 +- module/block_iterator/executor/executor.go | 8 ++++---- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/engine/execution/pruner/core.go b/engine/execution/pruner/core.go index 2cf792578a9..32a782956b4 100644 --- a/engine/execution/pruner/core.go +++ b/engine/execution/pruner/core.go @@ -22,9 +22,9 @@ import ( const NextHeightForUnprunedExecutionDataPackKey = "NextHeightForUnprunedExecutionDataPackKey" func LoopPruneExecutionDataFromRootToLatestSealed( + ctx context.Context, log zerolog.Logger, metrics module.ExecutionMetrics, - ctx context.Context, state protocol.State, badgerDB *badger.DB, headers storage.Headers, @@ -49,7 +49,7 @@ func LoopPruneExecutionDataFromRootToLatestSealed( ctx, // for cancelling the iteration when the context is done config, chunksDB, - NewChunKDataPackPruner(chunkDataPacks, results), + NewChunkDataPackPruner(chunkDataPacks, results), ) for { diff --git a/engine/execution/pruner/core_test.go b/engine/execution/pruner/core_test.go index 236f4864026..105d562990f 100644 --- a/engine/execution/pruner/core_test.go +++ b/engine/execution/pruner/core_test.go @@ -96,7 +96,7 @@ func TestLoopPruneExecutionDataFromRootToLatestSealed(t *testing.T) { })(cancel) require.NoError(t, LoopPruneExecutionDataFromRootToLatestSealed( - unittest.Logger(), metrics, ctx, ps, bdb, headers, chunkDataPacks, results, pdb, cfg, + ctx, unittest.Logger(), metrics, ps, bdb, headers, chunkDataPacks, results, pdb, cfg, )) // verify the chunk data packs beyond the threshold are pruned diff --git a/engine/execution/pruner/engine.go b/engine/execution/pruner/engine.go index b871cfdcc4f..4b347dd043f 100644 --- a/engine/execution/pruner/engine.go +++ b/engine/execution/pruner/engine.go @@ -30,8 +30,8 @@ func NewChunkDataPackPruningEngine( ready() err := LoopPruneExecutionDataFromRootToLatestSealed( - log.With().Str("component", "CDP-pruner").Logger(), metrics, - ctx, state, badgerDB, headers, chunkDataPacks, results, chunkDataPacksDB, config) + ctx, log.With().Str("component", "CDP-pruner").Logger(), metrics, + state, badgerDB, headers, chunkDataPacks, results, chunkDataPacksDB, config) if err != nil { ctx.Throw(err) } diff --git a/engine/execution/pruner/executor.go b/engine/execution/pruner/executor.go index 754133c6739..3460ef8e98c 100644 --- a/engine/execution/pruner/executor.go +++ b/engine/execution/pruner/executor.go @@ -13,7 +13,7 @@ type ChunkDataPackPruner struct { var _ executor.IterationExecutor = (*ChunkDataPackPruner)(nil) -func NewChunKDataPackPruner(chunkDataPacks storage.ChunkDataPacks, results storage.ExecutionResults) *ChunkDataPackPruner { +func NewChunkDataPackPruner(chunkDataPacks storage.ChunkDataPacks, results storage.ExecutionResults) *ChunkDataPackPruner { return &ChunkDataPackPruner{ ChunkDataPackPruner: pruners.NewChunkDataPackPruner(chunkDataPacks, results), } diff --git a/module/block_iterator/executor/executor.go b/module/block_iterator/executor/executor.go index b6082d0e755..87ac8a0e876 100644 --- a/module/block_iterator/executor/executor.go +++ b/module/block_iterator/executor/executor.go @@ -57,8 +57,8 @@ func IterateExecuteAndCommitInBatch( total := 0 defer 
func() {
 		log.Info().Str("duration", time.Since(startTime).String()).
-			Int("total_block_pruned", total).
-			Msg("pruning completed")
+			Int("total_block_executed", total).
+			Msg("block iteration and execution completed")
 	}()
 
 	for {
@@ -94,10 +94,10 @@
 			break
 		}
 
-		// prune all the data indexed by the block
+		// execute all the data indexed by the block
 		err = executor.ExecuteByBlockID(blockID, batch)
 		if err != nil {
-			return fmt.Errorf("failed to prune by block ID %v: %w", blockID, err)
+			return fmt.Errorf("failed to execute by block ID %v: %w", blockID, err)
 		}
 		iteratedCountInCurrentBatch++
 		total++

From e8c08a0523b07fdb0b5d3bab23ac7c23658db232 Mon Sep 17 00:00:00 2001
From: "Leo Zhang (zhangchiqing)"
Date: Fri, 7 Feb 2025 16:11:38 -0800
Subject: [PATCH 27/31] add comments

---
 engine/execution/pruner/core.go | 8 +++++++-
 module/block_iterator.go        | 4 ++++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/engine/execution/pruner/core.go b/engine/execution/pruner/core.go
index 32a782956b4..fd8eae69b89 100644
--- a/engine/execution/pruner/core.go
+++ b/engine/execution/pruner/core.go
@@ -102,7 +102,13 @@ func makeBlockIteratorCreator(
 	headers storage.Headers,
 	chunkDataPacksDB storage.DB,
 	config PruningConfig,
-) (module.IteratorCreator, func() (nextToPrune uint64, latestToPrune uint64, err error), error) {
+) (
+	module.IteratorCreator,
+	// this is for logging purposes, so that after each round of pruning,
+	// we can log and report metrics about the next and latest to prune
+	func() (nextToPrune uint64, latestToPrune uint64, err error),
+	error, // any error returned is an exception
+) {
 	root := state.Params().SealedRoot()
 	sealedAndExecuted := latest.NewLatestSealedAndExecuted(
 		root,
diff --git a/module/block_iterator.go b/module/block_iterator.go
index fc1dfa637e0..9a3fc6387d2 100644
--- a/module/block_iterator.go
+++ b/module/block_iterator.go
@@ -64,6 +64,10 @@ type BlockIterator interface {
 // if there is no block to iterate, hasNext is false
 // any error returned are exception
 type IteratorCreator interface {
+	// Create returns the next block iterator
 	Create() (fromSavedIndexToLatest BlockIterator, hasNext bool, exception error)
+
+	// IteratorState returns the iteration state, useful to know the progress of the iterator
+	// after each round of iteration
 	IteratorState() IteratorStateReader
 }

From b057655c11614184dbf813d160b41a2691914437 Mon Sep 17 00:00:00 2001
From: "Leo Zhang (zhangchiqing)"
Date: Thu, 20 Feb 2025 10:49:10 -0800
Subject: [PATCH 28/31] address review comments

---
 cmd/execution_config.go           |  4 ++--
 engine/execution/pruner/config.go |  7 +++++--
 engine/execution/pruner/core.go   | 10 +++++++++-
 3 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/cmd/execution_config.go b/cmd/execution_config.go
index e1638e4a152..2368ea5fb92 100644
--- a/cmd/execution_config.go
+++ b/cmd/execution_config.go
@@ -137,9 +137,9 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) {
 	flags.StringVar(&exeConf.publicAccessID, "public-access-id", "", "public access ID for the node")
 	flags.Uint64Var(&exeConf.pruningConfigThreshold, "pruning-config-threshold", exepruner.DefaultConfig.Threshold, "the number of blocks that we want to keep in the database, default 30 days")
-	flags.UintVar(&exeConf.pruningConfigBatchSize, "pruning-config-batch-size",
exepruner.DefaultConfig.BatchSize, "the batch size is the number of blocks that we want to delete in one batch, default 1200") flags.DurationVar(&exeConf.pruningConfigSleepAfterCommit, "pruning-config-sleep-after-commit", exepruner.DefaultConfig.SleepAfterEachBatchCommit, "sleep time after each batch commit, default 1s") - flags.DurationVar(&exeConf.pruningConfigSleepAfterIteration, "pruning-config-sleep-after-iteration", exepruner.DefaultConfig.SleepAfterEachIteration, "sleep time after each iteration, default 500000h") + flags.DurationVar(&exeConf.pruningConfigSleepAfterIteration, "pruning-config-sleep-after-iteration", exepruner.DefaultConfig.SleepAfterEachIteration, "sleep time after each iteration, default max int64") } func (exeConf *ExecutionConfig) ValidateFlags() error { diff --git a/engine/execution/pruner/config.go b/engine/execution/pruner/config.go index c48200a8b0c..1f0286b999c 100644 --- a/engine/execution/pruner/config.go +++ b/engine/execution/pruner/config.go @@ -1,6 +1,9 @@ package pruner -import "time" +import ( + "math" + "time" +) type PruningConfig struct { Threshold uint64 // The threshold is the number of blocks that we want to keep in the database. @@ -19,5 +22,5 @@ var DefaultConfig = PruningConfig{ // the sleep time should be smaller than 1000 seconds, otherwise, // the pruner is not able to keep up with the block generation. SleepAfterEachBatchCommit: 12 * time.Second, - SleepAfterEachIteration: 500000 * time.Hour, // by default it's disabled so that we can slowly roll this feature out. + SleepAfterEachIteration: math.MaxInt64, // by default it's disabled so that we can slowly roll this feature out. } diff --git a/engine/execution/pruner/core.go b/engine/execution/pruner/core.go index fd8eae69b89..09e52a44f94 100644 --- a/engine/execution/pruner/core.go +++ b/engine/execution/pruner/core.go @@ -163,7 +163,15 @@ func makeIterateAndPruneAll( } sleeper := func() { - time.Sleep(config.SleepAfterEachBatchCommit) + // if the context is done, return immediately + // otherwise sleep for the configured time + for { + select { + case <-ctx.Done(): + return + case <-time.After(config.SleepAfterEachBatchCommit): + } + } } return func(iter module.BlockIterator) error { From 1828c84dcb0362a1e1645ec9b544575d6d764d2f Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 20 Feb 2025 11:45:43 -0800 Subject: [PATCH 29/31] reorder logs and context arguments --- engine/execution/pruner/core.go | 2 +- module/block_iterator/executor/executor.go | 2 +- module/block_iterator/executor/executor_test.go | 9 +++------ 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/engine/execution/pruner/core.go b/engine/execution/pruner/core.go index 09e52a44f94..fe91b29f065 100644 --- a/engine/execution/pruner/core.go +++ b/engine/execution/pruner/core.go @@ -175,7 +175,7 @@ func makeIterateAndPruneAll( } return func(iter module.BlockIterator) error { - err := executor.IterateExecuteAndCommitInBatch(log, ctx, iter, prune, chunkDataPacksDB, isBatchFull, sleeper) + err := executor.IterateExecuteAndCommitInBatch(ctx, log, iter, prune, chunkDataPacksDB, isBatchFull, sleeper) if err != nil { return fmt.Errorf("failed to iterate, execute, and commit in batch: %w", err) } diff --git a/module/block_iterator/executor/executor.go b/module/block_iterator/executor/executor.go index 87ac8a0e876..fbcd3d1c84c 100644 --- a/module/block_iterator/executor/executor.go +++ b/module/block_iterator/executor/executor.go @@ -34,9 +34,9 @@ type IsBatchFull func(iteratedCountInCurrentBatch int) bool 
// can be resumed after restart. // it sleeps after each batch is committed in order to minimizing the impact on the system. func IterateExecuteAndCommitInBatch( - log zerolog.Logger, // ctx is used for cancelling the iteration when the context is done ctx context.Context, + log zerolog.Logger, // iterator decides how to iterate over blocks iter module.BlockIterator, // executor decides what data in the storage will be updated for a certain block diff --git a/module/block_iterator/executor/executor_test.go b/module/block_iterator/executor/executor_test.go index a5ff0a7555e..76130c15dce 100644 --- a/module/block_iterator/executor/executor_test.go +++ b/module/block_iterator/executor/executor_test.go @@ -56,8 +56,7 @@ func TestExecute(t *testing.T) { // prune blocks batchSize := 3 require.NoError(t, executor.IterateExecuteAndCommitInBatch( - unittest.Logger(), - context.Background(), iter, pr, pdb, func(count int) bool { return count >= batchSize }, sleeper.Sleep)) + context.Background(), unittest.Logger(), iter, pr, pdb, func(count int) bool { return count >= batchSize }, sleeper.Sleep)) // expect all blocks are pruned for _, b := range bs { @@ -117,8 +116,7 @@ func TestExecuteCanBeResumed(t *testing.T) { // prune blocks until interrupted at block 5 batchSize := 3 err := executor.IterateExecuteAndCommitInBatch( - unittest.Logger(), - context.Background(), iter, pruneUntilInterrupted, pdb, func(count int) bool { return count >= batchSize }, sleeper.Sleep) + context.Background(), unittest.Logger(), iter, pruneUntilInterrupted, pdb, func(count int) bool { return count >= batchSize }, sleeper.Sleep) require.True(t, errors.Is(err, interrupted), fmt.Errorf("expected %v but got %v", interrupted, err)) // expect all blocks are pruned @@ -150,8 +148,7 @@ func TestExecuteCanBeResumed(t *testing.T) { } require.NoError(t, executor.IterateExecuteAndCommitInBatch( - unittest.Logger(), - context.Background(), iterToAll, pr, pdb, func(count int) bool { return count >= batchSize }, sleeper.Sleep)) + context.Background(), unittest.Logger(), iterToAll, pr, pdb, func(count int) bool { return count >= batchSize }, sleeper.Sleep)) // verify all blocks are pruned for _, b := range bs { From cefd7fdb2e2477d2a8e422599d8be580e3a45a12 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 20 Feb 2025 14:03:49 -0800 Subject: [PATCH 30/31] simplify executor.IterateExecuteAndCommitInBatch --- engine/execution/pruner/core.go | 27 ++++------------ module/block_iterator/executor/executor.go | 26 +++++++--------- .../block_iterator/executor/executor_test.go | 31 +++++-------------- 3 files changed, 25 insertions(+), 59 deletions(-) diff --git a/engine/execution/pruner/core.go b/engine/execution/pruner/core.go index fe91b29f065..d3532ba5e38 100644 --- a/engine/execution/pruner/core.go +++ b/engine/execution/pruner/core.go @@ -45,8 +45,8 @@ func LoopPruneExecutionDataFromRootToLatestSealed( // the returned iterateAndPruneAll takes a block iterator and iterates through all the blocks // and decides how to prune the chunk data packs. iterateAndPruneAll := makeIterateAndPruneAll( - log, ctx, // for cancelling the iteration when the context is done + log, config, chunksDB, NewChunkDataPackPruner(chunkDataPacks, results), @@ -64,8 +64,8 @@ func LoopPruneExecutionDataFromRootToLatestSealed( Msgf("execution data pruning will start in %s at %s", config.SleepAfterEachIteration, time.Now().Add(config.SleepAfterEachIteration).UTC()) - // last pruned is nextToPrune - 1. 
- // it won't underflow, because nextToPrune starts from root + 1 + // last pruned is nextToPrune - 1. + // it won't underflow, because nextToPrune starts from root + 1 metrics.ExecutionLastChunkDataPackPrunedHeight(nextToPrune - 1) select { @@ -156,26 +156,11 @@ func makeBlockIteratorCreator( // makeIterateAndPruneAll takes config and chunk data packs db and pruner and returns a function that // takes a block iterator and iterates through all the blocks and decides how to prune the chunk data packs. func makeIterateAndPruneAll( - log zerolog.Logger, ctx context.Context, config PruningConfig, chunkDataPacksDB storage.DB, prune *ChunkDataPackPruner, + ctx context.Context, log zerolog.Logger, config PruningConfig, chunkDataPacksDB storage.DB, prune *ChunkDataPackPruner, ) func(iter module.BlockIterator) error { - isBatchFull := func(counter int) bool { - return uint(counter) >= config.BatchSize - } - - sleeper := func() { - // if the context is done, return immediately - // otherwise sleep for the configured time - for { - select { - case <-ctx.Done(): - return - case <-time.After(config.SleepAfterEachBatchCommit): - } - } - } - return func(iter module.BlockIterator) error { - err := executor.IterateExecuteAndCommitInBatch(ctx, log, iter, prune, chunkDataPacksDB, isBatchFull, sleeper) + err := executor.IterateExecuteAndCommitInBatch( + ctx, log, iter, prune, chunkDataPacksDB, config.BatchSize, config.SleepAfterEachBatchCommit) if err != nil { return fmt.Errorf("failed to iterate, execute, and commit in batch: %w", err) } diff --git a/module/block_iterator/executor/executor.go b/module/block_iterator/executor/executor.go index fbcd3d1c84c..5d2ee811aec 100644 --- a/module/block_iterator/executor/executor.go +++ b/module/block_iterator/executor/executor.go @@ -20,14 +20,6 @@ type IterationExecutor interface { ExecuteByBlockID(blockID flow.Identifier, batch storage.ReaderBatchWriter) (exception error) } -// Sleeper allows the caller to slow down the iteration after each batch is committed -type Sleeper func() - -// IsBatchFull decides the batch size for each commit. -// it takes the number of blocks iterated in the current batch, -// and returns whether the batch is full. -type IsBatchFull func(iteratedCountInCurrentBatch int) bool - // IterateExecuteAndCommitInBatch iterates over blocks and execute tasks with data that was indexed by the block. // the update to the storage database is done in batch, and the batch is committed when it's full. // the iteration progress is saved after batch is committed, so that the iteration progress @@ -44,14 +36,14 @@ func IterateExecuteAndCommitInBatch( // db creates a new batch for each block, and passed to the executor for adding updates, // the batch is commited when it's full db storage.DB, - // isBatchFull decides the batch size for each commit. - isBatchFull IsBatchFull, - // sleeper allows the caller to slow down the iteration after each batch is committed + // batchSize decides the batch size for each commit. 
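+	// for instance, with the default pruning config a batch covers 1200 blocks.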
+	batchSize uint,
+	// sleepAfterEachBatchCommit allows the caller to slow down the iteration after each batch is committed
 	// in order to minimize the impact on the system
-	sleeper Sleeper,
+	sleepAfterEachBatchCommit time.Duration,
 ) error {
 	batch := db.NewBatch()
-	iteratedCountInCurrentBatch := 0
+	iteratedCountInCurrentBatch := uint(0)
 
 	startTime := time.Now()
 	total := 0
@@ -103,7 +95,7 @@
 		total++
 
 		// if batch is full, commit and sleep
-		if isBatchFull(iteratedCountInCurrentBatch) {
+		if iteratedCountInCurrentBatch >= batchSize {
 			// commit the batch and save the progress
 			err := commitAndCheckpoint(log, batch, iter)
 			if err != nil {
@@ -111,7 +103,11 @@
 			}
 
 			// wait a bit to minimize the impact on the system
-			sleeper()
+			select {
+			case <-ctx.Done():
+				return nil
+			case <-time.After(sleepAfterEachBatchCommit):
+			}
 
 			// create a new batch, and reset iteratedCountInCurrentBatch
 			batch = db.NewBatch()
diff --git a/module/block_iterator/executor/executor_test.go b/module/block_iterator/executor/executor_test.go
index 76130c15dce..578e04871bd 100644
--- a/module/block_iterator/executor/executor_test.go
+++ b/module/block_iterator/executor/executor_test.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 	"testing"
+	"time"
 
 	"github.com/cockroachdb/pebble"
 	"github.com/stretchr/testify/require"
@@ -51,12 +52,11 @@ func TestExecute(t *testing.T) {
 		},
 	}
 
-	sleeper := &sleeper{}
-
 	// prune blocks
-	batchSize := 3
+	batchSize := uint(3)
+	nosleep := time.Duration(0)
 	require.NoError(t, executor.IterateExecuteAndCommitInBatch(
-		context.Background(), unittest.Logger(), iter, pr, pdb, func(count int) bool { return count >= batchSize }, sleeper.Sleep))
+		context.Background(), unittest.Logger(), iter, pr, pdb, batchSize, nosleep))
 
 	// expect all blocks are pruned
 	for _, b := range bs {
@@ -65,9 +65,6 @@
 		var c storage.ChunkDataPack
 		err := operation.RetrieveChunkDataPack(pdb.Reader(), b, &c)
 		require.True(t, errors.Is(err, storage.ErrNotFound), "expected ErrNotFound but got %v", err)
 	}
-
-	// the sleeper should be called 3 times, because blockCount blocks will be pruned in 3 batchs.
-	require.Equal(t, blockCount/batchSize, sleeper.count)
 	})
 }
@@ -111,12 +108,11 @@ func TestExecuteCanBeResumed(t *testing.T) {
 		},
 	}
 
-	sleeper := &sleeper{}
-
 	// prune blocks until interrupted at block 5
-	batchSize := 3
+	batchSize := uint(3)
+	nosleep := time.Duration(0)
 	err := executor.IterateExecuteAndCommitInBatch(
-		context.Background(), unittest.Logger(), iter, pruneUntilInterrupted, pdb, func(count int) bool { return count >= batchSize }, sleeper.Sleep)
+		context.Background(), unittest.Logger(), iter, pruneUntilInterrupted, pdb, batchSize, nosleep)
 	require.True(t, errors.Is(err, interrupted), fmt.Errorf("expected %v but got %v", interrupted, err))
 
 	// expect all blocks are pruned
@@ -135,9 +131,6 @@
 		require.NoError(t, operation.RetrieveChunkDataPack(pdb.Reader(), b, &c))
 	}
 
-	// the sleeper should be called once
-	require.Equal(t, 5/batchSize, sleeper.count)
-
 	// now resume the pruning
 	iterToAll := restoreBlockIterator(iter.blocks, iter.stored)
@@ -148,7 +141,7 @@
 	}
 
 	require.NoError(t, executor.IterateExecuteAndCommitInBatch(
-		context.Background(), unittest.Logger(), iterToAll, pr, pdb, func(count int) bool { return count >= batchSize }, sleeper.Sleep))
+		context.Background(), unittest.Logger(), iterToAll, pr, pdb, batchSize, nosleep))
 
 	// verify all blocks are pruned
 	for _, b := range bs {
@@ -191,14 +184,6 @@ func restoreBlockIterator(blocks []flow.Identifier, stored int) *iterator {
 	}
 }
 
-type sleeper struct {
-	count int
-}
-
-func (s *sleeper) Sleep() {
-	s.count++
-}
-
 type testExecutor struct {
 	executeByBlockID func(id flow.Identifier, batchWriter storage.ReaderBatchWriter) error
 }

From 35182748c18381951a3a443387bf79219e0795ab Mon Sep 17 00:00:00 2001
From: "Leo Zhang (zhangchiqing)"
Date: Thu, 20 Feb 2025 14:09:47 -0800
Subject: [PATCH 31/31] remove makeIterateAndPruneAll

---
 engine/execution/pruner/core.go | 34 +++++++++++----------------------
 1 file changed, 11 insertions(+), 23 deletions(-)

diff --git a/engine/execution/pruner/core.go b/engine/execution/pruner/core.go
index d3532ba5e38..ed7decc170f 100644
--- a/engine/execution/pruner/core.go
+++ b/engine/execution/pruner/core.go
@@ -42,15 +42,18 @@ func LoopPruneExecutionDataFromRootToLatestSealed(
 		return err
 	}
 
-	// the returned iterateAndPruneAll takes a block iterator and iterates through all the blocks
+	pruner := NewChunkDataPackPruner(chunkDataPacks, results)
+
+	// iterateAndPruneAll takes a block iterator and iterates through all the blocks
 	// and decides how to prune the chunk data packs.
-	iterateAndPruneAll := makeIterateAndPruneAll(
-		ctx, // for cancelling the iteration when the context is done
-		log,
-		config,
-		chunksDB,
-		NewChunkDataPackPruner(chunkDataPacks, results),
-	)
+	iterateAndPruneAll := func(iter module.BlockIterator) error {
+		err := executor.IterateExecuteAndCommitInBatch(
+			ctx, log, iter, pruner, chunksDB, config.BatchSize, config.SleepAfterEachBatchCommit)
+		if err != nil {
+			return fmt.Errorf("failed to iterate, execute, and commit in batch: %w", err)
+		}
+		return nil
+	}
 
 	for {
 		nextToPrune, latestToPrune, err := getNextAndLatest()
@@ -152,18 +155,3 @@ func makeBlockIteratorCreator(
 		return next, header.Height, nil
 	}, nil
 }
-
-// makeIterateAndPruneAll takes config and chunk data packs db and pruner and returns a function that
-// takes a block iterator and iterates through all the blocks and decides how to prune the chunk data packs.
-func makeIterateAndPruneAll(
-	ctx context.Context, log zerolog.Logger, config PruningConfig, chunkDataPacksDB storage.DB, prune *ChunkDataPackPruner,
-) func(iter module.BlockIterator) error {
-	return func(iter module.BlockIterator) error {
-		err := executor.IterateExecuteAndCommitInBatch(
-			ctx, log, iter, prune, chunkDataPacksDB, config.BatchSize, config.SleepAfterEachBatchCommit)
-		if err != nil {
-			return fmt.Errorf("failed to iterate, execute, and commit in batch: %w", err)
-		}
-		return nil
-	}
-}
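
Usage note (a sketch, not part of the patches above): after PATCH 30 and 31, callers of executor.IterateExecuteAndCommitInBatch pass a plain batch size (uint) and a sleep interval (time.Duration) instead of the removed IsBatchFull and Sleeper callbacks, and cancellation during the post-commit sleep is handled internally via ctx. The snippet below assumes a storage.DB and module.BlockIterator are already available; the package name, noopExecutor, runPruning, and the values 500 and 50ms are illustrative stand-ins, while the function signature, IterationExecutor, and import paths come from the patches.

package pruningexample

import (
	"context"
	"time"

	"github.com/rs/zerolog"

	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/module"
	"github.com/onflow/flow-go/module/block_iterator/executor"
	"github.com/onflow/flow-go/storage"
)

// noopExecutor stands in for a real pruner such as ChunkDataPackPruner.
type noopExecutor struct{}

var _ executor.IterationExecutor = (*noopExecutor)(nil)

// ExecuteByBlockID is a no-op here; a real executor would stage deletes or
// updates for the block's indexed data in the given batch.
func (e *noopExecutor) ExecuteByBlockID(blockID flow.Identifier, batch storage.ReaderBatchWriter) error {
	return nil
}

// runPruning drives the simplified API introduced by PATCH 30.
func runPruning(ctx context.Context, log zerolog.Logger, iter module.BlockIterator, db storage.DB) error {
	return executor.IterateExecuteAndCommitInBatch(
		ctx, log, iter, &noopExecutor{}, db,
		uint(500),           // batchSize: commit after this many blocks (illustrative)
		50*time.Millisecond, // sleepAfterEachBatchCommit (illustrative)
	)
}

Passing plain values keeps the batching and sleeping policy in one place and lets the executor react to ctx cancellation while sleeping, which the old Sleeper closure in makeIterateAndPruneAll had to reimplement with its own select loop.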