Skip to content

Commit

Permalink
Merge branch 'develop' into implement-relayer-chain-components-constr…
Browse files Browse the repository at this point in the history
…uctors
  • Loading branch information
ilija42 authored Jan 24, 2025
2 parents d4cda01 + 22e6a53 commit b3093a3
Show file tree
Hide file tree
Showing 29 changed files with 2,498 additions and 1,391 deletions.
16 changes: 15 additions & 1 deletion .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,23 @@ packages:
github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller:
interfaces:
RPCClient:
WorkerGroup:
filtersI:
config:
inpackage: True
dir: "pkg/solana/logpoller"
filename: mock_filters.go
mockname: mockFilters
logsLoader:
config:
inpackage: True
dir: "pkg/solana/logpoller"
filename: mock_logs_loader.go
mockname: mockLogsLoader
ORM:
config:
inpackage: True
dir: "pkg/solana/logpoller"
filename: mock_orm.go
mockname: mockORM
mockname: MockORM

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/sha256"
"database/sql"
"encoding/base64"
"fmt"
"os"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/gagliardetto/solana-go/rpc"
"github.com/gagliardetto/solana-go/rpc/ws"
"github.com/gagliardetto/solana-go/text"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -63,15 +65,18 @@ func TestEventLoader(t *testing.T) {
totalLogsToSend := 30
parser := &printParser{t: t}
sender := newLogSender(t, rpcClient, wsClient)
collector := logpoller.NewEncodedLogCollector(
cl,
parser,
logger.Nop(),
)
orm := logpoller.NewMockORM(t) // TODO: replace with real DB, when available
programPubKey, err := solana.PublicKeyFromBase58(programPubKey)
require.NoError(t, err)
orm.EXPECT().SelectFilters(mock.Anything).Return([]logpoller.Filter{{ID: 1, IsBackfilled: false, Address: logpoller.PublicKey(programPubKey)}}, nil).Once()
orm.EXPECT().MarkFilterBackfilled(mock.Anything, mock.Anything).Return(nil).Once()
orm.EXPECT().GetLatestBlock(mock.Anything).Return(0, sql.ErrNoRows)
orm.EXPECT().SelectSeqNums(mock.Anything).Return(map[int64]int64{1: 0}, nil).Once()
lp := logpoller.NewWithCustomProcessor(logger.TestSugared(t), orm, cl, parser.ProcessBlocks)

require.NoError(t, collector.Start(ctx))
require.NoError(t, lp.Start(ctx))
t.Cleanup(func() {
require.NoError(t, collector.Close())
require.NoError(t, lp.Close())
})

go func(ctx context.Context, sender *logSender, privateKey *solana.PrivateKey) {
Expand Down Expand Up @@ -145,26 +150,39 @@ type printParser struct {
values []uint64
}

func (p *printParser) Process(evt logpoller.ProgramEvent) error {
p.t.Helper()

data, err := base64.StdEncoding.DecodeString(evt.Data)
if err != nil {
return err
func (p *printParser) ProcessBlocks(ctx context.Context, blocks []logpoller.Block) error {
for _, b := range blocks {
err := p.process(b)
if err != nil {
return err
}
}

return nil
}

func (p *printParser) process(block logpoller.Block) error {
p.t.Helper()

sum := sha256.Sum256([]byte("event:TestEvent"))
sig := sum[:8]

if bytes.Equal(sig, data[:8]) {
var event testEvent
if err := bin.UnmarshalBorsh(&event, data[8:]); err != nil {
return nil
for _, evt := range block.Events {
data, err := base64.StdEncoding.DecodeString(evt.Data)
if err != nil {
return err
}

p.mu.Lock()
p.values = append(p.values, event.U64Value)
p.mu.Unlock()
if bytes.Equal(sig, data[:8]) {
var event testEvent
if err := bin.UnmarshalBorsh(&event, data[8:]); err != nil {
return nil
}

p.mu.Lock()
p.values = append(p.values, event.U64Value)
p.mu.Unlock()
}
}

return nil
Expand Down
8 changes: 6 additions & 2 deletions pkg/solana/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import (
"github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor"
)

// MaxSupportTransactionVersion defines max transaction version to return in responses.
// If the requested block contains a transaction with a higher version, an error will be returned.
const MaxSupportTransactionVersion = uint64(0) // (legacy + v0)

const (
DevnetGenesisHash = "EtWTRABZaYq6iMfeYKouRu166VU2xqa1wcaWoxPkrZBG"
TestnetGenesisHash = "4uhcVJyU9pJkvQyS88uRDiswHXSCkY3zQawwpjk2NsNY"
Expand Down Expand Up @@ -44,6 +48,7 @@ type Reader interface {
GetBlockWithOpts(context.Context, uint64, *rpc.GetBlockOpts) (*rpc.GetBlockResult, error)
GetBlock(ctx context.Context, slot uint64) (*rpc.GetBlockResult, error)
GetSignaturesForAddressWithOpts(ctx context.Context, addr solana.PublicKey, opts *rpc.GetSignaturesForAddressOpts) ([]*rpc.TransactionSignature, error)
SlotHeightWithCommitment(ctx context.Context, commitment rpc.CommitmentType) (uint64, error)
}

// AccountReader is an interface that allows users to pass either the solana rpc client or the relay client
Expand Down Expand Up @@ -372,12 +377,11 @@ func (c *Client) GetBlock(ctx context.Context, slot uint64) (*rpc.GetBlockResult
defer done()
ctx, cancel := context.WithTimeout(ctx, c.txTimeout)
defer cancel()

// Adding slot to the key so concurrent calls to GetBlock for different slots are not merged. Without including the slot,
// it would treat all GetBlock calls as identical and merge them, returning whichever block it fetched first to all callers.
key := fmt.Sprintf("GetBlockWithOpts(%d)", slot)
v, err, _ := c.requestGroup.Do(key, func() (interface{}, error) {
version := uint64(0) // pull all tx types (legacy + v0)
version := MaxSupportTransactionVersion
return c.rpc.GetBlockWithOpts(ctx, slot, &rpc.GetBlockOpts{
Commitment: c.commitment,
MaxSupportedTransactionVersion: &version,
Expand Down
57 changes: 57 additions & 0 deletions pkg/solana/client/mocks/reader_writer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions pkg/solana/client/multi_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,12 @@ func (m *MultiClient) GetBlockWithOpts(ctx context.Context, slot uint64, opts *r

return r.GetBlockWithOpts(ctx, slot, opts)
}

func (m *MultiClient) SlotHeightWithCommitment(ctx context.Context, commitment rpc.CommitmentType) (uint64, error) {
r, err := m.getClient()
if err != nil {
return 0, err
}

return r.SlotHeightWithCommitment(ctx, commitment)
}
135 changes: 135 additions & 0 deletions pkg/solana/logpoller/blocks_sorter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package logpoller

import (
"container/list"
"context"
"sync"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
)

const blocksChBuffer = 16

type blocksSorter struct {
// service state management
services.Service
engine *services.Engine
lggr logger.Logger

inBlocks <-chan Block
receivedNewBlock chan struct{}

outBlocks chan Block

mu sync.Mutex
queue *list.List
readyBlocks map[uint64]Block
}

// newBlocksSorter - returns new instance of blocksSorter that writes blocks into output channel in order defined by expectedBlocks.
func newBlocksSorter(inBlocks <-chan Block, lggr logger.Logger, expectedBlocks []uint64) (*blocksSorter, <-chan Block) {
op := &blocksSorter{
queue: list.New(),
readyBlocks: make(map[uint64]Block),
inBlocks: inBlocks,
outBlocks: make(chan Block, blocksChBuffer),
receivedNewBlock: make(chan struct{}, 1),
lggr: lggr,
}

for _, b := range expectedBlocks {
op.queue.PushBack(b)
}

op.Service, op.engine = services.Config{
Name: "blocksSorter",
Start: op.start,
Close: nil,
}.NewServiceEngine(lggr)

return op, op.outBlocks
}

func (p *blocksSorter) start(_ context.Context) error {
p.engine.Go(p.writeOrderedBlocks)
p.engine.Go(p.readBlocks)
return nil
}

func (p *blocksSorter) readBlocks(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case block, ok := <-p.inBlocks:
if !ok {
close(p.receivedNewBlock) // trigger last flush of ready blocks
return
}

p.mu.Lock()
p.readyBlocks[block.SlotNumber] = block
p.mu.Unlock()
// try leaving a msg that new block is ready
select {
case p.receivedNewBlock <- struct{}{}:
default:
}
}
}
}

func (p *blocksSorter) writeOrderedBlocks(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case _, ok := <-p.receivedNewBlock:
p.flushReadyBlocks(ctx)
if !ok {
p.mu.Lock()
// signal to consumer that work is done, when it's actually done
if p.queue.Len() == 0 {
close(p.outBlocks)
}
p.mu.Unlock()
return
}
}
}
}

func (p *blocksSorter) readNextReadyBlock() *Block {
p.mu.Lock()
defer p.mu.Unlock()
element := p.queue.Front()
if element == nil {
return nil
}

slotNumber := element.Value.(uint64)
block, ok := p.readyBlocks[slotNumber]
if !ok {
return nil
}

p.queue.Remove(element)
return &block
}

// flushReadyBlocks - sends all blocks in order defined by queue to the consumer.
func (p *blocksSorter) flushReadyBlocks(ctx context.Context) {
for {
block := p.readNextReadyBlock()
if block == nil {
return
}

select {
case p.outBlocks <- *block:
case <-ctx.Done():
return
}
}
}
Loading

0 comments on commit b3093a3

Please sign in to comment.