Merge pull request #6946 from onflow/leo/cdp-engine
[Chunk Data Pack Pruner] Add Engine for pruning chunk data pack
zhangchiqing authored Feb 21, 2025
2 parents bd39edc + 3518274 commit 0f99dc5
Showing 29 changed files with 1,099 additions and 53 deletions.
29 changes: 29 additions & 0 deletions cmd/execution_builder.go
@@ -13,6 +13,7 @@ import (

awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/cockroachdb/pebble"
"github.com/dgraph-io/badger/v2"
"github.com/ipfs/boxo/bitswap"
"github.com/ipfs/go-cid"
@@ -55,6 +56,7 @@ import (
"github.com/onflow/flow-go/engine/execution/ingestion/stop"
"github.com/onflow/flow-go/engine/execution/ingestion/uploader"
exeprovider "github.com/onflow/flow-go/engine/execution/provider"
exepruner "github.com/onflow/flow-go/engine/execution/pruner"
"github.com/onflow/flow-go/engine/execution/rpc"
"github.com/onflow/flow-go/engine/execution/scripts"
"github.com/onflow/flow-go/engine/execution/state"
@@ -139,6 +141,8 @@ type ExecutionNode struct {
txResults *storage.TransactionResults
results *storage.ExecutionResults
myReceipts *storage.MyExecutionReceipts
chunkDataPackDB *pebble.DB
chunkDataPacks storageerr.ChunkDataPacks
providerEngine exeprovider.ProviderEngine
checkerEng *checker.Engine
syncCore *chainsync.Core
@@ -226,6 +230,7 @@ func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() {
// TODO: will re-visit this once storehouse has implemented new WAL for checkpoint file of
// payloadless trie.
// Component("execution data pruner", exeNode.LoadExecutionDataPruner).
Component("execution db pruner", exeNode.LoadExecutionDBPruner).
Component("blob service", exeNode.LoadBlobService).
Component("block data upload manager", exeNode.LoadBlockUploaderManager).
Component("GCP block data uploader", exeNode.LoadGCPBlockDataUploader).
@@ -743,6 +748,9 @@ func (exeNode *ExecutionNode) LoadExecutionState(
chunkDataPacks := store.NewChunkDataPacks(node.Metrics.Cache,
pebbleimpl.ToDB(chunkDataPackDB), node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize)

exeNode.chunkDataPackDB = chunkDataPackDB
exeNode.chunkDataPacks = chunkDataPacks

// Needed for gRPC server, make sure to assign to main scoped vars
exeNode.events = storage.NewEvents(node.Metrics.Cache, node.DB)
exeNode.serviceEvents = storage.NewServiceEvents(node.Metrics.Cache, node.DB)
@@ -987,6 +995,27 @@ func (exeNode *ExecutionNode) LoadExecutionDataPruner(
return exeNode.executionDataPruner, err
}

func (exeNode *ExecutionNode) LoadExecutionDBPruner(node *NodeConfig) (module.ReadyDoneAware, error) {
cfg := exepruner.PruningConfig{
Threshold: exeNode.exeConf.pruningConfigThreshold,
BatchSize: exeNode.exeConf.pruningConfigBatchSize,
SleepAfterEachBatchCommit: exeNode.exeConf.pruningConfigSleepAfterCommit,
SleepAfterEachIteration: exeNode.exeConf.pruningConfigSleepAfterIteration,
}

return exepruner.NewChunkDataPackPruningEngine(
node.Logger,
exeNode.collector,
node.State,
node.DB,
node.Storage.Headers,
exeNode.chunkDataPacks,
exeNode.results,
exeNode.chunkDataPackDB,
cfg,
), nil
}

func (exeNode *ExecutionNode) LoadCheckerEngine(
node *NodeConfig,
) (
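NewChunkDataPackPruningEngine lives in engine/execution/pruner/engine.go, which is part of this PR but not shown in this excerpt. As a rough sketch of how such an engine could wrap the pruning loop from core.go (further down) in flow-go's ComponentManager pattern — the function name and wiring below are assumptions for illustration, not the PR's actual implementation:

// Sketch only: assumes the engine is simply a ComponentManager whose single
// worker runs the pruning loop until the node shuts down.
package pruner

import (
	"github.com/cockroachdb/pebble"
	"github.com/dgraph-io/badger/v2"
	"github.com/rs/zerolog"

	"github.com/onflow/flow-go/module"
	"github.com/onflow/flow-go/module/component"
	"github.com/onflow/flow-go/module/irrecoverable"
	"github.com/onflow/flow-go/state/protocol"
	"github.com/onflow/flow-go/storage"
)

func newPruningEngineSketch(
	log zerolog.Logger,
	metrics module.ExecutionMetrics,
	state protocol.State,
	badgerDB *badger.DB,
	headers storage.Headers,
	chunkDataPacks storage.ChunkDataPacks,
	results storage.ExecutionResults,
	chunkDataPacksDB *pebble.DB,
	config PruningConfig,
) *component.ComponentManager {
	return component.NewComponentManagerBuilder().
		AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
			ready()
			// The loop returns nil once the context is cancelled; any other
			// error is treated as irrecoverable.
			err := LoopPruneExecutionDataFromRootToLatestSealed(
				ctx, log, metrics, state, badgerDB, headers,
				chunkDataPacks, results, chunkDataPacksDB, config,
			)
			if err != nil {
				ctx.Throw(err)
			}
		}).
		Build()
}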
10 changes: 10 additions & 0 deletions cmd/execution_config.go
@@ -11,6 +11,7 @@ import (
"github.com/onflow/flow-go/engine/common/provider"
"github.com/onflow/flow-go/engine/execution/computation/query"
exeprovider "github.com/onflow/flow-go/engine/execution/provider"
exepruner "github.com/onflow/flow-go/engine/execution/pruner"
"github.com/onflow/flow-go/fvm"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/mempool"
@@ -69,6 +70,11 @@ type ExecutionConfig struct {
enableStorehouse bool
enableChecker bool
publicAccessID string

pruningConfigThreshold uint64
pruningConfigBatchSize uint
pruningConfigSleepAfterCommit time.Duration
pruningConfigSleepAfterIteration time.Duration
}

func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) {
@@ -130,6 +136,10 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) {
flags.BoolVar(&deprecatedEnableNewIngestionEngine, "enable-new-ingestion-engine", true, "enable new ingestion engine, default is true")
flags.StringVar(&exeConf.publicAccessID, "public-access-id", "", "public access ID for the node")

flags.Uint64Var(&exeConf.pruningConfigThreshold, "pruning-config-threshold", exepruner.DefaultConfig.Threshold, "the number of blocks that we want to keep in the database, default 30 days")
flags.UintVar(&exeConf.pruningConfigBatchSize, "pruning-config-batch-size", exepruner.DefaultConfig.BatchSize, "the batch size is the number of blocks that we want to delete in one batch, default 1200")
flags.DurationVar(&exeConf.pruningConfigSleepAfterCommit, "pruning-config-sleep-after-commit", exepruner.DefaultConfig.SleepAfterEachBatchCommit, "sleep time after each batch commit, default 1s")
flags.DurationVar(&exeConf.pruningConfigSleepAfterIteration, "pruning-config-sleep-after-iteration", exepruner.DefaultConfig.SleepAfterEachIteration, "sleep time after each iteration, default max int64")
}

func (exeConf *ExecutionConfig) ValidateFlags() error {
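ValidateFlags is truncated above; it would be a natural place to sanity-check the new flags. The function below is a hypothetical addition (not part of this PR) that encodes the constraint spelled out in the pruner's default configuration comments — the sleep after each batch commit must stay well below the time the chain needs to produce one batch of blocks. It assumes the ~1.2 blocks-per-second figure used in those comments and that fmt is imported (time already is, for the duration fields):

// Hypothetical sketch, not in this PR: could be called from ValidateFlags.
func (exeConf *ExecutionConfig) validatePruningFlags() error {
	if exeConf.pruningConfigBatchSize == 0 {
		return fmt.Errorf("pruning-config-batch-size must be greater than zero")
	}

	// Time for the chain to produce one batch worth of blocks,
	// e.g. 1200 blocks / 1.2 blocks per second = 1000 seconds.
	const blocksPerSecond = 1.2
	batchProductionSeconds := float64(exeConf.pruningConfigBatchSize) / blocksPerSecond

	if exeConf.pruningConfigSleepAfterCommit.Seconds() >= batchProductionSeconds {
		return fmt.Errorf("pruning-config-sleep-after-commit (%s) should be well below the ~%.0f seconds "+
			"the chain needs to produce one batch of blocks, otherwise the pruner cannot keep up",
			exeConf.pruningConfigSleepAfterCommit, batchProductionSeconds)
	}
	return nil
}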
26 changes: 26 additions & 0 deletions engine/execution/pruner/config.go
@@ -0,0 +1,26 @@
package pruner

import (
"math"
"time"
)

type PruningConfig struct {
Threshold uint64 // The threshold is the number of blocks that we want to keep in the database.
BatchSize uint // The batch size is the number of blocks that we want to delete in one batch.
SleepAfterEachBatchCommit time.Duration // The sleep time after each batch commit.
SleepAfterEachIteration time.Duration // The sleep time after each iteration.
}

var DefaultConfig = PruningConfig{
Threshold: 30 * 60 * 60 * 24 * 1.2, // 30 days of blocks: 30 days * 24 hours * 60 minutes * 60 seconds * 1.2 blocks per second
BatchSize: 1200,
// when choosing a value, consider the batch size and the block rate:
// for instance,
// at a block rate of 1.2 blocks/second and a batch size of 1200,
// the chain produces one batch worth of blocks every 1200 / 1.2 = 1000 seconds.
// the sleep time should be smaller than 1000 seconds, otherwise
// the pruner is not able to keep up with block generation.
SleepAfterEachBatchCommit: 12 * time.Second,
SleepAfterEachIteration: math.MaxInt64, // by default it's disabled so that we can slowly roll this feature out.
}
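The reasoning in these comments can be written out as a small self-check. The test below is a hypothetical sketch (not part of this PR) that merely reproduces the arithmetic, assuming the ~1.2 blocks-per-second figure quoted above:

package pruner

import "testing"

// Hypothetical test sketch: spells out the arithmetic behind DefaultConfig.
func TestDefaultConfigArithmeticSketch(t *testing.T) {
	const blocksPerSecond = 1.2

	// 30 days of blocks: 30 days * 24 hours * 60 minutes * 60 seconds * 1.2 blocks/s = 3,110,400 blocks.
	if DefaultConfig.Threshold != uint64(30*24*60*60*blocksPerSecond) {
		t.Fatalf("unexpected threshold: %d", DefaultConfig.Threshold)
	}

	// The chain needs 1200 / 1.2 = 1000 seconds to produce one batch worth of blocks,
	// so sleeping 12 seconds after each batch commit leaves ample headroom for the
	// pruner to keep up with block production.
	batchProductionSeconds := float64(DefaultConfig.BatchSize) / blocksPerSecond
	if DefaultConfig.SleepAfterEachBatchCommit.Seconds() >= batchProductionSeconds {
		t.Fatalf("sleep after each batch commit should be well below %.0f seconds", batchProductionSeconds)
	}
}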
157 changes: 157 additions & 0 deletions engine/execution/pruner/core.go
@@ -0,0 +1,157 @@
package pruner

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/pebble"
"github.com/dgraph-io/badger/v2"
"github.com/rs/zerolog"

"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/block_iterator"
"github.com/onflow/flow-go/module/block_iterator/executor"
"github.com/onflow/flow-go/module/block_iterator/latest"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/operation/pebbleimpl"
"github.com/onflow/flow-go/storage/store"
)

const NextHeightForUnprunedExecutionDataPackKey = "NextHeightForUnprunedExecutionDataPackKey"

func LoopPruneExecutionDataFromRootToLatestSealed(
ctx context.Context,
log zerolog.Logger,
metrics module.ExecutionMetrics,
state protocol.State,
badgerDB *badger.DB,
headers storage.Headers,
chunkDataPacks storage.ChunkDataPacks,
results storage.ExecutionResults,
chunkDataPacksDB *pebble.DB,
config PruningConfig,
) error {

chunksDB := pebbleimpl.ToDB(chunkDataPacksDB)
// the creator can be reused to create a new block iterator that iterates from the last
// checkpoint to the new latest (sealed) block.
creator, getNextAndLatest, err := makeBlockIteratorCreator(state, badgerDB, headers, chunksDB, config)
if err != nil {
return err
}

pruner := NewChunkDataPackPruner(chunkDataPacks, results)

// iterateAndPruneAll takes a block iterator and iterates through all the blocks
// and decides how to prune the chunk data packs.
iterateAndPruneAll := func(iter module.BlockIterator) error {
err := executor.IterateExecuteAndCommitInBatch(
ctx, log, iter, pruner, chunksDB, config.BatchSize, config.SleepAfterEachBatchCommit)
if err != nil {
return fmt.Errorf("failed to iterate, execute, and commit in batch: %w", err)
}
return nil
}

for {
nextToPrune, latestToPrune, err := getNextAndLatest()
if err != nil {
return fmt.Errorf("failed to get next and latest to prune: %w", err)
}

log.Info().
Uint64("nextToPrune", nextToPrune).
Uint64("latestToPrune", latestToPrune).
Msgf("execution data pruning will start in %s at %s",
config.SleepAfterEachIteration, time.Now().Add(config.SleepAfterEachIteration).UTC())

// last pruned is nextToPrune - 1.
// it won't underflow, because nextToPrune starts from root + 1
metrics.ExecutionLastChunkDataPackPrunedHeight(nextToPrune - 1)

select {
case <-ctx.Done():
return nil
// wait first so that data pruning gets lower priority compared to other tasks.
// this also lets us disable the feature by setting the sleep time to a very large value.
// waiting inside the select also keeps the pruner responsive to context cancellation:
// while the pruner is sleeping, it can be cancelled immediately.
case <-time.After(config.SleepAfterEachIteration):
}

iter, hasNext, err := creator.Create()
if err != nil {
return fmt.Errorf("failed to create block iterator: %w", err)
}

if !hasNext {
// no new blocks to prune yet; wait for the next iteration.
continue
}

err = iterateAndPruneAll(iter)
if err != nil {
return fmt.Errorf("failed to iterate, execute, and commit in batch: %w", err)
}
}
}

// makeBlockIteratorCreator creates the block iterator creator
func makeBlockIteratorCreator(
state protocol.State,
badgerDB *badger.DB,
headers storage.Headers,
chunkDataPacksDB storage.DB,
config PruningConfig,
) (
module.IteratorCreator,
// this is for logging purposes, so that after each round of pruning
// we can log and report metrics about the next and latest heights to prune
func() (nextToPrune uint64, latestToPrune uint64, err error),
error, // any error is an exception
) {
root := state.Params().SealedRoot()
sealedAndExecuted := latest.NewLatestSealedAndExecuted(
root,
state,
badgerDB,
)

// retrieves the latest sealed and executed block height.
// the threshold ensures that a certain number of blocks are retained for querying instead of being pruned.
latest := &LatestPrunable{
LatestSealedAndExecuted: sealedAndExecuted,
threshold: config.Threshold,
}

initializer := store.NewConsumerProgress(chunkDataPacksDB, NextHeightForUnprunedExecutionDataPackKey)

creator, err := block_iterator.NewHeightBasedCreator(
headers.BlockIDByHeight,
initializer,
root,
latest.Latest,
)

if err != nil {
return nil, nil, fmt.Errorf("failed to create height based block iterator creator: %w", err)
}

stateReader := creator.IteratorState()

return creator, func() (nextToPrune uint64, latestToPrune uint64, err error) {
next, err := stateReader.LoadState()
if err != nil {
return 0, 0, fmt.Errorf("failed to get next height to prune: %w", err)
}

header, err := latest.Latest()
if err != nil {
return 0, 0, fmt.Errorf("failed to get latest prunable block: %w", err)
}

return next, header.Height, nil
}, nil
}
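The select between ctx.Done() and time.After in LoopPruneExecutionDataFromRootToLatestSealed above is a standard cancellable sleep: between rounds the pruner waits SleepAfterEachIteration (effectively forever with the math.MaxInt64 default), yet a node shutdown interrupts the wait immediately instead of blocking on it. A standalone illustration of the pattern, independent of this PR:

package main

import (
	"context"
	"fmt"
	"time"
)

// sleepOrDone waits for d, but returns early (false) if ctx is cancelled first.
func sleepOrDone(ctx context.Context, d time.Duration) bool {
	select {
	case <-ctx.Done():
		return false
	case <-time.After(d):
		return true
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		time.Sleep(100 * time.Millisecond)
		cancel() // simulate a node shutdown while the pruner is sleeping
	}()

	start := time.Now()
	completed := sleepOrDone(ctx, time.Hour) // nominally a very long sleep
	fmt.Printf("slept %s, completed=%v\n", time.Since(start).Round(10*time.Millisecond), completed)
	// prints roughly: slept 100ms, completed=false
}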