Skip to content

Commit

Permalink
Add global replay feature to LogPoller (#1076)
Browse files Browse the repository at this point in the history
* Add global replay feature to LogPoller

* Warn about redundant replay requests, use NoNewReplayRequests instead of > 0

* Remove persistence of UpdateStartingBlock filter changes to db

* make generate

* Change ReplayPending bool to ReplayStatus, returning enum

* spell properly properly
  • Loading branch information
reductionista authored Feb 17, 2025
1 parent 3a27fc7 commit d926bbe
Show file tree
Hide file tree
Showing 7 changed files with 348 additions and 12 deletions.
25 changes: 25 additions & 0 deletions pkg/solana/logpoller/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,31 @@ func (fl *filters) MarkFilterBackfilled(ctx context.Context, filterID int64) err
return nil
}

// UpdateStartingBlocks will update the starting blocks of all backfilled filters to startingBlock
// and set IsBackfilled=false for them. It will also update the starting blocks of any filters which
// are already scheduled for backfilling, but only if the new startingBlock is less than the one they
// currently have set. All filters will then be added to the filtersToBackfill index, so their logs will
// be (re-)fetched from the specified starting point on the next iteration of the main LogPoller run loop.
func (fl *filters) UpdateStartingBlocks(startingBlock int64) {
	fl.filtersMutex.Lock()
	defer fl.filtersMutex.Unlock()

	// Single pass: the mutex is held for the whole update and filtersByID holds
	// pointers, so there is no need for the intermediate id->block map that a
	// two-pass version would build — mutate each filter in place.
	for id, filter := range fl.filtersByID {
		// Backfilled filters always restart from startingBlock; filters still
		// awaiting backfill only move earlier, never later, so an already-queued
		// deeper backfill is not lost.
		if filter.IsBackfilled || startingBlock < filter.StartingBlock {
			filter.StartingBlock = startingBlock
		}
		filter.IsBackfilled = false
		fl.filtersToBackfill[id] = struct{}{}
	}
}

// LoadFilters - loads filters from database. Can be called multiple times without side effects.
func (fl *filters) LoadFilters(ctx context.Context) error {
if fl.loadedFilters.Load() {
Expand Down
75 changes: 74 additions & 1 deletion pkg/solana/logpoller/filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ func TestFilters_GetFiltersToBackfill(t *testing.T) {
ensureInQueue(notBackfilled, Filter{ID: 3, Name: "new filter"})
}

func TestExtractField(t *testing.T) {
func TestFilters_ExtractField(t *testing.T) {
type innerInner struct {
P string
Q int
Expand Down Expand Up @@ -487,3 +487,76 @@ func TestExtractField(t *testing.T) {
})
}
}

// TestFilters_UpdateStartingBlocks verifies that UpdateStartingBlocks rewinds the
// StartingBlock of backfilled filters unconditionally, only lowers (never raises)
// the StartingBlock of not-yet-backfilled filters, and re-queues every filter for
// backfill with IsBackfilled reset to false.
func TestFilters_UpdateStartingBlocks(t *testing.T) {
	orm := NewMockORM(t)
	lggr := logger.Sugared(logger.Test(t))
	filters := newFilters(lggr, orm)

	// Two filters: one already backfilled, one still awaiting backfill.
	origFilters := []Filter{{
		ID:            1,
		Name:          "backfilled",
		StartingBlock: 29500,
		IsBackfilled:  true,
	}, {
		ID:            2,
		StartingBlock: 52000,
		Name:          "notBackfilled",
	}}
	ids := make([]int64, 2)
	for i, filter := range origFilters {
		ids[i] = filter.ID
	}

	var err error

	cases := []struct {
		name           string
		replayBlock    int64
		expectedBlocks []int64
	}{
		{
			// replayBlock below both StartingBlocks: both filters rewind to it
			name:           "updates StartingBlock of both filters",
			replayBlock:    51500,
			expectedBlocks: []int64{51500, 51500}},
		{
			// replayBlock above the non-backfilled filter's StartingBlock:
			// only the backfilled filter moves; the other keeps its earlier block
			name:           "updates StartingBlock of backfilled filter",
			replayBlock:    53000,
			expectedBlocks: []int64{53000, origFilters[1].StartingBlock},
		},
	}

	orm.EXPECT().SelectFilters(mock.Anything).Return(origFilters, nil).Once()
	orm.EXPECT().SelectSeqNums(mock.Anything).Return(map[int64]int64{
		1: 18,
		2: 25,
	}, nil)

	err = filters.LoadFilters(tests.Context(t))
	require.NoError(t, err)
	// ensure both filters were loaded
	require.Equal(t, origFilters[0], *filters.filtersByID[ids[0]])
	require.Equal(t, origFilters[1], *filters.filtersByID[ids[1]])
	// ensure non-backfilled filters were added to filtersToBackfill
	require.Len(t, filters.filtersToBackfill, 1)
	require.Contains(t, filters.filtersToBackfill, origFilters[1].ID)

	for _, tt := range cases {
		t.Run(tt.name, func(t *testing.T) {
			// reset the shared filters state from a fresh copy so each subtest
			// starts from the original fixture, with only filter 1 queued
			newFilters := make([]Filter, len(origFilters))
			copy(newFilters, origFilters)
			filters.filtersByID[ids[0]] = &newFilters[0]
			filters.filtersByID[ids[1]] = &newFilters[1]
			filters.filtersToBackfill = map[int64]struct{}{ids[0]: {}}
			filters.UpdateStartingBlocks(tt.replayBlock)
			assert.Len(t, filters.filtersToBackfill, 2) // all filters should end up in the backfill queue

			for i, id := range ids {
				assert.Equal(t, tt.expectedBlocks[i], filters.filtersByID[id].StartingBlock,
					"unexpected starting block for \"%s\" filter", filters.filtersByID[id].Name)
				assert.False(t, filters.filtersByID[id].IsBackfilled)
				assert.Contains(t, filters.filtersToBackfill, id)
			}
		})
	}
}
97 changes: 95 additions & 2 deletions pkg/solana/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"iter"
"math"
"sync"
"time"

"github.com/gagliardetto/solana-go/rpc"
Expand All @@ -18,8 +19,7 @@ import (
)

var (
ErrFilterNameConflict = errors.New("filter with such name already exists")
ErrMissingDiscriminator = errors.New("Solana log is missing discriminator")
ErrFilterNameConflict = errors.New("filter with such name already exists")
)

type ORM interface {
Expand Down Expand Up @@ -49,18 +49,32 @@ type filtersI interface {
GetDistinctAddresses(ctx context.Context) ([]PublicKey, error)
GetFiltersToBackfill() []Filter
MarkFilterBackfilled(ctx context.Context, filterID int64) error
UpdateStartingBlocks(startingBlocks int64)
MatchingFiltersForEncodedEvent(event ProgramEvent) iter.Seq[Filter]
DecodeSubKey(ctx context.Context, lggr logger.SugaredLogger, raw []byte, ID int64, subKeyPath []string) (any, error)
IncrementSeqNum(filterID int64) int64
}

// ReplayInfo tracks the state of the most recent replay request: the lowest
// requested starting block and where that request is in its lifecycle.
type ReplayInfo struct {
	mut          sync.RWMutex // guards requestBlock and status
	requestBlock int64        // fromBlock of the current outstanding replay request
	status       ReplayStatus // current stage of the replay lifecycle
}

// hasRequest returns true if a new request has been received (since the last request completed),
// whether or not it is pending yet.
// NOTE(review): performs no locking itself — callers are expected to hold r.mut
// (all visible call sites lock before calling).
func (r *ReplayInfo) hasRequest() bool {
	return r.status == ReplayStatusRequested || r.status == ReplayStatusPending
}

type Service struct {
services.Service
eng *services.Engine

lggr logger.SugaredLogger
orm ORM
lastProcessedSlot int64
replay ReplayInfo
client RPCClient
loader logsLoader
filters filtersI
Expand All @@ -87,6 +101,7 @@ func New(lggr logger.SugaredLogger, orm ORM, cl RPCClient) *Service {
},
}.NewServiceEngine(lggr)
lp.lggr = lp.eng.SugaredLogger
lp.replay.status = ReplayStatusNoRequest

return lp
}
Expand Down Expand Up @@ -210,6 +225,43 @@ func (lp *Service) UnregisterFilter(ctx context.Context, name string) error {
return lp.filters.UnregisterFilter(ctx, name)
}

// Replay submits a new replay request starting at fromBlock. If a request is already
// outstanding (requested or pending), only a strictly earlier fromBlock replaces it;
// a later one is logged and ignored. On the next tick of the LogPoller run loop the
// backfill will start from fromBlock (or earlier, if a newly registered filter has an
// earlier StartingBlock in the backfill queue).
func (lp *Service) Replay(fromBlock int64) error {
	lp.replay.mut.Lock()
	defer lp.replay.mut.Unlock()

	alreadyCovered := lp.replay.hasRequest() && lp.replay.requestBlock <= fromBlock
	if alreadyCovered {
		// The outstanding request already starts at or before fromBlock.
		lp.lggr.Warnf("Ignoring redundant request to replay from block %d, replay from block %d already requested",
			fromBlock, lp.replay.requestBlock)
		return nil
	}

	lp.filters.UpdateStartingBlocks(fromBlock)
	lp.replay.requestBlock = fromBlock
	switch lp.replay.status {
	case ReplayStatusPending:
		// A replay is already in flight; keep Pending — replayComplete will
		// notice the lower requestBlock when it finishes.
	default:
		lp.replay.status = ReplayStatusRequested
	}

	return nil
}

// ReplayStatus reports where LogPoller currently is in the replay lifecycle:
//
//	NoRequests - no replay has been requested since node startup
//	Requested  - a replay was requested but has not started yet
//	Pending    - a replay is in progress right now
//	Complete   - at least one replay ran since startup, and all have finished
func (lp *Service) ReplayStatus() ReplayStatus {
	lp.replay.mut.RLock()
	status := lp.replay.status
	lp.replay.mut.RUnlock()
	return status
}

func (lp *Service) getLastProcessedSlot(ctx context.Context) (int64, error) {
if lp.lastProcessedSlot != 0 {
return lp.lastProcessedSlot, nil
Expand Down Expand Up @@ -237,10 +289,28 @@ func (lp *Service) getLastProcessedSlot(ctx context.Context) (int64, error) {
return int64(latestFinalizedSlot) - 1, nil //
}

// checkForReplayRequest checks whether there have been any new replay requests since it was last called,
// and if so marks the replay as pending and returns true.
// (It returns only a bool — not the requested block number; the requested block
// is read separately under lp.replay.mut by the caller when needed.)
func (lp *Service) checkForReplayRequest() bool {
	lp.replay.mut.Lock()
	defer lp.replay.mut.Unlock()

	// No outstanding request (status is NoRequest or Complete): nothing to do.
	if !lp.replay.hasRequest() {
		return false
	}

	lp.lggr.Infow("starting replay", "replayBlock", lp.replay.requestBlock)
	lp.replay.status = ReplayStatusPending
	return true
}

func (lp *Service) backfillFilters(ctx context.Context, filters []Filter, to int64) error {
isReplay := lp.checkForReplayRequest()

addressesSet := make(map[PublicKey]struct{})
addresses := make([]PublicKey, 0, len(filters))
minSlot := to

for _, filter := range filters {
if _, ok := addressesSet[filter.Address]; !ok {
addressesSet[filter.Address] = struct{}{}
Expand All @@ -257,6 +327,10 @@ func (lp *Service) backfillFilters(ctx context.Context, filters []Filter, to int
}

lp.lggr.Infow("Done backfilling filters", "filters", len(filters), "from", minSlot, "to", to)
if isReplay {
lp.replayComplete(minSlot, to)
}

for _, filter := range filters {
filterErr := lp.filters.MarkFilterBackfilled(ctx, filter.ID)
if filterErr != nil {
Expand Down Expand Up @@ -369,6 +443,25 @@ func (lp *Service) run(ctx context.Context) (err error) {
return nil
}

// replayComplete is invoked when the backfill tied to the current pending replay has
// finished covering [from, to]. It normally moves the status to ReplayStatusComplete
// and returns true. If, while the backfill ran, a new request arrived for a block
// below from, the status reverts to ReplayStatusRequested and false is returned so
// that request gets served on a later iteration.
func (lp *Service) replayComplete(from, to int64) bool {
	lp.replay.mut.Lock()
	defer lp.replay.mut.Unlock()

	lp.lggr.Infow("replay complete", "from", from, "to", to)

	if lp.replay.requestBlock >= from {
		// This backfill satisfied the outstanding request.
		lp.replay.status = ReplayStatusComplete
		return true
	}
	// A lower-block request came in mid-replay; it still needs to be processed.
	lp.replay.status = ReplayStatusRequested
	return false
}

func appendBuffered(ch <-chan Block, max int, blocks []Block) []Block {
for {
select {
Expand Down
78 changes: 78 additions & 0 deletions pkg/solana/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func TestLogPoller_getLastProcessedSlot(t *testing.T) {
}

func TestLogPoller_processBlocksRange(t *testing.T) {
t.Parallel()
t.Run("Returns error if failed to start backfill", func(t *testing.T) {
lp := newMockedLP(t)
expectedErr := errors.New("failed to start backfill")
Expand Down Expand Up @@ -350,3 +351,80 @@ func TestProcess(t *testing.T) {
err = lp.UnregisterFilter(ctx, filter.Name)
require.NoError(t, err)
}

// Test_LogPoller_Replay exercises the replay state machine (Replay,
// checkForReplayRequest, replayComplete, ReplayStatus) against a mocked LogPoller.
// NOTE(review): the subtests share lp's replay state; most set it explicitly, but
// some (e.g. "redundant replay request") rely on state left by an earlier subtest,
// so they are order-dependent.
func Test_LogPoller_Replay(t *testing.T) {
	t.Parallel()
	fromBlock := int64(5)

	lp := newMockedLP(t)
	// assertReplayInfo checks both internal replay fields against expectations.
	assertReplayInfo := func(requestBlock int64, status ReplayStatus) {
		assert.Equal(t, requestBlock, lp.LogPoller.replay.requestBlock)
		assert.Equal(t, status, lp.LogPoller.replay.status)
	}

	t.Run("ReplayInfo state initialized properly", func(t *testing.T) {
		assertReplayInfo(0, ReplayStatusNoRequest)
	})

	t.Run("ordinary replay request", func(t *testing.T) {
		// first request: filters get rewound and status becomes Requested
		lp.Filters.EXPECT().UpdateStartingBlocks(fromBlock).Once()
		err := lp.LogPoller.Replay(fromBlock)
		require.NoError(t, err)
		assertReplayInfo(fromBlock, ReplayStatusRequested)
	})

	t.Run("redundant replay request", func(t *testing.T) {
		// a later fromBlock than the outstanding request is ignored entirely;
		// no UpdateStartingBlocks expectation is set, so any call would fail the mock
		lp.LogPoller.replay.requestBlock = fromBlock
		lp.LogPoller.replay.status = ReplayStatusRequested
		err := lp.LogPoller.Replay(fromBlock + 10)
		require.NoError(t, err)
		assertReplayInfo(fromBlock, ReplayStatusRequested)
	})

	t.Run("replay request updated", func(t *testing.T) {
		// an earlier fromBlock replaces the previous request block
		lp.LogPoller.replay.status = ReplayStatusNoRequest
		lp.Filters.EXPECT().UpdateStartingBlocks(fromBlock - 1).Once()
		err := lp.LogPoller.Replay(fromBlock - 1)
		require.NoError(t, err)
		assertReplayInfo(fromBlock-1, ReplayStatusRequested)
	})

	t.Run("replay request updated while pending", func(t *testing.T) {
		// an earlier fromBlock during an in-flight replay keeps Pending status
		lp.LogPoller.replay.requestBlock = fromBlock
		lp.LogPoller.replay.status = ReplayStatusPending
		lp.Filters.EXPECT().UpdateStartingBlocks(fromBlock - 1).Once()
		err := lp.LogPoller.Replay(fromBlock - 1)
		require.NoError(t, err)
		assertReplayInfo(fromBlock-1, ReplayStatusPending)
	})

	t.Run("checkForReplayRequest should not enter pending state if there are no requests", func(t *testing.T) {
		lp.LogPoller.replay.requestBlock = 400
		lp.LogPoller.replay.status = ReplayStatusComplete
		assert.False(t, lp.LogPoller.checkForReplayRequest())
		assertReplayInfo(400, ReplayStatusComplete)
		assert.Equal(t, ReplayStatusComplete, lp.LogPoller.ReplayStatus())
	})

	t.Run("checkForReplayRequest should enter pending state if there is a new request", func(t *testing.T) {
		lp.LogPoller.replay.status = ReplayStatusRequested
		lp.LogPoller.replay.requestBlock = 18
		assert.True(t, lp.LogPoller.checkForReplayRequest())
		assertReplayInfo(18, ReplayStatusPending)
		assert.Equal(t, ReplayStatusPending, lp.LogPoller.ReplayStatus())
	})

	t.Run("replayComplete enters ReplayComplete state", func(t *testing.T) {
		// requestBlock (10) >= from (8): the backfill covered the request
		lp.LogPoller.replay.requestBlock = 10
		lp.LogPoller.replay.status = ReplayStatusPending
		lp.LogPoller.replayComplete(8, 20)
		assertReplayInfo(10, ReplayStatusComplete)
	})

	t.Run("replayComplete stays in pending state if lower block request received", func(t *testing.T) {
		// NOTE(review): despite the subtest name, the asserted status is
		// Requested — replayComplete reverts to Requested (not Pending) when a
		// lower-block request (3 < from 8) arrived mid-replay
		lp.LogPoller.replay.requestBlock = 3
		lp.LogPoller.replay.status = ReplayStatusPending
		lp.LogPoller.replayComplete(8, 20)
		assertReplayInfo(3, ReplayStatusRequested)
	})
}
Loading

0 comments on commit d926bbe

Please sign in to comment.