diff --git a/go.mod b/go.mod
index ca449edfe7..7b774b6b3d 100644
--- a/go.mod
+++ b/go.mod
@@ -10,7 +10,7 @@ require (
 	github.com/benbjohnson/clock v1.3.5
 	github.com/celestiaorg/celestia-app v1.4.0
 	github.com/celestiaorg/go-fraud v0.2.0
-	github.com/celestiaorg/go-header v0.5.3
+	github.com/celestiaorg/go-header v0.5.5-0.20240315130212-bdda26aab808
 	github.com/celestiaorg/go-libp2p-messenger v0.2.0
 	github.com/celestiaorg/nmt v0.20.0
 	github.com/celestiaorg/rsmt2d v0.11.0
diff --git a/go.sum b/go.sum
index 6fe259ff0c..c6cb4f2afb 100644
--- a/go.sum
+++ b/go.sum
@@ -368,8 +368,8 @@ github.com/celestiaorg/dagstore v0.0.0-20230824094345-537c012aa403 h1:Lj73O3S+KJ
 github.com/celestiaorg/dagstore v0.0.0-20230824094345-537c012aa403/go.mod h1:cCGM1UoMvyTk8k62mkc+ReVu8iHBCtSBAAL4wYU7KEI=
 github.com/celestiaorg/go-fraud v0.2.0 h1:aaq2JiW0gTnhEdac3l51UCqSyJ4+VjFGTTpN83V4q7I=
 github.com/celestiaorg/go-fraud v0.2.0/go.mod h1:lNY1i4K6kUeeE60Z2VK8WXd+qXb8KRzfBhvwPkK6aUc=
-github.com/celestiaorg/go-header v0.5.3 h1:8CcflT6aIlcQXKNWcMekoBNs3EU50mEmDp17gbn1pP4=
-github.com/celestiaorg/go-header v0.5.3/go.mod h1:7BVR6myjRfACbqW1de6s8OjuK66XzHm8MpFNYr0G+nU=
+github.com/celestiaorg/go-header v0.5.5-0.20240315130212-bdda26aab808 h1:TvOOMX0rSSTp4DCu53hn2NeaGcHwEMQ8S58TltFggRg=
+github.com/celestiaorg/go-header v0.5.5-0.20240315130212-bdda26aab808/go.mod h1:7BVR6myjRfACbqW1de6s8OjuK66XzHm8MpFNYr0G+nU=
 github.com/celestiaorg/go-libp2p-messenger v0.2.0 h1:/0MuPDcFamQMbw9xTZ73yImqgTO3jHV7wKHvWD/Irao=
 github.com/celestiaorg/go-libp2p-messenger v0.2.0/go.mod h1:s9PIhMi7ApOauIsfBcQwbr7m+HBzmVfDIS+QLdgzDSo=
 github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4 h1:CJdIpo8n5MFP2MwK0gSRcOVlDlFdQJO1p+FqdxYzmvc=
diff --git a/nodebuilder/pruner/constructors.go b/nodebuilder/pruner/constructors.go
index 150d217f76..1b84d19d0d 100644
--- a/nodebuilder/pruner/constructors.go
+++ b/nodebuilder/pruner/constructors.go
@@ -17,12 +17,17 @@ func newPrunerService(
 	ds datastore.Batching,
 	opts ...pruner.Option,
 ) (*pruner.Service, error) {
-	serv := pruner.NewService(p, window, getter, ds, p2p.BlockTime, opts...)
+	serv, err := pruner.NewService(p, window, getter, ds, p2p.BlockTime, opts...)
+	if err != nil {
+		return nil, err
+	}
+
 	if MetricsEnabled {
 		err := pruner.WithPrunerMetrics(serv)
 		if err != nil {
 			return nil, err
 		}
 	}
+
 	return serv, nil
 }
diff --git a/nodebuilder/pruner/module.go b/nodebuilder/pruner/module.go
index b0bec0f062..9545d51347 100644
--- a/nodebuilder/pruner/module.go
+++ b/nodebuilder/pruner/module.go
@@ -63,10 +63,6 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
 		// in which case, this can be enabled.
 	case node.Light:
 		return fx.Module("prune",
-			baseComponents,
-			fx.Provide(func() pruner.Pruner {
-				return light.NewPruner()
-			}),
 			fx.Supply(light.Window),
 		)
 	default:
diff --git a/pruner/finder.go b/pruner/finder.go
index 1cee6f26d3..187537a981 100644
--- a/pruner/finder.go
+++ b/pruner/finder.go
@@ -19,24 +19,21 @@ func (s *Service) findPruneableHeaders(
 	lastPruned *header.ExtendedHeader,
 ) ([]*header.ExtendedHeader, error) {
 	pruneCutoff := time.Now().UTC().Add(time.Duration(-s.window))
-	estimatedCutoffHeight := lastPruned.Height() + s.numBlocksInWindow
 
-	head, err := s.getter.Head(ctx)
+	if !lastPruned.Time().UTC().Before(pruneCutoff) {
+		// this can happen when the network is young and all blocks
+		// are still within the AvailabilityWindow
+		return nil, nil
+	}
+
+	estimatedCutoffHeight, err := s.calculateEstimatedCutoff(ctx, lastPruned, pruneCutoff)
 	if err != nil {
-		log.Errorw("failed to get Head from header store", "error", err)
 		return nil, err
 	}
-	if head.Height() < estimatedCutoffHeight {
-		estimatedCutoffHeight = head.Height()
-	}
 
 	log.Debugw("finder: fetching header range", "last pruned", lastPruned.Height(),
 		"target height", estimatedCutoffHeight)
 
-	if estimatedCutoffHeight-lastPruned.Height() > maxHeadersPerLoop {
-		estimatedCutoffHeight = lastPruned.Height() + maxHeadersPerLoop
-	}
-
 	headers, err := s.getter.GetRangeByHeight(ctx, lastPruned, estimatedCutoffHeight)
 	if err != nil {
 		log.Errorw("failed to get range from header store", "from", lastPruned.Height(),
@@ -85,3 +82,28 @@ func (s *Service) findPruneableHeaders(
 	}
 	return headers, nil
 }
+
+func (s *Service) calculateEstimatedCutoff(
+	ctx context.Context,
+	lastPruned *header.ExtendedHeader,
+	pruneCutoff time.Time,
+) (uint64, error) {
+	estimatedRange := uint64(pruneCutoff.UTC().Sub(lastPruned.Time().UTC()) / s.blockTime)
+	estimatedCutoffHeight := lastPruned.Height() + estimatedRange
+
+	head, err := s.getter.Head(ctx)
+	if err != nil {
+		log.Errorw("failed to get Head from header store", "error", err)
+		return 0, err
+	}
+
+	if head.Height() < estimatedCutoffHeight {
+		estimatedCutoffHeight = head.Height()
+	}
+
+	if estimatedCutoffHeight-lastPruned.Height() > maxHeadersPerLoop {
+		estimatedCutoffHeight = lastPruned.Height() + maxHeadersPerLoop
+	}
+
+	return estimatedCutoffHeight, nil
+}
diff --git a/pruner/params.go b/pruner/params.go
index 7a9b00336c..69755b89f6 100644
--- a/pruner/params.go
+++ b/pruner/params.go
@@ -1,6 +1,7 @@
 package pruner
 
 import (
+	"fmt"
 	"time"
 )
 
@@ -12,6 +13,13 @@ type Params struct {
 	gcCycle time.Duration
 }
 
+func (p *Params) Validate() error {
+	if p.gcCycle == time.Duration(0) {
+		return fmt.Errorf("invalid GC cycle given, value should be positive and non-zero")
+	}
+	return nil
+}
+
 func DefaultParams() Params {
 	return Params{
 		gcCycle: time.Minute * 5,
@@ -26,14 +34,6 @@ func WithGCCycle(cycle time.Duration) Option {
 	}
 }
 
-// WithDisabledGC disables the pruning Service's pruning
-// routine.
-func WithDisabledGC() Option {
-	return func(p *Params) {
-		p.gcCycle = time.Duration(0)
-	}
-}
-
 // WithPrunerMetrics is a utility function to turn on pruner metrics and that is
 // expected to be "invoked" by the fx lifecycle.
 func WithPrunerMetrics(s *Service) error {
diff --git a/pruner/service.go b/pruner/service.go
index 7a58ff82e1..52879852f0 100644
--- a/pruner/service.go
+++ b/pruner/service.go
@@ -26,7 +26,7 @@ type Service struct {
 	ds         datastore.Datastore
 	checkpoint *checkpoint
 
-	numBlocksInWindow uint64
+	blockTime time.Duration
 
 	ctx    context.Context
 	cancel context.CancelFunc
@@ -43,24 +43,26 @@ func NewService(
 	ds datastore.Datastore,
 	blockTime time.Duration,
 	opts ...Option,
-) *Service {
+) (*Service, error) {
 	params := DefaultParams()
 	for _, opt := range opts {
 		opt(&params)
 	}
 
-	numBlocksInWindow := uint64(time.Duration(window) / blockTime)
+	if err := params.Validate(); err != nil {
+		return nil, err
+	}
 
 	return &Service{
-		pruner:            p,
-		window:            window,
-		getter:            getter,
-		checkpoint:        &checkpoint{FailedHeaders: map[uint64]struct{}{}},
-		ds:                namespace.Wrap(ds, storePrefix),
-		numBlocksInWindow: numBlocksInWindow,
-		doneCh:            make(chan struct{}),
-		params:            params,
-	}
+		pruner:     p,
+		window:     window,
+		getter:     getter,
+		checkpoint: &checkpoint{FailedHeaders: map[uint64]struct{}{}},
+		ds:         namespace.Wrap(ds, storePrefix),
+		blockTime:  blockTime,
+		doneCh:     make(chan struct{}),
+		params:     params,
+	}, nil
 }
 
 func (s *Service) Start(context.Context) error {
@@ -90,11 +92,6 @@ func (s *Service) Stop(ctx context.Context) error {
 func (s *Service) run() {
 	defer close(s.doneCh)
 
-	if s.params.gcCycle == time.Duration(0) {
-		// Service is disabled, exit
-		return
-	}
-
 	ticker := time.NewTicker(s.params.gcCycle)
 	defer ticker.Stop()
 
diff --git a/pruner/service_test.go b/pruner/service_test.go
index 15efc0d3e0..27f138fff0 100644
--- a/pruner/service_test.go
+++ b/pruner/service_test.go
@@ -38,17 +38,18 @@ func TestService(t *testing.T) {
 
 	mp := &mockPruner{}
 
-	serv := NewService(
+	serv, err := NewService(
 		mp,
 		AvailabilityWindow(time.Millisecond*2),
 		store,
 		sync.MutexWrap(datastore.NewMapDatastore()),
 		blockTime,
 	)
+	require.NoError(t, err)
 
 	serv.ctx, serv.cancel = ctx, cancel
 
-	err := serv.loadCheckpoint(ctx)
+	err = serv.loadCheckpoint(ctx)
 	require.NoError(t, err)
 
 	time.Sleep(time.Millisecond * 2)
@@ -79,16 +80,18 @@ func TestService_FailedAreRecorded(t *testing.T) {
 		failHeight: map[uint64]int{4: 0, 5: 0, 13: 0},
 	}
 
-	serv := NewService(
+	serv, err := NewService(
 		mp,
 		AvailabilityWindow(time.Millisecond*20),
 		store,
 		sync.MutexWrap(datastore.NewMapDatastore()),
 		blockTime,
 	)
+	require.NoError(t, err)
+
 	serv.ctx = ctx
 
-	err := serv.loadCheckpoint(ctx)
+	err = serv.loadCheckpoint(ctx)
 	require.NoError(t, err)
 
 	// ensures at least 13 blocks are prune-able
@@ -122,16 +125,16 @@ func TestServiceCheckpointing(t *testing.T) {
 
 	mp := &mockPruner{}
 
-	serv := NewService(
+	serv, err := NewService(
 		mp,
 		AvailabilityWindow(time.Second),
 		store,
 		sync.MutexWrap(datastore.NewMapDatastore()),
 		time.Millisecond,
-		WithGCCycle(0), // we do not need to run GC in this test
 	)
+	require.NoError(t, err)
 
-	err := serv.Start(ctx)
+	err = serv.loadCheckpoint(ctx)
 	require.NoError(t, err)
 
 	// ensure checkpoint was initialized correctly
@@ -142,9 +145,6 @@ func TestServiceCheckpointing(t *testing.T) {
 	err = serv.updateCheckpoint(ctx, uint64(3), map[uint64]struct{}{2: {}})
 	require.NoError(t, err)
 
-	err = serv.Stop(ctx)
-	require.NoError(t, err)
-
 	// ensure checkpoint was updated correctly in datastore
 	err = serv.loadCheckpoint(ctx)
 	require.NoError(t, err)
@@ -174,16 +174,17 @@ func TestPrune_LargeNumberOfBlocks(t *testing.T) {
 
 	mp := &mockPruner{failHeight: make(map[uint64]int, 0)}
 
-	serv := NewService(
+	serv, err := NewService(
 		mp,
 		availabilityWindow,
 		store,
 		sync.MutexWrap(datastore.NewMapDatastore()),
 		blockTime,
 	)
+	require.NoError(t, err)
 	serv.ctx = ctx
 
-	err := serv.loadCheckpoint(ctx)
+	err = serv.loadCheckpoint(ctx)
 	require.NoError(t, err)
 
 	// ensures availability window has passed
@@ -253,15 +254,16 @@ func TestFindPruneableHeaders(t *testing.T) {
 
 			mp := &mockPruner{}
 
-			serv := NewService(
+			serv, err := NewService(
 				mp,
 				tc.availWindow,
 				store,
 				sync.MutexWrap(datastore.NewMapDatastore()),
 				tc.blockTime,
 			)
+			require.NoError(t, err)
 
-			err := serv.Start(ctx)
+			err = serv.Start(ctx)
 			require.NoError(t, err)
 
 			lastPruned, err := serv.lastPruned(ctx)
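
Reviewer note: the net effect of this diff on callers is that `pruner.NewService` now returns `(*Service, error)` and rejects a zero `gcCycle` via `Params.Validate`, so the removed `WithDisabledGC()` option can no longer be used to switch the GC routine off. Below is a minimal sketch of the new caller contract, assuming `p`, `window`, `getter`, `ds`, and `blockTime` are the same dependencies wired up in `nodebuilder/pruner/constructors.go` (illustration only, not part of the patch):

```go
// The constructor now validates Params before building the Service.
serv, err := pruner.NewService(p, window, getter, ds, blockTime,
	pruner.WithGCCycle(5*time.Minute), // a zero cycle now fails Params.Validate
)
if err != nil {
	// e.g. "invalid GC cycle given, value should be positive and non-zero"
	return nil, err
}
```

Tests that previously disabled GC via `WithGCCycle(0)` (see `TestServiceCheckpointing`) now avoid starting the GC loop by calling `serv.loadCheckpoint(ctx)` directly instead of `serv.Start(ctx)`.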