Skip to content

Commit

Permalink
feat(pruner): change finder algo to use blockTime, remove disabledGC …
Browse files Browse the repository at this point in the history
…opt,
  • Loading branch information
renaynay committed Mar 15, 2024
1 parent f4cbf7b commit d69a5cb
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 57 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/benbjohnson/clock v1.3.5
github.com/celestiaorg/celestia-app v1.4.0
github.com/celestiaorg/go-fraud v0.2.0
github.com/celestiaorg/go-header v0.5.3
github.com/celestiaorg/go-header v0.5.5-0.20240315130212-bdda26aab808
github.com/celestiaorg/go-libp2p-messenger v0.2.0
github.com/celestiaorg/nmt v0.20.0
github.com/celestiaorg/rsmt2d v0.11.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,8 @@ github.com/celestiaorg/dagstore v0.0.0-20230824094345-537c012aa403 h1:Lj73O3S+KJ
github.com/celestiaorg/dagstore v0.0.0-20230824094345-537c012aa403/go.mod h1:cCGM1UoMvyTk8k62mkc+ReVu8iHBCtSBAAL4wYU7KEI=
github.com/celestiaorg/go-fraud v0.2.0 h1:aaq2JiW0gTnhEdac3l51UCqSyJ4+VjFGTTpN83V4q7I=
github.com/celestiaorg/go-fraud v0.2.0/go.mod h1:lNY1i4K6kUeeE60Z2VK8WXd+qXb8KRzfBhvwPkK6aUc=
github.com/celestiaorg/go-header v0.5.3 h1:8CcflT6aIlcQXKNWcMekoBNs3EU50mEmDp17gbn1pP4=
github.com/celestiaorg/go-header v0.5.3/go.mod h1:7BVR6myjRfACbqW1de6s8OjuK66XzHm8MpFNYr0G+nU=
github.com/celestiaorg/go-header v0.5.5-0.20240315130212-bdda26aab808 h1:TvOOMX0rSSTp4DCu53hn2NeaGcHwEMQ8S58TltFggRg=
github.com/celestiaorg/go-header v0.5.5-0.20240315130212-bdda26aab808/go.mod h1:7BVR6myjRfACbqW1de6s8OjuK66XzHm8MpFNYr0G+nU=
github.com/celestiaorg/go-libp2p-messenger v0.2.0 h1:/0MuPDcFamQMbw9xTZ73yImqgTO3jHV7wKHvWD/Irao=
github.com/celestiaorg/go-libp2p-messenger v0.2.0/go.mod h1:s9PIhMi7ApOauIsfBcQwbr7m+HBzmVfDIS+QLdgzDSo=
github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4 h1:CJdIpo8n5MFP2MwK0gSRcOVlDlFdQJO1p+FqdxYzmvc=
Expand Down
7 changes: 6 additions & 1 deletion nodebuilder/pruner/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@ func newPrunerService(
ds datastore.Batching,
opts ...pruner.Option,
) (*pruner.Service, error) {
serv := pruner.NewService(p, window, getter, ds, p2p.BlockTime, opts...)
serv, err := pruner.NewService(p, window, getter, ds, p2p.BlockTime, opts...)
if err != nil {
return nil, err
}

if MetricsEnabled {
err := pruner.WithPrunerMetrics(serv)
if err != nil {
return nil, err
}
}

return serv, nil
}
4 changes: 0 additions & 4 deletions nodebuilder/pruner/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,6 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
// in which case, this can be enabled.
case node.Light:
return fx.Module("prune",
baseComponents,
fx.Provide(func() pruner.Pruner {
return light.NewPruner()
}),
fx.Supply(light.Window),
)
default:
Expand Down
42 changes: 32 additions & 10 deletions pruner/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,21 @@ func (s *Service) findPruneableHeaders(
lastPruned *header.ExtendedHeader,
) ([]*header.ExtendedHeader, error) {
pruneCutoff := time.Now().UTC().Add(time.Duration(-s.window))
estimatedCutoffHeight := lastPruned.Height() + s.numBlocksInWindow

head, err := s.getter.Head(ctx)
if !lastPruned.Time().UTC().Before(pruneCutoff) {
// this can happen when the network is young and all blocks
// are still within the AvailabilityWindow
return nil, nil
}

estimatedCutoffHeight, err := s.calculateEstimatedCutoff(ctx, lastPruned, pruneCutoff)
if err != nil {
log.Errorw("failed to get Head from header store", "error", err)
return nil, err
}
if head.Height() < estimatedCutoffHeight {
estimatedCutoffHeight = head.Height()
}

log.Debugw("finder: fetching header range", "last pruned", lastPruned.Height(),
"target height", estimatedCutoffHeight)

if estimatedCutoffHeight-lastPruned.Height() > maxHeadersPerLoop {
estimatedCutoffHeight = lastPruned.Height() + maxHeadersPerLoop
}

headers, err := s.getter.GetRangeByHeight(ctx, lastPruned, estimatedCutoffHeight)
if err != nil {
log.Errorw("failed to get range from header store", "from", lastPruned.Height(),
Expand Down Expand Up @@ -85,3 +82,28 @@ func (s *Service) findPruneableHeaders(
}
return headers, nil
}

// calculateEstimatedCutoff estimates the header height up to which headers
// are old enough to be pruned, given the last pruned header and the prune
// cutoff time.
//
// The estimate divides the wall-clock span between lastPruned's timestamp and
// pruneCutoff by the network block time, assuming a roughly constant block
// interval. The result is clamped to the current head height (cannot prune
// what does not exist yet) and to at most maxHeadersPerLoop heights past
// lastPruned so a single pruning pass stays bounded.
//
// NOTE(review): the caller guards that lastPruned.Time() is before
// pruneCutoff, so the Duration here is non-negative; a negative span would
// wrap in the uint64 conversion — confirm no other call sites appear.
func (s *Service) calculateEstimatedCutoff(
	ctx context.Context,
	lastPruned *header.ExtendedHeader,
	pruneCutoff time.Time,
) (uint64, error) {
	// Number of block intervals that fit between lastPruned and the cutoff.
	estimatedRange := uint64(pruneCutoff.UTC().Sub(lastPruned.Time().UTC()) / s.blockTime)
	estimatedCutoffHeight := lastPruned.Height() + estimatedRange

	head, err := s.getter.Head(ctx)
	if err != nil {
		log.Errorw("failed to get Head from header store", "error", err)
		return 0, err
	}

	// Never estimate past the chain head.
	if head.Height() < estimatedCutoffHeight {
		estimatedCutoffHeight = head.Height()
	}

	// Cap the amount of headers fetched/pruned in one iteration.
	if estimatedCutoffHeight-lastPruned.Height() > maxHeadersPerLoop {
		estimatedCutoffHeight = lastPruned.Height() + maxHeadersPerLoop
	}

	return estimatedCutoffHeight, nil
}
16 changes: 8 additions & 8 deletions pruner/params.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pruner

import (
"fmt"
"time"
)

Expand All @@ -12,6 +13,13 @@ type Params struct {
gcCycle time.Duration
}

// Validate ensures the Params are usable by the pruner Service.
//
// The GC cycle drives the Service's pruning ticker, so it must be a
// positive, non-zero duration: time.NewTicker panics on intervals <= 0.
// The check rejects negative values as well as zero, matching the error
// message's "positive and non-zero" contract.
func (p *Params) Validate() error {
	if p.gcCycle <= 0 {
		return fmt.Errorf("invalid GC cycle given, value should be positive and non-zero")
	}
	return nil
}

func DefaultParams() Params {
return Params{
gcCycle: time.Minute * 5,
Expand All @@ -26,14 +34,6 @@ func WithGCCycle(cycle time.Duration) Option {
}
}

// WithDisabledGC disables the pruning Service's pruning
// routine by setting the GC cycle duration to zero.
func WithDisabledGC() Option {
	return func(p *Params) {
		p.gcCycle = time.Duration(0)
	}
}

// WithPrunerMetrics is a utility function to turn on pruner metrics and that is
// expected to be "invoked" by the fx lifecycle.
func WithPrunerMetrics(s *Service) error {
Expand Down
31 changes: 14 additions & 17 deletions pruner/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Service struct {
ds datastore.Datastore
checkpoint *checkpoint

numBlocksInWindow uint64
blockTime time.Duration

ctx context.Context
cancel context.CancelFunc
Expand All @@ -43,24 +43,26 @@ func NewService(
ds datastore.Datastore,
blockTime time.Duration,
opts ...Option,
) *Service {
) (*Service, error) {
params := DefaultParams()
for _, opt := range opts {
opt(&params)
}

numBlocksInWindow := uint64(time.Duration(window) / blockTime)
if err := params.Validate(); err != nil {
return nil, err
}

return &Service{
pruner: p,
window: window,
getter: getter,
checkpoint: &checkpoint{FailedHeaders: map[uint64]struct{}{}},
ds: namespace.Wrap(ds, storePrefix),
numBlocksInWindow: numBlocksInWindow,
doneCh: make(chan struct{}),
params: params,
}
pruner: p,
window: window,
getter: getter,
checkpoint: &checkpoint{FailedHeaders: map[uint64]struct{}{}},
ds: namespace.Wrap(ds, storePrefix),
blockTime: blockTime,
doneCh: make(chan struct{}),
params: params,
}, nil
}

func (s *Service) Start(context.Context) error {
Expand Down Expand Up @@ -90,11 +92,6 @@ func (s *Service) Stop(ctx context.Context) error {
func (s *Service) run() {
defer close(s.doneCh)

if s.params.gcCycle == time.Duration(0) {
// Service is disabled, exit
return
}

ticker := time.NewTicker(s.params.gcCycle)
defer ticker.Stop()

Expand Down
30 changes: 16 additions & 14 deletions pruner/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,18 @@ func TestService(t *testing.T) {

mp := &mockPruner{}

serv := NewService(
serv, err := NewService(
mp,
AvailabilityWindow(time.Millisecond*2),
store,
sync.MutexWrap(datastore.NewMapDatastore()),
blockTime,
)
require.NoError(t, err)

serv.ctx, serv.cancel = ctx, cancel

err := serv.loadCheckpoint(ctx)
err = serv.loadCheckpoint(ctx)
require.NoError(t, err)

time.Sleep(time.Millisecond * 2)
Expand Down Expand Up @@ -79,16 +80,18 @@ func TestService_FailedAreRecorded(t *testing.T) {
failHeight: map[uint64]int{4: 0, 5: 0, 13: 0},
}

serv := NewService(
serv, err := NewService(
mp,
AvailabilityWindow(time.Millisecond*20),
store,
sync.MutexWrap(datastore.NewMapDatastore()),
blockTime,
)
require.NoError(t, err)

serv.ctx = ctx

err := serv.loadCheckpoint(ctx)
err = serv.loadCheckpoint(ctx)
require.NoError(t, err)

// ensures at least 13 blocks are prune-able
Expand Down Expand Up @@ -122,16 +125,16 @@ func TestServiceCheckpointing(t *testing.T) {

mp := &mockPruner{}

serv := NewService(
serv, err := NewService(
mp,
AvailabilityWindow(time.Second),
store,
sync.MutexWrap(datastore.NewMapDatastore()),
time.Millisecond,
WithGCCycle(0), // we do not need to run GC in this test
)
require.NoError(t, err)

err := serv.Start(ctx)
err = serv.loadCheckpoint(ctx)
require.NoError(t, err)

// ensure checkpoint was initialized correctly
Expand All @@ -142,9 +145,6 @@ func TestServiceCheckpointing(t *testing.T) {
err = serv.updateCheckpoint(ctx, uint64(3), map[uint64]struct{}{2: {}})
require.NoError(t, err)

err = serv.Stop(ctx)
require.NoError(t, err)

// ensure checkpoint was updated correctly in datastore
err = serv.loadCheckpoint(ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -174,16 +174,17 @@ func TestPrune_LargeNumberOfBlocks(t *testing.T) {

mp := &mockPruner{failHeight: make(map[uint64]int, 0)}

serv := NewService(
serv, err := NewService(
mp,
availabilityWindow,
store,
sync.MutexWrap(datastore.NewMapDatastore()),
blockTime,
)
require.NoError(t, err)
serv.ctx = ctx

err := serv.loadCheckpoint(ctx)
err = serv.loadCheckpoint(ctx)
require.NoError(t, err)

// ensures availability window has passed
Expand Down Expand Up @@ -253,15 +254,16 @@ func TestFindPruneableHeaders(t *testing.T) {

mp := &mockPruner{}

serv := NewService(
serv, err := NewService(
mp,
tc.availWindow,
store,
sync.MutexWrap(datastore.NewMapDatastore()),
tc.blockTime,
)
require.NoError(t, err)

err := serv.Start(ctx)
err = serv.Start(ctx)
require.NoError(t, err)

lastPruned, err := serv.lastPruned(ctx)
Expand Down

0 comments on commit d69a5cb

Please sign in to comment.