[Chunk Data Pack Pruner] Add Engine for pruning chunk data pack #6946
Status: Open

zhangchiqing wants to merge 27 commits into master from leo/cdp-engine
Commits (27, all by zhangchiqing):

e1d6940 add executor aggregator
cfd5ba8 refactor constructor
1ebb051 add prunable
2ac1c3a add executor
1c0d203 add config
f45a2ff add engine core
723d363 add engine
62ff1d4 add latest sealed and executed
8bb6830 add chunk data pack pruner engine
cdd50af add flag to control pruning sleep after iteration
a70b738 remove callback
e97aaa6 add more flags
22e54d6 fix lint
2a1d735 handle no block to iterate
c3f2fd7 use the real chunk data pruner
779dfe9 add logging
8aba979 update default value
fe64766 add logging
34dddbd update log
8571f1a update config
b581d0d add metrics
f9f7ee7 add test cases to chunk data pack pruner core
7f4586b add not found check
f7d349d add IteratorState to IteratorCreator
f9e0a63 update mocks
4c91379 address review comments
fa1dfcf add comments
New file (+23 lines): pruning configuration, package pruner

```go
package pruner

import "time"

type PruningConfig struct {
	Threshold                 uint64        // the number of blocks that we want to keep in the database
	BatchSize                 uint          // the number of blocks that we want to delete in one batch
	SleepAfterEachBatchCommit time.Duration // the sleep time after each batch commit
	SleepAfterEachIteration   time.Duration // the sleep time after each iteration
}

var DefaultConfig = PruningConfig{
	Threshold: 30 * 60 * 60 * 24 * 1.2, // 30 days of blocks: 30 days * 86400 seconds/day * 1.2 blocks/second
	BatchSize: 1200,
	// When choosing a value, consider the batch size and the block rate.
	// For instance, at a block rate of 1.2 blocks/second and a batch size of 1200,
	// one batch covers 1200 / 1.2 = 1000 seconds of block production.
	// The sleep time should be smaller than 1000 seconds; otherwise
	// the pruner is not able to keep up with block generation.
	SleepAfterEachBatchCommit: 12 * time.Second,
	SleepAfterEachIteration:   500000 * time.Hour, // disabled by default so that we can slowly roll this feature out
}
```
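The comments in `DefaultConfig` rest on some arithmetic worth checking: the threshold must equal roughly 30 days of blocks, and the pruning throughput must exceed the block production rate. A small sketch that verifies those figures, using the numbers from the config above (the constant names here are illustrative, not from the PR):

```go
package main

import "fmt"

func main() {
	// Figures taken from the PR's DefaultConfig; illustrative only.
	const (
		blocksPerSecond = 1.2    // approximate chain block rate
		batchSize       = 1200.0 // blocks deleted per batch
		sleepPerBatch   = 12.0   // seconds slept after each batch commit
	)

	// Threshold: keep roughly 30 days of blocks.
	threshold := 30 * 24 * 60 * 60 * blocksPerSecond
	fmt.Println(int(threshold)) // 3110400

	// Worst-case pruning throughput if the sleep dominates each batch:
	pruneRate := batchSize / sleepPerBatch // blocks pruned per second
	fmt.Println(pruneRate > blocksPerSecond) // true: the pruner can keep up
}
```

With the default values the pruner can delete up to 100 blocks per second while the chain produces about 1.2, so pruning comfortably outpaces block production once enabled.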
New file (+176 lines): pruning engine loop, package pruner
```go
package pruner

import (
	"context"
	"fmt"
	"time"

	"github.com/cockroachdb/pebble"
	"github.com/dgraph-io/badger/v2"
	"github.com/rs/zerolog"

	"github.com/onflow/flow-go/module"
	"github.com/onflow/flow-go/module/block_iterator"
	"github.com/onflow/flow-go/module/block_iterator/executor"
	"github.com/onflow/flow-go/module/block_iterator/latest"
	"github.com/onflow/flow-go/state/protocol"
	"github.com/onflow/flow-go/storage"
	"github.com/onflow/flow-go/storage/operation/pebbleimpl"
	"github.com/onflow/flow-go/storage/store"
)

const NextHeightForUnprunedExecutionDataPackKey = "NextHeightForUnprunedExecutionDataPackKey"

func LoopPruneExecutionDataFromRootToLatestSealed(
	ctx context.Context,
	log zerolog.Logger,
	metrics module.ExecutionMetrics,
	state protocol.State,
	badgerDB *badger.DB,
	headers storage.Headers,
	chunkDataPacks storage.ChunkDataPacks,
	results storage.ExecutionResults,
	chunkDataPacksDB *pebble.DB,
	config PruningConfig,
) error {
	chunksDB := pebbleimpl.ToDB(chunkDataPacksDB)

	// the creator can be reused to create a new block iterator that can iterate from the last
	// checkpoint to the new latest (sealed) block.
	creator, getNextAndLatest, err := makeBlockIteratorCreator(state, badgerDB, headers, chunksDB, config)
	if err != nil {
		return err
	}

	// the returned iterateAndPruneAll takes a block iterator, iterates through all the blocks,
	// and decides how to prune the chunk data packs.
	iterateAndPruneAll := makeIterateAndPruneAll(
		log,
		ctx, // for cancelling the iteration when the context is done
		config,
		chunksDB,
		NewChunkDataPackPruner(chunkDataPacks, results),
	)

	for {
		nextToPrune, latestToPrune, err := getNextAndLatest()
		if err != nil {
			return fmt.Errorf("failed to get next and latest to prune: %w", err)
		}

		log.Info().
			Uint64("nextToPrune", nextToPrune).
			Uint64("latestToPrune", latestToPrune).
			Msgf("execution data pruning will start in %s at %s",
				config.SleepAfterEachIteration, time.Now().Add(config.SleepAfterEachIteration).UTC())

		// last pruned is nextToPrune - 1.
		// it won't underflow, because nextToPrune starts from root + 1
		metrics.ExecutionLastChunkDataPackPrunedHeight(nextToPrune - 1)

		select {
		case <-ctx.Done():
			return nil
		// wait first so that we give the data pruning lower priority compared to other tasks.
		// this also lets us disable the feature by setting the sleep time to a very large value.
		// it also makes the pruner more responsive to context cancellation: while the pruner
		// is sleeping, it can be cancelled immediately.
		case <-time.After(config.SleepAfterEachIteration):
		}

		iter, hasNext, err := creator.Create()
		if err != nil {
			return fmt.Errorf("failed to create block iterator: %w", err)
		}

		if !hasNext {
			// no more blocks to iterate, we are done for this round.
			continue
		}

		err = iterateAndPruneAll(iter)
		if err != nil {
			return fmt.Errorf("failed to iterate, execute, and commit in batch: %w", err)
		}
	}
}

// makeBlockIteratorCreator creates the block iterator creator
func makeBlockIteratorCreator(
	state protocol.State,
	badgerDB *badger.DB,
	headers storage.Headers,
	chunkDataPacksDB storage.DB,
	config PruningConfig,
) (
	module.IteratorCreator,
	// this is for logging purposes, so that after each round of pruning
	// we can log and report metrics about the next and latest heights to prune
	func() (nextToPrune uint64, latestToPrune uint64, err error),
	error, // any error returned is an exception
) {
	root := state.Params().SealedRoot()
	sealedAndExecuted := latest.NewLatestSealedAndExecuted(
		root,
		state,
		badgerDB,
	)

	// retrieves the latest sealed and executed block height.
	// the threshold ensures that a certain number of blocks are retained for querying instead of being pruned.
	latest := &LatestPrunable{
		LatestSealedAndExecuted: sealedAndExecuted,
		threshold:               config.Threshold,
	}

	initializer := store.NewConsumerProgress(chunkDataPacksDB, NextHeightForUnprunedExecutionDataPackKey)

	creator, err := block_iterator.NewHeightBasedCreator(
		headers.BlockIDByHeight,
		initializer,
		root,
		latest.Latest,
	)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to create height based block iterator creator: %w", err)
	}

	stateReader := creator.IteratorState()

	return creator, func() (nextToPrune uint64, latestToPrune uint64, err error) {
		next, err := stateReader.LoadState()
		if err != nil {
			return 0, 0, fmt.Errorf("failed to get next height to prune: %w", err)
		}

		header, err := latest.Latest()
		if err != nil {
			return 0, 0, fmt.Errorf("failed to get latest prunable block: %w", err)
		}

		return next, header.Height, nil
	}, nil
}

// makeIterateAndPruneAll takes the config, the chunk data packs db, and the pruner, and returns a function
// that takes a block iterator, iterates through all the blocks, and decides how to prune the chunk data packs.
func makeIterateAndPruneAll(
	log zerolog.Logger, ctx context.Context, config PruningConfig, chunkDataPacksDB storage.DB, prune *ChunkDataPackPruner,
) func(iter module.BlockIterator) error {
	isBatchFull := func(counter int) bool {
		return uint(counter) >= config.BatchSize
	}

	sleeper := func() {
		time.Sleep(config.SleepAfterEachBatchCommit)
	}

	return func(iter module.BlockIterator) error {
		err := executor.IterateExecuteAndCommitInBatch(log, ctx, iter, prune, chunkDataPacksDB, isBatchFull, sleeper)
		if err != nil {
			return fmt.Errorf("failed to iterate, execute, and commit in batch: %w", err)
		}
		return nil
	}
}
```

Review comment (zhangchiqing), on the call to makeBlockIteratorCreator:

This is the main function of the pruning logic. In order to prevent this function from being very long, I broke the logic into two functions: makeBlockIteratorCreator and makeIterateAndPruneAll.
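The helper built by makeIterateAndPruneAll delegates to `executor.IterateExecuteAndCommitInBatch`, which follows a common shape: drain an iterator, commit once the batch is full, then sleep so the work yields to other tasks. A generic sketch of that shape under assumed names (`processInBatches` and its parameters are illustrative, not the flow-go API):

```go
package main

import (
	"fmt"
	"time"
)

// processInBatches drains items, committing every batchSize items and
// sleeping between commits so the work yields to other tasks.
// All names here are illustrative, not from the PR.
func processInBatches(items []int, batchSize int, sleep time.Duration, commit func(batch []int)) {
	var batch []int
	for _, it := range items {
		batch = append(batch, it)
		if len(batch) >= batchSize {
			commit(batch)
			batch = batch[:0]
			time.Sleep(sleep) // throttle, like SleepAfterEachBatchCommit
		}
	}
	if len(batch) > 0 {
		commit(batch) // flush the final partial batch
	}
}

func main() {
	var commits int
	processInBatches(make([]int, 10), 4, time.Millisecond, func(batch []int) {
		commits++
		fmt.Println(len(batch))
	})
	fmt.Println(commits) // 3 commits: 4, 4, then a partial batch of 2
}
```

The real implementation additionally checks the context between batches and reports progress, but the batch-commit-sleep cadence is the part the `BatchSize` and `SleepAfterEachBatchCommit` config fields tune.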
Review comment:

@peterargue This method is called PrunedExecutionData, but it currently only prunes chunk data packs. We also need to prune other data, such as execution results and the execution data for bitswap. I haven't decided whether to put them all here, since they live in different databases. I'm thinking of using one engine for pruning each dataset, so that each can have a separate pace and config.