feat(pruning): Full Node Pruner #3070

Closed
wants to merge 16 commits
2 changes: 2 additions & 0 deletions das/daser.go
@@ -151,6 +151,8 @@ func (d *DASer) sample(ctx context.Context, h *header.ExtendedHeader) error {
	// short-circuit if pruning is enabled and the header is outside the
	// availability window
	if !d.isWithinSamplingWindow(h) {
		log.Debugw("skipping header outside sampling window", "height", h.Height(),
			"time", h.Time())
		return nil
	}

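The window check itself is not part of this hunk; conceptually it just compares the header timestamp against the configured sampling window. A minimal, free-standing sketch of that logic (the DASer's actual field layout and zero-window handling are assumptions here, not shown in this diff):

package das

import (
	"time"

	"github.com/celestiaorg/celestia-node/header"
)

// withinSamplingWindow is a hypothetical stand-alone version of the check:
// a header is sampleable only if it is newer than now minus the window.
// A zero window is treated here as "pruning disabled", so every header passes.
func withinSamplingWindow(eh *header.ExtendedHeader, window time.Duration) bool {
	if window == 0 {
		return true
	}
	return time.Since(eh.Time()) <= window
}
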
37 changes: 37 additions & 0 deletions das/daser_test.go
@@ -2,6 +2,7 @@ package das

import (
"context"
"strconv"
"testing"
"time"

@@ -244,6 +245,42 @@ func TestDASerSampleTimeout(t *testing.T) {
}
}

// TestDASer_SamplingWindow tests the sampling window determination
// for headers.
func TestDASer_SamplingWindow(t *testing.T) {
	ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
	sub := new(headertest.Subscriber)
	fserv := &fraudtest.DummyService[*header.ExtendedHeader]{}
	getter := getterStub{}
	avail := mocks.NewMockAvailability(gomock.NewController(t))

	// create and start DASer
	daser, err := NewDASer(avail, sub, getter, ds, fserv, newBroadcastMock(1),
		WithSamplingWindow(time.Second))
	require.NoError(t, err)

	var tests = []struct {
		timestamp    time.Time
		withinWindow bool
	}{
		{timestamp: time.Now().Add(-(time.Second * 5)), withinWindow: false},
		{timestamp: time.Now().Add(-(time.Millisecond * 800)), withinWindow: true},
		{timestamp: time.Now().Add(-(time.Hour)), withinWindow: false},
		{timestamp: time.Now().Add(-(time.Hour * 24 * 30)), withinWindow: false},
		{timestamp: time.Now(), withinWindow: true},
	}

	for i, tt := range tests {
		t.Run(strconv.Itoa(i), func(t *testing.T) {
			eh := headertest.RandExtendedHeader(t)
			eh.RawHeader.Time = tt.timestamp

			assert.Equal(t, tt.withinWindow, daser.isWithinSamplingWindow(eh))
		})
	}
}

// createDASerSubcomponents takes numGetter (number of headers
// to store in mockGetter) and numSub (number of headers to store
// in the mock header.Subscriber), returning a newly instantiated
24 changes: 20 additions & 4 deletions header/headertest/testing.go
@@ -42,6 +42,14 @@ func NewStore(t *testing.T) libhead.Store[*header.ExtendedHeader] {
	return headertest.NewStore[*header.ExtendedHeader](t, NewTestSuite(t, 3), 10)
}

func NewCustomStore(
	t *testing.T,
	generator headertest.Generator[*header.ExtendedHeader],
	numHeaders int,
) libhead.Store[*header.ExtendedHeader] {
	return headertest.NewStore[*header.ExtendedHeader](t, generator, numHeaders)
}

// NewTestSuite setups a new test suite with a given number of validators.
func NewTestSuite(t *testing.T, num int) *TestSuite {
valSet, vals := RandValidatorSet(num, 10)
@@ -77,8 +85,10 @@ func (s *TestSuite) genesis() *header.ExtendedHeader {
return eh
}

func MakeCommit(blockID types.BlockID, height int64, round int32,
voteSet *types.VoteSet, validators []types.PrivValidator, now time.Time) (*types.Commit, error) {
func MakeCommit(
blockID types.BlockID, height int64, round int32,
voteSet *types.VoteSet, validators []types.PrivValidator, now time.Time,
) (*types.Commit, error) {

// all sign
for i := 0; i < len(validators); i++ {
@@ -152,7 +162,8 @@ func (s *TestSuite) NextHeader() *header.ExtendedHeader {
}

func (s *TestSuite) GenRawHeader(
height uint64, lastHeader, lastCommit, dataHash libhead.Hash) *header.RawHeader {
height uint64, lastHeader, lastCommit, dataHash libhead.Hash,
) *header.RawHeader {
rh := RandRawHeader(s.t)
rh.Height = int64(height)
rh.Time = time.Now()
@@ -204,6 +215,11 @@ func (s *TestSuite) nextProposer() *types.Validator {

// RandExtendedHeader provides an ExtendedHeader fixture.
func RandExtendedHeader(t testing.TB) *header.ExtendedHeader {
	timestamp := time.Now()
	return RandExtendedHeaderAtTimestamp(t, timestamp)
}

func RandExtendedHeaderAtTimestamp(t testing.TB, timestamp time.Time) *header.ExtendedHeader {
dah := share.EmptyRoot()

rh := RandRawHeader(t)
@@ -214,7 +230,7 @@ func RandExtendedHeader(t testing.TB) *header.ExtendedHeader {
voteSet := types.NewVoteSet(rh.ChainID, rh.Height, 0, tmproto.PrecommitType, valSet)
blockID := RandBlockID(t)
blockID.Hash = rh.Hash()
commit, err := MakeCommit(blockID, rh.Height, 0, voteSet, vals, time.Now())
commit, err := MakeCommit(blockID, rh.Height, 0, voteSet, vals, timestamp)
require.NoError(t, err)

return &header.ExtendedHeader{
4 changes: 3 additions & 1 deletion libs/utils/resetctx.go
@@ -1,6 +1,8 @@
package utils

import "context"
import (
	"context"
)

// ResetContextOnError returns a fresh context if the given context has an error.
func ResetContextOnError(ctx context.Context) context.Context {
23 changes: 23 additions & 0 deletions nodebuilder/prune/constructors.go
@@ -0,0 +1,23 @@
package prune

import (
	"github.com/ipfs/go-datastore"

	hdr "github.com/celestiaorg/go-header"

	"github.com/celestiaorg/celestia-node/header"
	"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
	"github.com/celestiaorg/celestia-node/pruner"
)

func newPrunerService(
	p pruner.Pruner,
	window pruner.AvailabilityWindow,
	getter hdr.Store[*header.ExtendedHeader],
	ds datastore.Batching,
	opts ...pruner.Option,
) *pruner.Service {
	// TODO @renaynay: remove this once pruning implementation
	opts = append(opts, pruner.WithDisabledGC())
	return pruner.NewService(p, window, getter, ds, p2p.BlockTime, opts...)
}
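
Passing `p2p.BlockTime` here is what lets the service translate the duration-based availability window into an estimated block count (the `numBlocksInWindow` used by `findPruneableHeaders` further down). A rough sketch of that conversion, assuming `AvailabilityWindow` is a `time.Duration`-based type and that the service simply divides; the real constructor may round or clamp differently, and `blocksInWindow` is a hypothetical helper name:

package pruner

import "time"

// blocksInWindow estimates how many blocks fit inside an availability window
// given the chain's average block time. Illustrative only.
func blocksInWindow(window AvailabilityWindow, blockTime time.Duration) uint64 {
	if blockTime <= 0 {
		return 0
	}
	return uint64(time.Duration(window) / blockTime)
}
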
20 changes: 17 additions & 3 deletions nodebuilder/prune/module.go
@@ -8,24 +8,38 @@ import (
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/pruner/archival"
"github.com/celestiaorg/celestia-node/pruner/full"
"github.com/celestiaorg/celestia-node/pruner/light"
"github.com/celestiaorg/celestia-node/share/eds"
)

func ConstructModule(tp node.Type) fx.Option {

baseComponents := fx.Options(
fx.Provide(fx.Annotate(
pruner.NewService,
newPrunerService,
fx.OnStart(func(ctx context.Context, p *pruner.Service) error {
return p.Start(ctx)
}),
fx.OnStop(func(ctx context.Context, p *pruner.Service) error {
return p.Stop(ctx)
}),
)),
// This is necessary to invoke the pruner service as independent thanks to a
// quirk in FX.
fx.Invoke(func(p *pruner.Service) {}),
)

switch tp {
case node.Full, node.Bridge:
case node.Full:
return fx.Module("prune",
baseComponents,
fx.Provide(func(store *eds.Store) pruner.Pruner {
return full.NewPruner(store)
}),
fx.Supply(full.Window),
)
case node.Bridge:
return fx.Module("prune",
baseComponents,
fx.Provide(func() pruner.Pruner {
@@ -39,7 +53,7 @@ func ConstructModule(tp node.Type) fx.Option {
fx.Provide(func() pruner.Pruner {
return light.NewPruner()
}),
fx.Supply(archival.Window), // TODO @renaynay: turn this into light.Window in following PR
fx.Supply(light.Window),
)
default:
panic("unknown node type")
2 changes: 2 additions & 0 deletions nodebuilder/settings.go
@@ -30,6 +30,7 @@ import (
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/nodebuilder/share"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/state"
)

@@ -93,6 +94,7 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Option {
fx.Invoke(fraud.WithMetrics[*header.ExtendedHeader]),
fx.Invoke(node.WithMetrics),
fx.Invoke(share.WithDiscoveryMetrics),
fx.Invoke(pruner.WithPrunerMetrics),
)

samplingMetrics := fx.Options(
2 changes: 1 addition & 1 deletion pruner/archival/pruner.go
@@ -15,6 +15,6 @@ func NewPruner() *Pruner {
	return &Pruner{}
}

func (p *Pruner) Prune(context.Context, ...*header.ExtendedHeader) error {
func (p *Pruner) Prune(context.Context, *header.ExtendedHeader) error {
	return nil
}
132 changes: 132 additions & 0 deletions pruner/finder.go
@@ -0,0 +1,132 @@
package pruner

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"github.com/ipfs/go-datastore"

	"github.com/celestiaorg/celestia-node/header"
)

var (
	lastPrunedHeaderKey = datastore.NewKey("last_pruned_header")
)

type checkpoint struct {
	ds datastore.Datastore

	lastPrunedHeader atomic.Pointer[header.ExtendedHeader]

	// TODO @renaynay: keep track of failed roots to retry in separate job
}

func newCheckpoint(ds datastore.Datastore) *checkpoint {
	return &checkpoint{ds: ds}
}

// findPruneableHeaders returns all headers that are eligible for pruning
// (outside the sampling window).
func (s *Service) findPruneableHeaders(ctx context.Context) ([]*header.ExtendedHeader, error) {
	lastPruned := s.lastPruned()

	pruneCutoff := time.Now().Add(time.Duration(-s.window))
	estimatedCutoffHeight := lastPruned.Height() + s.numBlocksInWindow

	head, err := s.getter.Head(ctx)
	if err != nil {
		return nil, err
	}
	if head.Height() < estimatedCutoffHeight {
		estimatedCutoffHeight = head.Height()
	}

	headers, err := s.getter.GetRangeByHeight(ctx, lastPruned, estimatedCutoffHeight)
	if err != nil {
		return nil, err
	}

	// if our estimated range didn't cover enough headers, we need to fetch more
	// TODO: This is really inefficient in the case that lastPruned is the default value, or if the
	// node has been offline for a long time. Instead of increasing the boundary by one in the for
	// loop we could increase by a range every iteration
	headerCount := len(headers)
	for {
		if headerCount > int(s.maxPruneablePerGC) {
			headers = headers[:s.maxPruneablePerGC]
			break
		}
		lastHeader := headers[len(headers)-1]
		if lastHeader.Time().After(pruneCutoff) {
			break
		}

		nextHeader, err := s.getter.GetByHeight(ctx, lastHeader.Height()+1)
		if err != nil {
			return nil, err
		}
		headers = append(headers, nextHeader)
		headerCount++
	}

	for i, h := range headers {
		if h.Time().After(pruneCutoff) {
			if i == 0 {
				// we can't prune anything
				return nil, nil
			}

			// we can ignore the rest of the headers since they are all newer than the cutoff
			return headers[:i], nil
		}
	}
	return headers, nil
}

// initializeCheckpoint initializes the checkpoint, storing the earliest header in the chain.
func (s *Service) initializeCheckpoint(ctx context.Context) error {
	firstHeader, err := s.getter.GetByHeight(ctx, 1)
	if err != nil {
		return fmt.Errorf("failed to initialize checkpoint: %w", err)
	}

	return s.updateCheckpoint(ctx, firstHeader)
}

// loadCheckpoint loads the last checkpoint from disk, initializing it if it does not already exist.
func (s *Service) loadCheckpoint(ctx context.Context) error {
	bin, err := s.checkpoint.ds.Get(ctx, lastPrunedHeaderKey)
	if err != nil {
		if err == datastore.ErrNotFound {
			return s.initializeCheckpoint(ctx)
		}
		return fmt.Errorf("failed to load checkpoint: %w", err)
	}

	var lastPruned header.ExtendedHeader
	if err := lastPruned.UnmarshalJSON(bin); err != nil {
		return fmt.Errorf("failed to load checkpoint: %w", err)
	}

	s.checkpoint.lastPrunedHeader.Store(&lastPruned)
	return nil
}

// updateCheckpoint updates the checkpoint with the last pruned header height
// and persists it to disk.
func (s *Service) updateCheckpoint(ctx context.Context, lastPruned *header.ExtendedHeader) error {
	s.checkpoint.lastPrunedHeader.Store(lastPruned)

	bin, err := lastPruned.MarshalJSON()
	if err != nil {
		return err
	}

	return s.checkpoint.ds.Put(ctx, lastPrunedHeaderKey, bin)
}

func (s *Service) lastPruned() *header.ExtendedHeader {
	return s.checkpoint.lastPrunedHeader.Load()
}
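
Putting the pieces above together: a garbage-collection pass would presumably ask findPruneableHeaders for the eligible range, hand each header to the configured Pruner, and then advance the checkpoint. The actual GC loop is not part of this file; the sketch below is only an illustration and assumes a `pruner` field on Service plus simpler error handling than the real implementation would need:

// prunePass is a hypothetical single GC pass, not the Service's real loop:
// it shows how findPruneableHeaders, the Pruner and the checkpoint interact.
func (s *Service) prunePass(ctx context.Context) error {
	headers, err := s.findPruneableHeaders(ctx)
	if err != nil || len(headers) == 0 {
		return err
	}

	for _, eh := range headers {
		// assumed field name; failed roots would likely be recorded and retried
		if err := s.pruner.Prune(ctx, eh); err != nil {
			return err
		}
	}

	// persist the newest pruned header so the next pass resumes from there
	return s.updateCheckpoint(ctx, headers[len(headers)-1])
}
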
33 changes: 33 additions & 0 deletions pruner/full/pruner.go
@@ -0,0 +1,33 @@
package full

import (
	"context"

	logging "github.com/ipfs/go-log/v2"

	"github.com/celestiaorg/celestia-node/header"
	"github.com/celestiaorg/celestia-node/share"
	"github.com/celestiaorg/celestia-node/share/eds"
)

var log = logging.Logger("pruner/full")

type Pruner struct {
	store *eds.Store
}

func NewPruner(store *eds.Store) *Pruner {
	return &Pruner{
		store: store,
	}
}

func (p *Pruner) Prune(ctx context.Context, eh *header.ExtendedHeader) error {
	// short circuit on empty roots
	if eh.DAH.Equals(share.EmptyRoot()) {
		return nil
	}

	log.Debugf("pruning header %s", eh.DAH.Hash())
	return p.store.Remove(ctx, eh.DAH.Hash())
}