From 70aec51bad8518a496275a2664b0b0be4788be30 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Thu, 13 Feb 2025 21:34:30 -0800 Subject: [PATCH] Add more tests --- pkg/solana/logpoller/filters.go | 9 ++++--- pkg/solana/logpoller/filters_test.go | 23 ++++++++++++---- pkg/solana/logpoller/log_poller.go | 11 ++++---- pkg/solana/logpoller/log_poller_test.go | 36 ++++++++++++++++++++++++- 4 files changed, 64 insertions(+), 15 deletions(-) diff --git a/pkg/solana/logpoller/filters.go b/pkg/solana/logpoller/filters.go index 92d360872..82681cfa9 100644 --- a/pkg/solana/logpoller/filters.go +++ b/pkg/solana/logpoller/filters.go @@ -404,11 +404,11 @@ func (fl *filters) UpdateStartingBlocks(ctx context.Context, startingBlock int64 startingBlocks := make(map[int64]int64, len(fl.filtersByID)) for id, filter := range fl.filtersByID { - if filter.IsBackfilled || startingBlock < filter.StartingBlock { - filter.StartingBlock = startingBlock + newStartingBlock := filter.StartingBlock + if filter.IsBackfilled || startingBlock < newStartingBlock { + newStartingBlock = startingBlock } - startingBlocks[id] = filter.StartingBlock - filter.IsBackfilled = false + startingBlocks[id] = newStartingBlock } err := fl.orm.UpdateStartingBlocks(ctx, startingBlocks) if err != nil { @@ -416,6 +416,7 @@ func (fl *filters) UpdateStartingBlocks(ctx context.Context, startingBlock int64 } for id, blk := range startingBlocks { + fl.filtersByID[id].IsBackfilled = false fl.filtersByID[id].StartingBlock = blk fl.filtersToBackfill[id] = struct{}{} } diff --git a/pkg/solana/logpoller/filters_test.go b/pkg/solana/logpoller/filters_test.go index 126d856a7..9009ae952 100644 --- a/pkg/solana/logpoller/filters_test.go +++ b/pkg/solana/logpoller/filters_test.go @@ -528,7 +528,7 @@ func TestFilters_UpdateStartingBlocks(t *testing.T) { }, { name: "shouldn't update anything in memory if db update fails", - replayBlock: 51500, + replayBlock: 59700, dbErr: errors.New("failed to write db"), expectedBlocks: []int64{origFilters[0].StartingBlock, origFilters[1].StartingBlock}, }, @@ -551,17 +551,30 @@ func TestFilters_UpdateStartingBlocks(t *testing.T) { for _, tt := range cases { t.Run(tt.name, func(t *testing.T) { - orm.EXPECT().UpdateStartingBlocks(mock.Anything, mock.Anything).Once().Return(nil) + newFilters := make([]Filter, len(origFilters)) + copy(newFilters, origFilters) + filters.filtersByID[ids[0]] = &newFilters[0] + filters.filtersByID[ids[1]] = &newFilters[1] + filters.filtersToBackfill = map[int64]struct{}{ids[0]: {}} + orm.EXPECT().UpdateStartingBlocks(mock.Anything, mock.Anything).Once().Return(tt.dbErr) err = filters.UpdateStartingBlocks(ctx, tt.replayBlock) if tt.dbErr != nil { require.Error(t, err) + assert.Len(t, filters.filtersToBackfill, 1) // all filters should end up in the backfill queue } else { require.NoError(t, err) + assert.Len(t, filters.filtersToBackfill, 2) // all filters should end up in the backfill queue } - assert.Len(t, filters.filtersToBackfill, 2) // all filters should end up in the backfill queue + for i, id := range ids { - assert.Equal(t, tt.expectedBlocks[i], filters.filtersByID[id].StartingBlock) - assert.Contains(t, filters.filtersToBackfill, id) + assert.Equal(t, tt.expectedBlocks[i], filters.filtersByID[id].StartingBlock, + "unexpected starting block for \"%s\" filter", filters.filtersByID[id].Name) + if tt.dbErr == nil { + assert.False(t, filters.filtersByID[id].IsBackfilled) + assert.Contains(t, filters.filtersToBackfill, id) + } else { + assert.Equal(t, origFilters[i].IsBackfilled, filters.filtersByID[id].IsBackfilled) + } } }) } diff --git a/pkg/solana/logpoller/log_poller.go b/pkg/solana/logpoller/log_poller.go index b18544fd7..d0c67d62e 100644 --- a/pkg/solana/logpoller/log_poller.go +++ b/pkg/solana/logpoller/log_poller.go @@ -311,7 +311,7 @@ func (lp *Service) backfillFilters(ctx context.Context, filters []Filter, to int lp.lggr.Infow("Done backfilling filters", "filters", len(filters), "from", minSlot, "to", to) if replayBlock > 0 { - lp.replayComplete(replayBlock) + lp.replayComplete(minSlot, to) } for _, filter := range filters { @@ -426,17 +426,18 @@ func (lp *Service) run(ctx context.Context) (err error) { return nil } -func (lp *Service) replayComplete(replayBlock int64) { +func (lp *Service) replayComplete(from, to int64) { lp.replay.mut.Lock() defer lp.replay.mut.Unlock() - if lp.replay.requestBlock < replayBlock { + lp.lggr.Infow("replay complete", "from", from, "to", to) + + lp.replay.pending = false + if lp.replay.requestBlock != NoNewReplayRequests && lp.replay.requestBlock < from { // received a new request with lower block number while replaying, we'll process that next time return } - lp.lggr.Infow("replay complete", "replayBlock", lp.replay.requestBlock) lp.replay.requestBlock = NoNewReplayRequests - lp.replay.pending = false } func appendBuffered(ch <-chan Block, max int, blocks []Block) []Block { diff --git a/pkg/solana/logpoller/log_poller_test.go b/pkg/solana/logpoller/log_poller_test.go index 324dae943..1c88e35ed 100644 --- a/pkg/solana/logpoller/log_poller_test.go +++ b/pkg/solana/logpoller/log_poller_test.go @@ -210,6 +210,7 @@ func TestLogPoller_getLastProcessedSlot(t *testing.T) { } func TestLogPoller_processBlocksRange(t *testing.T) { + t.Parallel() t.Run("Returns error if failed to start backfill", func(t *testing.T) { lp := newMockedLP(t) expectedErr := errors.New("failed to start backfill") @@ -353,6 +354,7 @@ func TestProcess(t *testing.T) { } func Test_LogPoller_Replay(t *testing.T) { + t.Parallel() ctx := tests.Context(t) fromBlock := int64(5) @@ -386,7 +388,7 @@ func Test_LogPoller_Replay(t *testing.T) { assertReplayInfo(fromBlock-1, false) }) - t.Run("shouldn't update requestBlock if UpdateStartingBlocks fails", func(t *testing.T) { + t.Run("shouldnt update requestBlock if UpdateStartingBlocks fails", func(t *testing.T) { expectedErr := fmt.Errorf("error") lp.LogPoller.replay.requestBlock = NoNewReplayRequests lp.Filters.EXPECT().UpdateStartingBlocks(mock.Anything, fromBlock-3).Once().Return(expectedErr) @@ -394,4 +396,36 @@ func Test_LogPoller_Replay(t *testing.T) { assert.ErrorIs(t, err, expectedErr) assertReplayInfo(NoNewReplayRequests, false) }) + + t.Run("checkForReplayRequest should not set pending flag if there are no new requests", func(t *testing.T) { + expected := int64(NoNewReplayRequests) + lp.LogPoller.replay.requestBlock = expected + actual := lp.LogPoller.checkForReplayRequest() + assertReplayInfo(expected, false) + assert.Equal(t, expected, actual) + assert.False(t, lp.LogPoller.ReplayPending()) + }) + + t.Run("checkForReplayRequest should set pending flag if there is a new request", func(t *testing.T) { + expected := int64(3) + lp.LogPoller.replay.requestBlock = expected + actual := lp.LogPoller.checkForReplayRequest() + assertReplayInfo(expected, true) + assert.Equal(t, expected, actual) + assert.True(t, lp.LogPoller.ReplayPending()) + }) + + t.Run("replayComplete clears pending flag and resets requestBlock", func(t *testing.T) { + lp.LogPoller.replay.requestBlock = NoNewReplayRequests + lp.LogPoller.replay.pending = true + lp.LogPoller.replayComplete(8, 20) + assertReplayInfo(NoNewReplayRequests, false) + }) + + t.Run("replayComplete clears pending flag but does not reset requestBlock if lower block request received", func(t *testing.T) { + lp.LogPoller.replay.requestBlock = 3 + lp.LogPoller.replay.pending = true + lp.LogPoller.replayComplete(8, 20) + assertReplayInfo(3, false) + }) }