From 26850703559fc5dbf23c367955d9be2e1cf24bc6 Mon Sep 17 00:00:00 2001 From: Viacheslav Date: Tue, 5 Jul 2022 16:42:13 +0300 Subject: [PATCH] p2p: implement full node discovery (#799) * node: add full node discovery * doc: add adr for p2p discovery --- das/daser_test.go | 27 ++++--- docs/adr/adr-008-p2p-discovery.md | 69 ++++++++++++++++ go.mod | 2 +- node/p2p/bitswap.go | 5 +- node/p2p/p2p.go | 13 +-- node/p2p/routing.go | 12 +-- node/services/service.go | 39 +++++++-- node/settings.go | 10 ++- node/tests/p2p_test.go | 71 +++++++++++++++- service/share/discovery.go | 121 ++++++++++++++++++++++++++++ service/share/full_availability.go | 32 ++++++-- service/share/light_availability.go | 37 +++++++-- service/share/set.go | 69 ++++++++++++++++ service/share/set_test.go | 70 ++++++++++++++++ service/share/share.go | 5 +- service/share/testing.go | 30 +++++-- 16 files changed, 550 insertions(+), 62 deletions(-) create mode 100644 docs/adr/adr-008-p2p-discovery.md create mode 100644 service/share/discovery.go create mode 100644 service/share/set.go create mode 100644 service/share/set_test.go diff --git a/das/daser_test.go b/das/daser_test.go index a1b5e01829..5fc6ad29f0 100644 --- a/das/daser_test.go +++ b/das/daser_test.go @@ -28,9 +28,9 @@ var timeout = time.Second * 15 func TestDASerLifecycle(t *testing.T) { ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) bServ := mdutils.Bserv() - + avail := share.TestLightAvailability(bServ) // 15 headers from the past and 15 future headers - mockGet, shareServ, sub, mockService := createDASerSubcomponents(t, bServ, 15, 15, share.NewLightAvailability) + mockGet, shareServ, sub, mockService := createDASerSubcomponents(t, bServ, 15, 15, avail) ctx, cancel := context.WithTimeout(context.Background(), timeout) t.Cleanup(cancel) @@ -66,9 +66,9 @@ func TestDASerLifecycle(t *testing.T) { func TestDASer_Restart(t *testing.T) { ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) bServ := mdutils.Bserv() - + avail := share.TestLightAvailability(bServ) // 15 headers from the past and 15 future headers - mockGet, shareServ, sub, mockService := createDASerSubcomponents(t, bServ, 15, 15, share.NewLightAvailability) + mockGet, shareServ, sub, mockService := createDASerSubcomponents(t, bServ, 15, 15, avail) ctx, cancel := context.WithTimeout(context.Background(), timeout) t.Cleanup(cancel) @@ -129,8 +129,8 @@ func TestDASer_Restart(t *testing.T) { func TestDASer_catchUp(t *testing.T) { ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) bServ := mdutils.Bserv() - - mockGet, shareServ, _, mockService := createDASerSubcomponents(t, bServ, 5, 0, share.NewLightAvailability) + avail := share.TestLightAvailability(bServ) + mockGet, shareServ, _, mockService := createDASerSubcomponents(t, bServ, 5, 0, avail) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) @@ -170,8 +170,8 @@ func TestDASer_catchUp(t *testing.T) { func TestDASer_catchUp_oneHeader(t *testing.T) { ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) bServ := mdutils.Bserv() - - mockGet, shareServ, _, mockService := createDASerSubcomponents(t, bServ, 6, 0, share.NewLightAvailability) + avail := share.TestLightAvailability(bServ) + mockGet, shareServ, _, mockService := createDASerSubcomponents(t, bServ, 6, 0, avail) daser := NewDASer(shareServ, nil, mockGet, ds, mockService) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) @@ -213,8 +213,8 @@ func TestDASer_catchUp_oneHeader(t *testing.T) { func TestDASer_catchUp_fails(t *testing.T) { ds := 
ds_sync.MutexWrap(datastore.NewMapDatastore()) bServ := mdutils.Bserv() - - mockGet, _, _, mockService := createDASerSubcomponents(t, bServ, 6, 0, share.NewLightAvailability) + avail := share.TestLightAvailability(bServ) + mockGet, _, _, mockService := createDASerSubcomponents(t, bServ, 6, 0, avail) daser := NewDASer(share.NewBrokenAvailability(), nil, mockGet, ds, mockService) ctx, cancel := context.WithCancel(context.Background()) @@ -266,8 +266,9 @@ func TestDASer_stopsAfter_BEFP(t *testing.T) { ps, err := pubsub.NewGossipSub(ctx, net.Hosts()[0], pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign)) require.NoError(t, err) + avail := share.TestFullAvailability(bServ) // 15 headers from the past and 15 future headers - mockGet, shareServ, sub, _ := createDASerSubcomponents(t, bServ, 15, 15, share.NewFullAvailability) + mockGet, shareServ, sub, _ := createDASerSubcomponents(t, bServ, 15, 15, avail) // create fraud service and break one header f := fraud.NewService(ps, mockGet.GetByHeight) @@ -304,9 +305,9 @@ func createDASerSubcomponents( bServ blockservice.BlockService, numGetter, numSub int, - availabilityFn func(blockservice.BlockService) share.Availability, + availability share.Availability, ) (*mockGetter, *share.Service, *header.DummySubscriber, *fraud.DummyService) { - shareServ := share.NewService(bServ, availabilityFn(bServ)) + shareServ := share.NewService(bServ, availability) mockGet := &mockGetter{ headers: make(map[int64]*header.ExtendedHeader), diff --git a/docs/adr/adr-008-p2p-discovery.md b/docs/adr/adr-008-p2p-discovery.md new file mode 100644 index 0000000000..2e730dcfe7 --- /dev/null +++ b/docs/adr/adr-008-p2p-discovery.md @@ -0,0 +1,69 @@ +# ADR #008: Celestia-Node Full Node Discovery + +## Changelog + +- 2022.07.04 - discovery data structure + +## Authors + +@vgonkivs + +## Context + +This ADR describes p2p full node discovery in celestia-node. +P2P discovery helps light and full nodes find other full nodes on the network under the specified topic (`full`). +As soon as a full node is found and a connection to it is established, it (the full node) is added to a set of peers (`limitedSet`). +## Decision + +- https://github.com/celestiaorg/celestia-node/issues/599 + +## Detailed design +```go +// peersLimit is the maximum number of peers that will be discovered. +peersLimit = 3 + +// discovery combines advertise and discover services and allows storing discovered nodes. +type discovery struct { + // storage where all discovered and active peers are saved. + set *limitedSet + // libp2p.Host allows connecting to discovered peers. + host host.Host + // libp2p.Discovery allows advertising and discovering peers. + disc core.Discovery +} + +// limitedSet is a thread-safe set of peers with a given limit. +// Inspired by libp2p peer.Set but extended with a Remove method. +type limitedSet struct { + lk sync.RWMutex + ps map[peer.ID]struct{} + + limit int +} +``` +### Full Nodes behavior: +1. A node starts advertising itself over the DHT under the `full` namespace after the system boots up, so that it can be found. +2. A node starts looking for other full nodes, so it can join the Full Node network. +3. As soon as a new peer is found, the node tries to establish a connection with it. If the connection succeeds, +the node calls [Tag Peer](https://github.com/libp2p/go-libp2p-core/blob/525a0b13017263bde889a3295fa2e4212d7af8c5/connmgr/manager.go#L35) and adds the peer to the peer set; otherwise the discovered peer is dropped. + + +### Light Nodes behavior: +1.
A node starts looking for full nodes over the DHT under the `full` namespace using the `discoverer` interface. +2. As soon as a new peer is found, the node tries to establish a connection with it. If the connection succeeds, + the node calls [Tag Peer](https://github.com/libp2p/go-libp2p-core/blob/525a0b13017263bde889a3295fa2e4212d7af8c5/connmgr/manager.go#L35) and adds the peer to the peer set; otherwise the discovered peer is dropped. + +*NOTE*: Bridge nodes behave the same as Full nodes. + +Tagging protects connections from ConnManager trimming/GCing. +```go +// peerWeight is a weight of discovered peers. +// peerWeight is a number that will be assigned to all discovered full nodes, +// so ConnManager will not break a connection with them. +peerWeight = 1000 +``` + +![discovery](https://user-images.githubusercontent.com/40579846/177183260-92d1c390-928b-4c06-9516-24afea94d1f1.png) + +## Status +Merged \ No newline at end of file diff --git a/go.mod b/go.mod index 58d41b9714..cc9520bd16 100644 --- a/go.mod +++ b/go.mod @@ -48,6 +48,7 @@ require ( github.com/stretchr/testify v1.8.0 github.com/tendermint/tendermint v0.35.4 go.uber.org/fx v1.17.1 + go.uber.org/multierr v1.8.0 golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 google.golang.org/grpc v1.47.0 ) @@ -247,7 +248,6 @@ require ( go.opencensus.io v0.23.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/dig v1.14.0 // indirect - go.uber.org/multierr v1.8.0 // indirect go.uber.org/zap v1.21.0 // indirect golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect diff --git a/node/p2p/bitswap.go b/node/p2p/bitswap.go index 36fb45845a..3777f1dea1 100644 --- a/node/p2p/bitswap.go +++ b/node/p2p/bitswap.go @@ -11,7 +11,7 @@ import ( exchange "github.com/ipfs/go-ipfs-exchange-interface" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/protocol" - "github.com/libp2p/go-libp2p-core/routing" + routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" "go.uber.org/fx" "github.com/celestiaorg/celestia-node/libs/fxutil" @@ -46,7 +46,7 @@ func DataExchange(cfg Config) func(bitSwapParams) (exchange.Interface, blockstor prefix := protocol.ID(fmt.Sprintf("/celestia/%s", params.Net)) return bitswap.New( ctx, - network.NewFromIpfsHost(params.Host, params.Cr, network.Prefix(prefix)), + network.NewFromIpfsHost(params.Host, &routinghelpers.Null{}, network.Prefix(prefix)), bs, bitswap.ProvideEnabled(false), // NOTE: These below ar required for our protocol to work reliably. @@ -64,6 +64,5 @@ type bitSwapParams struct { Net nparams.Network Lc fx.Lifecycle Host host.Host - Cr routing.ContentRouting Ds datastore.Batching } diff --git a/node/p2p/p2p.go b/node/p2p/p2p.go index f239169005..91800306d7 100644 --- a/node/p2p/p2p.go +++ b/node/p2p/p2p.go @@ -2,6 +2,7 @@ package p2p import ( "fmt" + "time" "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" @@ -28,7 +29,8 @@ type Config struct { // This is enabled by default for Bootstrappers. PeerExchange bool // ConnManager is a configuration tuple for ConnectionManager. - ConnManager ConnManagerConfig + ConnManager ConnManagerConfig + RoutingTableRefreshPeriod time.Duration } // DefaultConfig returns default configuration for P2P subsystem.
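The new `RoutingTableRefreshPeriod` field above controls how often the Kademlia routing table is refreshed, which bounds how quickly freshly advertised full nodes become visible to others. The sketch below is not part of the patch; it only illustrates, under the assumption of the go-libp2p-kad-dht options used in `node/p2p/routing.go`, how such a DHT might be assembled. The helper name `newPeerRoutingDHT` and the protocol prefix value are placeholders.

```go
package example

import (
	"context"
	"time"

	"github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/protocol"
	dht "github.com/libp2p/go-libp2p-kad-dht"
)

// newPeerRoutingDHT is a hypothetical helper mirroring the options used in
// node/p2p/routing.go: a network-prefixed Kademlia DHT whose routing table is
// refreshed every `refresh` interval.
func newPeerRoutingDHT(ctx context.Context, h host.Host, refresh time.Duration) (*dht.IpfsDHT, error) {
	d, err := dht.New(ctx, h,
		dht.Mode(dht.ModeAuto),
		dht.ProtocolPrefix(protocol.ID("/celestia/example")), // placeholder network prefix
		dht.Datastore(dssync.MutexWrap(datastore.NewMapDatastore())),
		// Shorter periods make newly advertised full nodes discoverable sooner,
		// at the cost of more background traffic.
		dht.RoutingTableRefreshPeriod(refresh),
	)
	if err != nil {
		return nil, err
	}
	// Connect to bootstrap peers (if configured) and start background refreshes.
	return d, d.Bootstrap(ctx)
}
```

In the patch itself, `DefaultConfig` sets the period to one minute, and the integration tests shorten it to 30 seconds via `WithRefreshRoutingTablePeriod` so that discovery converges faster.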
@@ -44,10 +46,11 @@ func DefaultConfig() Config { "/ip4/127.0.0.1/tcp/2121", "/ip6/::/tcp/2121", }, - MutualPeers: []string{}, - Bootstrapper: false, - PeerExchange: false, - ConnManager: DefaultConnManagerConfig(), + MutualPeers: []string{}, + Bootstrapper: false, + PeerExchange: false, + ConnManager: DefaultConnManagerConfig(), + RoutingTableRefreshPeriod: time.Minute, } } diff --git a/node/p2p/routing.go b/node/p2p/routing.go index 5eec464bc3..ae02b703c1 100644 --- a/node/p2p/routing.go +++ b/node/p2p/routing.go @@ -8,7 +8,6 @@ import ( "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-core/routing" dht "github.com/libp2p/go-libp2p-kad-dht" - routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" "go.uber.org/fx" "github.com/celestiaorg/celestia-node/libs/fxutil" @@ -17,8 +16,8 @@ import ( // ContentRouting constructs nil content routing, // as for our use-case existing ContentRouting mechanisms, e.g DHT, are unsuitable -func ContentRouting() routing.ContentRouting { - return &routinghelpers.Null{} +func ContentRouting(r routing.PeerRouting) routing.ContentRouting { + return r.(*dht.IpfsDHT) } // PeerRouting provides constructor for PeerRouting over DHT. @@ -30,11 +29,7 @@ func PeerRouting(cfg Config) func(routingParams) (routing.PeerRouting, error) { dht.BootstrapPeers(params.Peers...), dht.ProtocolPrefix(protocol.ID(fmt.Sprintf("/celestia/%s", params.Net))), dht.Datastore(params.DataStore), - dht.QueryFilter(dht.PublicQueryFilter), - dht.RoutingTableFilter(dht.PublicRoutingTableFilter), - // disable DHT for everything besides peer routing - dht.DisableValues(), - dht.DisableProviders(), + dht.RoutingTableRefreshPeriod(cfg.RoutingTableRefreshPeriod), } if cfg.Bootstrapper { @@ -57,7 +52,6 @@ func PeerRouting(cfg Config) func(routingParams) (routing.PeerRouting, error) { return d.Close() }, }) - return d, nil } } diff --git a/node/services/service.go b/node/services/service.go index b29a32b1b6..770f5f5e0c 100644 --- a/node/services/service.go +++ b/node/services/service.go @@ -8,8 +8,11 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" + "github.com/libp2p/go-libp2p-core/routing" pubsub "github.com/libp2p/go-libp2p-pubsub" + routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing" "go.uber.org/fx" + "go.uber.org/multierr" "github.com/celestiaorg/celestia-node/das" "github.com/celestiaorg/celestia-node/fraud" @@ -125,7 +128,13 @@ func HeaderStoreInit(cfg *Config) func(context.Context, params.Network, header.S } // ShareService constructs new share.Service. 
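For context on the `ContentRouting` change above: `RoutingDiscovery` performs advertising and lookup on top of content routing (a provider record keyed by a hash of the namespace), so the DHT has to be exposed as the node's `ContentRouting` instead of the previous `Null` router, while bitswap, which already has providing disabled, now receives the `Null` router directly. The snippet below is only an illustrative compile-time check, not code from the patch:

```go
package example

import (
	"github.com/libp2p/go-libp2p-core/routing"
	dht "github.com/libp2p/go-libp2p-kad-dht"
)

// The Kademlia DHT satisfies both interfaces, which is what allows the same
// instance to serve as PeerRouting for the host and as ContentRouting for
// RoutingDiscovery's Advertise/FindPeers.
var (
	_ routing.PeerRouting    = (*dht.IpfsDHT)(nil)
	_ routing.ContentRouting = (*dht.IpfsDHT)(nil)
)
```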
-func ShareService(lc fx.Lifecycle, bServ blockservice.BlockService, avail share.Availability) *share.Service { +func ShareService( + lc fx.Lifecycle, + bServ blockservice.BlockService, + avail share.Availability, + r routing.ContentRouting, + h host.Host, +) *share.Service { service := share.NewService(bServ, avail) lc.Append(fx.Hook{ OnStart: service.Start, @@ -167,21 +176,39 @@ func LightAvailability( lc fx.Lifecycle, bServ blockservice.BlockService, ds datastore.Batching, + r routing.ContentRouting, + h host.Host, ) share.Availability { - la := share.NewLightAvailability(bServ) + la := share.NewLightAvailability(bServ, routingdisc.NewRoutingDiscovery(r), h) ca := share.NewCacheAvailability(la, ds) lc.Append(fx.Hook{ - OnStop: ca.Close, + OnStart: la.Start, + OnStop: func(ctx context.Context) error { + laErr := la.Stop(ctx) + caErr := ca.Close(ctx) + return multierr.Append(laErr, caErr) + }, }) return ca } // FullAvailability constructs full share availability wrapped in cache availability. -func FullAvailability(lc fx.Lifecycle, bServ blockservice.BlockService, ds datastore.Batching) share.Availability { - fa := share.NewFullAvailability(bServ) +func FullAvailability( + lc fx.Lifecycle, + bServ blockservice.BlockService, + ds datastore.Batching, + r routing.ContentRouting, + h host.Host, +) share.Availability { + fa := share.NewFullAvailability(bServ, routingdisc.NewRoutingDiscovery(r), h) + ca := share.NewCacheAvailability(fa, ds) lc.Append(fx.Hook{ - OnStop: ca.Close, + OnStart: fa.Start, + OnStop: func(ctx context.Context) error { + faErr := fa.Stop(ctx) + caErr := ca.Close(ctx) + return multierr.Append(faErr, caErr) + }, }) return ca } diff --git a/node/settings.go b/node/settings.go index cb494de436..9af2e290f3 100644 --- a/node/settings.go +++ b/node/settings.go @@ -2,6 +2,7 @@ package node import ( "encoding/hex" + "time" "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" @@ -88,9 +89,16 @@ func WithKeyringSigner(signer *apptypes.KeyringSigner) Option { } } -// WithBootstrappers sets custom bootstrap peers +// WithBootstrappers sets custom bootstrap peers. func WithBootstrappers(peers params.Bootstrappers) Option { return func(sets *settings) { sets.opts = append(sets.opts, fx.Replace(peers)) } } + +// WithRefreshRoutingTablePeriod sets a custom refresh period for the DHT. +func WithRefreshRoutingTablePeriod(interval time.Duration) Option { + return func(sets *settings) { + sets.cfg.P2P.RoutingTableRefreshPeriod = interval + } +} diff --git a/node/tests/p2p_test.go b/node/tests/p2p_test.go index b66fc0a35d..9b8045d1a2 100644 --- a/node/tests/p2p_test.go +++ b/node/tests/p2p_test.go @@ -5,16 +5,26 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "github.com/celestiaorg/celestia-node/node" "github.com/celestiaorg/celestia-node/node/tests/swamp" ) +/* +Test-Case: Full/Light Nodes connection to Bridge as a Bootstrapper +Steps: +1. Create a Bridge Node (BN) +2. Start a BN +3. Create full/light nodes with the bridge node as a bootstrap peer +4. Start full/light nodes +5.
Check that nodes are connected to the bridge +*/ func TestUseBridgeNodeAsBootstraper(t *testing.T) { sw := swamp.NewSwamp(t) @@ -64,3 +74,60 @@ func TestAddPeerToBlackList(t *testing.T) { require.True(t, full.ConnGater.InterceptPeerDial(host.InfoFromHost(light.Host).ID)) require.False(t, light.ConnGater.InterceptPeerDial(addr.ID)) } + +/* +Test-Case: Connect Full And Light using Bridge node as a bootstrapper +Steps: +1. Create a Bridge Node (BN) +2. Start a BN +3. Create full/light nodes with the bridge node as a bootstrap peer +4. Start full/light nodes +5. Ensure that nodes are connected to the bridge +6. Wait until the light node finds the full node +7. Check that the full and light nodes are connected to each other +*/ +func TestBootstrapNodesFromBridgeNode(t *testing.T) { + sw := swamp.NewSwamp(t) + cfg := node.DefaultConfig(node.Bridge) + cfg.P2P.Bootstrapper = true + bridge := sw.NewBridgeNode(node.WithConfig(cfg), node.WithRefreshRoutingTablePeriod(time.Second*30)) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + t.Cleanup(cancel) + + err := bridge.Start(ctx) + require.NoError(t, err) + addr := host.InfoFromHost(bridge.Host) + + full := sw.NewFullNode( + node.WithBootstrappers([]peer.AddrInfo{*addr}), + node.WithRefreshRoutingTablePeriod(time.Second*30), + ) + light := sw.NewLightNode( + node.WithBootstrappers([]peer.AddrInfo{*addr}), + node.WithRefreshRoutingTablePeriod(time.Second*30), + ) + nodes := []*node.Node{full, light} + ch := make(chan struct{}) + bundle := &network.NotifyBundle{} + bundle.ConnectedF = func(_ network.Network, conn network.Conn) { + if conn.RemotePeer() == full.Host.ID() { + ch <- struct{}{} + } + } + light.Host.Network().Notify(bundle) + for index := range nodes { + require.NoError(t, nodes[index].Start(ctx)) + assert.Equal(t, *addr, nodes[index].Bootstrappers[0]) + assert.True(t, nodes[index].Host.Network().Connectedness(addr.ID) == network.Connected) + } + + select { + case <-ctx.Done(): + t.Fatal("peer was not found") + case <-ch: + break + } + addrFull := host.InfoFromHost(full.Host) + assert.True(t, light.Host.Network().Connectedness(addrFull.ID) == network.Connected) +} diff --git a/service/share/discovery.go b/service/share/discovery.go new file mode 100644 index 0000000000..bad0caaefd --- /dev/null +++ b/service/share/discovery.go @@ -0,0 +1,121 @@ +package share + +import ( + "context" + "time" + + core "github.com/libp2p/go-libp2p-core/discovery" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" +) + +const ( + // peersLimit is the maximum number of peers that will be discovered. + peersLimit = 3 + + // peerWeight is a weight of discovered peers. + // peerWeight is a number that will be assigned to all discovered full nodes, + // so ConnManager will not break a connection with them. + peerWeight = 1000 + topic = "full" + interval = time.Second * 10 +) + +// waitF calculates time to restart announcing. +var waitF = func(ttl time.Duration) time.Duration { + return 7 * ttl / 8 +} + +// discovery combines advertise and discover services and allows storing discovered nodes. +type discovery struct { + set *limitedSet + host host.Host + disc core.Discovery +} + +// newDiscovery constructs a new discovery. +func newDiscovery(h host.Host, d core.Discovery) *discovery { + return &discovery{ + newLimitedSet(peersLimit), + h, + d, + } +} + +// handlePeerFound receives a discovered peer and tries to establish a connection with it. +// The peer is added to the limitedSet if the connection succeeds.
+func (d *discovery) handlePeerFound(ctx context.Context, topic string, peer peer.AddrInfo) { + if peer.ID == d.host.ID() || len(peer.Addrs) == 0 { + return + } + err := d.set.TryAdd(peer.ID) + if err != nil { + log.Debug(err) + return + } + + err = d.host.Connect(ctx, peer) + if err != nil { + log.Warn(err) + d.set.Remove(peer.ID) + return + } + log.Debugw("added peer to set", "id", peer.ID) + // add a tag to protect the peer from being killed by ConnManager + d.host.ConnManager().TagPeer(peer.ID, topic, peerWeight) +} + +// findPeers runs peer discovery every 30 seconds until the peer set reaches peersLimit. +// TODO (@vgonkivs): simplify once https://github.com/libp2p/go-libp2p/pull/1379 is merged. +func (d *discovery) findPeers(ctx context.Context) { + t := time.NewTicker(interval * 3) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + // return if the limit was reached to stop the loop and finish discovery + if d.set.Size() >= peersLimit { + return + } + peers, err := d.disc.FindPeers(ctx, topic) + if err != nil { + log.Error(err) + continue + } + for peer := range peers { + go d.handlePeerFound(ctx, topic, peer) + } + } + } +} + +// advertise is a utility function that persistently advertises a service through an Advertiser. +func (d *discovery) advertise(ctx context.Context) { + timer := time.NewTimer(interval) + for { + ttl, err := d.disc.Advertise(ctx, topic) + if err != nil { + log.Debugf("Error advertising %s: %s", topic, err.Error()) + if ctx.Err() != nil { + return + } + + select { + case <-timer.C: + timer.Reset(interval) + continue + case <-ctx.Done(): + return + } + } + + select { + case <-timer.C: + timer.Reset(waitF(ttl)) + case <-ctx.Done(): + return + } + } +} diff --git a/service/share/full_availability.go b/service/share/full_availability.go index d279afc1e1..a5fde91870 100644 --- a/service/share/full_availability.go +++ b/service/share/full_availability.go @@ -6,27 +6,47 @@ import ( "github.com/ipfs/go-blockservice" format "github.com/ipfs/go-ipld-format" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p/p2p/discovery/routing" "github.com/celestiaorg/celestia-node/ipld" ) -// fullAvailability implements Availability using the full data square +// FullAvailability implements Availability using the full data square // recovery technique. It is considered "full" because it is required // to download enough shares to fully reconstruct the data square. -type fullAvailability struct { +type FullAvailability struct { rtrv *ipld.Retriever + disc *discovery + + cancel context.CancelFunc } // NewFullAvailability creates a new full Availability. -func NewFullAvailability(bServ blockservice.BlockService) Availability { - return &fullAvailability{ +func NewFullAvailability(bServ blockservice.BlockService, r *routing.RoutingDiscovery, h host.Host) *FullAvailability { + return &FullAvailability{ rtrv: ipld.NewRetriever(bServ), + disc: newDiscovery(h, r), } } +func (fa *FullAvailability) Start(context.Context) error { + ctx, cancel := context.WithCancel(context.Background()) + fa.cancel = cancel + + go fa.disc.advertise(ctx) + go fa.disc.findPeers(ctx) + return nil +} + +func (fa *FullAvailability) Stop(context.Context) error { + fa.cancel() + return nil +} + // SharesAvailable reconstructs the data committed to the given Root by requesting // enough Shares from the network.
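Taken together, `FullAvailability.Start` is what wires the discovery loop above into a node's lifecycle: it spawns the `advertise` and `findPeers` goroutines, and `Stop` cancels them. A hypothetical caller outside of the fx wiring in `node/services` could look roughly like this; the `bServ`, `d` and `h` values are assumed to come from an existing blockservice, DHT and libp2p host, and `runFullAvailability` is an illustrative name.

```go
package example

import (
	"context"

	"github.com/ipfs/go-blockservice"
	"github.com/libp2p/go-libp2p-core/host"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing"

	"github.com/celestiaorg/celestia-node/service/share"
)

// runFullAvailability builds FullAvailability on top of an existing
// blockservice, DHT and host, starts advertising/discovery, and shuts the
// component down once ctx is cancelled.
func runFullAvailability(ctx context.Context, bServ blockservice.BlockService, d *dht.IpfsDHT, h host.Host) error {
	fa := share.NewFullAvailability(bServ, routingdisc.NewRoutingDiscovery(d), h)
	if err := fa.Start(ctx); err != nil {
		return err
	}
	<-ctx.Done()
	return fa.Stop(context.Background())
}
```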
-func (fa *fullAvailability) SharesAvailable(ctx context.Context, root *Root) error { +func (fa *FullAvailability) SharesAvailable(ctx context.Context, root *Root) error { ctx, cancel := context.WithTimeout(ctx, AvailabilityTimeout) defer cancel() // we assume the caller of this method has already performed basic validation on the @@ -49,6 +69,6 @@ func (fa *fullAvailability) SharesAvailable(ctx context.Context, root *Root) err return err } -func (fa *fullAvailability) ProbabilityOfAvailability() float64 { +func (fa *FullAvailability) ProbabilityOfAvailability() float64 { return 1 } diff --git a/service/share/light_availability.go b/service/share/light_availability.go index a1c0a9e281..b224b8f7b6 100644 --- a/service/share/light_availability.go +++ b/service/share/light_availability.go @@ -7,6 +7,8 @@ import ( "github.com/ipfs/go-blockservice" format "github.com/ipfs/go-ipld-format" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p/p2p/discovery/routing" "github.com/celestiaorg/celestia-node/ipld" ) @@ -14,24 +16,47 @@ import ( // DefaultSampleAmount sets the default amount of samples to be sampled from the network by lightAvailability. var DefaultSampleAmount = 16 -// lightAvailability implements Availability using Data Availability Sampling technique. +// LightAvailability implements Availability using Data Availability Sampling technique. // It is light because it does not require the downloading of all the data to verify // its availability. It is assumed that there are a lot of lightAvailability instances // on the network doing sampling over the same Root to collectively verify its availability. -type lightAvailability struct { +type LightAvailability struct { bserv blockservice.BlockService + // disc discovers new full nodes in the network. + // it is not allowed to call advertise for light nodes (Full nodes only). + disc *discovery + cancel context.CancelFunc } // NewLightAvailability creates a new light Availability. -func NewLightAvailability(bserv blockservice.BlockService) Availability { - return &lightAvailability{ +func NewLightAvailability( + bserv blockservice.BlockService, + r *routing.RoutingDiscovery, + h host.Host, +) *LightAvailability { + la := &LightAvailability{ bserv: bserv, + disc: newDiscovery(h, r), } + return la +} + +func (la *LightAvailability) Start(context.Context) error { + ctx, cancel := context.WithCancel(context.Background()) + la.cancel = cancel + go la.disc.findPeers(ctx) + + return nil +} + +func (la *LightAvailability) Stop(context.Context) error { + la.cancel() + return nil } // SharesAvailable randomly samples DefaultSamples amount of Shares committed to the given Root. // This way SharesAvailable subjectively verifies that Shares are available. -func (la *lightAvailability) SharesAvailable(ctx context.Context, dah *Root) error { +func (la *LightAvailability) SharesAvailable(ctx context.Context, dah *Root) error { log.Debugw("Validate availability", "root", dah.Hash()) // We assume the caller of this method has already performed basic validation on the // given dah/root. If for some reason this has not happened, the node should panic. @@ -91,6 +116,6 @@ func (la *lightAvailability) SharesAvailable(ctx context.Context, dah *Root) err // (DefaultSampleAmount). 
// // Formula: 1 - (0.75 ** amount of samples) -func (la *lightAvailability) ProbabilityOfAvailability() float64 { +func (la *LightAvailability) ProbabilityOfAvailability() float64 { return 1 - math.Pow(0.75, float64(DefaultSampleAmount)) } diff --git a/service/share/set.go b/service/share/set.go new file mode 100644 index 0000000000..be53840dfd --- /dev/null +++ b/service/share/set.go @@ -0,0 +1,69 @@ +package share + +import ( + "errors" + "sync" + + "github.com/libp2p/go-libp2p-core/peer" +) + +// limitedSet is a thread-safe set of peers with a given limit. +// Inspired by libp2p peer.Set but extended with a Remove method. +type limitedSet struct { + lk sync.RWMutex + ps map[peer.ID]struct{} + + limit uint +} + +// newLimitedSet constructs a set with a maximum number of peers. +func newLimitedSet(limit uint) *limitedSet { + ps := new(limitedSet) + ps.ps = make(map[peer.ID]struct{}) + ps.limit = limit + return ps +} + +func (ps *limitedSet) Contains(p peer.ID) bool { + ps.lk.RLock() + _, ok := ps.ps[p] + ps.lk.RUnlock() + return ok +} + +func (ps *limitedSet) Size() int { + ps.lk.RLock() + defer ps.lk.RUnlock() + return len(ps.ps) +} + +// TryAdd attempts to add the given peer to the set. +// This operation will fail if the number of peers in the set is equal to the limit. +func (ps *limitedSet) TryAdd(p peer.ID) error { + ps.lk.Lock() + defer ps.lk.Unlock() + if len(ps.ps) < int(ps.limit) { + ps.ps[p] = struct{}{} + return nil + } + + return errors.New("share: discovery: peers limit reached") +} + +func (ps *limitedSet) Remove(id peer.ID) { + ps.lk.Lock() + defer ps.lk.Unlock() + if ps.limit > 0 { + delete(ps.ps, id) + } +} + +func (ps *limitedSet) Peers() []peer.ID { + ps.lk.Lock() + out := make([]peer.ID, 0, len(ps.ps)) + for p := range ps.ps { + out = append(out, p) + } + ps.lk.Unlock() + return out +} diff --git a/service/share/set_test.go b/service/share/set_test.go new file mode 100644 index 0000000000..5c95f05904 --- /dev/null +++ b/service/share/set_test.go @@ -0,0 +1,70 @@ +package share + +import ( + "testing" + + "github.com/stretchr/testify/require" + + mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" +) + +func TestSet_TryAdd(t *testing.T) { + m := mocknet.New() + h, err := m.GenPeer() + require.NoError(t, err) + + set := newLimitedSet(1) + require.NoError(t, set.TryAdd(h.ID())) + require.True(t, set.Contains(h.ID())) +} + +func TestSet_TryAddFails(t *testing.T) { + m := mocknet.New() + h1, err := m.GenPeer() + require.NoError(t, err) + h2, err := m.GenPeer() + require.NoError(t, err) + + set := newLimitedSet(1) + require.NoError(t, set.TryAdd(h1.ID())) + require.Error(t, set.TryAdd(h2.ID())) +} + +func TestSet_Remove(t *testing.T) { + m := mocknet.New() + h, err := m.GenPeer() + require.NoError(t, err) + + set := newLimitedSet(1) + require.NoError(t, set.TryAdd(h.ID())) + set.Remove(h.ID()) + require.False(t, set.Contains(h.ID())) +} + +func TestSet_Peers(t *testing.T) { + m := mocknet.New() + h1, err := m.GenPeer() + require.NoError(t, err) + h2, err := m.GenPeer() + require.NoError(t, err) + + set := newLimitedSet(2) + require.NoError(t, set.TryAdd(h1.ID())) + require.NoError(t, set.TryAdd(h2.ID())) + require.True(t, len(set.Peers()) == 2) +} + +func TestSet_Size(t *testing.T) { + m := mocknet.New() + h1, err := m.GenPeer() + require.NoError(t, err) + h2, err := m.GenPeer() + require.NoError(t, err) + + set := newLimitedSet(2) + require.NoError(t, set.TryAdd(h1.ID())) + require.NoError(t, set.TryAdd(h2.ID())) + require.Equal(t, 2, set.Size()) + set.Remove(h2.ID()) +
require.Equal(t, 1, set.Size()) +} diff --git a/service/share/share.go b/service/share/share.go index c985cf674a..85a1bb679f 100644 --- a/service/share/share.go +++ b/service/share/share.go @@ -54,8 +54,7 @@ type Service struct { // session is blockservice sub-session that applies optimization for fetching/loading related nodes, like shares // prefer session over blockservice for fetching nodes. session blockservice.BlockGetter - // cancel controls lifecycle of the session - cancel context.CancelFunc + cancel context.CancelFunc } // NewService creates new basic share.Service. @@ -75,7 +74,7 @@ func (s *Service) Start(context.Context) error { // NOTE: The ctx given as param is used to control Start flow and only needed when Start is blocking, // but this one is not. // - // The newer context here is created to control lifecycle of the session. + // The newer context here is created to control lifecycle of the session and peer discovery. ctx, cancel := context.WithCancel(context.Background()) s.cancel = cancel s.session = blockservice.NewSession(ctx, s.bServ) diff --git a/service/share/testing.go b/service/share/testing.go index ee63947b60..4882d9bce8 100644 --- a/service/share/testing.go +++ b/service/share/testing.go @@ -15,6 +15,8 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" record "github.com/libp2p/go-libp2p-record" + routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" + "github.com/libp2p/go-libp2p/p2p/discovery/routing" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/pkg/da" @@ -26,21 +28,21 @@ import ( // trees of 'n' random shares, essentially storing a whole square. func RandLightServiceWithSquare(t *testing.T, n int) (*Service, *Root) { bServ := mdutils.Bserv() - return NewService(bServ, NewLightAvailability(bServ)), RandFillBS(t, n, bServ) + return NewService(bServ, TestLightAvailability(bServ)), RandFillBS(t, n, bServ) } // RandLightService provides an unfilled share.Service with corresponding // blockservice.BlockService than can be filled by the test. func RandLightService() (*Service, blockservice.BlockService) { bServ := mdutils.Bserv() - return NewService(bServ, NewLightAvailability(bServ)), bServ + return NewService(bServ, TestLightAvailability(bServ)), bServ } // RandFullServiceWithSquare provides a share.Service filled with 'n' NMT // trees of 'n' random shares, essentially storing a whole square. 
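As a quick sanity check of the `ProbabilityOfAvailability` formula shown earlier in `light_availability.go` (`1 - 0.75 ** samples`), the default of 16 samples already gives roughly a 99% chance of detecting withheld data. The standalone snippet below is not part of the patch; it only evaluates the formula:

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	// Same formula as LightAvailability.ProbabilityOfAvailability with the
	// default of 16 samples: 1 - 0.75^16 ≈ 0.99.
	samples := 16.0
	fmt.Printf("%.4f\n", 1-math.Pow(0.75, samples)) // prints 0.9900
}
```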
func RandFullServiceWithSquare(t *testing.T, n int) (*Service, *Root) { bServ := mdutils.Bserv() - return NewService(bServ, NewFullAvailability(bServ)), RandFillBS(t, n, bServ) + return NewService(bServ, TestFullAvailability(bServ)), RandFillBS(t, n, bServ) } // RandLightLocalServiceWithSquare is the same as RandLightServiceWithSquare, except @@ -48,7 +50,10 @@ func RandFullServiceWithSquare(t *testing.T, n int) (*Service, *Root) { func RandLightLocalServiceWithSquare(t *testing.T, n int) (*Service, *Root) { bServ := mdutils.Bserv() ds := dssync.MutexWrap(ds.NewMapDatastore()) - ca := NewCacheAvailability(NewLightAvailability(bServ), ds) + ca := NewCacheAvailability( + TestLightAvailability(bServ), + ds, + ) return NewService(bServ, ca), RandFillBS(t, n, bServ) } @@ -57,7 +62,10 @@ func RandLightLocalServiceWithSquare(t *testing.T, n int) (*Service, *Root) { func RandFullLocalServiceWithSquare(t *testing.T, n int) (*Service, *Root) { bServ := mdutils.Bserv() ds := dssync.MutexWrap(ds.NewMapDatastore()) - ca := NewCacheAvailability(NewFullAvailability(bServ), ds) + ca := NewCacheAvailability( + TestFullAvailability(bServ), + ds, + ) return NewService(bServ, ca), RandFillBS(t, n, bServ) } @@ -130,14 +138,14 @@ func (dn *dagNet) RandFullNode(squareSize int) (*node, *Root) { // LightNode creates a new empty LightAvailability Node. func (dn *dagNet) LightNode() *node { nd := dn.Node() - nd.Service = NewService(nd.BlockService, NewLightAvailability(nd.BlockService)) + nd.Service = NewService(nd.BlockService, TestLightAvailability(nd.BlockService)) return nd } // FullNode creates a new empty FullAvailability Node. func (dn *dagNet) FullNode() *node { nd := dn.Node() - nd.Service = NewService(nd.BlockService, NewFullAvailability(nd.BlockService)) + nd.Service = NewService(nd.BlockService, TestFullAvailability(nd.BlockService)) return nd } @@ -246,3 +254,11 @@ func (b *brokenAvailability) SharesAvailable(context.Context, *Root) error { func (b *brokenAvailability) ProbabilityOfAvailability() float64 { return 0 } + +func TestLightAvailability(bServ blockservice.BlockService) *LightAvailability { + return NewLightAvailability(bServ, routing.NewRoutingDiscovery(routinghelpers.Null{}), nil) +} + +func TestFullAvailability(bServ blockservice.BlockService) *FullAvailability { + return NewFullAvailability(bServ, routing.NewRoutingDiscovery(routinghelpers.Null{}), nil) +}
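Because the new `TestLightAvailability`/`TestFullAvailability` helpers wrap a `Null` router, discovery is effectively a no-op in unit tests and only the sampling/reconstruction logic is exercised. A hypothetical test using them might look like the following; `TestLightAvailabilitySmoke` and the 16x16 square size are illustrative and not part of the patch.

```go
package share_test

import (
	"context"
	"testing"
	"time"

	mdutils "github.com/ipfs/go-merkledag/test"
	"github.com/stretchr/testify/require"

	"github.com/celestiaorg/celestia-node/service/share"
)

func TestLightAvailabilitySmoke(t *testing.T) {
	bServ := mdutils.Bserv()
	// Fill the blockservice with a random 16x16 share square and keep its root.
	root := share.RandFillBS(t, 16, bServ)

	// The Null-backed discovery makes Start/Stop cheap: findPeers never finds anyone.
	avail := share.TestLightAvailability(bServ)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	require.NoError(t, avail.Start(ctx))
	require.NoError(t, avail.SharesAvailable(ctx, root))
	require.NoError(t, avail.Stop(ctx))
}
```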