Skip to content

Commit

Permalink
p2p: implement full node discovery (#799)
Browse files Browse the repository at this point in the history
* node: add full node discovery
* doc: add adr for p2p discovery
  • Loading branch information
vgonkivs authored Jul 5, 2022
1 parent b1636e0 commit 2685070
Show file tree
Hide file tree
Showing 16 changed files with 550 additions and 62 deletions.
27 changes: 14 additions & 13 deletions das/daser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ var timeout = time.Second * 15
func TestDASerLifecycle(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
bServ := mdutils.Bserv()

avail := share.TestLightAvailability(bServ)
// 15 headers from the past and 15 future headers
mockGet, shareServ, sub, mockService := createDASerSubcomponents(t, bServ, 15, 15, share.NewLightAvailability)
mockGet, shareServ, sub, mockService := createDASerSubcomponents(t, bServ, 15, 15, avail)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
t.Cleanup(cancel)
Expand Down Expand Up @@ -66,9 +66,9 @@ func TestDASerLifecycle(t *testing.T) {
func TestDASer_Restart(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
bServ := mdutils.Bserv()

avail := share.TestLightAvailability(bServ)
// 15 headers from the past and 15 future headers
mockGet, shareServ, sub, mockService := createDASerSubcomponents(t, bServ, 15, 15, share.NewLightAvailability)
mockGet, shareServ, sub, mockService := createDASerSubcomponents(t, bServ, 15, 15, avail)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
t.Cleanup(cancel)
Expand Down Expand Up @@ -129,8 +129,8 @@ func TestDASer_Restart(t *testing.T) {
func TestDASer_catchUp(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
bServ := mdutils.Bserv()

mockGet, shareServ, _, mockService := createDASerSubcomponents(t, bServ, 5, 0, share.NewLightAvailability)
avail := share.TestLightAvailability(bServ)
mockGet, shareServ, _, mockService := createDASerSubcomponents(t, bServ, 5, 0, avail)

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
Expand Down Expand Up @@ -170,8 +170,8 @@ func TestDASer_catchUp(t *testing.T) {
func TestDASer_catchUp_oneHeader(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
bServ := mdutils.Bserv()

mockGet, shareServ, _, mockService := createDASerSubcomponents(t, bServ, 6, 0, share.NewLightAvailability)
avail := share.TestLightAvailability(bServ)
mockGet, shareServ, _, mockService := createDASerSubcomponents(t, bServ, 6, 0, avail)
daser := NewDASer(shareServ, nil, mockGet, ds, mockService)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
Expand Down Expand Up @@ -213,8 +213,8 @@ func TestDASer_catchUp_oneHeader(t *testing.T) {
func TestDASer_catchUp_fails(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
bServ := mdutils.Bserv()

mockGet, _, _, mockService := createDASerSubcomponents(t, bServ, 6, 0, share.NewLightAvailability)
avail := share.TestLightAvailability(bServ)
mockGet, _, _, mockService := createDASerSubcomponents(t, bServ, 6, 0, avail)
daser := NewDASer(share.NewBrokenAvailability(), nil, mockGet, ds, mockService)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -266,8 +266,9 @@ func TestDASer_stopsAfter_BEFP(t *testing.T) {
ps, err := pubsub.NewGossipSub(ctx, net.Hosts()[0],
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign))
require.NoError(t, err)
avail := share.TestFullAvailability(bServ)
// 15 headers from the past and 15 future headers
mockGet, shareServ, sub, _ := createDASerSubcomponents(t, bServ, 15, 15, share.NewFullAvailability)
mockGet, shareServ, sub, _ := createDASerSubcomponents(t, bServ, 15, 15, avail)

// create fraud service and break one header
f := fraud.NewService(ps, mockGet.GetByHeight)
Expand Down Expand Up @@ -304,9 +305,9 @@ func createDASerSubcomponents(
bServ blockservice.BlockService,
numGetter,
numSub int,
availabilityFn func(blockservice.BlockService) share.Availability,
availability share.Availability,
) (*mockGetter, *share.Service, *header.DummySubscriber, *fraud.DummyService) {
shareServ := share.NewService(bServ, availabilityFn(bServ))
shareServ := share.NewService(bServ, availability)

mockGet := &mockGetter{
headers: make(map[int64]*header.ExtendedHeader),
Expand Down
69 changes: 69 additions & 0 deletions docs/adr/adr-008-p2p-discovery.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# ADR #008: Celestia-Node Full Node Discovery

## Changelog

- 2022.07.04 - discovery data structure

## Authors

@vgonkivs

## Context

This ADR is intended to describe p2p full node discovery in celestia-node.
P2P discovery helps light and full nodes find other full nodes on the network at the specified topic (`full`).
As soon as a full node is found and a connection is established with it, the full node will be added to a set of peers (limitedSet).
## Decision

- https://github.com/celestiaorg/celestia-node/issues/599

## Detailed design
```go
// peersLimit is the maximum number of peers that will be discovered.
peersLimit = 3

// discovery combines advertise and discover services and stores discovered nodes.
type discovery struct {
// storage where all discovered and active peers are saved.
set *limitedSet
// libp2p.Host allows to connect to discovered peers.
host host.Host
// libp2p.Discovery allows advertising and discovering peers.
disc core.Discovery
}

// limitedSet is a thread safe set of peers with given limit.
// Inspired by libp2p peer.Set but extended with Remove method.
type limitedSet struct {
lk sync.RWMutex
ps map[peer.ID]struct{}

limit int
}
```
### Full Nodes behavior:
1. A Node starts advertising itself over DHT at `full` namespace after the system boots up in order to be found.
2. A Node starts finding other full nodes, so it can be able to join the Full Node network.
3. As soon as a new peer is found, the node will try to establish a connection with it. If the connection is successful,
the node will call [Tag Peer](https://github.com/libp2p/go-libp2p-core/blob/525a0b13017263bde889a3295fa2e4212d7af8c5/connmgr/manager.go#L35) and add the peer to the peer set; otherwise, the discovered peer will be dropped.


### Light Nodes behavior:
1. A node starts finding full nodes over DHT at `full` namespace using the `discoverer` interface.
2. As soon as a new peer is found, the node will try to establish a connection with it. If the connection is successful,
the node will call [Tag Peer](https://github.com/libp2p/go-libp2p-core/blob/525a0b13017263bde889a3295fa2e4212d7af8c5/connmgr/manager.go#L35) and add the peer to the peer set; otherwise, the discovered peer will be dropped.

*NOTE:* Bridge nodes behave the same as Full nodes.

Tagging protects connections from ConnManager trimming/GCing.
```go
// peerWeight is the weight assigned to discovered peers.
// peerWeight is a number that will be assigned to all discovered full nodes,
// so ConnManager will not break a connection with them.
peerWeight = 1000
```

![discovery](https://user-images.githubusercontent.com/40579846/177183260-92d1c390-928b-4c06-9516-24afea94d1f1.png)

## Status
Merged
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ require (
github.com/stretchr/testify v1.8.0
github.com/tendermint/tendermint v0.35.4
go.uber.org/fx v1.17.1
go.uber.org/multierr v1.8.0
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29
google.golang.org/grpc v1.47.0
)
Expand Down Expand Up @@ -247,7 +248,6 @@ require (
go.opencensus.io v0.23.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/dig v1.14.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
Expand Down
5 changes: 2 additions & 3 deletions node/p2p/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
exchange "github.com/ipfs/go-ipfs-exchange-interface"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/routing"
routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/libs/fxutil"
Expand Down Expand Up @@ -46,7 +46,7 @@ func DataExchange(cfg Config) func(bitSwapParams) (exchange.Interface, blockstor
prefix := protocol.ID(fmt.Sprintf("/celestia/%s", params.Net))
return bitswap.New(
ctx,
network.NewFromIpfsHost(params.Host, params.Cr, network.Prefix(prefix)),
network.NewFromIpfsHost(params.Host, &routinghelpers.Null{}, network.Prefix(prefix)),
bs,
bitswap.ProvideEnabled(false),
// NOTE: These below are required for our protocol to work reliably.
Expand All @@ -64,6 +64,5 @@ type bitSwapParams struct {
Net nparams.Network
Lc fx.Lifecycle
Host host.Host
Cr routing.ContentRouting
Ds datastore.Batching
}
13 changes: 8 additions & 5 deletions node/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package p2p

import (
"fmt"
"time"

"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
Expand All @@ -28,7 +29,8 @@ type Config struct {
// This is enabled by default for Bootstrappers.
PeerExchange bool
// ConnManager is a configuration tuple for ConnectionManager.
ConnManager ConnManagerConfig
ConnManager ConnManagerConfig
RoutingTableRefreshPeriod time.Duration
}

// DefaultConfig returns default configuration for P2P subsystem.
Expand All @@ -44,10 +46,11 @@ func DefaultConfig() Config {
"/ip4/127.0.0.1/tcp/2121",
"/ip6/::/tcp/2121",
},
MutualPeers: []string{},
Bootstrapper: false,
PeerExchange: false,
ConnManager: DefaultConnManagerConfig(),
MutualPeers: []string{},
Bootstrapper: false,
PeerExchange: false,
ConnManager: DefaultConnManagerConfig(),
RoutingTableRefreshPeriod: time.Minute,
}
}

Expand Down
12 changes: 3 additions & 9 deletions node/p2p/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/routing"
dht "github.com/libp2p/go-libp2p-kad-dht"
routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/libs/fxutil"
Expand All @@ -17,8 +16,8 @@ import (

// ContentRouting constructs nil content routing,
// as for our use-case existing ContentRouting mechanisms, e.g DHT, are unsuitable
func ContentRouting() routing.ContentRouting {
return &routinghelpers.Null{}
func ContentRouting(r routing.PeerRouting) routing.ContentRouting {
return r.(*dht.IpfsDHT)
}

// PeerRouting provides constructor for PeerRouting over DHT.
Expand All @@ -30,11 +29,7 @@ func PeerRouting(cfg Config) func(routingParams) (routing.PeerRouting, error) {
dht.BootstrapPeers(params.Peers...),
dht.ProtocolPrefix(protocol.ID(fmt.Sprintf("/celestia/%s", params.Net))),
dht.Datastore(params.DataStore),
dht.QueryFilter(dht.PublicQueryFilter),
dht.RoutingTableFilter(dht.PublicRoutingTableFilter),
// disable DHT for everything besides peer routing
dht.DisableValues(),
dht.DisableProviders(),
dht.RoutingTableRefreshPeriod(cfg.RoutingTableRefreshPeriod),
}

if cfg.Bootstrapper {
Expand All @@ -57,7 +52,6 @@ func PeerRouting(cfg Config) func(routingParams) (routing.PeerRouting, error) {
return d.Close()
},
})

return d, nil
}
}
Expand Down
39 changes: 33 additions & 6 deletions node/services/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"
pubsub "github.com/libp2p/go-libp2p-pubsub"
routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing"
"go.uber.org/fx"
"go.uber.org/multierr"

"github.com/celestiaorg/celestia-node/das"
"github.com/celestiaorg/celestia-node/fraud"
Expand Down Expand Up @@ -125,7 +128,13 @@ func HeaderStoreInit(cfg *Config) func(context.Context, params.Network, header.S
}

// ShareService constructs new share.Service.
func ShareService(lc fx.Lifecycle, bServ blockservice.BlockService, avail share.Availability) *share.Service {
func ShareService(
lc fx.Lifecycle,
bServ blockservice.BlockService,
avail share.Availability,
r routing.ContentRouting,
h host.Host,
) *share.Service {
service := share.NewService(bServ, avail)
lc.Append(fx.Hook{
OnStart: service.Start,
Expand Down Expand Up @@ -167,21 +176,39 @@ func LightAvailability(
lc fx.Lifecycle,
bServ blockservice.BlockService,
ds datastore.Batching,
r routing.ContentRouting,
h host.Host,
) share.Availability {
la := share.NewLightAvailability(bServ)
la := share.NewLightAvailability(bServ, routingdisc.NewRoutingDiscovery(r), h)
ca := share.NewCacheAvailability(la, ds)
lc.Append(fx.Hook{
OnStop: ca.Close,
OnStart: la.Start,
OnStop: func(ctx context.Context) error {
laErr := la.Stop(ctx)
caErr := ca.Close(ctx)
return multierr.Append(laErr, caErr)
},
})
return ca
}

// FullAvailability constructs full share availability wrapped in cache availability.
func FullAvailability(lc fx.Lifecycle, bServ blockservice.BlockService, ds datastore.Batching) share.Availability {
fa := share.NewFullAvailability(bServ)
func FullAvailability(
lc fx.Lifecycle,
bServ blockservice.BlockService,
ds datastore.Batching,
r routing.ContentRouting,
h host.Host,
) share.Availability {
fa := share.NewFullAvailability(bServ, routingdisc.NewRoutingDiscovery(r), h)
ca := share.NewCacheAvailability(fa, ds)
lc.Append(fx.Hook{
OnStop: ca.Close,
OnStart: fa.Start,
OnStop: func(ctx context.Context) error {
faErr := fa.Stop(ctx)
caErr := ca.Close(ctx)
return multierr.Append(faErr, caErr)
},
})
return ca
}
10 changes: 9 additions & 1 deletion node/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package node

import (
"encoding/hex"
"time"

"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
Expand Down Expand Up @@ -88,9 +89,16 @@ func WithKeyringSigner(signer *apptypes.KeyringSigner) Option {
}
}

// WithBootstrappers sets custom bootstrap peers
// WithBootstrappers sets custom bootstrap peers.
func WithBootstrappers(peers params.Bootstrappers) Option {
return func(sets *settings) {
sets.opts = append(sets.opts, fx.Replace(peers))
}
}

// WithRefreshRoutingTablePeriod sets custom refresh period for dht.
func WithRefreshRoutingTablePeriod(interval time.Duration) Option {
return func(sets *settings) {
sets.cfg.P2P.RoutingTableRefreshPeriod = interval
}
}
Loading

0 comments on commit 2685070

Please sign in to comment.