Skip to content

Commit

Permalink
Add more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
reductionista committed Feb 14, 2025
1 parent 4e21d5b commit 70aec51
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 15 deletions.
9 changes: 5 additions & 4 deletions pkg/solana/logpoller/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,18 +404,19 @@ func (fl *filters) UpdateStartingBlocks(ctx context.Context, startingBlock int64

startingBlocks := make(map[int64]int64, len(fl.filtersByID))
for id, filter := range fl.filtersByID {
if filter.IsBackfilled || startingBlock < filter.StartingBlock {
filter.StartingBlock = startingBlock
newStartingBlock := filter.StartingBlock
if filter.IsBackfilled || startingBlock < newStartingBlock {
newStartingBlock = startingBlock
}
startingBlocks[id] = filter.StartingBlock
filter.IsBackfilled = false
startingBlocks[id] = newStartingBlock
}
err := fl.orm.UpdateStartingBlocks(ctx, startingBlocks)
if err != nil {
return err
}

for id, blk := range startingBlocks {
fl.filtersByID[id].IsBackfilled = false
fl.filtersByID[id].StartingBlock = blk
fl.filtersToBackfill[id] = struct{}{}
}
Expand Down
23 changes: 18 additions & 5 deletions pkg/solana/logpoller/filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ func TestFilters_UpdateStartingBlocks(t *testing.T) {
},
{
name: "shouldn't update anything in memory if db update fails",
replayBlock: 51500,
replayBlock: 59700,
dbErr: errors.New("failed to write db"),
expectedBlocks: []int64{origFilters[0].StartingBlock, origFilters[1].StartingBlock},
},
Expand All @@ -551,17 +551,30 @@ func TestFilters_UpdateStartingBlocks(t *testing.T) {

for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
orm.EXPECT().UpdateStartingBlocks(mock.Anything, mock.Anything).Once().Return(nil)
newFilters := make([]Filter, len(origFilters))
copy(newFilters, origFilters)
filters.filtersByID[ids[0]] = &newFilters[0]
filters.filtersByID[ids[1]] = &newFilters[1]
filters.filtersToBackfill = map[int64]struct{}{ids[0]: {}}
orm.EXPECT().UpdateStartingBlocks(mock.Anything, mock.Anything).Once().Return(tt.dbErr)
err = filters.UpdateStartingBlocks(ctx, tt.replayBlock)
if tt.dbErr != nil {
require.Error(t, err)
assert.Len(t, filters.filtersToBackfill, 1) // all filters should end up in the backfill queue
} else {
require.NoError(t, err)
assert.Len(t, filters.filtersToBackfill, 2) // all filters should end up in the backfill queue
}
assert.Len(t, filters.filtersToBackfill, 2) // all filters should end up in the backfill queue

for i, id := range ids {
assert.Equal(t, tt.expectedBlocks[i], filters.filtersByID[id].StartingBlock)
assert.Contains(t, filters.filtersToBackfill, id)
assert.Equal(t, tt.expectedBlocks[i], filters.filtersByID[id].StartingBlock,
"unexpected starting block for \"%s\" filter", filters.filtersByID[id].Name)
if tt.dbErr == nil {
assert.False(t, filters.filtersByID[id].IsBackfilled)
assert.Contains(t, filters.filtersToBackfill, id)
} else {
assert.Equal(t, origFilters[i].IsBackfilled, filters.filtersByID[id].IsBackfilled)
}
}
})
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/solana/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (lp *Service) backfillFilters(ctx context.Context, filters []Filter, to int

lp.lggr.Infow("Done backfilling filters", "filters", len(filters), "from", minSlot, "to", to)
if replayBlock > 0 {
lp.replayComplete(replayBlock)
lp.replayComplete(minSlot, to)
}

for _, filter := range filters {
Expand Down Expand Up @@ -426,17 +426,18 @@ func (lp *Service) run(ctx context.Context) (err error) {
return nil
}

func (lp *Service) replayComplete(replayBlock int64) {
func (lp *Service) replayComplete(from, to int64) {
lp.replay.mut.Lock()
defer lp.replay.mut.Unlock()

if lp.replay.requestBlock < replayBlock {
lp.lggr.Infow("replay complete", "from", from, "to", to)

lp.replay.pending = false
if lp.replay.requestBlock != NoNewReplayRequests && lp.replay.requestBlock < from {
// received a new request with lower block number while replaying, we'll process that next time
return
}
lp.lggr.Infow("replay complete", "replayBlock", lp.replay.requestBlock)
lp.replay.requestBlock = NoNewReplayRequests
lp.replay.pending = false
}

func appendBuffered(ch <-chan Block, max int, blocks []Block) []Block {
Expand Down
36 changes: 35 additions & 1 deletion pkg/solana/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ func TestLogPoller_getLastProcessedSlot(t *testing.T) {
}

func TestLogPoller_processBlocksRange(t *testing.T) {
t.Parallel()
t.Run("Returns error if failed to start backfill", func(t *testing.T) {
lp := newMockedLP(t)
expectedErr := errors.New("failed to start backfill")
Expand Down Expand Up @@ -353,6 +354,7 @@ func TestProcess(t *testing.T) {
}

func Test_LogPoller_Replay(t *testing.T) {
t.Parallel()
ctx := tests.Context(t)
fromBlock := int64(5)

Expand Down Expand Up @@ -386,12 +388,44 @@ func Test_LogPoller_Replay(t *testing.T) {
assertReplayInfo(fromBlock-1, false)
})

t.Run("shouldn't update requestBlock if UpdateStartingBlocks fails", func(t *testing.T) {
t.Run("shouldnt update requestBlock if UpdateStartingBlocks fails", func(t *testing.T) {
expectedErr := fmt.Errorf("error")
lp.LogPoller.replay.requestBlock = NoNewReplayRequests
lp.Filters.EXPECT().UpdateStartingBlocks(mock.Anything, fromBlock-3).Once().Return(expectedErr)
err := lp.LogPoller.Replay(ctx, fromBlock-3)
assert.ErrorIs(t, err, expectedErr)
assertReplayInfo(NoNewReplayRequests, false)
})

t.Run("checkForReplayRequest should not set pending flag if there are no new requests", func(t *testing.T) {
expected := int64(NoNewReplayRequests)
lp.LogPoller.replay.requestBlock = expected
actual := lp.LogPoller.checkForReplayRequest()
assertReplayInfo(expected, false)
assert.Equal(t, expected, actual)
assert.False(t, lp.LogPoller.ReplayPending())
})

t.Run("checkForReplayRequest should set pending flag if there is a new request", func(t *testing.T) {
expected := int64(3)
lp.LogPoller.replay.requestBlock = expected
actual := lp.LogPoller.checkForReplayRequest()
assertReplayInfo(expected, true)
assert.Equal(t, expected, actual)
assert.True(t, lp.LogPoller.ReplayPending())
})

t.Run("replayComplete clears pending flag and resets requestBlock", func(t *testing.T) {
lp.LogPoller.replay.requestBlock = NoNewReplayRequests
lp.LogPoller.replay.pending = true
lp.LogPoller.replayComplete(8, 20)
assertReplayInfo(NoNewReplayRequests, false)
})

t.Run("replayComplete clears pending flag but does not reset requestBlock if lower block request received", func(t *testing.T) {
lp.LogPoller.replay.requestBlock = 3
lp.LogPoller.replay.pending = true
lp.LogPoller.replayComplete(8, 20)
assertReplayInfo(3, false)
})
}

0 comments on commit 70aec51

Please sign in to comment.