From 48ecf6a3685dfcb7702c047f355744b4f5ab0cdd Mon Sep 17 00:00:00 2001
From: Ryan
Date: Wed, 3 Jan 2024 10:36:04 -0700
Subject: [PATCH] feat: findPruneableHeaders unit tests

---
 header/headertest/testing.go |  24 ++++++--
 pruner/finder.go             |  44 +++++++++++++-
 pruner/service.go            |   5 ++
 pruner/service_test.go       | 115 +++++++++++++++++++++++++++++++++++
 4 files changed, 183 insertions(+), 5 deletions(-)

diff --git a/header/headertest/testing.go b/header/headertest/testing.go
index 9907fd7eb4..05f325bcbb 100644
--- a/header/headertest/testing.go
+++ b/header/headertest/testing.go
@@ -42,6 +42,14 @@ func NewStore(t *testing.T) libhead.Store[*header.ExtendedHeader] {
 	return headertest.NewStore[*header.ExtendedHeader](t, NewTestSuite(t, 3), 10)
 }
 
+func NewCustomStore(
+	t *testing.T,
+	generator headertest.Generator[*header.ExtendedHeader],
+	numHeaders int,
+) libhead.Store[*header.ExtendedHeader] {
+	return headertest.NewStore[*header.ExtendedHeader](t, generator, numHeaders)
+}
+
 // NewTestSuite sets up a new test suite with a given number of validators.
 func NewTestSuite(t *testing.T, num int) *TestSuite {
 	valSet, vals := RandValidatorSet(num, 10)
@@ -77,8 +85,10 @@ func (s *TestSuite) genesis() *header.ExtendedHeader {
 	return eh
 }
 
-func MakeCommit(blockID types.BlockID, height int64, round int32,
-	voteSet *types.VoteSet, validators []types.PrivValidator, now time.Time) (*types.Commit, error) {
+func MakeCommit(
+	blockID types.BlockID, height int64, round int32,
+	voteSet *types.VoteSet, validators []types.PrivValidator, now time.Time,
+) (*types.Commit, error) {
 
 	// all sign
 	for i := 0; i < len(validators); i++ {
@@ -152,7 +162,8 @@ func (s *TestSuite) NextHeader() *header.ExtendedHeader {
 }
 
 func (s *TestSuite) GenRawHeader(
-	height uint64, lastHeader, lastCommit, dataHash libhead.Hash) *header.RawHeader {
+	height uint64, lastHeader, lastCommit, dataHash libhead.Hash,
+) *header.RawHeader {
 	rh := RandRawHeader(s.t)
 	rh.Height = int64(height)
 	rh.Time = time.Now()
@@ -204,6 +215,11 @@ func (s *TestSuite) nextProposer() *types.Validator {
 
 // RandExtendedHeader provides an ExtendedHeader fixture.
 func RandExtendedHeader(t testing.TB) *header.ExtendedHeader {
+	timestamp := time.Now()
+	return RandExtendedHeaderAtTimestamp(t, timestamp)
+}
+
+func RandExtendedHeaderAtTimestamp(t testing.TB, timestamp time.Time) *header.ExtendedHeader {
 	dah := share.EmptyRoot()
 
 	rh := RandRawHeader(t)
@@ -214,7 +230,7 @@ func RandExtendedHeader(t testing.TB) *header.ExtendedHeader {
 	voteSet := types.NewVoteSet(rh.ChainID, rh.Height, 0, tmproto.PrecommitType, valSet)
 	blockID := RandBlockID(t)
 	blockID.Hash = rh.Hash()
-	commit, err := MakeCommit(blockID, rh.Height, 0, voteSet, vals, time.Now())
+	commit, err := MakeCommit(blockID, rh.Height, 0, voteSet, vals, timestamp)
 	require.NoError(t, err)
 
 	return &header.ExtendedHeader{
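A note on the new fixtures above: RandExtendedHeaderAtTimestamp only threads the given timestamp into the commit votes via MakeCommit; it does not touch the header's own RawHeader.Time, which stays whatever RandRawHeader generated. A caller that needs the header time pinned as well must set it explicitly, as SpacedHeaderGenerator.NextHeader does later in this patch. A hypothetical usage sketch:

	ts := time.Now().Add(-time.Hour)
	eh := headertest.RandExtendedHeaderAtTimestamp(t, ts)
	eh.RawHeader.Time = ts // pin the header's own time; the helper only stamps the commit votes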
diff --git a/pruner/finder.go b/pruner/finder.go
index c26dce4826..60cba4d9f5 100644
--- a/pruner/finder.go
+++ b/pruner/finder.go
@@ -2,6 +2,7 @@ package pruner
 
 import (
 	"context"
+	"fmt"
 	"sync/atomic"
 	"time"
 
@@ -30,15 +31,27 @@ func newCheckpoint(ds datastore.Datastore) *checkpoint {
 // (outside the sampling window).
 func (s *Service) findPruneableHeaders(ctx context.Context) ([]*header.ExtendedHeader, error) {
 	lastPruned := s.lastPruned()
+	pruneCutoff := time.Now().Add(time.Duration(-s.window))
 	estimatedCutoffHeight := lastPruned.Height() + s.numBlocksInWindow
+	head, err := s.getter.Head(ctx)
+	if err != nil {
+		return nil, err
+	}
+	if head.Height() < estimatedCutoffHeight {
+		estimatedCutoffHeight = head.Height()
+	}
+
 	headers, err := s.getter.GetRangeByHeight(ctx, lastPruned, estimatedCutoffHeight)
 	if err != nil {
 		return nil, err
 	}
 
 	// if our estimated range didn't cover enough headers, we need to fetch more
+	// TODO: This is really inefficient when lastPruned is the default value, or when the
+	// node has been offline for a long time. Instead of extending the range by one header
+	// per iteration of the loop below, we could grow it by a larger batch each iteration.
 	for {
 		lastHeader := headers[len(headers)-1]
 		if lastHeader.Time().After(pruneCutoff) {
 			break
 		}
@@ -60,12 +73,41 @@ func (s *Service) findPruneableHeaders(ctx context.Context) ([]*header.ExtendedH
 			}
 			// we can ignore the rest of the headers since they are all newer than the cutoff
-			return headers[:i-1], nil
+			return headers[:i], nil
 		}
 	}
 
 	return headers, nil
 }
 
+// initializeCheckpoint initializes the checkpoint, storing the earliest header in the chain.
+func (s *Service) initializeCheckpoint(ctx context.Context) error {
+	firstHeader, err := s.getter.GetByHeight(ctx, 1)
+	if err != nil {
+		return fmt.Errorf("failed to initialize checkpoint: %w", err)
+	}
+
+	return s.updateCheckpoint(ctx, firstHeader)
+}
+
+// loadCheckpoint loads the last checkpoint from disk, initializing it if it does not already exist.
+func (s *Service) loadCheckpoint(ctx context.Context) error {
+	bin, err := s.checkpoint.ds.Get(ctx, lastPrunedHeaderKey)
+	if err != nil {
+		if err == datastore.ErrNotFound {
+			return s.initializeCheckpoint(ctx)
+		}
+		return fmt.Errorf("failed to load checkpoint: %w", err)
+	}
+
+	var lastPruned header.ExtendedHeader
+	if err := lastPruned.UnmarshalJSON(bin); err != nil {
+		return fmt.Errorf("failed to load checkpoint: %w", err)
+	}
+
+	s.checkpoint.lastPrunedHeader.Store(&lastPruned)
+	return nil
+}
+
 // updateCheckpoint updates the checkpoint with the last pruned header height
 // and persists it to disk.
 func (s *Service) updateCheckpoint(ctx context.Context, lastPruned *header.ExtendedHeader) error {
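The TODO above hints at a cheaper strategy: grow the requested range on every miss instead of walking forward one header at a time. A rough sketch of that direction, not part of this change; the starting batch size and the doubling factor are arbitrary choices, and "head" is the chain head already fetched at the top of findPruneableHeaders:

	batch := s.numBlocksInWindow // assumed starting batch; any heuristic works
	for {
		lastHeader := headers[len(headers)-1]
		if lastHeader.Time().After(pruneCutoff) {
			break
		}
		if lastHeader.Height() >= head.Height() {
			break // reached the chain head; nothing newer to fetch
		}
		to := lastHeader.Height() + batch
		if to > head.Height() {
			to = head.Height()
		}
		more, err := s.getter.GetRangeByHeight(ctx, lastHeader, to)
		if err != nil {
			return nil, err
		}
		headers = append(headers, more...)
		batch *= 2 // widen the request after every miss
	}

Doubling keeps the number of round trips logarithmic in the distance between the checkpoint and the cutoff, which matters exactly in the cases the TODO names (fresh checkpoint, long offline period).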
diff --git a/pruner/service.go b/pruner/service.go
index 5b5a52b59a..08ea26167f 100644
--- a/pruner/service.go
+++ b/pruner/service.go
@@ -60,6 +60,11 @@ func NewService(
 func (s *Service) Start(context.Context) error {
 	s.ctx, s.cancel = context.WithCancel(context.Background())
 
+	err := s.loadCheckpoint(s.ctx)
+	if err != nil {
+		return err
+	}
+
 	go s.prune()
 	return nil
 }
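Start now blocks on loadCheckpoint before launching the prune loop: on a fresh datastore the Get on lastPrunedHeaderKey misses, so the checkpoint is seeded from the header at height 1, while on restart the previously persisted header is unmarshaled from disk. Roughly, assuming the test fixtures below (mp, availWindow, store, blockTime, ctx), the observable behavior might look like:

	ds := sync.MutexWrap(datastore.NewMapDatastore())
	serv := NewService(mp, availWindow, store, ds, blockTime)
	require.NoError(t, serv.Start(ctx))
	// fresh datastore: the checkpoint was seeded from the chain's first header
	require.EqualValues(t, 1, serv.lastPruned().Height())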
diff --git a/pruner/service_test.go b/pruner/service_test.go
index 9e48fe959e..34bf32863a 100644
--- a/pruner/service_test.go
+++ b/pruner/service_test.go
@@ -60,6 +60,94 @@ func TestService(t *testing.T) {
 	t.Log(len(mp.deletedHeaderHashes)) // TODO @renaynay: expect something here
 }
 
+func TestFindPruneableHeaders(t *testing.T) {
+	testCases := []struct {
+		name           string
+		availWindow    AvailabilityWindow
+		blockTime      time.Duration
+		startTime      time.Time
+		headerAmount   int
+		expectedLength int
+	}{
+		{
+			name: "Estimated range matches expected",
+			// Availability window is one week
+			availWindow: AvailabilityWindow(time.Hour * 24 * 7),
+			blockTime:   time.Hour,
+			// Make two weeks of headers
+			headerAmount: 2 * (24 * 7),
+			startTime:    time.Now().Add(-2 * time.Hour * 24 * 7),
+			// One week of headers are pruneable
+			expectedLength: 24 * 7,
+		},
+		{
+			name: "Estimated range not sufficient but finds the correct tail",
+			// Availability window is one week
+			availWindow: AvailabilityWindow(time.Hour * 24 * 7),
+			blockTime:   time.Hour,
+			// Make three weeks of headers
+			headerAmount: 3 * (24 * 7),
+			startTime:    time.Now().Add(-3 * time.Hour * 24 * 7),
+			// Two weeks of headers are pruneable
+			expectedLength: 2 * 24 * 7,
+		},
+		{
+			name: "No pruneable headers",
+			// Availability window is two weeks
+			availWindow: AvailabilityWindow(2 * time.Hour * 24 * 7),
+			blockTime:   time.Hour,
+			// Make one week of headers
+			headerAmount: 24 * 7,
+			startTime:    time.Now().Add(-time.Hour * 24 * 7),
+			// No headers are pruneable
+			expectedLength: 0,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			ctx, cancel := context.WithCancel(context.Background())
+			t.Cleanup(cancel)
+
+			headerGenerator := NewSpacedHeaderGenerator(t, tc.startTime, tc.blockTime)
+			store := headertest.NewCustomStore(t, headerGenerator, tc.headerAmount)
+
+			mp := &mockPruner{}
+
+			serv := NewService(
+				mp,
+				tc.availWindow,
+				store,
+				sync.MutexWrap(datastore.NewMapDatastore()),
+				tc.blockTime,
+			)
+
+			err := serv.Start(ctx)
+			require.NoError(t, err)
+
+			pruneable, err := serv.findPruneableHeaders(ctx)
+			require.NoError(t, err)
+			require.Len(t, pruneable, tc.expectedLength)
+
+			pruneableCutoff := time.Now().Add(-time.Duration(tc.availWindow))
+			// All returned headers are older than the availability window
+			for _, h := range pruneable {
+				require.WithinRange(t, h.Time(), tc.startTime, pruneableCutoff)
+			}
+
+			// The next header after the last pruneable header is too new to prune
+			if len(pruneable) != 0 {
+				lastPruneable := pruneable[len(pruneable)-1]
+				if lastPruneable.Height() != store.Height() {
+					firstUnpruneable, err := store.GetByHeight(ctx, lastPruneable.Height()+1)
+					require.NoError(t, err)
+					require.WithinRange(t, firstUnpruneable.Time(), pruneableCutoff, time.Now())
+				}
+			}
+		})
+	}
+}
+
 type mockPruner struct {
 	deletedHeaderHashes []hdr.Hash
 }
@@ -70,3 +158,30 @@ func (mp *mockPruner) Prune(_ context.Context, headers ...*header.ExtendedHeader
 	}
 	return nil
 }
+
+type SpacedHeaderGenerator struct {
+	t                  *testing.T
+	TimeBetweenHeaders time.Duration
+	currentTime        time.Time
+	currentHeight      int64
+}
+
+func NewSpacedHeaderGenerator(
+	t *testing.T, startTime time.Time, timeBetweenHeaders time.Duration,
+) *SpacedHeaderGenerator {
+	return &SpacedHeaderGenerator{
+		t:                  t,
+		TimeBetweenHeaders: timeBetweenHeaders,
+		currentTime:        startTime,
+		currentHeight:      1,
+	}
+}
+
+func (shg *SpacedHeaderGenerator) NextHeader() *header.ExtendedHeader {
+	h := headertest.RandExtendedHeaderAtTimestamp(shg.t, shg.currentTime)
+	h.RawHeader.Height = shg.currentHeight
+	h.RawHeader.Time = shg.currentTime
+	shg.currentHeight++
+	shg.currentTime = shg.currentTime.Add(shg.TimeBetweenHeaders)
+	return h
+}
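Because SpacedHeaderGenerator emits headers at exact blockTime intervals starting from startTime, the expected pruneable counts in the table-driven test above follow directly from the availability-window arithmetic. The generator also composes with NewCustomStore outside this test; a hypothetical sketch:

	// one day of hourly headers, heights 1 through 24
	gen := NewSpacedHeaderGenerator(t, time.Now().Add(-24*time.Hour), time.Hour)
	store := headertest.NewCustomStore(t, gen, 24)

Note that NextHeader both overwrites RawHeader.Time and passes the same timestamp into RandExtendedHeaderAtTimestamp, so each generated header's own time and its commit vote times stay consistent.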