From 942f1daa8bf6df3adf5600ae8d23cb4002d21056 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Thu, 6 Feb 2025 18:05:05 +0100 Subject: [PATCH 1/8] Make work with EventSignatures & Discriminator consistent --- pkg/solana/codec/discriminator.go | 4 ++-- pkg/solana/logpoller/filters.go | 19 +++++++++---------- pkg/solana/logpoller/filters_test.go | 11 ++++++----- pkg/solana/logpoller/log_poller_test.go | 2 +- pkg/solana/logpoller/models.go | 9 +-------- pkg/solana/logpoller/test_helpers.go | 4 +--- pkg/solana/logpoller/types.go | 8 ++++++++ 7 files changed, 28 insertions(+), 29 deletions(-) diff --git a/pkg/solana/codec/discriminator.go b/pkg/solana/codec/discriminator.go index 0046b1e21..e1f45e50a 100644 --- a/pkg/solana/codec/discriminator.go +++ b/pkg/solana/codec/discriminator.go @@ -98,7 +98,7 @@ func NewDiscriminatorExtractor() DiscriminatorExtractor { // Extract expects input of > 12 characters which 8 bytes are extracted from, if the input string is less than 12 characters, this will panic. // Extract doesn't handle base64 padding because discriminators shouldn't have padding. // If string contains non-Base64 characters (e.g., !, @, space) map to index 0 (ASCII 'A'), and won't be accurate. -func (e *DiscriminatorExtractor) Extract(data string) []byte { +func (e *DiscriminatorExtractor) Extract(data string) [discriminatorLength]byte { var decodeBuffer [9]byte d := decodeBuffer[:9] s := data[:12] @@ -121,5 +121,5 @@ func (e *DiscriminatorExtractor) Extract(data string) []byte { s = s[4:] } - return decodeBuffer[:discriminatorLength] + return [discriminatorLength]byte(decodeBuffer[:discriminatorLength]) } diff --git a/pkg/solana/logpoller/filters.go b/pkg/solana/logpoller/filters.go index 63fd1721d..b6724e9e9 100644 --- a/pkg/solana/logpoller/filters.go +++ b/pkg/solana/logpoller/filters.go @@ -30,8 +30,8 @@ type filters struct { filtersToDelete map[int64]Filter filtersMutex sync.RWMutex loadedFilters atomic.Bool - knownPrograms map[string]uint // fast lookup to see if a base58-encoded ProgramID matches any registered filters - knownDiscriminators map[string]uint // fast lookup based on raw discriminator bytes as string + knownPrograms map[string]uint // fast lookup to see if a base58-encoded ProgramID matches any registered filters + knownDiscriminators map[EventSignature]uint // fast lookup based on raw discriminator bytes as string seqNums map[int64]int64 decoders map[int64]Decoder discriminatorExtractor codec.DiscriminatorExtractor @@ -179,7 +179,7 @@ func (fl *filters) addToIndices(filter Filter, decoder Decoder) error { programID := filter.Address.ToSolana().String() fl.knownPrograms[programID]++ - fl.knownDiscriminators[filter.DiscriminatorRawBytes()]++ + fl.knownDiscriminators[filter.EventSig]++ return nil } @@ -255,13 +255,12 @@ func (fl *filters) removeFilterFromIndexes(filter Filter) { } } - discriminator := filter.DiscriminatorRawBytes() - if refcount, ok := fl.knownDiscriminators[discriminator]; ok { + if refcount, ok := fl.knownDiscriminators[filter.EventSig]; ok { refcount-- if refcount > 0 { - fl.knownDiscriminators[discriminator] = refcount + fl.knownDiscriminators[filter.EventSig] = refcount } else { - delete(fl.knownDiscriminators, discriminator) + delete(fl.knownDiscriminators, filter.EventSig) } } } @@ -338,7 +337,7 @@ func (fl *filters) MatchingFiltersForEncodedEvent(event ProgramEvent) iter.Seq[F return ok } - _, ok = fl.knownDiscriminators[string(discriminator)] + _, ok = fl.knownDiscriminators[discriminator] return ok } @@ -352,7 +351,7 @@ func (fl *filters) MatchingFiltersForEncodedEvent(event ProgramEvent) iter.Seq[F return nil } - return fl.matchingFilters(PublicKey(addr), EventSignature(discriminator)) + return fl.matchingFilters(PublicKey(addr), discriminator) } // GetFiltersToBackfill - returns copy of backfill queue @@ -413,7 +412,7 @@ func (fl *filters) LoadFilters(ctx context.Context) error { fl.filtersToBackfill = make(map[int64]struct{}) fl.filtersToDelete = make(map[int64]Filter) fl.knownPrograms = make(map[string]uint) - fl.knownDiscriminators = make(map[string]uint) + fl.knownDiscriminators = make(map[EventSignature]uint) filters, err := fl.orm.SelectFilters(ctx) if err != nil { diff --git a/pkg/solana/logpoller/filters_test.go b/pkg/solana/logpoller/filters_test.go index 7ccd88811..c09dcea5a 100644 --- a/pkg/solana/logpoller/filters_test.go +++ b/pkg/solana/logpoller/filters_test.go @@ -31,12 +31,14 @@ func TestFilters_LoadFilters(t *testing.T) { ID: 1, Name: "Happy path", EventName: "happyPath1", + EventSig: NewEventSignatureFromName("happyPath1"), IsBackfilled: true, } happyPath2 := Filter{ ID: 2, Name: "Happy path 2", EventName: "happyPath2", + EventSig: NewEventSignatureFromName("happyPath2"), } orm.On("SelectFilters", mock.Anything).Return([]Filter{ deleted, @@ -59,8 +61,8 @@ func TestFilters_LoadFilters(t *testing.T) { require.Equal(t, deleted, fs.filtersToDelete[deleted.ID]) // filtersByAddress only contains not deleted filters require.Len(t, fs.filtersByAddress, 1) - require.Len(t, fs.filtersByAddress[happyPath.Address], 1) - require.Len(t, fs.filtersByAddress[happyPath.Address][happyPath.EventSig], 2) + require.Len(t, fs.filtersByAddress[happyPath.Address], 2) + require.Len(t, fs.filtersByAddress[happyPath.Address][happyPath.EventSig], 1) // both filters are properly indexed requireIndexed(t, fs, happyPath) requireIndexed(t, fs, happyPath2) @@ -81,7 +83,7 @@ func requireIndexed(t *testing.T, fs *filters, f Filter) { eventSigIDs := byEventSig[f.EventSig] require.Contains(t, eventSigIDs, f.ID) require.Contains(t, fs.decoders, f.ID) - require.Contains(t, fs.knownDiscriminators, f.DiscriminatorRawBytes()) + require.Contains(t, fs.knownDiscriminators, f.EventSig) require.Contains(t, fs.knownPrograms, f.Address.String()) } @@ -94,7 +96,7 @@ func requireNoInIndices(t *testing.T, fs *filters, f Filter) { require.NotContains(t, byEventSig[f.EventSig], f.ID) } require.NotContains(t, fs.decoders, f.ID) - require.NotContains(t, fs.knownDiscriminators, f.DiscriminatorRawBytes()) + require.NotContains(t, fs.knownDiscriminators, f.EventSig) require.NotContains(t, fs.knownPrograms, f.Address.String()) require.NotContains(t, fs.seqNums, f.ID) require.NotContains(t, fs.filtersToBackfill, f.ID) @@ -173,7 +175,6 @@ func TestFilters_RegisterFilter(t *testing.T) { err = fs.RegisterFilter(tests.Context(t), filter) require.NoError(t, err) // can update non-primary fields - filter.EventName = uuid.NewString() filter.StartingBlock++ filter.Retention++ filter.MaxLogsKept++ diff --git a/pkg/solana/logpoller/log_poller_test.go b/pkg/solana/logpoller/log_poller_test.go index 32c5c3443..32ecc911e 100644 --- a/pkg/solana/logpoller/log_poller_test.go +++ b/pkg/solana/logpoller/log_poller_test.go @@ -250,7 +250,7 @@ func TestProcess(t *testing.T) { addr := newRandomPublicKey(t) eventName := "myEvent" - eventSig := EventSignature(codec.NewDiscriminatorHashPrefix(eventName, false)) + eventSig := NewEventSignatureFromName(eventName) event := struct { A int64 B string diff --git a/pkg/solana/logpoller/models.go b/pkg/solana/logpoller/models.go index 293303f50..fc8a55c0e 100644 --- a/pkg/solana/logpoller/models.go +++ b/pkg/solana/logpoller/models.go @@ -2,8 +2,6 @@ package logpoller import ( "time" - - "github.com/smartcontractkit/chainlink-solana/pkg/solana/codec" ) type Filter struct { @@ -22,15 +20,10 @@ type Filter struct { } func (f Filter) MatchSameLogs(other Filter) bool { - return f.Address == other.Address && f.EventSig == other.EventSig && + return f.Address == other.Address && f.EventSig == other.EventSig && f.EventName == other.EventName && f.EventIdl.Equal(other.EventIdl) && f.SubkeyPaths.Equal(other.SubkeyPaths) } -// DiscriminatorRawBytes returns raw discriminator bytes as a string, this string is not base64 encoded and is always len of discriminator which is 8. -func (f Filter) DiscriminatorRawBytes() string { - return string(codec.NewDiscriminatorHashPrefix(f.EventName, false)) -} - type Log struct { ID int64 FilterID int64 diff --git a/pkg/solana/logpoller/test_helpers.go b/pkg/solana/logpoller/test_helpers.go index 8f133bab2..a1dda9282 100644 --- a/pkg/solana/logpoller/test_helpers.go +++ b/pkg/solana/logpoller/test_helpers.go @@ -7,8 +7,6 @@ import ( "github.com/gagliardetto/solana-go" "github.com/stretchr/testify/require" - - "github.com/smartcontractkit/chainlink-solana/pkg/solana/codec" ) func newRandomPublicKey(t *testing.T) PublicKey { @@ -38,7 +36,7 @@ func newRandomLog(t *testing.T, filterID int64, chainID string, eventName string BlockNumber: rand.Int63n(1000000), BlockTimestamp: time.Unix(1731590113, 0).UTC(), Address: PublicKey(pubKey), - EventSig: EventSignature(codec.NewDiscriminatorHashPrefix(eventName, false)), + EventSig: NewEventSignatureFromName(eventName), SubkeyValues: []IndexedValue{{3, 2, 1}, {1}, {1, 2}, pubKey.Bytes()}, TxHash: Signature(signature), Data: data, diff --git a/pkg/solana/logpoller/types.go b/pkg/solana/logpoller/types.go index 7f9d59edb..d76b7f5ab 100644 --- a/pkg/solana/logpoller/types.go +++ b/pkg/solana/logpoller/types.go @@ -112,11 +112,19 @@ const EventSignatureLength = 8 type EventSignature [EventSignatureLength]byte +func NewEventSignatureFromName(eventName string) EventSignature { + return EventSignature(codec.NewDiscriminatorHashPrefix(eventName, false)) +} + // Scan implements Scanner for database/sql. func (s *EventSignature) Scan(src interface{}) error { return scanFixedLengthArray("EventSignature", EventSignatureLength, src, s[:]) } +func (s EventSignature) String() string { + return string(s[:]) +} + // Value implements valuer for database/sql. func (s EventSignature) Value() (driver.Value, error) { return s[:], nil From 4ad5f7292db880273d2bea80d691c46fb4f7c4b8 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Thu, 6 Feb 2025 18:17:08 +0100 Subject: [PATCH 2/8] Ensure get block job is canceled in case of an error --- pkg/solana/logpoller/job_get_block.go | 13 ++++++-- pkg/solana/logpoller/job_get_block_test.go | 28 +++++++++-------- pkg/solana/logpoller/loader.go | 14 +++++++-- pkg/solana/logpoller/worker/worker.go | 35 ++++++++++++++-------- 4 files changed, 59 insertions(+), 31 deletions(-) diff --git a/pkg/solana/logpoller/job_get_block.go b/pkg/solana/logpoller/job_get_block.go index 7cf62ba98..f74c67bdb 100644 --- a/pkg/solana/logpoller/job_get_block.go +++ b/pkg/solana/logpoller/job_get_block.go @@ -8,6 +8,8 @@ import ( "github.com/gagliardetto/solana-go" "github.com/gagliardetto/solana-go/rpc" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-solana/pkg/solana/client" @@ -16,6 +18,7 @@ import ( // getBlockJob is a job that fetches a block with transactions, converts logs into ProgramEvents and writes them into blocks channel type getBlockJob struct { slotNumber uint64 + stopCh services.StopRChan client RPCClient blocks chan Block done chan struct{} @@ -23,7 +26,7 @@ type getBlockJob struct { lggr logger.SugaredLogger } -func newGetBlockJob(client RPCClient, blocks chan Block, lggr logger.SugaredLogger, slotNumber uint64) *getBlockJob { +func newGetBlockJob(stopCh services.StopRChan, client RPCClient, blocks chan Block, lggr logger.SugaredLogger, slotNumber uint64) *getBlockJob { return &getBlockJob{ client: client, blocks: blocks, @@ -31,6 +34,7 @@ func newGetBlockJob(client RPCClient, blocks chan Block, lggr logger.SugaredLogg done: make(chan struct{}), parseProgramLogs: parseProgramLogs, lggr: lggr, + stopCh: stopCh, } } @@ -43,6 +47,11 @@ func (j *getBlockJob) Done() <-chan struct{} { } func (j *getBlockJob) Run(ctx context.Context) error { + ctx, cancel := j.stopCh.Ctx(ctx) + defer cancel() + if ctx.Err() != nil { + return ctx.Err() + } var excludeRewards bool version := client.MaxSupportTransactionVersion block, err := j.client.GetBlockWithOpts( @@ -95,7 +104,7 @@ func (j *getBlockJob) Run(ctx context.Context) error { return fmt.Errorf("expected transaction to have meta. signature: %s; slot: %d; idx: %d", tx.Signatures[0], j.slotNumber, idx) } if txWithMeta.Meta.Err != nil { - j.lggr.Debugw("Skipping all events of failed transaction", "err", txWithMeta.Meta.Err, "signature", tx.Signatures[0]) + // silently skip as at the moment there is no way for us to filter transactions produced by our contracts continue } detail.trxSig = tx.Signatures[0] // according to Solana docs fist signature is used as ID diff --git a/pkg/solana/logpoller/job_get_block_test.go b/pkg/solana/logpoller/job_get_block_test.go index 078746f13..a0c14d2b8 100644 --- a/pkg/solana/logpoller/job_get_block_test.go +++ b/pkg/solana/logpoller/job_get_block_test.go @@ -22,7 +22,7 @@ func TestGetBlockJob(t *testing.T) { const slotNumber = uint64(42) t.Run("String contains slot number", func(t *testing.T) { lggr := logger.Sugared(logger.Test(t)) - job := newGetBlockJob(nil, nil, lggr, slotNumber) + job := newGetBlockJob(nil, nil, nil, lggr, slotNumber) require.Equal(t, "getBlock for slotNumber: 42", job.String()) }) t.Run("Error if fails to get block", func(t *testing.T) { @@ -30,7 +30,7 @@ func TestGetBlockJob(t *testing.T) { lggr := logger.Sugared(logger.Test(t)) expectedError := errors.New("rpc failed") client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).Return(nil, expectedError).Once() - job := newGetBlockJob(client, make(chan Block), lggr, slotNumber) + job := newGetBlockJob(nil, client, make(chan Block), lggr, slotNumber) err := job.Run(tests.Context(t)) require.ErrorIs(t, err, expectedError) }) @@ -39,7 +39,7 @@ func TestGetBlockJob(t *testing.T) { lggr := logger.Sugared(logger.Test(t)) block := rpc.GetBlockResult{} client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).Return(&block, nil).Once() - job := newGetBlockJob(client, make(chan Block), lggr, slotNumber) + job := newGetBlockJob(nil, client, make(chan Block), lggr, slotNumber) err := job.Run(tests.Context(t)) require.ErrorContains(t, err, "block at slot 42 returned from rpc is missing block number") }) @@ -49,7 +49,7 @@ func TestGetBlockJob(t *testing.T) { block := rpc.GetBlockResult{BlockHeight: ptr(uint64(10))} client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).Return(&block, nil).Once() - job := newGetBlockJob(client, make(chan Block), lggr, slotNumber) + job := newGetBlockJob(nil, client, make(chan Block), lggr, slotNumber) err := job.Run(tests.Context(t)) require.ErrorContains(t, err, "block at slot 42 returned from rpc is missing block time") }) @@ -58,7 +58,7 @@ func TestGetBlockJob(t *testing.T) { lggr := logger.Sugared(logger.Test(t)) block := rpc.GetBlockResult{BlockHeight: ptr(uint64(10)), BlockTime: ptr(solana.UnixTimeSeconds(10)), Transactions: []rpc.TransactionWithMeta{{Transaction: nil}}} client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).Return(&block, nil).Once() - job := newGetBlockJob(client, make(chan Block), lggr, slotNumber) + job := newGetBlockJob(nil, client, make(chan Block), lggr, slotNumber) err := job.Run(tests.Context(t)) require.ErrorContains(t, err, "failed to parse transaction 0 in slot 42: missing transaction field") }) @@ -67,7 +67,7 @@ func TestGetBlockJob(t *testing.T) { lggr := logger.Sugared(logger.Test(t)) block := rpc.GetBlockResult{BlockHeight: ptr(uint64(10)), BlockTime: ptr(solana.UnixTimeSeconds(10)), Transactions: []rpc.TransactionWithMeta{{Transaction: rpc.DataBytesOrJSONFromBytes([]byte("{"))}}} client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).Return(&block, nil).Once() - job := newGetBlockJob(client, make(chan Block), lggr, slotNumber) + job := newGetBlockJob(nil, client, make(chan Block), lggr, slotNumber) err := job.Run(tests.Context(t)) require.ErrorContains(t, err, "failed to parse transaction 0 in slot 42") }) @@ -79,7 +79,7 @@ func TestGetBlockJob(t *testing.T) { require.NoError(t, err) block := rpc.GetBlockResult{BlockHeight: ptr(uint64(10)), BlockTime: ptr(solana.UnixTimeSeconds(10)), Transactions: []rpc.TransactionWithMeta{{Transaction: rpc.DataBytesOrJSONFromBytes(txB)}}} client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).Return(&block, nil).Once() - job := newGetBlockJob(client, make(chan Block), lggr, slotNumber) + job := newGetBlockJob(nil, client, make(chan Block), lggr, slotNumber) err = job.Run(tests.Context(t)) require.ErrorContains(t, err, "expected all transactions to have at least one signature 0 in slot 42") }) @@ -91,7 +91,7 @@ func TestGetBlockJob(t *testing.T) { require.NoError(t, err) block := rpc.GetBlockResult{BlockHeight: ptr(uint64(10)), BlockTime: ptr(solana.UnixTimeSeconds(10)), Transactions: []rpc.TransactionWithMeta{{Transaction: rpc.DataBytesOrJSONFromBytes(txB)}}} client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).Return(&block, nil).Once() - job := newGetBlockJob(client, make(chan Block), lggr, slotNumber) + job := newGetBlockJob(nil, client, make(chan Block), lggr, slotNumber) err = job.Run(tests.Context(t)) require.ErrorContains(t, err, "expected transaction to have meta. signature: 2AnZxg8HN2sGa7GC7iWGDgpXbEasqXQNEumCjvHUFDcBnfRKAdaN3SvKLhbQwheN15xDkL5D5mdX21A5gH1MdYB; slot: 42; idx: 0") }) @@ -101,11 +101,13 @@ func TestGetBlockJob(t *testing.T) { tx := solana.Transaction{Signatures: make([]solana.Signature, 1)} txB, err := tx.MarshalBinary() require.NoError(t, err) - block := rpc.GetBlockResult{BlockHeight: ptr(uint64(10)), BlockTime: ptr(solana.UnixTimeSeconds(10)), Transactions: []rpc.TransactionWithMeta{{Transaction: rpc.DataBytesOrJSONFromBytes(txB), Meta: &rpc.TransactionMeta{}}}} - client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).Return(&block, nil).Once() - job := newGetBlockJob(client, make(chan Block), lggr, slotNumber) ctx, cancel := context.WithCancel(tests.Context(t)) - cancel() + block := rpc.GetBlockResult{BlockHeight: ptr(uint64(10)), BlockTime: ptr(solana.UnixTimeSeconds(10)), Transactions: []rpc.TransactionWithMeta{{Transaction: rpc.DataBytesOrJSONFromBytes(txB), Meta: &rpc.TransactionMeta{}}}} + client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).RunAndReturn(func(ctx context.Context, u uint64, opts *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) { + cancel() + return &block, nil + }).Once() + job := newGetBlockJob(ctx.Done(), client, make(chan Block), lggr, slotNumber) err = job.Run(ctx) require.ErrorIs(t, err, context.Canceled) select { @@ -133,7 +135,7 @@ func TestGetBlockJob(t *testing.T) { blockTime := solana.UnixTimeSeconds(128) block := rpc.GetBlockResult{BlockHeight: &height, BlockTime: ptr(blockTime), Blockhash: solana.Hash{1, 2, 3}, Transactions: []rpc.TransactionWithMeta{txWithMeta1, txWithMeta2, txWithMeta3}} client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).Return(&block, nil).Once() - job := newGetBlockJob(client, make(chan Block, 1), lggr, slotNumber) + job := newGetBlockJob(nil, client, make(chan Block, 1), lggr, slotNumber) job.parseProgramLogs = func(logs []string) []ProgramOutput { result := ProgramOutput{ Program: "myProgram", diff --git a/pkg/solana/logpoller/loader.go b/pkg/solana/logpoller/loader.go index ddd940602..8aa592773 100644 --- a/pkg/solana/logpoller/loader.go +++ b/pkg/solana/logpoller/loader.go @@ -112,14 +112,14 @@ func (c *EncodedLogCollector) scheduleBlocksFetching(ctx context.Context, slots blocks := make(chan Block) getBlockJobs := make([]*getBlockJob, len(slots)) for i, slot := range slots { - getBlockJobs[i] = newGetBlockJob(c.client, blocks, c.lggr, slot) + getBlockJobs[i] = newGetBlockJob(ctx.Done(), c.client, blocks, c.lggr, slot) err := c.workers.Do(ctx, getBlockJobs[i]) if err != nil { return nil, fmt.Errorf("could not schedule job to fetch blocks for slot: %w", err) } } - c.engine.Go(func(ctx context.Context) { + go func() { for _, job := range getBlockJobs { select { case <-ctx.Done(): @@ -129,7 +129,7 @@ func (c *EncodedLogCollector) scheduleBlocksFetching(ctx context.Context, slots } } close(blocks) - }) + }() return blocks, nil } @@ -142,6 +142,13 @@ func (c *EncodedLogCollector) BackfillForAddresses(ctx context.Context, addresse c.lggr.Debugw("Got all slots that need fetching for backfill operations", "addresses", PublicKeysToString(addresses), "fromSlot", fromSlot, "toSlot", toSlot, "slotsToFetch", slotsToFetch) + ctx, cancelJobs := context.WithCancel(ctx) + defer func() { + // if failed to start backfill process - cancel jobs + if err != nil { + cancelJobs() + } + }() unorderedBlocks, err := c.scheduleBlocksFetching(ctx, slotsToFetch) if err != nil { return nil, func() {}, fmt.Errorf("failed to schedule blocks to fetch: %w", err) @@ -153,6 +160,7 @@ func (c *EncodedLogCollector) BackfillForAddresses(ctx context.Context, addresse } cleanUp = func() { + cancelJobs() err := blocksSorter.Close() if err != nil { blocksSorter.lggr.Errorw("Failed to close blocks sorter", "err", err) diff --git a/pkg/solana/logpoller/worker/worker.go b/pkg/solana/logpoller/worker/worker.go index 65cab9603..54d031419 100644 --- a/pkg/solana/logpoller/worker/worker.go +++ b/pkg/solana/logpoller/worker/worker.go @@ -3,6 +3,7 @@ package worker import ( "context" "crypto/rand" + "errors" "fmt" "math/big" "sync" @@ -37,22 +38,30 @@ type worker struct { } func (w *worker) Do(ctx context.Context, job Job) { - if ctx.Err() == nil { - start := time.Now() - w.Lggr.Debugf("Starting job %s", job.String()) - if err := job.Run(ctx); err != nil { - w.Lggr.Errorf("job %s failed with error; retrying: %s", job, err) - w.Retry <- job - } else { - w.Lggr.Debugf("Finished job %s in %s", job.String(), time.Since(start)) + defer func() { + // put itself back on the queue when done + select { + case w.Queue <- w: + default: } + }() + if ctx.Err() != nil { + return } - // put itself back on the queue when done - select { - case w.Queue <- w: - default: + start := time.Now() + w.Lggr.Debugf("Starting job %s", job.String()) + if err := job.Run(ctx); err != nil { + if errors.Is(err, context.Canceled) { + w.Lggr.Debugf("job %s was canceled", job.String()) + return + } + w.Lggr.Errorf("job %s failed with error; retrying: %s", job, err) + w.Retry <- job + return } + + w.Lggr.Debugf("Finished job %s in %s", job.String(), time.Since(start)) } type Group struct { @@ -275,7 +284,7 @@ func (g *Group) processQueue(ctx context.Context) { } if g.queue.Len() >= DefaultNotifyQueueDepth { - g.lggr.Errorf("queue depth: %d", g.queue.Len()) + g.lggr.Warnf("queue depth is too large: %d", g.queue.Len()) } value, err := g.queue.Pop() From 0aedb575c33d7a454419f16c38518936d2e80ac3 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Thu, 6 Feb 2025 18:26:17 +0100 Subject: [PATCH 3/8] Support non addressable arrays in IndexedValue --- pkg/solana/logpoller/log_poller.go | 16 +++++++++++----- pkg/solana/logpoller/types.go | 15 ++++++++++++++- pkg/solana/logpoller/types_test.go | 1 + 3 files changed, 26 insertions(+), 6 deletions(-) diff --git a/pkg/solana/logpoller/log_poller.go b/pkg/solana/logpoller/log_poller.go index defc823c4..8aa60cbce 100644 --- a/pkg/solana/logpoller/log_poller.go +++ b/pkg/solana/logpoller/log_poller.go @@ -109,8 +109,8 @@ func (lp *Service) start(_ context.Context) error { } func makeLogIndex(txIndex int, txLogIndex uint) (int64, error) { - if txIndex > 0 && txIndex < math.MaxInt32 && txLogIndex < math.MaxUint32 { - return int64(txIndex<<32) | int64(txLogIndex), nil + if txIndex >= 0 && txIndex < math.MaxInt32 && txLogIndex < math.MaxUint32 { + return int64(txIndex<<32) | int64(txLogIndex), nil //nolint:gosec } return 0, fmt.Errorf("txIndex or txLogIndex out of range: txIndex=%d, txLogIndex=%d", txIndex, txLogIndex) } @@ -263,6 +263,7 @@ func (lp *Service) backfillFilters(ctx context.Context, filters []Filter, to int func (lp *Service) processBlocksRange(ctx context.Context, addresses []PublicKey, from, to int64) error { // nolint:gosec // G115: integer overflow conversion uint64 -> int64 + lp.lggr.Infow("Processing block range", "from", from, "to", to) blocks, cleanup, err := lp.loader.BackfillForAddresses(ctx, addresses, uint64(from), uint64(to)) if err != nil { return fmt.Errorf("error backfilling filters: %w", err) @@ -304,8 +305,13 @@ func (lp *Service) processBlocksImpl(ctx context.Context, blocks []Block) error return nil } -func (lp *Service) run(ctx context.Context) error { - err := lp.filters.LoadFilters(ctx) +func (lp *Service) run(ctx context.Context) (err error) { + defer func() { + if rec := recover(); rec != nil { + err = fmt.Errorf("panic recovered: %v", rec) + } + }() + err = lp.filters.LoadFilters(ctx) if err != nil { return fmt.Errorf("error loading filters: %w", err) } @@ -317,7 +323,7 @@ func (lp *Service) run(ctx context.Context) error { filtersToBackfill := lp.filters.GetFiltersToBackfill() if len(filtersToBackfill) != 0 { - lp.lggr.Debugw("Got new filters to backfill", "filters", filtersToBackfill) + lp.lggr.Debugw("Got new filters to backfill", "filters_len", len(filtersToBackfill)) return lp.backfillFilters(ctx, filtersToBackfill, lastProcessedSlot) } diff --git a/pkg/solana/logpoller/types.go b/pkg/solana/logpoller/types.go index d76b7f5ab..e3c45e989 100644 --- a/pkg/solana/logpoller/types.go +++ b/pkg/solana/logpoller/types.go @@ -222,6 +222,11 @@ func (v IndexedValues) Value() (driver.Value, error) { } func newIndexedValue(typedVal any) (iVal IndexedValue, err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic recovered: %v while creating indexedValue for %T", r, typedVal) + } + }() // handle 2 simplest cases first switch t := typedVal.(type) { case []byte: @@ -248,7 +253,15 @@ func newIndexedValue(typedVal any) (iVal IndexedValue, err error) { // any length array is fine as long as the element type is byte if t := v.Type(); t.Kind() == reflect.Array { if t.Elem().Kind() == reflect.Uint8 { - return v.Bytes(), nil + if v.CanAddr() { + return v.Bytes(), nil + } + result := make([]byte, v.Len()) + l := v.Len() + for i := 0; i < l; i++ { + result[i] = byte(v.Index(i).Uint()) + } + return result, nil } } return nil, fmt.Errorf("can't create indexed value from type %T", typedVal) diff --git a/pkg/solana/logpoller/types_test.go b/pkg/solana/logpoller/types_test.go index 499ce53e5..368979387 100644 --- a/pkg/solana/logpoller/types_test.go +++ b/pkg/solana/logpoller/types_test.go @@ -32,6 +32,7 @@ func TestIndexedValue(t *testing.T) { {"string", "abcd", "abcdef"}, {"[]byte", []byte("abcc"), []byte("abcd")}, {"[]byte", []byte("abcd"), []byte("abcdef")}, + {"[2]byte", [2]byte{1, 2}, [2]byte{2, 2}}, } for _, c := range cases { t.Run(c.typeName, func(t *testing.T) { From 2e217cd800be60d7133383ca471e55debd5ad478 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Thu, 6 Feb 2025 19:49:33 +0100 Subject: [PATCH 4/8] Fix log --- pkg/solana/logpoller/log_poller.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/solana/logpoller/log_poller.go b/pkg/solana/logpoller/log_poller.go index 8aa60cbce..d3559d931 100644 --- a/pkg/solana/logpoller/log_poller.go +++ b/pkg/solana/logpoller/log_poller.go @@ -8,7 +8,6 @@ import ( "fmt" "iter" "math" - "slices" "time" "github.com/gagliardetto/solana-go/rpc" @@ -68,6 +67,10 @@ type Service struct { } func New(lggr logger.SugaredLogger, orm ORM, cl RPCClient) *Service { + return newService(lggr, orm, cl) +} + +func newService(lggr logger.SugaredLogger, orm ORM, cl RPCClient) *Service { lggr = logger.Sugared(logger.Named(lggr, "LogPoller")) lp := &Service{ orm: orm, @@ -249,7 +252,7 @@ func (lp *Service) backfillFilters(ctx context.Context, filters []Filter, to int return err } - lp.lggr.Infow("Done backfilling filters", "filters", slices.All(filters)) + lp.lggr.Infow("Done backfilling filters", "filters", len(filters), "from", minSlot, "to", to) for _, filter := range filters { filterErr := lp.filters.MarkFilterBackfilled(ctx, filter.ID) if filterErr != nil { From 39491b9220747338a67a7f0a1ab7b2eed2dd966d Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Thu, 6 Feb 2025 19:51:10 +0100 Subject: [PATCH 5/8] fix linter --- pkg/solana/logpoller/log_poller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/solana/logpoller/log_poller.go b/pkg/solana/logpoller/log_poller.go index d3559d931..4cfc81236 100644 --- a/pkg/solana/logpoller/log_poller.go +++ b/pkg/solana/logpoller/log_poller.go @@ -264,9 +264,9 @@ func (lp *Service) backfillFilters(ctx context.Context, filters []Filter, to int } func (lp *Service) processBlocksRange(ctx context.Context, addresses []PublicKey, from, to int64) error { + lp.lggr.Infow("Processing block range", "from", from, "to", to) // nolint:gosec // G115: integer overflow conversion uint64 -> int64 - lp.lggr.Infow("Processing block range", "from", from, "to", to) blocks, cleanup, err := lp.loader.BackfillForAddresses(ctx, addresses, uint64(from), uint64(to)) if err != nil { return fmt.Errorf("error backfilling filters: %w", err) From ff42fa4332f15bfa3e0fe636d45467e0ee061e75 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Thu, 6 Feb 2025 19:52:41 +0100 Subject: [PATCH 6/8] revert hardcoding from per test --- pkg/solana/logpoller/log_poller.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/solana/logpoller/log_poller.go b/pkg/solana/logpoller/log_poller.go index 4cfc81236..72260f103 100644 --- a/pkg/solana/logpoller/log_poller.go +++ b/pkg/solana/logpoller/log_poller.go @@ -8,6 +8,7 @@ import ( "fmt" "iter" "math" + "slices" "time" "github.com/gagliardetto/solana-go/rpc" @@ -67,10 +68,6 @@ type Service struct { } func New(lggr logger.SugaredLogger, orm ORM, cl RPCClient) *Service { - return newService(lggr, orm, cl) -} - -func newService(lggr logger.SugaredLogger, orm ORM, cl RPCClient) *Service { lggr = logger.Sugared(logger.Named(lggr, "LogPoller")) lp := &Service{ orm: orm, From 62752023e97aec7f1179855f7b63befc5f967a93 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Thu, 6 Feb 2025 20:24:29 +0100 Subject: [PATCH 7/8] fix build --- pkg/solana/logpoller/log_poller.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/solana/logpoller/log_poller.go b/pkg/solana/logpoller/log_poller.go index 72260f103..29defd60e 100644 --- a/pkg/solana/logpoller/log_poller.go +++ b/pkg/solana/logpoller/log_poller.go @@ -8,7 +8,6 @@ import ( "fmt" "iter" "math" - "slices" "time" "github.com/gagliardetto/solana-go/rpc" From 4d1d043a9674434d97fa4962de4be341069c9590 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Thu, 6 Feb 2025 20:34:26 +0100 Subject: [PATCH 8/8] fix discriminator test --- pkg/solana/codec/discriminator_extractor_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/solana/codec/discriminator_extractor_test.go b/pkg/solana/codec/discriminator_extractor_test.go index 32426fbee..53ad41a2b 100644 --- a/pkg/solana/codec/discriminator_extractor_test.go +++ b/pkg/solana/codec/discriminator_extractor_test.go @@ -38,7 +38,7 @@ func FuzzExtractorHappyPath(f *testing.F) { stdDecoded, err := base64.StdEncoding.DecodeString(testString) if err == nil { - require.Equal(t, stdDecoded[:8], extractor.Extract(testString)) + require.Equal(t, [8]byte(stdDecoded[:8]), extractor.Extract(testString)) } }) }