Skip to content

Commit

Permalink
Lazy-start LogPoller from ContractReader, stop if started on shutdown (
Browse files Browse the repository at this point in the history
…#1051)

* Lazy-start LogPoller from ContractReader, stop if started on shutdown

* Remove StateMachine (causes "ambiguous selector" error for Ready()

* Commit chainreader/mocks

* Resolve merge conflicts

* fix lints

* Address race condition

* Pass EventIdl & fix EventSignature
  • Loading branch information
reductionista authored Feb 12, 2025
1 parent 586087b commit 77f8e8f
Show file tree
Hide file tree
Showing 8 changed files with 404 additions and 22 deletions.
4 changes: 3 additions & 1 deletion .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,6 @@ packages:
dir: "pkg/solana/logpoller"
filename: mock_orm.go
mockname: MockORM

github.com/smartcontractkit/chainlink-solana/pkg/solana/chainreader:
interfaces:
EventsReader:
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/smartcontractkit/chainlink-solana
go 1.23.3

require (
github.com/cometbft/cometbft v0.37.5
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/gagliardetto/binary v0.8.0
github.com/gagliardetto/gofuzz v1.2.2
Expand Down Expand Up @@ -50,6 +51,8 @@ require (
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.6 // indirect
github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,13 @@ github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0 h1:ymLjT4f
github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0/go.mod h1:6daplAwHHGbUGib4990V3Il26O0OC4aRyvewaaAihaA=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=
github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU=
github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
Expand Down
4 changes: 4 additions & 0 deletions pkg/solana/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (

type LogPoller interface {
Start(context.Context) error
Ready() error
Close() error
RegisterFilter(ctx context.Context, filter logpoller.Filter) error
UnregisterFilter(ctx context.Context, name string) error
Expand Down Expand Up @@ -547,6 +548,9 @@ func (c *chain) Close() error {
c.lggr.Debug("Stopping multinode")
closeAll = append(closeAll, c.multiNode, c.txSender)
}
if c.lp.Ready() == nil {
c.lp.Close()
}
return services.CloseAll(closeAll...)
})
}
Expand Down
29 changes: 23 additions & 6 deletions pkg/solana/chainreader/chain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
)

type EventsReader interface {
Start(ctx context.Context) error
Ready() error
RegisterFilter(context.Context, logpoller.Filter) error
FilteredLogs(context.Context, []query.Expression, query.LimitAndSort, string) ([]logpoller.Log, error)
}
Expand Down Expand Up @@ -115,13 +117,25 @@ func (s *ContractReaderService) Name() string {
// and error.
func (s *ContractReaderService) Start(ctx context.Context) error {
return s.StartOnce(ServiceName, func() error {
if len(s.filters) == 0 {
// No dependency on EventReader
return nil
}
if s.reader.Ready() != nil {
// Start EventReader if it hasn't already been
// Lazily starting it here rather than earlier, since nodes running only ordinary DF jobs don't need it
err := s.reader.Start(ctx)
if err != nil &&
!strings.Contains(err.Error(), "has already been started") { // in case another thread calls Start() after Ready() returns
return fmt.Errorf("%d event filters defined in ChainReader config, but unable to start event reader: %w", len(s.filters), err)
}
}
// registering filters needs a context so we should be able to use the start function context.
for _, filter := range s.filters {
if err := s.reader.RegisterFilter(ctx, filter); err != nil {
return err
}
}

return nil
})
}
Expand Down Expand Up @@ -533,8 +547,8 @@ func (s *ContractReaderService) addAddressResponseHardCoderModifier(namespace st
func (s *ContractReaderService) addEventRead(
namespace, genericName string,
contractAddress solana.PublicKey,
_ codec.IDL,
_ codec.IdlEvent,
idl codec.IDL,
eventIdl codec.IdlEvent,
readDefinition config.ReadDefinition,
events EventsReader,
) error {
Expand All @@ -546,7 +560,8 @@ func (s *ContractReaderService) addEventRead(
applyIndexedFieldTuple(mappedTuples, subKeys, readDefinition.IndexedField2, 2)
applyIndexedFieldTuple(mappedTuples, subKeys, readDefinition.IndexedField3, 3)

filter := toLPFilter(readDefinition.PollingFilter, contractAddress, subKeys[:])
filter := toLPFilter(readDefinition.PollingFilter, contractAddress, subKeys[:],
codec.EventIDLTypes{Event: eventIdl, Types: idl.Types})

s.filters = append(s.filters, filter)
s.bdRegistry.AddReadBinding(namespace, genericName, newEventReadBinding(
Expand All @@ -565,12 +580,14 @@ func toLPFilter(
f *config.PollingFilter,
address solana.PublicKey,
subKeyPaths [][]string,
eventIdl codec.EventIDLTypes,
) logpoller.Filter {
return logpoller.Filter{
Address: logpoller.PublicKey(address),
EventName: f.EventName,
EventSig: logpoller.EventSignature([]byte(f.EventName)[:logpoller.EventSignatureLength]),
SubkeyPaths: logpoller.SubKeyPaths(subKeyPaths),
EventSig: logpoller.NewEventSignatureFromName(f.EventName),
EventIdl: logpoller.EventIdl(eventIdl),
SubkeyPaths: subKeyPaths,
Retention: f.Retention,
MaxLogsKept: f.MaxLogsKept,
}
Expand Down
143 changes: 129 additions & 14 deletions pkg/solana/chainreader/chain_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ import (
"testing"
"time"

"github.com/cometbft/cometbft/libs/service"
"github.com/gagliardetto/solana-go"
"github.com/google/uuid"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil/sqltest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/libocr/commontypes"
Expand All @@ -30,9 +34,12 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/chainreader"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/chainreader/mocks"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/codec"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/codec/testutils"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller"
lpmocks "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller/mocks"
)

const (
Expand Down Expand Up @@ -64,7 +71,7 @@ func TestSolanaContractReaderService_ServiceCtx(t *testing.T) {
t.Parallel()

ctx := tests.Context(t)
svc, err := chainreader.NewContractReaderService(logger.Test(t), new(mockedRPCClient), config.ContractReader{}, nil)
svc, err := chainreader.NewContractReaderService(logger.Test(t), new(mockedMultipleAccountGetter), config.ContractReader{}, nil)

require.NoError(t, err)
require.NotNil(t, svc)
Expand All @@ -85,6 +92,114 @@ func TestSolanaContractReaderService_ServiceCtx(t *testing.T) {
require.Error(t, svc.Close())
}

func TestSolanaChainReaderService_Start(t *testing.T) {
t.Parallel()

ctx := tests.Context(t)
lggr := logger.Test(t)
rpcClient := lpmocks.NewRPCClient(t)
pk := solana.NewWallet().PublicKey()

dbx := sqltest.NewDB(t, sqltest.TestURL(t))
orm := logpoller.NewORM(uuid.NewString(), dbx, lggr)
lp := logpoller.New(logger.Sugared(lggr), orm, rpcClient)
err := lp.Start(ctx)
require.NoError(t, err)
alreadyStartedErr := lp.Start(ctx)
require.Error(t, alreadyStartedErr)
require.NoError(t, lp.Close())

accountReadDef := config.ReadDefinition{
ChainSpecificName: "myAccount",
ReadType: config.Account,
}
eventReadDef := config.ReadDefinition{
ChainSpecificName: "myEvent",
ReadType: config.Event,
PollingFilter: &config.PollingFilter{EventName: "myEventSig.........."},
}

testCases := []struct {
Name string
ReadDef config.ReadDefinition
StartError error
RegisterFilterError error
ExpectError bool
}{
{Name: "no event reads", ReadDef: accountReadDef},
{Name: "already started", ReadDef: eventReadDef},
{Name: "successful start", ReadDef: eventReadDef},
{Name: "unsuccessful start", ReadDef: eventReadDef, StartError: fmt.Errorf("failed to start event reader"), ExpectError: true},
{Name: "already starting", ReadDef: eventReadDef, StartError: alreadyStartedErr},
{Name: "failed to register filter", ReadDef: eventReadDef, RegisterFilterError: fmt.Errorf("failed to register filter"), ExpectError: true},
}

boolType := codec.IdlType{}
require.NoError(t, boolType.UnmarshalJSON([]byte("\"bool\"")))

for _, tt := range testCases {
t.Run(tt.Name, func(t *testing.T) {
cfg := config.ContractReader{
Namespaces: map[string]config.ChainContractReader{
"myChainReader": {
IDL: codec.IDL{
Accounts: []codec.IdlTypeDef{{Name: "myAccount",
Type: codec.IdlTypeDefTy{
Kind: codec.IdlTypeDefTyKindStruct,
Fields: &[]codec.IdlField{}}}},
Events: []codec.IdlEvent{{Name: "myEvent", Fields: []codec.IdlEventField{{Name: "a", Type: boolType}}}},
},
ContractAddress: pk,
Reads: map[string]config.ReadDefinition{
"myRead": tt.ReadDef},
},
},
AddressShareGroups: nil,
}

mockedMultipleAccountGetter := new(mockedMultipleAccountGetter)
er := mocks.NewEventsReader(t)
svc, err := chainreader.NewContractReaderService(
lggr,
mockedMultipleAccountGetter,
cfg, er,
)
require.NoError(t, err)

er.On("Ready").Maybe().Return(func() error {
if tt.Name == "already started" {
return nil
}
return service.ErrNotStarted
}())
er.On("Start", mock.Anything).Maybe().Return(tt.StartError)
er.On("RegisterFilter", mock.Anything, mock.Anything).Maybe().Return(tt.RegisterFilterError)
err = svc.Start(ctx)
if tt.ExpectError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}

var expectedReadyCalls, expectedStartCalls, expectedRegisterFilterCalls int
if tt.ReadDef.ReadType == config.Event {
expectedStartCalls = 1
expectedReadyCalls = 1
expectedRegisterFilterCalls = 1
}
er.AssertNumberOfCalls(t, "Ready", expectedReadyCalls)
if tt.Name == "already started" {
expectedStartCalls = 0
}
er.AssertNumberOfCalls(t, "Start", expectedStartCalls)
if tt.Name == "unsuccessful start" {
expectedRegisterFilterCalls = 0
}
er.AssertNumberOfCalls(t, "RegisterFilter", expectedRegisterFilterCalls)
})
}
}

func TestSolanaChainReaderService_GetLatestValue(t *testing.T) {
ctx := tests.Context(t)

Expand All @@ -99,7 +214,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) {

require.NoError(t, err)

client := new(mockedRPCClient)
client := new(mockedMultipleAccountGetter)
svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil)

require.NoError(t, err)
Expand Down Expand Up @@ -135,7 +250,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) {

_, conf := newTestConfAndCodec(t)

client := new(mockedRPCClient)
client := new(mockedMultipleAccountGetter)
expectedErr := fmt.Errorf("expected error")
svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil)

Expand Down Expand Up @@ -170,7 +285,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) {

_, conf := newTestConfAndCodec(t)

client := new(mockedRPCClient)
client := new(mockedMultipleAccountGetter)
svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil)

require.NoError(t, err)
Expand All @@ -191,7 +306,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) {

_, conf := newTestConfAndCodec(t)

client := new(mockedRPCClient)
client := new(mockedMultipleAccountGetter)
svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil)

require.NoError(t, err)
Expand All @@ -212,7 +327,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) {

_, conf := newTestConfAndCodec(t)

client := new(mockedRPCClient)
client := new(mockedMultipleAccountGetter)
svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil)

require.NoError(t, err)
Expand Down Expand Up @@ -393,7 +508,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) {
encoded, err := testCodec.Encode(ctx, expected, testutils.TestStructWithNestedStruct)
require.NoError(t, err)

client := new(mockedRPCClient)
client := new(mockedMultipleAccountGetter)
svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil)
require.NoError(t, err)
require.NotNil(t, svc)
Expand Down Expand Up @@ -443,7 +558,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) {
}
_, conf := newTestConfAndCodecWithInjectibleReadDef(t, PDAAccount, readDef)

client := new(mockedRPCClient)
client := new(mockedMultipleAccountGetter)
svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil)
require.NoError(t, err)
require.NotNil(t, svc)
Expand Down Expand Up @@ -547,13 +662,13 @@ type mockedRPCCall struct {
}

// TODO BCI-3156 use a localnet for testing instead of a mock.
type mockedRPCClient struct {
type mockedMultipleAccountGetter struct {
mu sync.Mutex
responseByAddress map[string]mockedRPCCall
sequence []mockedRPCCall
}

func (_m *mockedRPCClient) GetMultipleAccountData(_ context.Context, keys ...solana.PublicKey) ([][]byte, error) {
func (_m *mockedMultipleAccountGetter) GetMultipleAccountData(_ context.Context, keys ...solana.PublicKey) ([][]byte, error) {
result := make([][]byte, len(keys))

for idx, key := range keys {
Expand All @@ -570,7 +685,7 @@ func (_m *mockedRPCClient) GetMultipleAccountData(_ context.Context, keys ...sol
return result, nil
}

func (_m *mockedRPCClient) SetNext(bts []byte, err error, delay time.Duration) {
func (_m *mockedMultipleAccountGetter) SetNext(bts []byte, err error, delay time.Duration) {
_m.mu.Lock()
defer _m.mu.Unlock()

Expand All @@ -581,7 +696,7 @@ func (_m *mockedRPCClient) SetNext(bts []byte, err error, delay time.Duration) {
})
}

func (_m *mockedRPCClient) SetForAddress(pk solana.PublicKey, bts []byte, err error, delay time.Duration) {
func (_m *mockedMultipleAccountGetter) SetForAddress(pk solana.PublicKey, bts []byte, err error, delay time.Duration) {
_m.mu.Lock()
defer _m.mu.Unlock()

Expand Down Expand Up @@ -681,7 +796,7 @@ func (r *chainReaderInterfaceTester) Setup(t *testing.T) {
}

func (r *chainReaderInterfaceTester) GetContractReader(t *testing.T) types.ContractReader {
client := new(mockedRPCClient)
client := new(mockedMultipleAccountGetter)
svc, err := chainreader.NewContractReaderService(logger.Test(t), client, r.conf, r.eventSource)
if err != nil {
t.Logf("chain reader service was not able to start: %s", err.Error())
Expand Down Expand Up @@ -709,7 +824,7 @@ type wrappedTestChainReader struct {

test *testing.T
service *chainreader.ContractReaderService
client *mockedRPCClient
client *mockedMultipleAccountGetter
tester ChainComponentsInterfaceTester[*testing.T]
testStructQueue []*TestStruct
}
Expand Down
Loading

0 comments on commit 77f8e8f

Please sign in to comment.