Skip to content

Commit

Permalink
chore(nodebuilder/share): Make module construction more digestable (#…
Browse files Browse the repository at this point in the history
…3170)

I tried to clean up the share module construction a bit so that it's
easier to parse. Hopefully this is more helpful than the mess that was
there prior.

**Also please carefully review this PR!!** I tried my best to make sure
that all components are added where they are supposed to be but it's
possible I may have missed something (although all related units are
passing).
  • Loading branch information
renaynay authored Feb 21, 2024
1 parent ec978ec commit 1809680
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 100 deletions.
4 changes: 3 additions & 1 deletion api/gateway/health.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package gateway

import "net/http"
import (
"net/http"
)

const (
healthEndpoint = "/status/health"
Expand Down
2 changes: 1 addition & 1 deletion nodebuilder/share/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func newDiscovery(cfg *disc.Parameters,
}
}

func newModule(getter share.Getter, avail share.Availability) Module {
func newShareModule(getter share.Getter, avail share.Availability) Module {
return &module{getter, avail}
}

Expand Down
247 changes: 149 additions & 98 deletions nodebuilder/share/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,72 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
fx.Supply(*cfg),
fx.Error(cfgErr),
fx.Options(options...),
fx.Provide(newModule),
fx.Provide(newShareModule),
peerManagerComponents(tp, cfg),
discoveryComponents(cfg),
shrexSubComponents(),
)

bridgeAndFullComponents := fx.Options(
fx.Provide(getters.NewStoreGetter),
shrexServerComponents(cfg),
edsStoreComponents(cfg),
fullAvailabilityComponents(),
shrexGetterComponents(cfg),
fx.Provide(func(shrexSub *shrexsub.PubSub) shrexsub.BroadcastFn {
return shrexSub.Broadcast
}),
)

switch tp {
case node.Bridge:
return fx.Module(
"share",
baseComponents,
bridgeAndFullComponents,
fx.Provide(func() peers.Parameters {
return cfg.PeerManagerParams
}),
fx.Provide(bridgeGetter),
fx.Invoke(func(lc fx.Lifecycle, sub *shrexsub.PubSub) error {
lc.Append(fx.Hook{
OnStart: sub.Start,
OnStop: sub.Stop,
})
return nil
}),
)
case node.Full:
return fx.Module(
"share",
baseComponents,
bridgeAndFullComponents,
fx.Provide(getters.NewIPLDGetter),
fx.Provide(fullGetter),
)
case node.Light:
return fx.Module(
"share",
baseComponents,
shrexGetterComponents(cfg),
lightAvailabilityComponents(cfg),
fx.Invoke(ensureEmptyEDSInBS),
fx.Provide(getters.NewIPLDGetter),
fx.Provide(lightGetter),
// shrexsub broadcaster stub for daser
fx.Provide(func() shrexsub.BroadcastFn {
return func(context.Context, shrexsub.Notification) error {
return nil
}
}),
)
default:
panic("invalid node type")
}
}

func discoveryComponents(cfg *Config) fx.Option {
return fx.Options(
fx.Invoke(func(disc *disc.Discovery) {}),
fx.Provide(fx.Annotate(
newDiscovery(cfg.Discovery),
Expand All @@ -45,29 +110,71 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
return d.Stop(ctx)
}),
)),
fx.Provide(
func(ctx context.Context, h host.Host, network modp2p.Network) (*shrexsub.PubSub, error) {
return shrexsub.NewPubSub(ctx, h, network.String())
},
),
)
}

shrexGetterComponents := fx.Options(
fx.Provide(func() peers.Parameters {
return cfg.PeerManagerParams
}),
func peerManagerComponents(tp node.Type, cfg *Config) fx.Option {
switch tp {
case node.Full, node.Light:
return fx.Options(
fx.Provide(func() peers.Parameters {
return cfg.PeerManagerParams
}),
fx.Provide(
func(
params peers.Parameters,
host host.Host,
connGater *conngater.BasicConnectionGater,
shrexSub *shrexsub.PubSub,
headerSub libhead.Subscriber[*header.ExtendedHeader],
// we must ensure Syncer is started before PeerManager
// so that Syncer registers header validator before PeerManager subscribes to headers
_ *sync.Syncer[*header.ExtendedHeader],
) (*peers.Manager, error) {
return peers.NewManager(
params,
host,
connGater,
peers.WithShrexSubPools(shrexSub, headerSub),
)
},
),
)
case node.Bridge:
return fx.Provide(peers.NewManager)
default:
panic("invalid node type")
}
}

func shrexSubComponents() fx.Option {
return fx.Provide(
func(ctx context.Context, h host.Host, network modp2p.Network) (*shrexsub.PubSub, error) {
return shrexsub.NewPubSub(ctx, h, network.String())
},
)
}

// shrexGetterComponents provides components for a shrex getter that
// is capable of requesting
func shrexGetterComponents(cfg *Config) fx.Option {
return fx.Options(
// shrex-nd client
fx.Provide(
func(host host.Host, network modp2p.Network) (*shrexnd.Client, error) {
cfg.ShrExNDParams.WithNetworkID(network.String())
return shrexnd.NewClient(cfg.ShrExNDParams, host)
},
),

// shrex-eds client
fx.Provide(
func(host host.Host, network modp2p.Network) (*shrexeds.Client, error) {
cfg.ShrExEDSParams.WithNetworkID(network.String())
return shrexeds.NewClient(cfg.ShrExEDSParams, host)
},
),

fx.Provide(fx.Annotate(
getters.NewShrexGetter,
fx.OnStart(func(ctx context.Context, getter *getters.ShrexGetter) error {
Expand All @@ -78,9 +185,10 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
}),
)),
)
}

bridgeAndFullComponents := fx.Options(
fx.Provide(getters.NewStoreGetter),
func shrexServerComponents(cfg *Config) fx.Option {
return fx.Options(
fx.Invoke(func(edsSrv *shrexeds.Server, ndSrc *shrexnd.Server) {}),
fx.Provide(fx.Annotate(
func(host host.Host, store *eds.Store, network modp2p.Network) (*shrexeds.Server, error) {
Expand Down Expand Up @@ -108,8 +216,13 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
}),
fx.OnStop(func(ctx context.Context, server *shrexnd.Server) error {
return server.Stop(ctx)
}),
)),
})),
),
)
}

func edsStoreComponents(cfg *Config) fx.Option {
return fx.Options(
fx.Provide(fx.Annotate(
func(path node.StorePath, ds datastore.Batching) (*eds.Store, error) {
return eds.NewStore(cfg.EDSStoreParams, string(path), ds)
Expand All @@ -125,6 +238,11 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
return store.Stop(ctx)
}),
)),
)
}

func fullAvailabilityComponents() fx.Option {
return fx.Options(
fx.Provide(fx.Annotate(
full.NewShareAvailability,
fx.OnStart(func(ctx context.Context, avail *full.ShareAvailability) error {
Expand All @@ -137,91 +255,24 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
fx.Provide(func(avail *full.ShareAvailability) share.Availability {
return avail
}),
fx.Provide(func(shrexSub *shrexsub.PubSub) shrexsub.BroadcastFn {
return shrexSub.Broadcast
}),
)

peerManagerWithShrexPools := fx.Options(
fx.Provide(
func(
params peers.Parameters,
host host.Host,
connGater *conngater.BasicConnectionGater,
shrexSub *shrexsub.PubSub,
headerSub libhead.Subscriber[*header.ExtendedHeader],
// we must ensure Syncer is started before PeerManager
// so that Syncer registers header validator before PeerManager subscribes to headers
_ *sync.Syncer[*header.ExtendedHeader],
) (*peers.Manager, error) {
return peers.NewManager(
params,
host,
connGater,
peers.WithShrexSubPools(shrexSub, headerSub),
)
},
),
)
}

switch tp {
case node.Bridge:
return fx.Module(
"share",
baseComponents,
fx.Provide(peers.NewManager),
bridgeAndFullComponents,
shrexGetterComponents,
fx.Provide(bridgeGetter),
fx.Invoke(func(lc fx.Lifecycle, sub *shrexsub.PubSub) error {
lc.Append(fx.Hook{
OnStart: sub.Start,
OnStop: sub.Stop,
})
return nil
}),
)
case node.Full:
return fx.Module(
"share",
peerManagerWithShrexPools,
baseComponents,
bridgeAndFullComponents,
shrexGetterComponents,
fx.Provide(getters.NewIPLDGetter),
fx.Provide(fullGetter),
)
case node.Light:
return fx.Module(
"share",
baseComponents,
fx.Provide(func() []light.Option {
return []light.Option{
light.WithSampleAmount(cfg.LightAvailability.SampleAmount),
}
}),
peerManagerWithShrexPools,
shrexGetterComponents,
fx.Invoke(ensureEmptyEDSInBS),
fx.Provide(getters.NewIPLDGetter),
fx.Provide(lightGetter),
// shrexsub broadcaster stub for daser
fx.Provide(func() shrexsub.BroadcastFn {
return func(context.Context, shrexsub.Notification) error {
return nil
}
}),
fx.Provide(fx.Annotate(
light.NewShareAvailability,
fx.OnStop(func(ctx context.Context, la *light.ShareAvailability) error {
return la.Close(ctx)
}),
)),
fx.Provide(func(avail *light.ShareAvailability) share.Availability {
return avail
func lightAvailabilityComponents(cfg *Config) fx.Option {
return fx.Options(
fx.Provide(fx.Annotate(
light.NewShareAvailability,
fx.OnStop(func(ctx context.Context, la *light.ShareAvailability) error {
return la.Close(ctx)
}),
)
default:
panic("invalid node type")
}
)),
fx.Provide(func() []light.Option {
return []light.Option{
light.WithSampleAmount(cfg.LightAvailability.SampleAmount),
}
}),
fx.Provide(func(avail *light.ShareAvailability) share.Availability {
return avail
}),
)
}

0 comments on commit 1809680

Please sign in to comment.