From 1809680284c15eb0171d825d8ba91fca308f44bf Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Wed, 21 Feb 2024 14:19:24 +0100 Subject: [PATCH] chore(nodebuilder/share): Make module construction more digestable (#3170) I tried to clean up the share module construction a bit so that it's easier to parse. Hopefully this is more helpful than the mess that was there prior. **Also please carefully review this PR!!** I tried my best to make sure that all components are added where they are supposed to be but it's possible I may have missed something (although all related units are passing). --- api/gateway/health.go | 4 +- nodebuilder/share/constructors.go | 2 +- nodebuilder/share/module.go | 247 ++++++++++++++++++------------ 3 files changed, 153 insertions(+), 100 deletions(-) diff --git a/api/gateway/health.go b/api/gateway/health.go index 0ddb56bb67..2a96e0200e 100644 --- a/api/gateway/health.go +++ b/api/gateway/health.go @@ -1,6 +1,8 @@ package gateway -import "net/http" +import ( + "net/http" +) const ( healthEndpoint = "/status/health" diff --git a/nodebuilder/share/constructors.go b/nodebuilder/share/constructors.go index aa2ac5bec1..e13786a4d9 100644 --- a/nodebuilder/share/constructors.go +++ b/nodebuilder/share/constructors.go @@ -42,7 +42,7 @@ func newDiscovery(cfg *disc.Parameters, } } -func newModule(getter share.Getter, avail share.Availability) Module { +func newShareModule(getter share.Getter, avail share.Availability) Module { return &module{getter, avail} } diff --git a/nodebuilder/share/module.go b/nodebuilder/share/module.go index 3fa55b2d35..7caaf39a92 100644 --- a/nodebuilder/share/module.go +++ b/nodebuilder/share/module.go @@ -34,7 +34,72 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option fx.Supply(*cfg), fx.Error(cfgErr), fx.Options(options...), - fx.Provide(newModule), + fx.Provide(newShareModule), + peerManagerComponents(tp, cfg), + discoveryComponents(cfg), + shrexSubComponents(), + ) + + bridgeAndFullComponents := fx.Options( + fx.Provide(getters.NewStoreGetter), + shrexServerComponents(cfg), + edsStoreComponents(cfg), + fullAvailabilityComponents(), + shrexGetterComponents(cfg), + fx.Provide(func(shrexSub *shrexsub.PubSub) shrexsub.BroadcastFn { + return shrexSub.Broadcast + }), + ) + + switch tp { + case node.Bridge: + return fx.Module( + "share", + baseComponents, + bridgeAndFullComponents, + fx.Provide(func() peers.Parameters { + return cfg.PeerManagerParams + }), + fx.Provide(bridgeGetter), + fx.Invoke(func(lc fx.Lifecycle, sub *shrexsub.PubSub) error { + lc.Append(fx.Hook{ + OnStart: sub.Start, + OnStop: sub.Stop, + }) + return nil + }), + ) + case node.Full: + return fx.Module( + "share", + baseComponents, + bridgeAndFullComponents, + fx.Provide(getters.NewIPLDGetter), + fx.Provide(fullGetter), + ) + case node.Light: + return fx.Module( + "share", + baseComponents, + shrexGetterComponents(cfg), + lightAvailabilityComponents(cfg), + fx.Invoke(ensureEmptyEDSInBS), + fx.Provide(getters.NewIPLDGetter), + fx.Provide(lightGetter), + // shrexsub broadcaster stub for daser + fx.Provide(func() shrexsub.BroadcastFn { + return func(context.Context, shrexsub.Notification) error { + return nil + } + }), + ) + default: + panic("invalid node type") + } +} + +func discoveryComponents(cfg *Config) fx.Option { + return fx.Options( fx.Invoke(func(disc *disc.Discovery) {}), fx.Provide(fx.Annotate( newDiscovery(cfg.Discovery), @@ -45,29 +110,71 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option return d.Stop(ctx) }), )), - fx.Provide( - func(ctx context.Context, h host.Host, network modp2p.Network) (*shrexsub.PubSub, error) { - return shrexsub.NewPubSub(ctx, h, network.String()) - }, - ), ) +} - shrexGetterComponents := fx.Options( - fx.Provide(func() peers.Parameters { - return cfg.PeerManagerParams - }), +func peerManagerComponents(tp node.Type, cfg *Config) fx.Option { + switch tp { + case node.Full, node.Light: + return fx.Options( + fx.Provide(func() peers.Parameters { + return cfg.PeerManagerParams + }), + fx.Provide( + func( + params peers.Parameters, + host host.Host, + connGater *conngater.BasicConnectionGater, + shrexSub *shrexsub.PubSub, + headerSub libhead.Subscriber[*header.ExtendedHeader], + // we must ensure Syncer is started before PeerManager + // so that Syncer registers header validator before PeerManager subscribes to headers + _ *sync.Syncer[*header.ExtendedHeader], + ) (*peers.Manager, error) { + return peers.NewManager( + params, + host, + connGater, + peers.WithShrexSubPools(shrexSub, headerSub), + ) + }, + ), + ) + case node.Bridge: + return fx.Provide(peers.NewManager) + default: + panic("invalid node type") + } +} + +func shrexSubComponents() fx.Option { + return fx.Provide( + func(ctx context.Context, h host.Host, network modp2p.Network) (*shrexsub.PubSub, error) { + return shrexsub.NewPubSub(ctx, h, network.String()) + }, + ) +} + +// shrexGetterComponents provides components for a shrex getter that +// is capable of requesting +func shrexGetterComponents(cfg *Config) fx.Option { + return fx.Options( + // shrex-nd client fx.Provide( func(host host.Host, network modp2p.Network) (*shrexnd.Client, error) { cfg.ShrExNDParams.WithNetworkID(network.String()) return shrexnd.NewClient(cfg.ShrExNDParams, host) }, ), + + // shrex-eds client fx.Provide( func(host host.Host, network modp2p.Network) (*shrexeds.Client, error) { cfg.ShrExEDSParams.WithNetworkID(network.String()) return shrexeds.NewClient(cfg.ShrExEDSParams, host) }, ), + fx.Provide(fx.Annotate( getters.NewShrexGetter, fx.OnStart(func(ctx context.Context, getter *getters.ShrexGetter) error { @@ -78,9 +185,10 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option }), )), ) +} - bridgeAndFullComponents := fx.Options( - fx.Provide(getters.NewStoreGetter), +func shrexServerComponents(cfg *Config) fx.Option { + return fx.Options( fx.Invoke(func(edsSrv *shrexeds.Server, ndSrc *shrexnd.Server) {}), fx.Provide(fx.Annotate( func(host host.Host, store *eds.Store, network modp2p.Network) (*shrexeds.Server, error) { @@ -108,8 +216,13 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option }), fx.OnStop(func(ctx context.Context, server *shrexnd.Server) error { return server.Stop(ctx) - }), - )), + })), + ), + ) +} + +func edsStoreComponents(cfg *Config) fx.Option { + return fx.Options( fx.Provide(fx.Annotate( func(path node.StorePath, ds datastore.Batching) (*eds.Store, error) { return eds.NewStore(cfg.EDSStoreParams, string(path), ds) @@ -125,6 +238,11 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option return store.Stop(ctx) }), )), + ) +} + +func fullAvailabilityComponents() fx.Option { + return fx.Options( fx.Provide(fx.Annotate( full.NewShareAvailability, fx.OnStart(func(ctx context.Context, avail *full.ShareAvailability) error { @@ -137,91 +255,24 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option fx.Provide(func(avail *full.ShareAvailability) share.Availability { return avail }), - fx.Provide(func(shrexSub *shrexsub.PubSub) shrexsub.BroadcastFn { - return shrexSub.Broadcast - }), - ) - - peerManagerWithShrexPools := fx.Options( - fx.Provide( - func( - params peers.Parameters, - host host.Host, - connGater *conngater.BasicConnectionGater, - shrexSub *shrexsub.PubSub, - headerSub libhead.Subscriber[*header.ExtendedHeader], - // we must ensure Syncer is started before PeerManager - // so that Syncer registers header validator before PeerManager subscribes to headers - _ *sync.Syncer[*header.ExtendedHeader], - ) (*peers.Manager, error) { - return peers.NewManager( - params, - host, - connGater, - peers.WithShrexSubPools(shrexSub, headerSub), - ) - }, - ), ) +} - switch tp { - case node.Bridge: - return fx.Module( - "share", - baseComponents, - fx.Provide(peers.NewManager), - bridgeAndFullComponents, - shrexGetterComponents, - fx.Provide(bridgeGetter), - fx.Invoke(func(lc fx.Lifecycle, sub *shrexsub.PubSub) error { - lc.Append(fx.Hook{ - OnStart: sub.Start, - OnStop: sub.Stop, - }) - return nil - }), - ) - case node.Full: - return fx.Module( - "share", - peerManagerWithShrexPools, - baseComponents, - bridgeAndFullComponents, - shrexGetterComponents, - fx.Provide(getters.NewIPLDGetter), - fx.Provide(fullGetter), - ) - case node.Light: - return fx.Module( - "share", - baseComponents, - fx.Provide(func() []light.Option { - return []light.Option{ - light.WithSampleAmount(cfg.LightAvailability.SampleAmount), - } - }), - peerManagerWithShrexPools, - shrexGetterComponents, - fx.Invoke(ensureEmptyEDSInBS), - fx.Provide(getters.NewIPLDGetter), - fx.Provide(lightGetter), - // shrexsub broadcaster stub for daser - fx.Provide(func() shrexsub.BroadcastFn { - return func(context.Context, shrexsub.Notification) error { - return nil - } - }), - fx.Provide(fx.Annotate( - light.NewShareAvailability, - fx.OnStop(func(ctx context.Context, la *light.ShareAvailability) error { - return la.Close(ctx) - }), - )), - fx.Provide(func(avail *light.ShareAvailability) share.Availability { - return avail +func lightAvailabilityComponents(cfg *Config) fx.Option { + return fx.Options( + fx.Provide(fx.Annotate( + light.NewShareAvailability, + fx.OnStop(func(ctx context.Context, la *light.ShareAvailability) error { + return la.Close(ctx) }), - ) - default: - panic("invalid node type") - } + )), + fx.Provide(func() []light.Option { + return []light.Option{ + light.WithSampleAmount(cfg.LightAvailability.SampleAmount), + } + }), + fx.Provide(func(avail *light.ShareAvailability) share.Availability { + return avail + }), + ) }