[Chunk Data Pack Pruner] Add Engine for pruning chunk data pack #6946

Open
wants to merge 27 commits into base: master
29 changes: 29 additions & 0 deletions cmd/execution_builder.go
@@ -13,6 +13,7 @@ import (

awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/cockroachdb/pebble"
"github.com/dgraph-io/badger/v2"
"github.com/ipfs/boxo/bitswap"
"github.com/ipfs/go-cid"
@@ -55,6 +56,7 @@ import (
"github.com/onflow/flow-go/engine/execution/ingestion/stop"
"github.com/onflow/flow-go/engine/execution/ingestion/uploader"
exeprovider "github.com/onflow/flow-go/engine/execution/provider"
exepruner "github.com/onflow/flow-go/engine/execution/pruner"
"github.com/onflow/flow-go/engine/execution/rpc"
"github.com/onflow/flow-go/engine/execution/scripts"
"github.com/onflow/flow-go/engine/execution/state"
@@ -139,6 +141,8 @@ type ExecutionNode struct {
txResults *storage.TransactionResults
results *storage.ExecutionResults
myReceipts *storage.MyExecutionReceipts
chunkDataPackDB *pebble.DB
chunkDataPacks storageerr.ChunkDataPacks
providerEngine exeprovider.ProviderEngine
checkerEng *checker.Engine
syncCore *chainsync.Core
@@ -226,6 +230,7 @@ func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() {
// TODO: will re-visit this once storehouse has implemented new WAL for checkpoint file of
// payloadless trie.
// Component("execution data pruner", exeNode.LoadExecutionDataPruner).
Component("execution db pruner", exeNode.LoadExecutionDBPruner).
Component("blob service", exeNode.LoadBlobService).
Component("block data upload manager", exeNode.LoadBlockUploaderManager).
Component("GCP block data uploader", exeNode.LoadGCPBlockDataUploader).
@@ -743,6 +748,9 @@ func (exeNode *ExecutionNode) LoadExecutionState(
chunkDataPacks := store.NewChunkDataPacks(node.Metrics.Cache,
pebbleimpl.ToDB(chunkDataPackDB), node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize)

exeNode.chunkDataPackDB = chunkDataPackDB
exeNode.chunkDataPacks = chunkDataPacks

// Needed for gRPC server, make sure to assign to main scoped vars
exeNode.events = storage.NewEvents(node.Metrics.Cache, node.DB)
exeNode.serviceEvents = storage.NewServiceEvents(node.Metrics.Cache, node.DB)
@@ -987,6 +995,27 @@ func (exeNode *ExecutionNode) LoadExecutionDataPruner(
return exeNode.executionDataPruner, err
}

func (exeNode *ExecutionNode) LoadExecutionDBPruner(node *NodeConfig) (module.ReadyDoneAware, error) {
cfg := exepruner.PruningConfig{
Threshold: exeNode.exeConf.pruningConfigThreshold,
BatchSize: exeNode.exeConf.pruningConfigBatchSize,
SleepAfterEachBatchCommit: exeNode.exeConf.pruningConfigSleepAfterCommit,
SleepAfterEachIteration: exeNode.exeConf.pruningConfigSleepAfterIteration,
}

return exepruner.NewChunkDataPackPruningEngine(
node.Logger,
exeNode.collector,
node.State,
node.DB,
node.Storage.Headers,
exeNode.chunkDataPacks,
exeNode.results,
exeNode.chunkDataPackDB,
cfg,
), nil
}

func (exeNode *ExecutionNode) LoadCheckerEngine(
node *NodeConfig,
) (
…
10 changes: 10 additions & 0 deletions cmd/execution_config.go
@@ -11,6 +11,7 @@ import (
"github.com/onflow/flow-go/engine/common/provider"
"github.com/onflow/flow-go/engine/execution/computation/query"
exeprovider "github.com/onflow/flow-go/engine/execution/provider"
exepruner "github.com/onflow/flow-go/engine/execution/pruner"
"github.com/onflow/flow-go/fvm"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/mempool"
@@ -69,6 +70,11 @@ type ExecutionConfig struct {
enableStorehouse bool
enableChecker bool
publicAccessID string

pruningConfigThreshold uint64
pruningConfigBatchSize uint
pruningConfigSleepAfterCommit time.Duration
pruningConfigSleepAfterIteration time.Duration
}

func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) {
@@ -130,6 +136,10 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) {
flags.BoolVar(&deprecatedEnableNewIngestionEngine, "enable-new-ingestion-engine", true, "enable new ingestion engine, default is true")
flags.StringVar(&exeConf.publicAccessID, "public-access-id", "", "public access ID for the node")

flags.Uint64Var(&exeConf.pruningConfigThreshold, "pruning-config-threshold", exepruner.DefaultConfig.Threshold, "the number of blocks to keep in the database (default is roughly 30 days of blocks)")
flags.UintVar(&exeConf.pruningConfigBatchSize, "pruning-config-batch-size", exepruner.DefaultConfig.BatchSize, "the number of blocks to delete in one batch (default 1200)")
flags.DurationVar(&exeConf.pruningConfigSleepAfterCommit, "pruning-config-sleep-after-commit", exepruner.DefaultConfig.SleepAfterEachBatchCommit, "sleep time after each batch commit (default 12s)")
flags.DurationVar(&exeConf.pruningConfigSleepAfterIteration, "pruning-config-sleep-after-iteration", exepruner.DefaultConfig.SleepAfterEachIteration, "sleep time after each iteration (default 500000h, which effectively disables pruning)")
}

func (exeConf *ExecutionConfig) ValidateFlags() error {
…
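The threshold flag takes a block count, while its help text speaks in days. A minimal sketch (not part of the PR) of the conversion behind that, assuming the ~1.2 blocks/second rate used by DefaultConfig in config.go below:

package main

import "fmt"

func main() {
	// assumed average block production rate, taken from the DefaultConfig comment
	const blocksPerSecond = 1.2

	retentionDays := 30.0
	threshold := uint64(retentionDays * 24 * 60 * 60 * blocksPerSecond)

	// e.g. pass this value via --pruning-config-threshold to keep ~30 days of blocks
	fmt.Printf("--pruning-config-threshold=%d (~%.0f days of blocks)\n", threshold, retentionDays)
}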
23 changes: 23 additions & 0 deletions engine/execution/pruner/config.go
@@ -0,0 +1,23 @@
package pruner

import "time"

type PruningConfig struct {
Threshold uint64 // The threshold is the number of blocks that we want to keep in the database.
BatchSize uint // The batch size is the number of blocks that we want to delete in one batch.
SleepAfterEachBatchCommit time.Duration // The sleep time after each batch commit.
SleepAfterEachIteration time.Duration // The sleep time after each iteration.
}

var DefaultConfig = PruningConfig{
Threshold: 30 * 60 * 60 * 24 * 1.2, // ~30 days of blocks: 30 days * 86400 seconds/day * 1.2 blocks/second
BatchSize: 1200,
// when choosing a value, consider the batch size and the block rate:
// for instance, at a block rate of ~1.2 blocks/second and a batch size of 1200,
// the chain produces one batch's worth of blocks every 1200 / 1.2 = 1000 seconds,
// so the sleep time should be well below 1000 seconds, otherwise
// the pruner will not be able to keep up with block production.
SleepAfterEachBatchCommit: 12 * time.Second,
SleepAfterEachIteration: 500000 * time.Hour, // by default it's disabled so that we can slowly roll this feature out.
}
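A small sketch (not part of the PR) of the keep-up reasoning in the comment above: the chain produces one batch's worth of blocks in BatchSize / blockRate seconds, so the per-batch sleep must stay well below that for the pruner to make progress. The 1.2 blocks/second figure is the assumption used by DefaultConfig.

package main

import (
	"fmt"
	"time"
)

func main() {
	// values from DefaultConfig above
	batchSize := 1200
	sleepAfterCommit := 12 * time.Second

	// assumed average block production rate (blocks per second)
	const blocksPerSecond = 1.2

	// time for the chain to produce one batch's worth of blocks: 1200 / 1.2 = 1000s
	produceBatch := time.Duration(float64(batchSize)/blocksPerSecond) * time.Second

	if sleepAfterCommit < produceBatch {
		fmt.Printf("ok: sleeping %v per batch; the chain needs %v to produce %d blocks\n",
			sleepAfterCommit, produceBatch, batchSize)
	} else {
		fmt.Println("pruner cannot keep up with block production")
	}
}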
176 changes: 176 additions & 0 deletions engine/execution/pruner/core.go
@@ -0,0 +1,176 @@
package pruner

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/pebble"
"github.com/dgraph-io/badger/v2"
"github.com/rs/zerolog"

"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/block_iterator"
"github.com/onflow/flow-go/module/block_iterator/executor"
"github.com/onflow/flow-go/module/block_iterator/latest"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/operation/pebbleimpl"
"github.com/onflow/flow-go/storage/store"
)

const NextHeightForUnprunedExecutionDataPackKey = "NextHeightForUnprunedExecutionDataPackKey"

func LoopPruneExecutionDataFromRootToLatestSealed(
[Review comment thread]
to me it seems like there are 3 parts:

  1. Individual data type logic that removes all of the data and indexes associated with a blockID.

@peterargue

This method is about pruning execution data, but it currently only prunes chunk data packs. We also need to prune other data, such as execution results and the execution data served over bitswap. I haven't decided whether to put them all here, since they live in different databases. I'm thinking of using one engine per dataset, so that each dataset can be pruned at its own pace with its own config.

ctx context.Context,
log zerolog.Logger,
metrics module.ExecutionMetrics,
state protocol.State,
badgerDB *badger.DB,
headers storage.Headers,
chunkDataPacks storage.ChunkDataPacks,
results storage.ExecutionResults,
chunkDataPacksDB *pebble.DB,
config PruningConfig,
) error {

chunksDB := pebbleimpl.ToDB(chunkDataPacksDB)
// the creator can be reused to create a new block iterator that iterates from the last
// checkpoint to the new latest (sealed) block.
creator, getNextAndLatest, err := makeBlockIteratorCreator(state, badgerDB, headers, chunksDB, config)
[Review comment by the PR author]
This is the main function of the pruning logic. In order to prevent this function from being very long, I broke the logic into two functions: makeBlockIteratorCreator, and makeIterateAndPruneAll.

if err != nil {
return err
}

// the returned iterateAndPruneAll takes a block iterator and iterates through all the blocks
// and decides how to prune the chunk data packs.
iterateAndPruneAll := makeIterateAndPruneAll(
log,
ctx, // for cancelling the iteration when the context is done
config,
chunksDB,
NewChunkDataPackPruner(chunkDataPacks, results),
)

for {
nextToPrune, latestToPrune, err := getNextAndLatest()
if err != nil {
return fmt.Errorf("failed to get next and latest to prune: %w", err)
}

log.Info().
Uint64("nextToPrune", nextToPrune).
Uint64("latestToPrune", latestToPrune).
Msgf("execution data pruning will start in %s at %s",
config.SleepAfterEachIteration, time.Now().Add(config.SleepAfterEachIteration).UTC())

// last pruned is nextToPrune - 1.
// it won't underflow, because nextToPrune starts from root + 1
metrics.ExecutionLastChunkDataPackPrunedHeight(nextToPrune - 1)

select {
case <-ctx.Done():
return nil
// wait first so that we give the data pruning lower priority compared to other tasks.
// also we can disable this feature by setting the sleep time to a very large value.
// also allows the pruner to be more responsive to the context cancellation, meaning
// while the pruner is sleeping, it can be cancelled immediately.
case <-time.After(config.SleepAfterEachIteration):
}

iter, hasNext, err := creator.Create()
if err != nil {
return fmt.Errorf("failed to create block iterator: %w", err)
}

if !hasNext {
// no more blocks to iterate in this round; wait for the next round.
continue
}

err = iterateAndPruneAll(iter)
if err != nil {
return fmt.Errorf("failed to iterate, execute, and commit in batch: %w", err)
}
}
}

// makeBlockIteratorCreator creates the block iterator creator
func makeBlockIteratorCreator(
state protocol.State,
badgerDB *badger.DB,
headers storage.Headers,
chunkDataPacksDB storage.DB,
config PruningConfig,
) (
module.IteratorCreator,
// this is for logging purposes, so that after each round of pruning,
// we can log and report metrics about the next and latest to prune
func() (nextToPrune uint64, latestToPrune uint64, err error),
error, // any error returned is an exception
) {
root := state.Params().SealedRoot()
sealedAndExecuted := latest.NewLatestSealedAndExecuted(
root,
state,
badgerDB,
)

// retrieves the latest sealed and executed block height.
// the threshold ensures that a certain number of blocks are retained for querying instead of being pruned.
latest := &LatestPrunable{
LatestSealedAndExecuted: sealedAndExecuted,
threshold: config.Threshold,
}

initializer := store.NewConsumerProgress(chunkDataPacksDB, NextHeightForUnprunedExecutionDataPackKey)

creator, err := block_iterator.NewHeightBasedCreator(
headers.BlockIDByHeight,
initializer,
root,
latest.Latest,
)

if err != nil {
return nil, nil, fmt.Errorf("failed to create height based block iterator creator: %w", err)
}

stateReader := creator.IteratorState()

return creator, func() (nextToPrune uint64, latestToPrune uint64, err error) {
next, err := stateReader.LoadState()
if err != nil {
return 0, 0, fmt.Errorf("failed to get next height to prune: %w", err)
}

header, err := latest.Latest()
if err != nil {
return 0, 0, fmt.Errorf("failed to get latest prunable block: %w", err)
}

return next, header.Height, nil
}, nil
}

// makeIterateAndPruneAll takes the config, the chunk data packs DB, and the pruner, and returns a function
// that takes a block iterator, iterates through all the blocks, and prunes the chunk data packs.
func makeIterateAndPruneAll(
log zerolog.Logger, ctx context.Context, config PruningConfig, chunkDataPacksDB storage.DB, prune *ChunkDataPackPruner,
) func(iter module.BlockIterator) error {
isBatchFull := func(counter int) bool {
return uint(counter) >= config.BatchSize
}

sleeper := func() {
time.Sleep(config.SleepAfterEachBatchCommit)
}

return func(iter module.BlockIterator) error {
err := executor.IterateExecuteAndCommitInBatch(log, ctx, iter, prune, chunkDataPacksDB, isBatchFull, sleeper)
if err != nil {
return fmt.Errorf("failed to iterate, execute, and commit in batch: %w", err)
}
return nil
}
}
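For context, the diff above only shows core.go. Below is a sketch of how NewChunkDataPackPruningEngine (the constructor called from LoadExecutionDBPruner) might wrap this loop in a long-running worker, assuming flow-go's component.ComponentManagerBuilder pattern; the actual engine in the PR may differ in detail.

package pruner

import (
	"github.com/cockroachdb/pebble"
	"github.com/dgraph-io/badger/v2"
	"github.com/rs/zerolog"

	"github.com/onflow/flow-go/module"
	"github.com/onflow/flow-go/module/component"
	"github.com/onflow/flow-go/module/irrecoverable"
	"github.com/onflow/flow-go/state/protocol"
	"github.com/onflow/flow-go/storage"
)

// ChunkDataPackPruningEngine runs the pruning loop in a single worker until shutdown.
type ChunkDataPackPruningEngine struct {
	*component.ComponentManager
}

func NewChunkDataPackPruningEngine(
	log zerolog.Logger,
	metrics module.ExecutionMetrics,
	state protocol.State,
	badgerDB *badger.DB,
	headers storage.Headers,
	chunkDataPacks storage.ChunkDataPacks,
	results storage.ExecutionResults,
	chunkDataPacksDB *pebble.DB,
	config PruningConfig,
) *ChunkDataPackPruningEngine {
	e := &ChunkDataPackPruningEngine{}
	e.ComponentManager = component.NewComponentManagerBuilder().
		AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
			ready()
			// SignalerContext satisfies context.Context, so the loop exits cleanly on shutdown.
			err := LoopPruneExecutionDataFromRootToLatestSealed(
				ctx, log, metrics, state, badgerDB, headers,
				chunkDataPacks, results, chunkDataPacksDB, config,
			)
			if err != nil {
				ctx.Throw(err) // any error from the loop is treated as irrecoverable
			}
		}).
		Build()
	return e
}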