Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LogPoller e2e test fixes #1046

Merged
merged 9 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/solana/codec/discriminator.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func NewDiscriminatorExtractor() DiscriminatorExtractor {
// Extract expects input of > 12 characters which 8 bytes are extracted from, if the input string is less than 12 characters, this will panic.
// Extract doesn't handle base64 padding because discriminators shouldn't have padding.
// If string contains non-Base64 characters (e.g., !, @, space) map to index 0 (ASCII 'A'), and won't be accurate.
func (e *DiscriminatorExtractor) Extract(data string) []byte {
func (e *DiscriminatorExtractor) Extract(data string) [discriminatorLength]byte {
var decodeBuffer [9]byte
d := decodeBuffer[:9]
s := data[:12]
Expand All @@ -121,5 +121,5 @@ func (e *DiscriminatorExtractor) Extract(data string) []byte {
s = s[4:]
}

return decodeBuffer[:discriminatorLength]
return [discriminatorLength]byte(decodeBuffer[:discriminatorLength])
}
2 changes: 1 addition & 1 deletion pkg/solana/codec/discriminator_extractor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func FuzzExtractorHappyPath(f *testing.F) {

stdDecoded, err := base64.StdEncoding.DecodeString(testString)
if err == nil {
require.Equal(t, stdDecoded[:8], extractor.Extract(testString))
require.Equal(t, [8]byte(stdDecoded[:8]), extractor.Extract(testString))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
require.Equal(t, [8]byte(stdDecoded[:8]), extractor.Extract(testString))
require.Equal(t, [discriminatorLength]byte(stdDecoded[:discriminatorLength]), extractor.Extract(testString))

}
})
}
Expand Down
19 changes: 9 additions & 10 deletions pkg/solana/logpoller/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ type filters struct {
filtersToDelete map[int64]Filter
filtersMutex sync.RWMutex
loadedFilters atomic.Bool
knownPrograms map[string]uint // fast lookup to see if a base58-encoded ProgramID matches any registered filters
knownDiscriminators map[string]uint // fast lookup based on raw discriminator bytes as string
knownPrograms map[string]uint // fast lookup to see if a base58-encoded ProgramID matches any registered filters
knownDiscriminators map[EventSignature]uint // fast lookup based on raw discriminator bytes as string
seqNums map[int64]int64
decoders map[int64]Decoder
discriminatorExtractor codec.DiscriminatorExtractor
Expand Down Expand Up @@ -179,7 +179,7 @@ func (fl *filters) addToIndices(filter Filter, decoder Decoder) error {

programID := filter.Address.ToSolana().String()
fl.knownPrograms[programID]++
fl.knownDiscriminators[filter.DiscriminatorRawBytes()]++
fl.knownDiscriminators[filter.EventSig]++
return nil
}

Expand Down Expand Up @@ -255,13 +255,12 @@ func (fl *filters) removeFilterFromIndexes(filter Filter) {
}
}

discriminator := filter.DiscriminatorRawBytes()
if refcount, ok := fl.knownDiscriminators[discriminator]; ok {
if refcount, ok := fl.knownDiscriminators[filter.EventSig]; ok {
refcount--
if refcount > 0 {
fl.knownDiscriminators[discriminator] = refcount
fl.knownDiscriminators[filter.EventSig] = refcount
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're renaming filter.DiscriminatorRawBytes() to filter.EventSig, shouldn't we also rename fl.knownDiscriminators to fl.knownEventSigs?

} else {
delete(fl.knownDiscriminators, discriminator)
delete(fl.knownDiscriminators, filter.EventSig)
}
}
}
Expand Down Expand Up @@ -338,7 +337,7 @@ func (fl *filters) MatchingFiltersForEncodedEvent(event ProgramEvent) iter.Seq[F
return ok
}

_, ok = fl.knownDiscriminators[string(discriminator)]
_, ok = fl.knownDiscriminators[discriminator]
return ok
}

Expand All @@ -352,7 +351,7 @@ func (fl *filters) MatchingFiltersForEncodedEvent(event ProgramEvent) iter.Seq[F
return nil
}

return fl.matchingFilters(PublicKey(addr), EventSignature(discriminator))
return fl.matchingFilters(PublicKey(addr), discriminator)
}

// GetFiltersToBackfill - returns copy of backfill queue
Expand Down Expand Up @@ -413,7 +412,7 @@ func (fl *filters) LoadFilters(ctx context.Context) error {
fl.filtersToBackfill = make(map[int64]struct{})
fl.filtersToDelete = make(map[int64]Filter)
fl.knownPrograms = make(map[string]uint)
fl.knownDiscriminators = make(map[string]uint)
fl.knownDiscriminators = make(map[EventSignature]uint)

filters, err := fl.orm.SelectFilters(ctx)
if err != nil {
Expand Down
11 changes: 6 additions & 5 deletions pkg/solana/logpoller/filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ func TestFilters_LoadFilters(t *testing.T) {
ID: 1,
Name: "Happy path",
EventName: "happyPath1",
EventSig: NewEventSignatureFromName("happyPath1"),
IsBackfilled: true,
}
happyPath2 := Filter{
ID: 2,
Name: "Happy path 2",
EventName: "happyPath2",
EventSig: NewEventSignatureFromName("happyPath2"),
}
orm.On("SelectFilters", mock.Anything).Return([]Filter{
deleted,
Expand All @@ -59,8 +61,8 @@ func TestFilters_LoadFilters(t *testing.T) {
require.Equal(t, deleted, fs.filtersToDelete[deleted.ID])
// filtersByAddress only contains not deleted filters
require.Len(t, fs.filtersByAddress, 1)
require.Len(t, fs.filtersByAddress[happyPath.Address], 1)
require.Len(t, fs.filtersByAddress[happyPath.Address][happyPath.EventSig], 2)
require.Len(t, fs.filtersByAddress[happyPath.Address], 2)
require.Len(t, fs.filtersByAddress[happyPath.Address][happyPath.EventSig], 1)
// both filters are properly indexed
requireIndexed(t, fs, happyPath)
requireIndexed(t, fs, happyPath2)
Expand All @@ -81,7 +83,7 @@ func requireIndexed(t *testing.T, fs *filters, f Filter) {
eventSigIDs := byEventSig[f.EventSig]
require.Contains(t, eventSigIDs, f.ID)
require.Contains(t, fs.decoders, f.ID)
require.Contains(t, fs.knownDiscriminators, f.DiscriminatorRawBytes())
require.Contains(t, fs.knownDiscriminators, f.EventSig)
require.Contains(t, fs.knownPrograms, f.Address.String())
}

Expand All @@ -94,7 +96,7 @@ func requireNoInIndices(t *testing.T, fs *filters, f Filter) {
require.NotContains(t, byEventSig[f.EventSig], f.ID)
}
require.NotContains(t, fs.decoders, f.ID)
require.NotContains(t, fs.knownDiscriminators, f.DiscriminatorRawBytes())
require.NotContains(t, fs.knownDiscriminators, f.EventSig)
require.NotContains(t, fs.knownPrograms, f.Address.String())
require.NotContains(t, fs.seqNums, f.ID)
require.NotContains(t, fs.filtersToBackfill, f.ID)
Expand Down Expand Up @@ -173,7 +175,6 @@ func TestFilters_RegisterFilter(t *testing.T) {
err = fs.RegisterFilter(tests.Context(t), filter)
require.NoError(t, err)
// can update non-primary fields
filter.EventName = uuid.NewString()
filter.StartingBlock++
filter.Retention++
filter.MaxLogsKept++
Expand Down
13 changes: 11 additions & 2 deletions pkg/solana/logpoller/job_get_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"

"github.com/smartcontractkit/chainlink-common/pkg/services"

"github.com/smartcontractkit/chainlink-common/pkg/logger"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
Expand All @@ -16,21 +18,23 @@ import (
// getBlockJob is a job that fetches a block with transactions, converts logs into ProgramEvents and writes them into blocks channel
type getBlockJob struct {
slotNumber uint64
stopCh services.StopRChan
client RPCClient
blocks chan Block
done chan struct{}
parseProgramLogs func(logs []string) []ProgramOutput
lggr logger.SugaredLogger
}

func newGetBlockJob(client RPCClient, blocks chan Block, lggr logger.SugaredLogger, slotNumber uint64) *getBlockJob {
func newGetBlockJob(stopCh services.StopRChan, client RPCClient, blocks chan Block, lggr logger.SugaredLogger, slotNumber uint64) *getBlockJob {
return &getBlockJob{
client: client,
blocks: blocks,
slotNumber: slotNumber,
done: make(chan struct{}),
parseProgramLogs: parseProgramLogs,
lggr: lggr,
stopCh: stopCh,
}
}

Expand All @@ -43,6 +47,11 @@ func (j *getBlockJob) Done() <-chan struct{} {
}

func (j *getBlockJob) Run(ctx context.Context) error {
ctx, cancel := j.stopCh.Ctx(ctx)
defer cancel()
if ctx.Err() != nil {
return ctx.Err()
}
var excludeRewards bool
version := client.MaxSupportTransactionVersion
block, err := j.client.GetBlockWithOpts(
Expand Down Expand Up @@ -95,7 +104,7 @@ func (j *getBlockJob) Run(ctx context.Context) error {
return fmt.Errorf("expected transaction to have meta. signature: %s; slot: %d; idx: %d", tx.Signatures[0], j.slotNumber, idx)
}
if txWithMeta.Meta.Err != nil {
j.lggr.Debugw("Skipping all events of failed transaction", "err", txWithMeta.Meta.Err, "signature", tx.Signatures[0])
// silently skip as at the moment there is no way for us to filter transactions produced by our contracts
continue
}
detail.trxSig = tx.Signatures[0] // according to Solana docs fist signature is used as ID
Expand Down
28 changes: 15 additions & 13 deletions pkg/solana/logpoller/job_get_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ func TestGetBlockJob(t *testing.T) {
const slotNumber = uint64(42)
t.Run("String contains slot number", func(t *testing.T) {
lggr := logger.Sugared(logger.Test(t))
job := newGetBlockJob(nil, nil, lggr, slotNumber)
job := newGetBlockJob(nil, nil, nil, lggr, slotNumber)
require.Equal(t, "getBlock for slotNumber: 42", job.String())
})
t.Run("Error if fails to get block", func(t *testing.T) {
client := mocks.NewRPCClient(t)
lggr := logger.Sugared(logger.Test(t))
expectedError := errors.New("rpc failed")
client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).Return(nil, expectedError).Once()
job := newGetBlockJob(client, make(chan Block), lggr, slotNumber)
job := newGetBlockJob(nil, client, make(chan Block), lggr, slotNumber)
err := job.Run(tests.Context(t))
require.ErrorIs(t, err, expectedError)
})
Expand All @@ -39,7 +39,7 @@ func TestGetBlockJob(t *testing.T) {
lggr := logger.Sugared(logger.Test(t))
block := rpc.GetBlockResult{}
client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).Return(&block, nil).Once()
job := newGetBlockJob(client, make(chan Block), lggr, slotNumber)
job := newGetBlockJob(nil, client, make(chan Block), lggr, slotNumber)
err := job.Run(tests.Context(t))
require.ErrorContains(t, err, "block at slot 42 returned from rpc is missing block number")
})
Expand All @@ -49,7 +49,7 @@ func TestGetBlockJob(t *testing.T) {

block := rpc.GetBlockResult{BlockHeight: ptr(uint64(10))}
client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).Return(&block, nil).Once()
job := newGetBlockJob(client, make(chan Block), lggr, slotNumber)
job := newGetBlockJob(nil, client, make(chan Block), lggr, slotNumber)
err := job.Run(tests.Context(t))
require.ErrorContains(t, err, "block at slot 42 returned from rpc is missing block time")
})
Expand All @@ -58,7 +58,7 @@ func TestGetBlockJob(t *testing.T) {
lggr := logger.Sugared(logger.Test(t))
block := rpc.GetBlockResult{BlockHeight: ptr(uint64(10)), BlockTime: ptr(solana.UnixTimeSeconds(10)), Transactions: []rpc.TransactionWithMeta{{Transaction: nil}}}
client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).Return(&block, nil).Once()
job := newGetBlockJob(client, make(chan Block), lggr, slotNumber)
job := newGetBlockJob(nil, client, make(chan Block), lggr, slotNumber)
err := job.Run(tests.Context(t))
require.ErrorContains(t, err, "failed to parse transaction 0 in slot 42: missing transaction field")
})
Expand All @@ -67,7 +67,7 @@ func TestGetBlockJob(t *testing.T) {
lggr := logger.Sugared(logger.Test(t))
block := rpc.GetBlockResult{BlockHeight: ptr(uint64(10)), BlockTime: ptr(solana.UnixTimeSeconds(10)), Transactions: []rpc.TransactionWithMeta{{Transaction: rpc.DataBytesOrJSONFromBytes([]byte("{"))}}}
client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).Return(&block, nil).Once()
job := newGetBlockJob(client, make(chan Block), lggr, slotNumber)
job := newGetBlockJob(nil, client, make(chan Block), lggr, slotNumber)
err := job.Run(tests.Context(t))
require.ErrorContains(t, err, "failed to parse transaction 0 in slot 42")
})
Expand All @@ -79,7 +79,7 @@ func TestGetBlockJob(t *testing.T) {
require.NoError(t, err)
block := rpc.GetBlockResult{BlockHeight: ptr(uint64(10)), BlockTime: ptr(solana.UnixTimeSeconds(10)), Transactions: []rpc.TransactionWithMeta{{Transaction: rpc.DataBytesOrJSONFromBytes(txB)}}}
client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).Return(&block, nil).Once()
job := newGetBlockJob(client, make(chan Block), lggr, slotNumber)
job := newGetBlockJob(nil, client, make(chan Block), lggr, slotNumber)
err = job.Run(tests.Context(t))
require.ErrorContains(t, err, "expected all transactions to have at least one signature 0 in slot 42")
})
Expand All @@ -91,7 +91,7 @@ func TestGetBlockJob(t *testing.T) {
require.NoError(t, err)
block := rpc.GetBlockResult{BlockHeight: ptr(uint64(10)), BlockTime: ptr(solana.UnixTimeSeconds(10)), Transactions: []rpc.TransactionWithMeta{{Transaction: rpc.DataBytesOrJSONFromBytes(txB)}}}
client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).Return(&block, nil).Once()
job := newGetBlockJob(client, make(chan Block), lggr, slotNumber)
job := newGetBlockJob(nil, client, make(chan Block), lggr, slotNumber)
err = job.Run(tests.Context(t))
require.ErrorContains(t, err, "expected transaction to have meta. signature: 2AnZxg8HN2sGa7GC7iWGDgpXbEasqXQNEumCjvHUFDcBnfRKAdaN3SvKLhbQwheN15xDkL5D5mdX21A5gH1MdYB; slot: 42; idx: 0")
})
Expand All @@ -101,11 +101,13 @@ func TestGetBlockJob(t *testing.T) {
tx := solana.Transaction{Signatures: make([]solana.Signature, 1)}
txB, err := tx.MarshalBinary()
require.NoError(t, err)
block := rpc.GetBlockResult{BlockHeight: ptr(uint64(10)), BlockTime: ptr(solana.UnixTimeSeconds(10)), Transactions: []rpc.TransactionWithMeta{{Transaction: rpc.DataBytesOrJSONFromBytes(txB), Meta: &rpc.TransactionMeta{}}}}
client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).Return(&block, nil).Once()
job := newGetBlockJob(client, make(chan Block), lggr, slotNumber)
ctx, cancel := context.WithCancel(tests.Context(t))
cancel()
block := rpc.GetBlockResult{BlockHeight: ptr(uint64(10)), BlockTime: ptr(solana.UnixTimeSeconds(10)), Transactions: []rpc.TransactionWithMeta{{Transaction: rpc.DataBytesOrJSONFromBytes(txB), Meta: &rpc.TransactionMeta{}}}}
client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).RunAndReturn(func(ctx context.Context, u uint64, opts *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) {
cancel()
return &block, nil
}).Once()
job := newGetBlockJob(ctx.Done(), client, make(chan Block), lggr, slotNumber)
err = job.Run(ctx)
require.ErrorIs(t, err, context.Canceled)
select {
Expand Down Expand Up @@ -133,7 +135,7 @@ func TestGetBlockJob(t *testing.T) {
blockTime := solana.UnixTimeSeconds(128)
block := rpc.GetBlockResult{BlockHeight: &height, BlockTime: ptr(blockTime), Blockhash: solana.Hash{1, 2, 3}, Transactions: []rpc.TransactionWithMeta{txWithMeta1, txWithMeta2, txWithMeta3}}
client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).Return(&block, nil).Once()
job := newGetBlockJob(client, make(chan Block, 1), lggr, slotNumber)
job := newGetBlockJob(nil, client, make(chan Block, 1), lggr, slotNumber)
job.parseProgramLogs = func(logs []string) []ProgramOutput {
result := ProgramOutput{
Program: "myProgram",
Expand Down
14 changes: 11 additions & 3 deletions pkg/solana/logpoller/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ func (c *EncodedLogCollector) scheduleBlocksFetching(ctx context.Context, slots
blocks := make(chan Block)
getBlockJobs := make([]*getBlockJob, len(slots))
for i, slot := range slots {
getBlockJobs[i] = newGetBlockJob(c.client, blocks, c.lggr, slot)
getBlockJobs[i] = newGetBlockJob(ctx.Done(), c.client, blocks, c.lggr, slot)
err := c.workers.Do(ctx, getBlockJobs[i])
if err != nil {
return nil, fmt.Errorf("could not schedule job to fetch blocks for slot: %w", err)
}
}

c.engine.Go(func(ctx context.Context) {
go func() {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to cancel this goroutine if iteration gets aborted

for _, job := range getBlockJobs {
select {
case <-ctx.Done():
Expand All @@ -129,7 +129,7 @@ func (c *EncodedLogCollector) scheduleBlocksFetching(ctx context.Context, slots
}
}
close(blocks)
})
}()

return blocks, nil
}
Expand All @@ -142,6 +142,13 @@ func (c *EncodedLogCollector) BackfillForAddresses(ctx context.Context, addresse

c.lggr.Debugw("Got all slots that need fetching for backfill operations", "addresses", PublicKeysToString(addresses), "fromSlot", fromSlot, "toSlot", toSlot, "slotsToFetch", slotsToFetch)

ctx, cancelJobs := context.WithCancel(ctx)
defer func() {
// if failed to start backfill process - cancel jobs
if err != nil {
cancelJobs()
}
}()
unorderedBlocks, err := c.scheduleBlocksFetching(ctx, slotsToFetch)
if err != nil {
return nil, func() {}, fmt.Errorf("failed to schedule blocks to fetch: %w", err)
Expand All @@ -153,6 +160,7 @@ func (c *EncodedLogCollector) BackfillForAddresses(ctx context.Context, addresse
}

cleanUp = func() {
cancelJobs()
err := blocksSorter.Close()
if err != nil {
blocksSorter.lggr.Errorw("Failed to close blocks sorter", "err", err)
Expand Down
Loading
Loading