From de84b8b3875d70672f688f12ce952b167290d286 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Wed, 12 Feb 2025 21:36:00 -0800 Subject: [PATCH] Add global replay feature to LogPoller --- pkg/solana/logpoller/filters.go | 20 ++++++++ pkg/solana/logpoller/log_poller.go | 77 ++++++++++++++++++++++++++++ pkg/solana/logpoller/mock_filters.go | 47 +++++++++++++++++ pkg/solana/logpoller/mock_orm.go | 47 +++++++++++++++++ pkg/solana/logpoller/orm.go | 16 ++++++ 5 files changed, 207 insertions(+) diff --git a/pkg/solana/logpoller/filters.go b/pkg/solana/logpoller/filters.go index f98f7aa26..6178c8c22 100644 --- a/pkg/solana/logpoller/filters.go +++ b/pkg/solana/logpoller/filters.go @@ -393,6 +393,26 @@ func (fl *filters) MarkFilterBackfilled(ctx context.Context, filterID int64) err return nil } +func (fl *filters) UpdateStartingBlocks(ctx context.Context, startingBlock int64) error { + fl.filtersMutex.Lock() + defer fl.filtersMutex.Unlock() + + startingBlocks := make(map[int64]int64, len(fl.filtersByID)) + for id, filter := range fl.filtersByID { + if startingBlock < filter.StartingBlock { + startingBlocks[id] = startingBlock + } + } + err := fl.orm.UpdateStartingBlocks(ctx, startingBlocks) + if err != nil { + return err + } + for id, blk := range startingBlocks { + fl.filtersByID[id].StartingBlock = blk + } + return nil +} + // LoadFilters - loads filters from database. Can be called multiple times without side effects. func (fl *filters) LoadFilters(ctx context.Context) error { if fl.loadedFilters.Load() { diff --git a/pkg/solana/logpoller/log_poller.go b/pkg/solana/logpoller/log_poller.go index 7dd2c42d2..acb629b6b 100644 --- a/pkg/solana/logpoller/log_poller.go +++ b/pkg/solana/logpoller/log_poller.go @@ -8,6 +8,7 @@ import ( "fmt" "iter" "math" + "sync" "time" "github.com/gagliardetto/solana-go/rpc" @@ -29,6 +30,7 @@ type ORM interface { DeleteFilters(ctx context.Context, filters map[int64]Filter) error MarkFilterDeleted(ctx context.Context, id int64) (err error) MarkFilterBackfilled(ctx context.Context, id int64) (err error) + UpdateStartingBlocks(ctx context.Context, startingBlock map[int64]int64) (err error) GetLatestBlock(ctx context.Context) (int64, error) InsertLogs(context.Context, []Log) (err error) SelectSeqNums(ctx context.Context) (map[int64]int64, error) @@ -47,11 +49,18 @@ type filtersI interface { GetDistinctAddresses(ctx context.Context) ([]PublicKey, error) GetFiltersToBackfill() []Filter MarkFilterBackfilled(ctx context.Context, filterID int64) error + UpdateStartingBlocks(ctx context.Context, startingBlocks int64) error MatchingFiltersForEncodedEvent(event ProgramEvent) iter.Seq[Filter] DecodeSubKey(ctx context.Context, lggr logger.SugaredLogger, raw []byte, ID int64, subKeyPath []string) (any, error) IncrementSeqNum(filterID int64) int64 } +type ReplayInfo struct { + mut sync.RWMutex + requestBlock int64 + pending bool +} + type Service struct { services.Service eng *services.Engine @@ -59,6 +68,7 @@ type Service struct { lggr logger.SugaredLogger orm ORM lastProcessedSlot int64 + replay ReplayInfo client RPCClient loader logsLoader filters filtersI @@ -201,6 +211,38 @@ func (lp *Service) UnregisterFilter(ctx context.Context, name string) error { return lp.filters.UnregisterFilter(ctx, name) } +func (lp *Service) Replay(ctx context.Context, fromBlock int64) error { + ctx, cancel := lp.eng.Ctx(ctx) + defer cancel() + + lp.replay.mut.Lock() + defer lp.replay.mut.Unlock() + + if lp.replay.requestBlock != 0 && fromBlock >= lp.replay.requestBlock { + // Already requested, no further action required + return nil + } + err := lp.filters.UpdateStartingBlocks(ctx, fromBlock) + if err != nil { + return err + } + lp.replay.requestBlock = fromBlock + + return nil +} + +// ReplayPending returns the current replay status of LogPoller. true indicates there is a replay currently in progress. +// False means there is no replay in progress. Some subtleties to bear in mind: +// 1. if a new request has been submitted since the last replay finished, but the backfilling hasn't been started yet, +// this will still return false. +// 2. It may also return false if the node is restarted after a replay request was submitted but before it completes, +// even though it will still know to finish backfilling everything needed in order to satisfy the replay request. +func (lp *Service) ReplayPending() bool { + lp.replay.mut.RLock() + defer lp.replay.mut.RUnlock() + return lp.replay.pending +} + func (lp *Service) getLastProcessedSlot(ctx context.Context) (int64, error) { if lp.lastProcessedSlot != 0 { return lp.lastProcessedSlot, nil @@ -228,10 +270,28 @@ func (lp *Service) getLastProcessedSlot(ctx context.Context) (int64, error) { return int64(latestFinalizedSlot) - 1, nil // } +// checkForReplayRequest checks whether there have been any new replay requests since it was last called, +// and if so sets the pending flag to true and returns the block number +func (lp *Service) checkForReplayRequest() int64 { + lp.replay.mut.Lock() + defer lp.replay.mut.Unlock() + + if lp.replay.requestBlock < 0 { + return lp.replay.requestBlock + } + + lp.lggr.Infow("starting replay", "replayBlock", lp.replay.requestBlock) + lp.replay.pending = true + return lp.replay.requestBlock +} + func (lp *Service) backfillFilters(ctx context.Context, filters []Filter, to int64) error { + replayBlock := lp.checkForReplayRequest() + addressesSet := make(map[PublicKey]struct{}) addresses := make([]PublicKey, 0, len(filters)) minSlot := to + for _, filter := range filters { if _, ok := addressesSet[filter.Address]; !ok { addressesSet[filter.Address] = struct{}{} @@ -248,6 +308,10 @@ func (lp *Service) backfillFilters(ctx context.Context, filters []Filter, to int } lp.lggr.Infow("Done backfilling filters", "filters", len(filters), "from", minSlot, "to", to) + if replayBlock > 0 { + lp.replayComplete(replayBlock) + } + for _, filter := range filters { filterErr := lp.filters.MarkFilterBackfilled(ctx, filter.ID) if filterErr != nil { @@ -360,6 +424,19 @@ func (lp *Service) run(ctx context.Context) (err error) { return nil } +func (lp *Service) replayComplete(replayBlock int64) { + lp.replay.mut.Lock() + defer lp.replay.mut.Unlock() + + if lp.replay.requestBlock < replayBlock { + // received a new request with lower block number while replaying, we'll process that next time + return + } + lp.replay.requestBlock = -1 + lp.replay.pending = false + lp.lggr.Infow("replay complete", "replayBlock", lp.replay.requestBlock) +} + func appendBuffered(ch <-chan Block, max int, blocks []Block) []Block { for { select { diff --git a/pkg/solana/logpoller/mock_filters.go b/pkg/solana/logpoller/mock_filters.go index 96b14446c..160008a13 100644 --- a/pkg/solana/logpoller/mock_filters.go +++ b/pkg/solana/logpoller/mock_filters.go @@ -517,6 +517,53 @@ func (_c *mockFilters_UnregisterFilter_Call) RunAndReturn(run func(context.Conte return _c } +// UpdateStartingBlocks provides a mock function with given fields: ctx, startingBlocks +func (_m *mockFilters) UpdateStartingBlocks(ctx context.Context, startingBlocks int64) error { + ret := _m.Called(ctx, startingBlocks) + + if len(ret) == 0 { + panic("no return value specified for UpdateStartingBlocks") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64) error); ok { + r0 = rf(ctx, startingBlocks) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// mockFilters_UpdateStartingBlocks_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateStartingBlocks' +type mockFilters_UpdateStartingBlocks_Call struct { + *mock.Call +} + +// UpdateStartingBlocks is a helper method to define mock.On call +// - ctx context.Context +// - startingBlocks int64 +func (_e *mockFilters_Expecter) UpdateStartingBlocks(ctx interface{}, startingBlocks interface{}) *mockFilters_UpdateStartingBlocks_Call { + return &mockFilters_UpdateStartingBlocks_Call{Call: _e.mock.On("UpdateStartingBlocks", ctx, startingBlocks)} +} + +func (_c *mockFilters_UpdateStartingBlocks_Call) Run(run func(ctx context.Context, startingBlocks int64)) *mockFilters_UpdateStartingBlocks_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64)) + }) + return _c +} + +func (_c *mockFilters_UpdateStartingBlocks_Call) Return(_a0 error) *mockFilters_UpdateStartingBlocks_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *mockFilters_UpdateStartingBlocks_Call) RunAndReturn(run func(context.Context, int64) error) *mockFilters_UpdateStartingBlocks_Call { + _c.Call.Return(run) + return _c +} + // newMockFilters creates a new instance of mockFilters. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func newMockFilters(t interface { diff --git a/pkg/solana/logpoller/mock_orm.go b/pkg/solana/logpoller/mock_orm.go index 855683ae3..3382d5c39 100644 --- a/pkg/solana/logpoller/mock_orm.go +++ b/pkg/solana/logpoller/mock_orm.go @@ -545,6 +545,53 @@ func (_c *MockORM_SelectSeqNums_Call) RunAndReturn(run func(context.Context) (ma return _c } +// UpdateStartingBlocks provides a mock function with given fields: ctx, startingBlock +func (_m *MockORM) UpdateStartingBlocks(ctx context.Context, startingBlock map[int64]int64) error { + ret := _m.Called(ctx, startingBlock) + + if len(ret) == 0 { + panic("no return value specified for UpdateStartingBlocks") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, map[int64]int64) error); ok { + r0 = rf(ctx, startingBlock) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockORM_UpdateStartingBlocks_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateStartingBlocks' +type MockORM_UpdateStartingBlocks_Call struct { + *mock.Call +} + +// UpdateStartingBlocks is a helper method to define mock.On call +// - ctx context.Context +// - startingBlock map[int64]int64 +func (_e *MockORM_Expecter) UpdateStartingBlocks(ctx interface{}, startingBlock interface{}) *MockORM_UpdateStartingBlocks_Call { + return &MockORM_UpdateStartingBlocks_Call{Call: _e.mock.On("UpdateStartingBlocks", ctx, startingBlock)} +} + +func (_c *MockORM_UpdateStartingBlocks_Call) Run(run func(ctx context.Context, startingBlock map[int64]int64)) *MockORM_UpdateStartingBlocks_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(map[int64]int64)) + }) + return _c +} + +func (_c *MockORM_UpdateStartingBlocks_Call) Return(err error) *MockORM_UpdateStartingBlocks_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockORM_UpdateStartingBlocks_Call) RunAndReturn(run func(context.Context, map[int64]int64) error) *MockORM_UpdateStartingBlocks_Call { + _c.Call.Return(run) + return _c +} + // NewMockORM creates a new instance of MockORM. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockORM(t interface { diff --git a/pkg/solana/logpoller/orm.go b/pkg/solana/logpoller/orm.go index a06765559..d3509a01d 100644 --- a/pkg/solana/logpoller/orm.go +++ b/pkg/solana/logpoller/orm.go @@ -104,6 +104,22 @@ func (o *DSORM) MarkFilterBackfilled(ctx context.Context, id int64) (err error) return err } +// UpdateStartingBlocks accepts a map from filter ids to new startingBlock numbers. Each filter corresponding +// to an id in the map will have its starting_block column updated with the new value. is_backfilled for each +// filter will also be reset to false +func (o *DSORM) UpdateStartingBlocks(ctx context.Context, startingBlocks map[int64]int64) (err error) { + query := `UPDATE solana.log_poller_filters SET starting_block = $1, is_backfilled = false WHERE id=$2;` + return o.Transact(ctx, func(orm *DSORM) error { + for id, startingBlock := range startingBlocks { + _, err = o.ds.ExecContext(ctx, query, startingBlock, id) + if err != nil { + return err + } + } + return nil + }) +} + func (o *DSORM) DeleteFilter(ctx context.Context, id int64) (err error) { query := `DELETE FROM solana.log_poller_filters WHERE id = $1` _, err = o.ds.ExecContext(ctx, query, id)