feat(nodebuilder/prune): Enable sampling window for light nodes
prog

prog

prog

prog

prog

prog

feat: initial untested findPruneableHeaders impl

change provides in nodebuilder

provide properly to fx
renaynay committed Jan 4, 2024
1 parent f75e255 commit 3a368b2
Showing 12 changed files with 362 additions and 9 deletions.
2 changes: 2 additions & 0 deletions das/daser.go
@@ -151,6 +151,8 @@ func (d *DASer) sample(ctx context.Context, h *header.ExtendedHeader) error {
// short-circuit if pruning is enabled and the header is outside the
// availability window
if !d.isWithinSamplingWindow(h) {
log.Debugw("skipping header outside sampling window", "height", h.Height(),
"time", h.Time())
return nil
}

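The short-circuit above calls isWithinSamplingWindow, whose implementation is not shown on this page. A standalone sketch of the check it plausibly performs — the real method lives on the DASer in das/daser.go and may differ:

package main

import (
	"fmt"
	"time"
)

// isWithinSamplingWindow mirrors the check the hunk above relies on:
// a header is sampled only if its timestamp falls inside the window.
func isWithinSamplingWindow(headerTime time.Time, window time.Duration) bool {
	if window == 0 {
		return true // no window configured: sample everything
	}
	return time.Since(headerTime) <= window
}

func main() {
	window := time.Second
	fmt.Println(isWithinSamplingWindow(time.Now(), window))                      // true
	fmt.Println(isWithinSamplingWindow(time.Now().Add(-5*time.Second), window)) // false
}

This matches the expectations in the test added below (e.g. a header 5 seconds old falls outside a 1-second window).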
37 changes: 37 additions & 0 deletions das/daser_test.go
@@ -2,6 +2,7 @@ package das

import (
"context"
"strconv"
"testing"
"time"

@@ -244,6 +245,42 @@ func TestDASerSampleTimeout(t *testing.T) {
}
}

// TestDASer_SamplingWindow tests the sampling window determination
// for headers.
func TestDASer_SamplingWindow(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
sub := new(headertest.Subscriber)
fserv := &fraudtest.DummyService[*header.ExtendedHeader]{}
getter := getterStub{}
avail := mocks.NewMockAvailability(gomock.NewController(t))

// create and start DASer
daser, err := NewDASer(avail, sub, getter, ds, fserv, newBroadcastMock(1),
WithSamplingWindow(time.Second))
require.NoError(t, err)

var tests = []struct {
timestamp time.Time
withinWindow bool
}{
{timestamp: time.Now().Add(-(time.Second * 5)), withinWindow: false},
{timestamp: time.Now().Add(-(time.Millisecond * 800)), withinWindow: true},
{timestamp: time.Now().Add(-(time.Hour)), withinWindow: false},
{timestamp: time.Now().Add(-(time.Hour * 24 * 30)), withinWindow: false},
{timestamp: time.Now(), withinWindow: true},
}

for i, tt := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
eh := headertest.RandExtendedHeader(t)
eh.RawHeader.Time = tt.timestamp

assert.Equal(t, tt.withinWindow, daser.isWithinSamplingWindow(eh))
})
}

}

// createDASerSubcomponents takes numGetter (number of headers
// to store in mockGetter) and numSub (number of headers to store
// in the mock header.Subscriber), returning a newly instantiated
2 changes: 2 additions & 0 deletions go.mod
@@ -2,6 +2,8 @@ module github.com/celestiaorg/celestia-node

go 1.21.1

replace github.com/celestiaorg/go-header => /Users/rene/go/src/github.com/renaynay/go-header

require (
cosmossdk.io/errors v1.0.0
cosmossdk.io/math v1.2.0
2 changes: 0 additions & 2 deletions go.sum
@@ -372,8 +372,6 @@ github.com/celestiaorg/go-ds-badger4 v0.0.0-20230712104058-7ede1c814ac5 h1:MJgXv
github.com/celestiaorg/go-ds-badger4 v0.0.0-20230712104058-7ede1c814ac5/go.mod h1:r6xB3nvGotmlTACpAr3SunxtoXeesbqb57elgMJqflY=
github.com/celestiaorg/go-fraud v0.2.0 h1:aaq2JiW0gTnhEdac3l51UCqSyJ4+VjFGTTpN83V4q7I=
github.com/celestiaorg/go-fraud v0.2.0/go.mod h1:lNY1i4K6kUeeE60Z2VK8WXd+qXb8KRzfBhvwPkK6aUc=
github.com/celestiaorg/go-header v0.5.1 h1:1s1lw4fcCHalNK0qw/0a3cxg3ezx3Hl020znIxPZvtk=
github.com/celestiaorg/go-header v0.5.1/go.mod h1:H8xhnDLDLbkpwmWPhCaZyTnIV3dlVxBHPnxNXS2Qu6c=
github.com/celestiaorg/go-libp2p-messenger v0.2.0 h1:/0MuPDcFamQMbw9xTZ73yImqgTO3jHV7wKHvWD/Irao=
github.com/celestiaorg/go-libp2p-messenger v0.2.0/go.mod h1:s9PIhMi7ApOauIsfBcQwbr7m+HBzmVfDIS+QLdgzDSo=
github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4 h1:CJdIpo8n5MFP2MwK0gSRcOVlDlFdQJO1p+FqdxYzmvc=
4 changes: 3 additions & 1 deletion libs/utils/resetctx.go
@@ -1,6 +1,8 @@
package utils

import "context"
import (
"context"
)

// ResetContextOnError returns a fresh context if the given context has an error.
func ResetContextOnError(ctx context.Context) context.Context {
23 changes: 23 additions & 0 deletions nodebuilder/prune/constructors.go
@@ -0,0 +1,23 @@
package prune

import (
"github.com/ipfs/go-datastore"

hdr "github.com/celestiaorg/go-header"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/pruner"
)

func newPrunerService(
p pruner.Pruner,
window pruner.AvailabilityWindow,
getter hdr.Store[*header.ExtendedHeader],
ds datastore.Batching,
opts ...pruner.Option,
) *pruner.Service {
// TODO @renaynay: remove this once the pruning implementation is complete
opts = append(opts, pruner.WithDisabledGC())
return pruner.NewService(p, window, getter, ds, p2p.BlockTime, opts...)
}
10 changes: 8 additions & 2 deletions nodebuilder/prune/module.go
@@ -2,6 +2,7 @@ package prune

import (
"context"
"fmt"

"go.uber.org/fx"

@@ -12,16 +13,21 @@ import (
)

func ConstructModule(tp node.Type) fx.Option {
fmt.Print("\n\n\n\nconstructing pruning module\n\n\n\n")

baseComponents := fx.Options(
fx.Provide(fx.Annotate(
pruner.NewService,
newPrunerService,
fx.OnStart(func(ctx context.Context, p *pruner.Service) error {
return p.Start(ctx)
}),
fx.OnStop(func(ctx context.Context, p *pruner.Service) error {
return p.Stop(ctx)
}),
)),
// This is necessary to invoke the pruner service as a standalone component,
// since nothing else depends on it (a quirk of FX).
fx.Invoke(func(p *pruner.Service) {}),
)

switch tp {
@@ -39,7 +45,7 @@ func ConstructModule(tp node.Type) fx.Option {
fx.Provide(func() pruner.Pruner {
return light.NewPruner()
}),
fx.Supply(archival.Window), // TODO @renaynay: turn this into light.Window in following PR
fx.Supply(light.Window),
)
default:
panic("unknown node type")
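A standalone illustration of the FX quirk noted above: fx instantiates a provided constructor only when something depends on its output, so a no-op fx.Invoke is needed to force construction of the pruner service. This sketch is not from the diff:

package main

import (
	"fmt"

	"go.uber.org/fx"
)

type Service struct{}

func NewService() *Service {
	fmt.Println("service constructed")
	return &Service{}
}

func main() {
	app := fx.New(
		fx.Provide(NewService),
		// Without this no-op Invoke, NewService never runs: no other
		// component depends on *Service, so fx never constructs it.
		fx.Invoke(func(*Service) {}),
	)
	if err := app.Err(); err != nil {
		fmt.Println(err)
	}
}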
84 changes: 84 additions & 0 deletions pruner/finder.go
@@ -0,0 +1,84 @@
package pruner

import (
"context"
"sync/atomic"
"time"

"github.com/ipfs/go-datastore"

"github.com/celestiaorg/celestia-node/header"
)

var (
lastPrunedHeaderKey = datastore.NewKey("last_pruned_header")
)

type checkpoint struct {
ds datastore.Datastore

lastPrunedHeader atomic.Pointer[header.ExtendedHeader]

// TODO @renaynay: keep track of failed roots to retry in separate job
}

func newCheckpoint(ds datastore.Datastore) *checkpoint {
return &checkpoint{ds: ds}
}

// findPruneableHeaders returns all headers that are eligible for pruning
// (outside the sampling window).
func (s *Service) findPruneableHeaders(ctx context.Context) ([]*header.ExtendedHeader, error) {
lastPruned := s.lastPruned()
pruneCutoff := time.Now().Add(time.Duration(-s.window))
estimatedCutoffHeight := lastPruned.Height() + s.numBlocksInWindow

headers, err := s.getter.GetRangeByHeight(ctx, lastPruned, estimatedCutoffHeight)
if err != nil {
return nil, err
}

// if our estimated range didn't cover enough headers, we need to fetch more
for {
lastHeader := headers[len(headers)-1]
if lastHeader.Time().After(pruneCutoff) {
break
}

nextHeader, err := s.getter.GetByHeight(ctx, lastHeader.Height()+1)
if err != nil {
return nil, err
}
headers = append(headers, nextHeader)
}

for i, h := range headers {
if h.Time().After(pruneCutoff) {
if i == 0 {
// we can't prune anything
return nil, nil
}

// we can ignore the rest of the headers since they are all newer than the cutoff
return headers[:i], nil
}
}
return headers, nil
}

// updateCheckpoint updates the checkpoint with the last pruned header height
// and persists it to disk.
func (s *Service) updateCheckpoint(ctx context.Context, lastPruned *header.ExtendedHeader) error {
s.checkpoint.lastPrunedHeader.Store(lastPruned)

bin, err := lastPruned.MarshalJSON()
if err != nil {
return err
}

return s.checkpoint.ds.Put(ctx, lastPrunedHeaderKey, bin)
}

func (s *Service) lastPruned() *header.ExtendedHeader {
return s.checkpoint.lastPrunedHeader.Load()
}
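Note that updateCheckpoint persists the checkpoint but nothing in this commit reads it back on startup, so lastPruned() returns nil after a restart. A hypothetical counterpart — not part of the diff, assuming header.ExtendedHeader implements UnmarshalJSON and with an added "errors" import — could look like:

// loadCheckpoint restores the last pruned header from disk so that
// lastPruned() survives restarts. Hypothetical sketch only.
func (s *Service) loadCheckpoint(ctx context.Context) error {
	bin, err := s.checkpoint.ds.Get(ctx, lastPrunedHeaderKey)
	if err != nil {
		if errors.Is(err, datastore.ErrNotFound) {
			return nil // nothing has been pruned yet
		}
		return err
	}

	var lastPruned header.ExtendedHeader
	if err = lastPruned.UnmarshalJSON(bin); err != nil {
		return err
	}
	s.checkpoint.lastPrunedHeader.Store(&lastPruned)
	return nil
}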
2 changes: 2 additions & 0 deletions pruner/light/window.go
@@ -6,4 +6,6 @@ import (
"github.com/celestiaorg/celestia-node/pruner"
)

// Window is the availability window for light nodes in the Celestia
// network (30 days).
const Window = pruner.AvailabilityWindow(time.Second * 86400 * 30)
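For scale: with the 30-day window above and an illustrative 12-second block time (an assumption for this example; the real value comes from nodebuilder/p2p.BlockTime), the window covers roughly 216,000 blocks — the same arithmetic NewService performs for numBlocksInWindow:

package main

import (
	"fmt"
	"time"
)

func main() {
	window := 30 * 24 * time.Hour // equals pruner/light.Window
	blockTime := 12 * time.Second // illustrative assumption only
	fmt.Println(uint64(window / blockTime)) // prints 216000
}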
35 changes: 35 additions & 0 deletions pruner/params.go
@@ -0,0 +1,35 @@
package pruner

import (
"time"
)

type Option func(*Params)

type Params struct {
// gcCycle is the frequency at which the pruning Service
// triggers a pruning cycle. If set to 0, the pruning
// routine is disabled.
gcCycle time.Duration
}

func DefaultParams() Params {
return Params{
gcCycle: time.Hour,
}
}

// WithGCCycle configures how often the pruning Service
// triggers a pruning cycle.
func WithGCCycle(cycle time.Duration) Option {
return func(p *Params) {
p.gcCycle = cycle
}
}

// WithDisabledGC disables the pruning Service's pruning
// routine.
func WithDisabledGC() Option {
return func(p *Params) {
p.gcCycle = time.Duration(0)
}
}
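A standalone demonstration of how these functional options compose over the defaults, mirroring the loop NewService runs below (types mirror the diff; this is a sketch, not library code):

package main

import (
	"fmt"
	"time"
)

type Params struct{ gcCycle time.Duration }

type Option func(*Params)

func WithGCCycle(cycle time.Duration) Option {
	return func(p *Params) { p.gcCycle = cycle }
}

func main() {
	params := Params{gcCycle: time.Hour} // stands in for DefaultParams()
	for _, opt := range []Option{WithGCCycle(30 * time.Minute)} {
		opt(&params)
	}
	fmt.Println(params.gcCycle) // prints 30m0s
}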
98 changes: 94 additions & 4 deletions pruner/service.go
@@ -2,24 +2,114 @@ package pruner

import (
"context"
"fmt"
"time"

"github.com/ipfs/go-datastore"

hdr "github.com/celestiaorg/go-header"

"github.com/celestiaorg/celestia-node/header"
)

// Service handles the pruning routine for the node using the
// prune Pruner.
type Service struct {
pruner Pruner
window AvailabilityWindow

getter hdr.Getter[*header.ExtendedHeader] // TODO @renaynay: expects a header service with access to sync head

checkpoint *checkpoint
numBlocksInWindow uint64

ctx context.Context
cancel context.CancelFunc
doneCh chan struct{}

params Params
}

func NewService(p Pruner) *Service {
func NewService(
p Pruner,
window AvailabilityWindow,
getter hdr.Getter[*header.ExtendedHeader],
ds datastore.Datastore,
blockTime time.Duration,
opts ...Option,
) *Service {
params := DefaultParams()
for _, opt := range opts {
opt(&params)
}

// TODO @renaynay
numBlocksInWindow := uint64(time.Duration(window) / blockTime)

return &Service{
pruner: p,
window: window,
getter: getter,
checkpoint: newCheckpoint(ds),
numBlocksInWindow: numBlocksInWindow,
doneCh: make(chan struct{}),
params: params,
}
}

func (s *Service) Start(context.Context) error {
s.ctx, s.cancel = context.WithCancel(context.Background())

go s.prune()
return nil
}

func (s *Service) Stop(context.Context) error {
return nil
func (s *Service) Stop(ctx context.Context) error {
s.cancel()

select {
case <-s.doneCh:
return nil
case <-ctx.Done():
return fmt.Errorf("pruner unable to exit within context deadline")
}
}

func (s *Service) prune() {
if s.params.gcCycle == time.Duration(0) {
// Service is disabled, exit
close(s.doneCh)
return
}

ticker := time.NewTicker(s.params.gcCycle)
defer ticker.Stop()

for {
select {
case <-s.ctx.Done():
close(s.doneCh)
return
case <-ticker.C:
headers, err := s.findPruneableHeaders(s.ctx)
if err != nil {
// TODO @renaynay: record + report errors properly
continue
}
if len(headers) == 0 {
// nothing is pruneable yet; wait for the next cycle
// (also guards the checkpoint update below against an empty slice)
continue
}
// TODO @renaynay: make the prune deadline a param / configurable
pruneCtx, cancel := context.WithDeadline(s.ctx, time.Now().Add(time.Minute))
err = s.pruner.Prune(pruneCtx, headers...)
cancel()
if err != nil {
// TODO @renaynay: record + report errors properly
continue
}

err = s.updateCheckpoint(s.ctx, headers[len(headers)-1])
if err != nil {
// TODO @renaynay: record + report errors properly
continue
}
}
}
}