From 34134a894a67aff4a79d2eb7da0b6727a5287e50 Mon Sep 17 00:00:00 2001 From: blxdyx <125243069+blxdyx@users.noreply.github.com> Date: Tue, 27 Aug 2024 15:23:46 +0800 Subject: [PATCH] Add Blob snapshots for bsc (#486) --- cmd/capcli/cli.go | 5 +- cmd/hack/hack.go | 2 +- cmd/integration/commands/reset_state.go | 10 +- cmd/integration/commands/root.go | 2 +- cmd/integration/commands/stages.go | 57 +-- cmd/integration/commands/state_stages.go | 9 +- cmd/rpcdaemon/cli/config.go | 2 +- cmd/rpcdaemon/rpcservices/eth_backend.go | 10 + cmd/snapshots/cmp/cmp.go | 8 +- cmd/state/commands/check_change_sets.go | 2 +- cmd/state/commands/opcode_tracer.go | 2 +- cmd/state/commands/state_root.go | 2 +- cmd/state/verify/verify_txlookup.go | 2 +- core/snaptype/block_types.go | 9 +- core/snaptype/bsc_types.go | 56 +++ erigon-lib/chain/snapcfg/util.go | 2 + erigon-lib/common/datadir/dirs.go | 4 +- erigon-lib/downloader/snaptype/type.go | 1 + erigon-lib/go.mod | 2 +- erigon-lib/go.sum | 4 +- eth/backend.go | 26 +- eth/stagedsync/stage_bodies.go | 12 - eth/stagedsync/stage_interhashes_test.go | 8 +- go.mod | 2 +- go.sum | 4 +- p2p/sentry/simulator/sentry_simulator.go | 2 +- .../heimdall/simulator/heimdall_simulator.go | 2 +- turbo/app/snapshots_cmd.go | 54 ++- turbo/jsonrpc/bsc_api.go | 65 ++-- turbo/jsonrpc/daemon.go | 3 - turbo/services/interfaces.go | 8 + .../snapshotsync/freezeblocks/block_reader.go | 85 ++++- .../freezeblocks/block_snapshots.go | 34 +- .../freezeblocks/bsc_sanpshots.go | 1 - .../freezeblocks/bsc_snapshots.go | 357 ++++++++++++++++++ turbo/snapshotsync/freezeblocks/util.go | 101 +++++ turbo/snapshotsync/freezeblocks/util_test.go | 29 ++ turbo/snapshotsync/snapshotsync.go | 20 +- turbo/stages/genesis_test.go | 2 +- turbo/stages/mock/mock_sentry.go | 5 +- 40 files changed, 876 insertions(+), 135 deletions(-) create mode 100644 core/snaptype/bsc_types.go delete mode 100644 turbo/snapshotsync/freezeblocks/bsc_sanpshots.go create mode 100644 
turbo/snapshotsync/freezeblocks/bsc_snapshots.go create mode 100644 turbo/snapshotsync/freezeblocks/util.go create mode 100644 turbo/snapshotsync/freezeblocks/util_test.go diff --git a/cmd/capcli/cli.go b/cmd/capcli/cli.go index b124fc95e8d..20c0969ef1a 100644 --- a/cmd/capcli/cli.go +++ b/cmd/capcli/cli.go @@ -530,7 +530,8 @@ func (d *DownloadSnapshots) Run(ctx *Context) error { return snapshotsync.WaitForDownloader(ctx, "CapCliDownloader", false, false, snapshotsync.OnlyCaplin, s, tx, freezeblocks.NewBlockReader( freezeblocks.NewRoSnapshots(ethconfig.NewSnapCfg(false, false, false), dirs.Snap, 0, log.Root()), - freezeblocks.NewBorRoSnapshots(ethconfig.NewSnapCfg(false, false, false), dirs.Snap, 0, log.Root())), + freezeblocks.NewBorRoSnapshots(ethconfig.NewSnapCfg(false, false, false), dirs.Snap, 0, log.Root()), + freezeblocks.NewBscRoSnapshots(ethconfig.NewSnapCfg(false, false, false), dirs.Snap, 0, log.Root())), params.ChainConfigByChainName(d.Chain), direct.NewDownloaderClient(bittorrentServer), []string{}) } @@ -570,7 +571,7 @@ func (r *RetrieveHistoricalState) Run(ctx *Context) error { } var bor *freezeblocks.BorRoSnapshots - blockReader := freezeblocks.NewBlockReader(allSnapshots, bor) + blockReader := freezeblocks.NewBlockReader(allSnapshots, bor, nil) eth1Getter := getters.NewExecutionSnapshotReader(ctx, beaconConfig, blockReader, db) csn := freezeblocks.NewCaplinSnapshots(ethconfig.BlocksFreezing{}, beaconConfig, dirs, log.Root()) if err := csn.ReopenFolder(); err != nil { diff --git a/cmd/hack/hack.go b/cmd/hack/hack.go index 315fa75db0c..ca722903c74 100644 --- a/cmd/hack/hack.go +++ b/cmd/hack/hack.go @@ -139,7 +139,7 @@ func blocksIO(db kv.RoDB) (services.FullBlockReader, *blockio.BlockWriter) { }); err != nil { panic(err) } - br := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", 0, log.New()), nil /* BorSnapshots */) + br := 
freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", 0, log.New()), nil /* BorSnapshots */, nil) bw := blockio.NewBlockWriter(histV3) return br, bw } diff --git a/cmd/integration/commands/reset_state.go b/cmd/integration/commands/reset_state.go index 84e4990202e..90ec71d6880 100644 --- a/cmd/integration/commands/reset_state.go +++ b/cmd/integration/commands/reset_state.go @@ -38,12 +38,13 @@ var cmdResetState = &cobra.Command{ } ctx, _ := common.RootContext() defer db.Close() - sn, borSn, agg := allSnapshots(ctx, db, logger) + sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger) defer sn.Close() defer borSn.Close() + defer bscSn.Close() defer agg.Close() - if err := db.View(ctx, func(tx kv.Tx) error { return printStages(tx, sn, borSn, agg) }); err != nil { + if err := db.View(ctx, func(tx kv.Tx) error { return printStages(tx, sn, borSn, bscSn, agg) }); err != nil { if !errors.Is(err, context.Canceled) { logger.Error(err.Error()) } @@ -59,7 +60,7 @@ var cmdResetState = &cobra.Command{ // set genesis after reset all buckets fmt.Printf("After reset: \n") - if err := db.View(ctx, func(tx kv.Tx) error { return printStages(tx, sn, borSn, agg) }); err != nil { + if err := db.View(ctx, func(tx kv.Tx) error { return printStages(tx, sn, borSn, bscSn, agg) }); err != nil { if !errors.Is(err, context.Canceled) { logger.Error(err.Error()) } @@ -97,7 +98,7 @@ func init() { rootCmd.AddCommand(cmdClearBadBlocks) } -func printStages(tx kv.Tx, snapshots *freezeblocks.RoSnapshots, borSn *freezeblocks.BorRoSnapshots, agg *state.Aggregator) error { +func printStages(tx kv.Tx, snapshots *freezeblocks.RoSnapshots, borSn *freezeblocks.BorRoSnapshots, bscSn *freezeblocks.BscRoSnapshots, agg *state.Aggregator) error { var err error var progress uint64 w := new(tabwriter.Writer) @@ -123,6 +124,7 @@ func printStages(tx kv.Tx, snapshots *freezeblocks.RoSnapshots, borSn *freezeblo fmt.Fprintf(w, "prune distance: %s\n\n", pm.String()) 
fmt.Fprintf(w, "blocks.v2: %t, blocks=%d, segments=%d, indices=%d\n", snapshots.Cfg().Enabled, snapshots.BlocksAvailable(), snapshots.SegmentsMax(), snapshots.IndicesMax()) fmt.Fprintf(w, "blocks.bor.v2: segments=%d, indices=%d\n\n", borSn.SegmentsMax(), borSn.IndicesMax()) + fmt.Fprintf(w, "blobsidecars.bsc.v2: segments=%d, indices=%d\n\n", bscSn.SegmentsMax(), bscSn.IndicesMax()) h3, err := kvcfg.HistoryV3.Enabled(tx) if err != nil { return err diff --git a/cmd/integration/commands/root.go b/cmd/integration/commands/root.go index fcb9a932357..ee22c8d5fad 100644 --- a/cmd/integration/commands/root.go +++ b/cmd/integration/commands/root.go @@ -104,7 +104,7 @@ func openDB(opts kv2.MdbxOpts, applyMigrations bool, logger log.Logger) (kv.RwDB return nil, err } if h3 { - _, _, agg := allSnapshots(context.Background(), db, logger) + _, _, _, agg := allSnapshots(context.Background(), db, logger) tdb, err := temporal.New(db, agg) if err != nil { return nil, err diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index f22bc2d21ff..1625b49d9ac 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -496,9 +496,10 @@ var cmdSetSnap = &cobra.Command{ return } defer db.Close() - sn, borSn, agg := allSnapshots(cmd.Context(), db, logger) + sn, borSn, bscSn, agg := allSnapshots(cmd.Context(), db, logger) defer sn.Close() defer borSn.Close() + defer bscSn.Close() defer agg.Close() cfg := sn.Cfg() @@ -739,9 +740,10 @@ func stageHeaders(db kv.RwDB, ctx context.Context, logger log.Logger) error { return err } - sn, borSn, agg := allSnapshots(ctx, db, logger) + sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger) defer sn.Close() defer borSn.Close() + defer bscSn.Close() defer agg.Close() br, bw := blocksIO(db, logger) engine, _, _, _, _ := newSync(ctx, db, nil /* miningConfig */, logger) @@ -838,7 +840,7 @@ func stageBorHeimdall(db kv.RwDB, ctx context.Context, unwindTypes []string, log return nil } if unwind > 0 
{ - sn, borSn, agg := allSnapshots(ctx, db, logger) + sn, borSn, _, agg := allSnapshots(ctx, db, logger) defer sn.Close() defer borSn.Close() defer agg.Close() @@ -869,7 +871,7 @@ func stageBorHeimdall(db kv.RwDB, ctx context.Context, unwindTypes []string, log return nil } - sn, borSn, agg := allSnapshots(ctx, db, logger) + sn, borSn, _, agg := allSnapshots(ctx, db, logger) defer sn.Close() defer borSn.Close() defer agg.Close() @@ -902,9 +904,10 @@ func stageBorHeimdall(db kv.RwDB, ctx context.Context, unwindTypes []string, log } func stageBodies(db kv.RwDB, ctx context.Context, logger log.Logger) error { - sn, borSn, agg := allSnapshots(ctx, db, logger) + sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger) defer sn.Close() defer borSn.Close() + defer bscSn.Close() defer agg.Close() chainConfig, historyV3 := fromdb.ChainConfig(db), kvcfg.HistoryV3.FromDB(db) _, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger) @@ -942,9 +945,10 @@ func stageBodies(db kv.RwDB, ctx context.Context, logger log.Logger) error { func stageSenders(db kv.RwDB, ctx context.Context, logger log.Logger) error { tmpdir := datadir.New(datadirCli).Tmp chainConfig := fromdb.ChainConfig(db) - sn, borSn, agg := allSnapshots(ctx, db, logger) + sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger) defer sn.Close() defer borSn.Close() + defer bscSn.Close() defer agg.Close() _, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger) @@ -1040,9 +1044,10 @@ func stageExec(db kv.RwDB, ctx context.Context, logger log.Logger) error { engine, vmConfig, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger) must(sync.SetCurrentStage(stages.Execution)) - sn, borSn, agg := allSnapshots(ctx, db, logger) + sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger) defer sn.Close() defer borSn.Close() + defer bscSn.Close() defer agg.Close() if warmup { @@ -1123,9 +1128,10 @@ func stageExec(db kv.RwDB, ctx context.Context, logger log.Logger) error { func stageTrie(db kv.RwDB, ctx 
context.Context, logger log.Logger) error { dirs, pm, historyV3 := datadir.New(datadirCli), fromdb.PruneMode(db), kvcfg.HistoryV3.FromDB(db) - sn, borSn, agg := allSnapshots(ctx, db, logger) + sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger) defer sn.Close() defer borSn.Close() + defer bscSn.Close() defer agg.Close() _, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger) must(sync.SetCurrentStage(stages.IntermediateHashes)) @@ -1181,9 +1187,10 @@ func stageTrie(db kv.RwDB, ctx context.Context, logger log.Logger) error { func stageHashState(db kv.RwDB, ctx context.Context, logger log.Logger) error { dirs, pm, historyV3 := datadir.New(datadirCli), fromdb.PruneMode(db), kvcfg.HistoryV3.FromDB(db) - sn, borSn, agg := allSnapshots(ctx, db, logger) + sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger) defer sn.Close() defer borSn.Close() + defer bscSn.Close() defer agg.Close() _, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger) must(sync.SetCurrentStage(stages.HashState)) @@ -1362,9 +1369,10 @@ func stageHistory(db kv.RwDB, ctx context.Context, logger log.Logger) error { if historyV3 { return fmt.Errorf("this stage is disable in --history.v3=true") } - sn, borSn, agg := allSnapshots(ctx, db, logger) + sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger) defer sn.Close() defer borSn.Close() + defer bscSn.Close() defer agg.Close() _, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger) must(sync.SetCurrentStage(stages.AccountHistoryIndex)) @@ -1421,7 +1429,7 @@ func stageHistory(db kv.RwDB, ctx context.Context, logger log.Logger) error { if err != nil { return err } - _ = printStages(tx, sn, borSn, agg) + _ = printStages(tx, sn, borSn, bscSn, agg) } else { if err := stagedsync.SpawnAccountHistoryIndex(stageAcc, tx, cfg, ctx, logger); err != nil { return err @@ -1438,9 +1446,10 @@ func stageTxLookup(db kv.RwDB, ctx context.Context, logger log.Logger) error { _, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, 
logger) chainConfig := fromdb.ChainConfig(db) must(sync.SetCurrentStage(stages.TxLookup)) - sn, borSn, agg := allSnapshots(ctx, db, logger) + sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger) defer sn.Close() defer borSn.Close() + defer bscSn.Close() defer agg.Close() if reset { @@ -1488,11 +1497,12 @@ func stageTxLookup(db kv.RwDB, ctx context.Context, logger log.Logger) error { } func printAllStages(db kv.RoDB, ctx context.Context, logger log.Logger) error { - sn, borSn, agg := allSnapshots(ctx, db, logger) + sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger) defer sn.Close() defer borSn.Close() + defer bscSn.Close() defer agg.Close() - return db.View(ctx, func(tx kv.Tx) error { return printStages(tx, sn, borSn, agg) }) + return db.View(ctx, func(tx kv.Tx) error { return printStages(tx, sn, borSn, bscSn, agg) }) } func printAppliedMigrations(db kv.RwDB, ctx context.Context, logger log.Logger) error { @@ -1522,9 +1532,10 @@ func removeMigration(db kv.RwDB, ctx context.Context) error { var openSnapshotOnce sync.Once var _allSnapshotsSingleton *freezeblocks.RoSnapshots var _allBorSnapshotsSingleton *freezeblocks.BorRoSnapshots +var _allBscSnapshotsSingleton *freezeblocks.BscRoSnapshots var _aggSingleton *libstate.Aggregator -func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezeblocks.RoSnapshots, *freezeblocks.BorRoSnapshots, *libstate.Aggregator) { +func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezeblocks.RoSnapshots, *freezeblocks.BorRoSnapshots, *freezeblocks.BscRoSnapshots, *libstate.Aggregator) { openSnapshotOnce.Do(func() { var useSnapshots bool _ = db.View(context.Background(), func(tx kv.Tx) error { @@ -1537,6 +1548,7 @@ func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezebl snapCfg := ethconfig.NewSnapCfg(useSnapshots, true, true) _allSnapshotsSingleton = freezeblocks.NewRoSnapshots(snapCfg, dirs.Snap, 0, logger) _allBorSnapshotsSingleton = 
freezeblocks.NewBorRoSnapshots(snapCfg, dirs.Snap, 0, logger) + _allBscSnapshotsSingleton = freezeblocks.NewBscRoSnapshots(snapCfg, dirs.Snap, 0, logger) var err error _aggSingleton, err = libstate.NewAggregator(ctx, dirs.SnapHistory, dirs.Tmp, config3.HistoryV3AggregationStep, db, logger) @@ -1557,6 +1569,10 @@ func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezebl panic(err) } _allBorSnapshotsSingleton.LogStat("all") + if err := _allBscSnapshotsSingleton.ReopenFolder(); err != nil { + panic(err) + } + _allBscSnapshotsSingleton.LogStat("all") db.View(context.Background(), func(tx kv.Tx) error { _aggSingleton.LogStats(tx, func(endTxNumMinimax uint64) uint64 { _, histBlockNumProgress, _ := rawdbv3.TxNums.FindBlockNum(tx, endTxNumMinimax) @@ -1566,7 +1582,7 @@ func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezebl }) } }) - return _allSnapshotsSingleton, _allBorSnapshotsSingleton, _aggSingleton + return _allSnapshotsSingleton, _allBorSnapshotsSingleton, _allBscSnapshotsSingleton, _aggSingleton } var openBlockReaderOnce sync.Once @@ -1575,9 +1591,9 @@ var _blockWriterSingleton *blockio.BlockWriter func blocksIO(db kv.RoDB, logger log.Logger) (services.FullBlockReader, *blockio.BlockWriter) { openBlockReaderOnce.Do(func() { - sn, borSn, _ := allSnapshots(context.Background(), db, logger) + sn, borSn, bscSn, _ := allSnapshots(context.Background(), db, logger) histV3 := kvcfg.HistoryV3.FromDB(db) - _blockReaderSingleton = freezeblocks.NewBlockReader(sn, borSn) + _blockReaderSingleton = freezeblocks.NewBlockReader(sn, borSn, bscSn) _blockWriterSingleton = blockio.NewBlockWriter(histV3) }) return _blockReaderSingleton, _blockWriterSingleton @@ -1616,7 +1632,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig, cfg.Miner = *miningConfig } cfg.Dirs = datadir.New(datadirCli) - allSn, _, agg := allSnapshots(ctx, db, logger) + allSn, _, _, agg := allSnapshots(ctx, db, logger) cfg.Snapshot = 
allSn.Cfg() blockReader, blockWriter := blocksIO(db, logger) @@ -1651,7 +1667,6 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig, } notifications := &shards.Notifications{} - blockRetire := freezeblocks.NewBlockRetire(1, dirs, blockReader, blockWriter, db, chainConfig, notifications.Events, logger) var ( snapDb kv.RwDB @@ -1668,7 +1683,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig, if parlia, ok := engine.(*parlia.Parlia); ok { blobStore = parlia.BlobStore } - + blockRetire := freezeblocks.NewBlockRetire(1, dirs, blockReader, blockWriter, db, blobStore, chainConfig, notifications.Events, logger) stages := stages2.NewDefaultStages(context.Background(), db, snapDb, blobStore, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, nil, engine, heimdallClient, recents, signatures, logger) sync := stagedsync.New(cfg.Sync, stages, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger) diff --git a/cmd/integration/commands/state_stages.go b/cmd/integration/commands/state_stages.go index 5e342009b0b..2335cc4a161 100644 --- a/cmd/integration/commands/state_stages.go +++ b/cmd/integration/commands/state_stages.go @@ -175,9 +175,10 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context. 
return err } - sn, borSn, agg := allSnapshots(ctx, db, logger1) + sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger1) defer sn.Close() defer borSn.Close() + defer bscSn.Close() defer agg.Close() engine, vmConfig, stateStages, miningStages, miner := newSync(ctx, db, &miningConfig, logger1) chainConfig, historyV3, pm := fromdb.ChainConfig(db), kvcfg.HistoryV3.FromDB(db), fromdb.PruneMode(db) @@ -452,9 +453,10 @@ func checkMinedBlock(b1, b2 *types.Block, chainConfig *chain2.Config) { } func loopIh(db kv.RwDB, ctx context.Context, unwind uint64, logger log.Logger) error { - sn, borSn, agg := allSnapshots(ctx, db, logger) + sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger) defer sn.Close() defer borSn.Close() + defer bscSn.Close() defer agg.Close() _, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger) dirs := datadir.New(datadirCli) @@ -526,9 +528,10 @@ func loopIh(db kv.RwDB, ctx context.Context, unwind uint64, logger log.Logger) e func loopExec(db kv.RwDB, ctx context.Context, unwind uint64, logger log.Logger) error { chainConfig := fromdb.ChainConfig(db) dirs, pm := datadir.New(datadirCli), fromdb.PruneMode(db) - sn, borSn, agg := allSnapshots(ctx, db, logger) + sn, borSn, bscSn, agg := allSnapshots(ctx, db, logger) defer sn.Close() defer borSn.Close() + defer bscSn.Close() defer agg.Close() engine, vmConfig, sync, _, _ := newSync(ctx, db, nil, logger) diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index dd3c917d50e..79c19e149c7 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -432,7 +432,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger }() } onNewSnapshot() - blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots) + blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots, nil) var histV3Enabled bool _ = db.View(ctx, func(tx kv.Tx) error { diff --git a/cmd/rpcdaemon/rpcservices/eth_backend.go 
b/cmd/rpcdaemon/rpcservices/eth_backend.go index 60a2f18a250..99e3ff27d5f 100644 --- a/cmd/rpcdaemon/rpcservices/eth_backend.go +++ b/cmd/rpcdaemon/rpcservices/eth_backend.go @@ -99,9 +99,11 @@ func (back *RemoteBackend) BlockByHash(ctx context.Context, db kv.Tx, hash commo func (back *RemoteBackend) TxsV3Enabled() bool { panic("not implemented") } func (back *RemoteBackend) Snapshots() services.BlockSnapshots { panic("not implemented") } func (back *RemoteBackend) BorSnapshots() services.BlockSnapshots { panic("not implemented") } +func (back *RemoteBackend) BscSnapshots() services.BlockSnapshots { panic("not implemented") } func (back *RemoteBackend) AllTypes() []snaptype.Type { panic("not implemented") } func (back *RemoteBackend) FrozenBlocks() uint64 { return back.blockReader.FrozenBlocks() } func (back *RemoteBackend) FrozenBorBlocks() uint64 { return back.blockReader.FrozenBorBlocks() } +func (back *RemoteBackend) FrozenBscBlobs() uint64 { return back.blockReader.FrozenBscBlobs() } func (back *RemoteBackend) FrozenFiles() (list []string) { return back.blockReader.FrozenFiles() } func (back *RemoteBackend) FreezingCfg() ethconfig.BlocksFreezing { return back.blockReader.FreezingCfg() @@ -301,6 +303,14 @@ func (back *RemoteBackend) BorStartEventID(ctx context.Context, tx kv.Tx, hash c return back.blockReader.BorStartEventID(ctx, tx, hash, blockNum) } +func (back *RemoteBackend) ReadBlobByNumber(ctx context.Context, tx kv.Getter, blockNum uint64) ([]*types.BlobSidecar, bool, error) { + return back.blockReader.ReadBlobByNumber(ctx, tx, blockNum) +} + +func (back *RemoteBackend) ReadBlobTxCount(ctx context.Context, blockNum uint64, hash common.Hash) (uint32, error) { + return back.blockReader.ReadBlobTxCount(ctx, blockNum, hash) +} + func (back *RemoteBackend) LastSpanId(ctx context.Context, tx kv.Tx) (uint64, bool, error) { return back.blockReader.LastSpanId(ctx, tx) } diff --git a/cmd/snapshots/cmp/cmp.go b/cmd/snapshots/cmp/cmp.go index 
d1cb0fc8414..2451014f528 100644 --- a/cmd/snapshots/cmp/cmp.go +++ b/cmd/snapshots/cmp/cmp.go @@ -485,8 +485,8 @@ func (c comparitor) compareHeaders(ctx context.Context, f1ents []fs.DirEntry, f2 atomic.AddUint64(&compareTime, uint64(time.Since(startTime))) }() - blockReader1 := freezeblocks.NewBlockReader(f1snaps, nil) - blockReader2 := freezeblocks.NewBlockReader(f2snaps, nil) + blockReader1 := freezeblocks.NewBlockReader(f1snaps, nil, nil) + blockReader2 := freezeblocks.NewBlockReader(f2snaps, nil, nil) g, gctx = errgroup.WithContext(ctx) g.SetLimit(2) @@ -767,8 +767,8 @@ func (c comparitor) compareBodies(ctx context.Context, f1ents []*BodyEntry, f2en atomic.AddUint64(&compareTime, uint64(time.Since(startTime))) }() - blockReader1 := freezeblocks.NewBlockReader(f1snaps, nil) - blockReader2 := freezeblocks.NewBlockReader(f2snaps, nil) + blockReader1 := freezeblocks.NewBlockReader(f1snaps, nil, nil) + blockReader2 := freezeblocks.NewBlockReader(f2snaps, nil, nil) return func() error { for i := ent1.From; i < ent1.To; i++ { diff --git a/cmd/state/commands/check_change_sets.go b/cmd/state/commands/check_change_sets.go index bf7dd471630..972334849f3 100644 --- a/cmd/state/commands/check_change_sets.go +++ b/cmd/state/commands/check_change_sets.go @@ -86,7 +86,7 @@ func CheckChangeSets(ctx context.Context, genesis *types.Genesis, blockNum uint6 if err := allSnapshots.ReopenFolder(); err != nil { return fmt.Errorf("reopen snapshot segments: %w", err) } - blockReader := freezeblocks.NewBlockReader(allSnapshots, nil /* BorSnapshots */) + blockReader := freezeblocks.NewBlockReader(allSnapshots, nil /* BorSnapshots */, nil) chainDb := db defer chainDb.Close() diff --git a/cmd/state/commands/opcode_tracer.go b/cmd/state/commands/opcode_tracer.go index e1aafb97374..2af2f84dff0 100644 --- a/cmd/state/commands/opcode_tracer.go +++ b/cmd/state/commands/opcode_tracer.go @@ -433,7 +433,7 @@ func OpcodeTracer(genesis *types.Genesis, blockNum uint64, chaindata string, num return nil 
}) dirs := datadir2.New(filepath.Dir(chainDb.(*mdbx.MdbxKV).Path())) - blockReader := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, dirs.Snap, 0, log.New()), nil /* BorSnapshots */) + blockReader := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, dirs.Snap, 0, log.New()), nil /* BorSnapshots */, nil) chainConfig := genesis.Config vmConfig := vm.Config{Tracer: ot, Debug: true} diff --git a/cmd/state/commands/state_root.go b/cmd/state/commands/state_root.go index bb39e75da35..64225914575 100644 --- a/cmd/state/commands/state_root.go +++ b/cmd/state/commands/state_root.go @@ -56,7 +56,7 @@ func blocksIO(db kv.RoDB) (services.FullBlockReader, *blockio.BlockWriter) { panic(err) } dirs := datadir2.New(filepath.Dir(db.(*kv2.MdbxKV).Path())) - br := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, dirs.Snap, 0, log.New()), nil /* BorSnapshots */) + br := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, dirs.Snap, 0, log.New()), nil /* BorSnapshots */, nil) bw := blockio.NewBlockWriter(histV3) return br, bw } diff --git a/cmd/state/verify/verify_txlookup.go b/cmd/state/verify/verify_txlookup.go index be2d380407b..ff90d30ef43 100644 --- a/cmd/state/verify/verify_txlookup.go +++ b/cmd/state/verify/verify_txlookup.go @@ -31,7 +31,7 @@ func blocksIO(db kv.RoDB) (services.FullBlockReader, *blockio.BlockWriter) { panic(err) } dirs := datadir2.New(filepath.Dir(db.(*mdbx.MdbxKV).Path())) - br := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, dirs.Snap, 0, log.New()), nil /* BorSnapshots */) + br := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, dirs.Snap, 0, log.New()), nil /* BorSnapshots */, nil) bw := blockio.NewBlockWriter(histV3) return br, bw } diff --git 
a/core/snaptype/block_types.go b/core/snaptype/block_types.go index a49a68471c9..cfc01293343 100644 --- a/core/snaptype/block_types.go +++ b/core/snaptype/block_types.go @@ -33,31 +33,34 @@ func init() { snapcfg.RegisterKnownTypes(networkname.GoerliChainName, ethereumTypes) snapcfg.RegisterKnownTypes(networkname.GnosisChainName, ethereumTypes) snapcfg.RegisterKnownTypes(networkname.ChiadoChainName, ethereumTypes) - snapcfg.RegisterKnownTypes(networkname.BSCChainName, ethereumTypes) } var Enums = struct { snaptype.Enums Headers, Bodies, - Transactions snaptype.Enum + Transactions, + BscBlobs snaptype.Enum }{ Enums: snaptype.Enums{}, Headers: snaptype.MinCoreEnum, Bodies: snaptype.MinCoreEnum + 1, Transactions: snaptype.MinCoreEnum + 2, + BscBlobs: snaptype.MinBscEnum, } var Indexes = struct { HeaderHash, BodyHash, TxnHash, - TxnHash2BlockNum snaptype.Index + TxnHash2BlockNum, + BscBlobNum snaptype.Index }{ HeaderHash: snaptype.Index{Name: "headers"}, BodyHash: snaptype.Index{Name: "bodies"}, TxnHash: snaptype.Index{Name: "transactions"}, TxnHash2BlockNum: snaptype.Index{Name: "transactions-to-block", Offset: 1}, + BscBlobNum: snaptype.Index{Name: "blocksidecars"}, } var ( diff --git a/core/snaptype/bsc_types.go b/core/snaptype/bsc_types.go new file mode 100644 index 00000000000..d297a49a78c --- /dev/null +++ b/core/snaptype/bsc_types.go @@ -0,0 +1,56 @@ +package snaptype + +import ( + "context" + "encoding/binary" + "fmt" + "github.com/ledgerwatch/erigon-lib/chain" + "github.com/ledgerwatch/erigon-lib/chain/networkname" + "github.com/ledgerwatch/erigon-lib/chain/snapcfg" + "github.com/ledgerwatch/erigon-lib/common/background" + "github.com/ledgerwatch/erigon-lib/downloader/snaptype" + "github.com/ledgerwatch/erigon-lib/recsplit" + "github.com/ledgerwatch/log/v3" +) + +func init() { + initTypes() +} + +func initTypes() { + bscTypes := append(BlockSnapshotTypes, BscSnapshotTypes...) 
+ snapcfg.RegisterKnownTypes(networkname.BSCChainName, bscTypes) + snapcfg.RegisterKnownTypes(networkname.ChapelChainName, bscTypes) +} + +var ( + BlobSidecars = snaptype.RegisterType( + Enums.BscBlobs, + "bscblobsidecars", + snaptype.Versions{ + Current: 1, //2, + MinSupported: 1, + }, + nil, + []snaptype.Index{Indexes.BscBlobNum}, + snaptype.IndexBuilderFunc( + func(ctx context.Context, info snaptype.FileInfo, salt uint32, _ *chain.Config, tmpDir string, p *background.Progress, lvl log.Lvl, logger log.Logger) (err error) { + num := make([]byte, binary.MaxVarintLen64) + if err := snaptype.BuildIndex(ctx, info, salt, info.From, tmpDir, log.LvlDebug, p, func(idx *recsplit.RecSplit, i, offset uint64, word []byte) error { + if i%20_000 == 0 { + logger.Log(lvl, fmt.Sprintf("Generating idx for %s", info.Type.Name()), "progress", i) + } + p.Processed.Add(1) + n := binary.PutUvarint(num, i) + if err := idx.AddKey(num[:n], offset); err != nil { + return err + } + return nil + }, logger); err != nil { + return fmt.Errorf("can't index %s: %w", info.Name(), err) + } + return nil + }), + ) + BscSnapshotTypes = []snaptype.Type{BlobSidecars} +) diff --git a/erigon-lib/chain/snapcfg/util.go b/erigon-lib/chain/snapcfg/util.go index e86047f2158..99dc0e64ef9 100644 --- a/erigon-lib/chain/snapcfg/util.go +++ b/erigon-lib/chain/snapcfg/util.go @@ -26,6 +26,7 @@ var ( Gnosis = fromToml(snapshothashes.Gnosis) Chiado = fromToml(snapshothashes.Chiado) Bsc = fromToml(snapshothashes.Bsc) + Chapel = fromToml(snapshothashes.Chapel) ) type PreverifiedItem struct { @@ -333,6 +334,7 @@ var knownPreverified = map[string]Preverified{ networkname.GnosisChainName: Gnosis, networkname.ChiadoChainName: Chiado, networkname.BSCChainName: Bsc, + networkname.ChapelChainName: Chapel, } func RegisterKnownTypes(networkName string, types []snaptype.Type) { diff --git a/erigon-lib/common/datadir/dirs.go b/erigon-lib/common/datadir/dirs.go index c56faea87eb..b80198c9750 100644 --- 
a/erigon-lib/common/datadir/dirs.go +++ b/erigon-lib/common/datadir/dirs.go @@ -46,6 +46,7 @@ type Dirs struct { Nodes string CaplinBlobs string CaplinIndexing string + Blobs string } func New(datadir string) Dirs { @@ -74,11 +75,12 @@ func New(datadir string) Dirs { Nodes: filepath.Join(datadir, "nodes"), CaplinBlobs: filepath.Join(datadir, "caplin", "blobs"), CaplinIndexing: filepath.Join(datadir, "caplin", "indexing"), + Blobs: filepath.Join(datadir, "blobs"), } dir.MustExist(dirs.Chaindata, dirs.Tmp, dirs.SnapIdx, dirs.SnapHistory, dirs.SnapDomain, dirs.SnapAccessors, - dirs.Downloader, dirs.TxPool, dirs.Nodes, dirs.CaplinBlobs, dirs.CaplinIndexing) + dirs.Downloader, dirs.TxPool, dirs.Nodes, dirs.CaplinBlobs, dirs.CaplinIndexing, dirs.Blobs) return dirs } diff --git a/erigon-lib/downloader/snaptype/type.go b/erigon-lib/downloader/snaptype/type.go index cd8d76db83c..4ec7378692c 100644 --- a/erigon-lib/downloader/snaptype/type.go +++ b/erigon-lib/downloader/snaptype/type.go @@ -301,6 +301,7 @@ type Enums struct { const MinCoreEnum = 1 const MinBorEnum = 4 const MinCaplinEnum = 8 +const MinBscEnum = 10 var CaplinEnums = struct { Enums diff --git a/erigon-lib/go.mod b/erigon-lib/go.mod index 81e9ce98e02..42a25f60e04 100644 --- a/erigon-lib/go.mod +++ b/erigon-lib/go.mod @@ -144,6 +144,6 @@ require ( zombiezen.com/go/sqlite v0.13.1 // indirect ) -replace github.com/ledgerwatch/erigon-snapshot => github.com/node-real/bsc-erigon-snapshot v1.0.1-0.20240808061348-94b6faa77836 +replace github.com/ledgerwatch/erigon-snapshot => github.com/node-real/bsc-erigon-snapshot v1.0.1-0.20240826065436-2c6a68658448 replace github.com/anacrolix/torrent => github.com/erigontech/torrent v1.54.2-alpha-8 diff --git a/erigon-lib/go.sum b/erigon-lib/go.sum index dd3f6ae30d0..2d3c74e2d18 100644 --- a/erigon-lib/go.sum +++ b/erigon-lib/go.sum @@ -295,8 +295,8 @@ github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOl github.com/mwitkow/go-conntrack 
v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= -github.com/node-real/bsc-erigon-snapshot v1.0.1-0.20240808061348-94b6faa77836 h1:k9ufjwUYzhV/vxBe3VhsVGl7PQ0xlolrqOBNebtpkJg= -github.com/node-real/bsc-erigon-snapshot v1.0.1-0.20240808061348-94b6faa77836/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo= +github.com/node-real/bsc-erigon-snapshot v1.0.1-0.20240826065436-2c6a68658448 h1:fYnBKJ9EzVxEAxzaenOoTm8I9GQWtX/rRxouWOPjHl0= +github.com/node-real/bsc-erigon-snapshot v1.0.1-0.20240826065436-2c6a68658448/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= diff --git a/eth/backend.go b/eth/backend.go index 48a60c9d52f..59f0485dfba 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -344,7 +344,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger // Check if we have an already initialized chain and fall back to // that if so. Otherwise we need to generate a new genesis spec. 
- blockReader, blockWriter, allSnapshots, allBorSnapshots, agg, err := setUpBlockReader(ctx, chainKv, config.Dirs, config, config.HistoryV3, chainConfig.Bor != nil, logger) + blockReader, blockWriter, allSnapshots, allBorSnapshots, _, agg, err := setUpBlockReader(ctx, chainKv, config.Dirs, config, config.HistoryV3, chainConfig.Bor != nil, chainConfig.Parlia != nil, logger) if err != nil { return nil, err } @@ -361,6 +361,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger return nil, err } + // TODO: @Blxdyx add remoteDbServer for bscSnapshots; for now it is used for the remote RPC server. kvRPC := remotedbserver.NewKvServer(ctx, backend.chainDB, allSnapshots, allBorSnapshots, agg, logger) backend.notifications.StateChangesConsumer = kvRPC backend.kvRPC = kvRPC @@ -711,7 +712,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger ethBackendRPC := privateapi.NewEthBackendServer(ctx, backend, backend.chainDB, backend.notifications.Events, blockReader, logger, latestBlockBuiltStore) // initialize engine backend - blockRetire := freezeblocks.NewBlockRetire(1, dirs, blockReader, blockWriter, backend.chainDB, backend.chainConfig, backend.notifications.Events, logger) + blockRetire := freezeblocks.NewBlockRetire(1, dirs, blockReader, blockWriter, backend.chainDB, blobStore, backend.chainConfig, backend.notifications.Events, logger) miningRPC = privateapi.NewMiningServer(ctx, backend, ethashApi, logger) @@ -1389,7 +1390,7 @@ func (s *Ethereum) setUpSnapDownloader(ctx context.Context, downloaderCfg *downl return err } -func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConfig *ethconfig.Config, histV3 bool, isBor bool, logger log.Logger) (services.FullBlockReader, *blockio.BlockWriter, *freezeblocks.RoSnapshots, *freezeblocks.BorRoSnapshots, *libstate.Aggregator, error) { +func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConfig *ethconfig.Config, histV3 bool, isBor bool,
isBsc bool, logger log.Logger) (services.FullBlockReader, *blockio.BlockWriter, *freezeblocks.RoSnapshots, *freezeblocks.BorRoSnapshots, *freezeblocks.BscRoSnapshots, *libstate.Aggregator, error) { var minFrozenBlock uint64 if frozenLimit := snConfig.Sync.FrozenBlockLimit; frozenLimit != 0 { @@ -1405,29 +1406,40 @@ func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConf allBorSnapshots = freezeblocks.NewBorRoSnapshots(snConfig.Snapshot, dirs.Snap, minFrozenBlock, logger) } + var allBscSnapshots *freezeblocks.BscRoSnapshots + if isBsc { + allBscSnapshots = freezeblocks.NewBscRoSnapshots(snConfig.Snapshot, dirs.Snap, minFrozenBlock, logger) + } + var err error if snConfig.Snapshot.NoDownloader { allSnapshots.ReopenFolder() if isBor { allBorSnapshots.ReopenFolder() } + if isBsc { + allBscSnapshots.ReopenFolder() + } } else { allSnapshots.OptimisticalyReopenWithDB(db) if isBor { allBorSnapshots.OptimisticalyReopenWithDB(db) } + if isBsc { + allBscSnapshots.OptimisticalyReopenWithDB(db) + } } - blockReader := freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots) + blockReader := freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots, allBscSnapshots) blockWriter := blockio.NewBlockWriter(histV3) agg, err := libstate.NewAggregator(ctx, dirs.SnapHistory, dirs.Tmp, config3.HistoryV3AggregationStep, db, logger) if err != nil { - return nil, nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, nil, err } if err = agg.OpenFolder(); err != nil { - return nil, nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, nil, err } - return blockReader, blockWriter, allSnapshots, allBorSnapshots, agg, nil + return blockReader, blockWriter, allSnapshots, allBorSnapshots, allBscSnapshots, agg, nil } func (s *Ethereum) Peers(ctx context.Context) (*remote.PeersReply, error) { diff --git a/eth/stagedsync/stage_bodies.go b/eth/stagedsync/stage_bodies.go index 4c0104465db..0fb34f28072 100644 --- a/eth/stagedsync/stage_bodies.go +++ 
b/eth/stagedsync/stage_bodies.go @@ -436,18 +436,6 @@ func UnwindBodiesStage(u *UnwindState, tx kv.RwTx, cfg BodiesCfg, ctx context.Co logEvery := time.NewTicker(logInterval) defer logEvery.Stop() - if cfg.blobStore != nil { - for i := u.CurrentBlockNumber; i > u.UnwindPoint; i-- { - blockHash, err := rawdb.ReadCanonicalHash(tx, i) - if err != nil { - return err - } - if err = cfg.blobStore.RemoveBlobSidecars(ctx, i, blockHash); err != nil { - return err - } - } - } - if err := cfg.blockWriter.MakeBodiesNonCanonical(tx, u.UnwindPoint+1); err != nil { return err } diff --git a/eth/stagedsync/stage_interhashes_test.go b/eth/stagedsync/stage_interhashes_test.go index 5541e3dddfc..9960085e4f7 100644 --- a/eth/stagedsync/stage_interhashes_test.go +++ b/eth/stagedsync/stage_interhashes_test.go @@ -81,7 +81,7 @@ func TestAccountAndStorageTrie(t *testing.T) { // ---------------------------------------------------------------- historyV3 := false - blockReader := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, t.TempDir(), 0, log.New()), freezeblocks.NewBorRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, t.TempDir(), 0, log.New())) + blockReader := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, t.TempDir(), 0, log.New()), freezeblocks.NewBorRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, t.TempDir(), 0, log.New()), nil) cfg := stagedsync.StageTrieCfg(db, false, true, false, t.TempDir(), blockReader, nil, historyV3, nil) _, err := stagedsync.RegenerateIntermediateHashes("IH", tx, cfg, libcommon.Hash{} /* expectedRootHash */, ctx, log.New()) assert.Nil(t, err) @@ -203,7 +203,7 @@ func TestAccountTrieAroundExtensionNode(t *testing.T) { hash6 := libcommon.HexToHash("0x3100000000000000000000000000000000000000000000000000000000000000") assert.Nil(t, tx.Put(kv.HashedAccounts, hash6[:], encoded)) - blockReader := 
freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, t.TempDir(), 0, log.New()), freezeblocks.NewBorRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, t.TempDir(), 0, log.New())) + blockReader := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, t.TempDir(), 0, log.New()), freezeblocks.NewBorRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, t.TempDir(), 0, log.New()), nil) _, err := stagedsync.RegenerateIntermediateHashes("IH", tx, stagedsync.StageTrieCfg(db, false, true, false, t.TempDir(), blockReader, nil, historyV3, nil), libcommon.Hash{} /* expectedRootHash */, ctx, log.New()) assert.Nil(t, err) @@ -266,7 +266,7 @@ func TestStorageDeletion(t *testing.T) { // Populate account & storage trie DB tables // ---------------------------------------------------------------- historyV3 := false - blockReader := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, t.TempDir(), 0, log.New()), freezeblocks.NewBorRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, t.TempDir(), 0, log.New())) + blockReader := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, t.TempDir(), 0, log.New()), freezeblocks.NewBorRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, t.TempDir(), 0, log.New()), nil) cfg := stagedsync.StageTrieCfg(db, false, true, false, t.TempDir(), blockReader, nil, historyV3, nil) _, err = stagedsync.RegenerateIntermediateHashes("IH", tx, cfg, libcommon.Hash{} /* expectedRootHash */, ctx, log.New()) assert.Nil(t, err) @@ -385,7 +385,7 @@ func TestHiveTrieRoot(t *testing.T) { common.FromHex("02081bc16d674ec80000"))) historyV3 := false - blockReader := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, t.TempDir(), 0, log.New()), freezeblocks.NewBorRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, t.TempDir(), 0, log.New())) + 
blockReader := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, t.TempDir(), 0, log.New()), freezeblocks.NewBorRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, t.TempDir(), 0, log.New()), nil) cfg := stagedsync.StageTrieCfg(db, false, true, false, t.TempDir(), blockReader, nil, historyV3, nil) logger := log.New() _, err := stagedsync.RegenerateIntermediateHashes("IH", tx, cfg, libcommon.Hash{} /* expectedRootHash */, ctx, logger) diff --git a/go.mod b/go.mod index 9b5a3215245..24cd05725fd 100644 --- a/go.mod +++ b/go.mod @@ -320,6 +320,6 @@ replace ( github.com/anacrolix/torrent => github.com/erigontech/torrent v1.54.2-alpha-8 github.com/cometbft/cometbft => github.com/bnb-chain/greenfield-tendermint v0.0.0-20230417032003-4cda1f296fb2 github.com/gballet/go-verkle => github.com/gballet/go-verkle v0.0.0-20221121182333-31427a1f2d35 - github.com/ledgerwatch/erigon-snapshot => github.com/node-real/bsc-erigon-snapshot v1.0.1-0.20240808061348-94b6faa77836 + github.com/ledgerwatch/erigon-snapshot => github.com/node-real/bsc-erigon-snapshot v1.0.1-0.20240826065436-2c6a68658448 github.com/tendermint/tendermint => github.com/bnb-chain/tendermint v0.31.16 ) diff --git a/go.sum b/go.sum index a31281ee3c5..21383f60ad4 100644 --- a/go.sum +++ b/go.sum @@ -1272,8 +1272,8 @@ github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdh github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo= github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM= -github.com/node-real/bsc-erigon-snapshot v1.0.1-0.20240808061348-94b6faa77836 h1:k9ufjwUYzhV/vxBe3VhsVGl7PQ0xlolrqOBNebtpkJg= -github.com/node-real/bsc-erigon-snapshot v1.0.1-0.20240808061348-94b6faa77836/go.mod 
h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo= +github.com/node-real/bsc-erigon-snapshot v1.0.1-0.20240826065436-2c6a68658448 h1:fYnBKJ9EzVxEAxzaenOoTm8I9GQWtX/rRxouWOPjHl0= +github.com/node-real/bsc-erigon-snapshot v1.0.1-0.20240826065436-2c6a68658448/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/nxadm/tail v1.4.9-0.20211216163028-4472660a31a6 h1:iZ5rEHU561k2tdi/atkIsrP5/3AX3BjyhYtC96nJ260= diff --git a/p2p/sentry/simulator/sentry_simulator.go b/p2p/sentry/simulator/sentry_simulator.go index effc0995860..cb2384da994 100644 --- a/p2p/sentry/simulator/sentry_simulator.go +++ b/p2p/sentry/simulator/sentry_simulator.go @@ -105,7 +105,7 @@ func NewSentry(ctx context.Context, chain string, snapshotLocation string, peerC messageReceivers: map[isentry.MessageId][]isentry.Sentry_MessagesServer{}, knownSnapshots: knownSnapshots, activeSnapshots: activeSnapshots, - blockReader: freezeblocks.NewBlockReader(activeSnapshots, nil), + blockReader: freezeblocks.NewBlockReader(activeSnapshots, nil, nil), logger: logger, downloader: downloader, chain: chain, diff --git a/polygon/heimdall/simulator/heimdall_simulator.go b/polygon/heimdall/simulator/heimdall_simulator.go index ef98c2c19d3..0a2bcffd9aa 100644 --- a/polygon/heimdall/simulator/heimdall_simulator.go +++ b/polygon/heimdall/simulator/heimdall_simulator.go @@ -104,7 +104,7 @@ func NewHeimdall(ctx context.Context, chain string, snapshotLocation string, log ctx: ctx, knownBorSnapshots: knownBorSnapshots, activeBorSnapshots: activeBorSnapshots, - blockReader: freezeblocks.NewBlockReader(nil, activeBorSnapshots), + blockReader: freezeblocks.NewBlockReader(nil, activeBorSnapshots, nil), logger: logger, downloader: downloader, chain: chain, diff --git a/turbo/app/snapshots_cmd.go b/turbo/app/snapshots_cmd.go index 443e8b818e1..aec4b40ff47 100644 
--- a/turbo/app/snapshots_cmd.go +++ b/turbo/app/snapshots_cmd.go @@ -7,9 +7,15 @@ import ( "encoding/binary" "errors" "fmt" + "github.com/ledgerwatch/erigon-lib/chain" + "github.com/ledgerwatch/erigon/core/blob_storage" + "github.com/ledgerwatch/erigon/turbo/services" + "github.com/spf13/afero" "io" + "math" "net/http" "os" + "path" "path/filepath" "runtime" "time" @@ -209,12 +215,13 @@ func doIntegrity(cliCtx *cli.Context) error { cfg := ethconfig.NewSnapCfg(true, false, true) - blockSnaps, borSnaps, caplinSnaps, blockRetire, agg, err := openSnaps(ctx, cfg, dirs, chainDB, logger) + blockSnaps, borSnaps, bscSnaps, caplinSnaps, blockRetire, agg, err := openSnaps(ctx, cfg, dirs, chainDB, logger) if err != nil { return err } defer blockSnaps.Close() defer borSnaps.Close() + defer bscSnaps.Close() defer caplinSnaps.Close() defer agg.Close() @@ -358,12 +365,13 @@ func doIndicesCommand(cliCtx *cli.Context) error { cfg := ethconfig.NewSnapCfg(true, false, true) chainConfig := fromdb.ChainConfig(chainDB) - blockSnaps, borSnaps, caplinSnaps, br, agg, err := openSnaps(ctx, cfg, dirs, chainDB, logger) + blockSnaps, borSnaps, bscSnaps, caplinSnaps, br, agg, err := openSnaps(ctx, cfg, dirs, chainDB, logger) if err != nil { return err } defer blockSnaps.Close() defer borSnaps.Close() + defer bscSnaps.Close() defer caplinSnaps.Close() defer agg.Close() @@ -382,8 +390,8 @@ func doIndicesCommand(cliCtx *cli.Context) error { } func openSnaps(ctx context.Context, cfg ethconfig.BlocksFreezing, dirs datadir.Dirs, chainDB kv.RwDB, logger log.Logger) ( - blockSnaps *freezeblocks.RoSnapshots, borSnaps *freezeblocks.BorRoSnapshots, csn *freezeblocks.CaplinSnapshots, - br *freezeblocks.BlockRetire, agg *libstate.Aggregator, err error, + blockSnaps *freezeblocks.RoSnapshots, borSnaps *freezeblocks.BorRoSnapshots, bscSnaps *freezeblocks.BscRoSnapshots, + csn *freezeblocks.CaplinSnapshots, br *freezeblocks.BlockRetire, agg *libstate.Aggregator, err error, ) { blockSnaps = 
freezeblocks.NewRoSnapshots(cfg, dirs.Snap, 0, logger) if err = blockSnaps.ReopenFolder(); err != nil { @@ -396,17 +404,21 @@ func openSnaps(ctx context.Context, cfg ethconfig.BlocksFreezing, dirs datadir.D return } + bscSnaps = freezeblocks.NewBscRoSnapshots(cfg, dirs.Snap, 0, logger) + if err = bscSnaps.ReopenFolder(); err != nil { + return + } + bscSnaps.LogStat("bsc:open") + chainConfig := fromdb.ChainConfig(chainDB) var beaconConfig *clparams.BeaconChainConfig _, beaconConfig, _, err = clparams.GetConfigsByNetworkName(chainConfig.ChainName) - if err != nil { - return - } - - csn = freezeblocks.NewCaplinSnapshots(cfg, beaconConfig, dirs, logger) - if err = csn.ReopenFolder(); err != nil { - return + if err == nil { + csn = freezeblocks.NewCaplinSnapshots(cfg, beaconConfig, dirs, logger) + if err = csn.ReopenFolder(); err != nil { + return + } } borSnaps.LogStat("bor:open") @@ -424,9 +436,15 @@ func openSnaps(ctx context.Context, cfg ethconfig.BlocksFreezing, dirs datadir.D return } - blockReader := freezeblocks.NewBlockReader(blockSnaps, borSnaps) + blockReader := freezeblocks.NewBlockReader(blockSnaps, borSnaps, bscSnaps) blockWriter := blockio.NewBlockWriter(fromdb.HistV3(chainDB)) - br = freezeblocks.NewBlockRetire(estimate.CompressSnapshot.Workers(), dirs, blockReader, blockWriter, chainDB, chainConfig, nil, logger) + + var bs services.BlobStorage + if chainConfig.Parlia != nil { + bs = openBlobStore(dirs, chainConfig, blockReader) + bscSnaps.LogStat("blobStore:open") + } + br = freezeblocks.NewBlockRetire(estimate.CompressSnapshot.Workers(), dirs, blockReader, blockWriter, chainDB, bs, chainConfig, nil, logger) return } @@ -551,12 +569,13 @@ func doRetireCommand(cliCtx *cli.Context) error { defer db.Close() cfg := ethconfig.NewSnapCfg(true, false, true) - blockSnaps, borSnaps, caplinSnaps, br, agg, err := openSnaps(ctx, cfg, dirs, db, logger) + blockSnaps, borSnaps, bscSnaps, caplinSnaps, br, agg, err := openSnaps(ctx, cfg, dirs, db, logger) if err != nil 
{ return err } defer blockSnaps.Close() defer borSnaps.Close() + defer bscSnaps.Close() defer caplinSnaps.Close() defer agg.Close() @@ -830,3 +849,10 @@ func openAgg(ctx context.Context, dirs datadir.Dirs, chainDB kv.RwDB, logger log return agg } + +func openBlobStore(dirs datadir.Dirs, chainConfig *chain.Config, blockReader services.FullBlockReader) services.BlobStorage { + blobDbPath := path.Join(dirs.Blobs, "blob") + blobDb := mdbx.MustOpen(blobDbPath) + blobStore := blob_storage.NewBlobStore(blobDb, afero.NewBasePathFs(afero.NewOsFs(), dirs.Blobs), math.MaxUint64, chainConfig, blockReader) + return blobStore +} diff --git a/turbo/jsonrpc/bsc_api.go b/turbo/jsonrpc/bsc_api.go index 1a7906bd36a..6660ff975e9 100644 --- a/turbo/jsonrpc/bsc_api.go +++ b/turbo/jsonrpc/bsc_api.go @@ -3,6 +3,7 @@ package jsonrpc import ( "context" "fmt" + "github.com/ethereum/go-ethereum/common" libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/hexutil" "github.com/ledgerwatch/erigon/consensus" @@ -31,8 +32,8 @@ type BscAPI interface { GetTransactionsByBlockNumber(ctx context.Context, blockNr rpc.BlockNumber) ([]*RPCTransaction, error) GetVerifyResult(ctx context.Context, blockNr rpc.BlockNumber, blockHash libcommon.Hash, diffHash libcommon.Hash) ([]map[string]interface{}, error) PendingTransactions() ([]*RPCTransaction, error) - GetBlobSidecars(ctx context.Context, numberOrHash rpc.BlockNumberOrHash) ([]map[string]interface{}, error) - GetBlobSidecarByTxHash(ctx context.Context, hash libcommon.Hash) (map[string]interface{}, error) + GetBlobSidecars(ctx context.Context, numberOrHash rpc.BlockNumberOrHash, fullBlob *bool) ([]map[string]interface{}, error) + GetBlobSidecarByTxHash(ctx context.Context, hash libcommon.Hash, fullBlob *bool) (map[string]interface{}, error) } type BscImpl struct { @@ -221,34 +222,41 @@ func (s *BscImpl) PendingTransactions() ([]*RPCTransaction, error) { return nil, fmt.Errorf(NotImplemented, 
"eth_pendingTransactions") } -func (api *BscImpl) GetBlobSidecars(ctx context.Context, numberOrHash rpc.BlockNumberOrHash) ([]map[string]interface{}, error) { +func (api *BscImpl) GetBlobSidecars(ctx context.Context, numberOrHash rpc.BlockNumberOrHash, fullBlob *bool) ([]map[string]interface{}, error) { + showBlob := true + if fullBlob != nil { + showBlob = *fullBlob + } tx, err := api.ethApi.db.BeginRo(ctx) if err != nil { return nil, err } defer tx.Rollback() - - blockNumber, blockHash, _, err := rpchelper.GetBlockNumber(numberOrHash, tx, api.ethApi.filters) - if err != nil { - return nil, err - } - - bsc, err := api.parlia() + blockNumber, _, _, err := rpchelper.GetBlockNumber(numberOrHash, tx, api.ethApi.filters) if err != nil { return nil, err } - blobSidecars, found, err := bsc.BlobStore.ReadBlobSidecars(ctx, blockNumber, blockHash) + blobSidecars, found, err := api.ethApi._blockReader.ReadBlobByNumber(ctx, tx, blockNumber) if err != nil || !found { return nil, err } result := make([]map[string]interface{}, len(blobSidecars)) for i, sidecar := range blobSidecars { - result[i] = marshalBlobSidecar(sidecar) + result[i] = marshalBlobSidecar(sidecar, showBlob) } return result, nil } -func (api *BscImpl) GetBlobSidecarByTxHash(ctx context.Context, hash libcommon.Hash) (map[string]interface{}, error) { +func (api *BscImpl) GetBlobSidecarByTxHash(ctx context.Context, hash libcommon.Hash, fullBlob *bool) (map[string]interface{}, error) { + showBlob := true + if fullBlob != nil { + showBlob = *fullBlob + } + roTx, err := api.ethApi.db.BeginRo(ctx) + if err != nil { + return nil, err + } + defer roTx.Rollback() tx, err := api.ethApi.GetTransactionByHash(ctx, hash) if err != nil { return nil, err @@ -256,29 +264,44 @@ func (api *BscImpl) GetBlobSidecarByTxHash(ctx context.Context, hash libcommon.H if tx == nil || tx.BlockNumber == nil || tx.BlockHash == nil || tx.TransactionIndex == nil { return nil, nil } - bsc, err := api.parlia() - if err != nil { - return nil, err - 
} - blobSidecars, found, err := bsc.BlobStore.ReadBlobSidecars(ctx, tx.BlockNumber.Uint64(), *tx.BlockHash) + + blobSidecars, found, err := api.ethApi._blockReader.ReadBlobByNumber(ctx, roTx, tx.BlockNumber.Uint64()) if err != nil || !found { return nil, err } for _, sidecar := range blobSidecars { if sidecar.TxIndex == uint64(*tx.TransactionIndex) { - return marshalBlobSidecar(sidecar), nil + return marshalBlobSidecar(sidecar, showBlob), nil } } return nil, nil } -func marshalBlobSidecar(sidecar *types.BlobSidecar) map[string]interface{} { +func marshalBlobSidecar(sidecar *types.BlobSidecar, fullBlob bool) map[string]interface{} { fields := map[string]interface{}{ "blockHash": sidecar.BlockHash, "blockNumber": hexutil.EncodeUint64(sidecar.BlockNumber.Uint64()), "txHash": sidecar.TxHash, "txIndex": hexutil.EncodeUint64(sidecar.TxIndex), - "blobSidecar": sidecar.BlobTxSidecar, + } + fields["blobSidecar"] = marshalBlob(sidecar.BlobTxSidecar, fullBlob) + return fields +} + +func marshalBlob(blobTxSidecar types.BlobTxSidecar, fullBlob bool) map[string]interface{} { + fields := map[string]interface{}{ + "blobs": blobTxSidecar.Blobs, + "commitments": blobTxSidecar.Commitments, + "proofs": blobTxSidecar.Proofs, + } + if !fullBlob { + var blobs []common.Hash + for _, blob := range blobTxSidecar.Blobs { + var value common.Hash + copy(value[:], blob[:32]) + blobs = append(blobs, value) + } + fields["blobs"] = blobs } return fields } diff --git a/turbo/jsonrpc/daemon.go b/turbo/jsonrpc/daemon.go index 81e9e31ee05..573286faba7 100644 --- a/turbo/jsonrpc/daemon.go +++ b/turbo/jsonrpc/daemon.go @@ -50,9 +50,6 @@ func APIList(db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, m case *bor.Bor: borImpl = NewBorAPI(base, db) case lazy: - if _, ok := engine.Engine().(*parlia.Parlia); !engine.HasEngine() || ok { - bscImpl = NewBscAPI(ethImpl) - } if _, ok := engine.Engine().(*bor.Bor); !engine.HasEngine() || ok { borImpl = NewBorAPI(base, db) } diff --git 
a/turbo/services/interfaces.go b/turbo/services/interfaces.go index 7daeb2a41dd..00be36b93ee 100644 --- a/turbo/services/interfaces.go +++ b/turbo/services/interfaces.go @@ -101,6 +101,11 @@ type BlockAndTxnReader interface { TxnReader } +type BlobReader interface { + ReadBlobByNumber(ctx context.Context, tx kv.Getter, blockHeight uint64) ([]*types.BlobSidecar, bool, error) + ReadBlobTxCount(ctx context.Context, blockNum uint64, hash common.Hash) (uint32, error) +} + type FullBlockReader interface { BlockReader BodyReader @@ -111,15 +116,18 @@ type FullBlockReader interface { BorCheckpointReader TxnReader CanonicalReader + BlobReader FrozenBlocks() uint64 FrozenBorBlocks() uint64 + FrozenBscBlobs() uint64 FrozenFiles() (list []string) FreezingCfg() ethconfig.BlocksFreezing CanPruneTo(currentBlockInDB uint64) (canPruneBlocksTo uint64) Snapshots() BlockSnapshots BorSnapshots() BlockSnapshots + BscSnapshots() BlockSnapshots AllTypes() []snaptype.Type } diff --git a/turbo/snapshotsync/freezeblocks/block_reader.go b/turbo/snapshotsync/freezeblocks/block_reader.go index 77804a8ced1..7c4619a174c 100644 --- a/turbo/snapshotsync/freezeblocks/block_reader.go +++ b/turbo/snapshotsync/freezeblocks/block_reader.go @@ -111,9 +111,11 @@ func (r *RemoteBlockReader) HeaderByNumber(ctx context.Context, tx kv.Getter, bl } func (r *RemoteBlockReader) Snapshots() services.BlockSnapshots { panic("not implemented") } func (r *RemoteBlockReader) BorSnapshots() services.BlockSnapshots { panic("not implemented") } +func (r *RemoteBlockReader) BscSnapshots() services.BlockSnapshots { panic("not implemented") } func (r *RemoteBlockReader) AllTypes() []snaptype.Type { panic("not implemented") } func (r *RemoteBlockReader) FrozenBlocks() uint64 { panic("not supported") } func (r *RemoteBlockReader) FrozenBorBlocks() uint64 { panic("not supported") } +func (r *RemoteBlockReader) FrozenBscBlobs() uint64 { panic("not supported") } func (r *RemoteBlockReader) FrozenFiles() (list []string) { 
panic("not supported") } func (r *RemoteBlockReader) FreezingCfg() ethconfig.BlocksFreezing { panic("not supported") } @@ -306,17 +308,27 @@ func (r *RemoteBlockReader) Checkpoint(ctx context.Context, tx kv.Getter, spanId return nil, nil } +func (r *RemoteBlockReader) ReadBlobByNumber(ctx context.Context, tx kv.Getter, blockHeight uint64) ([]*types.BlobSidecar, bool, error) { + return nil, false, nil +} + +func (r *RemoteBlockReader) ReadBlobTxCount(ctx context.Context, blockNum uint64, hash common.Hash) (uint32, error) { + return 0, nil +} + // BlockReader can read blocks from db and snapshots type BlockReader struct { sn *RoSnapshots borSn *BorRoSnapshots bs services.BlobStorage + bscSn *BscRoSnapshots } -func NewBlockReader(snapshots services.BlockSnapshots, borSnapshots services.BlockSnapshots) *BlockReader { +func NewBlockReader(snapshots services.BlockSnapshots, borSnapshots services.BlockSnapshots, bscSnapshot services.BlockSnapshots) *BlockReader { borSn, _ := borSnapshots.(*BorRoSnapshots) sn, _ := snapshots.(*RoSnapshots) - return &BlockReader{sn: sn, borSn: borSn} + bscSn, _ := bscSnapshot.(*BscRoSnapshots) + return &BlockReader{sn: sn, borSn: borSn, bscSn: bscSn} } func (r *BlockReader) WithSidecars(blobStorage services.BlobStorage) { @@ -326,6 +338,7 @@ func (r *BlockReader) WithSidecars(blobStorage services.BlobStorage) { func (r *BlockReader) CanPruneTo(currentBlockInDB uint64) uint64 { return CanDeleteTo(currentBlockInDB, r.sn.BlocksAvailable()) } + func (r *BlockReader) Snapshots() services.BlockSnapshots { return r.sn } func (r *BlockReader) BorSnapshots() services.BlockSnapshots { if r.borSn != nil { @@ -335,12 +348,23 @@ func (r *BlockReader) BorSnapshots() services.BlockSnapshots { return nil } +func (r *BlockReader) BscSnapshots() services.BlockSnapshots { + if r.bscSn != nil { + return r.bscSn + } + + return nil +} + func (r *BlockReader) AllTypes() []snaptype.Type { var types []snaptype.Type types = append(types, r.sn.Types()...) 
if r.borSn != nil { types = append(types, r.borSn.Types()...) } + if r.bscSn != nil { + types = append(types, r.bscSn.Types()...) + } return types } @@ -351,11 +375,22 @@ func (r *BlockReader) FrozenBorBlocks() uint64 { } return 0 } + +func (r *BlockReader) FrozenBscBlobs() uint64 { + if r.bscSn != nil { + return r.bscSn.BlocksAvailable() + } + return 0 +} + func (r *BlockReader) FrozenFiles() []string { files := r.sn.Files() if r.borSn != nil { files = append(files, r.borSn.Files()...) } + if r.bscSn != nil { + files = append(files, r.bscSn.Files()...) + } sort.Strings(files) return files } @@ -583,15 +618,13 @@ func (r *BlockReader) BodyRlp(ctx context.Context, tx kv.Getter, hash common.Has if err != nil { return nil, err } - if r.bs != nil { - blobSidecars, found, err := r.bs.ReadBlobSidecars(ctx, blockHeight, hash) - if err == nil && found && len(blobSidecars) > 0 { - // Has to be there as it is optional - if body.Withdrawals == nil { - body.Withdrawals = make([]*types.Withdrawal, 0) - } - body.Sidecars = blobSidecars + blobSidecars, found, err := r.ReadBlobByNumber(ctx, tx, blockHeight) + if err == nil && found && len(blobSidecars) > 0 { + // Has to be there as it is optional + if body.Withdrawals == nil { + body.Withdrawals = make([]*types.Withdrawal, 0) } + body.Sidecars = blobSidecars } bodyRlp, err = rlp.EncodeToBytes(body) @@ -1733,6 +1766,38 @@ func (r *BlockReader) LastFrozenCheckpointId() uint64 { return index.BaseDataID() + index.KeyCount() - 1 } +func (r *BlockReader) ReadBlobByNumber(ctx context.Context, tx kv.Getter, blockHeight uint64) ([]*types.BlobSidecar, bool, error) { + maxBlobInFiles := r.FrozenBscBlobs() + if blockHeight > maxBlobInFiles || maxBlobInFiles == 0 { + blockHash, err := r.CanonicalHash(ctx, tx, blockHeight) + if err != nil { + return nil, false, fmt.Errorf("failed ReadCanonicalHash: %w", err) + } + return r.bs.ReadBlobSidecars(ctx, blockHeight, blockHash) + } + log.Info("Read BlobSidecar from snapshots") + blobs, err := 
r.bscSn.ReadBlobSidecars(blockHeight) + if err != nil { + return nil, false, err + } + if blobs == nil { + return nil, false, nil + } + return blobs, true, nil +} + +func (r *BlockReader) ReadBlobTxCount(ctx context.Context, blockNum uint64, hash common.Hash) (uint32, error) { + maxBlobInFiles := r.FrozenBscBlobs() + if blockNum > maxBlobInFiles || maxBlobInFiles == 0 { + return r.bs.BlobTxCount(ctx, hash) + } + blobs, err := r.bscSn.ReadBlobSidecars(blockNum) + if err != nil { + return 0, err + } + return uint32(len(blobs)), nil +} + // ---- Data Integrity part ---- func (r *BlockReader) ensureHeaderNumber(n uint64, seg *Segment) error { diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index 2c203945194..c7c242200b3 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -306,7 +306,7 @@ func (s *RoSnapshots) BlocksAvailable() uint64 { return 0 } - return cmp.Min(s.segmentsMax.Load(), s.idxMax.Load()) + return s.idxMax.Load() } func (s *RoSnapshots) LogStat(label string) { var m runtime.MemStats @@ -1119,6 +1119,7 @@ type BlockRetire struct { workers int tmpDir string db kv.RoDB + bs services.BlobStorage notifier services.DBEventNotifier logger log.Logger @@ -1128,8 +1129,8 @@ type BlockRetire struct { chainConfig *chain.Config } -func NewBlockRetire(compressWorkers int, dirs datadir.Dirs, blockReader services.FullBlockReader, blockWriter *blockio.BlockWriter, db kv.RoDB, chainConfig *chain.Config, notifier services.DBEventNotifier, logger log.Logger) *BlockRetire { - return &BlockRetire{workers: compressWorkers, tmpDir: dirs.Tmp, dirs: dirs, blockReader: blockReader, blockWriter: blockWriter, db: db, chainConfig: chainConfig, notifier: notifier, logger: logger} +func NewBlockRetire(compressWorkers int, dirs datadir.Dirs, blockReader services.FullBlockReader, blockWriter *blockio.BlockWriter, db kv.RoDB, bs services.BlobStorage, 
chainConfig *chain.Config, notifier services.DBEventNotifier, logger log.Logger) *BlockRetire { + return &BlockRetire{workers: compressWorkers, tmpDir: dirs.Tmp, dirs: dirs, blockReader: blockReader, blockWriter: blockWriter, db: db, bs: bs, chainConfig: chainConfig, notifier: notifier, logger: logger} } func (br *BlockRetire) SetWorkers(workers int) { @@ -1148,6 +1149,10 @@ func (br *BlockRetire) borSnapshots() *BorRoSnapshots { return br.blockReader.BorSnapshots().(*BorRoSnapshots) } +func (br *BlockRetire) bscSnapshots() *BscRoSnapshots { + return br.blockReader.BscSnapshots().(*BscRoSnapshots) +} + func (br *BlockRetire) HasNewFrozenFiles() bool { return br.needSaveFilesListInDB.CompareAndSwap(true, false) } @@ -1355,6 +1360,7 @@ func (br *BlockRetire) RetireBlocks(ctx context.Context, minBlockNum uint64, max br.maxScheduledBlock.Store(maxBlockNum) } includeBor := br.chainConfig.Bor != nil + includeBsc := br.chainConfig.Parlia != nil var err error if includeBor { @@ -1374,7 +1380,7 @@ func (br *BlockRetire) RetireBlocks(ctx context.Context, minBlockNum uint64, max } for { - var ok, okBor bool + var ok, okBor, okBsc bool minBlockNum = cmp.Max(br.blockReader.FrozenBlocks(), minBlockNum) maxBlockNum = br.maxScheduledBlock.Load() @@ -1391,7 +1397,19 @@ func (br *BlockRetire) RetireBlocks(ctx context.Context, minBlockNum uint64, max } } - if !(ok || okBor) { + if includeBsc { + for { + okBsc, err = br.retireBscBlocks(ctx, br.blockReader.FrozenBscBlobs(), minBlockNum, lvl, seedNewSnapshots, onDeleteSnapshots) + if err != nil { + return err + } + if !okBsc { + break + } + } + } + + if !(ok || okBor || okBsc) { break } } @@ -1409,6 +1427,12 @@ func (br *BlockRetire) BuildMissedIndicesIfNeed(ctx context.Context, logPrefix s } } + if cc.Parlia != nil { + if err := br.bscSnapshots().RoSnapshots.buildMissedIndicesIfNeed(ctx, logPrefix, notifier, br.dirs, cc, br.logger); err != nil { + return err + } + } + return nil } diff --git 
a/turbo/snapshotsync/freezeblocks/bsc_sanpshots.go b/turbo/snapshotsync/freezeblocks/bsc_sanpshots.go deleted file mode 100644 index 144a6d5daa6..00000000000 --- a/turbo/snapshotsync/freezeblocks/bsc_sanpshots.go +++ /dev/null @@ -1 +0,0 @@ -package freezeblocks diff --git a/turbo/snapshotsync/freezeblocks/bsc_snapshots.go b/turbo/snapshotsync/freezeblocks/bsc_snapshots.go new file mode 100644 index 00000000000..8e12f122506 --- /dev/null +++ b/turbo/snapshotsync/freezeblocks/bsc_snapshots.go @@ -0,0 +1,357 @@ +package freezeblocks + +import ( + "context" + "fmt" + "github.com/ledgerwatch/erigon-lib/chain" + "github.com/ledgerwatch/erigon-lib/chain/networkname" + "github.com/ledgerwatch/erigon-lib/chain/snapcfg" + "github.com/ledgerwatch/erigon-lib/common/background" + "github.com/ledgerwatch/erigon-lib/common/dbg" + "github.com/ledgerwatch/erigon-lib/downloader/snaptype" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/seg" + "github.com/ledgerwatch/erigon/cmd/hack/tool/fromdb" + coresnaptype "github.com/ledgerwatch/erigon/core/snaptype" + "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/eth/ethconfig" + "github.com/ledgerwatch/erigon/rlp" + "github.com/ledgerwatch/erigon/turbo/services" + "github.com/ledgerwatch/log/v3" + "path/filepath" + "reflect" +) + +var BscProduceFiles = dbg.EnvBool("BSC_PRODUCE_FILES", false) + +const ( + bscMinSegFrom = 39_700_000 + chapelMinSegFrom = 39_500_000 +) + +func (br *BlockRetire) dbHasEnoughDataForBscRetire(ctx context.Context) (bool, error) { + return true, nil +} + +func (br *BlockRetire) retireBscBlocks(ctx context.Context, minBlockNum uint64, maxBlockNum uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error, onDelete func(l []string) error) (bool, error) { + if !BscProduceFiles { + return false, nil + } + + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + } + + snapshots := br.bscSnapshots() + + chainConfig := 
fromdb.ChainConfig(br.db) + var minimumBlob uint64 + notifier, logger, blockReader, tmpDir, db, workers := br.notifier, br.logger, br.blockReader, br.tmpDir, br.db, br.workers + if chainConfig.ChainName == networkname.BSCChainName { + minimumBlob = bscMinSegFrom + } else { + minimumBlob = chapelMinSegFrom + } + blockFrom := max(blockReader.FrozenBscBlobs(), minimumBlob) + blocksRetired := false + for _, snap := range blockReader.BscSnapshots().Types() { + if maxBlockNum <= blockFrom || maxBlockNum-blockFrom < snaptype.Erigon2MergeLimit { + continue + } + + blockTo := maxBlockNum + + logger.Log(lvl, "[bsc snapshot] Retire Bsc Blobs", "type", snap, + "range", fmt.Sprintf("%d-%d", blockFrom, blockTo)) + + blocksRetired = true + if err := DumpBlobs(ctx, blockFrom, blockTo, br.chainConfig, tmpDir, snapshots.Dir(), db, workers, lvl, blockReader, br.bs, logger); err != nil { + return true, fmt.Errorf("DumpBlobs: %w", err) + } + } + + if blocksRetired { + if err := snapshots.ReopenFolder(); err != nil { + return true, fmt.Errorf("reopen: %w", err) + } + snapshots.LogStat("bsc:retire") + if notifier != nil && !reflect.ValueOf(notifier).IsNil() { // notify about new snapshots of any size + notifier.OnNewSnapshot() + } + + // now prune blobs from the database + blockTo := (maxBlockNum / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit + roTx, err := db.BeginRo(ctx) + if err != nil { + return false, nil + } + defer roTx.Rollback() + + for i := blockFrom; i < blockTo; i++ { + if i%10000 == 0 { + log.Info("remove sidecars", "blockNum", i) + } + blockHash, err := blockReader.CanonicalHash(ctx, roTx, i) + if err != nil { + return false, err + } + if err = br.bs.RemoveBlobSidecars(ctx, i, blockHash); err != nil { + log.Error("remove sidecars", "blockNum", i, "err", err) + } + + } + if seedNewSnapshots != nil { + downloadRequest := []services.DownloadRequest{ + services.NewDownloadRequest("", ""), + } + if err := seedNewSnapshots(downloadRequest); err != nil { + return 
false, err + } + } + } + + return blocksRetired, nil +} + +type BscRoSnapshots struct { + RoSnapshots +} + +// NewBscSnapshots - opens all snapshots. But to simplify everything: +// - it opens snapshots only on App start and immutable after +// - all snapshots of given blocks range must exist - to make this blocks range available +// - gaps are not allowed +// - segment have [from:to) semantic +func NewBscRoSnapshots(cfg ethconfig.BlocksFreezing, snapDir string, segmentsMin uint64, logger log.Logger) *BscRoSnapshots { + return &BscRoSnapshots{*newRoSnapshots(cfg, snapDir, coresnaptype.BscSnapshotTypes, segmentsMin, logger)} +} + +func (s *BscRoSnapshots) Ranges() []Range { + view := s.View() + defer view.Close() + return view.base.Ranges() +} + +func (s *BscRoSnapshots) ReopenFolder() error { + files, _, err := typedSegments(s.dir, s.segmentsMin.Load(), coresnaptype.BscSnapshotTypes, true) + if err != nil { + return err + } + list := make([]string, 0, len(files)) + for _, f := range files { + _, fName := filepath.Split(f.Path) + list = append(list, fName) + } + return s.ReopenList(list, false) +} + +type BscView struct { + base *View +} + +func (s *BscRoSnapshots) View() *BscView { + v := &BscView{base: s.RoSnapshots.View()} + v.base.baseSegType = coresnaptype.BlobSidecars + return v +} + +func (v *BscView) Close() { + v.base.Close() +} + +func (v *BscView) BlobSidecars() []*Segment { return v.base.Segments(coresnaptype.BlobSidecars) } + +func (v *BscView) BlobSidecarsSegment(blockNum uint64) (*Segment, bool) { + return v.base.Segment(coresnaptype.BlobSidecars, blockNum) +} + +func dumpBlobsRange(ctx context.Context, blockFrom, blockTo uint64, tmpDir, snapDir string, chainDB kv.RoDB, blobStore services.BlobStorage, blockReader services.FullBlockReader, chainConfig *chain.Config, workers int, lvl log.Lvl, logger log.Logger) (err error) { + f := coresnaptype.BlobSidecars.FileInfo(snapDir, blockFrom, blockTo) + sn, err := seg.NewCompressor(ctx, "Snapshot 
"+f.Type.Name(), f.Path, tmpDir, seg.MinPatternScore, workers, lvl, logger) + if err != nil { + return err + } + defer sn.Close() + + tx, err := chainDB.BeginRo(ctx) + if err != nil { + return err + } + defer tx.Rollback() + + // Generate .seg file, which is just the list of beacon blocks. + for i := blockFrom; i < blockTo; i++ { + // read root. + blockHash, err := blockReader.CanonicalHash(ctx, tx, i) + if err != nil { + return err + } + + blobTxCount, err := blobStore.BlobTxCount(ctx, blockHash) + if err != nil { + return err + } + if blobTxCount == 0 { + sn.AddWord(nil) + continue + } + sidecars, found, err := blobStore.ReadBlobSidecars(ctx, i, blockHash) + if err != nil { + return fmt.Errorf("read blob sidecars: blockNum = %d, blobTxcount = %d, err = %v", i, blobTxCount, err) + } + if !found { + return fmt.Errorf("blob sidecars not found for block %d", i) + } + dataRLP, err := rlp.EncodeToBytes(sidecars) + if err != nil { + return err + } + if err := sn.AddWord(dataRLP); err != nil { + return err + } + if i%20_000 == 0 { + logger.Log(lvl, "Dumping bsc blobs", "progress", i) + } + + } + if err := sn.Compress(); err != nil { + return fmt.Errorf("compress: %w", err) + } + // Generate .idx file, which is the slot => offset mapping. 
+ p := &background.Progress{} + + if err := f.Type.BuildIndexes(ctx, f, chainConfig, tmpDir, p, lvl, logger); err != nil { + return err + } + + return nil +} + +func DumpBlobs(ctx context.Context, blockFrom, blockTo uint64, chainConfig *chain.Config, tmpDir, snapDir string, chainDB kv.RoDB, workers int, lvl log.Lvl, blockReader services.FullBlockReader, blobStore services.BlobStorage, logger log.Logger) error { + //if checkBlobs(ctx, blockFrom, blockTo, chainDB, blobStore, blockReader, logger) == false { + // return fmt.Errorf("check blobs failed") + //} + for i := blockFrom; i < blockTo; i = chooseSegmentEnd(i, blockTo, coresnaptype.Enums.BscBlobs, chainConfig) { + blocksPerFile := snapcfg.MergeLimit("", coresnaptype.Enums.BscBlobs, i) + if blockTo-i < blocksPerFile { + break + } + logger.Log(lvl, "Dumping blobs sidecars", "from", i, "to", blockTo) + if err := dumpBlobsRange(ctx, i, chooseSegmentEnd(i, blockTo, coresnaptype.Enums.BscBlobs, chainConfig), tmpDir, snapDir, chainDB, blobStore, blockReader, chainConfig, workers, lvl, logger); err != nil { + return err + } + } + return nil +} + +func (s *BscRoSnapshots) ReadBlobSidecars(blockNum uint64) ([]*types.BlobSidecar, error) { + view := s.View() + defer view.Close() + + var buf []byte + + seg, ok := view.BlobSidecarsSegment(blockNum) + if !ok { + return nil, nil + } + + idxNum := seg.Index() + + if idxNum == nil { + return nil, nil + } + blockOffset := idxNum.OrdinalLookup(blockNum - idxNum.BaseDataID()) + + gg := seg.MakeGetter() + gg.Reset(blockOffset) + if !gg.HasNext() { + return nil, nil + } + + buf, _ = gg.Next(buf) + if len(buf) == 0 { + return nil, nil + } + + var sidecars []*types.BlobSidecar + err := rlp.DecodeBytes(buf, &sidecars) + if err != nil { + return nil, err + } + + return sidecars, nil +} + +// +//func checkBlobs(ctx context.Context, blockFrom, blockTo uint64, chainDB kv.RoDB, blobStore services.BlobStorage, blockReader services.FullBlockReader, logger log.Logger) bool { +// tx, err := 
chainDB.BeginRo(ctx) +// if err != nil { +// return false +// } +// defer tx.Rollback() +// var missedBlobs []uint64 +// noErr := true +// for i := blockFrom; i < blockTo; i++ { +// block, err := blockReader.BlockByNumber(ctx, tx, i) +// if err != nil { +// log.Error("ReadCanonicalHash", "blockNum", i, "blockHash", block.Hash(), "err", err) +// noErr = false +// } +// var blobTxCount uint64 +// +// for _, tx := range block.Transactions() { +// if tx.Type() != types.BlobTxType { +// continue +// } +// blobTxCount++ +// } +// if blobTxCount == 0 { +// continue +// } +// blobs, found, err := blobStore.ReadBlobSidecars(ctx, i, block.Hash()) +// if err != nil { +// noErr = false +// missedBlobs = append(missedBlobs, i) +// log.Error("read blob sidecars:", "blockNum", i, "blobTxCount", blobTxCount, "err", err) +// err := blobStore.RemoveBlobSidecars(ctx, i, block.Hash()) +// log.Error("Remove blob sidecars:", "blockNum", i, "blobTxCount", blobTxCount, "err", err) +// continue +// } +// if !found { +// noErr = false +// missedBlobs = append(missedBlobs, i) +// log.Error("blob sidecars not found for block ", "blockNumber", i, "count", blobTxCount) +// continue +// } +// +// if uint64(len(blobs)) != blobTxCount { +// missedBlobs = append(missedBlobs, i) +// noErr = false +// log.Error("blob sidecars not found for block ", "blockNumber", i, "want", blobTxCount, "actual", len(blobs)) +// continue +// } +// +// if i%20_000 == 0 { +// logger.Info("Dumping bsc blobs", "progress", i) +// } +// } +// +// log.Info("Start query missedBlobs from http") +// for _, num := range missedBlobs { +// blobs := GetBlobSidecars(num) +// hash, err := blockReader.CanonicalHash(ctx, tx, num) +// if err != nil { +// log.Error("GetBlobSidecars failed", "num", num, "err", err) +// return noErr +// } +// if err = blobStore.WriteBlobSidecars(ctx, hash, blobs); err != nil { +// log.Error("WriteBlobSidecars failed", "num", num, "err", err) +// } +// log.Info("WriteBlobSidecars blobs", "num", num, 
"hash", hash, "blobs", len(blobs)) +// time.Sleep(1 * time.Second) +// } +// +// return noErr +//} diff --git a/turbo/snapshotsync/freezeblocks/util.go b/turbo/snapshotsync/freezeblocks/util.go new file mode 100644 index 00000000000..c97b7da5377 --- /dev/null +++ b/turbo/snapshotsync/freezeblocks/util.go @@ -0,0 +1,101 @@ +package freezeblocks + +import ( + "bytes" + "encoding/json" + "fmt" + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/common/hexutil" + "github.com/ledgerwatch/erigon/core/types" + "io/ioutil" + "net/http" +) + +const ( + testnetURL = "" + mainNetURL = "" +) + +type RPCRequest struct { + Jsonrpc string `json:"jsonrpc"` + Method string `json:"method"` + Params []string `json:"params"` + ID int `json:"id"` +} + +type BlockResponse struct { + Jsonrpc string `json:"jsonrpc"` + Id int `json:"id"` + Result json.RawMessage `json:"result"` + Error interface{} `json:"error"` +} +type BlobResponse struct { + BlobTxSidecar types.BlobTxSidecar `json:"blobSidecar"` + BlockNumber string `json:"blockNumber"` + BlockHash libcommon.Hash `json:"blockHash"` + TxIndex string `json:"txIndex"` + TxHash libcommon.Hash `json:"txHash"` +} + +func GetBlobSidecars(blockNumber uint64) types.BlobSidecars { + blockNum := hexutil.EncodeUint64(blockNumber) + request := RPCRequest{ + Jsonrpc: "2.0", + Method: "eth_getBlobSidecars", + Params: []string{blockNum}, + ID: 1, + } + + body, err := json.Marshal(request) + if err != nil { + fmt.Println("Error marshalling request:", err) + return nil + } + + resp, err := http.Post(mainNetURL, "application/json", bytes.NewBuffer(body)) + if err != nil { + fmt.Println("Error making request:", err) + return nil + } + defer resp.Body.Close() + + responseBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + fmt.Println("Error reading response body:", err) + return nil + } + + var blockResponse BlockResponse + err = json.Unmarshal(responseBody, &blockResponse) + if err != nil { + 
fmt.Println("Error unmarshalling response a:", err) + return nil + } + + if blockResponse.Error != nil { + fmt.Println("Error from RPC:", blockResponse.Error) + return nil + } + + var blobResponse []*BlobResponse + err = json.Unmarshal(blockResponse.Result, &blobResponse) + if err != nil { + fmt.Println("Error unmarshalling response b:", err) + return nil + } + + var blobSidecars types.BlobSidecars + for _, blobSidecar := range blobResponse { + bn, _ := hexutil.DecodeBig(blobSidecar.BlockNumber) + tx, _ := hexutil.DecodeUint64(blobSidecar.TxIndex) + blob := &types.BlobSidecar{ + BlobTxSidecar: blobSidecar.BlobTxSidecar, + BlockNumber: bn, + BlockHash: blobSidecar.BlockHash, + TxIndex: tx, + TxHash: blobSidecar.TxHash, + } + blobSidecars = append(blobSidecars, blob) + } + return blobSidecars +} diff --git a/turbo/snapshotsync/freezeblocks/util_test.go b/turbo/snapshotsync/freezeblocks/util_test.go new file mode 100644 index 00000000000..14ec5b03a0b --- /dev/null +++ b/turbo/snapshotsync/freezeblocks/util_test.go @@ -0,0 +1,29 @@ +package freezeblocks + +import ( + "github.com/ledgerwatch/erigon/core/types" + "reflect" + "testing" +) + +func TestGetBlobSidecars(t *testing.T) { + + tests := []struct { + name string + blockNumber uint64 + want types.BlobSidecars + }{ + { + name: "test1", + blockNumber: uint64(39565743), + want: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := GetBlobSidecars(tt.blockNumber); !reflect.DeepEqual(got, tt.want) { + t.Errorf("GetBlobSidecars() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/turbo/snapshotsync/snapshotsync.go b/turbo/snapshotsync/snapshotsync.go index f56a85b89de..4b890a64eca 100644 --- a/turbo/snapshotsync/snapshotsync.go +++ b/turbo/snapshotsync/snapshotsync.go @@ -71,6 +71,7 @@ func RequestSnapshotsDownload(ctx context.Context, downloadRequest []services.Do func WaitForDownloader(ctx context.Context, logPrefix string, histV3, blobs bool, caplin CaplinMode, agg 
*state.Aggregator, tx kv.RwTx, blockReader services.FullBlockReader, cc *chain.Config, snapshotDownloader proto_downloader.DownloaderClient, stagesIdsList []string) error { snapshots := blockReader.Snapshots() borSnapshots := blockReader.BorSnapshots() + bscSnapshots := blockReader.BscSnapshots() if blockReader.FreezingCfg().NoDownloader { if err := snapshots.ReopenFolder(); err != nil { return err @@ -80,6 +81,11 @@ func WaitForDownloader(ctx context.Context, logPrefix string, histV3, blobs bool return err } } + if cc.Parlia != nil { + if err := bscSnapshots.ReopenFolder(); err != nil { + return err + } + } return nil } @@ -87,7 +93,11 @@ func WaitForDownloader(ctx context.Context, logPrefix string, histV3, blobs bool if cc.Bor != nil { borSnapshots.Close() } + if cc.Parlia != nil { + bscSnapshots.Close() + } + NoParlia := cc.Parlia == nil //Corner cases: // - Erigon generated file X with hash H1. User upgraded Erigon. New version has preverified file X with hash H2. Must ignore H2 (don't send to Downloader) // - Erigon "download once": means restart/upgrade/downgrade must not download files (and will be fast) @@ -105,13 +115,13 @@ func WaitForDownloader(ctx context.Context, logPrefix string, histV3, blobs bool continue } } - if caplin == NoCaplin && (strings.Contains(p.Name, "beaconblocks") || strings.Contains(p.Name, "blobsidecars")) { + if caplin == NoCaplin && (strings.Contains(p.Name, "beaconblocks")) { continue } if caplin == OnlyCaplin && !strings.Contains(p.Name, "beaconblocks") && !strings.Contains(p.Name, "blobsidecars") { continue } - if !blobs && strings.Contains(p.Name, "blobsidecars") { + if !blobs && strings.Contains(p.Name, "blobsidecars") && NoParlia { continue } downloadRequest = append(downloadRequest, services.NewDownloadRequest(p.Name, p.Hash)) @@ -193,6 +203,12 @@ func WaitForDownloader(ctx context.Context, logPrefix string, histV3, blobs bool } } + if cc.Parlia != nil { + if err := bscSnapshots.ReopenFolder(); err != nil { + return err + } + 
} + if err := agg.OpenFolder(); err != nil { return err } diff --git a/turbo/stages/genesis_test.go b/turbo/stages/genesis_test.go index 2d5642bddfd..1938c49d711 100644 --- a/turbo/stages/genesis_test.go +++ b/turbo/stages/genesis_test.go @@ -177,7 +177,7 @@ func TestSetupGenesis(t *testing.T) { t.Parallel() dirs := datadir.New(tmpdir) _, db, _ := temporaltest.NewTestDB(t, dirs) - blockReader := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, dirs.Snap, 0, log.New()), freezeblocks.NewBorRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, dirs.Snap, 0, log.New())) + blockReader := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, dirs.Snap, 0, log.New()), freezeblocks.NewBorRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, dirs.Snap, 0, log.New()), freezeblocks.NewBscRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, dirs.Snap, 0, log.New())) config, genesis, err := test.fn(db) // Check the return values.
if !reflect.DeepEqual(err, test.wantErr) { diff --git a/turbo/stages/mock/mock_sentry.go b/turbo/stages/mock/mock_sentry.go index ad121d99ec0..55a0661e313 100644 --- a/turbo/stages/mock/mock_sentry.go +++ b/turbo/stages/mock/mock_sentry.go @@ -263,6 +263,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK erigonGrpcServeer := remotedbserver.NewKvServer(ctx, db, nil, nil, nil, logger) allSnapshots := freezeblocks.NewRoSnapshots(ethconfig.Defaults.Snapshot, dirs.Snap, 0, logger) allBorSnapshots := freezeblocks.NewBorRoSnapshots(ethconfig.Defaults.Snapshot, dirs.Snap, 0, logger) + allBscSnapshots := freezeblocks.NewBscRoSnapshots(ethconfig.Defaults.Snapshot, dirs.Snap, 0, logger) mock := &MockSentry{ Ctx: ctx, cancel: ctxCancel, DB: db, agg: agg, tb: tb, @@ -279,7 +280,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK }, PeerId: gointerfaces.ConvertHashToH512([64]byte{0x12, 0x34, 0x50}), // "12345" BlockSnapshots: allSnapshots, - BlockReader: freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots), + BlockReader: freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots, allBscSnapshots), HistoryV3: cfg.HistoryV3, } if tb != nil { @@ -434,7 +435,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK return block, nil } - blockRetire := freezeblocks.NewBlockRetire(1, dirs, mock.BlockReader, blockWriter, mock.DB, mock.ChainConfig, mock.Notifications.Events, logger) + blockRetire := freezeblocks.NewBlockRetire(1, dirs, mock.BlockReader, blockWriter, mock.DB, nil, mock.ChainConfig, mock.Notifications.Events, logger) mock.Sync = stagedsync.New( cfg.Sync, stagedsync.DefaultStages(mock.Ctx,