Skip to content

Commit

Permalink
Change ReplayPending bool to ReplayStatus, returning enum
Browse files Browse the repository at this point in the history
  • Loading branch information
reductionista committed Feb 15, 2025
1 parent 0066ac4 commit 318ee59
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 98 deletions.
3 changes: 1 addition & 2 deletions pkg/solana/logpoller/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func (fl *filters) MarkFilterBackfilled(ctx context.Context, filterID int64) err
// are already scheduled for backfilling, but only if the new startingBlock is less than the one they
// currently have set. All filters will then be added to the filtersToBackfill index, so their logs will
// be (re-)fetched from the specified starting point on the next iteration of the main LogPoller run loop.
func (fl *filters) UpdateStartingBlocks(ctx context.Context, startingBlock int64) error {
func (fl *filters) UpdateStartingBlocks(startingBlock int64) {
fl.filtersMutex.Lock()
defer fl.filtersMutex.Unlock()

Expand All @@ -416,7 +416,6 @@ func (fl *filters) UpdateStartingBlocks(ctx context.Context, startingBlock int64
fl.filtersByID[id].StartingBlock = blk
fl.filtersToBackfill[id] = struct{}{}
}
return nil
}

// LoadFilters - loads filters from database. Can be called multiple times without side effects.
Expand Down
5 changes: 1 addition & 4 deletions pkg/solana/logpoller/filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,6 @@ func TestFilters_ExtractField(t *testing.T) {
}

func TestFilters_UpdateStartingBlocks(t *testing.T) {
ctx := tests.Context(t)
orm := NewMockORM(t)
lggr := logger.Sugared(logger.Test(t))
filters := newFilters(lggr, orm)
Expand Down Expand Up @@ -549,9 +548,7 @@ func TestFilters_UpdateStartingBlocks(t *testing.T) {
filters.filtersByID[ids[0]] = &newFilters[0]
filters.filtersByID[ids[1]] = &newFilters[1]
filters.filtersToBackfill = map[int64]struct{}{ids[0]: {}}
orm.EXPECT().UpdateStartingBlocks(mock.Anything, mock.Anything).Once().Return(nil)
err = filters.UpdateStartingBlocks(ctx, tt.replayBlock)
require.NoError(t, err)
filters.UpdateStartingBlocks(tt.replayBlock)
assert.Len(t, filters.filtersToBackfill, 2) // all filters should end up in the backfill queue

for i, id := range ids {
Expand Down
69 changes: 39 additions & 30 deletions pkg/solana/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/types/query"
)

const NoNewReplayRequests = -1

var (
ErrFilterNameConflict = errors.New("filter with such name already exists")
)
Expand Down Expand Up @@ -49,7 +47,7 @@ type filtersI interface {
GetDistinctAddresses(ctx context.Context) ([]PublicKey, error)
GetFiltersToBackfill() []Filter
MarkFilterBackfilled(ctx context.Context, filterID int64) error
UpdateStartingBlocks(ctx context.Context, startingBlocks int64) error
UpdateStartingBlocks(startingBlocks int64)
MatchingFiltersForEncodedEvent(event ProgramEvent) iter.Seq[Filter]
DecodeSubKey(ctx context.Context, lggr logger.SugaredLogger, raw []byte, ID int64, subKeyPath []string) (any, error)
IncrementSeqNum(filterID int64) int64
Expand All @@ -58,7 +56,13 @@ type filtersI interface {
type ReplayInfo struct {
mut sync.RWMutex
requestBlock int64
pending bool
status ReplayStatus
}

// hasRequest returns true if a new request has been received (since the last request completed),
// whether or not it is pending yet
func (r ReplayInfo) hasRequest() bool {
return r.status == ReplayStatusRequested || r.status == ReplayStatusPending
}

type Service struct {
Expand Down Expand Up @@ -95,7 +99,7 @@ func New(lggr logger.SugaredLogger, orm ORM, cl RPCClient) *Service {
},
}.NewServiceEngine(lggr)
lp.lggr = lp.eng.SugaredLogger
lp.replay.requestBlock = NoNewReplayRequests
lp.replay.status = ReplayStatusNoRequest

return lp
}
Expand Down Expand Up @@ -225,31 +229,31 @@ func (lp *Service) Replay(ctx context.Context, fromBlock int64) error {
lp.replay.mut.Lock()
defer lp.replay.mut.Unlock()

if lp.replay.requestBlock != NoNewReplayRequests && lp.replay.requestBlock <= fromBlock {
if lp.replay.hasRequest() && lp.replay.requestBlock <= fromBlock {
// Already requested, no further action required
lp.lggr.Warnf("Ignoring redundant request to replay from block %d, replay from block %d already requested",
fromBlock, lp.replay.requestBlock)
return nil
}
err := lp.filters.UpdateStartingBlocks(ctx, fromBlock)
if err != nil {
return err
}
lp.filters.UpdateStartingBlocks(fromBlock)
lp.replay.requestBlock = fromBlock
if lp.replay.status != ReplayStatusPending {
lp.replay.status = ReplayStatusRequested
}

return nil
}

// ReplayPending returns the current replay status of LogPoller. true indicates there is a replay currently in progress.
// False means there is no replay in progress. Some subtleties to bear in mind:
// 1. if a new request has been submitted since the last replay finished, but the backfilling hasn't been started yet,
// this will still return false.
// 2. It may also return false if the node is restarted after a replay request was submitted but before it completes,
// even though it will still know to finish backfilling everything needed in order to satisfy the replay request.
func (lp *Service) ReplayPending() bool {
// ReplayStatus returns the current replay status of LogPoller:
//
// NoRequests - there have not been any replay requests yet since node startup
// Requested - a replay has been requested, but has not started yet
// Pending - a replay is currently in progress
// Complete - there was at least one replay executed since startup, but all have since completed
func (lp *Service) ReplayStatus() ReplayStatus {
lp.replay.mut.RLock()
defer lp.replay.mut.RUnlock()
return lp.replay.pending
return lp.replay.status
}

func (lp *Service) getLastProcessedSlot(ctx context.Context) (int64, error) {
Expand Down Expand Up @@ -281,21 +285,21 @@ func (lp *Service) getLastProcessedSlot(ctx context.Context) (int64, error) {

// checkForReplayRequest checks whether there have been any new replay requests since it was last called,
// and if so sets the pending flag to true and returns the block number
func (lp *Service) checkForReplayRequest() int64 {
func (lp *Service) checkForReplayRequest() bool {
lp.replay.mut.Lock()
defer lp.replay.mut.Unlock()

if lp.replay.requestBlock == NoNewReplayRequests {
return NoNewReplayRequests
if !lp.replay.hasRequest() {
return false
}

lp.lggr.Infow("starting replay", "replayBlock", lp.replay.requestBlock)
lp.replay.pending = true
return lp.replay.requestBlock
lp.replay.status = ReplayStatusPending
return true
}

func (lp *Service) backfillFilters(ctx context.Context, filters []Filter, to int64) error {
replayBlock := lp.checkForReplayRequest()
isReplay := lp.checkForReplayRequest()

addressesSet := make(map[PublicKey]struct{})
addresses := make([]PublicKey, 0, len(filters))
Expand All @@ -317,7 +321,7 @@ func (lp *Service) backfillFilters(ctx context.Context, filters []Filter, to int
}

lp.lggr.Infow("Done backfilling filters", "filters", len(filters), "from", minSlot, "to", to)
if replayBlock != NoNewReplayRequests {
if isReplay {
lp.replayComplete(minSlot, to)
}

Expand Down Expand Up @@ -433,18 +437,23 @@ func (lp *Service) run(ctx context.Context) (err error) {
return nil
}

func (lp *Service) replayComplete(from, to int64) {
// replayComplete is called when a backfill associated with a current pending replay has just completed.
// Assuming there were no new requests to replay while the backfill was happening, it updates the replay
// status to ReplayStatusComplete. If there was a request for a lower block number in the meantime, then
// the status will revert to ReplayStatusRequested
func (lp *Service) replayComplete(from, to int64) bool {
lp.replay.mut.Lock()
defer lp.replay.mut.Unlock()

lp.lggr.Infow("replay complete", "from", from, "to", to)

lp.replay.pending = false
if lp.replay.requestBlock != NoNewReplayRequests && lp.replay.requestBlock < from {
if lp.replay.requestBlock < from {
// received a new request with lower block number while replaying, we'll process that next time
return
lp.replay.status = ReplayStatusRequested
return false
}
lp.replay.requestBlock = NoNewReplayRequests
lp.replay.status = ReplayStatusComplete
return true
}

func appendBuffered(ch <-chan Block, max int, blocks []Block) []Block {
Expand Down
76 changes: 38 additions & 38 deletions pkg/solana/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"math/rand"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -359,73 +358,74 @@ func Test_LogPoller_Replay(t *testing.T) {
fromBlock := int64(5)

lp := newMockedLP(t)
assertReplayInfo := func(requestBlock int64, pending bool) {
assertReplayInfo := func(requestBlock int64, status ReplayStatus) {
assert.Equal(t, requestBlock, lp.LogPoller.replay.requestBlock)
assert.Equal(t, pending, lp.LogPoller.replay.pending)
assert.Equal(t, status, lp.LogPoller.replay.status)
}

t.Run("ReplayInfo state initialized propery", func(t *testing.T) {
assertReplayInfo(NoNewReplayRequests, false)
assertReplayInfo(0, ReplayStatusNoRequest)
})

t.Run("ordinary replay request", func(t *testing.T) {
lp.Filters.EXPECT().UpdateStartingBlocks(mock.Anything, fromBlock).Once().Return(nil)
lp.Filters.EXPECT().UpdateStartingBlocks(fromBlock).Once()
err := lp.LogPoller.Replay(ctx, fromBlock)
require.NoError(t, err)
assertReplayInfo(fromBlock, false)
assertReplayInfo(fromBlock, ReplayStatusRequested)
})

t.Run("redundant replay request", func(t *testing.T) {
err := lp.LogPoller.Replay(ctx, fromBlock)
lp.LogPoller.replay.requestBlock = fromBlock
lp.LogPoller.replay.status = ReplayStatusRequested
err := lp.LogPoller.Replay(ctx, fromBlock+10)
require.NoError(t, err)
assertReplayInfo(fromBlock, false)
assertReplayInfo(fromBlock, ReplayStatusRequested)
})

t.Run("replay request updated", func(t *testing.T) {
lp.Filters.EXPECT().UpdateStartingBlocks(mock.Anything, fromBlock-1).Once().Return(nil)
lp.LogPoller.replay.status = ReplayStatusNoRequest
lp.Filters.EXPECT().UpdateStartingBlocks(fromBlock - 1).Once()
err := lp.LogPoller.Replay(ctx, fromBlock-1)
require.NoError(t, err)
assertReplayInfo(fromBlock-1, false)
assertReplayInfo(fromBlock-1, ReplayStatusRequested)
})

t.Run("shouldnt update requestBlock if UpdateStartingBlocks fails", func(t *testing.T) {
expectedErr := fmt.Errorf("error")
lp.LogPoller.replay.requestBlock = NoNewReplayRequests
lp.Filters.EXPECT().UpdateStartingBlocks(mock.Anything, fromBlock-3).Once().Return(expectedErr)
err := lp.LogPoller.Replay(ctx, fromBlock-3)
assert.ErrorIs(t, err, expectedErr)
assertReplayInfo(NoNewReplayRequests, false)
t.Run("replay request updated while pending", func(t *testing.T) {
lp.LogPoller.replay.requestBlock = fromBlock
lp.LogPoller.replay.status = ReplayStatusPending
lp.Filters.EXPECT().UpdateStartingBlocks(fromBlock - 1).Once()
err := lp.LogPoller.Replay(ctx, fromBlock-1)
require.NoError(t, err)
assertReplayInfo(fromBlock-1, ReplayStatusPending)
})

t.Run("checkForReplayRequest should not set pending flag if there are no new requests", func(t *testing.T) {
expected := int64(NoNewReplayRequests)
lp.LogPoller.replay.requestBlock = expected
actual := lp.LogPoller.checkForReplayRequest()
assertReplayInfo(expected, false)
assert.Equal(t, expected, actual)
assert.False(t, lp.LogPoller.ReplayPending())
t.Run("checkForReplayRequest should not enter pending state if there are no requests", func(t *testing.T) {
lp.LogPoller.replay.requestBlock = 400
lp.LogPoller.replay.status = ReplayStatusComplete
assert.False(t, lp.LogPoller.checkForReplayRequest())
assertReplayInfo(400, ReplayStatusComplete)
assert.Equal(t, ReplayStatusComplete, lp.LogPoller.ReplayStatus())
})

t.Run("checkForReplayRequest should set pending flag if there is a new request", func(t *testing.T) {
expected := int64(3)
lp.LogPoller.replay.requestBlock = expected
actual := lp.LogPoller.checkForReplayRequest()
assertReplayInfo(expected, true)
assert.Equal(t, expected, actual)
assert.True(t, lp.LogPoller.ReplayPending())
t.Run("checkForReplayRequest should enter pending state if there is a new request", func(t *testing.T) {
lp.LogPoller.replay.status = ReplayStatusRequested
lp.LogPoller.replay.requestBlock = 18
assert.True(t, lp.LogPoller.checkForReplayRequest())
assertReplayInfo(18, ReplayStatusPending)
assert.Equal(t, ReplayStatusPending, lp.LogPoller.ReplayStatus())
})

t.Run("replayComplete clears pending flag and resets requestBlock", func(t *testing.T) {
lp.LogPoller.replay.requestBlock = NoNewReplayRequests
lp.LogPoller.replay.pending = true
t.Run("replayComplete enters ReplayComplete state", func(t *testing.T) {
lp.LogPoller.replay.requestBlock = 10
lp.LogPoller.replay.status = ReplayStatusPending
lp.LogPoller.replayComplete(8, 20)
assertReplayInfo(NoNewReplayRequests, false)
assertReplayInfo(10, ReplayStatusComplete)
})

t.Run("replayComplete clears pending flag but does not reset requestBlock if lower block request received", func(t *testing.T) {
t.Run("replayComplete stays in pending state if lower block request received", func(t *testing.T) {
lp.LogPoller.replay.requestBlock = 3
lp.LogPoller.replay.pending = true
lp.LogPoller.replay.status = ReplayStatusPending
lp.LogPoller.replayComplete(8, 20)
assertReplayInfo(3, false)
assertReplayInfo(3, ReplayStatusRequested)
})
}
34 changes: 10 additions & 24 deletions pkg/solana/logpoller/mock_filters.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions pkg/solana/logpoller/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,3 +266,27 @@ func newIndexedValue(typedVal any) (iVal IndexedValue, err error) {
}
return nil, fmt.Errorf("can't create indexed value from type %T", typedVal)
}

type ReplayStatus int

const (
ReplayStatusNoRequest ReplayStatus = iota
ReplayStatusRequested
ReplayStatusPending
ReplayStatusComplete
)

func (rs ReplayStatus) String() string {
switch rs {
case ReplayStatusNoRequest:
return "NoRequest"
case ReplayStatusRequested:
return "Requested"
case ReplayStatusPending:
return "Pending"
case ReplayStatusComplete:
return "Complete"
default:
return fmt.Sprintf("invalid status: %d", rs) // Handle unknown cases
}
}

0 comments on commit 318ee59

Please sign in to comment.