From be75628a38ebaef9d75ccdd6c01dafdcf85b86fc Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Thu, 14 Nov 2024 17:00:32 +0100 Subject: [PATCH] refactor(share/availability): Move window and constants to share/availability pkg (#3906) --- core/eds.go | 10 +++--- core/exchange.go | 3 +- core/exchange_test.go | 3 +- core/listener.go | 3 +- core/listener_test.go | 3 +- core/option.go | 10 +++--- das/daser.go | 4 +-- das/daser_test.go | 6 ++-- das/options.go | 6 ++-- nodebuilder/das/constructors.go | 6 ++-- nodebuilder/pruner/constructors.go | 5 +-- nodebuilder/pruner/module.go | 36 ++++++++++++++----- nodebuilder/share/constructors.go | 5 ++- nodebuilder/share/module.go | 8 ++--- nodebuilder/share/p2p_constructors.go | 18 +++++----- nodebuilder/share/window.go | 13 +++++++ nodebuilder/tests/prune_test.go | 2 +- pruner/archival/window.go | 5 --- pruner/find.go | 2 +- pruner/full/window.go | 12 ------- pruner/full/window_test.go | 14 -------- pruner/light/window.go | 11 ------ pruner/light/window_test.go | 14 -------- pruner/service.go | 4 +-- pruner/service_test.go | 20 +++++------ pruner/window.go | 21 ----------- share/availability/full/availability.go | 5 ++- share/availability/window.go | 18 ++++++++++ share/availability/window_test.go | 15 ++++++++ share/shwap/p2p/bitswap/getter.go | 9 ++--- share/shwap/p2p/shrex/shrex_getter/shrex.go | 8 ++--- .../p2p/shrex/shrex_getter/shrex_test.go | 7 ++-- 32 files changed, 141 insertions(+), 165 deletions(-) create mode 100644 nodebuilder/share/window.go delete mode 100644 pruner/archival/window.go delete mode 100644 pruner/full/window.go delete mode 100644 pruner/full/window_test.go delete mode 100644 pruner/light/window.go delete mode 100644 pruner/light/window_test.go delete mode 100644 pruner/window.go create mode 100644 share/availability/window.go create mode 100644 share/availability/window_test.go diff --git a/core/eds.go b/core/eds.go index e2cb6fef4d..9928f96a02 100644 --- a/core/eds.go +++ b/core/eds.go @@ -3,6 +3,7 @@ package core import ( "context" "fmt" + "time" "github.com/tendermint/tendermint/types" @@ -15,9 +16,8 @@ import ( "github.com/celestiaorg/rsmt2d" "github.com/celestiaorg/celestia-node/header" - "github.com/celestiaorg/celestia-node/pruner" - "github.com/celestiaorg/celestia-node/pruner/full" "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/availability" "github.com/celestiaorg/celestia-node/store" ) @@ -61,16 +61,16 @@ func storeEDS( eh *header.ExtendedHeader, eds *rsmt2d.ExtendedDataSquare, store *store.Store, - window pruner.AvailabilityWindow, + window time.Duration, ) error { - if !pruner.IsWithinAvailabilityWindow(eh.Time(), window) { + if !availability.IsWithinWindow(eh.Time(), window) { log.Debugw("skipping storage of historic block", "height", eh.Height()) return nil } var err error // archival nodes should not store Q4 outside the availability window. - if pruner.IsWithinAvailabilityWindow(eh.Time(), full.Window) { + if availability.IsWithinWindow(eh.Time(), availability.StorageWindow) { err = store.PutODSQ4(ctx, eh.DAH, eh.Height(), eds) } else { err = store.PutODS(ctx, eh.DAH, eh.Height(), eds) diff --git a/core/exchange.go b/core/exchange.go index bcf29842e3..a813955968 100644 --- a/core/exchange.go +++ b/core/exchange.go @@ -12,7 +12,6 @@ import ( libhead "github.com/celestiaorg/go-header" "github.com/celestiaorg/celestia-node/header" - "github.com/celestiaorg/celestia-node/pruner" "github.com/celestiaorg/celestia-node/store" ) @@ -23,7 +22,7 @@ type Exchange struct { store *store.Store construct header.ConstructFn - availabilityWindow pruner.AvailabilityWindow + availabilityWindow time.Duration metrics *exchangeMetrics } diff --git a/core/exchange_test.go b/core/exchange_test.go index 072493d4f3..f7e69be8a4 100644 --- a/core/exchange_test.go +++ b/core/exchange_test.go @@ -13,7 +13,6 @@ import ( "github.com/celestiaorg/celestia-app/v3/test/util/testnode" "github.com/celestiaorg/celestia-node/header" - "github.com/celestiaorg/celestia-node/pruner" "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/store" ) @@ -82,7 +81,7 @@ func TestExchange_DoNotStoreHistoric(t *testing.T) { fetcher, store, header.MakeExtendedHeader, - WithAvailabilityWindow(pruner.AvailabilityWindow(time.Nanosecond)), // all blocks will be "historic" + WithAvailabilityWindow(time.Nanosecond), // all blocks will be "historic" ) require.NoError(t, err) diff --git a/core/listener.go b/core/listener.go index 67e532d970..1d7dc1b061 100644 --- a/core/listener.go +++ b/core/listener.go @@ -14,7 +14,6 @@ import ( libhead "github.com/celestiaorg/go-header" "github.com/celestiaorg/celestia-node/header" - "github.com/celestiaorg/celestia-node/pruner" "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub" "github.com/celestiaorg/celestia-node/store" ) @@ -38,7 +37,7 @@ type Listener struct { construct header.ConstructFn store *store.Store - availabilityWindow pruner.AvailabilityWindow + availabilityWindow time.Duration headerBroadcaster libhead.Broadcaster[*header.ExtendedHeader] hashBroadcaster shrexsub.BroadcastFn diff --git a/core/listener_test.go b/core/listener_test.go index 252c6e88c1..99c6ea1a28 100644 --- a/core/listener_test.go +++ b/core/listener_test.go @@ -15,7 +15,6 @@ import ( "github.com/celestiaorg/celestia-node/header" nodep2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p" - "github.com/celestiaorg/celestia-node/pruner" "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub" "github.com/celestiaorg/celestia-node/store" ) @@ -118,7 +117,7 @@ func TestListener_DoesNotStoreHistoric(t *testing.T) { require.NoError(t, err) // create Listener and start listening - opt := WithAvailabilityWindow(pruner.AvailabilityWindow(time.Nanosecond)) + opt := WithAvailabilityWindow(time.Nanosecond) cl := createListener(ctx, t, fetcher, ps0, eds, store, testChainID, opt) dataRoots := generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx) diff --git a/core/option.go b/core/option.go index e209d75aca..3e9b5a8e20 100644 --- a/core/option.go +++ b/core/option.go @@ -1,9 +1,9 @@ package core import ( + "time" + "github.com/celestiaorg/celestia-node/nodebuilder/p2p" - "github.com/celestiaorg/celestia-node/pruner" - "github.com/celestiaorg/celestia-node/pruner/archival" ) type Option func(*params) @@ -11,12 +11,12 @@ type Option func(*params) type params struct { metrics bool chainID string - availabilityWindow pruner.AvailabilityWindow + availabilityWindow time.Duration } func defaultParams() params { return params{ - availabilityWindow: archival.Window, + availabilityWindow: time.Duration(0), } } @@ -34,7 +34,7 @@ func WithChainID(id p2p.Network) Option { } } -func WithAvailabilityWindow(window pruner.AvailabilityWindow) Option { +func WithAvailabilityWindow(window time.Duration) Option { return func(p *params) { p.availabilityWindow = window } diff --git a/das/daser.go b/das/daser.go index 2b0b84ba29..fa41bd56ad 100644 --- a/das/daser.go +++ b/das/daser.go @@ -13,8 +13,8 @@ import ( libhead "github.com/celestiaorg/go-header" "github.com/celestiaorg/celestia-node/header" - "github.com/celestiaorg/celestia-node/pruner" "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/availability" "github.com/celestiaorg/celestia-node/share/eds/byzantine" "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub" ) @@ -162,7 +162,7 @@ func (d *DASer) Stop(ctx context.Context) error { func (d *DASer) sample(ctx context.Context, h *header.ExtendedHeader) error { // short-circuit if pruning is enabled and the header is outside the // availability window - if !pruner.IsWithinAvailabilityWindow(h.Time(), d.params.samplingWindow) { + if !availability.IsWithinWindow(h.Time(), d.params.samplingWindow) { log.Debugw("skipping header outside sampling window", "height", h.Height(), "time", h.Time()) return errOutsideSamplingWindow diff --git a/das/daser_test.go b/das/daser_test.go index c67e5c06e2..2cdc43497a 100644 --- a/das/daser_test.go +++ b/das/daser_test.go @@ -19,8 +19,8 @@ import ( "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/header/headertest" - "github.com/celestiaorg/celestia-node/pruner" "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/availability" "github.com/celestiaorg/celestia-node/share/availability/mocks" "github.com/celestiaorg/celestia-node/share/eds/edstest" ) @@ -254,7 +254,7 @@ func TestDASer_SamplingWindow(t *testing.T) { // create and start DASer daser, err := NewDASer(avail, sub, getter, ds, fserv, newBroadcastMock(1), - WithSamplingWindow(pruner.AvailabilityWindow(time.Second))) + WithSamplingWindow(time.Second)) require.NoError(t, err) tests := []struct { @@ -276,7 +276,7 @@ func TestDASer_SamplingWindow(t *testing.T) { assert.Equal( t, tt.withinWindow, - pruner.IsWithinAvailabilityWindow(eh.Time(), daser.params.samplingWindow), + availability.IsWithinWindow(eh.Time(), daser.params.samplingWindow), ) }) } diff --git a/das/options.go b/das/options.go index d70d8fdd04..6a665d5577 100644 --- a/das/options.go +++ b/das/options.go @@ -4,8 +4,6 @@ import ( "errors" "fmt" "time" - - "github.com/celestiaorg/celestia-node/pruner" ) // ErrInvalidOption is an error that is returned by Parameters.Validate @@ -47,7 +45,7 @@ type Parameters struct { // samplingWindow determines the time window that headers should fall into // in order to be sampled. If set to 0, the sampling window will include // all headers. - samplingWindow pruner.AvailabilityWindow + samplingWindow time.Duration } // DefaultParameters returns the default configuration values for the daser parameters @@ -166,7 +164,7 @@ func WithSampleTimeout(sampleTimeout time.Duration) Option { // WithSamplingWindow is a functional option to configure the DASer's // `samplingWindow` parameter. -func WithSamplingWindow(samplingWindow pruner.AvailabilityWindow) Option { +func WithSamplingWindow(samplingWindow time.Duration) Option { return func(d *DASer) { d.params.samplingWindow = samplingWindow } diff --git a/nodebuilder/das/constructors.go b/nodebuilder/das/constructors.go index 8d6f9d1168..b9b7bdf100 100644 --- a/nodebuilder/das/constructors.go +++ b/nodebuilder/das/constructors.go @@ -12,7 +12,7 @@ import ( "github.com/celestiaorg/celestia-node/das" "github.com/celestiaorg/celestia-node/header" modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud" - "github.com/celestiaorg/celestia-node/pruner" + modshare "github.com/celestiaorg/celestia-node/nodebuilder/share" "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/eds/byzantine" "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub" @@ -45,10 +45,10 @@ func newDASer( batching datastore.Batching, fraudServ fraud.Service[*header.ExtendedHeader], bFn shrexsub.BroadcastFn, - availWindow pruner.AvailabilityWindow, + availWindow modshare.Window, options ...das.Option, ) (*das.DASer, *modfraud.ServiceBreaker[*das.DASer, *header.ExtendedHeader], error) { - options = append(options, das.WithSamplingWindow(availWindow)) + options = append(options, das.WithSamplingWindow(availWindow.Duration())) ds, err := das.NewDASer(da, hsub, store, batching, fraudServ, bFn, options...) if err != nil { diff --git a/nodebuilder/pruner/constructors.go b/nodebuilder/pruner/constructors.go index 4b3fa44f79..e391c82f60 100644 --- a/nodebuilder/pruner/constructors.go +++ b/nodebuilder/pruner/constructors.go @@ -7,17 +7,18 @@ import ( "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/nodebuilder/p2p" + modshare "github.com/celestiaorg/celestia-node/nodebuilder/share" "github.com/celestiaorg/celestia-node/pruner" ) func newPrunerService( p pruner.Pruner, - window pruner.AvailabilityWindow, + window modshare.Window, getter libhead.Store[*header.ExtendedHeader], ds datastore.Batching, opts ...pruner.Option, ) (*pruner.Service, error) { - serv, err := pruner.NewService(p, window, getter, ds, p2p.BlockTime, opts...) + serv, err := pruner.NewService(p, window.Duration(), getter, ds, p2p.BlockTime, opts...) if err != nil { return nil, err } diff --git a/nodebuilder/pruner/module.go b/nodebuilder/pruner/module.go index 958fc4e63c..b398ee734e 100644 --- a/nodebuilder/pruner/module.go +++ b/nodebuilder/pruner/module.go @@ -2,6 +2,7 @@ package pruner import ( "context" + "time" "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log/v2" @@ -10,10 +11,12 @@ import ( "github.com/celestiaorg/celestia-node/core" "github.com/celestiaorg/celestia-node/libs/fxutil" "github.com/celestiaorg/celestia-node/nodebuilder/node" + modshare "github.com/celestiaorg/celestia-node/nodebuilder/share" "github.com/celestiaorg/celestia-node/pruner" - "github.com/celestiaorg/celestia-node/pruner/archival" "github.com/celestiaorg/celestia-node/pruner/full" - "github.com/celestiaorg/celestia-node/pruner/light" + "github.com/celestiaorg/celestia-node/share/availability" + "github.com/celestiaorg/celestia-node/share/availability/light" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/discovery" ) var log = logging.Logger("module/pruner") @@ -22,6 +25,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { baseComponents := fx.Options( fx.Supply(cfg), availWindow(tp, cfg.EnableService), + advertiseArchival(tp, cfg), ) prunerService := fx.Options( @@ -45,6 +49,9 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { return fx.Module("prune", baseComponents, prunerService, + // TODO(@walldiss @renaynay): remove conversion after Availability and Pruner interfaces are merged + // note this provide exists in pruner module to avoid cyclical imports + fx.Provide(func(la *light.ShareAvailability) pruner.Pruner { return la }), ) } // We do not trigger DetectPreviousRun for Light nodes, to allow them to disable pruning at wish. @@ -73,8 +80,8 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { baseComponents, prunerService, fxutil.ProvideAs(full.NewPruner, new(pruner.Pruner)), - fx.Provide(func(window pruner.AvailabilityWindow) []core.Option { - return []core.Option{core.WithAvailabilityWindow(window)} + fx.Provide(func(window modshare.Window) []core.Option { + return []core.Option{core.WithAvailabilityWindow(window.Duration())} }), ) } @@ -92,20 +99,31 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { } } +func advertiseArchival(tp node.Type, pruneCfg *Config) fx.Option { + if (tp == node.Full || tp == node.Bridge) && !pruneCfg.EnableService { + return fx.Supply(discovery.WithAdvertise()) + } + return fx.Provide(func() discovery.Option { + var opt discovery.Option + return opt + }) +} + func availWindow(tp node.Type, pruneEnabled bool) fx.Option { switch tp { case node.Light: // light nodes are still subject to sampling within window // even if pruning is not enabled. - return fx.Provide(func() pruner.AvailabilityWindow { - return light.Window + return fx.Provide(func() modshare.Window { + return modshare.Window(availability.StorageWindow) }) case node.Full, node.Bridge: - return fx.Provide(func() pruner.AvailabilityWindow { + return fx.Provide(func() modshare.Window { if pruneEnabled { - return full.Window + return modshare.Window(availability.StorageWindow) } - return archival.Window + // implicitly disable pruning by setting the window to 0 + return modshare.Window(time.Duration(0)) }) default: panic("unknown node type") diff --git a/nodebuilder/share/constructors.go b/nodebuilder/share/constructors.go index 62e37477e6..0efa6efa19 100644 --- a/nodebuilder/share/constructors.go +++ b/nodebuilder/share/constructors.go @@ -6,7 +6,6 @@ import ( "go.uber.org/fx" headerServ "github.com/celestiaorg/celestia-node/nodebuilder/header" - "github.com/celestiaorg/celestia-node/pruner" "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/shwap" "github.com/celestiaorg/celestia-node/share/shwap/getters" @@ -23,9 +22,9 @@ func bitswapGetter( lc fx.Lifecycle, exchange exchange.SessionExchange, bstore blockstore.Blockstore, - wndw pruner.AvailabilityWindow, + wndw Window, ) *bitswap.Getter { - getter := bitswap.NewGetter(exchange, bstore, wndw) + getter := bitswap.NewGetter(exchange, bstore, wndw.Duration()) lc.Append(fx.StartStopHook(getter.Start, getter.Stop)) return getter } diff --git a/nodebuilder/share/module.go b/nodebuilder/share/module.go index d0c57ca176..adeb8cd330 100644 --- a/nodebuilder/share/module.go +++ b/nodebuilder/share/module.go @@ -11,9 +11,8 @@ import ( "github.com/celestiaorg/celestia-node/nodebuilder/node" modp2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p" - "github.com/celestiaorg/celestia-node/pruner" - lightprune "github.com/celestiaorg/celestia-node/pruner/light" "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/availability" "github.com/celestiaorg/celestia-node/share/availability/full" "github.com/celestiaorg/celestia-node/share/availability/light" "github.com/celestiaorg/celestia-node/share/shwap" @@ -39,7 +38,7 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option availabilityComponents(tp, cfg), shrexComponents(tp, cfg), bitswapComponents(tp, cfg), - peerComponents(tp, cfg), + peerManagementComponents(tp, cfg), ) switch tp { @@ -118,7 +117,7 @@ func shrexComponents(tp node.Type, cfg *Config) fx.Option { ndClient, managers[fullNodesTag], managers[archivalNodesTag], - lightprune.Window, + availability.RequestWindow, ) }, fx.OnStart(func(ctx context.Context, getter *shrex_getter.Getter) error { @@ -233,7 +232,6 @@ func availabilityComponents(tp node.Type, cfg *Config) fx.Option { }, fx.As(fx.Self()), fx.As(new(share.Availability)), - fx.As(new(pruner.Pruner)), // TODO(@walldiss): remove conversion after Availability and Pruner interfaces are merged fx.OnStop(func(ctx context.Context, la *light.ShareAvailability) error { return la.Close(ctx) }), diff --git a/nodebuilder/share/p2p_constructors.go b/nodebuilder/share/p2p_constructors.go index 576db73c97..d08d3891d3 100644 --- a/nodebuilder/share/p2p_constructors.go +++ b/nodebuilder/share/p2p_constructors.go @@ -13,7 +13,6 @@ import ( "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/nodebuilder/node" - modprune "github.com/celestiaorg/celestia-node/nodebuilder/pruner" "github.com/celestiaorg/celestia-node/share/shwap/p2p/discovery" "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/peers" "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub" @@ -31,12 +30,11 @@ const ( protocolVersion = "v0.1.0" ) -// TODO @renaynay: rename -func peerComponents(tp node.Type, cfg *Config) fx.Option { +func peerManagementComponents(tp node.Type, cfg *Config) fx.Option { return fx.Options( fx.Provide(routingDiscovery), fullDiscoveryAndPeerManager(tp, cfg), - archivalDiscoveryAndPeerManager(tp, cfg), + archivalDiscoveryAndPeerManager(cfg), ) } @@ -101,17 +99,18 @@ func fullDiscoveryAndPeerManager(tp node.Type, cfg *Config) fx.Option { }) } -// archivalDiscoveryAndPeerManager TODO @renaynay -func archivalDiscoveryAndPeerManager(tp node.Type, cfg *Config) fx.Option { +// archivalDiscoveryAndPeerManager builds the discovery instance and peer manager +// for discovering and managing peers advertising on the `archival` tag +func archivalDiscoveryAndPeerManager(cfg *Config) fx.Option { return fx.Provide( func( lc fx.Lifecycle, - pruneCfg *modprune.Config, fullDisc *discovery.Discovery, fullManager *peers.Manager, h host.Host, disc p2pdisc.Discovery, gater *conngater.BasicConnectionGater, + discOpt discovery.Option, ) (map[string]*peers.Manager, []*discovery.Discovery, error) { archivalPeerManager, err := peers.NewManager( *cfg.PeerManagerParams, @@ -124,9 +123,8 @@ func archivalDiscoveryAndPeerManager(tp node.Type, cfg *Config) fx.Option { } discOpts := []discovery.Option{discovery.WithOnPeersUpdate(archivalPeerManager.UpdateNodePool)} - - if (tp == node.Bridge || tp == node.Full) && !pruneCfg.EnableService { - discOpts = append(discOpts, discovery.WithAdvertise()) + if discOpt != nil { + discOpts = append(discOpts, discOpt) } archivalDisc, err := discovery.NewDiscovery( diff --git a/nodebuilder/share/window.go b/nodebuilder/share/window.go new file mode 100644 index 0000000000..0f41543caf --- /dev/null +++ b/nodebuilder/share/window.go @@ -0,0 +1,13 @@ +package share + +import ( + "time" +) + +// Window is a type alias for time.Duration used in nodebuilder +// to provide sampling windows to their relevant components +type Window time.Duration + +func (w Window) Duration() time.Duration { + return time.Duration(w) +} diff --git a/nodebuilder/tests/prune_test.go b/nodebuilder/tests/prune_test.go index 64f0f0596a..a92117b7f2 100644 --- a/nodebuilder/tests/prune_test.go +++ b/nodebuilder/tests/prune_test.go @@ -73,7 +73,7 @@ func TestArchivalBlobSync(t *testing.T) { pruningCfg := nodebuilder.DefaultConfig(node.Bridge) pruningCfg.Pruner.EnableService = true - testAvailWindow := pruner.AvailabilityWindow(time.Millisecond) + testAvailWindow := time.Millisecond prunerOpts := fx.Options( fx.Replace(testAvailWindow), fxutil.ReplaceAs(func( diff --git a/pruner/archival/window.go b/pruner/archival/window.go deleted file mode 100644 index b89a779816..0000000000 --- a/pruner/archival/window.go +++ /dev/null @@ -1,5 +0,0 @@ -package archival - -import "github.com/celestiaorg/celestia-node/pruner" - -const Window = pruner.AvailabilityWindow(0) diff --git a/pruner/find.go b/pruner/find.go index 0104f24e78..aba621ccbc 100644 --- a/pruner/find.go +++ b/pruner/find.go @@ -18,7 +18,7 @@ func (s *Service) findPruneableHeaders( ctx context.Context, lastPruned *header.ExtendedHeader, ) ([]*header.ExtendedHeader, error) { - pruneCutoff := time.Now().UTC().Add(-s.window.Duration()) + pruneCutoff := time.Now().UTC().Add(-s.window) if !lastPruned.Time().UTC().Before(pruneCutoff) { // this can happen when the network is young and all blocks diff --git a/pruner/full/window.go b/pruner/full/window.go deleted file mode 100644 index 4ad69234e2..0000000000 --- a/pruner/full/window.go +++ /dev/null @@ -1,12 +0,0 @@ -package full - -import ( - "time" - - "github.com/celestiaorg/celestia-node/pruner" - "github.com/celestiaorg/celestia-node/pruner/light" -) - -// Window is the availability window for light nodes in the Celestia -// network (30 days + 1 hour). -const Window = pruner.AvailabilityWindow(time.Duration(light.Window) + time.Hour) diff --git a/pruner/full/window_test.go b/pruner/full/window_test.go deleted file mode 100644 index 9ee6d7c89a..0000000000 --- a/pruner/full/window_test.go +++ /dev/null @@ -1,14 +0,0 @@ -package full - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -// TestFullWindowConst exists to ensure that any changes to the sampling window -// are deliberate. -func TestFullWindowConst(t *testing.T) { - assert.Equal(t, Window.Duration(), (30*24*time.Hour)+time.Hour) -} diff --git a/pruner/light/window.go b/pruner/light/window.go deleted file mode 100644 index eeabb0fda7..0000000000 --- a/pruner/light/window.go +++ /dev/null @@ -1,11 +0,0 @@ -package light - -import ( - "time" - - "github.com/celestiaorg/celestia-node/pruner" -) - -// Window is the availability window for light nodes in the Celestia -// network (30 days). -const Window = pruner.AvailabilityWindow(30 * 24 * time.Hour) diff --git a/pruner/light/window_test.go b/pruner/light/window_test.go deleted file mode 100644 index 764be542fd..0000000000 --- a/pruner/light/window_test.go +++ /dev/null @@ -1,14 +0,0 @@ -package light - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -// TestLightWindowConst exists to ensure that any changes to the sampling window -// are deliberate. -func TestLightWindowConst(t *testing.T) { - assert.Equal(t, Window.Duration(), 30*24*time.Hour) -} diff --git a/pruner/service.go b/pruner/service.go index 8ba6ccfc24..9a617a486c 100644 --- a/pruner/service.go +++ b/pruner/service.go @@ -19,7 +19,7 @@ var log = logging.Logger("pruner/service") // Service handles running the pruning cycle for the node. type Service struct { pruner Pruner - window AvailabilityWindow + window time.Duration getter libhead.Getter[*header.ExtendedHeader] @@ -38,7 +38,7 @@ type Service struct { func NewService( p Pruner, - window AvailabilityWindow, + window time.Duration, getter libhead.Getter[*header.ExtendedHeader], ds datastore.Datastore, blockTime time.Duration, diff --git a/pruner/service_test.go b/pruner/service_test.go index c1be1bc74e..c7ebeac03a 100644 --- a/pruner/service_test.go +++ b/pruner/service_test.go @@ -40,7 +40,7 @@ func TestService(t *testing.T) { serv, err := NewService( mp, - AvailabilityWindow(time.Millisecond*2), + time.Millisecond*2, store, sync.MutexWrap(datastore.NewMapDatastore()), blockTime, @@ -82,7 +82,7 @@ func TestService_FailedAreRecorded(t *testing.T) { serv, err := NewService( mp, - AvailabilityWindow(time.Millisecond*20), + time.Millisecond*20, store, sync.MutexWrap(datastore.NewMapDatastore()), blockTime, @@ -127,7 +127,7 @@ func TestServiceCheckpointing(t *testing.T) { serv, err := NewService( mp, - AvailabilityWindow(time.Second), + time.Second, store, sync.MutexWrap(datastore.NewMapDatastore()), time.Millisecond, @@ -164,7 +164,7 @@ func TestPrune_LargeNumberOfBlocks(t *testing.T) { t.Cleanup(func() { maxHeadersPerLoop = maxHeadersPerLoopOld }) blockTime := time.Nanosecond - availabilityWindow := AvailabilityWindow(blockTime * 10) + availabilityWindow := blockTime * 10 // all headers generated in suite are timestamped to time.Now(), so // they will all be considered "pruneable" within the availability window @@ -187,7 +187,7 @@ func TestPrune_LargeNumberOfBlocks(t *testing.T) { require.NoError(t, err) // ensures availability window has passed - time.Sleep(availabilityWindow.Duration() + time.Millisecond*100) + time.Sleep(availabilityWindow + time.Millisecond*100) // trigger a prune job lastPruned, err := serv.lastPruned(ctx) @@ -202,7 +202,7 @@ func TestPrune_LargeNumberOfBlocks(t *testing.T) { func TestFindPruneableHeaders(t *testing.T) { testCases := []struct { name string - availWindow AvailabilityWindow + availWindow time.Duration blockTime time.Duration startTime time.Time headerAmount int @@ -211,7 +211,7 @@ func TestFindPruneableHeaders(t *testing.T) { { name: "Estimated range matches expected", // Availability window is one week - availWindow: AvailabilityWindow(time.Hour * 24 * 7), + availWindow: time.Hour * 24 * 7, blockTime: time.Hour, // Make two weeks of headers headerAmount: 2 * (24 * 7), @@ -222,7 +222,7 @@ func TestFindPruneableHeaders(t *testing.T) { { name: "Estimated range not sufficient but finds the correct tail", // Availability window is one week - availWindow: AvailabilityWindow(time.Hour * 24 * 7), + availWindow: time.Hour * 24 * 7, blockTime: time.Hour, // Make three weeks of headers headerAmount: 3 * (24 * 7), @@ -233,7 +233,7 @@ func TestFindPruneableHeaders(t *testing.T) { { name: "No pruneable headers", // Availability window is two weeks - availWindow: AvailabilityWindow(2 * time.Hour * 24 * 7), + availWindow: 2 * time.Hour * 24 * 7, blockTime: time.Hour, // Make one week of headers headerAmount: 24 * 7, @@ -272,7 +272,7 @@ func TestFindPruneableHeaders(t *testing.T) { require.NoError(t, err) require.Len(t, pruneable, tc.expectedLength) - pruneableCutoff := time.Now().Add(-tc.availWindow.Duration()) + pruneableCutoff := time.Now().Add(-tc.availWindow) // All returned headers are older than the availability window for _, h := range pruneable { require.WithinRange(t, h.Time(), tc.startTime, pruneableCutoff) diff --git a/pruner/window.go b/pruner/window.go deleted file mode 100644 index 4d7ba5c776..0000000000 --- a/pruner/window.go +++ /dev/null @@ -1,21 +0,0 @@ -package pruner - -import ( - "time" -) - -type AvailabilityWindow time.Duration - -func (aw AvailabilityWindow) Duration() time.Duration { - return time.Duration(aw) -} - -// IsWithinAvailabilityWindow checks whether the given timestamp is within the -// given AvailabilityWindow. If the window is disabled (0), it returns true for -// every timestamp. -func IsWithinAvailabilityWindow(t time.Time, window AvailabilityWindow) bool { - if window.Duration() == time.Duration(0) { - return true - } - return time.Since(t) <= window.Duration() -} diff --git a/share/availability/full/availability.go b/share/availability/full/availability.go index 4347d35b36..b95a648d5d 100644 --- a/share/availability/full/availability.go +++ b/share/availability/full/availability.go @@ -8,9 +8,8 @@ import ( logging "github.com/ipfs/go-log/v2" "github.com/celestiaorg/celestia-node/header" - "github.com/celestiaorg/celestia-node/pruner" - "github.com/celestiaorg/celestia-node/pruner/full" "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/availability" "github.com/celestiaorg/celestia-node/share/eds/byzantine" "github.com/celestiaorg/celestia-node/share/shwap" "github.com/celestiaorg/celestia-node/store" @@ -69,7 +68,7 @@ func (fa *ShareAvailability) SharesAvailable(ctx context.Context, header *header } // archival nodes should not store Q4 outside the availability window. - if pruner.IsWithinAvailabilityWindow(header.Time(), full.Window) { + if availability.IsWithinWindow(header.Time(), availability.StorageWindow) { err = fa.store.PutODSQ4(ctx, dah, header.Height(), eds) } else { err = fa.store.PutODS(ctx, dah, header.Height(), eds) diff --git a/share/availability/window.go b/share/availability/window.go new file mode 100644 index 0000000000..4832ec16f8 --- /dev/null +++ b/share/availability/window.go @@ -0,0 +1,18 @@ +package availability + +import "time" + +const ( + RequestWindow = 30 * 24 * time.Hour + StorageWindow = RequestWindow + time.Hour +) + +// IsWithinWindow checks whether the given timestamp is within the +// given AvailabilityWindow. If the window is disabled (0), it returns true for +// every timestamp. +func IsWithinWindow(t time.Time, window time.Duration) bool { + if window == time.Duration(0) { + return true + } + return time.Since(t) <= window +} diff --git a/share/availability/window_test.go b/share/availability/window_test.go new file mode 100644 index 0000000000..af1f95c4d6 --- /dev/null +++ b/share/availability/window_test.go @@ -0,0 +1,15 @@ +package availability + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +// TestConsts exists to ensure that any changes to the sampling windows +// are deliberate. +func TestConsts(t *testing.T) { + assert.Equal(t, RequestWindow, 30*24*time.Hour) + assert.Equal(t, StorageWindow, (30*24*time.Hour)+time.Hour) +} diff --git a/share/shwap/p2p/bitswap/getter.go b/share/shwap/p2p/bitswap/getter.go index c3ffde7965..eaba3a0f39 100644 --- a/share/shwap/p2p/bitswap/getter.go +++ b/share/shwap/p2p/bitswap/getter.go @@ -3,6 +3,7 @@ package bitswap import ( "context" "fmt" + "time" "github.com/ipfs/boxo/blockstore" "github.com/ipfs/boxo/exchange" @@ -16,8 +17,8 @@ import ( "github.com/celestiaorg/rsmt2d" "github.com/celestiaorg/celestia-node/header" - "github.com/celestiaorg/celestia-node/pruner" "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/availability" "github.com/celestiaorg/celestia-node/share/shwap" ) @@ -27,7 +28,7 @@ var tracer = otel.Tracer("shwap/bitswap") type Getter struct { exchange exchange.SessionExchange bstore blockstore.Blockstore - availWndw pruner.AvailabilityWindow + availWndw time.Duration availableSession exchange.Fetcher archivalSession exchange.Fetcher @@ -39,7 +40,7 @@ type Getter struct { func NewGetter( exchange exchange.SessionExchange, bstore blockstore.Blockstore, - availWndw pruner.AvailabilityWindow, + availWndw time.Duration, ) *Getter { return &Getter{exchange: exchange, bstore: bstore, availWndw: availWndw} } @@ -233,7 +234,7 @@ func (g *Getter) GetNamespaceData( func (g *Getter) session(ctx context.Context, hdr *header.ExtendedHeader) exchange.Fetcher { session := g.archivalSession - isWithinAvailability := pruner.IsWithinAvailabilityWindow(hdr.Time(), g.availWndw) + isWithinAvailability := availability.IsWithinWindow(hdr.Time(), g.availWndw) if isWithinAvailability { session = g.availableSession } diff --git a/share/shwap/p2p/shrex/shrex_getter/shrex.go b/share/shwap/p2p/shrex/shrex_getter/shrex.go index a6cc1b88fa..f51328136c 100644 --- a/share/shwap/p2p/shrex/shrex_getter/shrex.go +++ b/share/shwap/p2p/shrex/shrex_getter/shrex.go @@ -18,8 +18,8 @@ import ( "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/libs/utils" - "github.com/celestiaorg/celestia-node/pruner" "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/availability" "github.com/celestiaorg/celestia-node/share/shwap" "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex" "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/peers" @@ -105,7 +105,7 @@ type Getter struct { // attempt multiple peers in scope of one request before context timeout is reached minAttemptsCount int - availabilityWindow pruner.AvailabilityWindow + availabilityWindow time.Duration metrics *metrics } @@ -115,7 +115,7 @@ func NewGetter( ndClient *shrexnd.Client, fullPeerManager *peers.Manager, archivalManager *peers.Manager, - availWindow pruner.AvailabilityWindow, + availWindow time.Duration, ) *Getter { s := &Getter{ edsClient: edsClient, @@ -306,7 +306,7 @@ func (sg *Getter) getPeer( ctx context.Context, header *header.ExtendedHeader, ) (libpeer.ID, peers.DoneFunc, error) { - if !pruner.IsWithinAvailabilityWindow(header.Time(), sg.availabilityWindow) { + if !availability.IsWithinWindow(header.Time(), sg.availabilityWindow) { p, df, err := sg.archivalPeerManager.Peer(ctx, header.DAH.Hash(), header.Height()) return p, df, err } diff --git a/share/shwap/p2p/shrex/shrex_getter/shrex_test.go b/share/shwap/p2p/shrex/shrex_getter/shrex_test.go index 59b869de49..25298a940c 100644 --- a/share/shwap/p2p/shrex/shrex_getter/shrex_test.go +++ b/share/shwap/p2p/shrex/shrex_getter/shrex_test.go @@ -21,9 +21,8 @@ import ( "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/header/headertest" - "github.com/celestiaorg/celestia-node/pruner/full" - "github.com/celestiaorg/celestia-node/pruner/light" "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/availability" "github.com/celestiaorg/celestia-node/share/eds/edstest" "github.com/celestiaorg/celestia-node/share/shwap" "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/peers" @@ -57,7 +56,7 @@ func TestShrexGetter(t *testing.T) { archivalPeerManager, err := testManager(ctx, clHost, sub) require.NoError(t, err) - getter := NewGetter(edsClient, ndClient, fullPeerManager, archivalPeerManager, light.Window) + getter := NewGetter(edsClient, ndClient, fullPeerManager, archivalPeerManager, availability.RequestWindow) require.NoError(t, getter.Start(ctx)) height := atomic.Uint64{} @@ -262,7 +261,7 @@ func TestShrexGetter(t *testing.T) { eh.RawHeader.Height = int64(height) // historical data expects an archival peer - eh.RawHeader.Time = time.Now().Add(-(time.Duration(full.Window) + time.Second)) + eh.RawHeader.Time = time.Now().Add(-(availability.StorageWindow + time.Second)) id, _, err := getter.getPeer(ctx, eh) require.NoError(t, err) assert.Equal(t, archivalPeer.ID(), id)