From c9cb7b56c77eafad91cfb71f3a8599203fcee372 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 22 Jan 2025 08:22:33 -0800 Subject: [PATCH 01/32] remove context --- module/block_iterator/height_based/iterator.go | 11 ----------- module/block_iterator/height_based/iterator_test.go | 3 +-- 2 files changed, 1 insertion(+), 13 deletions(-) diff --git a/module/block_iterator/height_based/iterator.go b/module/block_iterator/height_based/iterator.go index e93bc4ee33e..e56c9348e90 100644 --- a/module/block_iterator/height_based/iterator.go +++ b/module/block_iterator/height_based/iterator.go @@ -1,7 +1,6 @@ package height_based import ( - "context" "fmt" "github.com/onflow/flow-go/model/flow" @@ -16,7 +15,6 @@ type HeightIterator struct { // config endHeight uint64 - ctx context.Context // state nextHeight uint64 @@ -28,14 +26,12 @@ var _ module.BlockIterator = (*HeightIterator)(nil) func NewHeightIterator( headers storage.Headers, progress module.IterateProgressWriter, - ctx context.Context, job module.IterateJob, ) (module.BlockIterator, error) { return &HeightIterator{ headers: headers, progress: progress, endHeight: job.End, - ctx: ctx, nextHeight: job.Start, }, nil } @@ -44,13 +40,6 @@ func NewHeightIterator( // it iterates from lower height to higher height. 
// when iterating a height, it iterates over all sibling blocks at that height func (b *HeightIterator) Next() (flow.Identifier, bool, error) { - // exit when the context is done - select { - case <-b.ctx.Done(): - return flow.ZeroID, false, nil - default: - } - if b.nextHeight > b.endHeight { return flow.ZeroID, false, nil } diff --git a/module/block_iterator/height_based/iterator_test.go b/module/block_iterator/height_based/iterator_test.go index 6adcdf3827c..5c62e8b4f06 100644 --- a/module/block_iterator/height_based/iterator_test.go +++ b/module/block_iterator/height_based/iterator_test.go @@ -1,7 +1,6 @@ package height_based import ( - "context" "fmt" "testing" @@ -35,7 +34,7 @@ func TestIterateHeight(t *testing.T) { // b0 is the root block, iterate from b1 to b3 job := module.IterateJob{Start: b1.Height, End: b3.Height} headers := storagebadger.NewHeaders(&metrics.NoopCollector{}, db) - iter, err := NewHeightIterator(headers, progress, context.Background(), job) + iter, err := NewHeightIterator(headers, progress, job) require.NoError(t, err) // iterate through all blocks From f9976f16aae0d292bdac1e4ef52260183d95eaac Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 22 Jan 2025 08:23:35 -0800 Subject: [PATCH 02/32] rename to IterateRange --- module/block_iterator.go | 10 +++++----- module/block_iterator/height_based/iterator.go | 2 +- module/block_iterator/height_based/iterator_test.go | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/module/block_iterator.go b/module/block_iterator.go index 593f8786edb..c600a2f4530 100644 --- a/module/block_iterator.go +++ b/module/block_iterator.go @@ -6,20 +6,20 @@ import ( "github.com/onflow/flow-go/model/flow" ) -// IterateJob defines the range of blocks to iterate over +// IterateRange defines the range of blocks to iterate over // the range could be either view based range or height based range. 
// when specifying the range, the start and end are inclusive, and the end must be greater than or // equal to the start -type IterateJob struct { +type IterateRange struct { Start uint64 // the start of the range End uint64 // the end of the range } -// IterateJobCreator is an interface for creating iterate jobs +// IterateRangeCreator is an interface for creating iterate jobs type IteratorJobCreator interface { // CreateJob takes a progress reader which is used to read the progress of the iterator // and returns an iterate job that specifies the range of blocks to iterate over - CreateJob(IterateProgressReader) (IterateJob, error) + CreateJob(IterateProgressReader) (IterateRange, error) } // IterateProgressReader reads the progress of the iterator, useful for resuming the iteration @@ -71,7 +71,7 @@ type IteratorCreator interface { // if the progress is only saved at the end of the iteration, then if the iteration // was interrupted, then the iterator will start from the beginning of the range again, // which means some blocks might be iterated multiple times. 
- CreateIterator(IterateJob, IterateProgressWriter) (BlockIterator, error) + CreateIterator(IterateRange, IterateProgressWriter) (BlockIterator, error) } type IteratorFactory struct { diff --git a/module/block_iterator/height_based/iterator.go b/module/block_iterator/height_based/iterator.go index e56c9348e90..7d2078637e4 100644 --- a/module/block_iterator/height_based/iterator.go +++ b/module/block_iterator/height_based/iterator.go @@ -26,7 +26,7 @@ var _ module.BlockIterator = (*HeightIterator)(nil) func NewHeightIterator( headers storage.Headers, progress module.IterateProgressWriter, - job module.IterateJob, + job module.IterateRange, ) (module.BlockIterator, error) { return &HeightIterator{ headers: headers, diff --git a/module/block_iterator/height_based/iterator_test.go b/module/block_iterator/height_based/iterator_test.go index 5c62e8b4f06..1ccbde54743 100644 --- a/module/block_iterator/height_based/iterator_test.go +++ b/module/block_iterator/height_based/iterator_test.go @@ -32,7 +32,7 @@ func TestIterateHeight(t *testing.T) { // create iterator // b0 is the root block, iterate from b1 to b3 - job := module.IterateJob{Start: b1.Height, End: b3.Height} + job := module.IterateRange{Start: b1.Height, End: b3.Height} headers := storagebadger.NewHeaders(&metrics.NoopCollector{}, db) iter, err := NewHeightIterator(headers, progress, job) require.NoError(t, err) From de300c6b7450b07d27c50a89d7d5414c0c0e5186 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 22 Jan 2025 08:25:23 -0800 Subject: [PATCH 03/32] rename to SaveState/ReadState --- module/block_iterator.go | 12 ++++++------ module/block_iterator/height_based/iterator.go | 2 +- module/block_iterator/height_based/iterator_test.go | 6 +++--- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/module/block_iterator.go b/module/block_iterator.go index c600a2f4530..65652c1b580 100644 --- a/module/block_iterator.go +++ b/module/block_iterator.go @@ -25,16 +25,16 @@ type 
IteratorJobCreator interface { // IterateProgressReader reads the progress of the iterator, useful for resuming the iteration // after restart type IterateProgressReader interface { - // ReadNext reads the next block to iterate + // LoadState reads the next block to iterate // caller must ensure the reader is created by the IterateProgressInitializer, - // otherwise ReadNext would return exception. - ReadNext() (uint64, error) + // otherwise LoadState would return exception. + LoadState() (uint64, error) } // IterateProgressWriter saves the progress of the iterator type IterateProgressWriter interface { - // SaveNext persists the next block to be iterated - SaveNext(uint64) error + // SaveState persists the next block to be iterated + SaveState(uint64) error } // IterateProgressInitializer is an interface for initializing the progress of the iterator @@ -55,7 +55,7 @@ type BlockIterator interface { // Checkpoint saves the current state of the iterator // so that it can be resumed later - // when Checkpoint is called, if SaveNextFunc is called with block A, + // when Checkpoint is called, if SaveStateFunc is called with block A, // then after restart, the iterator will resume from A. 
Checkpoint() error } diff --git a/module/block_iterator/height_based/iterator.go b/module/block_iterator/height_based/iterator.go index 7d2078637e4..07b44ac5dd1 100644 --- a/module/block_iterator/height_based/iterator.go +++ b/module/block_iterator/height_based/iterator.go @@ -57,7 +57,7 @@ func (b *HeightIterator) Next() (flow.Identifier, bool, error) { // Checkpoint saves the iteration progress to storage func (b *HeightIterator) Checkpoint() error { - err := b.progress.SaveNext(b.nextHeight) + err := b.progress.SaveState(b.nextHeight) if err != nil { return fmt.Errorf("failed to save progress at view %v: %w", b.nextHeight, err) } diff --git a/module/block_iterator/height_based/iterator_test.go b/module/block_iterator/height_based/iterator_test.go index 1ccbde54743..2093ebb13b1 100644 --- a/module/block_iterator/height_based/iterator_test.go +++ b/module/block_iterator/height_based/iterator_test.go @@ -67,7 +67,7 @@ func TestIterateHeight(t *testing.T) { require.NoError(t, iter.Checkpoint()) - savedNextHeight, err := progress.ReadNext() + savedNextHeight, err := progress.LoadState() require.NoError(t, err) require.Equal(t, b3.Height+1, savedNextHeight, @@ -83,11 +83,11 @@ type saveNextHeight struct { var _ module.IterateProgressWriter = (*saveNextHeight)(nil) var _ module.IterateProgressReader = (*saveNextHeight)(nil) -func (s *saveNextHeight) SaveNext(height uint64) error { +func (s *saveNextHeight) SaveState(height uint64) error { s.savedNextHeight = height return nil } -func (s *saveNextHeight) ReadNext() (uint64, error) { +func (s *saveNextHeight) LoadState() (uint64, error) { return s.savedNextHeight, nil } From 7a9f9996e8e99c99e95efd8118431a2ba411f488 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 22 Jan 2025 08:43:32 -0800 Subject: [PATCH 04/32] fix test cases --- module/block_iterator/height_based/iterator_test.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git 
a/module/block_iterator/height_based/iterator_test.go b/module/block_iterator/height_based/iterator_test.go index 2093ebb13b1..32994a3c535 100644 --- a/module/block_iterator/height_based/iterator_test.go +++ b/module/block_iterator/height_based/iterator_test.go @@ -39,20 +39,18 @@ func TestIterateHeight(t *testing.T) { // iterate through all blocks visited := make(map[flow.Identifier]struct{}) - count := 0 for { id, ok, err := iter.Next() require.NoError(t, err) if !ok { break } - visited[id] = struct{}{} - // verify we don't iterate two many blocks - count++ - if count > len(bs) { - t.Fatal("visited too many blocks") - } + // preventing duplicate visit + _, ok = visited[id] + require.False(t, ok, fmt.Sprintf("block %v is visited twice", id)) + + visited[id] = struct{}{} } // verify all blocks are visited From 84b38e7f650ad93d003f8d39fc0c048eea07bdee Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 22 Jan 2025 08:58:46 -0800 Subject: [PATCH 05/32] add todo --- module/block_iterator.go | 4 ++++ module/block_iterator/height_based/iterator.go | 17 ++++++++--------- .../height_based/iterator_test.go | 2 +- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/module/block_iterator.go b/module/block_iterator.go index 65652c1b580..8f02772e4ae 100644 --- a/module/block_iterator.go +++ b/module/block_iterator.go @@ -51,6 +51,10 @@ type BlockIterator interface { // if the iteration is interrupted (e.g. by a restart), the iterator can be // resumed from the last checkpoint, which might result in the same block being // iterated again. 
+ // TODO: once upgraded to go 1.23, consider using the Range iterator + // Range() iter.Seq2[flow.Identifier, error] + // so that the iterator can be used in a for loop: + // for blockID, err := range heightIterator.Range() Next() (blockID flow.Identifier, hasNext bool, exception error) // Checkpoint saves the current state of the iterator diff --git a/module/block_iterator/height_based/iterator.go b/module/block_iterator/height_based/iterator.go index 07b44ac5dd1..2e98195c091 100644 --- a/module/block_iterator/height_based/iterator.go +++ b/module/block_iterator/height_based/iterator.go @@ -5,13 +5,12 @@ import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" - "github.com/onflow/flow-go/storage" ) type HeightIterator struct { // dependencies - headers storage.Headers - progress module.IterateProgressWriter // for saving the next height to be iterated for resuming the iteration + getBlockIDByHeight func(uint64) (flow.Identifier, error) + progress module.IterateProgressWriter // for saving the next height to be iterated for resuming the iteration // config endHeight uint64 @@ -24,15 +23,15 @@ var _ module.BlockIterator = (*HeightIterator)(nil) // caller must ensure that both job.Start and job.End are finalized height func NewHeightIterator( - headers storage.Headers, + getBlockIDByHeight func(uint64) (flow.Identifier, error), progress module.IterateProgressWriter, job module.IterateRange, ) (module.BlockIterator, error) { return &HeightIterator{ - headers: headers, - progress: progress, - endHeight: job.End, - nextHeight: job.Start, + getBlockIDByHeight: getBlockIDByHeight, + progress: progress, + endHeight: job.End, + nextHeight: job.Start, }, nil } @@ -45,7 +44,7 @@ func (b *HeightIterator) Next() (flow.Identifier, bool, error) { } // TODO: use storage operation instead to avoid hitting cache - next, err := b.headers.BlockIDByHeight(b.nextHeight) + next, err := b.getBlockIDByHeight(b.nextHeight) if err != nil { return flow.ZeroID, 
false, fmt.Errorf("failed to fetch block at height %v: %w", b.nextHeight, err) } diff --git a/module/block_iterator/height_based/iterator_test.go b/module/block_iterator/height_based/iterator_test.go index 32994a3c535..362c4807c48 100644 --- a/module/block_iterator/height_based/iterator_test.go +++ b/module/block_iterator/height_based/iterator_test.go @@ -34,7 +34,7 @@ func TestIterateHeight(t *testing.T) { // b0 is the root block, iterate from b1 to b3 job := module.IterateRange{Start: b1.Height, End: b3.Height} headers := storagebadger.NewHeaders(&metrics.NoopCollector{}, db) - iter, err := NewHeightIterator(headers, progress, job) + iter, err := NewHeightIterator(headers.BlockIDByHeight, progress, job) require.NoError(t, err) // iterate through all blocks From 4ebe9974d90aaf09153cd798456f996dc20d76e8 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 22 Jan 2025 09:27:37 -0800 Subject: [PATCH 06/32] add comments about not concurrent safe --- module/block_iterator.go | 3 +++ module/block_iterator/height_based/iterator.go | 5 +++++ 2 files changed, 8 insertions(+) diff --git a/module/block_iterator.go b/module/block_iterator.go index 8f02772e4ae..b10b64f7451 100644 --- a/module/block_iterator.go +++ b/module/block_iterator.go @@ -47,6 +47,7 @@ type IterateProgressInitializer interface { // BlockIterator is an interface for iterating over blocks type BlockIterator interface { // Next returns the next block in the iterator + // Note: this method is not concurrent-safe // Note: a block will only be iterated once in a single iteration, however // if the iteration is interrupted (e.g. by a restart), the iterator can be // resumed from the last checkpoint, which might result in the same block being @@ -61,6 +62,8 @@ type BlockIterator interface { // so that it can be resumed later // when Checkpoint is called, if SaveStateFunc is called with block A, // then after restart, the iterator will resume from A. 
+ // make sure to call this after all the jobs for processing the block IDs returned by + // Next() are completed. Checkpoint() error } diff --git a/module/block_iterator/height_based/iterator.go b/module/block_iterator/height_based/iterator.go index 2e98195c091..9afb7bc0fc3 100644 --- a/module/block_iterator/height_based/iterator.go +++ b/module/block_iterator/height_based/iterator.go @@ -7,6 +7,8 @@ import ( "github.com/onflow/flow-go/module" ) +// HeightIterator is a block iterator that iterates over blocks by height +// it's not concurrent safe, so don't use it in multiple goroutines type HeightIterator struct { // dependencies getBlockIDByHeight func(uint64) (flow.Identifier, error) @@ -38,6 +40,7 @@ func NewHeightIterator( // Next returns the next block ID in the iteration // it iterates from lower height to higher height. // when iterating a height, it iterates over all sibling blocks at that height +// Note: this method is not concurrent-safe func (b *HeightIterator) Next() (flow.Identifier, bool, error) { if b.nextHeight > b.endHeight { return flow.ZeroID, false, nil @@ -55,6 +58,8 @@ func (b *HeightIterator) Next() (flow.Identifier, bool, error) { } // Checkpoint saves the iteration progress to storage +// make sure to call this after all the jobs for processing the block IDs returned by +// Next() are completed. 
func (b *HeightIterator) Checkpoint() error { err := b.progress.SaveState(b.nextHeight) if err != nil { From b39f33a49e0467ba362d134974fdd1c97baae00b Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 8 Jan 2025 09:09:32 -0800 Subject: [PATCH 07/32] add creator --- module/block_iterator/common/progress.go | 27 +++++++++++ module/block_iterator/factory.go | 25 +++++++++++ module/block_iterator/height_based/creator.go | 29 ++++++++++++ .../height_based/initializer.go | 45 +++++++++++++++++++ module/block_iterator/height_based/job.go | 38 ++++++++++++++++ 5 files changed, 164 insertions(+) create mode 100644 module/block_iterator/common/progress.go create mode 100644 module/block_iterator/factory.go create mode 100644 module/block_iterator/height_based/creator.go create mode 100644 module/block_iterator/height_based/initializer.go create mode 100644 module/block_iterator/height_based/job.go diff --git a/module/block_iterator/common/progress.go b/module/block_iterator/common/progress.go new file mode 100644 index 00000000000..f7b2585b29b --- /dev/null +++ b/module/block_iterator/common/progress.go @@ -0,0 +1,27 @@ +package common + +import ( + "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/storage" +) + +type NextProgress struct { + store storage.ConsumerProgress +} + +var _ module.IterateProgressReader = (*NextProgress)(nil) +var _ module.IterateProgressWriter = (*NextProgress)(nil) + +func NewNextProgress(store storage.ConsumerProgress) *NextProgress { + return &NextProgress{ + store: store, + } +} + +func (n *NextProgress) ReadNext() (uint64, error) { + return n.store.ProcessedIndex() +} + +func (n *NextProgress) SaveNext(next uint64) error { + return n.store.SetProcessedIndex(next) +} diff --git a/module/block_iterator/factory.go b/module/block_iterator/factory.go new file mode 100644 index 00000000000..08fd8ec6fb7 --- /dev/null +++ b/module/block_iterator/factory.go @@ -0,0 +1,25 @@ +package block_iterator + +import ( + "context" 
+ + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/block_iterator/height_based" + "github.com/onflow/flow-go/storage" +) + +func NewHeightIteratorFactory( + ctx context.Context, + headers storage.Headers, + progress storage.ConsumerProgress, + getRoot func() (*flow.Header, error), + latest func() (*flow.Header, error), +) (*module.IteratorFactory, error) { + + initializer := height_based.NewInitializer(progress, getRoot) + creator := height_based.NewHeightBasedIteratorCreator(ctx, headers) + jobCreator := height_based.NewHeightIteratorJobCreator(latest) + + return module.NewIteratorFactory(initializer, creator, jobCreator) +} diff --git a/module/block_iterator/height_based/creator.go b/module/block_iterator/height_based/creator.go new file mode 100644 index 00000000000..75191ef0ec3 --- /dev/null +++ b/module/block_iterator/height_based/creator.go @@ -0,0 +1,29 @@ +package height_based + +import ( + "context" + + "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/storage" +) + +type HeightBasedIteratorCreator struct { + ctx context.Context + headers storage.Headers +} + +var _ module.IteratorCreator = (*HeightBasedIteratorCreator)(nil) + +func NewHeightBasedIteratorCreator( + ctx context.Context, // for cancelling the iteration + headers storage.Headers, // for looking up block by height +) *HeightBasedIteratorCreator { + return &HeightBasedIteratorCreator{ + ctx: ctx, + headers: headers, + } +} + +func (h *HeightBasedIteratorCreator) CreateIterator(job module.IterateJob, writer module.IterateProgressWriter) (module.BlockIterator, error) { + return NewHeightIterator(h.headers, writer, h.ctx, job) +} diff --git a/module/block_iterator/height_based/initializer.go b/module/block_iterator/height_based/initializer.go new file mode 100644 index 00000000000..c8cac156616 --- /dev/null +++ b/module/block_iterator/height_based/initializer.go @@ -0,0 +1,45 @@ +package height_based + +import ( + 
"errors" + "fmt" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/block_iterator/common" + "github.com/onflow/flow-go/storage" +) + +type HeightProgressInitializer struct { + progress storage.ConsumerProgress + getRoot func() (*flow.Header, error) +} + +var _ module.IterateProgressInitializer = (*HeightProgressInitializer)(nil) + +func NewInitializer(progress storage.ConsumerProgress, getRoot func() (*flow.Header, error)) *HeightProgressInitializer { + return &HeightProgressInitializer{ + progress: progress, + getRoot: getRoot, + } +} + +func (h *HeightProgressInitializer) Init() (module.IterateProgressReader, module.IterateProgressWriter, error) { + _, err := h.progress.ProcessedIndex() + if errors.Is(err, storage.ErrNotFound) { + root, err := h.getRoot() + if err != nil { + return nil, nil, fmt.Errorf("failed to get root block: %w", err) + } + + next := root.Height + 1 + err = h.progress.InitProcessedIndex(next) + if err != nil { + return nil, nil, fmt.Errorf("failed to init processed index: %w", err) + } + } + + nextProgress := common.NewNextProgress(h.progress) + + return nextProgress, nextProgress, nil +} diff --git a/module/block_iterator/height_based/job.go b/module/block_iterator/height_based/job.go new file mode 100644 index 00000000000..da485ff2415 --- /dev/null +++ b/module/block_iterator/height_based/job.go @@ -0,0 +1,38 @@ +package height_based + +import ( + "fmt" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module" +) + +type HeightIteratorJobCreator struct { + latest func() (*flow.Header, error) +} + +var _ module.IteratorJobCreator = (*HeightIteratorJobCreator)(nil) + +func NewHeightIteratorJobCreator(latest func() (*flow.Header, error)) *HeightIteratorJobCreator { + return &HeightIteratorJobCreator{ + latest: latest, + } +} + +func (h *HeightIteratorJobCreator) CreateJob(reader module.IterateProgressReader) (module.IterateJob, error) { + next, err := 
reader.ReadNext() + if err != nil { + return module.IterateJob{}, fmt.Errorf("failed to read next height: %w", err) + } + + latest, err := h.latest() + if err != nil { + return module.IterateJob{}, fmt.Errorf("failed to get latest block: %w", err) + } + + // iterate from next to latest (inclusive) + return module.IterateJob{ + Start: next, + End: latest.Height, + }, nil +} From b74226a33a7f4c501f59cd869f331cc86fc7fde1 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 23 Jan 2025 09:10:27 -0800 Subject: [PATCH 08/32] refactor to simplify --- module/block_iterator.go | 67 ++---------------- module/block_iterator/create.go | 31 +++++++++ module/block_iterator/factory.go | 25 ------- module/block_iterator/height_based/creator.go | 29 -------- .../block_iterator/height_based/iterator.go | 69 ------------------- module/block_iterator/height_based/job.go | 38 ---------- .../{height_based => }/initializer.go | 20 +++--- module/block_iterator/iterator.go | 68 ++++++++++++++++++ .../{height_based => }/iterator_test.go | 7 +- .../block_iterator/{common => }/progress.go | 6 +- module/block_iterator/range.go | 37 ++++++++++ 11 files changed, 157 insertions(+), 240 deletions(-) create mode 100644 module/block_iterator/create.go delete mode 100644 module/block_iterator/factory.go delete mode 100644 module/block_iterator/height_based/creator.go delete mode 100644 module/block_iterator/height_based/iterator.go delete mode 100644 module/block_iterator/height_based/job.go rename module/block_iterator/{height_based => }/initializer.go (53%) create mode 100644 module/block_iterator/iterator.go rename module/block_iterator/{height_based => }/iterator_test.go (92%) rename module/block_iterator/{common => }/progress.go (77%) create mode 100644 module/block_iterator/range.go diff --git a/module/block_iterator.go b/module/block_iterator.go index b10b64f7451..592829626f0 100644 --- a/module/block_iterator.go +++ b/module/block_iterator.go @@ -1,8 +1,6 @@ package module 
import ( - "fmt" - "github.com/onflow/flow-go/model/flow" ) @@ -15,11 +13,11 @@ type IterateRange struct { End uint64 // the end of the range } -// IterateRangeCreator is an interface for creating iterate jobs -type IteratorJobCreator interface { - // CreateJob takes a progress reader which is used to read the progress of the iterator - // and returns an iterate job that specifies the range of blocks to iterate over - CreateJob(IterateProgressReader) (IterateRange, error) +// IterateRangeCreator is an interface for creating iterate ranges +type IteratorRangeCreator interface { + // CreateRange takes a progress reader which is used to read the progress of the iterator + // and returns an iterate range that specifies the range of blocks to iterate over + CreateRange(IterateProgressReader) (IterateRange, error) } // IterateProgressReader reads the progress of the iterator, useful for resuming the iteration @@ -62,60 +60,7 @@ type BlockIterator interface { // so that it can be resumed later // when Checkpoint is called, if SaveStateFunc is called with block A, // then after restart, the iterator will resume from A. - // make sure to call this after all the jobs for processing the block IDs returned by + // make sure to call this after all the blocks for processing the block IDs returned by // Next() are completed. 
Checkpoint() error } - -// IteratorCreator is an interface for creating block iterators -type IteratorCreator interface { - // CreateIterator takes iterate job which specifies the range of blocks to iterate over - // and a progress writer which is used to save the progress of the iterator, - // and returns a block iterator that can be used to iterate over the blocks - // Note: it's up to the implementation to decide how often the progress is saved, - // it is wise to consider the trade-off between the performance and the progress saving, - // if the progress is saved too often, it might impact the iteration performance, however, - // if the progress is only saved at the end of the iteration, then if the iteration - // was interrupted, then the iterator will start from the beginning of the range again, - // which means some blocks might be iterated multiple times. - CreateIterator(IterateRange, IterateProgressWriter) (BlockIterator, error) -} - -type IteratorFactory struct { - progressReader IterateProgressReader - progressWriter IterateProgressWriter - creator IteratorCreator - jobCreator IteratorJobCreator -} - -func NewIteratorFactory( - initializer IterateProgressInitializer, - creator IteratorCreator, - jobCreator IteratorJobCreator, -) (*IteratorFactory, error) { - progressReader, progressWriter, err := initializer.Init() - if err != nil { - return nil, fmt.Errorf("failed to initialize progress: %w", err) - } - - return &IteratorFactory{ - progressReader: progressReader, - progressWriter: progressWriter, - creator: creator, - jobCreator: jobCreator, - }, nil -} - -func (f *IteratorFactory) Create() (BlockIterator, error) { - job, err := f.jobCreator.CreateJob(f.progressReader) - if err != nil { - return nil, fmt.Errorf("failed to create job for block iteration: %w", err) - } - - iterator, err := f.creator.CreateIterator(job, f.progressWriter) - if err != nil { - return nil, fmt.Errorf("failed to create block iterator: %w", err) - } - - return iterator, nil -} 
diff --git a/module/block_iterator/create.go b/module/block_iterator/create.go new file mode 100644 index 00000000000..bff69700ddb --- /dev/null +++ b/module/block_iterator/create.go @@ -0,0 +1,31 @@ +package block_iterator + +import ( + "fmt" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/storage" +) + +func CreateIndexedBlockIterator( + getBlockIDByIndex func(uint64) (flow.Identifier, error), + progress storage.ConsumerProgress, + getRoot func() (uint64, error), + latest func() (uint64, error), +) (module.BlockIterator, error) { + + initializer := NewInitializer(progress, getRoot) + progressReader, progressWriter, err := initializer.Init() + if err != nil { + return nil, fmt.Errorf("failed to initialize progress: %w", err) + } + + rangeCreator := NewIteratorRangeCreator(latest) + iterRange, err := rangeCreator.CreateRange(progressReader) + if err != nil { + return nil, fmt.Errorf("failed to create range for block iteration: %w", err) + } + + return NewIndexedBlockIterator(getBlockIDByIndex, progressWriter, iterRange), nil +} diff --git a/module/block_iterator/factory.go b/module/block_iterator/factory.go deleted file mode 100644 index 08fd8ec6fb7..00000000000 --- a/module/block_iterator/factory.go +++ /dev/null @@ -1,25 +0,0 @@ -package block_iterator - -import ( - "context" - - "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/module" - "github.com/onflow/flow-go/module/block_iterator/height_based" - "github.com/onflow/flow-go/storage" -) - -func NewHeightIteratorFactory( - ctx context.Context, - headers storage.Headers, - progress storage.ConsumerProgress, - getRoot func() (*flow.Header, error), - latest func() (*flow.Header, error), -) (*module.IteratorFactory, error) { - - initializer := height_based.NewInitializer(progress, getRoot) - creator := height_based.NewHeightBasedIteratorCreator(ctx, headers) - jobCreator := height_based.NewHeightIteratorJobCreator(latest) - - return 
module.NewIteratorFactory(initializer, creator, jobCreator) -} diff --git a/module/block_iterator/height_based/creator.go b/module/block_iterator/height_based/creator.go deleted file mode 100644 index 75191ef0ec3..00000000000 --- a/module/block_iterator/height_based/creator.go +++ /dev/null @@ -1,29 +0,0 @@ -package height_based - -import ( - "context" - - "github.com/onflow/flow-go/module" - "github.com/onflow/flow-go/storage" -) - -type HeightBasedIteratorCreator struct { - ctx context.Context - headers storage.Headers -} - -var _ module.IteratorCreator = (*HeightBasedIteratorCreator)(nil) - -func NewHeightBasedIteratorCreator( - ctx context.Context, // for cancelling the iteration - headers storage.Headers, // for looking up block by height -) *HeightBasedIteratorCreator { - return &HeightBasedIteratorCreator{ - ctx: ctx, - headers: headers, - } -} - -func (h *HeightBasedIteratorCreator) CreateIterator(job module.IterateJob, writer module.IterateProgressWriter) (module.BlockIterator, error) { - return NewHeightIterator(h.headers, writer, h.ctx, job) -} diff --git a/module/block_iterator/height_based/iterator.go b/module/block_iterator/height_based/iterator.go deleted file mode 100644 index 9afb7bc0fc3..00000000000 --- a/module/block_iterator/height_based/iterator.go +++ /dev/null @@ -1,69 +0,0 @@ -package height_based - -import ( - "fmt" - - "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/module" -) - -// HeightIterator is a block iterator that iterates over blocks by height -// it's not concurrent safe, so don't use it in multiple goroutines -type HeightIterator struct { - // dependencies - getBlockIDByHeight func(uint64) (flow.Identifier, error) - progress module.IterateProgressWriter // for saving the next height to be iterated for resuming the iteration - - // config - endHeight uint64 - - // state - nextHeight uint64 -} - -var _ module.BlockIterator = (*HeightIterator)(nil) - -// caller must ensure that both job.Start and job.End are 
finalized height -func NewHeightIterator( - getBlockIDByHeight func(uint64) (flow.Identifier, error), - progress module.IterateProgressWriter, - job module.IterateRange, -) (module.BlockIterator, error) { - return &HeightIterator{ - getBlockIDByHeight: getBlockIDByHeight, - progress: progress, - endHeight: job.End, - nextHeight: job.Start, - }, nil -} - -// Next returns the next block ID in the iteration -// it iterates from lower height to higher height. -// when iterating a height, it iterates over all sibling blocks at that height -// Note: this method is not concurrent-safe -func (b *HeightIterator) Next() (flow.Identifier, bool, error) { - if b.nextHeight > b.endHeight { - return flow.ZeroID, false, nil - } - - // TODO: use storage operation instead to avoid hitting cache - next, err := b.getBlockIDByHeight(b.nextHeight) - if err != nil { - return flow.ZeroID, false, fmt.Errorf("failed to fetch block at height %v: %w", b.nextHeight, err) - } - - b.nextHeight++ - - return next, true, nil -} - -// Checkpoint saves the iteration progress to storage -// make sure to call this after all the jobs for processing the block IDs returned by -// Next() are completed. 
-func (b *HeightIterator) Checkpoint() error { - err := b.progress.SaveState(b.nextHeight) - if err != nil { - return fmt.Errorf("failed to save progress at view %v: %w", b.nextHeight, err) - } - return nil -} diff --git a/module/block_iterator/height_based/job.go b/module/block_iterator/height_based/job.go deleted file mode 100644 index da485ff2415..00000000000 --- a/module/block_iterator/height_based/job.go +++ /dev/null @@ -1,38 +0,0 @@ -package height_based - -import ( - "fmt" - - "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/module" -) - -type HeightIteratorJobCreator struct { - latest func() (*flow.Header, error) -} - -var _ module.IteratorJobCreator = (*HeightIteratorJobCreator)(nil) - -func NewHeightIteratorJobCreator(latest func() (*flow.Header, error)) *HeightIteratorJobCreator { - return &HeightIteratorJobCreator{ - latest: latest, - } -} - -func (h *HeightIteratorJobCreator) CreateJob(reader module.IterateProgressReader) (module.IterateJob, error) { - next, err := reader.ReadNext() - if err != nil { - return module.IterateJob{}, fmt.Errorf("failed to read next height: %w", err) - } - - latest, err := h.latest() - if err != nil { - return module.IterateJob{}, fmt.Errorf("failed to get latest block: %w", err) - } - - // iterate from next to latest (inclusive) - return module.IterateJob{ - Start: next, - End: latest.Height, - }, nil -} diff --git a/module/block_iterator/height_based/initializer.go b/module/block_iterator/initializer.go similarity index 53% rename from module/block_iterator/height_based/initializer.go rename to module/block_iterator/initializer.go index c8cac156616..b6c3a5d7681 100644 --- a/module/block_iterator/height_based/initializer.go +++ b/module/block_iterator/initializer.go @@ -1,30 +1,28 @@ -package height_based +package block_iterator import ( "errors" "fmt" - "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" - "github.com/onflow/flow-go/module/block_iterator/common" 
"github.com/onflow/flow-go/storage" ) -type HeightProgressInitializer struct { +type ProgressInitializer struct { progress storage.ConsumerProgress - getRoot func() (*flow.Header, error) + getRoot func() (uint64, error) } -var _ module.IterateProgressInitializer = (*HeightProgressInitializer)(nil) +var _ module.IterateProgressInitializer = (*ProgressInitializer)(nil) -func NewInitializer(progress storage.ConsumerProgress, getRoot func() (*flow.Header, error)) *HeightProgressInitializer { - return &HeightProgressInitializer{ +func NewInitializer(progress storage.ConsumerProgress, getRoot func() (uint64, error)) *ProgressInitializer { + return &ProgressInitializer{ progress: progress, getRoot: getRoot, } } -func (h *HeightProgressInitializer) Init() (module.IterateProgressReader, module.IterateProgressWriter, error) { +func (h *ProgressInitializer) Init() (module.IterateProgressReader, module.IterateProgressWriter, error) { _, err := h.progress.ProcessedIndex() if errors.Is(err, storage.ErrNotFound) { root, err := h.getRoot() @@ -32,14 +30,14 @@ func (h *HeightProgressInitializer) Init() (module.IterateProgressReader, module return nil, nil, fmt.Errorf("failed to get root block: %w", err) } - next := root.Height + 1 + next := root + 1 err = h.progress.InitProcessedIndex(next) if err != nil { return nil, nil, fmt.Errorf("failed to init processed index: %w", err) } } - nextProgress := common.NewNextProgress(h.progress) + nextProgress := NewNextProgress(h.progress) return nextProgress, nextProgress, nil } diff --git a/module/block_iterator/iterator.go b/module/block_iterator/iterator.go new file mode 100644 index 00000000000..66d2d0739bc --- /dev/null +++ b/module/block_iterator/iterator.go @@ -0,0 +1,68 @@ +package block_iterator + +import ( + "fmt" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module" +) + +// IndexedBlockIterator is a block iterator that iterates over blocks by height +// it's not concurrent safe, so don't use it in multiple 
goroutines +type IndexedBlockIterator struct { + // dependencies + getBlockIDByIndex func(uint64) (flow.Identifier, error) + progress module.IterateProgressWriter // for saving the next height to be iterated for resuming the iteration + + // config + endIndex uint64 + + // state + nextIndex uint64 +} + +var _ module.BlockIterator = (*IndexedBlockIterator)(nil) + +// caller must ensure that both iterRange.Start and iterRange.End are finalized height +func NewIndexedBlockIterator( + getBlockIDByIndex func(uint64) (flow.Identifier, error), + progress module.IterateProgressWriter, + iterRange module.IterateRange, +) module.BlockIterator { + return &IndexedBlockIterator{ + getBlockIDByIndex: getBlockIDByIndex, + progress: progress, + endIndex: iterRange.End, + nextIndex: iterRange.Start, + } +} + +// Next returns the next block ID in the iteration +// it iterates from lower height to higher height. +// when iterating a height, it iterates over all sibling blocks at that height +// Note: this method is not concurrent-safe +func (b *IndexedBlockIterator) Next() (flow.Identifier, bool, error) { + if b.nextIndex > b.endIndex { + return flow.ZeroID, false, nil + } + + next, err := b.getBlockIDByIndex(b.nextIndex) + if err != nil { + return flow.ZeroID, false, fmt.Errorf("failed to fetch block at index (height or view) %v: %w", b.nextIndex, err) + } + + b.nextIndex++ + + return next, true, nil +} + +// Checkpoint saves the iteration progress to storage +// make sure to call this after all the blocks for processing the block IDs returned by +// Next() are completed. 
+func (b *IndexedBlockIterator) Checkpoint() error { + err := b.progress.SaveState(b.nextIndex) + if err != nil { + return fmt.Errorf("failed to save progress at view %v: %w", b.nextIndex, err) + } + return nil +} diff --git a/module/block_iterator/height_based/iterator_test.go b/module/block_iterator/iterator_test.go similarity index 92% rename from module/block_iterator/height_based/iterator_test.go rename to module/block_iterator/iterator_test.go index 362c4807c48..9069420c649 100644 --- a/module/block_iterator/height_based/iterator_test.go +++ b/module/block_iterator/iterator_test.go @@ -1,4 +1,4 @@ -package height_based +package block_iterator import ( "fmt" @@ -32,10 +32,9 @@ func TestIterateHeight(t *testing.T) { // create iterator // b0 is the root block, iterate from b1 to b3 - job := module.IterateRange{Start: b1.Height, End: b3.Height} + iterRange := module.IterateRange{Start: b1.Height, End: b3.Height} headers := storagebadger.NewHeaders(&metrics.NoopCollector{}, db) - iter, err := NewHeightIterator(headers.BlockIDByHeight, progress, job) - require.NoError(t, err) + iter := NewIndexedBlockIterator(headers.BlockIDByHeight, progress, iterRange) // iterate through all blocks visited := make(map[flow.Identifier]struct{}) diff --git a/module/block_iterator/common/progress.go b/module/block_iterator/progress.go similarity index 77% rename from module/block_iterator/common/progress.go rename to module/block_iterator/progress.go index f7b2585b29b..c9ee6788afd 100644 --- a/module/block_iterator/common/progress.go +++ b/module/block_iterator/progress.go @@ -1,4 +1,4 @@ -package common +package block_iterator import ( "github.com/onflow/flow-go/module" @@ -18,10 +18,10 @@ func NewNextProgress(store storage.ConsumerProgress) *NextProgress { } } -func (n *NextProgress) ReadNext() (uint64, error) { +func (n *NextProgress) LoadState() (uint64, error) { return n.store.ProcessedIndex() } -func (n *NextProgress) SaveNext(next uint64) error { +func (n *NextProgress) 
SaveState(next uint64) error { return n.store.SetProcessedIndex(next) } diff --git a/module/block_iterator/range.go b/module/block_iterator/range.go new file mode 100644 index 00000000000..05c69bc5be1 --- /dev/null +++ b/module/block_iterator/range.go @@ -0,0 +1,37 @@ +package block_iterator + +import ( + "fmt" + + "github.com/onflow/flow-go/module" +) + +type IteratorRangeCreator struct { + latest func() (uint64, error) +} + +var _ module.IteratorRangeCreator = (*IteratorRangeCreator)(nil) + +func NewIteratorRangeCreator(latest func() (uint64, error)) *IteratorRangeCreator { + return &IteratorRangeCreator{ + latest: latest, + } +} + +func (h *IteratorRangeCreator) CreateRange(reader module.IterateProgressReader) (module.IterateRange, error) { + next, err := reader.LoadState() + if err != nil { + return module.IterateRange{}, fmt.Errorf("failed to read next height: %w", err) + } + + latest, err := h.latest() + if err != nil { + return module.IterateRange{}, fmt.Errorf("failed to get latest block: %w", err) + } + + // iterate from next to latest (inclusive) + return module.IterateRange{ + Start: next, + End: latest, + }, nil +} From 0141482a2da77f656a2fe0279067f73c6c51dff6 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 23 Jan 2025 09:39:35 -0800 Subject: [PATCH 09/32] add view based and height based iterator --- module/block_iterator/create.go | 69 ++++++++++++++++++++++++++++++- module/block_iterator/iterator.go | 15 +++++-- 2 files changed, 80 insertions(+), 4 deletions(-) diff --git a/module/block_iterator/create.go b/module/block_iterator/create.go index bff69700ddb..10bd66a1995 100644 --- a/module/block_iterator/create.go +++ b/module/block_iterator/create.go @@ -8,8 +8,9 @@ import ( "github.com/onflow/flow-go/storage" ) +// CreateIndexedBlockIterator creates a block iterator that iterates through blocks by index. 
func CreateIndexedBlockIterator( - getBlockIDByIndex func(uint64) (flow.Identifier, error), + getBlockIDByIndex func(uint64) (blockID flow.Identifier, indexed bool, exception error), progress storage.ConsumerProgress, getRoot func() (uint64, error), latest func() (uint64, error), @@ -29,3 +30,69 @@ func CreateIndexedBlockIterator( return NewIndexedBlockIterator(getBlockIDByIndex, progressWriter, iterRange), nil } + +// CreateHeightBasedBlockIterator creates a block iterator that iterates through blocks +// from root to the latest (either finalized or sealed) by height. +func CreateHeightBasedBlockIterator( + getBlockIDByHeight func(height uint64) (flow.Identifier, error), + progress storage.ConsumerProgress, + getRoot func() (*flow.Header, error), + latest func() (*flow.Header, error), +) (module.BlockIterator, error) { + + return CreateIndexedBlockIterator( + func(height uint64) (flow.Identifier, bool, error) { + blockID, err := getBlockIDByHeight(height) + if err != nil { + return flow.Identifier{}, false, fmt.Errorf("failed to get block ID by height: %w", err) + } + // each height between root and latest (either finalized or sealed) must be indexed. + // so it's always true + alwaysIndexed := true + return blockID, alwaysIndexed, nil + }, + progress, + func() (uint64, error) { + root, err := getRoot() + if err != nil { + return 0, fmt.Errorf("failed to get root block: %w", err) + } + return root.Height, nil + }, + func() (uint64, error) { + latestBlock, err := latest() + if err != nil { + return 0, fmt.Errorf("failed to get latest block: %w", err) + } + return latestBlock.Height, nil + }, + ) +} + +// CreateViewBasedBlockIterator creates a block iterator that iterates through blocks +// from root to the latest (either finalized or sealed) by view. 
+func CreateViewBasedBlockIterator( + getBlockIDByView func(view uint64) (blockID flow.Identifier, viewIndexed bool, exception error), + progress storage.ConsumerProgress, + getRoot func() (*flow.Header, error), + latest func() (*flow.Header, error), +) (module.BlockIterator, error) { + return CreateIndexedBlockIterator( + getBlockIDByView, + progress, + func() (uint64, error) { + root, err := getRoot() + if err != nil { + return 0, fmt.Errorf("failed to get root block: %w", err) + } + return root.View, nil + }, + func() (uint64, error) { + latestBlock, err := latest() + if err != nil { + return 0, fmt.Errorf("failed to get latest block: %w", err) + } + return latestBlock.View, nil + }, + ) +} diff --git a/module/block_iterator/iterator.go b/module/block_iterator/iterator.go index 66d2d0739bc..945b84fb711 100644 --- a/module/block_iterator/iterator.go +++ b/module/block_iterator/iterator.go @@ -11,7 +11,7 @@ import ( // it's not concurrent safe, so don't use it in multiple goroutines type IndexedBlockIterator struct { // dependencies - getBlockIDByIndex func(uint64) (flow.Identifier, error) + getBlockIDByIndex func(uint64) (blockID flow.Identifier, indexed bool, excpetion error) progress module.IterateProgressWriter // for saving the next height to be iterated for resuming the iteration // config @@ -25,7 +25,7 @@ var _ module.BlockIterator = (*IndexedBlockIterator)(nil) // caller must ensure that both iterRange.Start and iterRange.End are finalized height func NewIndexedBlockIterator( - getBlockIDByIndex func(uint64) (flow.Identifier, error), + getBlockIDByIndex func(uint64) (blockID flow.Identifier, indexed bool, excpetion error), progress module.IterateProgressWriter, iterRange module.IterateRange, ) module.BlockIterator { @@ -46,11 +46,20 @@ func (b *IndexedBlockIterator) Next() (flow.Identifier, bool, error) { return flow.ZeroID, false, nil } - next, err := b.getBlockIDByIndex(b.nextIndex) + next, indexed, err := b.getBlockIDByIndex(b.nextIndex) if err != nil 
{ return flow.ZeroID, false, fmt.Errorf("failed to fetch block at index (height or view) %v: %w", b.nextIndex, err) } + // if the block is not indexed, skip it. + // when we are iterating by view, it's possible that there is no block for certain views, in + // that case, we skip and iterate the next view + // when we are iterating by height, it's not possible that a height is not indexed, so indexed should + // always be true + if !indexed { + return flow.ZeroID, true, fmt.Errorf("block at index (height or view) %v is not indexed", b.nextIndex) + } + b.nextIndex++ return next, true, nil From f76ec144cfb59bd1c5ddaa215a379aa92096ed09 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 23 Jan 2025 11:46:00 -0800 Subject: [PATCH 10/32] add test cases --- module/block_iterator/create.go | 12 ++- module/block_iterator/create_test.go | 117 +++++++++++++++++++++++++ module/block_iterator/iterator_test.go | 10 ++- 3 files changed, 134 insertions(+), 5 deletions(-) create mode 100644 module/block_iterator/create_test.go diff --git a/module/block_iterator/create.go b/module/block_iterator/create.go index 10bd66a1995..2428a2df808 100644 --- a/module/block_iterator/create.go +++ b/module/block_iterator/create.go @@ -16,18 +16,22 @@ func CreateIndexedBlockIterator( latest func() (uint64, error), ) (module.BlockIterator, error) { - initializer := NewInitializer(progress, getRoot) - progressReader, progressWriter, err := initializer.Init() + // initialize the progress in storage, saving the root block index in storage + progressReader, progressWriter, err := NewInitializer(progress, getRoot).Init() if err != nil { return nil, fmt.Errorf("failed to initialize progress: %w", err) } - rangeCreator := NewIteratorRangeCreator(latest) - iterRange, err := rangeCreator.CreateRange(progressReader) + // create a iteration range from the root block to the latest block + iterRange, err := NewIteratorRangeCreator(latest).CreateRange(progressReader) if err != nil { return nil, 
fmt.Errorf("failed to create range for block iteration: %w", err) } + // create a block iterator with + // the function to get block ID by index, + // the progress writer to update the progress in storage, + // and the iteration range return NewIndexedBlockIterator(getBlockIDByIndex, progressWriter, iterRange), nil } diff --git a/module/block_iterator/create_test.go b/module/block_iterator/create_test.go new file mode 100644 index 00000000000..37f24acd10c --- /dev/null +++ b/module/block_iterator/create_test.go @@ -0,0 +1,117 @@ +package block_iterator + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/storage" +) + +// TestCanIterate: iterate through all heights from root to latest exactly once +// TestCanIterateNewLatest: iterate through all heights from root to latest, +// and after latest updated, it can iterate from latest + 1 to new latest +// TestCanResume: stop at a height, and take checkpoint, create a new iterator, +// verify it will resume from the next height to the latest +// TestIterateByView: iterate through all views +// TestCanSkipViewsIfNotIndexed: iterate through all views, and skip views that are not indexed + +func TestCanIterate(t *testing.T) { + root := &flow.Header{Height: 0} + // Create mock blocks + blocks := []*flow.Header{ + {Height: 1}, + {Height: 2}, + {Height: 3}, + {Height: 4}, + {Height: 5}, + } + + // Mock getBlockIDByHeight function + getBlockIDByHeight := func(height uint64) (flow.Identifier, error) { + for _, block := range blocks { + if block.Height == height { + return block.ID(), nil + } + } + return flow.Identifier{}, fmt.Errorf("block not found at height %d", height) + } + + // Mock progress tracker + progress := &mockProgress{} + + // Mock getRoot and latest functions + getRoot := func() (*flow.Header, error) { + return root, nil + } + latest := func() (*flow.Header, error) { + return blocks[len(blocks)-1], nil + } + + // Create 
iterator + iterator, err := CreateHeightBasedBlockIterator( + getBlockIDByHeight, + progress, + getRoot, + latest, + ) + require.NoError(t, err) + + // Iterate through blocks + visitedBlocks := make(map[flow.Identifier]int) + for { + blockID, ok, err := iterator.Next() + require.NoError(t, err) + if !ok { + break + } + visitedBlocks[blockID]++ + } + + // Verify all blocks were visited exactly once + for _, block := range blocks { + count, ok := visitedBlocks[block.ID()] + require.True(t, ok, "Block %v was not visited", block.Height) + require.Equal(t, 1, count, "Block %v was visited %d times, expected once", block.ID, count) + } + + // Verify no extra blocks were visited + require.Equal(t, len(blocks), len(visitedBlocks), "Unexpected number of blocks visited") + + // Verify the final checkpoint + require.NoError(t, iterator.Checkpoint()) + savedNextHeight, err := progress.ProcessedIndex() + require.NoError(t, err) + require.Equal(t, uint64(6), savedNextHeight, "Expected next height to be 6 (last height + 1)") +} + +type mockProgress struct { + height uint64 + initialized bool +} + +func (m *mockProgress) ProcessedIndex() (uint64, error) { + if !m.initialized { + return 0, fmt.Errorf("processed index not initialized: %w", storage.ErrNotFound) + } + return m.height, nil +} + +func (m *mockProgress) InitProcessedIndex(defaultIndex uint64) error { + if m.initialized { + return fmt.Errorf("processed index already initialized") + } + m.height = defaultIndex + m.initialized = true + return nil +} + +func (m *mockProgress) SetProcessedIndex(processed uint64) error { + if !m.initialized { + return fmt.Errorf("processed index not initialized") + } + m.height = processed + return nil +} diff --git a/module/block_iterator/iterator_test.go b/module/block_iterator/iterator_test.go index 9069420c649..fefd5c4875c 100644 --- a/module/block_iterator/iterator_test.go +++ b/module/block_iterator/iterator_test.go @@ -34,7 +34,15 @@ func TestIterateHeight(t *testing.T) { // b0 is the 
root block, iterate from b1 to b3 iterRange := module.IterateRange{Start: b1.Height, End: b3.Height} headers := storagebadger.NewHeaders(&metrics.NoopCollector{}, db) - iter := NewIndexedBlockIterator(headers.BlockIDByHeight, progress, iterRange) + getBlockIDByIndex := func(height uint64) (flow.Identifier, bool, error) { + blockID, err := headers.BlockIDByHeight(height) + if err != nil { + return flow.ZeroID, false, err + } + + return blockID, true, nil + } + iter := NewIndexedBlockIterator(getBlockIDByIndex, progress, iterRange) // iterate through all blocks visited := make(map[flow.Identifier]struct{}) From fb93bd333607f6256993bb16b6aacea074d82a59 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 23 Jan 2025 13:37:24 -0800 Subject: [PATCH 11/32] extract into creator --- module/block_iterator/create.go | 68 +++++++++++++++++---------------- 1 file changed, 36 insertions(+), 32 deletions(-) diff --git a/module/block_iterator/create.go b/module/block_iterator/create.go index 2428a2df808..8f2ce4b3f1e 100644 --- a/module/block_iterator/create.go +++ b/module/block_iterator/create.go @@ -8,22 +8,37 @@ import ( "github.com/onflow/flow-go/storage" ) -// CreateIndexedBlockIterator creates a block iterator that iterates through blocks by index. -func CreateIndexedBlockIterator( +type Creator struct { + getBlockIDByIndex func(uint64) (flow.Identifier, bool, error) + progressReader module.IterateProgressReader + progressWriter module.IterateProgressWriter + latest func() (uint64, error) +} + +// NewCreator creates a block iterator that iterates through blocks by index. 
+func NewCreator( getBlockIDByIndex func(uint64) (blockID flow.Identifier, indexed bool, exception error), progress storage.ConsumerProgress, - getRoot func() (uint64, error), + root uint64, latest func() (uint64, error), -) (module.BlockIterator, error) { - +) (*Creator, error) { // initialize the progress in storage, saving the root block index in storage - progressReader, progressWriter, err := NewInitializer(progress, getRoot).Init() + progressReader, progressWriter, err := NewInitializer(progress, root).Init() if err != nil { return nil, fmt.Errorf("failed to initialize progress: %w", err) } + return &Creator{ + getBlockIDByIndex: getBlockIDByIndex, + progressReader: progressReader, + progressWriter: progressWriter, + latest: latest, + }, nil +} + +func (c *Creator) Create() (module.BlockIterator, error) { // create a iteration range from the root block to the latest block - iterRange, err := NewIteratorRangeCreator(latest).CreateRange(progressReader) + iterRange, err := NewIteratorRangeCreator(c.latest).CreateRange(c.progressReader) if err != nil { return nil, fmt.Errorf("failed to create range for block iteration: %w", err) } @@ -32,19 +47,19 @@ func CreateIndexedBlockIterator( // the function to get block ID by index, // the progress writer to update the progress in storage, // and the iteration range - return NewIndexedBlockIterator(getBlockIDByIndex, progressWriter, iterRange), nil + return NewIndexedBlockIterator(c.getBlockIDByIndex, c.progressWriter, iterRange), nil } -// CreateHeightBasedBlockIterator creates a block iterator that iterates through blocks +// NewHeightBasedCreator creates a block iterator that iterates through blocks // from root to the latest (either finalized or sealed) by height. 
-func CreateHeightBasedBlockIterator( +func NewHeightBasedCreator( getBlockIDByHeight func(height uint64) (flow.Identifier, error), progress storage.ConsumerProgress, - getRoot func() (*flow.Header, error), + root *flow.Header, latest func() (*flow.Header, error), -) (module.BlockIterator, error) { +) (*Creator, error) { - return CreateIndexedBlockIterator( + return NewCreator( func(height uint64) (flow.Identifier, bool, error) { blockID, err := getBlockIDByHeight(height) if err != nil { @@ -56,13 +71,7 @@ func CreateHeightBasedBlockIterator( return blockID, alwaysIndexed, nil }, progress, - func() (uint64, error) { - root, err := getRoot() - if err != nil { - return 0, fmt.Errorf("failed to get root block: %w", err) - } - return root.Height, nil - }, + root.Height, func() (uint64, error) { latestBlock, err := latest() if err != nil { @@ -73,24 +82,19 @@ func CreateHeightBasedBlockIterator( ) } -// CreateViewBasedBlockIterator creates a block iterator that iterates through blocks +// NewViewBasedCreator creates a block iterator that iterates through blocks // from root to the latest (either finalized or sealed) by view. -func CreateViewBasedBlockIterator( +// since view has gaps, the iterator will skip views that have no blocks. 
+func NewViewBasedCreator( getBlockIDByView func(view uint64) (blockID flow.Identifier, viewIndexed bool, exception error), progress storage.ConsumerProgress, - getRoot func() (*flow.Header, error), + root *flow.Header, latest func() (*flow.Header, error), -) (module.BlockIterator, error) { - return CreateIndexedBlockIterator( +) (*Creator, error) { + return NewCreator( getBlockIDByView, progress, - func() (uint64, error) { - root, err := getRoot() - if err != nil { - return 0, fmt.Errorf("failed to get root block: %w", err) - } - return root.View, nil - }, + root.View, func() (uint64, error) { latestBlock, err := latest() if err != nil { From b3ae64bfec9d81f3382ac02006a9f91692bc8f63 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 23 Jan 2025 14:00:47 -0800 Subject: [PATCH 12/32] rename to creator.go --- module/block_iterator/{create.go => creator.go} | 0 module/block_iterator/{create_test.go => creator_test.go} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename module/block_iterator/{create.go => creator.go} (100%) rename module/block_iterator/{create_test.go => creator_test.go} (100%) diff --git a/module/block_iterator/create.go b/module/block_iterator/creator.go similarity index 100% rename from module/block_iterator/create.go rename to module/block_iterator/creator.go diff --git a/module/block_iterator/create_test.go b/module/block_iterator/creator_test.go similarity index 100% rename from module/block_iterator/create_test.go rename to module/block_iterator/creator_test.go From 2481dfc3836e2e009a8f01437ebcba5ff5ea5804 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 23 Jan 2025 14:01:03 -0800 Subject: [PATCH 13/32] add test cases --- module/block_iterator/creator_test.go | 259 ++++++++++++++++++++++++-- 1 file changed, 240 insertions(+), 19 deletions(-) diff --git a/module/block_iterator/creator_test.go b/module/block_iterator/creator_test.go index 37f24acd10c..30bf3aeac29 100644 --- 
a/module/block_iterator/creator_test.go +++ b/module/block_iterator/creator_test.go @@ -10,12 +10,11 @@ import ( "github.com/onflow/flow-go/storage" ) -// TestCanIterate: iterate through all heights from root to latest exactly once // TestCanIterateNewLatest: iterate through all heights from root to latest, -// and after latest updated, it can iterate from latest + 1 to new latest +// and after finish iteration, iterate again with an updated latest height, +// verify it can iterate from latest + 1 to new latest // TestCanResume: stop at a height, and take checkpoint, create a new iterator, // verify it will resume from the next height to the latest -// TestIterateByView: iterate through all views // TestCanSkipViewsIfNotIndexed: iterate through all views, and skip views that are not indexed func TestCanIterate(t *testing.T) { @@ -42,53 +41,275 @@ func TestCanIterate(t *testing.T) { // Mock progress tracker progress := &mockProgress{} - // Mock getRoot and latest functions - getRoot := func() (*flow.Header, error) { - return root, nil - } + // Mock latest functions latest := func() (*flow.Header, error) { return blocks[len(blocks)-1], nil } // Create iterator - iterator, err := CreateHeightBasedBlockIterator( + creator, err := NewHeightBasedCreator( getBlockIDByHeight, progress, - getRoot, + root, latest, ) require.NoError(t, err) + iterator, err := creator.Create() + require.NoError(t, err) + // Iterate through blocks - visitedBlocks := make(map[flow.Identifier]int) + visitedBlocks := make(map[flow.Identifier]struct{}) for { blockID, ok, err := iterator.Next() require.NoError(t, err) if !ok { break } - visitedBlocks[blockID]++ + visitedBlocks[blockID] = struct{}{} } // Verify all blocks were visited exactly once for _, block := range blocks { - count, ok := visitedBlocks[block.ID()] + _, ok := visitedBlocks[block.ID()] require.True(t, ok, "Block %v was not visited", block.Height) - require.Equal(t, 1, count, "Block %v was visited %d times, expected once", block.ID, 
count) + delete(visitedBlocks, block.ID()) } // Verify no extra blocks were visited - require.Equal(t, len(blocks), len(visitedBlocks), "Unexpected number of blocks visited") + require.Empty(t, visitedBlocks, "Unexpected number of blocks visited") // Verify the final checkpoint - require.NoError(t, iterator.Checkpoint()) + next, err := iterator.Checkpoint() + require.NoError(t, err) + require.Equal(t, uint64(6), next, "Expected next height to be 6 (last height + 1)") savedNextHeight, err := progress.ProcessedIndex() require.NoError(t, err) require.Equal(t, uint64(6), savedNextHeight, "Expected next height to be 6 (last height + 1)") + + // Additional blocks to be added later + additionalBlocks := []*flow.Header{ + {Height: 6}, + {Height: 7}, + {Height: 8}, + } + + // Update blocks so that the latest block is updated, and getBlockIDByHeight + // will return the new blocks + blocks = append(blocks, additionalBlocks...) + + // Create another iterator + iterator, err = creator.Create() + require.NoError(t, err) + + // Iterate through initial blocks + for i := 0; i < len(additionalBlocks); i++ { + blockID, ok, err := iterator.Next() + require.NoError(t, err) + require.True(t, ok) + visitedBlocks[blockID] = struct{}{} + } + + // No more blocks to iterate + _, ok, err := iterator.Next() + require.NoError(t, err) + require.False(t, ok) + + // Verify all additional blocks were visited exactly once + for _, block := range additionalBlocks { + _, ok := visitedBlocks[block.ID()] + require.True(t, ok, "Block %v was not visited", block.Height) + delete(visitedBlocks, block.ID()) + } + + // Verify no extra blocks were visited + require.Empty(t, visitedBlocks, "Unexpected number of blocks visited") + + // Verify the final checkpoint + next, err = iterator.Checkpoint() + require.NoError(t, err) + require.Equal(t, uint64(9), next, "Expected next height to be 9 (last height + 1)") + savedHeight, err := progress.ProcessedIndex() + require.NoError(t, err) + require.Equal(t, uint64(9), 
savedHeight, "Expected next height to be 9 (last height + 1)") +} + +func TestCanResume(t *testing.T) { + + root := &flow.Header{Height: 0} + // Create mock blocks + blocks := []*flow.Header{ + {Height: 1}, + {Height: 2}, + {Height: 3}, + {Height: 4}, + {Height: 5}, + } + + // Mock getBlockIDByHeight function + getBlockIDByHeight := func(height uint64) (flow.Identifier, error) { + for _, block := range blocks { + if block.Height == height { + return block.ID(), nil + } + } + return flow.Identifier{}, fmt.Errorf("block not found at height %d", height) + } + + // Mock progress tracker + progress := &mockProgress{} + + // Mock latest functions + latest := func() (*flow.Header, error) { + return blocks[len(blocks)-1], nil + } + + // Create iterator + creator, err := NewHeightBasedCreator( + getBlockIDByHeight, + progress, + root, + latest, + ) + require.NoError(t, err) + + iterator, err := creator.Create() + require.NoError(t, err) + + // Iterate through blocks + visitedBlocks := make(map[flow.Identifier]struct{}) + for i := 0; i < 3; i++ { // iterate up to Height 3 + blockID, ok, err := iterator.Next() + require.NoError(t, err) + if !ok { + break + } + visitedBlocks[blockID] = struct{}{} + } + + // save the progress + _, err = iterator.Checkpoint() + require.NoError(t, err) + + // Additional blocks to be added later + additionalBlocks := []*flow.Header{ + {Height: 6}, + {Height: 7}, + {Height: 8}, + } + + // Update blocks so that the latest block is updated, and getBlockIDByHeight + // will return the new blocks + blocks = append(blocks, additionalBlocks...) 
+ + // simulate a restart by creating a new creator with a different latest + newCreator, err := NewHeightBasedCreator( + getBlockIDByHeight, + progress, + root, + latest, + ) + require.NoError(t, err) + + newIterator, err := newCreator.Create() + require.NoError(t, err) + + // iterate until the end + for { + blockID, ok, err := newIterator.Next() + require.NoError(t, err) + if !ok { + break + } + + visitedBlocks[blockID] = struct{}{} + } + + // verify all blocks are visited + for _, block := range blocks { + _, ok := visitedBlocks[block.ID()] + require.True(t, ok, "Block %v was not visited", block.Height) + delete(visitedBlocks, block.ID()) + } + + // Verify no extra blocks were visited + require.Empty(t, visitedBlocks, "Unexpected number of blocks visited") +} + +func TestCanSkipViewsIfNotIndexed(t *testing.T) { + // Create mock blocks with some indexed and some not + blocks := []*flow.Header{ + {View: 1}, + {View: 2}, + {View: 3}, + {View: 5}, + {View: 7}, + } + + // Mock getBlockIDByHeight function + getBlockIDByView := func(view uint64) (blockID flow.Identifier, viewIndexed bool, exception error) { + for _, block := range blocks { + if block.View == view { + return block.ID(), true, nil + } + } + + return flow.Identifier{}, false, nil + } + + // Mock progress tracker + progress := &mockProgress{} + + // Mock getRoot and latest functions + root := &flow.Header{View: 0} + latest := func() (*flow.Header, error) { + return blocks[len(blocks)-1], nil + } + + // Create iterator + creator, err := NewViewBasedCreator( + getBlockIDByView, + progress, + root, + latest, + ) + require.NoError(t, err) + + iterator, err := creator.Create() + require.NoError(t, err) + + // Iterate through blocks + visitedBlocks := make(map[flow.Identifier]struct{}) + for { + blockID, ok, err := iterator.Next() + require.NoError(t, err) + if !ok { + break + } + visitedBlocks[blockID] = struct{}{} + } + + // Verify all blocks were visited exactly once + for _, block := range blocks { + _, ok 
:= visitedBlocks[block.ID()] + require.True(t, ok, "Block %v was not visited", block.View) + delete(visitedBlocks, block.ID()) + } + + // Verify no extra blocks were visited + require.Empty(t, visitedBlocks, "Unexpected number of blocks visited") + + // Verify the final checkpoint + next, err := iterator.Checkpoint() + require.NoError(t, err) + require.Equal(t, uint64(8), next, "Expected next height to be 8 (last height + 1)") + savedView, err := progress.ProcessedIndex() + require.NoError(t, err) + require.Equal(t, uint64(8), savedView, "Expected next view to be 8 (last View + 1)") } type mockProgress struct { - height uint64 + index uint64 initialized bool } @@ -96,14 +317,14 @@ func (m *mockProgress) ProcessedIndex() (uint64, error) { if !m.initialized { return 0, fmt.Errorf("processed index not initialized: %w", storage.ErrNotFound) } - return m.height, nil + return m.index, nil } func (m *mockProgress) InitProcessedIndex(defaultIndex uint64) error { if m.initialized { return fmt.Errorf("processed index already initialized") } - m.height = defaultIndex + m.index = defaultIndex m.initialized = true return nil } @@ -112,6 +333,6 @@ func (m *mockProgress) SetProcessedIndex(processed uint64) error { if !m.initialized { return fmt.Errorf("processed index not initialized") } - m.height = processed + m.index = processed return nil } From 748f5a3622dd5c2de9a40d9a242ea7840f2f5c8f Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 23 Jan 2025 14:01:30 -0800 Subject: [PATCH 14/32] update initializer to take root --- module/block_iterator/initializer.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/module/block_iterator/initializer.go b/module/block_iterator/initializer.go index b6c3a5d7681..5255b2d4e12 100644 --- a/module/block_iterator/initializer.go +++ b/module/block_iterator/initializer.go @@ -10,27 +10,22 @@ import ( type ProgressInitializer struct { progress storage.ConsumerProgress - getRoot func() (uint64, error) 
+ root uint64 } var _ module.IterateProgressInitializer = (*ProgressInitializer)(nil) -func NewInitializer(progress storage.ConsumerProgress, getRoot func() (uint64, error)) *ProgressInitializer { +func NewInitializer(progress storage.ConsumerProgress, root uint64) *ProgressInitializer { return &ProgressInitializer{ progress: progress, - getRoot: getRoot, + root: root, } } func (h *ProgressInitializer) Init() (module.IterateProgressReader, module.IterateProgressWriter, error) { _, err := h.progress.ProcessedIndex() if errors.Is(err, storage.ErrNotFound) { - root, err := h.getRoot() - if err != nil { - return nil, nil, fmt.Errorf("failed to get root block: %w", err) - } - - next := root + 1 + next := h.root + 1 err = h.progress.InitProcessedIndex(next) if err != nil { return nil, nil, fmt.Errorf("failed to init processed index: %w", err) From 17f8bb33d8bfe47db3909e2278fe7072f3fee184 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 23 Jan 2025 14:01:40 -0800 Subject: [PATCH 15/32] refactor Checkpoint method --- module/block_iterator.go | 2 +- module/block_iterator/iterator.go | 10 ++++++---- module/block_iterator/iterator_test.go | 5 +++-- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/module/block_iterator.go b/module/block_iterator.go index 592829626f0..0a20eb5edfe 100644 --- a/module/block_iterator.go +++ b/module/block_iterator.go @@ -62,5 +62,5 @@ type BlockIterator interface { // then after restart, the iterator will resume from A. // make sure to call this after all the blocks for processing the block IDs returned by // Next() are completed. 
- Checkpoint() error + Checkpoint() (uint64, error) } diff --git a/module/block_iterator/iterator.go b/module/block_iterator/iterator.go index 945b84fb711..26102b108e5 100644 --- a/module/block_iterator/iterator.go +++ b/module/block_iterator/iterator.go @@ -57,7 +57,9 @@ func (b *IndexedBlockIterator) Next() (flow.Identifier, bool, error) { // when we are iterating by height, it's not possible that a height is not indexed, so indexed should // always be true if !indexed { - return flow.ZeroID, true, fmt.Errorf("block at index (height or view) %v is not indexed", b.nextIndex) + // iterate next height + b.nextIndex++ + return b.Next() } b.nextIndex++ @@ -68,10 +70,10 @@ func (b *IndexedBlockIterator) Next() (flow.Identifier, bool, error) { // Checkpoint saves the iteration progress to storage // make sure to call this after all the blocks for processing the block IDs returned by // Next() are completed. -func (b *IndexedBlockIterator) Checkpoint() error { +func (b *IndexedBlockIterator) Checkpoint() (uint64, error) { err := b.progress.SaveState(b.nextIndex) if err != nil { - return fmt.Errorf("failed to save progress at view %v: %w", b.nextIndex, err) + return 0, fmt.Errorf("failed to save progress at view %v: %w", b.nextIndex, err) } - return nil + return b.nextIndex, nil } diff --git a/module/block_iterator/iterator_test.go b/module/block_iterator/iterator_test.go index fefd5c4875c..14f7b3c489c 100644 --- a/module/block_iterator/iterator_test.go +++ b/module/block_iterator/iterator_test.go @@ -69,8 +69,9 @@ func TestIterateHeight(t *testing.T) { require.Empty(t, visited) // save the next to iterate height and verify - - require.NoError(t, iter.Checkpoint()) + next, err := iter.Checkpoint() + require.NoError(t, err) + require.Equal(t, b3.Height+1, next) savedNextHeight, err := progress.LoadState() require.NoError(t, err) From daff4a181659455ef9e95d60a8bfcfc8352a183c Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 23 Jan 2025 20:14:58 -0800 
Subject: [PATCH 16/32] refactor initializer and range --- module/block_iterator.go | 14 ---------- module/block_iterator/creator.go | 4 +-- module/block_iterator/initialize.go | 24 ++++++++++++++++ module/block_iterator/initializer.go | 38 -------------------------- module/block_iterator/iterator_test.go | 19 +++++-------- module/block_iterator/range.go | 16 ++--------- 6 files changed, 35 insertions(+), 80 deletions(-) create mode 100644 module/block_iterator/initialize.go delete mode 100644 module/block_iterator/initializer.go diff --git a/module/block_iterator.go b/module/block_iterator.go index 0a20eb5edfe..292820a5934 100644 --- a/module/block_iterator.go +++ b/module/block_iterator.go @@ -13,13 +13,6 @@ type IterateRange struct { End uint64 // the end of the range } -// IterateRangeCreator is an interface for creating iterate ranges -type IteratorRangeCreator interface { - // CreateRange takes a progress reader which is used to read the progress of the iterator - // and returns an iterate range that specifies the range of blocks to iterate over - CreateRange(IterateProgressReader) (IterateRange, error) -} - // IterateProgressReader reads the progress of the iterator, useful for resuming the iteration // after restart type IterateProgressReader interface { @@ -35,13 +28,6 @@ type IterateProgressWriter interface { SaveState(uint64) error } -// IterateProgressInitializer is an interface for initializing the progress of the iterator -// a initializer must be used to ensures the initial next block to be iterated is saved in -// storage before creating the block iterator -type IterateProgressInitializer interface { - Init() (IterateProgressReader, IterateProgressWriter, error) -} - // BlockIterator is an interface for iterating over blocks type BlockIterator interface { // Next returns the next block in the iterator diff --git a/module/block_iterator/creator.go b/module/block_iterator/creator.go index 8f2ce4b3f1e..1594dda3cbd 100644 --- 
a/module/block_iterator/creator.go +++ b/module/block_iterator/creator.go @@ -23,7 +23,7 @@ func NewCreator( latest func() (uint64, error), ) (*Creator, error) { // initialize the progress in storage, saving the root block index in storage - progressReader, progressWriter, err := NewInitializer(progress, root).Init() + progressReader, progressWriter, err := InitializeProgress(progress, root) if err != nil { return nil, fmt.Errorf("failed to initialize progress: %w", err) } @@ -38,7 +38,7 @@ func NewCreator( func (c *Creator) Create() (module.BlockIterator, error) { // create a iteration range from the root block to the latest block - iterRange, err := NewIteratorRangeCreator(c.latest).CreateRange(c.progressReader) + iterRange, err := CreateRange(c.progressReader, c.latest) if err != nil { return nil, fmt.Errorf("failed to create range for block iteration: %w", err) } diff --git a/module/block_iterator/initialize.go b/module/block_iterator/initialize.go new file mode 100644 index 00000000000..95efcf8a816 --- /dev/null +++ b/module/block_iterator/initialize.go @@ -0,0 +1,24 @@ +package block_iterator + +import ( + "errors" + "fmt" + + "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/storage" +) + +func InitializeProgress(progress storage.ConsumerProgress, root uint64) (module.IterateProgressReader, module.IterateProgressWriter, error) { + _, err := progress.ProcessedIndex() + if errors.Is(err, storage.ErrNotFound) { + next := root + 1 + err = progress.InitProcessedIndex(next) + if err != nil { + return nil, nil, fmt.Errorf("failed to init processed index: %w", err) + } + } + + nextProgress := NewNextProgress(progress) + + return nextProgress, nextProgress, nil +} diff --git a/module/block_iterator/initializer.go b/module/block_iterator/initializer.go deleted file mode 100644 index 5255b2d4e12..00000000000 --- a/module/block_iterator/initializer.go +++ /dev/null @@ -1,38 +0,0 @@ -package block_iterator - -import ( - "errors" - "fmt" - - 
"github.com/onflow/flow-go/module" - "github.com/onflow/flow-go/storage" -) - -type ProgressInitializer struct { - progress storage.ConsumerProgress - root uint64 -} - -var _ module.IterateProgressInitializer = (*ProgressInitializer)(nil) - -func NewInitializer(progress storage.ConsumerProgress, root uint64) *ProgressInitializer { - return &ProgressInitializer{ - progress: progress, - root: root, - } -} - -func (h *ProgressInitializer) Init() (module.IterateProgressReader, module.IterateProgressWriter, error) { - _, err := h.progress.ProcessedIndex() - if errors.Is(err, storage.ErrNotFound) { - next := h.root + 1 - err = h.progress.InitProcessedIndex(next) - if err != nil { - return nil, nil, fmt.Errorf("failed to init processed index: %w", err) - } - } - - nextProgress := NewNextProgress(h.progress) - - return nextProgress, nextProgress, nil -} diff --git a/module/block_iterator/iterator_test.go b/module/block_iterator/iterator_test.go index 14f7b3c489c..e6736c7c4ee 100644 --- a/module/block_iterator/iterator_test.go +++ b/module/block_iterator/iterator_test.go @@ -45,7 +45,7 @@ func TestIterateHeight(t *testing.T) { iter := NewIndexedBlockIterator(getBlockIDByIndex, progress, iterRange) // iterate through all blocks - visited := make(map[flow.Identifier]struct{}) + visited := make([]flow.Identifier, 0, len(bs)) for { id, ok, err := iter.Next() require.NoError(t, err) @@ -53,20 +53,15 @@ func TestIterateHeight(t *testing.T) { break } - // preventing duplicate visit - _, ok = visited[id] - require.False(t, ok, fmt.Sprintf("block %v is visited twice", id)) - - visited[id] = struct{}{} + visited = append(visited, id) } - // verify all blocks are visited - for _, b := range bs { - _, ok := visited[b.ID()] - require.True(t, ok, fmt.Sprintf("block %v is not visited", b.ID())) - delete(visited, b.ID()) + // verify all blocks are visited in the same order + for i, b := range bs { + require.Equal(t, b.ID(), visited[i]) } - require.Empty(t, visited) + + require.Equal(t, 
len(bs), len(visited)) // save the next to iterate height and verify next, err := iter.Checkpoint() diff --git a/module/block_iterator/range.go b/module/block_iterator/range.go index 05c69bc5be1..3b7d5933f3b 100644 --- a/module/block_iterator/range.go +++ b/module/block_iterator/range.go @@ -6,25 +6,13 @@ import ( "github.com/onflow/flow-go/module" ) -type IteratorRangeCreator struct { - latest func() (uint64, error) -} - -var _ module.IteratorRangeCreator = (*IteratorRangeCreator)(nil) - -func NewIteratorRangeCreator(latest func() (uint64, error)) *IteratorRangeCreator { - return &IteratorRangeCreator{ - latest: latest, - } -} - -func (h *IteratorRangeCreator) CreateRange(reader module.IterateProgressReader) (module.IterateRange, error) { +func CreateRange(reader module.IterateProgressReader, getLatest func() (uint64, error)) (module.IterateRange, error) { next, err := reader.LoadState() if err != nil { return module.IterateRange{}, fmt.Errorf("failed to read next height: %w", err) } - latest, err := h.latest() + latest, err := getLatest() if err != nil { return module.IterateRange{}, fmt.Errorf("failed to get latest block: %w", err) } From 5ef8b183c4167074eecbc3fe8dfff36d63d0c16e Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 23 Jan 2025 20:20:23 -0800 Subject: [PATCH 17/32] refactor tests --- module/block_iterator/creator_test.go | 38 ++++++++++++--------------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/module/block_iterator/creator_test.go b/module/block_iterator/creator_test.go index 30bf3aeac29..dc03b09e08a 100644 --- a/module/block_iterator/creator_test.go +++ b/module/block_iterator/creator_test.go @@ -59,25 +59,24 @@ func TestCanIterate(t *testing.T) { require.NoError(t, err) // Iterate through blocks - visitedBlocks := make(map[flow.Identifier]struct{}) + visitedBlocks := make([]flow.Identifier, 0, len(blocks)) for { blockID, ok, err := iterator.Next() require.NoError(t, err) if !ok { break } - 
visitedBlocks[blockID] = struct{}{} + + visitedBlocks = append(visitedBlocks, blockID) } // Verify all blocks were visited exactly once - for _, block := range blocks { - _, ok := visitedBlocks[block.ID()] - require.True(t, ok, "Block %v was not visited", block.Height) - delete(visitedBlocks, block.ID()) + for i, block := range blocks { + require.Equal(t, block.ID(), visitedBlocks[i], "Block %v was not visited", block.Height) } // Verify no extra blocks were visited - require.Empty(t, visitedBlocks, "Unexpected number of blocks visited") + require.Equal(t, len(blocks), len(visitedBlocks), "Unexpected number of blocks visited") // Verify the final checkpoint next, err := iterator.Checkpoint() @@ -93,6 +92,7 @@ func TestCanIterate(t *testing.T) { {Height: 7}, {Height: 8}, } + visitedBlocks = make([]flow.Identifier, 0, len(additionalBlocks)) // Update blocks so that the latest block is updated, and getBlockIDByHeight // will return the new blocks @@ -107,7 +107,7 @@ func TestCanIterate(t *testing.T) { blockID, ok, err := iterator.Next() require.NoError(t, err) require.True(t, ok) - visitedBlocks[blockID] = struct{}{} + visitedBlocks = append(visitedBlocks, blockID) } // No more blocks to iterate @@ -116,14 +116,12 @@ func TestCanIterate(t *testing.T) { require.False(t, ok) // Verify all additional blocks were visited exactly once - for _, block := range additionalBlocks { - _, ok := visitedBlocks[block.ID()] - require.True(t, ok, "Block %v was not visited", block.Height) - delete(visitedBlocks, block.ID()) + for i, block := range additionalBlocks { + require.Equal(t, block.ID(), visitedBlocks[i], "Block %v was not visited", block.Height) } // Verify no extra blocks were visited - require.Empty(t, visitedBlocks, "Unexpected number of blocks visited") + require.Equal(t, len(additionalBlocks), len(visitedBlocks), "Unexpected number of blocks visited") // Verify the final checkpoint next, err = iterator.Checkpoint() @@ -177,14 +175,14 @@ func TestCanResume(t *testing.T) { 
require.NoError(t, err) // Iterate through blocks - visitedBlocks := make(map[flow.Identifier]struct{}) + visitedBlocks := make([]flow.Identifier, 0, len(blocks)) for i := 0; i < 3; i++ { // iterate up to Height 3 blockID, ok, err := iterator.Next() require.NoError(t, err) if !ok { break } - visitedBlocks[blockID] = struct{}{} + visitedBlocks = append(visitedBlocks, blockID) } // save the progress @@ -222,18 +220,16 @@ func TestCanResume(t *testing.T) { break } - visitedBlocks[blockID] = struct{}{} + visitedBlocks = append(visitedBlocks, blockID) } // verify all blocks are visited - for _, block := range blocks { - _, ok := visitedBlocks[block.ID()] - require.True(t, ok, "Block %v was not visited", block.Height) - delete(visitedBlocks, block.ID()) + for i, block := range blocks { + require.Equal(t, block.ID(), visitedBlocks[i], "Block %v was not visited", block.Height) } // Verify no extra blocks were visited - require.Empty(t, visitedBlocks, "Unexpected number of blocks visited") + require.Equal(t, len(blocks), len(visitedBlocks), "Unexpected number of blocks visited") } func TestCanSkipViewsIfNotIndexed(t *testing.T) { From 2d3f4517318302e0dcab2176a0bb4de6c1120688 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 23 Jan 2025 20:21:05 -0800 Subject: [PATCH 18/32] fix comments --- module/block_iterator/iterator.go | 1 - 1 file changed, 1 deletion(-) diff --git a/module/block_iterator/iterator.go b/module/block_iterator/iterator.go index 26102b108e5..6491648e9c8 100644 --- a/module/block_iterator/iterator.go +++ b/module/block_iterator/iterator.go @@ -39,7 +39,6 @@ func NewIndexedBlockIterator( // Next returns the next block ID in the iteration // it iterates from lower height to higher height. 
-// when iterating a height, it iterates over all sibling blocks at that height // Note: this method is not concurrent-safe func (b *IndexedBlockIterator) Next() (flow.Identifier, bool, error) { if b.nextIndex > b.endIndex { From 0e9cc43f857d0a446929c28ace8c968e924a4b37 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 23 Jan 2025 20:25:37 -0800 Subject: [PATCH 19/32] add Creator interface --- module/block_iterator.go | 8 ++++++++ module/block_iterator/creator.go | 6 ++++++ 2 files changed, 14 insertions(+) diff --git a/module/block_iterator.go b/module/block_iterator.go index 292820a5934..efa657e9c64 100644 --- a/module/block_iterator.go +++ b/module/block_iterator.go @@ -50,3 +50,11 @@ type BlockIterator interface { // Next() are completed. Checkpoint() (uint64, error) } + +// Creator creates block iterators. +// a block iterator iterates through a saved index to the latest block. +// after iterating through all the blocks in the range, the iterator can be discarded. +// a new block iterator can be created to iterate through the next range. +type Creator interface { + Create() (BlockIterator, error) +} diff --git a/module/block_iterator/creator.go b/module/block_iterator/creator.go index 1594dda3cbd..59c0a7bef61 100644 --- a/module/block_iterator/creator.go +++ b/module/block_iterator/creator.go @@ -8,6 +8,10 @@ import ( "github.com/onflow/flow-go/storage" ) +// Creator creates block iterators. +// a block iterator iterates through a saved index to the latest block. +// after iterating through all the blocks in the range, the iterator can be discarded. +// a new block iterator can be created to iterate through the next range. type Creator struct { getBlockIDByIndex func(uint64) (flow.Identifier, bool, error) progressReader module.IterateProgressReader @@ -15,6 +19,8 @@ type Creator struct { latest func() (uint64, error) } +var _ module.Creator = (*Creator)(nil) + // NewCreator creates a block iterator that iterates through blocks by index. 
func NewCreator( getBlockIDByIndex func(uint64) (blockID flow.Identifier, indexed bool, exception error), From 6fe4ab9db0aa57bb51a2bef465d8e207f0ac5b1a Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 24 Jan 2025 09:09:41 -0800 Subject: [PATCH 20/32] rename to IteratorCreator --- module/block_iterator.go | 4 ++-- module/block_iterator/creator.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/module/block_iterator.go b/module/block_iterator.go index efa657e9c64..249057e2fb7 100644 --- a/module/block_iterator.go +++ b/module/block_iterator.go @@ -51,10 +51,10 @@ type BlockIterator interface { Checkpoint() (uint64, error) } -// Creator creates block iterators. +// IteratorCreator creates block iterators. // a block iterator iterates through a saved index to the latest block. // after iterating through all the blocks in the range, the iterator can be discarded. // a new block iterator can be created to iterate through the next range. -type Creator interface { +type IteratorCreator interface { Create() (BlockIterator, error) } diff --git a/module/block_iterator/creator.go b/module/block_iterator/creator.go index 59c0a7bef61..0e566e47cd5 100644 --- a/module/block_iterator/creator.go +++ b/module/block_iterator/creator.go @@ -19,7 +19,7 @@ type Creator struct { latest func() (uint64, error) } -var _ module.Creator = (*Creator)(nil) +var _ module.IteratorCreator = (*Creator)(nil) // NewCreator creates a block iterator that iterates through blocks by index. 
func NewCreator( From 0c47500a9303bdae56ea95cd53ff2a6e6662390d Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 24 Jan 2025 09:12:43 -0800 Subject: [PATCH 21/32] add IterateProgress --- module/block_iterator.go | 6 ++++++ module/block_iterator/creator.go | 14 ++++++-------- module/block_iterator/initialize.go | 6 +++--- 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/module/block_iterator.go b/module/block_iterator.go index 249057e2fb7..8fd53772fc3 100644 --- a/module/block_iterator.go +++ b/module/block_iterator.go @@ -13,6 +13,12 @@ type IterateRange struct { End uint64 // the end of the range } +// IterateProgress is an interface for reading and writing the progress of the iterator +type IterateProgress interface { + IterateProgressReader + IterateProgressWriter +} + // IterateProgressReader reads the progress of the iterator, useful for resuming the iteration // after restart type IterateProgressReader interface { diff --git a/module/block_iterator/creator.go b/module/block_iterator/creator.go index 0e566e47cd5..334dccd7823 100644 --- a/module/block_iterator/creator.go +++ b/module/block_iterator/creator.go @@ -14,8 +14,7 @@ import ( // a new block iterator can be created to iterate through the next range. type Creator struct { getBlockIDByIndex func(uint64) (flow.Identifier, bool, error) - progressReader module.IterateProgressReader - progressWriter module.IterateProgressWriter + progress module.IterateProgress latest func() (uint64, error) } @@ -24,27 +23,26 @@ var _ module.IteratorCreator = (*Creator)(nil) // NewCreator creates a block iterator that iterates through blocks by index. 
func NewCreator( getBlockIDByIndex func(uint64) (blockID flow.Identifier, indexed bool, exception error), - progress storage.ConsumerProgress, + progressStorage storage.ConsumerProgress, root uint64, latest func() (uint64, error), ) (*Creator, error) { // initialize the progress in storage, saving the root block index in storage - progressReader, progressWriter, err := InitializeProgress(progress, root) + progress, err := InitializeProgress(progressStorage, root) if err != nil { return nil, fmt.Errorf("failed to initialize progress: %w", err) } return &Creator{ getBlockIDByIndex: getBlockIDByIndex, - progressReader: progressReader, - progressWriter: progressWriter, + progress: progress, latest: latest, }, nil } func (c *Creator) Create() (module.BlockIterator, error) { // create a iteration range from the root block to the latest block - iterRange, err := CreateRange(c.progressReader, c.latest) + iterRange, err := CreateRange(c.progress, c.latest) if err != nil { return nil, fmt.Errorf("failed to create range for block iteration: %w", err) } @@ -53,7 +51,7 @@ func (c *Creator) Create() (module.BlockIterator, error) { // the function to get block ID by index, // the progress writer to update the progress in storage, // and the iteration range - return NewIndexedBlockIterator(c.getBlockIDByIndex, c.progressWriter, iterRange), nil + return NewIndexedBlockIterator(c.getBlockIDByIndex, c.progress, iterRange), nil } // NewHeightBasedCreator creates a block iterator that iterates through blocks diff --git a/module/block_iterator/initialize.go b/module/block_iterator/initialize.go index 95efcf8a816..f889433b548 100644 --- a/module/block_iterator/initialize.go +++ b/module/block_iterator/initialize.go @@ -8,17 +8,17 @@ import ( "github.com/onflow/flow-go/storage" ) -func InitializeProgress(progress storage.ConsumerProgress, root uint64) (module.IterateProgressReader, module.IterateProgressWriter, error) { +func InitializeProgress(progress storage.ConsumerProgress, root 
uint64) (module.IterateProgress, error) { _, err := progress.ProcessedIndex() if errors.Is(err, storage.ErrNotFound) { next := root + 1 err = progress.InitProcessedIndex(next) if err != nil { - return nil, nil, fmt.Errorf("failed to init processed index: %w", err) + return nil, fmt.Errorf("failed to init processed index: %w", err) } } nextProgress := NewNextProgress(progress) - return nextProgress, nextProgress, nil + return nextProgress, nil } From abc85077851cfc87fdd527f862bd96fccb385089 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 24 Jan 2025 09:20:13 -0800 Subject: [PATCH 22/32] add comments --- module/block_iterator.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/module/block_iterator.go b/module/block_iterator.go index 8fd53772fc3..e31de263f14 100644 --- a/module/block_iterator.go +++ b/module/block_iterator.go @@ -25,13 +25,13 @@ type IterateProgressReader interface { // LoadState reads the next block to iterate // caller must ensure the reader is created by the IterateProgressInitializer, // otherwise LoadState would return exception. - LoadState() (uint64, error) + LoadState() (progress uint64, exception error) } // IterateProgressWriter saves the progress of the iterator type IterateProgressWriter interface { // SaveState persists the next block to be iterated - SaveState(uint64) error + SaveState(uint64) (exception error) } // BlockIterator is an interface for iterating over blocks @@ -48,13 +48,13 @@ type BlockIterator interface { // for blockID, err := range heightIterator.Range() Next() (blockID flow.Identifier, hasNext bool, exception error) - // Checkpoint saves the current state of the iterator - // so that it can be resumed later + // Checkpoint saves the current state of the iterator so that it can be resumed later // when Checkpoint is called, if SaveStateFunc is called with block A, // then after restart, the iterator will resume from A. 
// make sure to call this after all the blocks for processing the block IDs returned by // Next() are completed. - Checkpoint() (uint64, error) + // It returns the saved index (next index to iterate), and error returned are exceptions + Checkpoint() (savedProgress uint64, exception error) } // IteratorCreator creates block iterators. @@ -62,5 +62,5 @@ type BlockIterator interface { // after iterating through all the blocks in the range, the iterator can be discarded. // a new block iterator can be created to iterate through the next range. type IteratorCreator interface { - Create() (BlockIterator, error) + Create() (fromSavedIndexToLatest BlockIterator, exception error) } From f7157f2f65711fbfd13901d60b39d94c30d69cc7 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 24 Jan 2025 09:26:33 -0800 Subject: [PATCH 23/32] update Checkpoint --- module/block_iterator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/module/block_iterator.go b/module/block_iterator.go index e31de263f14..5a2db6923af 100644 --- a/module/block_iterator.go +++ b/module/block_iterator.go @@ -54,7 +54,7 @@ type BlockIterator interface { // make sure to call this after all the blocks for processing the block IDs returned by // Next() are completed. // It returns the saved index (next index to iterate), and error returned are exceptions - Checkpoint() (savedProgress uint64, exception error) + Checkpoint() (savedIndex uint64, exception error) } // IteratorCreator creates block iterators. 
From d55bef81580c0dc399228923a4c0a6f63ffcbeb0 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 24 Jan 2025 09:34:12 -0800 Subject: [PATCH 24/32] update iterator comments --- module/block_iterator/iterator.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/module/block_iterator/iterator.go b/module/block_iterator/iterator.go index 6491648e9c8..a291074f0f3 100644 --- a/module/block_iterator/iterator.go +++ b/module/block_iterator/iterator.go @@ -7,12 +7,15 @@ import ( "github.com/onflow/flow-go/module" ) -// IndexedBlockIterator is a block iterator that iterates over blocks by height +// IndexedBlockIterator is a block iterator that iterates over blocks by height or view +// when index is height, it iterates from lower height to higher height +// when index is view, it iterates from lower view to higher view +// caller must ensure that the range is finalized, otherwise the iteration might miss some blocks // it's not concurrent safe, so don't use it in multiple goroutines type IndexedBlockIterator struct { // dependencies getBlockIDByIndex func(uint64) (blockID flow.Identifier, indexed bool, excpetion error) - progress module.IterateProgressWriter // for saving the next height to be iterated for resuming the iteration + progress module.IterateProgressWriter // for saving the next index to be iterated for resuming the iteration // config endIndex uint64 @@ -23,7 +26,7 @@ type IndexedBlockIterator struct { var _ module.BlockIterator = (*IndexedBlockIterator)(nil) -// caller must ensure that both iterRange.Start and iterRange.End are finalized height +// caller must ensure that both iterRange.Start and iterRange.End are finalized func NewIndexedBlockIterator( getBlockIDByIndex func(uint64) (blockID flow.Identifier, indexed bool, excpetion error), progress module.IterateProgressWriter, @@ -38,7 +41,7 @@ func NewIndexedBlockIterator( } // Next returns the next block ID in the iteration -// it iterates from lower 
height to higher height. +// it iterates from lower index to higher index. // Note: this method is not concurrent-safe func (b *IndexedBlockIterator) Next() (flow.Identifier, bool, error) { if b.nextIndex > b.endIndex { @@ -50,13 +53,10 @@ func (b *IndexedBlockIterator) Next() (flow.Identifier, bool, error) { return flow.ZeroID, false, fmt.Errorf("failed to fetch block at index (height or view) %v: %w", b.nextIndex, err) } - // if the block is not indexed, skip it. - // when we are iterating by view, it's possible that there is no block for certain views, in - // that case, we skip and iterate the next view - // when we are iterating by height, it's not possible that a height is not indexed, so indexed should - // always be true + // if the block is not indexed, skip it. This is only possible when we are iterating by view. + // when iterating by height, all blocks should be indexed, so `indexed` should always be true. if !indexed { - // iterate next height + // if the view is not indexed, then iterate next view b.nextIndex++ return b.Next() } From 415d1a7b2678336595178a8b6c2c75b0172e4e36 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Mon, 27 Jan 2025 09:24:00 -0800 Subject: [PATCH 25/32] add executor --- module/block_iterator/executor/executor.go | 107 +++++++++ .../block_iterator/executor/executor_test.go | 206 ++++++++++++++++++ 2 files changed, 313 insertions(+) create mode 100644 module/block_iterator/executor/executor.go create mode 100644 module/block_iterator/executor/executor_test.go diff --git a/module/block_iterator/executor/executor.go b/module/block_iterator/executor/executor.go new file mode 100644 index 00000000000..2ec1cf7cabe --- /dev/null +++ b/module/block_iterator/executor/executor.go @@ -0,0 +1,107 @@ +package executor + +import ( + "fmt" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/storage" +) + +// IterationExecutor allows the caller to customize the task to be 
executed for each block. +// for instance, the executor can prune data indexed by the block, or build another index for +// each iterated block. +type IterationExecutor interface { + // ExecuteByBlockID executes the task for the block indexed by the blockID + ExecuteByBlockID(blockID flow.Identifier, batch storage.ReaderBatchWriter) (exception error) +} + +// Sleeper allows the caller to slow down the iteration after each batch is committed +type Sleeper func() + +// IsBatchFull decides the batch size for each commit. +// it takes the number of blocks iterated in the current batch, +// and returns whether the batch is full. +type IsBatchFull func(iteratedCountInCurrentBatch int) bool + +// IterateExecuteAndCommitInBatch iterates over blocks and execute tasks with data that was indexed by the block. +// the update to the storage database is done in batch, and the batch is committed when it's full. +// the iteration progress is saved after batch is committed, so that the iteration progress +// can be resumed after restart. +// it sleeps after each batch is committed in order to minimizing the impact on the system. +func IterateExecuteAndCommitInBatch( + // iterator decides how to iterate over blocks + iter module.BlockIterator, + // executor decides what data in the storage will be updated for a certain block + executor IterationExecutor, + // db creates a new batch for each block, and passed to the executor for adding updates, + // the batch is commited when it's full + db storage.DB, + // isBatchFull decides the batch size for each commit. 
+ isBatchFull IsBatchFull, + // sleeper allows the caller to slow down the iteration after each batch is committed + // in order to minimize the impact on the system + sleeper Sleeper, +) error { + batch := db.NewBatch() + iteratedCountInCurrentBatch := 0 + + for { + // iterate over each block until the end + blockID, hasNext, err := iter.Next() + if err != nil { + return err + } + + if !hasNext { + // commit last batch + err := commitAndCheckpoint(batch, iter) + if err != nil { + return err + } + + break + } + + // prune all the data indexed by the block + err = executor.ExecuteByBlockID(blockID, batch) + if err != nil { + return fmt.Errorf("failed to prune by block ID %v: %w", blockID, err) + } + iteratedCountInCurrentBatch++ + + // if batch is full, commit and sleep + if isBatchFull(iteratedCountInCurrentBatch) { + // commit the batch and save the progress + err := commitAndCheckpoint(batch, iter) + if err != nil { + return err + } + + // wait a bit to minimize the impact on the system + sleeper() + + // create a new batch, and reset iteratedCountInCurrentBatch + batch = db.NewBatch() + iteratedCountInCurrentBatch = 0 + } + } + + return nil +} + +// commitAndCheckpoint commits the batch and checkpoints the iterator +// so that the iteration progress can be resumed after restart. 
+func commitAndCheckpoint(batch storage.Batch, iter module.BlockIterator) error { + err := batch.Commit() + if err != nil { + return fmt.Errorf("failed to commit batch: %w", err) + } + + _, err = iter.Checkpoint() + if err != nil { + return fmt.Errorf("failed to checkpoint iterator: %w", err) + } + + return nil +} diff --git a/module/block_iterator/executor/executor_test.go b/module/block_iterator/executor/executor_test.go new file mode 100644 index 00000000000..df2f76b3320 --- /dev/null +++ b/module/block_iterator/executor/executor_test.go @@ -0,0 +1,206 @@ +package executor_test + +import ( + "errors" + "fmt" + "testing" + + "github.com/cockroachdb/pebble" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/block_iterator/executor" + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation" + "github.com/onflow/flow-go/storage/operation/pebbleimpl" + "github.com/onflow/flow-go/utils/unittest" +) + +// verify the executor is able to iterate through all blocks from the iterator. 
+func TestExecute(t *testing.T) { + unittest.RunWithPebbleDB(t, func(db *pebble.DB) { + blockCount := 10 + + // prepare data + cdps := make([]*flow.ChunkDataPack, 0, blockCount) + bs := make([]flow.Identifier, 0, blockCount) + for i := 0; i < blockCount; i++ { + cdp := unittest.ChunkDataPackFixture(unittest.IdentifierFixture()) + cdps = append(cdps, cdp) + bs = append(bs, cdp.ChunkID) + } + + pdb := pebbleimpl.ToDB(db) + + // store the chunk data packs to be pruned later + for _, cdp := range cdps { + sc := storage.ToStoredChunkDataPack(cdp) + require.NoError(t, pdb.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return operation.InsertChunkDataPack(rw.Writer(), sc) + })) + } + + // it's ok that the chunk ids are used as block ids, because the iterator + // basically just iterates over identifiers + iter := &iterator{blocks: bs} + pr := &testExecutor{ + executeByBlockID: func(id flow.Identifier, batch storage.ReaderBatchWriter) error { + return operation.RemoveChunkDataPack(batch.Writer(), id) + }, + } + + sleeper := &sleeper{} + + // prune blocks + batchSize := 3 + require.NoError(t, executor.IterateExecuteAndCommitInBatch(iter, pr, pdb, func(count int) bool { return count >= batchSize }, sleeper.Sleep)) + + // expect all blocks are pruned + for _, b := range bs { + // verify they are pruned + var c storage.StoredChunkDataPack + err := operation.RetrieveChunkDataPack(pdb.Reader(), b, &c) + require.True(t, errors.Is(err, storage.ErrNotFound), "expected ErrNotFound but got %v", err) + } + + // the sleeper should be called 3 times, because 3 full batches are committed before the final partial batch.
+ require.Equal(t, blockCount/batchSize, sleeper.count) + }) +} + +// verify the pruning can be interrupted and resumed +func TestExecuteCanBeResumed(t *testing.T) { + unittest.RunWithPebbleDB(t, func(db *pebble.DB) { + blockCount := 10 + + cdps := make([]*flow.ChunkDataPack, 0, blockCount) + bs := make([]flow.Identifier, 0, blockCount) + for i := 0; i < blockCount; i++ { + cdp := unittest.ChunkDataPackFixture(unittest.IdentifierFixture()) + cdps = append(cdps, cdp) + bs = append(bs, cdp.ChunkID) + } + + pdb := pebbleimpl.ToDB(db) + + // store the chunk data packs to be pruned later + for _, cdp := range cdps { + sc := storage.ToStoredChunkDataPack(cdp) + require.NoError(t, pdb.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return operation.InsertChunkDataPack(rw.Writer(), sc) + })) + } + + // it's ok the chunk ids is used as block ids, because the iterator + // basically just iterate over identifiers + iter := &iterator{blocks: bs} + interrupted := fmt.Errorf("interrupted") + pruneUntilInterrupted := &testExecutor{ + executeByBlockID: func(id flow.Identifier, batch storage.ReaderBatchWriter) error { + // the 5th block will interrupt the pruning + // since the 5th block belongs to the 2nd batch, + // only the first batch is actually pruned, + // which means blocks from 0 to 2 are pruned + if id == bs[5] { + return interrupted // return sentinel error to interrupt the pruning + } + return operation.RemoveChunkDataPack(batch.Writer(), id) + }, + } + + sleeper := &sleeper{} + + // prune blocks until interrupted at block 5 + batchSize := 3 + err := executor.IterateExecuteAndCommitInBatch(iter, pruneUntilInterrupted, pdb, func(count int) bool { return count >= batchSize }, sleeper.Sleep) + require.True(t, errors.Is(err, interrupted), fmt.Errorf("expected %v but got %v", interrupted, err)) + + // expect all blocks are pruned + for i, b := range bs { + // verify they are pruned + var c storage.StoredChunkDataPack + + if i < 3 { + // the first 3 blocks 
in the first batch are pruned + err := operation.RetrieveChunkDataPack(pdb.Reader(), b, &c) + require.True(t, errors.Is(err, storage.ErrNotFound), "expected ErrNotFound for block %v but got %v", i, err) + continue + } + + // verify the remaining blocks are not pruned yet + require.NoError(t, operation.RetrieveChunkDataPack(pdb.Reader(), b, &c)) + } + + // the sleeper should be called once + require.Equal(t, 5/batchSize, sleeper.count) + + // now resume the pruning + iterToAll := restoreBlockIterator(iter.blocks, iter.stored) + + pr := &testExecutor{ + executeByBlockID: func(id flow.Identifier, batch storage.ReaderBatchWriter) error { + return operation.RemoveChunkDataPack(batch.Writer(), id) + }, + } + + require.NoError(t, executor.IterateExecuteAndCommitInBatch(iterToAll, pr, pdb, func(count int) bool { return count >= batchSize }, sleeper.Sleep)) + + // verify all blocks are pruned + for _, b := range bs { + var c storage.StoredChunkDataPack + // the first 5 blocks are pruned + err := operation.RetrieveChunkDataPack(pdb.Reader(), b, &c) + require.True(t, errors.Is(err, storage.ErrNotFound), "expected ErrNotFound but got %v", err) + } + }) +} + +type iterator struct { + blocks []flow.Identifier + cur int + stored int +} + +var _ module.BlockIterator = (*iterator)(nil) + +func (b *iterator) Next() (flow.Identifier, bool, error) { + if b.cur >= len(b.blocks) { + return flow.Identifier{}, false, nil + } + + id := b.blocks[b.cur] + b.cur++ + return id, true, nil +} + +func (b *iterator) Checkpoint() (uint64, error) { + b.stored = b.cur + return uint64(b.cur), nil +} + +func restoreBlockIterator(blocks []flow.Identifier, stored int) *iterator { + return &iterator{ + blocks: blocks, + cur: stored, + stored: stored, + } +} + +type sleeper struct { + count int +} + +func (s *sleeper) Sleep() { + s.count++ +} + +type testExecutor struct { + executeByBlockID func(id flow.Identifier, batchWriter storage.ReaderBatchWriter) error +} + +var _ executor.IterationExecutor = 
(*testExecutor)(nil) + +func (p *testExecutor) ExecuteByBlockID(id flow.Identifier, batchWriter storage.ReaderBatchWriter) error { + return p.executeByBlockID(id, batchWriter) +} From 6dbca16b6da9af399629022b4791449fe0c3b3ba Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 30 Jan 2025 11:33:03 -0800 Subject: [PATCH 26/32] move block iterator initializer and range to progress --- module/block_iterator/creator.go | 10 +++--- module/block_iterator/initialize.go | 24 --------------- module/block_iterator/iterator.go | 12 +++----- module/block_iterator/progress.go | 48 +++++++++++++++++++++++++---- module/block_iterator/range.go | 25 --------------- 5 files changed, 50 insertions(+), 69 deletions(-) delete mode 100644 module/block_iterator/initialize.go delete mode 100644 module/block_iterator/range.go diff --git a/module/block_iterator/creator.go b/module/block_iterator/creator.go index 334dccd7823..b192c5abb18 100644 --- a/module/block_iterator/creator.go +++ b/module/block_iterator/creator.go @@ -14,8 +14,7 @@ import ( // a new block iterator can be created to iterate through the next range. 
type Creator struct { getBlockIDByIndex func(uint64) (flow.Identifier, bool, error) - progress module.IterateProgress - latest func() (uint64, error) + progress *NextProgress } var _ module.IteratorCreator = (*Creator)(nil) @@ -28,7 +27,7 @@ func NewCreator( latest func() (uint64, error), ) (*Creator, error) { // initialize the progress in storage, saving the root block index in storage - progress, err := InitializeProgress(progressStorage, root) + progress, err := NewNextProgress(progressStorage, root, latest) if err != nil { return nil, fmt.Errorf("failed to initialize progress: %w", err) } @@ -36,13 +35,12 @@ func NewCreator( return &Creator{ getBlockIDByIndex: getBlockIDByIndex, progress: progress, - latest: latest, }, nil } func (c *Creator) Create() (module.BlockIterator, error) { - // create a iteration range from the root block to the latest block - iterRange, err := CreateRange(c.progress, c.latest) + // create a iteration range from the first un-iterated to the latest block + iterRange, err := c.progress.NextRange() if err != nil { return nil, fmt.Errorf("failed to create range for block iteration: %w", err) } diff --git a/module/block_iterator/initialize.go b/module/block_iterator/initialize.go deleted file mode 100644 index f889433b548..00000000000 --- a/module/block_iterator/initialize.go +++ /dev/null @@ -1,24 +0,0 @@ -package block_iterator - -import ( - "errors" - "fmt" - - "github.com/onflow/flow-go/module" - "github.com/onflow/flow-go/storage" -) - -func InitializeProgress(progress storage.ConsumerProgress, root uint64) (module.IterateProgress, error) { - _, err := progress.ProcessedIndex() - if errors.Is(err, storage.ErrNotFound) { - next := root + 1 - err = progress.InitProcessedIndex(next) - if err != nil { - return nil, fmt.Errorf("failed to init processed index: %w", err) - } - } - - nextProgress := NewNextProgress(progress) - - return nextProgress, nil -} diff --git a/module/block_iterator/iterator.go b/module/block_iterator/iterator.go 
index a291074f0f3..81563dd0c78 100644 --- a/module/block_iterator/iterator.go +++ b/module/block_iterator/iterator.go @@ -14,21 +14,17 @@ import ( // it's not concurrent safe, so don't use it in multiple goroutines type IndexedBlockIterator struct { // dependencies - getBlockIDByIndex func(uint64) (blockID flow.Identifier, indexed bool, excpetion error) + getBlockIDByIndex func(uint64) (blockID flow.Identifier, indexed bool, exception error) progress module.IterateProgressWriter // for saving the next index to be iterated for resuming the iteration - - // config - endIndex uint64 - - // state - nextIndex uint64 + endIndex uint64 // the end index to iterate, this never change + nextIndex uint64 // the start index to iterate, this will be updated after each iteration } var _ module.BlockIterator = (*IndexedBlockIterator)(nil) // caller must ensure that both iterRange.Start and iterRange.End are finalized func NewIndexedBlockIterator( - getBlockIDByIndex func(uint64) (blockID flow.Identifier, indexed bool, excpetion error), + getBlockIDByIndex func(uint64) (blockID flow.Identifier, indexed bool, exception error), progress module.IterateProgressWriter, iterRange module.IterateRange, ) module.BlockIterator { diff --git a/module/block_iterator/progress.go b/module/block_iterator/progress.go index c9ee6788afd..db9599534a8 100644 --- a/module/block_iterator/progress.go +++ b/module/block_iterator/progress.go @@ -1,21 +1,34 @@ package block_iterator import ( + "errors" + "fmt" + "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/storage" ) type NextProgress struct { - store storage.ConsumerProgress + store storage.ConsumerProgress + latest func() (uint64, error) } -var _ module.IterateProgressReader = (*NextProgress)(nil) -var _ module.IterateProgressWriter = (*NextProgress)(nil) +var _ module.IterateProgress = (*NextProgress)(nil) -func NewNextProgress(store storage.ConsumerProgress) *NextProgress { - return &NextProgress{ - store: store, +func 
NewNextProgress(store storage.ConsumerProgress, root uint64, latest func() (uint64, error)) (*NextProgress, error) { + _, err := store.ProcessedIndex() + if errors.Is(err, storage.ErrNotFound) { + next := root + 1 + err = store.InitProcessedIndex(next) + if err != nil { + return nil, fmt.Errorf("failed to init processed index: %w", err) + } } + + return &NextProgress{ + store: store, + latest: latest, + }, nil } func (n *NextProgress) LoadState() (uint64, error) { @@ -25,3 +38,26 @@ func (n *NextProgress) LoadState() (uint64, error) { func (n *NextProgress) SaveState(next uint64) error { return n.store.SetProcessedIndex(next) } + +// NextRange returns the next range of blocks to iterate over +func (n *NextProgress) NextRange() (module.IterateRange, error) { + next, err := n.LoadState() + if err != nil { + return module.IterateRange{}, fmt.Errorf("failed to read next height: %w", err) + } + + latest, err := n.latest() + if err != nil { + return module.IterateRange{}, fmt.Errorf("failed to get latest block: %w", err) + } + + if latest < next { + return module.IterateRange{}, fmt.Errorf("latest block is less than next block: %d < %d", latest, next) + } + + // iterate from next to latest (inclusive) + return module.IterateRange{ + Start: next, + End: latest, + }, nil +} diff --git a/module/block_iterator/range.go b/module/block_iterator/range.go deleted file mode 100644 index 3b7d5933f3b..00000000000 --- a/module/block_iterator/range.go +++ /dev/null @@ -1,25 +0,0 @@ -package block_iterator - -import ( - "fmt" - - "github.com/onflow/flow-go/module" -) - -func CreateRange(reader module.IterateProgressReader, getLatest func() (uint64, error)) (module.IterateRange, error) { - next, err := reader.LoadState() - if err != nil { - return module.IterateRange{}, fmt.Errorf("failed to read next height: %w", err) - } - - latest, err := getLatest() - if err != nil { - return module.IterateRange{}, fmt.Errorf("failed to get latest block: %w", err) - } - - // iterate from next to 
latest (inclusive) - return module.IterateRange{ - Start: next, - End: latest, - }, nil -} From 9b863ea0b04e6a98e64887f6a6d33090caba2ff8 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 30 Jan 2025 11:51:09 -0800 Subject: [PATCH 27/32] add progress tests --- module/block_iterator/progress_test.go | 51 ++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 module/block_iterator/progress_test.go diff --git a/module/block_iterator/progress_test.go b/module/block_iterator/progress_test.go new file mode 100644 index 00000000000..1911775804b --- /dev/null +++ b/module/block_iterator/progress_test.go @@ -0,0 +1,51 @@ +package block_iterator + +import ( + "testing" + + "github.com/cockroachdb/pebble" + "github.com/stretchr/testify/require" + + storagepebble "github.com/onflow/flow-go/storage/pebble" + "github.com/onflow/flow-go/utils/unittest" +) + +func TestProgress(t *testing.T) { + unittest.RunWithPebbleDB(t, func(db *pebble.DB) { + root := uint64(10) + + latest := uint64(20) + getLatest := func() (uint64, error) { + return latest, nil + } + + store := storagepebble.NewConsumerProgress(db, "test") + + progress, err := NewNextProgress(store, root, getLatest) + require.NoError(t, err) + + // initial state should be the next of root + next, err := progress.LoadState() + require.NoError(t, err) + require.Equal(t, root+1, next) + + rg, err := progress.NextRange() + require.NoError(t, err) + require.Equal(t, root+1, rg.Start) + require.Equal(t, latest, rg.End) + + // save the state + err = progress.SaveState(latest + 1) + require.NoError(t, err) + + // update latest + oldLatest := latest + latest = latest + 20 + rg, err = progress.NextRange() + require.NoError(t, err) + + // verify the new range + require.Equal(t, oldLatest+1, rg.Start) + require.Equal(t, latest, rg.End) + }) +} From a3833083c85068500ab9df7b4d839491eafc1fc4 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 30 Jan 2025 12:19:34 -0800 Subject: 
[PATCH 28/32] rename modules --- module/block_iterator.go | 19 +++++++++---------- module/block_iterator/creator.go | 4 ++-- module/block_iterator/iterator.go | 8 ++++---- module/block_iterator/iterator_test.go | 4 ++-- .../block_iterator/{progress.go => state.go} | 15 ++++++++------- .../{progress_test.go => state_test.go} | 2 +- 6 files changed, 26 insertions(+), 26 deletions(-) rename module/block_iterator/{progress.go => state.go} (66%) rename module/block_iterator/{progress_test.go => state_test.go} (94%) diff --git a/module/block_iterator.go b/module/block_iterator.go index 5a2db6923af..7cf186500ea 100644 --- a/module/block_iterator.go +++ b/module/block_iterator.go @@ -13,23 +13,22 @@ type IterateRange struct { End uint64 // the end of the range } -// IterateProgress is an interface for reading and writing the progress of the iterator -type IterateProgress interface { - IterateProgressReader - IterateProgressWriter +// IteratorState is an interface for reading and writing the progress of the iterator +type IteratorState interface { + IteratorStateReader + IteratorStateWriter } -// IterateProgressReader reads the progress of the iterator, useful for resuming the iteration +// IteratorStateReader reads the progress of the iterator, useful for resuming the iteration // after restart -type IterateProgressReader interface { +type IteratorStateReader interface { // LoadState reads the next block to iterate - // caller must ensure the reader is created by the IterateProgressInitializer, - // otherwise LoadState would return exception. + // caller must ensure the state is initialized, otherwise LoadState would return exception. 
LoadState() (progress uint64, exception error) } -// IterateProgressWriter saves the progress of the iterator -type IterateProgressWriter interface { +// IteratorStateWriter saves the progress of the iterator +type IteratorStateWriter interface { // SaveState persists the next block to be iterated SaveState(uint64) (exception error) } diff --git a/module/block_iterator/creator.go b/module/block_iterator/creator.go index b192c5abb18..8227b1ddb95 100644 --- a/module/block_iterator/creator.go +++ b/module/block_iterator/creator.go @@ -14,7 +14,7 @@ import ( // a new block iterator can be created to iterate through the next range. type Creator struct { getBlockIDByIndex func(uint64) (flow.Identifier, bool, error) - progress *NextProgress + progress *PersistentIteratorState } var _ module.IteratorCreator = (*Creator)(nil) @@ -27,7 +27,7 @@ func NewCreator( latest func() (uint64, error), ) (*Creator, error) { // initialize the progress in storage, saving the root block index in storage - progress, err := NewNextProgress(progressStorage, root, latest) + progress, err := NewPersistentIteratorState(progressStorage, root, latest) if err != nil { return nil, fmt.Errorf("failed to initialize progress: %w", err) } diff --git a/module/block_iterator/iterator.go b/module/block_iterator/iterator.go index 81563dd0c78..c555ec34c10 100644 --- a/module/block_iterator/iterator.go +++ b/module/block_iterator/iterator.go @@ -15,9 +15,9 @@ import ( type IndexedBlockIterator struct { // dependencies getBlockIDByIndex func(uint64) (blockID flow.Identifier, indexed bool, exception error) - progress module.IterateProgressWriter // for saving the next index to be iterated for resuming the iteration - endIndex uint64 // the end index to iterate, this never change - nextIndex uint64 // the start index to iterate, this will be updated after each iteration + progress module.IteratorStateWriter // for saving the next index to be iterated for resuming the iteration + endIndex uint64 // the end index 
to iterate, this never change + nextIndex uint64 // the start index to iterate, this will be updated after each iteration } var _ module.BlockIterator = (*IndexedBlockIterator)(nil) @@ -25,7 +25,7 @@ var _ module.BlockIterator = (*IndexedBlockIterator)(nil) // caller must ensure that both iterRange.Start and iterRange.End are finalized func NewIndexedBlockIterator( getBlockIDByIndex func(uint64) (blockID flow.Identifier, indexed bool, exception error), - progress module.IterateProgressWriter, + progress module.IteratorStateWriter, iterRange module.IterateRange, ) module.BlockIterator { return &IndexedBlockIterator{ diff --git a/module/block_iterator/iterator_test.go b/module/block_iterator/iterator_test.go index e6736c7c4ee..ac25a6b5391 100644 --- a/module/block_iterator/iterator_test.go +++ b/module/block_iterator/iterator_test.go @@ -81,8 +81,8 @@ type saveNextHeight struct { savedNextHeight uint64 } -var _ module.IterateProgressWriter = (*saveNextHeight)(nil) -var _ module.IterateProgressReader = (*saveNextHeight)(nil) +var _ module.IteratorStateWriter = (*saveNextHeight)(nil) +var _ module.IteratorStateReader = (*saveNextHeight)(nil) func (s *saveNextHeight) SaveState(height uint64) error { s.savedNextHeight = height diff --git a/module/block_iterator/progress.go b/module/block_iterator/state.go similarity index 66% rename from module/block_iterator/progress.go rename to module/block_iterator/state.go index db9599534a8..262f5b74d4d 100644 --- a/module/block_iterator/progress.go +++ b/module/block_iterator/state.go @@ -8,14 +8,15 @@ import ( "github.com/onflow/flow-go/storage" ) -type NextProgress struct { +// PersistentIteratorState stores the state of the iterator in a persistent storage +type PersistentIteratorState struct { store storage.ConsumerProgress latest func() (uint64, error) } -var _ module.IterateProgress = (*NextProgress)(nil) +var _ module.IteratorState = (*PersistentIteratorState)(nil) -func NewNextProgress(store storage.ConsumerProgress, root 
uint64, latest func() (uint64, error)) (*NextProgress, error) { +func NewPersistentIteratorState(store storage.ConsumerProgress, root uint64, latest func() (uint64, error)) (*PersistentIteratorState, error) { _, err := store.ProcessedIndex() if errors.Is(err, storage.ErrNotFound) { next := root + 1 @@ -25,22 +26,22 @@ func NewNextProgress(store storage.ConsumerProgress, root uint64, latest func() } } - return &NextProgress{ + return &PersistentIteratorState{ store: store, latest: latest, }, nil } -func (n *NextProgress) LoadState() (uint64, error) { +func (n *PersistentIteratorState) LoadState() (uint64, error) { return n.store.ProcessedIndex() } -func (n *NextProgress) SaveState(next uint64) error { +func (n *PersistentIteratorState) SaveState(next uint64) error { return n.store.SetProcessedIndex(next) } // NextRange returns the next range of blocks to iterate over -func (n *NextProgress) NextRange() (module.IterateRange, error) { +func (n *PersistentIteratorState) NextRange() (module.IterateRange, error) { next, err := n.LoadState() if err != nil { return module.IterateRange{}, fmt.Errorf("failed to read next height: %w", err) diff --git a/module/block_iterator/progress_test.go b/module/block_iterator/state_test.go similarity index 94% rename from module/block_iterator/progress_test.go rename to module/block_iterator/state_test.go index 1911775804b..cf805d574f0 100644 --- a/module/block_iterator/progress_test.go +++ b/module/block_iterator/state_test.go @@ -21,7 +21,7 @@ func TestProgress(t *testing.T) { store := storagepebble.NewConsumerProgress(db, "test") - progress, err := NewNextProgress(store, root, getLatest) + progress, err := NewPersistentIteratorState(store, root, getLatest) require.NoError(t, err) // initial state should be the next of root From 490ab01d3d8f4309eac4484bd0c2280c831770a9 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Mon, 27 Jan 2025 16:37:47 -0800 Subject: [PATCH 29/32] add comments --- module/block_iterator/creator.go 
| 4 ++++ 1 file changed, 4 insertions(+) diff --git a/module/block_iterator/creator.go b/module/block_iterator/creator.go index 8227b1ddb95..260735e9182 100644 --- a/module/block_iterator/creator.go +++ b/module/block_iterator/creator.go @@ -20,6 +20,10 @@ type Creator struct { var _ module.IteratorCreator = (*Creator)(nil) // NewCreator creates a block iterator that iterates through blocks by index. +// the root is the block index to start iterating from. (it could be either the root height or the root view) +// the latest is a function that returns the latest block index. +// since latest is a function, the caller can reuse the creator to create block iterator one +// after another to iterate from the root to the latest, and from last iterated to the new latest. func NewCreator( getBlockIDByIndex func(uint64) (blockID flow.Identifier, indexed bool, exception error), progressStorage storage.ConsumerProgress, From 39af8a111aa9d1c1b75f9d207a15fb4351f99a10 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 31 Jan 2025 08:33:43 -0800 Subject: [PATCH 30/32] refactor into IteratorRange --- module/block_iterator.go | 4 ++-- module/block_iterator/iterator.go | 2 +- module/block_iterator/iterator_test.go | 2 +- module/block_iterator/state.go | 10 +++++----- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/module/block_iterator.go b/module/block_iterator.go index 7cf186500ea..e9917c126e3 100644 --- a/module/block_iterator.go +++ b/module/block_iterator.go @@ -4,11 +4,11 @@ import ( "github.com/onflow/flow-go/model/flow" ) -// IterateRange defines the range of blocks to iterate over +// IteratorRange defines the range of blocks to iterate over // the range could be either view based range or height based range.
// when specifying the range, the start and end are inclusive, and the end must be greater than or // equal to the start -type IterateRange struct { +type IteratorRange struct { Start uint64 // the start of the range End uint64 // the end of the range } diff --git a/module/block_iterator/iterator.go b/module/block_iterator/iterator.go index c555ec34c10..158630c4edf 100644 --- a/module/block_iterator/iterator.go +++ b/module/block_iterator/iterator.go @@ -26,7 +26,7 @@ var _ module.BlockIterator = (*IndexedBlockIterator)(nil) func NewIndexedBlockIterator( getBlockIDByIndex func(uint64) (blockID flow.Identifier, indexed bool, exception error), progress module.IteratorStateWriter, - iterRange module.IterateRange, + iterRange module.IteratorRange, ) module.BlockIterator { return &IndexedBlockIterator{ getBlockIDByIndex: getBlockIDByIndex, diff --git a/module/block_iterator/iterator_test.go b/module/block_iterator/iterator_test.go index ac25a6b5391..7d8ca35446b 100644 --- a/module/block_iterator/iterator_test.go +++ b/module/block_iterator/iterator_test.go @@ -32,7 +32,7 @@ func TestIterateHeight(t *testing.T) { // create iterator // b0 is the root block, iterate from b1 to b3 - iterRange := module.IterateRange{Start: b1.Height, End: b3.Height} + iterRange := module.IteratorRange{Start: b1.Height, End: b3.Height} headers := storagebadger.NewHeaders(&metrics.NoopCollector{}, db) getBlockIDByIndex := func(height uint64) (flow.Identifier, bool, error) { blockID, err := headers.BlockIDByHeight(height) diff --git a/module/block_iterator/state.go b/module/block_iterator/state.go index 262f5b74d4d..2c8c2605371 100644 --- a/module/block_iterator/state.go +++ b/module/block_iterator/state.go @@ -41,23 +41,23 @@ func (n *PersistentIteratorState) SaveState(next uint64) error { } // NextRange returns the next range of blocks to iterate over -func (n *PersistentIteratorState) NextRange() (module.IterateRange, error) { +func (n *PersistentIteratorState) NextRange() 
(module.IteratorRange, error) { next, err := n.LoadState() if err != nil { - return module.IterateRange{}, fmt.Errorf("failed to read next height: %w", err) + return module.IteratorRange{}, fmt.Errorf("failed to read next height: %w", err) } latest, err := n.latest() if err != nil { - return module.IterateRange{}, fmt.Errorf("failed to get latest block: %w", err) + return module.IteratorRange{}, fmt.Errorf("failed to get latest block: %w", err) } if latest < next { - return module.IterateRange{}, fmt.Errorf("latest block is less than next block: %d < %d", latest, next) + return module.IteratorRange{}, fmt.Errorf("latest block is less than next block: %d < %d", latest, next) } // iterate from next to latest (inclusive) - return module.IterateRange{ + return module.IteratorRange{ Start: next, End: latest, }, nil From 6eb19416ceaaf0a639605de07c7e7637d69ae190 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 31 Jan 2025 08:39:31 -0800 Subject: [PATCH 31/32] update tests --- module/block_iterator/state_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/module/block_iterator/state_test.go b/module/block_iterator/state_test.go index cf805d574f0..05ca6e72b92 100644 --- a/module/block_iterator/state_test.go +++ b/module/block_iterator/state_test.go @@ -38,6 +38,10 @@ func TestProgress(t *testing.T) { err = progress.SaveState(latest + 1) require.NoError(t, err) + next, err = progress.LoadState() + require.NoError(t, err) + require.Equal(t, latest+1, next) + // update latest oldLatest := latest latest = latest + 20 From 95c3acd5856ed3778146c091c2e88881e70536ba Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 31 Jan 2025 09:27:36 -0800 Subject: [PATCH 32/32] handle the case where there is no block to iterator --- module/block_iterator.go | 7 ++-- module/block_iterator/creator.go | 13 ++++--- module/block_iterator/creator_test.go | 52 ++++++++++++++++++++++++--- module/block_iterator/state.go | 18 +++++++--- 
module/block_iterator/state_test.go | 23 +++++++++--- 5 files changed, 93 insertions(+), 20 deletions(-) diff --git a/module/block_iterator.go b/module/block_iterator.go index e9917c126e3..dc6ba7e7c37 100644 --- a/module/block_iterator.go +++ b/module/block_iterator.go @@ -52,7 +52,8 @@ type BlockIterator interface { // then after restart, the iterator will resume from A. // make sure to call this after all the blocks for processing the block IDs returned by // Next() are completed. - // It returns the saved index (next index to iterate), and error returned are exceptions + // It returns the saved index (next index to iterate) + // any error returned are exceptions Checkpoint() (savedIndex uint64, exception error) } @@ -60,6 +61,8 @@ type BlockIterator interface { // a block iterator iterates through a saved index to the latest block. // after iterating through all the blocks in the range, the iterator can be discarded. // a new block iterator can be created to iterate through the next range. 
+// if there is no block to iterate, hasNext is false +// any error returned are exception type IteratorCreator interface { - Create() (fromSavedIndexToLatest BlockIterator, exception error) + Create() (fromSavedIndexToLatest BlockIterator, hasNext bool, exception error) } diff --git a/module/block_iterator/creator.go b/module/block_iterator/creator.go index 260735e9182..09278373cf7 100644 --- a/module/block_iterator/creator.go +++ b/module/block_iterator/creator.go @@ -42,18 +42,23 @@ func NewCreator( }, nil } -func (c *Creator) Create() (module.BlockIterator, error) { +func (c *Creator) Create() (iter module.BlockIterator, hasNext bool, exception error) { // create a iteration range from the first un-iterated to the latest block - iterRange, err := c.progress.NextRange() + iterRange, hasNext, err := c.progress.NextRange() if err != nil { - return nil, fmt.Errorf("failed to create range for block iteration: %w", err) + return nil, false, fmt.Errorf("failed to create range for block iteration: %w", err) + } + + if !hasNext { + // no block to iterate + return nil, false, nil } // create a block iterator with // the function to get block ID by index, // the progress writer to update the progress in storage, // and the iteration range - return NewIndexedBlockIterator(c.getBlockIDByIndex, c.progress, iterRange), nil + return NewIndexedBlockIterator(c.getBlockIDByIndex, c.progress, iterRange), true, nil } // NewHeightBasedCreator creates a block iterator that iterates through blocks diff --git a/module/block_iterator/creator_test.go b/module/block_iterator/creator_test.go index dc03b09e08a..61aa58a17a5 100644 --- a/module/block_iterator/creator_test.go +++ b/module/block_iterator/creator_test.go @@ -16,6 +16,7 @@ import ( // TestCanResume: stop at a height, and take checkpoint, create a new iterator, // verify it will resume from the next height to the latest // TestCanSkipViewsIfNotIndexed: iterate through all views, and skip views that are not indexed +// 
TestCanSkipIfThereIsNoBlockToIterate: skip iteration if there is no block to iterate func TestCanIterate(t *testing.T) { root := &flow.Header{Height: 0} @@ -55,8 +56,9 @@ func TestCanIterate(t *testing.T) { ) require.NoError(t, err) - iterator, err := creator.Create() + iterator, hasNext, err := creator.Create() require.NoError(t, err) + require.True(t, hasNext) // Iterate through blocks visitedBlocks := make([]flow.Identifier, 0, len(blocks)) @@ -99,8 +101,9 @@ func TestCanIterate(t *testing.T) { blocks = append(blocks, additionalBlocks...) // Create another iterator - iterator, err = creator.Create() + iterator, hasNext, err = creator.Create() require.NoError(t, err) + require.True(t, hasNext) // Iterate through initial blocks for i := 0; i < len(additionalBlocks); i++ { @@ -171,8 +174,9 @@ func TestCanResume(t *testing.T) { ) require.NoError(t, err) - iterator, err := creator.Create() + iterator, hasNext, err := creator.Create() require.NoError(t, err) + require.True(t, hasNext) // Iterate through blocks visitedBlocks := make([]flow.Identifier, 0, len(blocks)) @@ -209,8 +213,9 @@ func TestCanResume(t *testing.T) { ) require.NoError(t, err) - newIterator, err := newCreator.Create() + newIterator, hasNext, err := newCreator.Create() require.NoError(t, err) + require.True(t, hasNext) // iterate until the end for { @@ -271,8 +276,9 @@ func TestCanSkipViewsIfNotIndexed(t *testing.T) { ) require.NoError(t, err) - iterator, err := creator.Create() + iterator, hasNext, err := creator.Create() require.NoError(t, err) + require.True(t, hasNext) // Iterate through blocks visitedBlocks := make(map[flow.Identifier]struct{}) @@ -304,6 +310,42 @@ func TestCanSkipViewsIfNotIndexed(t *testing.T) { require.Equal(t, uint64(8), savedView, "Expected next view to be 8 (last View + 1)") } +func TestCanSkipIfThereIsNoBlockToIterate(t *testing.T) { + // Set up root block + root := &flow.Header{Height: 10} + + // Mock getBlockIDByHeight function + getBlockIDByHeight := func(height
uint64) (flow.Identifier, error) { + return flow.Identifier{}, fmt.Errorf("block not found at height %d", height) + } + + // Mock progress tracker + progress := &mockProgress{} + + // Mock latest function that returns the same height as root + latest := func() (*flow.Header, error) { + return root, nil + } + + // Create iterator + creator, err := NewHeightBasedCreator( + getBlockIDByHeight, + progress, + root, + latest, + ) + require.NoError(t, err) + + // Create the iterator + _, hasNext, err := creator.Create() + require.NoError(t, err) + require.False(t, hasNext, "Expected no blocks to iterate") + + savedHeight, err := progress.ProcessedIndex() + require.NoError(t, err) + require.Equal(t, root.Height+1, savedHeight, "Expected saved height to be root height + 1") +} + type mockProgress struct { index uint64 initialized bool diff --git a/module/block_iterator/state.go b/module/block_iterator/state.go index 2c8c2605371..457169e4fc3 100644 --- a/module/block_iterator/state.go +++ b/module/block_iterator/state.go @@ -33,6 +33,7 @@ func NewPersistentIteratorState(store storage.ConsumerProgress, root uint64, lat } func (n *PersistentIteratorState) LoadState() (uint64, error) { + // TODO: adding cache return n.store.ProcessedIndex() } @@ -41,24 +42,31 @@ func (n *PersistentIteratorState) SaveState(next uint64) error { } // NextRange returns the next range of blocks to iterate over -func (n *PersistentIteratorState) NextRange() (module.IteratorRange, error) { +// the range is inclusive, and the end is the latest block +// if there is no block to iterate, hasNext is false +func (n *PersistentIteratorState) NextRange() (rg module.IteratorRange, hasNext bool, exception error) { next, err := n.LoadState() if err != nil { - return module.IteratorRange{}, fmt.Errorf("failed to read next height: %w", err) + return module.IteratorRange{}, false, fmt.Errorf("failed to read next height: %w", err) } latest, err := n.latest() if err != nil { - return module.IteratorRange{}, 
fmt.Errorf("failed to get latest block: %w", err) + return module.IteratorRange{}, false, fmt.Errorf("failed to get latest block: %w", err) + } + + // if next is one past the latest, then there is no block to iterate + if latest+1 == next { + return module.IteratorRange{}, false, nil } if latest < next { - return module.IteratorRange{}, fmt.Errorf("latest block is less than next block: %d < %d", latest, next) + return module.IteratorRange{}, false, fmt.Errorf("latest block is less than next block: %d < %d", latest, next) } // iterate from next to latest (inclusive) return module.IteratorRange{ Start: next, End: latest, - }, nil + }, true, nil } diff --git a/module/block_iterator/state_test.go b/module/block_iterator/state_test.go index 05ca6e72b92..f60ad325e61 100644 --- a/module/block_iterator/state_test.go +++ b/module/block_iterator/state_test.go @@ -24,13 +24,14 @@ func TestProgress(t *testing.T) { progress, err := NewPersistentIteratorState(store, root, getLatest) require.NoError(t, err) - // initial state should be the next of root + // 1. verify initial state should be the next of root next, err := progress.LoadState() require.NoError(t, err) require.Equal(t, root+1, next) - rg, err := progress.NextRange() + rg, hasNext, err := progress.NextRange() require.NoError(t, err) + require.True(t, hasNext) require.Equal(t, root+1, rg.Start) require.Equal(t, latest, rg.End) @@ -38,18 +39,32 @@ func TestProgress(t *testing.T) { err = progress.SaveState(latest + 1) require.NoError(t, err) + // 2. verify the saved state next, err = progress.LoadState() require.NoError(t, err) require.Equal(t, latest+1, next) - // update latest + // 3.
verify when latest is updated to a higher height + // the end height of the next range should be updated oldLatest := latest latest = latest + 20 - rg, err = progress.NextRange() + rg, hasNext, err = progress.NextRange() require.NoError(t, err) + require.True(t, hasNext) // verify the new range require.Equal(t, oldLatest+1, rg.Start) require.Equal(t, latest, rg.End) + + // 4. verify when state is up to date, and latest + // does not change, the next range should include no block + err = progress.SaveState(latest + 1) + require.NoError(t, err) + + // verify that NextRange will return hasNext as false (and no error), indicating that + // there is no block to iterate + rg, hasNext, err = progress.NextRange() + require.NoError(t, err) + require.False(t, hasNext) }) }