From a1fdb165b532a3bcdb7a4d0e418ae85514637a3f Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Fri, 14 Feb 2025 14:00:44 -0800 Subject: [PATCH] Change ReplayPending bool to ReplayStatus, returning enum --- pkg/solana/logpoller/filters.go | 3 +- pkg/solana/logpoller/filters_test.go | 5 +- pkg/solana/logpoller/log_poller.go | 74 +++++++++++----------- pkg/solana/logpoller/log_poller_test.go | 81 ++++++++++++------------- pkg/solana/logpoller/mock_filters.go | 34 +++-------- pkg/solana/logpoller/types.go | 24 ++++++++ 6 files changed, 116 insertions(+), 105 deletions(-) diff --git a/pkg/solana/logpoller/filters.go b/pkg/solana/logpoller/filters.go index b9bc136b2..21ca75c3e 100644 --- a/pkg/solana/logpoller/filters.go +++ b/pkg/solana/logpoller/filters.go @@ -407,7 +407,7 @@ func (fl *filters) MarkFilterBackfilled(ctx context.Context, filterID int64) err // are already scheduled for backfilling, but only if the new startingBlock is less than the one they // currently have set. All filters will then be added to the filtersToBackfill index, so their logs will // be (re-)fetched from the specified starting point on the next iteration of the main LogPoller run loop. -func (fl *filters) UpdateStartingBlocks(ctx context.Context, startingBlock int64) error { +func (fl *filters) UpdateStartingBlocks(startingBlock int64) { fl.filtersMutex.Lock() defer fl.filtersMutex.Unlock() @@ -425,7 +425,6 @@ func (fl *filters) UpdateStartingBlocks(ctx context.Context, startingBlock int64 fl.filtersByID[id].StartingBlock = blk fl.filtersToBackfill[id] = struct{}{} } - return nil } // LoadFilters - loads filters from database. Can be called multiple times without side effects. diff --git a/pkg/solana/logpoller/filters_test.go b/pkg/solana/logpoller/filters_test.go index 35ba9f007..8f763338b 100644 --- a/pkg/solana/logpoller/filters_test.go +++ b/pkg/solana/logpoller/filters_test.go @@ -489,7 +489,6 @@ func TestFilters_ExtractField(t *testing.T) { } func TestFilters_UpdateStartingBlocks(t *testing.T) { - ctx := tests.Context(t) orm := NewMockORM(t) lggr := logger.Sugared(logger.Test(t)) filters := newFilters(lggr, orm) @@ -549,9 +548,7 @@ func TestFilters_UpdateStartingBlocks(t *testing.T) { filters.filtersByID[ids[0]] = &newFilters[0] filters.filtersByID[ids[1]] = &newFilters[1] filters.filtersToBackfill = map[int64]struct{}{ids[0]: {}} - orm.EXPECT().UpdateStartingBlocks(mock.Anything, mock.Anything).Once().Return(nil) - err = filters.UpdateStartingBlocks(ctx, tt.replayBlock) - require.NoError(t, err) + filters.UpdateStartingBlocks(tt.replayBlock) assert.Len(t, filters.filtersToBackfill, 2) // all filters should end up in the backfill queue for i, id := range ids { diff --git a/pkg/solana/logpoller/log_poller.go b/pkg/solana/logpoller/log_poller.go index 90f5fd549..256f51096 100644 --- a/pkg/solana/logpoller/log_poller.go +++ b/pkg/solana/logpoller/log_poller.go @@ -18,8 +18,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/query" ) -const NoNewReplayRequests = -1 - var ( ErrFilterNameConflict = errors.New("filter with such name already exists") ) @@ -51,7 +49,7 @@ type filtersI interface { GetDistinctAddresses(ctx context.Context) ([]PublicKey, error) GetFiltersToBackfill() []Filter MarkFilterBackfilled(ctx context.Context, filterID int64) error - UpdateStartingBlocks(ctx context.Context, startingBlocks int64) error + UpdateStartingBlocks(startingBlocks int64) MatchingFiltersForEncodedEvent(event ProgramEvent) iter.Seq[Filter] DecodeSubKey(ctx context.Context, lggr logger.SugaredLogger, raw []byte, ID int64, subKeyPath []string) (any, error) IncrementSeqNum(filterID int64) int64 @@ -60,7 +58,13 @@ type filtersI interface { type ReplayInfo struct { mut sync.RWMutex requestBlock int64 - pending bool + status ReplayStatus +} + +// hasRequest returns true if a new request has been received (since the last request completed), +// whether or not it is pending yet +func (r *ReplayInfo) hasRequest() bool { + return r.status == ReplayStatusRequested || r.status == ReplayStatusPending } type Service struct { @@ -97,7 +101,7 @@ func New(lggr logger.SugaredLogger, orm ORM, cl RPCClient) *Service { }, }.NewServiceEngine(lggr) lp.lggr = lp.eng.SugaredLogger - lp.replay.requestBlock = NoNewReplayRequests + lp.replay.status = ReplayStatusNoRequest return lp } @@ -227,38 +231,35 @@ func (lp *Service) UnregisterFilter(ctx context.Context, name string) error { // LogPoller run loop it will backfill all filters starting from fromBlock. If there // are new filters in the backfill queue, with an earlier StartingBlock, then they // will get backfilled from there instead. -func (lp *Service) Replay(ctx context.Context, fromBlock int64) error { - ctx, cancel := lp.eng.Ctx(ctx) - defer cancel() - +func (lp *Service) Replay(fromBlock int64) error { lp.replay.mut.Lock() defer lp.replay.mut.Unlock() - if lp.replay.requestBlock != NoNewReplayRequests && lp.replay.requestBlock <= fromBlock { + if lp.replay.hasRequest() && lp.replay.requestBlock <= fromBlock { // Already requested, no further action required lp.lggr.Warnf("Ignoring redundant request to replay from block %d, replay from block %d already requested", fromBlock, lp.replay.requestBlock) return nil } - err := lp.filters.UpdateStartingBlocks(ctx, fromBlock) - if err != nil { - return err - } + lp.filters.UpdateStartingBlocks(fromBlock) lp.replay.requestBlock = fromBlock + if lp.replay.status != ReplayStatusPending { + lp.replay.status = ReplayStatusRequested + } return nil } -// ReplayPending returns the current replay status of LogPoller. true indicates there is a replay currently in progress. -// False means there is no replay in progress. Some subtleties to bear in mind: -// 1. if a new request has been submitted since the last replay finished, but the backfilling hasn't been started yet, -// this will still return false. -// 2. It may also return false if the node is restarted after a replay request was submitted but before it completes, -// even though it will still know to finish backfilling everything needed in order to satisfy the replay request. -func (lp *Service) ReplayPending() bool { +// ReplayStatus returns the current replay status of LogPoller: +// +// NoRequests - there have not been any replay requests yet since node startup +// Requested - a replay has been requested, but has not started yet +// Pending - a replay is currently in progress +// Complete - there was at least one replay executed since startup, but all have since completed +func (lp *Service) ReplayStatus() ReplayStatus { lp.replay.mut.RLock() defer lp.replay.mut.RUnlock() - return lp.replay.pending + return lp.replay.status } func (lp *Service) getLastProcessedSlot(ctx context.Context) (int64, error) { @@ -290,21 +291,21 @@ func (lp *Service) getLastProcessedSlot(ctx context.Context) (int64, error) { // checkForReplayRequest checks whether there have been any new replay requests since it was last called, // and if so sets the pending flag to true and returns the block number -func (lp *Service) checkForReplayRequest() int64 { +func (lp *Service) checkForReplayRequest() bool { lp.replay.mut.Lock() defer lp.replay.mut.Unlock() - if lp.replay.requestBlock == NoNewReplayRequests { - return NoNewReplayRequests + if !lp.replay.hasRequest() { + return false } lp.lggr.Infow("starting replay", "replayBlock", lp.replay.requestBlock) - lp.replay.pending = true - return lp.replay.requestBlock + lp.replay.status = ReplayStatusPending + return true } func (lp *Service) backfillFilters(ctx context.Context, filters []Filter, to int64) error { - replayBlock := lp.checkForReplayRequest() + isReplay := lp.checkForReplayRequest() addressesSet := make(map[PublicKey]struct{}) addresses := make([]PublicKey, 0, len(filters)) @@ -326,7 +327,7 @@ func (lp *Service) backfillFilters(ctx context.Context, filters []Filter, to int } lp.lggr.Infow("Done backfilling filters", "filters", len(filters), "from", minSlot, "to", to) - if replayBlock != NoNewReplayRequests { + if isReplay { lp.replayComplete(minSlot, to) } @@ -442,18 +443,23 @@ func (lp *Service) run(ctx context.Context) (err error) { return nil } -func (lp *Service) replayComplete(from, to int64) { +// replayComplete is called when a backfill associated with a current pending replay has just completed. +// Assuming there were no new requests to replay while the backfill was happening, it updates the replay +// status to ReplayStatusComplete. If there was a request for a lower block number in the meantime, then +// the status will revert to ReplayStatusRequested +func (lp *Service) replayComplete(from, to int64) bool { lp.replay.mut.Lock() defer lp.replay.mut.Unlock() lp.lggr.Infow("replay complete", "from", from, "to", to) - lp.replay.pending = false - if lp.replay.requestBlock != NoNewReplayRequests && lp.replay.requestBlock < from { + if lp.replay.requestBlock < from { // received a new request with lower block number while replaying, we'll process that next time - return + lp.replay.status = ReplayStatusRequested + return false } - lp.replay.requestBlock = NoNewReplayRequests + lp.replay.status = ReplayStatusComplete + return true } func appendBuffered(ch <-chan Block, max int, blocks []Block) []Block { diff --git a/pkg/solana/logpoller/log_poller_test.go b/pkg/solana/logpoller/log_poller_test.go index 1c88e35ed..facfabc68 100644 --- a/pkg/solana/logpoller/log_poller_test.go +++ b/pkg/solana/logpoller/log_poller_test.go @@ -6,7 +6,6 @@ import ( "encoding/base64" "encoding/json" "errors" - "fmt" "math/rand" "sync/atomic" "testing" @@ -355,77 +354,77 @@ func TestProcess(t *testing.T) { func Test_LogPoller_Replay(t *testing.T) { t.Parallel() - ctx := tests.Context(t) fromBlock := int64(5) lp := newMockedLP(t) - assertReplayInfo := func(requestBlock int64, pending bool) { + assertReplayInfo := func(requestBlock int64, status ReplayStatus) { assert.Equal(t, requestBlock, lp.LogPoller.replay.requestBlock) - assert.Equal(t, pending, lp.LogPoller.replay.pending) + assert.Equal(t, status, lp.LogPoller.replay.status) } t.Run("ReplayInfo state initialized propery", func(t *testing.T) { - assertReplayInfo(NoNewReplayRequests, false) + assertReplayInfo(0, ReplayStatusNoRequest) }) t.Run("ordinary replay request", func(t *testing.T) { - lp.Filters.EXPECT().UpdateStartingBlocks(mock.Anything, fromBlock).Once().Return(nil) - err := lp.LogPoller.Replay(ctx, fromBlock) + lp.Filters.EXPECT().UpdateStartingBlocks(fromBlock).Once() + err := lp.LogPoller.Replay(fromBlock) require.NoError(t, err) - assertReplayInfo(fromBlock, false) + assertReplayInfo(fromBlock, ReplayStatusRequested) }) t.Run("redundant replay request", func(t *testing.T) { - err := lp.LogPoller.Replay(ctx, fromBlock) + lp.LogPoller.replay.requestBlock = fromBlock + lp.LogPoller.replay.status = ReplayStatusRequested + err := lp.LogPoller.Replay(fromBlock + 10) require.NoError(t, err) - assertReplayInfo(fromBlock, false) + assertReplayInfo(fromBlock, ReplayStatusRequested) }) t.Run("replay request updated", func(t *testing.T) { - lp.Filters.EXPECT().UpdateStartingBlocks(mock.Anything, fromBlock-1).Once().Return(nil) - err := lp.LogPoller.Replay(ctx, fromBlock-1) + lp.LogPoller.replay.status = ReplayStatusNoRequest + lp.Filters.EXPECT().UpdateStartingBlocks(fromBlock - 1).Once() + err := lp.LogPoller.Replay(fromBlock - 1) require.NoError(t, err) - assertReplayInfo(fromBlock-1, false) + assertReplayInfo(fromBlock-1, ReplayStatusRequested) }) - t.Run("shouldnt update requestBlock if UpdateStartingBlocks fails", func(t *testing.T) { - expectedErr := fmt.Errorf("error") - lp.LogPoller.replay.requestBlock = NoNewReplayRequests - lp.Filters.EXPECT().UpdateStartingBlocks(mock.Anything, fromBlock-3).Once().Return(expectedErr) - err := lp.LogPoller.Replay(ctx, fromBlock-3) - assert.ErrorIs(t, err, expectedErr) - assertReplayInfo(NoNewReplayRequests, false) + t.Run("replay request updated while pending", func(t *testing.T) { + lp.LogPoller.replay.requestBlock = fromBlock + lp.LogPoller.replay.status = ReplayStatusPending + lp.Filters.EXPECT().UpdateStartingBlocks(fromBlock - 1).Once() + err := lp.LogPoller.Replay(fromBlock - 1) + require.NoError(t, err) + assertReplayInfo(fromBlock-1, ReplayStatusPending) }) - t.Run("checkForReplayRequest should not set pending flag if there are no new requests", func(t *testing.T) { - expected := int64(NoNewReplayRequests) - lp.LogPoller.replay.requestBlock = expected - actual := lp.LogPoller.checkForReplayRequest() - assertReplayInfo(expected, false) - assert.Equal(t, expected, actual) - assert.False(t, lp.LogPoller.ReplayPending()) + t.Run("checkForReplayRequest should not enter pending state if there are no requests", func(t *testing.T) { + lp.LogPoller.replay.requestBlock = 400 + lp.LogPoller.replay.status = ReplayStatusComplete + assert.False(t, lp.LogPoller.checkForReplayRequest()) + assertReplayInfo(400, ReplayStatusComplete) + assert.Equal(t, ReplayStatusComplete, lp.LogPoller.ReplayStatus()) }) - t.Run("checkForReplayRequest should set pending flag if there is a new request", func(t *testing.T) { - expected := int64(3) - lp.LogPoller.replay.requestBlock = expected - actual := lp.LogPoller.checkForReplayRequest() - assertReplayInfo(expected, true) - assert.Equal(t, expected, actual) - assert.True(t, lp.LogPoller.ReplayPending()) + t.Run("checkForReplayRequest should enter pending state if there is a new request", func(t *testing.T) { + lp.LogPoller.replay.status = ReplayStatusRequested + lp.LogPoller.replay.requestBlock = 18 + assert.True(t, lp.LogPoller.checkForReplayRequest()) + assertReplayInfo(18, ReplayStatusPending) + assert.Equal(t, ReplayStatusPending, lp.LogPoller.ReplayStatus()) }) - t.Run("replayComplete clears pending flag and resets requestBlock", func(t *testing.T) { - lp.LogPoller.replay.requestBlock = NoNewReplayRequests - lp.LogPoller.replay.pending = true + t.Run("replayComplete enters ReplayComplete state", func(t *testing.T) { + lp.LogPoller.replay.requestBlock = 10 + lp.LogPoller.replay.status = ReplayStatusPending lp.LogPoller.replayComplete(8, 20) - assertReplayInfo(NoNewReplayRequests, false) + assertReplayInfo(10, ReplayStatusComplete) }) - t.Run("replayComplete clears pending flag but does not reset requestBlock if lower block request received", func(t *testing.T) { + t.Run("replayComplete stays in pending state if lower block request received", func(t *testing.T) { lp.LogPoller.replay.requestBlock = 3 - lp.LogPoller.replay.pending = true + lp.LogPoller.replay.status = ReplayStatusPending lp.LogPoller.replayComplete(8, 20) - assertReplayInfo(3, false) + assertReplayInfo(3, ReplayStatusRequested) }) } diff --git a/pkg/solana/logpoller/mock_filters.go b/pkg/solana/logpoller/mock_filters.go index 8fc7a9469..38fa81596 100644 --- a/pkg/solana/logpoller/mock_filters.go +++ b/pkg/solana/logpoller/mock_filters.go @@ -564,22 +564,9 @@ func (_c *mockFilters_UnregisterFilter_Call) RunAndReturn(run func(context.Conte return _c } -// UpdateStartingBlocks provides a mock function with given fields: ctx, startingBlocks -func (_m *mockFilters) UpdateStartingBlocks(ctx context.Context, startingBlocks int64) error { - ret := _m.Called(ctx, startingBlocks) - - if len(ret) == 0 { - panic("no return value specified for UpdateStartingBlocks") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, int64) error); ok { - r0 = rf(ctx, startingBlocks) - } else { - r0 = ret.Error(0) - } - - return r0 +// UpdateStartingBlocks provides a mock function with given fields: startingBlocks +func (_m *mockFilters) UpdateStartingBlocks(startingBlocks int64) { + _m.Called(startingBlocks) } // mockFilters_UpdateStartingBlocks_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateStartingBlocks' @@ -588,25 +575,24 @@ type mockFilters_UpdateStartingBlocks_Call struct { } // UpdateStartingBlocks is a helper method to define mock.On call -// - ctx context.Context // - startingBlocks int64 -func (_e *mockFilters_Expecter) UpdateStartingBlocks(ctx interface{}, startingBlocks interface{}) *mockFilters_UpdateStartingBlocks_Call { - return &mockFilters_UpdateStartingBlocks_Call{Call: _e.mock.On("UpdateStartingBlocks", ctx, startingBlocks)} +func (_e *mockFilters_Expecter) UpdateStartingBlocks(startingBlocks interface{}) *mockFilters_UpdateStartingBlocks_Call { + return &mockFilters_UpdateStartingBlocks_Call{Call: _e.mock.On("UpdateStartingBlocks", startingBlocks)} } -func (_c *mockFilters_UpdateStartingBlocks_Call) Run(run func(ctx context.Context, startingBlocks int64)) *mockFilters_UpdateStartingBlocks_Call { +func (_c *mockFilters_UpdateStartingBlocks_Call) Run(run func(startingBlocks int64)) *mockFilters_UpdateStartingBlocks_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64)) + run(args[0].(int64)) }) return _c } -func (_c *mockFilters_UpdateStartingBlocks_Call) Return(_a0 error) *mockFilters_UpdateStartingBlocks_Call { - _c.Call.Return(_a0) +func (_c *mockFilters_UpdateStartingBlocks_Call) Return() *mockFilters_UpdateStartingBlocks_Call { + _c.Call.Return() return _c } -func (_c *mockFilters_UpdateStartingBlocks_Call) RunAndReturn(run func(context.Context, int64) error) *mockFilters_UpdateStartingBlocks_Call { +func (_c *mockFilters_UpdateStartingBlocks_Call) RunAndReturn(run func(int64)) *mockFilters_UpdateStartingBlocks_Call { _c.Call.Return(run) return _c } diff --git a/pkg/solana/logpoller/types.go b/pkg/solana/logpoller/types.go index e3c45e989..dbf5eabe2 100644 --- a/pkg/solana/logpoller/types.go +++ b/pkg/solana/logpoller/types.go @@ -266,3 +266,27 @@ func newIndexedValue(typedVal any) (iVal IndexedValue, err error) { } return nil, fmt.Errorf("can't create indexed value from type %T", typedVal) } + +type ReplayStatus int + +const ( + ReplayStatusNoRequest ReplayStatus = iota + ReplayStatusRequested + ReplayStatusPending + ReplayStatusComplete +) + +func (rs ReplayStatus) String() string { + switch rs { + case ReplayStatusNoRequest: + return "NoRequest" + case ReplayStatusRequested: + return "Requested" + case ReplayStatusPending: + return "Pending" + case ReplayStatusComplete: + return "Complete" + default: + return fmt.Sprintf("invalid status: %d", rs) // Handle unknown cases + } +}