feat: findPruneableHeaders unit tests
distractedm1nd authored and renaynay committed Jan 4, 2024
1 parent 3a368b2 commit 48ecf6a
Showing 4 changed files with 183 additions and 5 deletions.
24 changes: 20 additions & 4 deletions header/headertest/testing.go
@@ -42,6 +42,14 @@ func NewStore(t *testing.T) libhead.Store[*header.ExtendedHeader] {
return headertest.NewStore[*header.ExtendedHeader](t, NewTestSuite(t, 3), 10)
}

func NewCustomStore(
t *testing.T,
generator headertest.Generator[*header.ExtendedHeader],
numHeaders int,
) libhead.Store[*header.ExtendedHeader] {
return headertest.NewStore[*header.ExtendedHeader](t, generator, numHeaders)
}

// NewTestSuite sets up a new test suite with a given number of validators.
func NewTestSuite(t *testing.T, num int) *TestSuite {
valSet, vals := RandValidatorSet(num, 10)
@@ -77,8 +85,10 @@ func (s *TestSuite) genesis() *header.ExtendedHeader {
return eh
}

func MakeCommit(blockID types.BlockID, height int64, round int32,
voteSet *types.VoteSet, validators []types.PrivValidator, now time.Time) (*types.Commit, error) {
func MakeCommit(
blockID types.BlockID, height int64, round int32,
voteSet *types.VoteSet, validators []types.PrivValidator, now time.Time,
) (*types.Commit, error) {

// all sign
for i := 0; i < len(validators); i++ {
@@ -152,7 +162,8 @@ func (s *TestSuite) NextHeader() *header.ExtendedHeader {
}

func (s *TestSuite) GenRawHeader(
height uint64, lastHeader, lastCommit, dataHash libhead.Hash) *header.RawHeader {
height uint64, lastHeader, lastCommit, dataHash libhead.Hash,
) *header.RawHeader {
rh := RandRawHeader(s.t)
rh.Height = int64(height)
rh.Time = time.Now()
@@ -204,6 +215,11 @@ func (s *TestSuite) nextProposer() *types.Validator {

// RandExtendedHeader provides an ExtendedHeader fixture.
func RandExtendedHeader(t testing.TB) *header.ExtendedHeader {
timestamp := time.Now()
return RandExtendedHeaderAtTimestamp(t, timestamp)
}

func RandExtendedHeaderAtTimestamp(t testing.TB, timestamp time.Time) *header.ExtendedHeader {
dah := share.EmptyRoot()

rh := RandRawHeader(t)
@@ -214,7 +230,7 @@ func RandExtendedHeader(t testing.TB) *header.ExtendedHeader {
voteSet := types.NewVoteSet(rh.ChainID, rh.Height, 0, tmproto.PrecommitType, valSet)
blockID := RandBlockID(t)
blockID.Hash = rh.Hash()
commit, err := MakeCommit(blockID, rh.Height, 0, voteSet, vals, time.Now())
commit, err := MakeCommit(blockID, rh.Height, 0, voteSet, vals, timestamp)
require.NoError(t, err)

return &header.ExtendedHeader{
44 changes: 43 additions & 1 deletion pruner/finder.go
@@ -2,6 +2,7 @@ package pruner

import (
"context"
"fmt"
"sync/atomic"
"time"

@@ -30,15 +31,27 @@ func newCheckpoint(ds datastore.Datastore) *checkpoint {
// (outside the sampling window).
func (s *Service) findPruneableHeaders(ctx context.Context) ([]*header.ExtendedHeader, error) {
lastPruned := s.lastPruned()

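// pruneCutoff is the moment before which a header falls outside the
// availability window. estimatedCutoffHeight is a first guess at the height
// of the last pruneable header, assuming numBlocksInWindow blocks fit inside
// the window; it is clamped to the chain head and refined against pruneCutoff below.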
pruneCutoff := time.Now().Add(time.Duration(-s.window))
estimatedCutoffHeight := lastPruned.Height() + s.numBlocksInWindow

head, err := s.getter.Head(ctx)
if err != nil {
return nil, err
}
if head.Height() < estimatedCutoffHeight {
estimatedCutoffHeight = head.Height()
}

headers, err := s.getter.GetRangeByHeight(ctx, lastPruned, estimatedCutoffHeight)
if err != nil {
return nil, err
}

// if our estimated range didn't cover enough headers, we need to fetch more
// TODO: This is really inefficient in the case that lastPruned is the default value, or if the
// node has been offline for a long time. Instead of increasing the boundary by one header per
// iteration of the loop below, we could increase it by a whole range each time.
for {
lastHeader := headers[len(headers)-1]
if lastHeader.Time().After(pruneCutoff) {
@@ -60,12 +73,41 @@ func (s *Service) findPruneableHeaders(ctx context.Context) ([]*header.ExtendedH
}

// we can ignore the rest of the headers since they are all newer than the cutoff
return headers[:i-1], nil
return headers[:i], nil
}
}
return headers, nil
}

// initializeCheckpoint initializes the checkpoint, storing the earliest header in the chain.
func (s *Service) initializeCheckpoint(ctx context.Context) error {
firstHeader, err := s.getter.GetByHeight(ctx, 1)
if err != nil {
return fmt.Errorf("failed to initialize checkpoint: %w", err)
}

return s.updateCheckpoint(ctx, firstHeader)
}

// loadCheckpoint loads the last checkpoint from disk, initializing it if it does not already exist.
func (s *Service) loadCheckpoint(ctx context.Context) error {
bin, err := s.checkpoint.ds.Get(ctx, lastPrunedHeaderKey)
if err != nil {
if err == datastore.ErrNotFound {
return s.initializeCheckpoint(ctx)
}
return fmt.Errorf("failed to load checkpoint: %w", err)
}

var lastPruned header.ExtendedHeader
if err := lastPruned.UnmarshalJSON(bin); err != nil {
return fmt.Errorf("failed to load checkpoint: %w", err)
}

s.checkpoint.lastPrunedHeader.Store(&lastPruned)
return nil
}

// updateCheckpoint updates the checkpoint with the last pruned header height
// and persists it to disk.
func (s *Service) updateCheckpoint(ctx context.Context, lastPruned *header.ExtendedHeader) error {
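
numBlocksInWindow is referenced by findPruneableHeaders above but never defined in this diff; presumably it approximates how many blocks the availability window spans. A minimal sketch of that relationship, using hypothetical names that are not part of this repository's API:

package pruner

import "time"

// estimateBlocksInWindow is a hypothetical helper (not part of this commit):
// it approximates how many blocks fit inside the availability window given an
// average block time, which is the quantity estimatedCutoffHeight advances by.
func estimateBlocksInWindow(window, blockTime time.Duration) uint64 {
	if blockTime <= 0 {
		return 0
	}
	return uint64(window / blockTime)
}

With an hour-long block time and a one-week window, as in the tests below, this yields 24 * 7 = 168 blocks.
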
5 changes: 5 additions & 0 deletions pruner/service.go
@@ -60,6 +60,11 @@ func NewService(
func (s *Service) Start(context.Context) error {
s.ctx, s.cancel = context.WithCancel(context.Background())

err := s.loadCheckpoint(s.ctx)
if err != nil {
return err
}

go s.prune()
return nil
}
115 changes: 115 additions & 0 deletions pruner/service_test.go
@@ -60,6 +60,94 @@ func TestService(t *testing.T) {
t.Log(len(mp.deletedHeaderHashes)) // TODO @renaynay: expect something here
}

func TestFindPruneableHeaders(t *testing.T) {
testCases := []struct {
name string
availWindow AvailabilityWindow
blockTime time.Duration
startTime time.Time
headerAmount int
expectedLength int
}{
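// For example, in the first case below, headers are generated one hour apart
// starting two weeks ago, so 2*(24*7) = 336 headers span two weeks; with a
// one-week availability window the older 24*7 = 168 of them fall before the
// cutoff and are expected to be pruneable.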
{
name: "Estimated range matches expected",
// Availability window is one week
availWindow: AvailabilityWindow(time.Hour * 24 * 7),
blockTime: time.Hour,
// Make two weeks of headers
headerAmount: 2 * (24 * 7),
startTime: time.Now().Add(-2 * time.Hour * 24 * 7),
// One week of headers are pruneable
expectedLength: 24 * 7,
},
{
name: "Estimated range not sufficient but finds the correct tail",
// Availability window is one week
availWindow: AvailabilityWindow(time.Hour * 24 * 7),
blockTime: time.Hour,
// Make three weeks of headers
headerAmount: 3 * (24 * 7),
startTime: time.Now().Add(-3 * time.Hour * 24 * 7),
// Two weeks of headers are pruneable
expectedLength: 2 * 24 * 7,
},
{
name: "No pruneable headers",
// Availability window is two weeks
availWindow: AvailabilityWindow(2 * time.Hour * 24 * 7),
blockTime: time.Hour,
// Make one week of headers
headerAmount: 24 * 7,
startTime: time.Now().Add(-time.Hour * 24 * 7),
// No headers are pruneable
expectedLength: 0,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

headerGenerator := NewSpacedHeaderGenerator(t, tc.startTime, tc.blockTime)
store := headertest.NewCustomStore(t, headerGenerator, tc.headerAmount)

mp := &mockPruner{}

serv := NewService(
mp,
tc.availWindow,
store,
sync.MutexWrap(datastore.NewMapDatastore()),
tc.blockTime,
)

err := serv.Start(ctx)
require.NoError(t, err)

pruneable, err := serv.findPruneableHeaders(ctx)
require.NoError(t, err)
require.Len(t, pruneable, tc.expectedLength)

pruneableCutoff := time.Now().Add(-time.Duration(tc.availWindow))
// All returned headers are older than the availability window
for _, h := range pruneable {
require.WithinRange(t, h.Time(), tc.startTime, pruneableCutoff)
}

// The next header after the last pruneable header is too new to prune
if len(pruneable) != 0 {
lastPruneable := pruneable[len(pruneable)-1]
if lastPruneable.Height() != store.Height() {
firstUnpruneable, err := store.GetByHeight(ctx, lastPruneable.Height()+1)
require.NoError(t, err)
require.WithinRange(t, firstUnpruneable.Time(), pruneableCutoff, time.Now())
}
}
})
}
}

type mockPruner struct {
deletedHeaderHashes []hdr.Hash
}
@@ -70,3 +158,30 @@ func (mp *mockPruner) Prune(_ context.Context, headers ...*header.ExtendedHeader
}
return nil
}

type SpacedHeaderGenerator struct {
t *testing.T
TimeBetweenHeaders time.Duration
currentTime time.Time
currentHeight int64
}

func NewSpacedHeaderGenerator(
t *testing.T, startTime time.Time, timeBetweenHeaders time.Duration,
) *SpacedHeaderGenerator {
return &SpacedHeaderGenerator{
t: t,
TimeBetweenHeaders: timeBetweenHeaders,
currentTime: startTime,
currentHeight: 1,
}
}

func (shg *SpacedHeaderGenerator) NextHeader() *header.ExtendedHeader {
h := headertest.RandExtendedHeaderAtTimestamp(shg.t, shg.currentTime)
h.RawHeader.Height = shg.currentHeight
h.RawHeader.Time = shg.currentTime
shg.currentHeight++
shg.currentTime = shg.currentTime.Add(shg.TimeBetweenHeaders)
return h
}
