From 3a368b22243d0e73dc7c462227d30ba9a8c78bab Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Tue, 5 Dec 2023 18:57:14 +0100 Subject: [PATCH 1/3] feat(nodebuilder/prune): Enable sampling window for light nodes prog prog prog prog prog prog feat: initial untested findPruneableHeaders impl change provides in nodebuilder provide properly to fx --- das/daser.go | 2 + das/daser_test.go | 37 ++++++++++++ go.mod | 2 + go.sum | 2 - libs/utils/resetctx.go | 4 +- nodebuilder/prune/constructors.go | 23 ++++++++ nodebuilder/prune/module.go | 10 +++- pruner/finder.go | 84 ++++++++++++++++++++++++++ pruner/light/window.go | 2 + pruner/params.go | 35 +++++++++++ pruner/service.go | 98 +++++++++++++++++++++++++++++-- pruner/service_test.go | 72 +++++++++++++++++++++++ 12 files changed, 362 insertions(+), 9 deletions(-) create mode 100644 nodebuilder/prune/constructors.go create mode 100644 pruner/finder.go create mode 100644 pruner/params.go create mode 100644 pruner/service_test.go diff --git a/das/daser.go b/das/daser.go index 40eee3d316..7d569f7e0b 100644 --- a/das/daser.go +++ b/das/daser.go @@ -151,6 +151,8 @@ func (d *DASer) sample(ctx context.Context, h *header.ExtendedHeader) error { // short-circuit if pruning is enabled and the header is outside the // availability window if !d.isWithinSamplingWindow(h) { + log.Debugw("skipping header outside sampling window", "height", h.Height(), + "time", h.Time()) return nil } diff --git a/das/daser_test.go b/das/daser_test.go index fd1eb39f7d..9eec6392cc 100644 --- a/das/daser_test.go +++ b/das/daser_test.go @@ -2,6 +2,7 @@ package das import ( "context" + "strconv" "testing" "time" @@ -244,6 +245,42 @@ func TestDASerSampleTimeout(t *testing.T) { } } +// TestDASer_SamplingWindow tests the sampling window determination +// for headers. 
+func TestDASer_SamplingWindow(t *testing.T) { + ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) + sub := new(headertest.Subscriber) + fserv := &fraudtest.DummyService[*header.ExtendedHeader]{} + getter := getterStub{} + avail := mocks.NewMockAvailability(gomock.NewController(t)) + + // create and start DASer + daser, err := NewDASer(avail, sub, getter, ds, fserv, newBroadcastMock(1), + WithSamplingWindow(time.Second)) + require.NoError(t, err) + + var tests = []struct { + timestamp time.Time + withinWindow bool + }{ + {timestamp: time.Now().Add(-(time.Second * 5)), withinWindow: false}, + {timestamp: time.Now().Add(-(time.Millisecond * 800)), withinWindow: true}, + {timestamp: time.Now().Add(-(time.Hour)), withinWindow: false}, + {timestamp: time.Now().Add(-(time.Hour * 24 * 30)), withinWindow: false}, + {timestamp: time.Now(), withinWindow: true}, + } + + for i, tt := range tests { + t.Run(strconv.Itoa(i), func(t *testing.T) { + eh := headertest.RandExtendedHeader(t) + eh.RawHeader.Time = tt.timestamp + + assert.Equal(t, tt.withinWindow, daser.isWithinSamplingWindow(eh)) + }) + } + +} + // createDASerSubcomponents takes numGetter (number of headers // to store in mockGetter) and numSub (number of headers to store // in the mock header.Subscriber), returning a newly instantiated diff --git a/go.mod b/go.mod index 6609aeba05..a60cc81648 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/celestiaorg/celestia-node go 1.21.1 +replace github.com/celestiaorg/go-header => /Users/rene/go/src/github.com/renaynay/go-header + require ( cosmossdk.io/errors v1.0.0 cosmossdk.io/math v1.2.0 diff --git a/go.sum b/go.sum index a8ad8ad148..6a580543d2 100644 --- a/go.sum +++ b/go.sum @@ -372,8 +372,6 @@ github.com/celestiaorg/go-ds-badger4 v0.0.0-20230712104058-7ede1c814ac5 h1:MJgXv github.com/celestiaorg/go-ds-badger4 v0.0.0-20230712104058-7ede1c814ac5/go.mod h1:r6xB3nvGotmlTACpAr3SunxtoXeesbqb57elgMJqflY= github.com/celestiaorg/go-fraud v0.2.0 h1:aaq2JiW0gTnhEdac3l51UCqSyJ4+VjFGTTpN83V4q7I= github.com/celestiaorg/go-fraud v0.2.0/go.mod h1:lNY1i4K6kUeeE60Z2VK8WXd+qXb8KRzfBhvwPkK6aUc= -github.com/celestiaorg/go-header v0.5.1 h1:1s1lw4fcCHalNK0qw/0a3cxg3ezx3Hl020znIxPZvtk= -github.com/celestiaorg/go-header v0.5.1/go.mod h1:H8xhnDLDLbkpwmWPhCaZyTnIV3dlVxBHPnxNXS2Qu6c= github.com/celestiaorg/go-libp2p-messenger v0.2.0 h1:/0MuPDcFamQMbw9xTZ73yImqgTO3jHV7wKHvWD/Irao= github.com/celestiaorg/go-libp2p-messenger v0.2.0/go.mod h1:s9PIhMi7ApOauIsfBcQwbr7m+HBzmVfDIS+QLdgzDSo= github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4 h1:CJdIpo8n5MFP2MwK0gSRcOVlDlFdQJO1p+FqdxYzmvc= diff --git a/libs/utils/resetctx.go b/libs/utils/resetctx.go index 3014ba81db..a108cc27b4 100644 --- a/libs/utils/resetctx.go +++ b/libs/utils/resetctx.go @@ -1,6 +1,8 @@ package utils -import "context" +import ( + "context" +) // ResetContextOnError returns a fresh context if the given context has an error. 
func ResetContextOnError(ctx context.Context) context.Context { diff --git a/nodebuilder/prune/constructors.go b/nodebuilder/prune/constructors.go new file mode 100644 index 0000000000..8cc58aecd9 --- /dev/null +++ b/nodebuilder/prune/constructors.go @@ -0,0 +1,23 @@ +package prune + +import ( + "github.com/ipfs/go-datastore" + + hdr "github.com/celestiaorg/go-header" + + "github.com/celestiaorg/celestia-node/header" + "github.com/celestiaorg/celestia-node/nodebuilder/p2p" + "github.com/celestiaorg/celestia-node/pruner" +) + +func newPrunerService( + p pruner.Pruner, + window pruner.AvailabilityWindow, + getter hdr.Store[*header.ExtendedHeader], + ds datastore.Batching, + opts ...pruner.Option, +) *pruner.Service { + // TODO @renaynay: remove this once pruning implementation + opts = append(opts, pruner.WithDisabledGC()) + return pruner.NewService(p, window, getter, ds, p2p.BlockTime, opts...) +} diff --git a/nodebuilder/prune/module.go b/nodebuilder/prune/module.go index 330ef21cdc..b0723f96b1 100644 --- a/nodebuilder/prune/module.go +++ b/nodebuilder/prune/module.go @@ -2,6 +2,7 @@ package prune import ( "context" + "fmt" "go.uber.org/fx" @@ -12,9 +13,11 @@ import ( ) func ConstructModule(tp node.Type) fx.Option { + fmt.Print("\n\n\n\nconstructing pruning module\n\n\n\n") + baseComponents := fx.Options( fx.Provide(fx.Annotate( - pruner.NewService, + newPrunerService, fx.OnStart(func(ctx context.Context, p *pruner.Service) error { return p.Start(ctx) }), @@ -22,6 +25,9 @@ func ConstructModule(tp node.Type) fx.Option { return p.Stop(ctx) }), )), + // This is necessary to invoke the pruner service as independent thanks to a + // quirk in FX. + fx.Invoke(func(p *pruner.Service) {}), ) switch tp { @@ -39,7 +45,7 @@ func ConstructModule(tp node.Type) fx.Option { fx.Provide(func() pruner.Pruner { return light.NewPruner() }), - fx.Supply(archival.Window), // TODO @renaynay: turn this into light.Window in following PR + fx.Supply(light.Window), ) default: panic("unknown node type") diff --git a/pruner/finder.go b/pruner/finder.go new file mode 100644 index 0000000000..c26dce4826 --- /dev/null +++ b/pruner/finder.go @@ -0,0 +1,84 @@ +package pruner + +import ( + "context" + "sync/atomic" + "time" + + "github.com/ipfs/go-datastore" + + "github.com/celestiaorg/celestia-node/header" +) + +var ( + lastPrunedHeaderKey = datastore.NewKey("last_pruned_header") +) + +type checkpoint struct { + ds datastore.Datastore + + lastPrunedHeader atomic.Pointer[header.ExtendedHeader] + + // TODO @renaynay: keep track of failed roots to retry in separate job +} + +func newCheckpoint(ds datastore.Datastore) *checkpoint { + return &checkpoint{ds: ds} +} + +// findPruneableHeaders returns all headers that are eligible for pruning +// (outside the sampling window). 
+func (s *Service) findPruneableHeaders(ctx context.Context) ([]*header.ExtendedHeader, error) { + lastPruned := s.lastPruned() + pruneCutoff := time.Now().Add(time.Duration(-s.window)) + estimatedCutoffHeight := lastPruned.Height() + s.numBlocksInWindow + + headers, err := s.getter.GetRangeByHeight(ctx, lastPruned, estimatedCutoffHeight) + if err != nil { + return nil, err + } + + // if our estimated range didn't cover enough headers, we need to fetch more + for { + lastHeader := headers[len(headers)-1] + if lastHeader.Time().After(pruneCutoff) { + break + } + + nextHeader, err := s.getter.GetByHeight(ctx, lastHeader.Height()+1) + if err != nil { + return nil, err + } + headers = append(headers, nextHeader) + } + + for i, h := range headers { + if h.Time().After(pruneCutoff) { + if i == 0 { + // we can't prune anything + return nil, nil + } + + // we can ignore the rest of the headers since they are all newer than the cutoff + return headers[:i-1], nil + } + } + return headers, nil +} + +// updateCheckpoint updates the checkpoint with the last pruned header height +// and persists it to disk. +func (s *Service) updateCheckpoint(ctx context.Context, lastPruned *header.ExtendedHeader) error { + s.checkpoint.lastPrunedHeader.Store(lastPruned) + + bin, err := lastPruned.MarshalJSON() + if err != nil { + return err + } + + return s.checkpoint.ds.Put(ctx, lastPrunedHeaderKey, bin) +} + +func (s *Service) lastPruned() *header.ExtendedHeader { + return s.checkpoint.lastPrunedHeader.Load() +} diff --git a/pruner/light/window.go b/pruner/light/window.go index 53bfe4a163..dc1a9e4444 100644 --- a/pruner/light/window.go +++ b/pruner/light/window.go @@ -6,4 +6,6 @@ import ( "github.com/celestiaorg/celestia-node/pruner" ) +// Window is the availability window for light nodes in the Celestia +// network (30 days). const Window = pruner.AvailabilityWindow(time.Second * 86400 * 30) diff --git a/pruner/params.go b/pruner/params.go new file mode 100644 index 0000000000..4a0e25fdb1 --- /dev/null +++ b/pruner/params.go @@ -0,0 +1,35 @@ +package pruner + +import ( + "time" +) + +type Option func(*Params) + +type Params struct { + // gcCycle is the frequency at which the pruning Service + // runs the ticker. If set to 0, the Service will not run. + gcCycle time.Duration +} + +func DefaultParams() Params { + return Params{ + gcCycle: time.Hour, + } +} + +// WithGCCycle configures how often the pruning Service +// triggers a pruning cycle. +func WithGCCycle(cycle time.Duration) Option { + return func(p *Params) { + p.gcCycle = cycle + } +} + +// WithDisabledGC disables the pruning Service's pruning +// routine. +func WithDisabledGC() Option { + return func(p *Params) { + p.gcCycle = time.Duration(0) + } +} diff --git a/pruner/service.go b/pruner/service.go index f67265977a..5b5a52b59a 100644 --- a/pruner/service.go +++ b/pruner/service.go @@ -2,24 +2,114 @@ package pruner import ( "context" + "fmt" + "time" + + "github.com/ipfs/go-datastore" + + hdr "github.com/celestiaorg/go-header" + + "github.com/celestiaorg/celestia-node/header" ) // Service handles the pruning routine for the node using the // prune Pruner. 
type Service struct { pruner Pruner + window AvailabilityWindow + + getter hdr.Getter[*header.ExtendedHeader] // TODO @renaynay: expects a header service with access to sync head + + checkpoint *checkpoint + numBlocksInWindow uint64 + + ctx context.Context + cancel context.CancelFunc + doneCh chan struct{} + + params Params } -func NewService(p Pruner) *Service { +func NewService( + p Pruner, + window AvailabilityWindow, + getter hdr.Getter[*header.ExtendedHeader], + ds datastore.Datastore, + blockTime time.Duration, + opts ...Option, +) *Service { + params := DefaultParams() + for _, opt := range opts { + opt(¶ms) + } + + // TODO @renaynay + numBlocksInWindow := uint64(time.Duration(window) / blockTime) + return &Service{ - pruner: p, + pruner: p, + window: window, + getter: getter, + checkpoint: newCheckpoint(ds), + numBlocksInWindow: numBlocksInWindow, + doneCh: make(chan struct{}), + params: params, } } func (s *Service) Start(context.Context) error { + s.ctx, s.cancel = context.WithCancel(context.Background()) + + go s.prune() return nil } -func (s *Service) Stop(context.Context) error { - return nil +func (s *Service) Stop(ctx context.Context) error { + s.cancel() + + select { + case <-s.doneCh: + return nil + case <-ctx.Done(): + return fmt.Errorf("pruner unable to exit within context deadline") + } +} + +func (s *Service) prune() { + if s.params.gcCycle == time.Duration(0) { + // Service is disabled, exit + close(s.doneCh) + return + } + + ticker := time.NewTicker(s.params.gcCycle) + defer ticker.Stop() + + for { + select { + case <-s.ctx.Done(): + close(s.doneCh) + return + case <-ticker.C: + headers, err := s.findPruneableHeaders(s.ctx) + if err != nil { + // TODO @renaynay: record + report errors properly + continue + } + // TODO @renaynay: make deadline a param ? / configurable? + pruneCtx, cancel := context.WithDeadline(s.ctx, time.Now().Add(time.Minute)) + err = s.pruner.Prune(pruneCtx, headers...) 
+ cancel() + if err != nil { + // TODO @renaynay: record + report errors properly + continue + } + + err = s.updateCheckpoint(s.ctx, headers[len(headers)-1]) + if err != nil { + // TODO @renaynay: record + report errors properly + continue + } + } + } } diff --git a/pruner/service_test.go b/pruner/service_test.go new file mode 100644 index 0000000000..9e48fe959e --- /dev/null +++ b/pruner/service_test.go @@ -0,0 +1,72 @@ +package pruner + +import ( + "context" + "testing" + "time" + + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/sync" + "github.com/stretchr/testify/require" + + hdr "github.com/celestiaorg/go-header" + + "github.com/celestiaorg/celestia-node/header" + "github.com/celestiaorg/celestia-node/header/headertest" +) + +/* + | toPrune | availability window | +*/ + +// TODO @renaynay: tweak/document +var ( + availWindow = AvailabilityWindow(time.Millisecond) + blockTime = time.Millisecond * 100 + gcCycle = time.Millisecond * 500 +) + +func TestService(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + store := headertest.NewStore(t) + + mp := &mockPruner{} + + serv := NewService( + mp, + availWindow, + store, + sync.MutexWrap(datastore.NewMapDatastore()), + blockTime, + WithGCCycle(gcCycle), + ) + + gen, err := store.GetByHeight(ctx, 1) + require.NoError(t, err) + + err = serv.updateCheckpoint(ctx, gen) + require.NoError(t, err) + + err = serv.Start(ctx) + require.NoError(t, err) + + time.Sleep(time.Second) + + err = serv.Stop(ctx) + require.NoError(t, err) + + t.Log(len(mp.deletedHeaderHashes)) // TODO @renaynay: expect something here +} + +type mockPruner struct { + deletedHeaderHashes []hdr.Hash +} + +func (mp *mockPruner) Prune(_ context.Context, headers ...*header.ExtendedHeader) error { + for _, h := range headers { + mp.deletedHeaderHashes = append(mp.deletedHeaderHashes, h.Hash()) + } + return nil +} From 48ecf6a3685dfcb7702c047f355744b4f5ab0cdd Mon Sep 17 00:00:00 2001 From: Ryan Date: Wed, 3 Jan 2024 10:36:04 -0700 Subject: [PATCH 2/3] feat: findPruneableHeaders unit tests --- header/headertest/testing.go | 24 ++++++-- pruner/finder.go | 44 +++++++++++++- pruner/service.go | 5 ++ pruner/service_test.go | 115 +++++++++++++++++++++++++++++++++++ 4 files changed, 183 insertions(+), 5 deletions(-) diff --git a/header/headertest/testing.go b/header/headertest/testing.go index 9907fd7eb4..05f325bcbb 100644 --- a/header/headertest/testing.go +++ b/header/headertest/testing.go @@ -42,6 +42,14 @@ func NewStore(t *testing.T) libhead.Store[*header.ExtendedHeader] { return headertest.NewStore[*header.ExtendedHeader](t, NewTestSuite(t, 3), 10) } +func NewCustomStore( + t *testing.T, + generator headertest.Generator[*header.ExtendedHeader], + numHeaders int, +) libhead.Store[*header.ExtendedHeader] { + return headertest.NewStore[*header.ExtendedHeader](t, generator, numHeaders) +} + // NewTestSuite setups a new test suite with a given number of validators. 
func NewTestSuite(t *testing.T, num int) *TestSuite { valSet, vals := RandValidatorSet(num, 10) @@ -77,8 +85,10 @@ func (s *TestSuite) genesis() *header.ExtendedHeader { return eh } -func MakeCommit(blockID types.BlockID, height int64, round int32, - voteSet *types.VoteSet, validators []types.PrivValidator, now time.Time) (*types.Commit, error) { +func MakeCommit( + blockID types.BlockID, height int64, round int32, + voteSet *types.VoteSet, validators []types.PrivValidator, now time.Time, +) (*types.Commit, error) { // all sign for i := 0; i < len(validators); i++ { @@ -152,7 +162,8 @@ func (s *TestSuite) NextHeader() *header.ExtendedHeader { } func (s *TestSuite) GenRawHeader( - height uint64, lastHeader, lastCommit, dataHash libhead.Hash) *header.RawHeader { + height uint64, lastHeader, lastCommit, dataHash libhead.Hash, +) *header.RawHeader { rh := RandRawHeader(s.t) rh.Height = int64(height) rh.Time = time.Now() @@ -204,6 +215,11 @@ func (s *TestSuite) nextProposer() *types.Validator { // RandExtendedHeader provides an ExtendedHeader fixture. func RandExtendedHeader(t testing.TB) *header.ExtendedHeader { + timestamp := time.Now() + return RandExtendedHeaderAtTimestamp(t, timestamp) +} + +func RandExtendedHeaderAtTimestamp(t testing.TB, timestamp time.Time) *header.ExtendedHeader { dah := share.EmptyRoot() rh := RandRawHeader(t) @@ -214,7 +230,7 @@ func RandExtendedHeader(t testing.TB) *header.ExtendedHeader { voteSet := types.NewVoteSet(rh.ChainID, rh.Height, 0, tmproto.PrecommitType, valSet) blockID := RandBlockID(t) blockID.Hash = rh.Hash() - commit, err := MakeCommit(blockID, rh.Height, 0, voteSet, vals, time.Now()) + commit, err := MakeCommit(blockID, rh.Height, 0, voteSet, vals, timestamp) require.NoError(t, err) return &header.ExtendedHeader{ diff --git a/pruner/finder.go b/pruner/finder.go index c26dce4826..60cba4d9f5 100644 --- a/pruner/finder.go +++ b/pruner/finder.go @@ -2,6 +2,7 @@ package pruner import ( "context" + "fmt" "sync/atomic" "time" @@ -30,15 +31,27 @@ func newCheckpoint(ds datastore.Datastore) *checkpoint { // (outside the sampling window). func (s *Service) findPruneableHeaders(ctx context.Context) ([]*header.ExtendedHeader, error) { lastPruned := s.lastPruned() + pruneCutoff := time.Now().Add(time.Duration(-s.window)) estimatedCutoffHeight := lastPruned.Height() + s.numBlocksInWindow + head, err := s.getter.Head(ctx) + if err != nil { + return nil, err + } + if head.Height() < estimatedCutoffHeight { + estimatedCutoffHeight = head.Height() + } + headers, err := s.getter.GetRangeByHeight(ctx, lastPruned, estimatedCutoffHeight) if err != nil { return nil, err } // if our estimated range didn't cover enough headers, we need to fetch more + // TODO: This is really inefficient in the case that lastPruned is the default value, or if the + // node has been offline for a long time. Instead of increasing the boundary by one in the for + // loop we could increase by a range every iteration for { lastHeader := headers[len(headers)-1] if lastHeader.Time().After(pruneCutoff) { @@ -60,12 +73,41 @@ func (s *Service) findPruneableHeaders(ctx context.Context) ([]*header.ExtendedH } // we can ignore the rest of the headers since they are all newer than the cutoff - return headers[:i-1], nil + return headers[:i], nil } } return headers, nil } +// initializeCheckpoint initializes the checkpoint, storing the earliest header in the chain. 
+func (s *Service) initializeCheckpoint(ctx context.Context) error { + firstHeader, err := s.getter.GetByHeight(ctx, 1) + if err != nil { + return fmt.Errorf("failed to initialize checkpoint: %w", err) + } + + return s.updateCheckpoint(ctx, firstHeader) +} + +// loadCheckpoint loads the last checkpoint from disk, initializing it if it does not already exist. +func (s *Service) loadCheckpoint(ctx context.Context) error { + bin, err := s.checkpoint.ds.Get(ctx, lastPrunedHeaderKey) + if err != nil { + if err == datastore.ErrNotFound { + return s.initializeCheckpoint(ctx) + } + return fmt.Errorf("failed to load checkpoint: %w", err) + } + + var lastPruned header.ExtendedHeader + if err := lastPruned.UnmarshalJSON(bin); err != nil { + return fmt.Errorf("failed to load checkpoint: %w", err) + } + + s.checkpoint.lastPrunedHeader.Store(&lastPruned) + return nil +} + // updateCheckpoint updates the checkpoint with the last pruned header height // and persists it to disk. func (s *Service) updateCheckpoint(ctx context.Context, lastPruned *header.ExtendedHeader) error { diff --git a/pruner/service.go b/pruner/service.go index 5b5a52b59a..08ea26167f 100644 --- a/pruner/service.go +++ b/pruner/service.go @@ -60,6 +60,11 @@ func NewService( func (s *Service) Start(context.Context) error { s.ctx, s.cancel = context.WithCancel(context.Background()) + err := s.loadCheckpoint(s.ctx) + if err != nil { + return err + } + go s.prune() return nil } diff --git a/pruner/service_test.go b/pruner/service_test.go index 9e48fe959e..34bf32863a 100644 --- a/pruner/service_test.go +++ b/pruner/service_test.go @@ -60,6 +60,94 @@ func TestService(t *testing.T) { t.Log(len(mp.deletedHeaderHashes)) // TODO @renaynay: expect something here } +func TestFindPruneableHeaders(t *testing.T) { + testCases := []struct { + name string + availWindow AvailabilityWindow + blockTime time.Duration + startTime time.Time + headerAmount int + expectedLength int + }{ + { + name: "Estimated range matches expected", + // Availability window is one week + availWindow: AvailabilityWindow(time.Hour * 24 * 7), + blockTime: time.Hour, + // Make two weeks of headers + headerAmount: 2 * (24 * 7), + startTime: time.Now().Add(-2 * time.Hour * 24 * 7), + // One week of headers are pruneable + expectedLength: 24 * 7, + }, + { + name: "Estimated range not sufficient but finds the correct tail", + // Availability window is one week + availWindow: AvailabilityWindow(time.Hour * 24 * 7), + blockTime: time.Hour, + // Make three weeks of headers + headerAmount: 3 * (24 * 7), + startTime: time.Now().Add(-3 * time.Hour * 24 * 7), + // Two weeks of headers are pruneable + expectedLength: 2 * 24 * 7, + }, + { + name: "No pruneable headers", + // Availability window is two weeks + availWindow: AvailabilityWindow(2 * time.Hour * 24 * 7), + blockTime: time.Hour, + // Make one week of headers + headerAmount: 24 * 7, + startTime: time.Now().Add(-time.Hour * 24 * 7), + // No headers are pruneable + expectedLength: 0, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + headerGenerator := NewSpacedHeaderGenerator(t, tc.startTime, tc.blockTime) + store := headertest.NewCustomStore(t, headerGenerator, tc.headerAmount) + + mp := &mockPruner{} + + serv := NewService( + mp, + tc.availWindow, + store, + sync.MutexWrap(datastore.NewMapDatastore()), + tc.blockTime, + ) + + err := serv.Start(ctx) + require.NoError(t, err) + + pruneable, err := 
serv.findPruneableHeaders(ctx) + require.NoError(t, err) + require.Len(t, pruneable, tc.expectedLength) + + pruneableCutoff := time.Now().Add(-time.Duration(tc.availWindow)) + // All returned headers are older than the availability window + for _, h := range pruneable { + require.WithinRange(t, h.Time(), tc.startTime, pruneableCutoff) + } + + // The next header after the last pruneable header is too new to prune + if len(pruneable) != 0 { + lastPruneable := pruneable[len(pruneable)-1] + if lastPruneable.Height() != store.Height() { + firstUnpruneable, err := store.GetByHeight(ctx, lastPruneable.Height()+1) + require.NoError(t, err) + require.WithinRange(t, firstUnpruneable.Time(), pruneableCutoff, time.Now()) + } + } + }) + } +} + type mockPruner struct { deletedHeaderHashes []hdr.Hash } @@ -70,3 +158,30 @@ func (mp *mockPruner) Prune(_ context.Context, headers ...*header.ExtendedHeader } return nil } + +type SpacedHeaderGenerator struct { + t *testing.T + TimeBetweenHeaders time.Duration + currentTime time.Time + currentHeight int64 +} + +func NewSpacedHeaderGenerator( + t *testing.T, startTime time.Time, timeBetweenHeaders time.Duration, +) *SpacedHeaderGenerator { + return &SpacedHeaderGenerator{ + t: t, + TimeBetweenHeaders: timeBetweenHeaders, + currentTime: startTime, + currentHeight: 1, + } +} + +func (shg *SpacedHeaderGenerator) NextHeader() *header.ExtendedHeader { + h := headertest.RandExtendedHeaderAtTimestamp(shg.t, shg.currentTime) + h.RawHeader.Height = shg.currentHeight + h.RawHeader.Time = shg.currentTime + shg.currentHeight++ + shg.currentTime = shg.currentTime.Add(shg.TimeBetweenHeaders) + return h +} From 395471e857af12454da03c807d1ada9f59f20402 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Thu, 4 Jan 2024 16:33:16 +0100 Subject: [PATCH 3/3] go.mod: remove replace --- go.mod | 2 -- go.sum | 2 ++ 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index a60cc81648..6609aeba05 100644 --- a/go.mod +++ b/go.mod @@ -2,8 +2,6 @@ module github.com/celestiaorg/celestia-node go 1.21.1 -replace github.com/celestiaorg/go-header => /Users/rene/go/src/github.com/renaynay/go-header - require ( cosmossdk.io/errors v1.0.0 cosmossdk.io/math v1.2.0 diff --git a/go.sum b/go.sum index 6a580543d2..a8ad8ad148 100644 --- a/go.sum +++ b/go.sum @@ -372,6 +372,8 @@ github.com/celestiaorg/go-ds-badger4 v0.0.0-20230712104058-7ede1c814ac5 h1:MJgXv github.com/celestiaorg/go-ds-badger4 v0.0.0-20230712104058-7ede1c814ac5/go.mod h1:r6xB3nvGotmlTACpAr3SunxtoXeesbqb57elgMJqflY= github.com/celestiaorg/go-fraud v0.2.0 h1:aaq2JiW0gTnhEdac3l51UCqSyJ4+VjFGTTpN83V4q7I= github.com/celestiaorg/go-fraud v0.2.0/go.mod h1:lNY1i4K6kUeeE60Z2VK8WXd+qXb8KRzfBhvwPkK6aUc= +github.com/celestiaorg/go-header v0.5.1 h1:1s1lw4fcCHalNK0qw/0a3cxg3ezx3Hl020znIxPZvtk= +github.com/celestiaorg/go-header v0.5.1/go.mod h1:H8xhnDLDLbkpwmWPhCaZyTnIV3dlVxBHPnxNXS2Qu6c= github.com/celestiaorg/go-libp2p-messenger v0.2.0 h1:/0MuPDcFamQMbw9xTZ73yImqgTO3jHV7wKHvWD/Irao= github.com/celestiaorg/go-libp2p-messenger v0.2.0/go.mod h1:s9PIhMi7ApOauIsfBcQwbr7m+HBzmVfDIS+QLdgzDSo= github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4 h1:CJdIpo8n5MFP2MwK0gSRcOVlDlFdQJO1p+FqdxYzmvc=
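
For reference, the window arithmetic this series builds on can be sketched in isolation. The snippet below is a minimal standalone illustration, not code from any patch above: withinWindow and estimateCutoffHeight are hypothetical helpers that mirror the comparison exercised by TestDASer_SamplingWindow and the cutoff estimate in findPruneableHeaders, and the 12-second block time is an assumed stand-in for p2p.BlockTime.

package main

import (
	"fmt"
	"time"
)

// AvailabilityWindow mirrors pruner.AvailabilityWindow: the duration for which
// sampled data must remain available (30 days for light nodes, per light.Window).
type AvailabilityWindow time.Duration

// withinWindow reports whether a header timestamp still falls inside the
// availability window, i.e. is newer than now minus the window. This is the
// comparison the sampling-window test cases expect from the DASer.
func withinWindow(headerTime time.Time, window AvailabilityWindow) bool {
	return headerTime.After(time.Now().Add(-time.Duration(window)))
}

// estimateCutoffHeight mirrors how findPruneableHeaders estimates the height
// range to request: the last pruned height plus the number of blocks that fit
// into the window at the given block time.
func estimateCutoffHeight(lastPruned uint64, window AvailabilityWindow, blockTime time.Duration) uint64 {
	blocksInWindow := uint64(time.Duration(window) / blockTime)
	return lastPruned + blocksInWindow
}

func main() {
	window := AvailabilityWindow(30 * 24 * time.Hour) // light.Window: 30 days
	blockTime := 12 * time.Second                     // assumed stand-in for p2p.BlockTime

	fmt.Println(withinWindow(time.Now().Add(-time.Hour), window))       // true: inside the window
	fmt.Println(withinWindow(time.Now().Add(-31*24*time.Hour), window)) // false: older than the window
	fmt.Println(estimateCutoffHeight(1, window, blockTime))             // 1 + 216000 blocks
}

Estimating by block count rather than walking header-by-header keeps the common case to a single range request; the second patch then clamps the estimate to the current head and only extends past it one header at a time when the estimate falls short of the prune cutoff.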