Skip to content

Commit

Permalink
test(core): more deterministic non-empty block generation and listene…
Browse files Browse the repository at this point in the history
…r test do not store historic
  • Loading branch information
renaynay committed Apr 2, 2024
1 parent 16b5fd5 commit e4084fa
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 28 deletions.
78 changes: 59 additions & 19 deletions core/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ func TestCoreExchange_RequestHeaders(t *testing.T) {
t.Cleanup(cancel)

cfg := DefaultTestConfig()
cfg.ChainID = networkID
cfg.ChainID = testChainID
fetcher, cctx := createCoreFetcher(t, cfg)

// generate several blocks
generateBlocks(t, fetcher, cfg, cctx)
generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx)

store := createStore(t)

Expand All @@ -43,12 +42,12 @@ func TestCoreExchange_RequestHeaders(t *testing.T) {
genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes())
require.NoError(t, err)

to := uint64(40) // ensures some blocks will be non-empty
to := uint64(30)
expectedFirstHeightInRange := genHeader.Height() + 1
expectedLastHeightInRange := to - 1
expectedLenHeaders := to - expectedFirstHeightInRange

// request headers from height 1 to 10 [2:35)
// request headers from height 1 to 20 [2:30)
headers, err := ce.GetRangeByHeight(context.Background(), genHeader, to)
require.NoError(t, err)

Expand All @@ -70,11 +69,10 @@ func TestExchange_DoNotStoreHistoric(t *testing.T) {
t.Cleanup(cancel)

cfg := DefaultTestConfig()
cfg.ChainID = networkID
cfg.ChainID = testChainID
fetcher, cctx := createCoreFetcher(t, cfg)

// generate 10 blocks
generateBlocks(t, fetcher, cfg, cctx)
generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx)

store := createStore(t)

Expand All @@ -93,8 +91,7 @@ func TestExchange_DoNotStoreHistoric(t *testing.T) {
genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes())
require.NoError(t, err)

// ensures some blocks will be non-empty
headers, err := ce.GetRangeByHeight(ctx, genHeader, 40)
headers, err := ce.GetRangeByHeight(ctx, genHeader, 30)
require.NoError(t, err)

// ensure none of the "historic" EDSs were stored
Expand Down Expand Up @@ -143,19 +140,62 @@ func createStore(t *testing.T) *eds.Store {
return store
}

func generateBlocks(t *testing.T, fetcher *BlockFetcher, cfg *testnode.Config, cctx testnode.Context) {
sub, err := fetcher.SubscribeNewBlockEvent(context.Background())
require.NoError(t, err)
// fillBlocks generates at least 20 non-empty blocks.
func fillBlocks(
t *testing.T,
ctx context.Context,
cfg *testnode.Config,
cctx testnode.Context,
) {
for {
select {
case <-ctx.Done():
return
default:
}

i := 0
for i < 20 {
_, err := cctx.FillBlock(16, cfg.Accounts, flags.BroadcastBlock)
require.NoError(t, err)
}
}

b := <-sub
if bytes.Equal(b.Header.DataHash, share.EmptyRoot().Hash()) {
continue
// generateNonEmptyBlocks generates at least 20 non-empty blocks
func generateNonEmptyBlocks(
t *testing.T,
ctx context.Context,
fetcher *BlockFetcher,
cfg *testnode.Config,
cctx testnode.Context,
) []share.DataHash {
// generate several non-empty blocks
generateCtx, generateCtxCancel := context.WithCancel(context.Background())

sub, err := fetcher.SubscribeNewBlockEvent(ctx)
require.NoError(t, err)
defer func() {
err = fetcher.UnsubscribeNewBlockEvent(ctx)
require.NoError(t, err)
}()

go fillBlocks(t, generateCtx, cfg, cctx)

hashes := make([]share.DataHash, 0, 20)

i := 0
for i < 20 {
select {
case b, ok := <-sub:
require.True(t, ok)

if !bytes.Equal(b.Data.Hash(), share.EmptyRoot().Hash()) {
hashes = append(hashes, share.DataHash(b.Data.Hash()))
i++
}
case <-ctx.Done():
t.Fatal("failed to fill blocks within timeout")
}
i++
}
generateCtxCancel()

return hashes
}
4 changes: 2 additions & 2 deletions core/listener_no_race_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ func TestListenerWithNonEmptyBlocks(t *testing.T) {

// create one block to store as Head in local store and then unsubscribe from block events
cfg := DefaultTestConfig()
cfg.ChainID = networkID
cfg.ChainID = testChainID
fetcher, cctx := createCoreFetcher(t, cfg)
eds := createEdsPubSub(ctx, t)

store := createStore(t)

// create Listener and start listening
cl := createListener(ctx, t, fetcher, ps0, eds, store, networkID)
cl := createListener(ctx, t, fetcher, ps0, eds, store, testChainID)
err := cl.Start(ctx)
require.NoError(t, err)

Expand Down
52 changes: 45 additions & 7 deletions core/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ import (

"github.com/celestiaorg/celestia-node/header"
nodep2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
)

const networkID = "private"
const testChainID = "private"

// TestListener tests the lifecycle of the core listener.
func TestListener(t *testing.T) {
Expand All @@ -31,7 +32,7 @@ func TestListener(t *testing.T) {
subscriber, err := p2p.NewSubscriber[*header.ExtendedHeader](
ps1,
header.MsgID,
p2p.WithSubscriberNetworkID(networkID),
p2p.WithSubscriberNetworkID(testChainID),
)
require.NoError(t, err)
err = subscriber.SetVerifier(func(context.Context, *header.ExtendedHeader) error {
Expand All @@ -45,13 +46,13 @@ func TestListener(t *testing.T) {

// create one block to store as Head in local store and then unsubscribe from block events
cfg := DefaultTestConfig()
cfg.ChainID = networkID
cfg.ChainID = testChainID
fetcher, _ := createCoreFetcher(t, cfg)

eds := createEdsPubSub(ctx, t)

// create Listener and start listening
cl := createListener(ctx, t, fetcher, ps0, eds, createStore(t), networkID)
cl := createListener(ctx, t, fetcher, ps0, eds, createStore(t), testChainID)
err = cl.Start(ctx)
require.NoError(t, err)

Expand Down Expand Up @@ -79,7 +80,7 @@ func TestListenerWithWrongChainRPC(t *testing.T) {

// create one block to store as Head in local store and then unsubscribe from block events
cfg := DefaultTestConfig()
cfg.ChainID = networkID
cfg.ChainID = testChainID
fetcher, _ := createCoreFetcher(t, cfg)
eds := createEdsPubSub(ctx, t)

Expand All @@ -94,6 +95,42 @@ func TestListenerWithWrongChainRPC(t *testing.T) {
assert.ErrorIs(t, err, errInvalidSubscription)
}

// TestListener_DoesNotStoreHistoric tests the (unlikely) case that
// blocks come through the listener's subscription that are actually
// older than the sampling window.
func TestListener_DoesNotStoreHistoric(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
t.Cleanup(cancel)

// create mocknet with two pubsub endpoints
ps0, _ := createMocknetWithTwoPubsubEndpoints(ctx, t)

// create one block to store as Head in local store and then unsubscribe from block events
cfg := DefaultTestConfig()
cfg.ChainID = testChainID
fetcher, cctx := createCoreFetcher(t, cfg)
eds := createEdsPubSub(ctx, t)

store := createStore(t)

// create Listener and start listening
opt := WithAvailabilityWindow(pruner.AvailabilityWindow(time.Nanosecond))
cl := createListener(ctx, t, fetcher, ps0, eds, store, testChainID, opt)

dataRoots := generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx)

err := cl.Start(ctx)
require.NoError(t, err)

// ensure none of the EDSes were stored
for _, hash := range dataRoots {
has, err := store.Has(ctx, hash)
require.NoError(t, err)
assert.False(t, has)
}

}

func createMocknetWithTwoPubsubEndpoints(ctx context.Context, t *testing.T) (*pubsub.PubSub, *pubsub.PubSub) {
net, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)
Expand Down Expand Up @@ -137,8 +174,9 @@ func createListener(
edsSub *shrexsub.PubSub,
store *eds.Store,
chainID string,
opts ...Option,
) *Listener {
p2pSub, err := p2p.NewSubscriber[*header.ExtendedHeader](ps, header.MsgID, p2p.WithSubscriberNetworkID(networkID))
p2pSub, err := p2p.NewSubscriber[*header.ExtendedHeader](ps, header.MsgID, p2p.WithSubscriberNetworkID(testChainID))
require.NoError(t, err)

err = p2pSub.Start(ctx)
Expand All @@ -152,7 +190,7 @@ func createListener(
})

listener, err := NewListener(p2pSub, fetcher, edsSub.Broadcast, header.MakeExtendedHeader,
store, nodep2p.BlockTime, WithChainID(nodep2p.Network(chainID)))
store, nodep2p.BlockTime, append(opts, WithChainID(nodep2p.Network(chainID)))...)
require.NoError(t, err)
return listener
}
Expand Down

0 comments on commit e4084fa

Please sign in to comment.