Skip to content

Commit

Permalink
modify values
Browse files Browse the repository at this point in the history
  • Loading branch information
ramin committed Jul 4, 2024
1 parent accb058 commit 9ad5aed
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 21 deletions.
16 changes: 7 additions & 9 deletions share/p2p/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,17 @@ const (
// findPeersTimeout limits the FindPeers operation in time
findPeersTimeout = time.Minute

// retryTimeout defines time interval between discovery and advertise attempts.
retryTimeout = time.Second
// advertiseRetryTimeout defines time interval between advertise attempts.
advertiseRetryTimeout = time.Second * 1

// logInterval defines the time interval at which a warning message will be logged
// if the desired number of nodes is not detected.
logInterval = 5 * time.Minute
)

// discoveryRetryTimeout defines time interval between discovery attempts, needed for tests
var discoveryRetryTimeout = retryTimeout
// discoveryRetryTimeout defines time interval between discovery attempts
// this is set independently for tests in discover_test.go
var DiscoveryRetryTimeout = advertiseRetryTimeout * 60

// Discovery combines advertise and discover services and allows to store discovered nodes.
// TODO: The code here gets horribly hairy, so we should refactor this at some point
Expand Down Expand Up @@ -181,16 +182,13 @@ func (d *Discovery) Advertise(ctx context.Context) {

// we don't want retry indefinitely in busy loop
// internal discovery mechanism may need some time before attempts
errTimer := time.NewTimer(retryTimeout)
select {
case <-errTimer.C:
errTimer.Stop()
case <-time.After(advertiseRetryTimeout):
if !timer.Stop() {
<-timer.C
}
continue
case <-ctx.Done():
errTimer.Stop()
return
}
}
Expand All @@ -212,7 +210,7 @@ func (d *Discovery) Advertise(ctx context.Context) {
// It initiates peer discovery upon request and restarts the process until the soft limit is
// reached.
func (d *Discovery) discoveryLoop(ctx context.Context) {
t := time.NewTicker(discoveryRetryTimeout)
t := time.NewTicker(DiscoveryRetryTimeout)
defer t.Stop()

warnTicker := time.NewTicker(logInterval)
Expand Down
2 changes: 1 addition & 1 deletion share/p2p/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const (
func TestDiscovery(t *testing.T) {
const nodes = 10 // higher number brings higher coverage

discoveryRetryTimeout = time.Millisecond * 100 // defined in discovery.go
DiscoveryRetryTimeout = time.Millisecond * 100 // defined in discovery.go

ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
t.Cleanup(cancel)
Expand Down
30 changes: 19 additions & 11 deletions share/p2p/peers/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@ import (
"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
)

const (
defaultTimeout = time.Second * 5
)

func TestManager(t *testing.T) {
t.Run("Validate pool by headerSub", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
t.Cleanup(cancel)

// create headerSub mock
Expand All @@ -49,7 +53,7 @@ func TestManager(t *testing.T) {
})

t.Run("Validate pool by shrex.Getter", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
t.Cleanup(cancel)

h := testHeader()
Expand All @@ -72,7 +76,7 @@ func TestManager(t *testing.T) {
})

t.Run("validator", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
t.Cleanup(cancel)

// create headerSub mock
Expand Down Expand Up @@ -108,7 +112,7 @@ func TestManager(t *testing.T) {
})

t.Run("cleanup", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
t.Cleanup(cancel)

// create headerSub mock
Expand Down Expand Up @@ -153,7 +157,7 @@ func TestManager(t *testing.T) {
})

t.Run("no peers from shrex.Sub, get from discovery", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
t.Cleanup(cancel)

// create headerSub mock
Expand All @@ -176,7 +180,7 @@ func TestManager(t *testing.T) {
})

t.Run("no peers from shrex.Sub and from discovery. Wait", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
t.Cleanup(cancel)

// create headerSub mock
Expand Down Expand Up @@ -218,7 +222,7 @@ func TestManager(t *testing.T) {
})

t.Run("shrexSub sends a message lower than first headerSub header height, headerSub first", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
t.Cleanup(cancel)

h := testHeader()
Expand Down Expand Up @@ -251,7 +255,7 @@ func TestManager(t *testing.T) {
})

t.Run("shrexSub sends a message lower than first headerSub header height, shrexSub first", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
t.Cleanup(cancel)

h := testHeader()
Expand Down Expand Up @@ -289,7 +293,7 @@ func TestManager(t *testing.T) {
})

t.Run("pools store window", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
t.Cleanup(cancel)

h := testHeader()
Expand Down Expand Up @@ -322,7 +326,7 @@ func TestIntegration(t *testing.T) {
t.Run("get peer from shrexsub", func(t *testing.T) {
nw, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
t.Cleanup(cancel)

bnPubSub, err := shrexsub.NewPubSub(ctx, nw.Hosts()[0], "test")
Expand Down Expand Up @@ -365,7 +369,7 @@ func TestIntegration(t *testing.T) {
fullNodesTag := "fullNodes"
nw, err := mocknet.FullMeshConnected(3)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
t.Cleanup(cancel)

// set up bootstrapper
Expand All @@ -391,6 +395,9 @@ func TestIntegration(t *testing.T) {
bnRouter, err := dht.New(ctx, bnHost, opts...)
require.NoError(t, err)

// hack
discovery.DiscoveryRetryTimeout = time.Millisecond * 100

params := discovery.DefaultParameters()
params.AdvertiseInterval = time.Second

Expand Down Expand Up @@ -450,6 +457,7 @@ func TestIntegration(t *testing.T) {
go bnDisc.Advertise(ctx)

select {

case <-waitCh:
require.Contains(t, fnPeerManager.nodes.peersList, bnHost.ID())
case <-ctx.Done():
Expand Down

0 comments on commit 9ad5aed

Please sign in to comment.