Skip to content

Commit

Permalink
adding test for missing block
Browse files Browse the repository at this point in the history
  • Loading branch information
sideninja committed Sep 16, 2024
1 parent ba6d3ac commit 5f3f789
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 13 deletions.
87 changes: 87 additions & 0 deletions services/ingestion/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ingestion

import (
"context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -66,6 +67,92 @@ func Test_Subscribing(t *testing.T) {
require.Equal(t, uint64(endHeight), prevHeight)
}

func Test_MissingBlockEvent(t *testing.T) {
const endHeight = uint64(20)
const startHeight = uint64(1)
const missingBlockHeight = uint64(10)
const foundBlockHeight = uint64(15)

currentClient, clientEvents := testutils.SetupClient(startHeight, endHeight)

client, err := requester.NewCrossSporkClient(
currentClient,
nil,
zerolog.New(zerolog.NewTestWriter(t)),
flowGo.Previewnet,
)
require.NoError(t, err)

subscriber := NewRPCSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop())

events := subscriber.Subscribe(context.Background(), 1)

missingHashes := make([]gethCommon.Hash, 0)

go func() {
defer close(clientEvents)

for i := startHeight; i <= endHeight; i++ {
txCdc, txEvent, tx, _, _ := newTransaction(i)
blockCdc, _, blockEvent, _ := newBlock(i, []gethCommon.Hash{tx.Hash()})

if i == foundBlockHeight {
blockCdc, _, _, _ = newBlock(i, missingHashes)
}

blockEvents := []flow.Event{
{Value: blockCdc, Type: string(txEvent.Etype)},
{Value: txCdc, Type: string(blockEvent.Etype)},
}

if i > missingBlockHeight && i < foundBlockHeight {
blockEvents = blockEvents[1:] // remove block
missingHashes = append(missingHashes, tx.Hash())
}

clientEvents <- flow.BlockEvents{
Height: i,
Events: blockEvents,
}
}
}()

var prevHeight uint64
for ev := range events {
if prevHeight == endHeight {
require.ErrorIs(t, ev.Err, errs.ErrDisconnected)
break
}

require.NoError(t, ev.Err)
block := ev.Events.Block()
require.NotNil(t, block) // make sure all have blocks
// make sure all normal blocks have 1 tx
if block.Height != foundBlockHeight {
require.Len(t, ev.Events.Transactions(), 1)
}
// the block that was missing has all txs
if block.Height == foundBlockHeight {
// the missing block has all the transaction in between when it was missing
require.Len(t, ev.Events.Transactions(), int(foundBlockHeight-missingBlockHeight))
for i, h := range missingHashes {
found := false
for _, tx := range ev.Events.Transactions() {
if h.Cmp(tx.Hash()) == 0 {
found = true
}
}
require.True(t, found, fmt.Sprintf("required hash not found at index %d %s", i, h.String()))
}
}

prevHeight = ev.Events.CadenceHeight()
}

// this makes sure we indexed all the events
require.Equal(t, endHeight, prevHeight)
}

// Test that back-up fetching of EVM events is triggered when the
// Event Streaming API returns an inconsistent response.
// This scenario tests the happy path, when the back-up fetching of
Expand Down
31 changes: 18 additions & 13 deletions services/testutils/mock_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,23 @@ func (c *MockClient) SubscribeEventsByBlockHeight(
}

func SetupClientForRange(startHeight uint64, endHeight uint64) *MockClient {
client, events := SetupClient(startHeight, endHeight)
go func() {
defer close(events)

for i := startHeight; i <= endHeight; i++ {
events <- flow.BlockEvents{
Height: i,
}
}
}()

return client
}

func SetupClient(startHeight uint64, endHeight uint64) (*MockClient, chan flow.BlockEvents) {
events := make(chan flow.BlockEvents)

return &MockClient{
Client: &mocks.Client{},
GetLatestBlockHeaderFunc: func(ctx context.Context, sealed bool) (*flow.BlockHeader, error) {
Expand Down Expand Up @@ -66,19 +83,7 @@ func SetupClientForRange(startHeight uint64, endHeight uint64) *MockClient {
filter flow.EventFilter,
opts ...access.SubscribeOption,
) (<-chan flow.BlockEvents, <-chan error, error) {
events := make(chan flow.BlockEvents)

go func() {
defer close(events)

for i := startHeight; i <= endHeight; i++ {
events <- flow.BlockEvents{
Height: i,
}
}
}()

return events, make(chan error), nil
},
}
}, events
}

0 comments on commit 5f3f789

Please sign in to comment.