From d926bbe1eec52a0bc2415dc6a15efcfd46260e3c Mon Sep 17 00:00:00 2001 From: Domino Valdano Date: Mon, 17 Feb 2025 08:52:33 -0800 Subject: [PATCH] Add global replay feature to LogPoller (#1076) * Add global replay feature to LogPoller * Warn about redundant replay requests, use NoNewReplayRequests instead of > 0 * Remove persistence of UpdateStartingBlock filter changes to db * make generate * Change ReplayPending bool to ReplayStatus, returning enum * spell properly properly --- pkg/solana/logpoller/filters.go | 25 +++++++ pkg/solana/logpoller/filters_test.go | 75 ++++++++++++++++++- pkg/solana/logpoller/log_poller.go | 97 ++++++++++++++++++++++++- pkg/solana/logpoller/log_poller_test.go | 78 ++++++++++++++++++++ pkg/solana/logpoller/mock_filters.go | 33 +++++++++ pkg/solana/logpoller/orm_test.go | 28 ++++--- pkg/solana/logpoller/types.go | 24 ++++++ 7 files changed, 348 insertions(+), 12 deletions(-) diff --git a/pkg/solana/logpoller/filters.go b/pkg/solana/logpoller/filters.go index 540fa5ae2..21ca75c3e 100644 --- a/pkg/solana/logpoller/filters.go +++ b/pkg/solana/logpoller/filters.go @@ -402,6 +402,31 @@ func (fl *filters) MarkFilterBackfilled(ctx context.Context, filterID int64) err return nil } +// UpdateStartingBlocks will update the starting blocks of all backfilled filters to startingBlock +// and set IsBackfilled=false for them. It will also update the starting blocks of any filters which +// are already scheduled for backfilling, but only if the new startingBlock is less than the one they +// currently have set. All filters will then be added to the filtersToBackfill index, so their logs will +// be (re-)fetched from the specified starting point on the next iteration of the main LogPoller run loop. +func (fl *filters) UpdateStartingBlocks(startingBlock int64) { + fl.filtersMutex.Lock() + defer fl.filtersMutex.Unlock() + + startingBlocks := make(map[int64]int64, len(fl.filtersByID)) + for id, filter := range fl.filtersByID { + newStartingBlock := filter.StartingBlock + if filter.IsBackfilled || startingBlock < newStartingBlock { + newStartingBlock = startingBlock + } + startingBlocks[id] = newStartingBlock + } + + for id, blk := range startingBlocks { + fl.filtersByID[id].IsBackfilled = false + fl.filtersByID[id].StartingBlock = blk + fl.filtersToBackfill[id] = struct{}{} + } +} + // LoadFilters - loads filters from database. Can be called multiple times without side effects. func (fl *filters) LoadFilters(ctx context.Context) error { if fl.loadedFilters.Load() { diff --git a/pkg/solana/logpoller/filters_test.go b/pkg/solana/logpoller/filters_test.go index c09dcea5a..8f763338b 100644 --- a/pkg/solana/logpoller/filters_test.go +++ b/pkg/solana/logpoller/filters_test.go @@ -426,7 +426,7 @@ func TestFilters_GetFiltersToBackfill(t *testing.T) { ensureInQueue(notBackfilled, Filter{ID: 3, Name: "new filter"}) } -func TestExtractField(t *testing.T) { +func TestFilters_ExtractField(t *testing.T) { type innerInner struct { P string Q int @@ -487,3 +487,76 @@ func TestExtractField(t *testing.T) { }) } } + +func TestFilters_UpdateStartingBlocks(t *testing.T) { + orm := NewMockORM(t) + lggr := logger.Sugared(logger.Test(t)) + filters := newFilters(lggr, orm) + + origFilters := []Filter{{ + ID: 1, + Name: "backfilled", + StartingBlock: 29500, + IsBackfilled: true, + }, { + ID: 2, + StartingBlock: 52000, + Name: "notBackfilled", + }} + ids := make([]int64, 2) + for i, filter := range origFilters { + ids[i] = filter.ID + } + + var err error + + cases := []struct { + name string + replayBlock int64 + expectedBlocks []int64 + }{ + { + name: "updates StartingBlock of both filters", + replayBlock: 51500, + expectedBlocks: []int64{51500, 51500}}, + { + name: "updates StartingBlock of backfilled filter", + replayBlock: 53000, + expectedBlocks: []int64{53000, origFilters[1].StartingBlock}, + }, + } + + orm.EXPECT().SelectFilters(mock.Anything).Return(origFilters, nil).Once() + orm.EXPECT().SelectSeqNums(mock.Anything).Return(map[int64]int64{ + 1: 18, + 2: 25, + }, nil) + + err = filters.LoadFilters(tests.Context(t)) + require.NoError(t, err) + // ensure both filters were loaded + require.Equal(t, origFilters[0], *filters.filtersByID[ids[0]]) + require.Equal(t, origFilters[1], *filters.filtersByID[ids[1]]) + // ensure non-backfilled filters were added to filtersToBackfill + require.Len(t, filters.filtersToBackfill, 1) + require.Contains(t, filters.filtersToBackfill, origFilters[1].ID) + + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + newFilters := make([]Filter, len(origFilters)) + copy(newFilters, origFilters) + filters.filtersByID[ids[0]] = &newFilters[0] + filters.filtersByID[ids[1]] = &newFilters[1] + filters.filtersToBackfill = map[int64]struct{}{ids[0]: {}} + filters.UpdateStartingBlocks(tt.replayBlock) + assert.Len(t, filters.filtersToBackfill, 2) // all filters should end up in the backfill queue + + for i, id := range ids { + assert.Equal(t, tt.expectedBlocks[i], filters.filtersByID[id].StartingBlock, + "unexpected starting block for \"%s\" filter", filters.filtersByID[id].Name) + assert.False(t, filters.filtersByID[id].IsBackfilled) + assert.Contains(t, filters.filtersToBackfill, id) + } + }) + } +} diff --git a/pkg/solana/logpoller/log_poller.go b/pkg/solana/logpoller/log_poller.go index abe28fca8..256f51096 100644 --- a/pkg/solana/logpoller/log_poller.go +++ b/pkg/solana/logpoller/log_poller.go @@ -8,6 +8,7 @@ import ( "fmt" "iter" "math" + "sync" "time" "github.com/gagliardetto/solana-go/rpc" @@ -18,8 +19,7 @@ import ( ) var ( - ErrFilterNameConflict = errors.New("filter with such name already exists") - ErrMissingDiscriminator = errors.New("Solana log is missing discriminator") + ErrFilterNameConflict = errors.New("filter with such name already exists") ) type ORM interface { @@ -49,11 +49,24 @@ type filtersI interface { GetDistinctAddresses(ctx context.Context) ([]PublicKey, error) GetFiltersToBackfill() []Filter MarkFilterBackfilled(ctx context.Context, filterID int64) error + UpdateStartingBlocks(startingBlocks int64) MatchingFiltersForEncodedEvent(event ProgramEvent) iter.Seq[Filter] DecodeSubKey(ctx context.Context, lggr logger.SugaredLogger, raw []byte, ID int64, subKeyPath []string) (any, error) IncrementSeqNum(filterID int64) int64 } +type ReplayInfo struct { + mut sync.RWMutex + requestBlock int64 + status ReplayStatus +} + +// hasRequest returns true if a new request has been received (since the last request completed), +// whether or not it is pending yet +func (r *ReplayInfo) hasRequest() bool { + return r.status == ReplayStatusRequested || r.status == ReplayStatusPending +} + type Service struct { services.Service eng *services.Engine @@ -61,6 +74,7 @@ type Service struct { lggr logger.SugaredLogger orm ORM lastProcessedSlot int64 + replay ReplayInfo client RPCClient loader logsLoader filters filtersI @@ -87,6 +101,7 @@ func New(lggr logger.SugaredLogger, orm ORM, cl RPCClient) *Service { }, }.NewServiceEngine(lggr) lp.lggr = lp.eng.SugaredLogger + lp.replay.status = ReplayStatusNoRequest return lp } @@ -210,6 +225,43 @@ func (lp *Service) UnregisterFilter(ctx context.Context, name string) error { return lp.filters.UnregisterFilter(ctx, name) } +// Replay submits a new replay request. If there was already a new replay request +// submitted since the last replay completed, it will be updated to the earlier of the +// two requested fromBlock's. The expectation is that, on the next timer tick of the +// LogPoller run loop it will backfill all filters starting from fromBlock. If there +// are new filters in the backfill queue, with an earlier StartingBlock, then they +// will get backfilled from there instead. +func (lp *Service) Replay(fromBlock int64) error { + lp.replay.mut.Lock() + defer lp.replay.mut.Unlock() + + if lp.replay.hasRequest() && lp.replay.requestBlock <= fromBlock { + // Already requested, no further action required + lp.lggr.Warnf("Ignoring redundant request to replay from block %d, replay from block %d already requested", + fromBlock, lp.replay.requestBlock) + return nil + } + lp.filters.UpdateStartingBlocks(fromBlock) + lp.replay.requestBlock = fromBlock + if lp.replay.status != ReplayStatusPending { + lp.replay.status = ReplayStatusRequested + } + + return nil +} + +// ReplayStatus returns the current replay status of LogPoller: +// +// NoRequests - there have not been any replay requests yet since node startup +// Requested - a replay has been requested, but has not started yet +// Pending - a replay is currently in progress +// Complete - there was at least one replay executed since startup, but all have since completed +func (lp *Service) ReplayStatus() ReplayStatus { + lp.replay.mut.RLock() + defer lp.replay.mut.RUnlock() + return lp.replay.status +} + func (lp *Service) getLastProcessedSlot(ctx context.Context) (int64, error) { if lp.lastProcessedSlot != 0 { return lp.lastProcessedSlot, nil @@ -237,10 +289,28 @@ func (lp *Service) getLastProcessedSlot(ctx context.Context) (int64, error) { return int64(latestFinalizedSlot) - 1, nil // } +// checkForReplayRequest checks whether there have been any new replay requests since it was last called, +// and if so sets the pending flag to true and returns the block number +func (lp *Service) checkForReplayRequest() bool { + lp.replay.mut.Lock() + defer lp.replay.mut.Unlock() + + if !lp.replay.hasRequest() { + return false + } + + lp.lggr.Infow("starting replay", "replayBlock", lp.replay.requestBlock) + lp.replay.status = ReplayStatusPending + return true +} + func (lp *Service) backfillFilters(ctx context.Context, filters []Filter, to int64) error { + isReplay := lp.checkForReplayRequest() + addressesSet := make(map[PublicKey]struct{}) addresses := make([]PublicKey, 0, len(filters)) minSlot := to + for _, filter := range filters { if _, ok := addressesSet[filter.Address]; !ok { addressesSet[filter.Address] = struct{}{} @@ -257,6 +327,10 @@ func (lp *Service) backfillFilters(ctx context.Context, filters []Filter, to int } lp.lggr.Infow("Done backfilling filters", "filters", len(filters), "from", minSlot, "to", to) + if isReplay { + lp.replayComplete(minSlot, to) + } + for _, filter := range filters { filterErr := lp.filters.MarkFilterBackfilled(ctx, filter.ID) if filterErr != nil { @@ -369,6 +443,25 @@ func (lp *Service) run(ctx context.Context) (err error) { return nil } +// replayComplete is called when a backfill associated with a current pending replay has just completed. +// Assuming there were no new requests to replay while the backfill was happening, it updates the replay +// status to ReplayStatusComplete. If there was a request for a lower block number in the meantime, then +// the status will revert to ReplayStatusRequested +func (lp *Service) replayComplete(from, to int64) bool { + lp.replay.mut.Lock() + defer lp.replay.mut.Unlock() + + lp.lggr.Infow("replay complete", "from", from, "to", to) + + if lp.replay.requestBlock < from { + // received a new request with lower block number while replaying, we'll process that next time + lp.replay.status = ReplayStatusRequested + return false + } + lp.replay.status = ReplayStatusComplete + return true +} + func appendBuffered(ch <-chan Block, max int, blocks []Block) []Block { for { select { diff --git a/pkg/solana/logpoller/log_poller_test.go b/pkg/solana/logpoller/log_poller_test.go index 32ecc911e..a59e6a2a0 100644 --- a/pkg/solana/logpoller/log_poller_test.go +++ b/pkg/solana/logpoller/log_poller_test.go @@ -209,6 +209,7 @@ func TestLogPoller_getLastProcessedSlot(t *testing.T) { } func TestLogPoller_processBlocksRange(t *testing.T) { + t.Parallel() t.Run("Returns error if failed to start backfill", func(t *testing.T) { lp := newMockedLP(t) expectedErr := errors.New("failed to start backfill") @@ -350,3 +351,80 @@ func TestProcess(t *testing.T) { err = lp.UnregisterFilter(ctx, filter.Name) require.NoError(t, err) } + +func Test_LogPoller_Replay(t *testing.T) { + t.Parallel() + fromBlock := int64(5) + + lp := newMockedLP(t) + assertReplayInfo := func(requestBlock int64, status ReplayStatus) { + assert.Equal(t, requestBlock, lp.LogPoller.replay.requestBlock) + assert.Equal(t, status, lp.LogPoller.replay.status) + } + + t.Run("ReplayInfo state initialized properly", func(t *testing.T) { + assertReplayInfo(0, ReplayStatusNoRequest) + }) + + t.Run("ordinary replay request", func(t *testing.T) { + lp.Filters.EXPECT().UpdateStartingBlocks(fromBlock).Once() + err := lp.LogPoller.Replay(fromBlock) + require.NoError(t, err) + assertReplayInfo(fromBlock, ReplayStatusRequested) + }) + + t.Run("redundant replay request", func(t *testing.T) { + lp.LogPoller.replay.requestBlock = fromBlock + lp.LogPoller.replay.status = ReplayStatusRequested + err := lp.LogPoller.Replay(fromBlock + 10) + require.NoError(t, err) + assertReplayInfo(fromBlock, ReplayStatusRequested) + }) + + t.Run("replay request updated", func(t *testing.T) { + lp.LogPoller.replay.status = ReplayStatusNoRequest + lp.Filters.EXPECT().UpdateStartingBlocks(fromBlock - 1).Once() + err := lp.LogPoller.Replay(fromBlock - 1) + require.NoError(t, err) + assertReplayInfo(fromBlock-1, ReplayStatusRequested) + }) + + t.Run("replay request updated while pending", func(t *testing.T) { + lp.LogPoller.replay.requestBlock = fromBlock + lp.LogPoller.replay.status = ReplayStatusPending + lp.Filters.EXPECT().UpdateStartingBlocks(fromBlock - 1).Once() + err := lp.LogPoller.Replay(fromBlock - 1) + require.NoError(t, err) + assertReplayInfo(fromBlock-1, ReplayStatusPending) + }) + + t.Run("checkForReplayRequest should not enter pending state if there are no requests", func(t *testing.T) { + lp.LogPoller.replay.requestBlock = 400 + lp.LogPoller.replay.status = ReplayStatusComplete + assert.False(t, lp.LogPoller.checkForReplayRequest()) + assertReplayInfo(400, ReplayStatusComplete) + assert.Equal(t, ReplayStatusComplete, lp.LogPoller.ReplayStatus()) + }) + + t.Run("checkForReplayRequest should enter pending state if there is a new request", func(t *testing.T) { + lp.LogPoller.replay.status = ReplayStatusRequested + lp.LogPoller.replay.requestBlock = 18 + assert.True(t, lp.LogPoller.checkForReplayRequest()) + assertReplayInfo(18, ReplayStatusPending) + assert.Equal(t, ReplayStatusPending, lp.LogPoller.ReplayStatus()) + }) + + t.Run("replayComplete enters ReplayComplete state", func(t *testing.T) { + lp.LogPoller.replay.requestBlock = 10 + lp.LogPoller.replay.status = ReplayStatusPending + lp.LogPoller.replayComplete(8, 20) + assertReplayInfo(10, ReplayStatusComplete) + }) + + t.Run("replayComplete stays in pending state if lower block request received", func(t *testing.T) { + lp.LogPoller.replay.requestBlock = 3 + lp.LogPoller.replay.status = ReplayStatusPending + lp.LogPoller.replayComplete(8, 20) + assertReplayInfo(3, ReplayStatusRequested) + }) +} diff --git a/pkg/solana/logpoller/mock_filters.go b/pkg/solana/logpoller/mock_filters.go index b0be9dcbb..38fa81596 100644 --- a/pkg/solana/logpoller/mock_filters.go +++ b/pkg/solana/logpoller/mock_filters.go @@ -564,6 +564,39 @@ func (_c *mockFilters_UnregisterFilter_Call) RunAndReturn(run func(context.Conte return _c } +// UpdateStartingBlocks provides a mock function with given fields: startingBlocks +func (_m *mockFilters) UpdateStartingBlocks(startingBlocks int64) { + _m.Called(startingBlocks) +} + +// mockFilters_UpdateStartingBlocks_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateStartingBlocks' +type mockFilters_UpdateStartingBlocks_Call struct { + *mock.Call +} + +// UpdateStartingBlocks is a helper method to define mock.On call +// - startingBlocks int64 +func (_e *mockFilters_Expecter) UpdateStartingBlocks(startingBlocks interface{}) *mockFilters_UpdateStartingBlocks_Call { + return &mockFilters_UpdateStartingBlocks_Call{Call: _e.mock.On("UpdateStartingBlocks", startingBlocks)} +} + +func (_c *mockFilters_UpdateStartingBlocks_Call) Run(run func(startingBlocks int64)) *mockFilters_UpdateStartingBlocks_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *mockFilters_UpdateStartingBlocks_Call) Return() *mockFilters_UpdateStartingBlocks_Call { + _c.Call.Return() + return _c +} + +func (_c *mockFilters_UpdateStartingBlocks_Call) RunAndReturn(run func(int64)) *mockFilters_UpdateStartingBlocks_Call { + _c.Call.Return(run) + return _c +} + // newMockFilters creates a new instance of mockFilters. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func newMockFilters(t interface { diff --git a/pkg/solana/logpoller/orm_test.go b/pkg/solana/logpoller/orm_test.go index 7a50b8b08..4187e54ff 100644 --- a/pkg/solana/logpoller/orm_test.go +++ b/pkg/solana/logpoller/orm_test.go @@ -178,7 +178,18 @@ func TestLogPollerFilters(t *testing.T) { require.NoError(t, err) require.Len(t, logs, 0) }) - t.Run("MarkBackfilled updated corresponding filed", func(t *testing.T) { + + genEnsureIsBackfilled := func(ctx context.Context, orm *DSORM) func([]int64, bool) { + return func(filterIDs []int64, expectedIsBackfilled bool) { + for _, filterID := range filterIDs { + filter, err := orm.GetFilterByID(ctx, filterID) + require.NoError(t, err) + require.Equal(t, expectedIsBackfilled, filter.IsBackfilled) + } + } + } + + t.Run("MarkBackfilled updated corresponding field", func(t *testing.T) { dbx := sqltest.NewDB(t, sqltest.TestURL(t)) chainID := uuid.NewString() orm := NewORM(chainID, dbx, lggr) @@ -187,22 +198,21 @@ func TestLogPollerFilters(t *testing.T) { ctx := tests.Context(t) filter.IsBackfilled = true filterID, err := orm.InsertFilter(ctx, filter) + filterIDs := []int64{filterID} require.NoError(t, err) - ensureIsBackfilled := func(expectedIsBackfilled bool) { - filter, err = orm.GetFilterByID(ctx, filterID) - require.NoError(t, err) - require.Equal(t, expectedIsBackfilled, filter.IsBackfilled) - } - ensureIsBackfilled(true) + + ensureIsBackfilled := genEnsureIsBackfilled(ctx, orm) + + ensureIsBackfilled(filterIDs, true) // insert overrides filter.IsBackfilled = false _, err = orm.InsertFilter(ctx, filter) require.NoError(t, err) - ensureIsBackfilled(false) + ensureIsBackfilled(filterIDs, false) // mark changes value to true err = orm.MarkFilterBackfilled(ctx, filterID) require.NoError(t, err) - ensureIsBackfilled(true) + ensureIsBackfilled(filterIDs, true) }) } diff --git a/pkg/solana/logpoller/types.go b/pkg/solana/logpoller/types.go index e3c45e989..dbf5eabe2 100644 --- a/pkg/solana/logpoller/types.go +++ b/pkg/solana/logpoller/types.go @@ -266,3 +266,27 @@ func newIndexedValue(typedVal any) (iVal IndexedValue, err error) { } return nil, fmt.Errorf("can't create indexed value from type %T", typedVal) } + +type ReplayStatus int + +const ( + ReplayStatusNoRequest ReplayStatus = iota + ReplayStatusRequested + ReplayStatusPending + ReplayStatusComplete +) + +func (rs ReplayStatus) String() string { + switch rs { + case ReplayStatusNoRequest: + return "NoRequest" + case ReplayStatusRequested: + return "Requested" + case ReplayStatusPending: + return "Pending" + case ReplayStatusComplete: + return "Complete" + default: + return fmt.Sprintf("invalid status: %d", rs) // Handle unknown cases + } +}