Skip to content

Commit

Permalink
Add Blob snapshots for bsc (#486)
Browse files Browse the repository at this point in the history
  • Loading branch information
blxdyx authored Aug 27, 2024
1 parent c8bbc3d commit 34134a8
Show file tree
Hide file tree
Showing 40 changed files with 876 additions and 135 deletions.
5 changes: 3 additions & 2 deletions cmd/capcli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,8 @@ func (d *DownloadSnapshots) Run(ctx *Context) error {
return snapshotsync.WaitForDownloader(ctx, "CapCliDownloader", false, false, snapshotsync.OnlyCaplin, s, tx,
freezeblocks.NewBlockReader(
freezeblocks.NewRoSnapshots(ethconfig.NewSnapCfg(false, false, false), dirs.Snap, 0, log.Root()),
freezeblocks.NewBorRoSnapshots(ethconfig.NewSnapCfg(false, false, false), dirs.Snap, 0, log.Root())),
freezeblocks.NewBorRoSnapshots(ethconfig.NewSnapCfg(false, false, false), dirs.Snap, 0, log.Root()),
freezeblocks.NewBscRoSnapshots(ethconfig.NewSnapCfg(false, false, false), dirs.Snap, 0, log.Root())),
params.ChainConfigByChainName(d.Chain), direct.NewDownloaderClient(bittorrentServer), []string{})
}

Expand Down Expand Up @@ -570,7 +571,7 @@ func (r *RetrieveHistoricalState) Run(ctx *Context) error {
}

var bor *freezeblocks.BorRoSnapshots
blockReader := freezeblocks.NewBlockReader(allSnapshots, bor)
blockReader := freezeblocks.NewBlockReader(allSnapshots, bor, nil)
eth1Getter := getters.NewExecutionSnapshotReader(ctx, beaconConfig, blockReader, db)
csn := freezeblocks.NewCaplinSnapshots(ethconfig.BlocksFreezing{}, beaconConfig, dirs, log.Root())
if err := csn.ReopenFolder(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/hack/hack.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func blocksIO(db kv.RoDB) (services.FullBlockReader, *blockio.BlockWriter) {
}); err != nil {
panic(err)
}
br := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", 0, log.New()), nil /* BorSnapshots */)
br := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", 0, log.New()), nil /* BorSnapshots */, nil)
bw := blockio.NewBlockWriter(histV3)
return br, bw
}
Expand Down
10 changes: 6 additions & 4 deletions cmd/integration/commands/reset_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ var cmdResetState = &cobra.Command{
}
ctx, _ := common.RootContext()
defer db.Close()
sn, borSn, agg := allSnapshots(ctx, db, logger)
sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer bscSn.Close()
defer agg.Close()

if err := db.View(ctx, func(tx kv.Tx) error { return printStages(tx, sn, borSn, agg) }); err != nil {
if err := db.View(ctx, func(tx kv.Tx) error { return printStages(tx, sn, borSn, bscSn, agg) }); err != nil {
if !errors.Is(err, context.Canceled) {
logger.Error(err.Error())
}
Expand All @@ -59,7 +60,7 @@ var cmdResetState = &cobra.Command{

// set genesis after reset all buckets
fmt.Printf("After reset: \n")
if err := db.View(ctx, func(tx kv.Tx) error { return printStages(tx, sn, borSn, agg) }); err != nil {
if err := db.View(ctx, func(tx kv.Tx) error { return printStages(tx, sn, borSn, bscSn, agg) }); err != nil {
if !errors.Is(err, context.Canceled) {
logger.Error(err.Error())
}
Expand Down Expand Up @@ -97,7 +98,7 @@ func init() {
rootCmd.AddCommand(cmdClearBadBlocks)
}

func printStages(tx kv.Tx, snapshots *freezeblocks.RoSnapshots, borSn *freezeblocks.BorRoSnapshots, agg *state.Aggregator) error {
func printStages(tx kv.Tx, snapshots *freezeblocks.RoSnapshots, borSn *freezeblocks.BorRoSnapshots, bscSn *freezeblocks.BscRoSnapshots, agg *state.Aggregator) error {
var err error
var progress uint64
w := new(tabwriter.Writer)
Expand All @@ -123,6 +124,7 @@ func printStages(tx kv.Tx, snapshots *freezeblocks.RoSnapshots, borSn *freezeblo
fmt.Fprintf(w, "prune distance: %s\n\n", pm.String())
fmt.Fprintf(w, "blocks.v2: %t, blocks=%d, segments=%d, indices=%d\n", snapshots.Cfg().Enabled, snapshots.BlocksAvailable(), snapshots.SegmentsMax(), snapshots.IndicesMax())
fmt.Fprintf(w, "blocks.bor.v2: segments=%d, indices=%d\n\n", borSn.SegmentsMax(), borSn.IndicesMax())
fmt.Fprintf(w, "blobsidecars.bsc.v2: segments=%d, indices=%d\n\n", bscSn.SegmentsMax(), bscSn.IndicesMax())
h3, err := kvcfg.HistoryV3.Enabled(tx)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion cmd/integration/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func openDB(opts kv2.MdbxOpts, applyMigrations bool, logger log.Logger) (kv.RwDB
return nil, err
}
if h3 {
_, _, agg := allSnapshots(context.Background(), db, logger)
_, _, _, agg := allSnapshots(context.Background(), db, logger)
tdb, err := temporal.New(db, agg)
if err != nil {
return nil, err
Expand Down
57 changes: 36 additions & 21 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,9 +496,10 @@ var cmdSetSnap = &cobra.Command{
return
}
defer db.Close()
sn, borSn, agg := allSnapshots(cmd.Context(), db, logger)
sn, borSn, bscSn, agg := allSnapshots(cmd.Context(), db, logger)
defer sn.Close()
defer borSn.Close()
defer bscSn.Close()
defer agg.Close()

cfg := sn.Cfg()
Expand Down Expand Up @@ -739,9 +740,10 @@ func stageHeaders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
return err
}

sn, borSn, agg := allSnapshots(ctx, db, logger)
sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer bscSn.Close()
defer agg.Close()
br, bw := blocksIO(db, logger)
engine, _, _, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
Expand Down Expand Up @@ -838,7 +840,7 @@ func stageBorHeimdall(db kv.RwDB, ctx context.Context, unwindTypes []string, log
return nil
}
if unwind > 0 {
sn, borSn, agg := allSnapshots(ctx, db, logger)
sn, borSn, _, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer agg.Close()
Expand Down Expand Up @@ -869,7 +871,7 @@ func stageBorHeimdall(db kv.RwDB, ctx context.Context, unwindTypes []string, log
return nil
}

sn, borSn, agg := allSnapshots(ctx, db, logger)
sn, borSn, _, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer agg.Close()
Expand Down Expand Up @@ -902,9 +904,10 @@ func stageBorHeimdall(db kv.RwDB, ctx context.Context, unwindTypes []string, log
}

func stageBodies(db kv.RwDB, ctx context.Context, logger log.Logger) error {
sn, borSn, agg := allSnapshots(ctx, db, logger)
sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer bscSn.Close()
defer agg.Close()
chainConfig, historyV3 := fromdb.ChainConfig(db), kvcfg.HistoryV3.FromDB(db)
_, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
Expand Down Expand Up @@ -942,9 +945,10 @@ func stageBodies(db kv.RwDB, ctx context.Context, logger log.Logger) error {
func stageSenders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
tmpdir := datadir.New(datadirCli).Tmp
chainConfig := fromdb.ChainConfig(db)
sn, borSn, agg := allSnapshots(ctx, db, logger)
sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer bscSn.Close()
defer agg.Close()
_, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)

Expand Down Expand Up @@ -1040,9 +1044,10 @@ func stageExec(db kv.RwDB, ctx context.Context, logger log.Logger) error {

engine, vmConfig, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
must(sync.SetCurrentStage(stages.Execution))
sn, borSn, agg := allSnapshots(ctx, db, logger)
sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer bscSn.Close()
defer agg.Close()

if warmup {
Expand Down Expand Up @@ -1123,9 +1128,10 @@ func stageExec(db kv.RwDB, ctx context.Context, logger log.Logger) error {

func stageTrie(db kv.RwDB, ctx context.Context, logger log.Logger) error {
dirs, pm, historyV3 := datadir.New(datadirCli), fromdb.PruneMode(db), kvcfg.HistoryV3.FromDB(db)
sn, borSn, agg := allSnapshots(ctx, db, logger)
sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer bscSn.Close()
defer agg.Close()
_, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
must(sync.SetCurrentStage(stages.IntermediateHashes))
Expand Down Expand Up @@ -1181,9 +1187,10 @@ func stageTrie(db kv.RwDB, ctx context.Context, logger log.Logger) error {

func stageHashState(db kv.RwDB, ctx context.Context, logger log.Logger) error {
dirs, pm, historyV3 := datadir.New(datadirCli), fromdb.PruneMode(db), kvcfg.HistoryV3.FromDB(db)
sn, borSn, agg := allSnapshots(ctx, db, logger)
sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer bscSn.Close()
defer agg.Close()
_, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
must(sync.SetCurrentStage(stages.HashState))
Expand Down Expand Up @@ -1362,9 +1369,10 @@ func stageHistory(db kv.RwDB, ctx context.Context, logger log.Logger) error {
if historyV3 {
return fmt.Errorf("this stage is disable in --history.v3=true")
}
sn, borSn, agg := allSnapshots(ctx, db, logger)
sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer bscSn.Close()
defer agg.Close()
_, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
must(sync.SetCurrentStage(stages.AccountHistoryIndex))
Expand Down Expand Up @@ -1421,7 +1429,7 @@ func stageHistory(db kv.RwDB, ctx context.Context, logger log.Logger) error {
if err != nil {
return err
}
_ = printStages(tx, sn, borSn, agg)
_ = printStages(tx, sn, borSn, bscSn, agg)
} else {
if err := stagedsync.SpawnAccountHistoryIndex(stageAcc, tx, cfg, ctx, logger); err != nil {
return err
Expand All @@ -1438,9 +1446,10 @@ func stageTxLookup(db kv.RwDB, ctx context.Context, logger log.Logger) error {
_, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
chainConfig := fromdb.ChainConfig(db)
must(sync.SetCurrentStage(stages.TxLookup))
sn, borSn, agg := allSnapshots(ctx, db, logger)
sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer bscSn.Close()
defer agg.Close()

if reset {
Expand Down Expand Up @@ -1488,11 +1497,12 @@ func stageTxLookup(db kv.RwDB, ctx context.Context, logger log.Logger) error {
}

func printAllStages(db kv.RoDB, ctx context.Context, logger log.Logger) error {
sn, borSn, agg := allSnapshots(ctx, db, logger)
sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer bscSn.Close()
defer agg.Close()
return db.View(ctx, func(tx kv.Tx) error { return printStages(tx, sn, borSn, agg) })
return db.View(ctx, func(tx kv.Tx) error { return printStages(tx, sn, borSn, bscSn, agg) })
}

func printAppliedMigrations(db kv.RwDB, ctx context.Context, logger log.Logger) error {
Expand Down Expand Up @@ -1522,9 +1532,10 @@ func removeMigration(db kv.RwDB, ctx context.Context) error {
var openSnapshotOnce sync.Once
var _allSnapshotsSingleton *freezeblocks.RoSnapshots
var _allBorSnapshotsSingleton *freezeblocks.BorRoSnapshots
var _allBscSnapshotsSingleton *freezeblocks.BscRoSnapshots
var _aggSingleton *libstate.Aggregator

func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezeblocks.RoSnapshots, *freezeblocks.BorRoSnapshots, *libstate.Aggregator) {
func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezeblocks.RoSnapshots, *freezeblocks.BorRoSnapshots, *freezeblocks.BscRoSnapshots, *libstate.Aggregator) {
openSnapshotOnce.Do(func() {
var useSnapshots bool
_ = db.View(context.Background(), func(tx kv.Tx) error {
Expand All @@ -1537,6 +1548,7 @@ func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezebl
snapCfg := ethconfig.NewSnapCfg(useSnapshots, true, true)
_allSnapshotsSingleton = freezeblocks.NewRoSnapshots(snapCfg, dirs.Snap, 0, logger)
_allBorSnapshotsSingleton = freezeblocks.NewBorRoSnapshots(snapCfg, dirs.Snap, 0, logger)
_allBscSnapshotsSingleton = freezeblocks.NewBscRoSnapshots(snapCfg, dirs.Snap, 0, logger)

var err error
_aggSingleton, err = libstate.NewAggregator(ctx, dirs.SnapHistory, dirs.Tmp, config3.HistoryV3AggregationStep, db, logger)
Expand All @@ -1557,6 +1569,10 @@ func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezebl
panic(err)
}
_allBorSnapshotsSingleton.LogStat("all")
if err := _allBscSnapshotsSingleton.ReopenFolder(); err != nil {
panic(err)
}
_allBscSnapshotsSingleton.LogStat("all")
db.View(context.Background(), func(tx kv.Tx) error {
_aggSingleton.LogStats(tx, func(endTxNumMinimax uint64) uint64 {
_, histBlockNumProgress, _ := rawdbv3.TxNums.FindBlockNum(tx, endTxNumMinimax)
Expand All @@ -1566,7 +1582,7 @@ func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezebl
})
}
})
return _allSnapshotsSingleton, _allBorSnapshotsSingleton, _aggSingleton
return _allSnapshotsSingleton, _allBorSnapshotsSingleton, _allBscSnapshotsSingleton, _aggSingleton
}

var openBlockReaderOnce sync.Once
Expand All @@ -1575,9 +1591,9 @@ var _blockWriterSingleton *blockio.BlockWriter

func blocksIO(db kv.RoDB, logger log.Logger) (services.FullBlockReader, *blockio.BlockWriter) {
openBlockReaderOnce.Do(func() {
sn, borSn, _ := allSnapshots(context.Background(), db, logger)
sn, borSn, bscSn, _ := allSnapshots(context.Background(), db, logger)
histV3 := kvcfg.HistoryV3.FromDB(db)
_blockReaderSingleton = freezeblocks.NewBlockReader(sn, borSn)
_blockReaderSingleton = freezeblocks.NewBlockReader(sn, borSn, bscSn)
_blockWriterSingleton = blockio.NewBlockWriter(histV3)
})
return _blockReaderSingleton, _blockWriterSingleton
Expand Down Expand Up @@ -1616,7 +1632,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
cfg.Miner = *miningConfig
}
cfg.Dirs = datadir.New(datadirCli)
allSn, _, agg := allSnapshots(ctx, db, logger)
allSn, _, _, agg := allSnapshots(ctx, db, logger)
cfg.Snapshot = allSn.Cfg()

blockReader, blockWriter := blocksIO(db, logger)
Expand Down Expand Up @@ -1651,7 +1667,6 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
}

notifications := &shards.Notifications{}
blockRetire := freezeblocks.NewBlockRetire(1, dirs, blockReader, blockWriter, db, chainConfig, notifications.Events, logger)

var (
snapDb kv.RwDB
Expand All @@ -1668,7 +1683,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
if parlia, ok := engine.(*parlia.Parlia); ok {
blobStore = parlia.BlobStore
}

blockRetire := freezeblocks.NewBlockRetire(1, dirs, blockReader, blockWriter, db, blobStore, chainConfig, notifications.Events, logger)
stages := stages2.NewDefaultStages(context.Background(), db, snapDb, blobStore, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, nil,
engine, heimdallClient, recents, signatures, logger)
sync := stagedsync.New(cfg.Sync, stages, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger)
Expand Down
9 changes: 6 additions & 3 deletions cmd/integration/commands/state_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,10 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.
return err
}

sn, borSn, agg := allSnapshots(ctx, db, logger1)
sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger1)
defer sn.Close()
defer borSn.Close()
defer bscSn.Close()
defer agg.Close()
engine, vmConfig, stateStages, miningStages, miner := newSync(ctx, db, &miningConfig, logger1)
chainConfig, historyV3, pm := fromdb.ChainConfig(db), kvcfg.HistoryV3.FromDB(db), fromdb.PruneMode(db)
Expand Down Expand Up @@ -452,9 +453,10 @@ func checkMinedBlock(b1, b2 *types.Block, chainConfig *chain2.Config) {
}

func loopIh(db kv.RwDB, ctx context.Context, unwind uint64, logger log.Logger) error {
sn, borSn, agg := allSnapshots(ctx, db, logger)
sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer bscSn.Close()
defer agg.Close()
_, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
dirs := datadir.New(datadirCli)
Expand Down Expand Up @@ -526,9 +528,10 @@ func loopIh(db kv.RwDB, ctx context.Context, unwind uint64, logger log.Logger) e
func loopExec(db kv.RwDB, ctx context.Context, unwind uint64, logger log.Logger) error {
chainConfig := fromdb.ChainConfig(db)
dirs, pm := datadir.New(datadirCli), fromdb.PruneMode(db)
sn, borSn, agg := allSnapshots(ctx, db, logger)
sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer bscSn.Close()
defer agg.Close()
engine, vmConfig, sync, _, _ := newSync(ctx, db, nil, logger)

Expand Down
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
}()
}
onNewSnapshot()
blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots)
blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots, nil)

var histV3Enabled bool
_ = db.View(ctx, func(tx kv.Tx) error {
Expand Down
10 changes: 10 additions & 0 deletions cmd/rpcdaemon/rpcservices/eth_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,11 @@ func (back *RemoteBackend) BlockByHash(ctx context.Context, db kv.Tx, hash commo
func (back *RemoteBackend) TxsV3Enabled() bool { panic("not implemented") }
func (back *RemoteBackend) Snapshots() services.BlockSnapshots { panic("not implemented") }
func (back *RemoteBackend) BorSnapshots() services.BlockSnapshots { panic("not implemented") }
func (back *RemoteBackend) BscSnapshots() services.BlockSnapshots { panic("not implemented") }
func (back *RemoteBackend) AllTypes() []snaptype.Type { panic("not implemented") }
func (back *RemoteBackend) FrozenBlocks() uint64 { return back.blockReader.FrozenBlocks() }
func (back *RemoteBackend) FrozenBorBlocks() uint64 { return back.blockReader.FrozenBorBlocks() }
func (back *RemoteBackend) FrozenBscBlobs() uint64 { return back.blockReader.FrozenBscBlobs() }
func (back *RemoteBackend) FrozenFiles() (list []string) { return back.blockReader.FrozenFiles() }
func (back *RemoteBackend) FreezingCfg() ethconfig.BlocksFreezing {
return back.blockReader.FreezingCfg()
Expand Down Expand Up @@ -301,6 +303,14 @@ func (back *RemoteBackend) BorStartEventID(ctx context.Context, tx kv.Tx, hash c
return back.blockReader.BorStartEventID(ctx, tx, hash, blockNum)
}

func (back *RemoteBackend) ReadBlobByNumber(ctx context.Context, tx kv.Getter, blockNum uint64) ([]*types.BlobSidecar, bool, error) {
return back.blockReader.ReadBlobByNumber(ctx, tx, blockNum)
}

func (back *RemoteBackend) ReadBlobTxCount(ctx context.Context, blockNum uint64, hash common.Hash) (uint32, error) {
return back.blockReader.ReadBlobTxCount(ctx, blockNum, hash)
}

func (back *RemoteBackend) LastSpanId(ctx context.Context, tx kv.Tx) (uint64, bool, error) {
return back.blockReader.LastSpanId(ctx, tx)
}
Expand Down
Loading

0 comments on commit 34134a8

Please sign in to comment.