Skip to content

Commit

Permalink
Add global replay feature to LogPoller
Browse files Browse the repository at this point in the history
  • Loading branch information
reductionista committed Feb 13, 2025
1 parent e727e73 commit de84b8b
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 0 deletions.
20 changes: 20 additions & 0 deletions pkg/solana/logpoller/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,26 @@ func (fl *filters) MarkFilterBackfilled(ctx context.Context, filterID int64) err
return nil
}

// UpdateStartingBlocks lowers the StartingBlock of every registered filter whose current
// StartingBlock is greater than startingBlock, in both the database and the in-memory cache.
// Filters already starting at or below startingBlock are untouched. The db update also resets
// is_backfilled, so affected filters will be backfilled again from the new block.
// NOTE(review): any in-memory backfilled flag on Filter is not reset here — confirm
// GetFiltersToBackfill derives its answer from the db state or is otherwise unaffected.
func (fl *filters) UpdateStartingBlocks(ctx context.Context, startingBlock int64) error {
	fl.filtersMutex.Lock()
	defer fl.filtersMutex.Unlock()

	// Collect only the filters that actually need to move their starting block back.
	startingBlocks := make(map[int64]int64, len(fl.filtersByID))
	for id, filter := range fl.filtersByID {
		if startingBlock < filter.StartingBlock {
			startingBlocks[id] = startingBlock
		}
	}
	if len(startingBlocks) == 0 {
		// Nothing to change; avoid a pointless db round trip.
		return nil
	}
	if err := fl.orm.UpdateStartingBlocks(ctx, startingBlocks); err != nil {
		return err
	}
	// Mutate the cache only after the db update succeeds, keeping the two views consistent.
	for id, blk := range startingBlocks {
		fl.filtersByID[id].StartingBlock = blk
	}
	return nil
}

// LoadFilters - loads filters from database. Can be called multiple times without side effects.
func (fl *filters) LoadFilters(ctx context.Context) error {
if fl.loadedFilters.Load() {
Expand Down
77 changes: 77 additions & 0 deletions pkg/solana/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"iter"
"math"
"sync"
"time"

"github.com/gagliardetto/solana-go/rpc"
Expand All @@ -29,6 +30,7 @@ type ORM interface {
DeleteFilters(ctx context.Context, filters map[int64]Filter) error
MarkFilterDeleted(ctx context.Context, id int64) (err error)
MarkFilterBackfilled(ctx context.Context, id int64) (err error)
UpdateStartingBlocks(ctx context.Context, startingBlock map[int64]int64) (err error)
GetLatestBlock(ctx context.Context) (int64, error)
InsertLogs(context.Context, []Log) (err error)
SelectSeqNums(ctx context.Context) (map[int64]int64, error)
Expand All @@ -47,18 +49,26 @@ type filtersI interface {
GetDistinctAddresses(ctx context.Context) ([]PublicKey, error)
GetFiltersToBackfill() []Filter
MarkFilterBackfilled(ctx context.Context, filterID int64) error
UpdateStartingBlocks(ctx context.Context, startingBlocks int64) error
MatchingFiltersForEncodedEvent(event ProgramEvent) iter.Seq[Filter]
DecodeSubKey(ctx context.Context, lggr logger.SugaredLogger, raw []byte, ID int64, subKeyPath []string) (any, error)
IncrementSeqNum(filterID int64) int64
}

// ReplayInfo tracks the state of the most recent replay request.
// All fields are guarded by mut.
type ReplayInfo struct {
	mut          sync.RWMutex
	requestBlock int64 // block the replay was requested from; -1 after a replay completes (set by replayComplete)
	pending      bool  // true from the moment backfilling for a replay starts until replayComplete clears it
}

type Service struct {
services.Service
eng *services.Engine

lggr logger.SugaredLogger
orm ORM
lastProcessedSlot int64
replay ReplayInfo
client RPCClient
loader logsLoader
filters filtersI
Expand Down Expand Up @@ -201,6 +211,38 @@ func (lp *Service) UnregisterFilter(ctx context.Context, name string) error {
return lp.filters.UnregisterFilter(ctx, name)
}

// Replay requests a replay of all logs matching the registered filters from fromBlock onward.
// The replay itself happens asynchronously during the next backfill pass; ReplayPending can be
// used to observe progress. If a replay from an equal-or-lower block is already outstanding,
// this is a no-op and returns nil.
func (lp *Service) Replay(ctx context.Context, fromBlock int64) error {
	ctx, cancel := lp.eng.Ctx(ctx)
	defer cancel()

	lp.replay.mut.Lock()
	defer lp.replay.mut.Unlock()

	// requestBlock > 0 means a request is outstanding; <= 0 (initial zero value, or the -1
	// sentinel written by replayComplete) means none. The previous check (requestBlock != 0)
	// silently swallowed every request made after a replay had completed, because
	// fromBlock >= -1 holds for all valid block numbers.
	if lp.replay.requestBlock > 0 && fromBlock >= lp.replay.requestBlock {
		// Already requested, no further action required
		return nil
	}
	// Push each filter's starting block down to fromBlock so the next backfill re-reads logs.
	if err := lp.filters.UpdateStartingBlocks(ctx, fromBlock); err != nil {
		return err
	}
	lp.replay.requestBlock = fromBlock

	return nil
}

// ReplayPending reports whether a replay is currently in progress. Caveats:
//  1. A freshly submitted request whose backfill hasn't started yet still reads as false.
//  2. A node restart between request submission and completion also reads as false, even
//     though the backfill needed to satisfy the request will still be finished.
func (lp *Service) ReplayPending() bool {
	lp.replay.mut.RLock()
	inProgress := lp.replay.pending
	lp.replay.mut.RUnlock()
	return inProgress
}

func (lp *Service) getLastProcessedSlot(ctx context.Context) (int64, error) {
if lp.lastProcessedSlot != 0 {
return lp.lastProcessedSlot, nil
Expand Down Expand Up @@ -228,10 +270,28 @@ func (lp *Service) getLastProcessedSlot(ctx context.Context) (int64, error) {
return int64(latestFinalizedSlot) - 1, nil //
}

// checkForReplayRequest checks whether there have been any new replay requests since it was
// last called, and if so sets the pending flag to true and returns the requested block number.
// A return value <= 0 means no replay is outstanding.
func (lp *Service) checkForReplayRequest() int64 {
	lp.replay.mut.Lock()
	defer lp.replay.mut.Unlock()

	// <= 0 means no outstanding request: 0 is the never-requested zero value, -1 is the
	// sentinel set by replayComplete. The previous guard (< 0) treated the zero value as a
	// real request, spuriously logging "starting replay" with block 0 and latching
	// pending=true forever, since backfillFilters only calls replayComplete when the
	// returned block is > 0.
	if lp.replay.requestBlock <= 0 {
		return lp.replay.requestBlock
	}

	lp.lggr.Infow("starting replay", "replayBlock", lp.replay.requestBlock)
	lp.replay.pending = true
	return lp.replay.requestBlock
}

func (lp *Service) backfillFilters(ctx context.Context, filters []Filter, to int64) error {
replayBlock := lp.checkForReplayRequest()

addressesSet := make(map[PublicKey]struct{})
addresses := make([]PublicKey, 0, len(filters))
minSlot := to

for _, filter := range filters {
if _, ok := addressesSet[filter.Address]; !ok {
addressesSet[filter.Address] = struct{}{}
Expand All @@ -248,6 +308,10 @@ func (lp *Service) backfillFilters(ctx context.Context, filters []Filter, to int
}

lp.lggr.Infow("Done backfilling filters", "filters", len(filters), "from", minSlot, "to", to)
if replayBlock > 0 {
lp.replayComplete(replayBlock)
}

for _, filter := range filters {
filterErr := lp.filters.MarkFilterBackfilled(ctx, filter.ID)
if filterErr != nil {
Expand Down Expand Up @@ -360,6 +424,19 @@ func (lp *Service) run(ctx context.Context) (err error) {
return nil
}

// replayComplete marks the replay that started at replayBlock as finished, unless a request
// for an even lower block arrived while it was running — that one is left pending and will be
// picked up by checkForReplayRequest on the next backfill pass.
func (lp *Service) replayComplete(replayBlock int64) {
	lp.replay.mut.Lock()
	defer lp.replay.mut.Unlock()

	if lp.replay.requestBlock < replayBlock {
		// received a new request with lower block number while replaying, we'll process that next time
		return
	}
	lp.replay.requestBlock = -1 // sentinel: no outstanding replay request
	lp.replay.pending = false
	// Log the parameter, not lp.replay.requestBlock — the field was just reset to -1,
	// so the original log line always reported replayBlock=-1.
	lp.lggr.Infow("replay complete", "replayBlock", replayBlock)
}

func appendBuffered(ch <-chan Block, max int, blocks []Block) []Block {
for {
select {
Expand Down
47 changes: 47 additions & 0 deletions pkg/solana/logpoller/mock_filters.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 47 additions & 0 deletions pkg/solana/logpoller/mock_orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions pkg/solana/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,22 @@ func (o *DSORM) MarkFilterBackfilled(ctx context.Context, id int64) (err error)
return err
}

// UpdateStartingBlocks accepts a map from filter ids to new startingBlock numbers. Each filter corresponding
// to an id in the map will have its starting_block column updated with the new value. is_backfilled for each
// filter will also be reset to false. All updates are applied atomically in a single transaction:
// either every filter is updated or none are.
func (o *DSORM) UpdateStartingBlocks(ctx context.Context, startingBlocks map[int64]int64) (err error) {
	query := `UPDATE solana.log_poller_filters SET starting_block = $1, is_backfilled = false WHERE id=$2;`
	return o.Transact(ctx, func(orm *DSORM) error {
		// Execute through the transaction-scoped orm handle, not the outer o: the original
		// called o.ds.ExecContext here, running each UPDATE outside the transaction and
		// defeating the atomicity Transact is meant to provide.
		for id, startingBlock := range startingBlocks {
			if _, execErr := orm.ds.ExecContext(ctx, query, startingBlock, id); execErr != nil {
				return execErr
			}
		}
		return nil
	})
}

func (o *DSORM) DeleteFilter(ctx context.Context, id int64) (err error) {
query := `DELETE FROM solana.log_poller_filters WHERE id = $1`
_, err = o.ds.ExecContext(ctx, query, id)
Expand Down

0 comments on commit de84b8b

Please sign in to comment.