From 04f231a40083a5c1cf501abc7c46f39e2bf132f1 Mon Sep 17 00:00:00 2001 From: kasey <489222+kasey@users.noreply.github.com> Date: Fri, 5 Apr 2024 14:09:43 -0500 Subject: [PATCH] Initsync skip local blobs (#13827) * wip - init-sync skip available blob req * satisfy deep source * gaz * don't need to sort blobs; simplify blobRequest stack * wip debug log to watch blob skip behavior * unit tests for new blob req generator * refactor to reduce blob req func count * log when WaitForSummarizer fails --------- Co-authored-by: Kasey Kirkham --- beacon-chain/das/availability.go | 9 + beacon-chain/das/cache.go | 16 +- beacon-chain/db/filesystem/BUILD.bazel | 2 +- beacon-chain/db/filesystem/blob.go | 2 +- .../db/filesystem/{ephemeral.go => mock.go} | 16 +- beacon-chain/db/filesystem/pruner.go | 34 +- beacon-chain/sync/initial-sync/BUILD.bazel | 3 +- .../sync/initial-sync/blocks_fetcher.go | 171 ++++++--- .../sync/initial-sync/blocks_fetcher_test.go | 347 ++++++++++++------ .../sync/initial-sync/blocks_queue.go | 10 +- .../sync/initial-sync/blocks_queue_test.go | 36 +- beacon-chain/sync/initial-sync/round_robin.go | 68 ++-- 12 files changed, 460 insertions(+), 254 deletions(-) rename beacon-chain/db/filesystem/{ephemeral.go => mock.go} (84%) diff --git a/beacon-chain/das/availability.go b/beacon-chain/das/availability.go index 1729d0410cfc..7a8a2105838a 100644 --- a/beacon-chain/das/availability.go +++ b/beacon-chain/das/availability.go @@ -94,6 +94,15 @@ func (s *LazilyPersistentStore) IsDataAvailable(ctx context.Context, current pri entry := s.cache.ensure(key) defer s.cache.delete(key) root := b.Root() + sumz, err := s.store.WaitForSummarizer(ctx) + if err != nil { + log.WithField("root", fmt.Sprintf("%#x", b.Root())). + WithError(err). + Debug("Failed to receive BlobStorageSummarizer within IsDataAvailable") + } else { + entry.setDiskSummary(sumz.Summary(root)) + } + // Verify we have all the expected sidecars, and fail fast if any are missing or inconsistent. // We don't try to salvage problematic batches because this indicates a misbehaving peer and we'd rather // ignore their response and decrease their peer score. diff --git a/beacon-chain/das/cache.go b/beacon-chain/das/cache.go index ed940c412e1c..3e8d781bead9 100644 --- a/beacon-chain/das/cache.go +++ b/beacon-chain/das/cache.go @@ -4,6 +4,7 @@ import ( "bytes" "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem" fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams" "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" @@ -59,7 +60,12 @@ func (c *cache) delete(key cacheKey) { // cacheEntry holds a fixed-length cache of BlobSidecars. type cacheEntry struct { - scs [fieldparams.MaxBlobsPerBlock]*blocks.ROBlob + scs [fieldparams.MaxBlobsPerBlock]*blocks.ROBlob + diskSummary filesystem.BlobStorageSummary +} + +func (e *cacheEntry) setDiskSummary(sum filesystem.BlobStorageSummary) { + e.diskSummary = sum } // stash adds an item to the in-memory cache of BlobSidecars. @@ -81,9 +87,17 @@ func (e *cacheEntry) stash(sc *blocks.ROBlob) error { // the cache do not match those found in the block. If err is nil, then all expected // commitments were found in the cache and the sidecar slice return value can be used // to perform a DA check against the cached sidecars. +// filter only returns blobs that need to be checked. Blobs already available on disk will be excluded. 
func (e *cacheEntry) filter(root [32]byte, kc safeCommitmentArray) ([]blocks.ROBlob, error) { + if e.diskSummary.AllAvailable(kc.count()) { + return nil, nil + } scs := make([]blocks.ROBlob, kc.count()) for i := uint64(0); i < fieldparams.MaxBlobsPerBlock; i++ { + // We already have this blob, we don't need to write it or validate it. + if e.diskSummary.HasIndex(i) { + continue + } if kc[i] == nil { if e.scs[i] != nil { return nil, errors.Wrapf(errCommitmentMismatch, "root=%#x, index=%#x, commitment=%#x, no block commitment", root, i, e.scs[i].KzgCommitment) diff --git a/beacon-chain/db/filesystem/BUILD.bazel b/beacon-chain/db/filesystem/BUILD.bazel index 406857686be8..bc97cdd89b98 100644 --- a/beacon-chain/db/filesystem/BUILD.bazel +++ b/beacon-chain/db/filesystem/BUILD.bazel @@ -5,9 +5,9 @@ go_library( srcs = [ "blob.go", "cache.go", - "ephemeral.go", "log.go", "metrics.go", + "mock.go", "pruner.go", ], importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem", diff --git a/beacon-chain/db/filesystem/blob.go b/beacon-chain/db/filesystem/blob.go index 3da9a01ab91b..3fa4e4ee62a3 100644 --- a/beacon-chain/db/filesystem/blob.go +++ b/beacon-chain/db/filesystem/blob.go @@ -121,7 +121,7 @@ var ErrBlobStorageSummarizerUnavailable = errors.New("BlobStorage not initialize // BlobStorageSummarizer is not ready immediately on node startup because it needs to sample the blob filesystem to // determine which blobs are available. func (bs *BlobStorage) WaitForSummarizer(ctx context.Context) (BlobStorageSummarizer, error) { - if bs.pruner == nil { + if bs == nil || bs.pruner == nil { return nil, ErrBlobStorageSummarizerUnavailable } return bs.pruner.waitForCache(ctx) diff --git a/beacon-chain/db/filesystem/ephemeral.go b/beacon-chain/db/filesystem/mock.go similarity index 84% rename from beacon-chain/db/filesystem/ephemeral.go rename to beacon-chain/db/filesystem/mock.go index 729f815bbc6a..ba079e7cc4dc 100644 --- a/beacon-chain/db/filesystem/ephemeral.go +++ b/beacon-chain/db/filesystem/mock.go @@ -12,7 +12,7 @@ import ( // improving test performance and simplifying cleanup. func NewEphemeralBlobStorage(t testing.TB) *BlobStorage { fs := afero.NewMemMapFs() - pruner, err := newBlobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest) + pruner, err := newBlobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest, withWarmedCache()) if err != nil { t.Fatal("test setup issue", err) } @@ -23,7 +23,7 @@ func NewEphemeralBlobStorage(t testing.TB) *BlobStorage { // in order to interact with it outside the parameters of the BlobStorage api. 
func NewEphemeralBlobStorageWithFs(t testing.TB) (afero.Fs, *BlobStorage, error) { fs := afero.NewMemMapFs() - pruner, err := newBlobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest) + pruner, err := newBlobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest, withWarmedCache()) if err != nil { t.Fatal("test setup issue", err) } @@ -61,3 +61,15 @@ func NewEphemeralBlobStorageWithMocker(_ testing.TB) (*BlobMocker, *BlobStorage) bs := &BlobStorage{fs: fs} return &BlobMocker{fs: fs, bs: bs}, bs } + +func NewMockBlobStorageSummarizer(t *testing.T, set map[[32]byte][]int) BlobStorageSummarizer { + c := newBlobStorageCache() + for k, v := range set { + for i := range v { + if err := c.ensure(rootString(k), 0, uint64(v[i])); err != nil { + t.Fatal(err) + } + } + } + return c +} diff --git a/beacon-chain/db/filesystem/pruner.go b/beacon-chain/db/filesystem/pruner.go index cb5a10121b65..e42855071c51 100644 --- a/beacon-chain/db/filesystem/pruner.go +++ b/beacon-chain/db/filesystem/pruner.go @@ -33,17 +33,32 @@ type blobPruner struct { prunedBefore atomic.Uint64 windowSize primitives.Slot cache *blobStorageCache - cacheWarmed chan struct{} + cacheReady chan struct{} + warmed bool fs afero.Fs } -func newBlobPruner(fs afero.Fs, retain primitives.Epoch) (*blobPruner, error) { +type prunerOpt func(*blobPruner) error + +func withWarmedCache() prunerOpt { + return func(p *blobPruner) error { + return p.warmCache() + } +} + +func newBlobPruner(fs afero.Fs, retain primitives.Epoch, opts ...prunerOpt) (*blobPruner, error) { r, err := slots.EpochStart(retain + retentionBuffer) if err != nil { return nil, errors.Wrap(err, "could not set retentionSlots") } cw := make(chan struct{}) - return &blobPruner{fs: fs, windowSize: r, cache: newBlobStorageCache(), cacheWarmed: cw}, nil + p := &blobPruner{fs: fs, windowSize: r, cache: newBlobStorageCache(), cacheReady: cw} + for _, o := range opts { + if err := o(p); err != nil { + return nil, err + } + } + return p, nil } // notify updates the pruner's view of root->blob mappings. This allows the pruner to build a cache @@ -57,6 +72,8 @@ func (p *blobPruner) notify(root [32]byte, latest primitives.Slot, idx uint64) e return nil } go func() { + p.Lock() + defer p.Unlock() if err := p.prune(primitives.Slot(pruned)); err != nil { log.WithError(err).Errorf("Failed to prune blobs from slot %d", latest) } @@ -74,16 +91,21 @@ func windowMin(latest, offset primitives.Slot) primitives.Slot { } func (p *blobPruner) warmCache() error { + p.Lock() + defer p.Unlock() if err := p.prune(0); err != nil { return err } - close(p.cacheWarmed) + if !p.warmed { + p.warmed = true + close(p.cacheReady) + } return nil } func (p *blobPruner) waitForCache(ctx context.Context) (*blobStorageCache, error) { select { - case <-p.cacheWarmed: + case <-p.cacheReady: return p.cache, nil case <-ctx.Done(): return nil, ctx.Err() @@ -94,8 +116,6 @@ func (p *blobPruner) waitForCache(ctx context.Context) (*blobStorageCache, error // It deletes blobs older than currentEpoch - (retentionEpochs+bufferEpochs). // This is so that we keep a slight buffer and blobs are deleted after n+2 epochs. func (p *blobPruner) prune(pruneBefore primitives.Slot) error { - p.Lock() - defer p.Unlock() start := time.Now() totalPruned, totalErr := 0, 0 // Customize logging/metrics behavior for the initial cache warmup when slot=0. 
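The pruner hunks above replace the cacheWarmed channel with a cacheReady channel guarded by a warmed flag (so the channel is closed exactly once even if warming runs again), and add a prunerOpt functional option, withWarmedCache, so the ephemeral test storage can construct a pruner whose cache is already warm. A minimal, self-contained sketch of that ready-channel plus functional-option idiom, using simplified stand-in names rather than the real blobPruner type:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// cacheWarmer mirrors the shape of the pruner's readiness handling:
// a channel that is closed exactly once when the cache has been warmed.
type cacheWarmer struct {
	sync.Mutex
	cacheReady chan struct{}
	warmed     bool
}

// option mirrors the prunerOpt functional-option pattern from the diff.
type option func(*cacheWarmer) error

// withWarmedCache warms the cache at construction time, as the test helpers do.
func withWarmedCache() option {
	return func(c *cacheWarmer) error { return c.warmCache() }
}

func newCacheWarmer(opts ...option) (*cacheWarmer, error) {
	c := &cacheWarmer{cacheReady: make(chan struct{})}
	for _, o := range opts {
		if err := o(c); err != nil {
			return nil, err
		}
	}
	return c, nil
}

// warmCache is idempotent: the warmed flag ensures the channel is closed once,
// even if warming is triggered again later (e.g. by a pruning pass).
func (c *cacheWarmer) warmCache() error {
	c.Lock()
	defer c.Unlock()
	// ... scan the filesystem and populate the cache here ...
	if !c.warmed {
		c.warmed = true
		close(c.cacheReady)
	}
	return nil
}

// waitForCache blocks until the cache is ready or the context expires.
func (c *cacheWarmer) waitForCache(ctx context.Context) error {
	select {
	case <-c.cacheReady:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	c, _ := newCacheWarmer(withWarmedCache())
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(c.waitForCache(ctx)) // <nil>: cache was warmed by the option
}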
diff --git a/beacon-chain/sync/initial-sync/BUILD.bazel b/beacon-chain/sync/initial-sync/BUILD.bazel index 8b83925a2435..3d4ab052ec1e 100644 --- a/beacon-chain/sync/initial-sync/BUILD.bazel +++ b/beacon-chain/sync/initial-sync/BUILD.bazel @@ -33,7 +33,6 @@ go_library( "//beacon-chain/verification:go_default_library", "//cmd/beacon-chain/flags:go_default_library", "//config/params:go_default_library", - "//consensus-types:go_default_library", "//consensus-types/blocks:go_default_library", "//consensus-types/interfaces:go_default_library", "//consensus-types/primitives:go_default_library", @@ -83,10 +82,10 @@ go_test( "//beacon-chain/p2p/types:go_default_library", "//beacon-chain/startup:go_default_library", "//beacon-chain/sync:go_default_library", - "//beacon-chain/sync/verify:go_default_library", "//beacon-chain/verification:go_default_library", "//cmd/beacon-chain/flags:go_default_library", "//config/features:go_default_library", + "//config/fieldparams:go_default_library", "//config/params:go_default_library", "//consensus-types/blocks:go_default_library", "//consensus-types/interfaces:go_default_library", diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index 13bb09e2ce28..396a95abb4e2 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -11,6 +11,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v5/beacon-chain/db" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p" p2pTypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types" "github.com/prysmaticlabs/prysm/v5/beacon-chain/startup" @@ -18,7 +19,6 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/verify" "github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags" "github.com/prysmaticlabs/prysm/v5/config/params" - consensus_types "github.com/prysmaticlabs/prysm/v5/consensus-types" "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" blocks2 "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces" @@ -27,6 +27,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/crypto/rand" "github.com/prysmaticlabs/prysm/v5/math" p2ppb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v5/runtime/version" "github.com/prysmaticlabs/prysm/v5/time/slots" "github.com/sirupsen/logrus" "go.opencensus.io/trace" @@ -76,6 +77,7 @@ type blocksFetcherConfig struct { db db.ReadOnlyDatabase peerFilterCapacityWeight float64 mode syncMode + bs filesystem.BlobStorageSummarizer } // blocksFetcher is a service to fetch chain data from peers. 
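The blocks_fetcher.go hunks that follow replace the lowestSlotNeedsBlob/blobRequest pair with countCommitments and blobRange: commitment counts are gathered per block, and the optional BlobStorageSummarizer is consulted to trim blocks whose blobs are already on disk off both ends of the BlobSidecarsByRange window. A minimal sketch of that narrowing logic, assuming simplified stand-in types (need, window) rather than the fetcher's commitmentCountList:

package main

import "fmt"

// need describes one block's outstanding blob commitments.
type need struct {
	slot      uint64
	available bool // true when every commitment for the block is already on disk
}

// window returns the inclusive [low, high] slot range that still has to be
// requested, or ok=false when every block's blobs are already local.
// The input is assumed ordered by increasing slot, as in the fetcher.
func window(needs []need) (low, high uint64, ok bool) {
	for i, n := range needs {
		if n.available {
			continue // skip leading blocks whose blobs are all on disk
		}
		low, high = n.slot, n.slot // single-slot range unless a higher miss exists
		// Walk backward to find the highest slot that is still missing blobs.
		for z := len(needs) - 1; z > i; z-- {
			if !needs[z].available {
				high = needs[z].slot
				break
			}
		}
		return low, high, true
	}
	return 0, 0, false
}

func main() {
	needs := []need{
		{slot: 0, available: true},  // fully on disk: trimmed from the low end
		{slot: 5, available: false}, // first miss: lower bound
		{slot: 10, available: false},
		{slot: 15, available: true}, // fully on disk: trimmed from the high end
	}
	if low, high, ok := window(needs); ok {
		// Count for a by-range request is high-low+1, matching blobRange.Request.
		fmt.Printf("StartSlot=%d Count=%d\n", low, high-low+1) // StartSlot=5 Count=6
	}
}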
@@ -91,6 +93,7 @@ type blocksFetcher struct { ctxMap prysmsync.ContextByteVersions p2p p2p.P2P db db.ReadOnlyDatabase + bs filesystem.BlobStorageSummarizer blocksPerPeriod uint64 rateLimiter *leakybucket.Collector peerLocks map[peer.ID]*peerLock @@ -149,6 +152,7 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc ctxMap: cfg.ctxMap, p2p: cfg.p2p, db: cfg.db, + bs: cfg.bs, blocksPerPeriod: uint64(blocksPerPeriod), rateLimiter: rateLimiter, peerLocks: make(map[peer.ID]*peerLock), @@ -372,22 +376,17 @@ func sortedBlockWithVerifiedBlobSlice(blocks []interfaces.ReadOnlySignedBeaconBl return rb, nil } -func blobRequest(bwb []blocks2.BlockWithROBlobs, blobWindowStart primitives.Slot) *p2ppb.BlobSidecarsByRangeRequest { - if len(bwb) == 0 { - return nil - } - lowest := lowestSlotNeedsBlob(blobWindowStart, bwb) - if lowest == nil { - return nil - } - highest := bwb[len(bwb)-1].Block.Block().Slot() - return &p2ppb.BlobSidecarsByRangeRequest{ - StartSlot: *lowest, - Count: uint64(highest.SubSlot(*lowest)) + 1, - } +type commitmentCount struct { + slot primitives.Slot + root [32]byte + count int } -func lowestSlotNeedsBlob(retentionStart primitives.Slot, bwb []blocks2.BlockWithROBlobs) *primitives.Slot { +type commitmentCountList []commitmentCount + +// countCommitments makes a list of all blocks that have commitments that need to be satisfied. +// This gives us a representation to finish building the request that is lightweight and readable for testing. +func countCommitments(bwb []blocks2.BlockWithROBlobs, retentionStart primitives.Slot) commitmentCountList { if len(bwb) == 0 { return nil } @@ -397,8 +396,13 @@ func lowestSlotNeedsBlob(retentionStart primitives.Slot, bwb []blocks2.BlockWith if bwb[len(bwb)-1].Block.Block().Slot() < retentionStart { return nil } - for _, b := range bwb { + fc := make([]commitmentCount, 0, len(bwb)) + for i := range bwb { + b := bwb[i] slot := b.Block.Block().Slot() + if b.Block.Version() < version.Deneb { + continue + } if slot < retentionStart { continue } @@ -406,67 +410,116 @@ func lowestSlotNeedsBlob(retentionStart primitives.Slot, bwb []blocks2.BlockWith if err != nil || len(commits) == 0 { continue } - return &slot + fc = append(fc, commitmentCount{slot: slot, root: b.Block.Root(), count: len(commits)}) } - return nil + return fc } -func sortBlobs(blobs []blocks.ROBlob) []blocks.ROBlob { - sort.Slice(blobs, func(i, j int) bool { - if blobs[i].Slot() == blobs[j].Slot() { - return blobs[i].Index < blobs[j].Index +// func slotRangeForCommitmentCounts(cc []commitmentCount, bs filesystem.BlobStorageSummarizer) *blobRange { +func (cc commitmentCountList) blobRange(bs filesystem.BlobStorageSummarizer) *blobRange { + if len(cc) == 0 { + return nil + } + // If we don't have a blob summarizer, can't check local blobs, request blobs over complete range. + if bs == nil { + return &blobRange{low: cc[0].slot, high: cc[len(cc)-1].slot} + } + for i := range cc { + hci := cc[i] + // This list is always ordered by increasing slot, per req/resp validation rules. + // Skip through slots until we find one with missing blobs. + if bs.Summary(hci.root).AllAvailable(hci.count) { + continue } - return blobs[i].Slot() < blobs[j].Slot() - }) + // The slow of the first missing blob is the lower bound. + // If we don't find an upper bound, we'll have a 1 slot request (same low/high). + needed := &blobRange{low: hci.slot, high: hci.slot} + // Iterate backward through the list to find the highest missing slot above the lower bound. 
+ // Return the complete range as soon as we find it; if lower bound is already the last element, + // or if we never find an upper bound, we'll fall through to the bounds being equal after this loop. + for z := len(cc) - 1; z > i; z-- { + hcz := cc[z] + if bs.Summary(hcz.root).AllAvailable(hcz.count) { + continue + } + needed.high = hcz.slot + return needed + } + return needed + } + return nil +} - return blobs +type blobRange struct { + low primitives.Slot + high primitives.Slot +} + +func (r *blobRange) Request() *p2ppb.BlobSidecarsByRangeRequest { + if r == nil { + return nil + } + return &p2ppb.BlobSidecarsByRangeRequest{ + StartSlot: r.low, + Count: uint64(r.high.SubSlot(r.low)) + 1, + } } var errBlobVerification = errors.New("peer unable to serve aligned BlobSidecarsByRange and BeaconBlockSidecarsByRange responses") var errMissingBlobsForBlockCommitments = errors.Wrap(errBlobVerification, "blobs unavailable for processing block with kzg commitments") -func verifyAndPopulateBlobs(bwb []blocks2.BlockWithROBlobs, blobs []blocks.ROBlob, blobWindowStart primitives.Slot) ([]blocks2.BlockWithROBlobs, error) { - // Assumes bwb has already been sorted by sortedBlockWithVerifiedBlobSlice. - blobs = sortBlobs(blobs) - blobi := 0 - // Loop over all blocks, and each time a commitment is observed, advance the index into the blob slice. - // The assumption is that the blob slice contains a value for every commitment in the blocks it is based on, - // correctly ordered by slot and blob index. - for i, bb := range bwb { - block := bb.Block.Block() - if block.Slot() < blobWindowStart { +func verifyAndPopulateBlobs(bwb []blocks2.BlockWithROBlobs, blobs []blocks.ROBlob, req *p2ppb.BlobSidecarsByRangeRequest, bss filesystem.BlobStorageSummarizer) ([]blocks2.BlockWithROBlobs, error) { + blobsByRoot := make(map[[32]byte][]blocks.ROBlob) + for i := range blobs { + if blobs[i].Slot() < req.StartSlot { continue } - commits, err := block.Body().BlobKzgCommitments() + br := blobs[i].BlockRoot() + blobsByRoot[br] = append(blobsByRoot[br], blobs[i]) + } + for i := range bwb { + bwi, err := populateBlock(bwb[i], blobsByRoot[bwb[i].Block.Root()], req, bss) if err != nil { - if errors.Is(err, consensus_types.ErrUnsupportedField) { - log. - WithField("blockSlot", block.Slot()). - WithField("retentionStart", blobWindowStart). - Warn("block with slot within blob retention period has version which does not support commitments") + if errors.Is(err, errDidntPopulate) { continue } - return nil, err + return bwb, err } - bb.Blobs = make([]blocks.ROBlob, len(commits)) - for ci := range commits { - // There are more expected commitments in this block, but we've run out of blobs from the response - // (out-of-bound error guard). 
- if blobi == len(blobs) { - return nil, missingCommitError(bb.Block.Root(), bb.Block.Block().Slot(), commits[ci:]) - } - bl := blobs[blobi] - if err := verify.BlobAlignsWithBlock(bl, bb.Block); err != nil { - return nil, err - } - bb.Blobs[ci] = bl - blobi += 1 - } - bwb[i] = bb + bwb[i] = bwi } return bwb, nil } +var errDidntPopulate = errors.New("skipping population of block") + +func populateBlock(bw blocks2.BlockWithROBlobs, blobs []blocks.ROBlob, req *p2ppb.BlobSidecarsByRangeRequest, bss filesystem.BlobStorageSummarizer) (blocks2.BlockWithROBlobs, error) { + blk := bw.Block + if blk.Version() < version.Deneb || blk.Block().Slot() < req.StartSlot { + return bw, errDidntPopulate + } + commits, err := blk.Block().Body().BlobKzgCommitments() + if err != nil { + return bw, errDidntPopulate + } + if len(commits) == 0 { + return bw, errDidntPopulate + } + // Drop blobs on the floor if we already have them. + if bss != nil && bss.Summary(blk.Root()).AllAvailable(len(commits)) { + return bw, errDidntPopulate + } + if len(commits) != len(blobs) { + return bw, missingCommitError(blk.Root(), blk.Block().Slot(), commits) + } + for ci := range commits { + if err := verify.BlobAlignsWithBlock(blobs[ci], blk); err != nil { + return bw, err + } + } + bw.Blobs = blobs + return bw, nil +} + func missingCommitError(root [32]byte, slot primitives.Slot, missing [][]byte) error { missStr := make([]string, 0, len(missing)) for k := range missing { @@ -488,7 +541,7 @@ func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.Bl return nil, err } // Construct request message based on observed interval of blocks in need of blobs. - req := blobRequest(bwb, blobWindowStart) + req := countCommitments(bwb, blobWindowStart).blobRange(f.bs).Request() if req == nil { return bwb, nil } @@ -508,7 +561,7 @@ func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.Bl continue } f.p2p.Peers().Scorers().BlockProviderScorer().Touch(p) - robs, err := verifyAndPopulateBlobs(bwb, blobs, blobWindowStart) + robs, err := verifyAndPopulateBlobs(bwb, blobs, req, f.bs) if err != nil { log.WithField("peer", p).WithError(err).Debug("Invalid BeaconBlobsByRange response") continue diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go index 0852d8f7f5c5..e60f951b51a5 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "math" - "math/rand" "sort" "sync" "testing" @@ -14,13 +13,14 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem" dbtest "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing" p2pm "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p" p2pt "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/testing" "github.com/prysmaticlabs/prysm/v5/beacon-chain/startup" beaconsync "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync" - "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/verify" "github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags" + fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams" "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces" @@ -960,28 +960,7 @@ func TestTimeToWait(t 
*testing.T) { } } -func TestSortBlobs(t *testing.T) { - _, blobs := util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, 10) - shuffled := make([]blocks.ROBlob, len(blobs)) - for i := range blobs { - shuffled[i] = blobs[i] - } - rand.Shuffle(len(shuffled), func(i, j int) { - shuffled[i], shuffled[j] = shuffled[j], shuffled[i] - }) - sorted := sortBlobs(shuffled) - require.Equal(t, len(sorted), len(shuffled)) - for i := range blobs { - expect := blobs[i] - actual := sorted[i] - require.Equal(t, expect.Slot(), actual.Slot()) - require.Equal(t, expect.Index, actual.Index) - require.Equal(t, bytesutil.ToBytes48(expect.KzgCommitment), bytesutil.ToBytes48(actual.KzgCommitment)) - require.Equal(t, expect.BlockRoot(), actual.BlockRoot()) - } -} - -func TestLowestSlotNeedsBlob(t *testing.T) { +func TestBlobRangeForBlocks(t *testing.T) { blks, _ := util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, 10) sbbs := make([]interfaces.ReadOnlySignedBeaconBlock, len(blks)) for i := range blks { @@ -990,12 +969,12 @@ func TestLowestSlotNeedsBlob(t *testing.T) { retentionStart := primitives.Slot(5) bwb, err := sortedBlockWithVerifiedBlobSlice(sbbs) require.NoError(t, err) - lowest := lowestSlotNeedsBlob(retentionStart, bwb) - require.Equal(t, retentionStart, *lowest) + bounds := countCommitments(bwb, retentionStart).blobRange(nil) + require.Equal(t, retentionStart, bounds.low) higher := primitives.Slot(len(blks) + 1) - lowest = lowestSlotNeedsBlob(higher, bwb) - var nilSlot *primitives.Slot - require.Equal(t, nilSlot, lowest) + bounds = countCommitments(bwb, higher).blobRange(nil) + var nilBounds *blobRange + require.Equal(t, nilBounds, bounds) blks, _ = util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, 10) sbbs = make([]interfaces.ReadOnlySignedBeaconBlock, len(blks)) @@ -1008,14 +987,14 @@ func TestLowestSlotNeedsBlob(t *testing.T) { next := bwb[6].Block.Block().Slot() skip := bwb[5].Block.Block() bwb[5].Block, _ = util.GenerateTestDenebBlockWithSidecar(t, skip.ParentRoot(), skip.Slot(), 0) - lowest = lowestSlotNeedsBlob(retentionStart, bwb) - require.Equal(t, next, *lowest) + bounds = countCommitments(bwb, retentionStart).blobRange(nil) + require.Equal(t, next, bounds.low) } func TestBlobRequest(t *testing.T) { var nilReq *ethpb.BlobSidecarsByRangeRequest // no blocks - req := blobRequest([]blocks.BlockWithROBlobs{}, 0) + req := countCommitments([]blocks.BlockWithROBlobs{}, 0).blobRange(nil).Request() require.Equal(t, nilReq, req) blks, _ := util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, 10) sbbs := make([]interfaces.ReadOnlySignedBeaconBlock, len(blks)) @@ -1027,26 +1006,180 @@ func TestBlobRequest(t *testing.T) { maxBlkSlot := primitives.Slot(len(blks) - 1) tooHigh := primitives.Slot(len(blks) + 1) - req = blobRequest(bwb, tooHigh) + req = countCommitments(bwb, tooHigh).blobRange(nil).Request() require.Equal(t, nilReq, req) - req = blobRequest(bwb, maxBlkSlot) + req = countCommitments(bwb, maxBlkSlot).blobRange(nil).Request() require.Equal(t, uint64(1), req.Count) require.Equal(t, maxBlkSlot, req.StartSlot) halfway := primitives.Slot(5) - req = blobRequest(bwb, halfway) + req = countCommitments(bwb, halfway).blobRange(nil).Request() require.Equal(t, halfway, req.StartSlot) // adding 1 to include the halfway slot itself require.Equal(t, uint64(1+maxBlkSlot-halfway), req.Count) before := bwb[0].Block.Block().Slot() allAfter := bwb[1:] - req = blobRequest(allAfter, before) + req = countCommitments(allAfter, before).blobRange(nil).Request() require.Equal(t, allAfter[0].Block.Block().Slot(), req.StartSlot) 
require.Equal(t, len(allAfter), int(req.Count)) } +func TestCountCommitments(t *testing.T) { + // no blocks + // blocks before retention start filtered + // blocks without commitments filtered + // pre-deneb filtered + // variety of commitment counts are accurate, from 1 to max + type testcase struct { + name string + bwb func(t *testing.T, c testcase) []blocks.BlockWithROBlobs + numBlocks int + retStart primitives.Slot + resCount int + } + cases := []testcase{ + { + name: "nil blocks is safe", + bwb: func(t *testing.T, c testcase) []blocks.BlockWithROBlobs { + return nil + }, + retStart: 0, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + bwb := c.bwb(t, c) + cc := countCommitments(bwb, c.retStart) + require.Equal(t, c.resCount, len(cc)) + }) + } +} + +func TestCommitmentCountList(t *testing.T) { + cases := []struct { + name string + cc commitmentCountList + bss func(*testing.T) filesystem.BlobStorageSummarizer + expected *blobRange + request *ethpb.BlobSidecarsByRangeRequest + }{ + { + name: "nil commitmentCount is safe", + cc: nil, + expected: nil, + request: nil, + }, + { + name: "nil bss, single slot", + cc: []commitmentCount{ + {slot: 11235, count: 1}, + }, + expected: &blobRange{low: 11235, high: 11235}, + request: ðpb.BlobSidecarsByRangeRequest{StartSlot: 11235, Count: 1}, + }, + { + name: "nil bss, sparse slots", + cc: []commitmentCount{ + {slot: 11235, count: 1}, + {slot: 11240, count: fieldparams.MaxBlobsPerBlock}, + {slot: 11250, count: 3}, + }, + expected: &blobRange{low: 11235, high: 11250}, + request: ðpb.BlobSidecarsByRangeRequest{StartSlot: 11235, Count: 16}, + }, + { + name: "AllAvailable in middle, some avail low, none high", + bss: func(t *testing.T) filesystem.BlobStorageSummarizer { + onDisk := map[[32]byte][]int{ + bytesutil.ToBytes32([]byte("0")): {0, 1}, + bytesutil.ToBytes32([]byte("1")): {0, 1, 2, 3, 4, 5}, + } + return filesystem.NewMockBlobStorageSummarizer(t, onDisk) + }, + cc: []commitmentCount{ + {slot: 0, count: 3, root: bytesutil.ToBytes32([]byte("0"))}, + {slot: 5, count: fieldparams.MaxBlobsPerBlock, root: bytesutil.ToBytes32([]byte("1"))}, + {slot: 15, count: 3}, + }, + expected: &blobRange{low: 0, high: 15}, + request: ðpb.BlobSidecarsByRangeRequest{StartSlot: 0, Count: 16}, + }, + { + name: "AllAvailable at high and low", + bss: func(t *testing.T) filesystem.BlobStorageSummarizer { + onDisk := map[[32]byte][]int{ + bytesutil.ToBytes32([]byte("0")): {0, 1}, + bytesutil.ToBytes32([]byte("2")): {0, 1, 2, 3, 4, 5}, + } + return filesystem.NewMockBlobStorageSummarizer(t, onDisk) + }, + cc: []commitmentCount{ + {slot: 0, count: 2, root: bytesutil.ToBytes32([]byte("0"))}, + {slot: 5, count: 3}, + {slot: 15, count: fieldparams.MaxBlobsPerBlock, root: bytesutil.ToBytes32([]byte("2"))}, + }, + expected: &blobRange{low: 5, high: 5}, + request: ðpb.BlobSidecarsByRangeRequest{StartSlot: 5, Count: 1}, + }, + { + name: "AllAvailable at high and low, adjacent range in middle", + bss: func(t *testing.T) filesystem.BlobStorageSummarizer { + onDisk := map[[32]byte][]int{ + bytesutil.ToBytes32([]byte("0")): {0, 1}, + bytesutil.ToBytes32([]byte("2")): {0, 1, 2, 3, 4, 5}, + } + return filesystem.NewMockBlobStorageSummarizer(t, onDisk) + }, + cc: []commitmentCount{ + {slot: 0, count: 2, root: bytesutil.ToBytes32([]byte("0"))}, + {slot: 5, count: 3}, + {slot: 6, count: 3}, + {slot: 15, count: fieldparams.MaxBlobsPerBlock, root: bytesutil.ToBytes32([]byte("2"))}, + }, + expected: &blobRange{low: 5, high: 6}, + request: 
ðpb.BlobSidecarsByRangeRequest{StartSlot: 5, Count: 2}, + }, + { + name: "AllAvailable at high and low, range in middle", + bss: func(t *testing.T) filesystem.BlobStorageSummarizer { + onDisk := map[[32]byte][]int{ + bytesutil.ToBytes32([]byte("0")): {0, 1}, + bytesutil.ToBytes32([]byte("1")): {0, 1}, + bytesutil.ToBytes32([]byte("2")): {0, 1, 2, 3, 4, 5}, + } + return filesystem.NewMockBlobStorageSummarizer(t, onDisk) + }, + cc: []commitmentCount{ + {slot: 0, count: 2, root: bytesutil.ToBytes32([]byte("0"))}, + {slot: 5, count: 3, root: bytesutil.ToBytes32([]byte("1"))}, + {slot: 10, count: 3}, + {slot: 15, count: fieldparams.MaxBlobsPerBlock, root: bytesutil.ToBytes32([]byte("2"))}, + }, + expected: &blobRange{low: 5, high: 10}, + request: ðpb.BlobSidecarsByRangeRequest{StartSlot: 5, Count: 6}, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + var bss filesystem.BlobStorageSummarizer + if c.bss != nil { + bss = c.bss(t) + } + br := c.cc.blobRange(bss) + require.DeepEqual(t, c.expected, br) + if c.request == nil { + require.IsNil(t, br.Request()) + } else { + req := br.Request() + require.DeepEqual(t, req.StartSlot, c.request.StartSlot) + require.DeepEqual(t, req.Count, c.request.Count) + } + }) + } +} + func testSequenceBlockWithBlob(t *testing.T, nblocks int) ([]blocks.BlockWithROBlobs, []blocks.ROBlob) { blks, blobs := util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, nblocks) sbbs := make([]interfaces.ReadOnlySignedBeaconBlock, len(blks)) @@ -1058,91 +1191,75 @@ func testSequenceBlockWithBlob(t *testing.T, nblocks int) ([]blocks.BlockWithROB return bwb, blobs } -func TestVerifyAndPopulateBlobs(t *testing.T) { - bwb, blobs := testSequenceBlockWithBlob(t, 10) - lastBlobIdx := len(blobs) - 1 - // Blocks are all before the retention window, blobs argument is ignored. - windowAfter := bwb[len(bwb)-1].Block.Block().Slot() + 1 - _, err := verifyAndPopulateBlobs(bwb, nil, windowAfter) - require.NoError(t, err) - - firstBlockSlot := bwb[0].Block.Block().Slot() - // slice off blobs for the last block so we hit the out of bounds / blob exhaustion check. - _, err = verifyAndPopulateBlobs(bwb, blobs[0:len(blobs)-6], firstBlockSlot) - require.ErrorIs(t, err, errMissingBlobsForBlockCommitments) - - bwb, blobs = testSequenceBlockWithBlob(t, 10) - // Misalign the slots of the blobs for the first block to simulate them being missing from the response. 
- offByOne := blobs[0].Slot() - for i := range blobs { - if blobs[i].Slot() == offByOne { - blobs[i].SignedBlockHeader.Header.Slot = offByOne + 1 - } +func testReqFromResp(bwb []blocks.BlockWithROBlobs) *ethpb.BlobSidecarsByRangeRequest { + return ðpb.BlobSidecarsByRangeRequest{ + StartSlot: bwb[0].Block.Block().Slot(), + Count: uint64(bwb[len(bwb)-1].Block.Block().Slot()-bwb[0].Block.Block().Slot()) + 1, } - _, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot) - require.ErrorIs(t, err, verify.ErrBlobBlockMisaligned) - - bwb, blobs = testSequenceBlockWithBlob(t, 10) - blobs[lastBlobIdx], err = blocks.NewROBlobWithRoot(blobs[lastBlobIdx].BlobSidecar, blobs[0].BlockRoot()) - require.NoError(t, err) - _, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot) - require.ErrorIs(t, err, verify.ErrBlobBlockMisaligned) - - bwb, blobs = testSequenceBlockWithBlob(t, 10) - blobs[lastBlobIdx].Index = 100 - _, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot) - require.ErrorIs(t, err, verify.ErrIncorrectBlobIndex) - - bwb, blobs = testSequenceBlockWithBlob(t, 10) - blobs[lastBlobIdx].SignedBlockHeader.Header.ProposerIndex = 100 - blobs[lastBlobIdx], err = blocks.NewROBlob(blobs[lastBlobIdx].BlobSidecar) - require.NoError(t, err) - _, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot) - require.ErrorIs(t, err, verify.ErrBlobBlockMisaligned) - - bwb, blobs = testSequenceBlockWithBlob(t, 10) - blobs[lastBlobIdx].SignedBlockHeader.Header.ParentRoot = blobs[0].SignedBlockHeader.Header.ParentRoot - blobs[lastBlobIdx], err = blocks.NewROBlob(blobs[lastBlobIdx].BlobSidecar) - require.NoError(t, err) - _, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot) - require.ErrorIs(t, err, verify.ErrBlobBlockMisaligned) - - var emptyKzg [48]byte - bwb, blobs = testSequenceBlockWithBlob(t, 10) - blobs[lastBlobIdx].KzgCommitment = emptyKzg[:] - blobs[lastBlobIdx], err = blocks.NewROBlob(blobs[lastBlobIdx].BlobSidecar) - require.NoError(t, err) - _, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot) - require.ErrorIs(t, err, verify.ErrMismatchedBlobCommitments) +} - // happy path - bwb, blobs = testSequenceBlockWithBlob(t, 10) +func TestVerifyAndPopulateBlobs(t *testing.T) { + t.Run("happy path", func(t *testing.T) { + bwb, blobs := testSequenceBlockWithBlob(t, 10) - expectedCommits := make(map[[48]byte]bool) - for _, bl := range blobs { - expectedCommits[bytesutil.ToBytes48(bl.KzgCommitment)] = true - } - // The assertions using this map expect all commitments to be unique, so make sure that stays true. - require.Equal(t, len(blobs), len(expectedCommits)) + expectedCommits := make(map[[48]byte]bool) + for _, bl := range blobs { + expectedCommits[bytesutil.ToBytes48(bl.KzgCommitment)] = true + } + require.Equal(t, len(blobs), len(expectedCommits)) - bwb, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot) - require.NoError(t, err) - for _, bw := range bwb { - commits, err := bw.Block.Block().Body().BlobKzgCommitments() + bwb, err := verifyAndPopulateBlobs(bwb, blobs, testReqFromResp(bwb), nil) + require.NoError(t, err) + for _, bw := range bwb { + commits, err := bw.Block.Block().Body().BlobKzgCommitments() + require.NoError(t, err) + require.Equal(t, len(commits), len(bw.Blobs)) + for i := range commits { + bc := bytesutil.ToBytes48(commits[i]) + require.Equal(t, bc, bytesutil.ToBytes48(bw.Blobs[i].KzgCommitment)) + // Since we delete entries we've seen, duplicates will cause an error here. 
+ _, ok := expectedCommits[bc] + // Make sure this was an expected delete, then delete it from the map so we can make sure we saw all of them. + require.Equal(t, true, ok) + delete(expectedCommits, bc) + } + } + // We delete each entry we've seen, so if we see all expected commits, the map should be empty at the end. + require.Equal(t, 0, len(expectedCommits)) + }) + t.Run("missing blobs", func(t *testing.T) { + bwb, blobs := testSequenceBlockWithBlob(t, 10) + _, err := verifyAndPopulateBlobs(bwb, blobs[1:], testReqFromResp(bwb), nil) + require.ErrorIs(t, err, errMissingBlobsForBlockCommitments) + }) + t.Run("no blobs for last block", func(t *testing.T) { + bwb, blobs := testSequenceBlockWithBlob(t, 10) + lastIdx := len(bwb) - 1 + lastBlk := bwb[lastIdx].Block + cmts, err := lastBlk.Block().Body().BlobKzgCommitments() require.NoError(t, err) - require.Equal(t, len(commits), len(bw.Blobs)) - for i := range commits { - bc := bytesutil.ToBytes48(commits[i]) - require.Equal(t, bc, bytesutil.ToBytes48(bw.Blobs[i].KzgCommitment)) - // Since we delete entries we've seen, duplicates will cause an error here. - _, ok := expectedCommits[bc] - // Make sure this was an expected delete, then delete it from the map so we can make sure we saw all of them. - require.Equal(t, true, ok) - delete(expectedCommits, bc) + blobs = blobs[0 : len(blobs)-len(cmts)] + lastBlk, _ = util.GenerateTestDenebBlockWithSidecar(t, lastBlk.Block().ParentRoot(), lastBlk.Block().Slot(), 0) + bwb[lastIdx].Block = lastBlk + _, err = verifyAndPopulateBlobs(bwb, blobs, testReqFromResp(bwb), nil) + require.NoError(t, err) + }) + t.Run("blobs not copied if all locally available", func(t *testing.T) { + bwb, blobs := testSequenceBlockWithBlob(t, 10) + // r1 only has some blobs locally available, so we'll still copy them all. + // r7 has all blobs locally available, so we shouldn't copy them. + i1, i7 := 1, 7 + r1, r7 := bwb[i1].Block.Root(), bwb[i7].Block.Root() + onDisk := map[[32]byte][]int{ + r1: {0, 1}, + r7: {0, 1, 2, 3, 4, 5}, } - } - // We delete each entry we've seen, so if we see all expected commits, the map should be empty at the end. 
- require.Equal(t, 0, len(expectedCommits)) + bss := filesystem.NewMockBlobStorageSummarizer(t, onDisk) + bwb, err := verifyAndPopulateBlobs(bwb, blobs, testReqFromResp(bwb), bss) + require.NoError(t, err) + require.Equal(t, 6, len(bwb[i1].Blobs)) + require.Equal(t, 0, len(bwb[i7].Blobs)) + }) } func TestBatchLimit(t *testing.T) { diff --git a/beacon-chain/sync/initial-sync/blocks_queue.go b/beacon-chain/sync/initial-sync/blocks_queue.go index 20c3248dddcb..44461bd4fd7b 100644 --- a/beacon-chain/sync/initial-sync/blocks_queue.go +++ b/beacon-chain/sync/initial-sync/blocks_queue.go @@ -7,6 +7,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/prysmaticlabs/prysm/v5/beacon-chain/db" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/v5/beacon-chain/startup" beaconsync "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync" @@ -69,6 +70,7 @@ type blocksQueueConfig struct { p2p p2p.P2P db db.ReadOnlyDatabase mode syncMode + bs filesystem.BlobStorageSummarizer } // blocksQueue is a priority queue that serves as a intermediary between block fetchers (producers) @@ -101,12 +103,16 @@ func newBlocksQueue(ctx context.Context, cfg *blocksQueueConfig) *blocksQueue { blocksFetcher := cfg.blocksFetcher if blocksFetcher == nil { + if cfg.bs == nil { + log.Warn("rpc fetcher starting without blob availability cache, duplicate blobs may be requested.") + } blocksFetcher = newBlocksFetcher(ctx, &blocksFetcherConfig{ ctxMap: cfg.ctxMap, chain: cfg.chain, p2p: cfg.p2p, db: cfg.db, clock: cfg.clock, + bs: cfg.bs, }) } highestExpectedSlot := cfg.highestExpectedSlot @@ -139,7 +145,7 @@ func newBlocksQueue(ctx context.Context, cfg *blocksQueueConfig) *blocksQueue { queue.smm.addEventHandler(eventDataReceived, stateScheduled, queue.onDataReceivedEvent(ctx)) queue.smm.addEventHandler(eventTick, stateDataParsed, queue.onReadyToSendEvent(ctx)) queue.smm.addEventHandler(eventTick, stateSkipped, queue.onProcessSkippedEvent(ctx)) - queue.smm.addEventHandler(eventTick, stateSent, queue.onCheckStaleEvent(ctx)) + queue.smm.addEventHandler(eventTick, stateSent, onCheckStaleEvent(ctx)) return queue } @@ -451,7 +457,7 @@ func (q *blocksQueue) onProcessSkippedEvent(ctx context.Context) eventHandlerFn // onCheckStaleEvent is an event that allows to mark stale epochs, // so that they can be re-processed. 
-func (_ *blocksQueue) onCheckStaleEvent(ctx context.Context) eventHandlerFn { +func onCheckStaleEvent(ctx context.Context) eventHandlerFn { return func(m *stateMachine, in interface{}) (stateID, error) { if ctx.Err() != nil { return m.state, ctx.Err() diff --git a/beacon-chain/sync/initial-sync/blocks_queue_test.go b/beacon-chain/sync/initial-sync/blocks_queue_test.go index 96833402b5ac..8a2ba65f517a 100644 --- a/beacon-chain/sync/initial-sync/blocks_queue_test.go +++ b/beacon-chain/sync/initial-sync/blocks_queue_test.go @@ -971,24 +971,12 @@ func TestBlocksQueue_onProcessSkippedEvent(t *testing.T) { } func TestBlocksQueue_onCheckStaleEvent(t *testing.T) { - blockBatchLimit := flags.Get().BlockBatchLimit - mc, p2p, _ := initializeTestServices(t, []primitives.Slot{}, []*peerData{}) - ctx, cancel := context.WithCancel(context.Background()) defer cancel() - fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ - chain: mc, - p2p: p2p, - }) t.Run("expired context", func(t *testing.T) { ctx, cancel := context.WithCancel(ctx) - queue := newBlocksQueue(ctx, &blocksQueueConfig{ - blocksFetcher: fetcher, - chain: mc, - highestExpectedSlot: primitives.Slot(blockBatchLimit), - }) - handlerFn := queue.onCheckStaleEvent(ctx) + handlerFn := onCheckStaleEvent(ctx) cancel() updatedState, err := handlerFn(&stateMachine{ state: stateSkipped, @@ -998,16 +986,10 @@ func TestBlocksQueue_onCheckStaleEvent(t *testing.T) { }) t.Run("invalid input state", func(t *testing.T) { - queue := newBlocksQueue(ctx, &blocksQueueConfig{ - blocksFetcher: fetcher, - chain: mc, - highestExpectedSlot: primitives.Slot(blockBatchLimit), - }) - invalidStates := []stateID{stateNew, stateScheduled, stateDataParsed, stateSkipped} for _, state := range invalidStates { t.Run(state.String(), func(t *testing.T) { - handlerFn := queue.onCheckStaleEvent(ctx) + handlerFn := onCheckStaleEvent(ctx) updatedState, err := handlerFn(&stateMachine{ state: state, }, nil) @@ -1018,12 +1000,7 @@ func TestBlocksQueue_onCheckStaleEvent(t *testing.T) { }) t.Run("process non stale machine", func(t *testing.T) { - queue := newBlocksQueue(ctx, &blocksQueueConfig{ - blocksFetcher: fetcher, - chain: mc, - highestExpectedSlot: primitives.Slot(blockBatchLimit), - }) - handlerFn := queue.onCheckStaleEvent(ctx) + handlerFn := onCheckStaleEvent(ctx) updatedState, err := handlerFn(&stateMachine{ state: stateSent, updated: prysmTime.Now().Add(-staleEpochTimeout / 2), @@ -1034,12 +1011,7 @@ func TestBlocksQueue_onCheckStaleEvent(t *testing.T) { }) t.Run("process stale machine", func(t *testing.T) { - queue := newBlocksQueue(ctx, &blocksQueueConfig{ - blocksFetcher: fetcher, - chain: mc, - highestExpectedSlot: primitives.Slot(blockBatchLimit), - }) - handlerFn := queue.onCheckStaleEvent(ctx) + handlerFn := onCheckStaleEvent(ctx) updatedState, err := handlerFn(&stateMachine{ state: stateSent, updated: prysmTime.Now().Add(-staleEpochTimeout), diff --git a/beacon-chain/sync/initial-sync/round_robin.go b/beacon-chain/sync/initial-sync/round_robin.go index c91bd1ec4322..5d96478148f5 100644 --- a/beacon-chain/sync/initial-sync/round_robin.go +++ b/beacon-chain/sync/initial-sync/round_robin.go @@ -11,6 +11,7 @@ import ( "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition" "github.com/prysmaticlabs/prysm/v5/beacon-chain/das" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem" "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync" "github.com/prysmaticlabs/prysm/v5/beacon-chain/verification" 
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" @@ -62,7 +63,39 @@ func (s *Service) roundRobinSync(genesis time.Time) error { return s.syncToNonFinalizedEpoch(ctx, genesis) } -// syncToFinalizedEpoch sync from head to best known finalized epoch. +func (s *Service) startBlocksQueue(ctx context.Context, highestSlot primitives.Slot, mode syncMode) (*blocksQueue, error) { + vr := s.clock.GenesisValidatorsRoot() + ctxMap, err := sync.ContextByteVersionsForValRoot(vr) + if err != nil { + return nil, errors.Wrapf(err, "unable to initialize context version map using genesis validator root = %#x", vr) + } + + summarizer, err := s.cfg.BlobStorage.WaitForSummarizer(ctx) + if err != nil { + // The summarizer is an optional optimization, we can continue without, only stop if there is a different error. + if !errors.Is(err, filesystem.ErrBlobStorageSummarizerUnavailable) { + return nil, err + } + summarizer = nil // This should already be nil, but we'll set it just to be safe. + } + cfg := &blocksQueueConfig{ + p2p: s.cfg.P2P, + db: s.cfg.DB, + chain: s.cfg.Chain, + clock: s.clock, + ctxMap: ctxMap, + highestExpectedSlot: highestSlot, + mode: mode, + bs: summarizer, + } + queue := newBlocksQueue(ctx, cfg) + if err := queue.start(); err != nil { + return nil, err + } + return queue, nil +} + +// syncToFinalizedEpoch sync from head to the best known finalized epoch. func (s *Service) syncToFinalizedEpoch(ctx context.Context, genesis time.Time) error { highestFinalizedSlot, err := slots.EpochStart(s.highestFinalizedEpoch()) if err != nil { @@ -74,28 +107,12 @@ func (s *Service) syncToFinalizedEpoch(ctx context.Context, genesis time.Time) e return nil } - vr := s.clock.GenesisValidatorsRoot() - ctxMap, err := sync.ContextByteVersionsForValRoot(vr) + queue, err := s.startBlocksQueue(ctx, highestFinalizedSlot, modeStopOnFinalizedEpoch) if err != nil { - return errors.Wrapf(err, "unable to initialize context version map using genesis validator root = %#x", vr) - } - queue := newBlocksQueue(ctx, &blocksQueueConfig{ - p2p: s.cfg.P2P, - db: s.cfg.DB, - chain: s.cfg.Chain, - clock: s.clock, - ctxMap: ctxMap, - highestExpectedSlot: highestFinalizedSlot, - mode: modeStopOnFinalizedEpoch, - }) - if err := queue.start(); err != nil { return err } for data := range queue.fetchedData { - // If blobs are available. Verify blobs and blocks are consistence. - // We can't import a block if there's no associated blob within DA bound. - // The blob has to pass aggregated proof check. s.processFetchedData(ctx, genesis, s.cfg.Chain.HeadSlot(), data) } @@ -113,21 +130,8 @@ func (s *Service) syncToFinalizedEpoch(ctx context.Context, genesis time.Time) e // syncToNonFinalizedEpoch sync from head to best known non-finalized epoch supported by majority // of peers (no less than MinimumSyncPeers*2 peers). func (s *Service) syncToNonFinalizedEpoch(ctx context.Context, genesis time.Time) error { - vr := s.clock.GenesisValidatorsRoot() - ctxMap, err := sync.ContextByteVersionsForValRoot(vr) + queue, err := s.startBlocksQueue(ctx, slots.Since(genesis), modeNonConstrained) if err != nil { - return errors.Wrapf(err, "unable to initialize context version map using genesis validator root = %#x", vr) - } - queue := newBlocksQueue(ctx, &blocksQueueConfig{ - p2p: s.cfg.P2P, - db: s.cfg.DB, - chain: s.cfg.Chain, - clock: s.clock, - ctxMap: ctxMap, - highestExpectedSlot: slots.Since(genesis), - mode: modeNonConstrained, - }) - if err := queue.start(); err != nil { return err } for data := range queue.fetchedData {