From 3fd4356e524941ec81c22091fca2f0a68afb1954 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Mon, 10 Feb 2025 10:52:22 -0800 Subject: [PATCH] Address race condition --- pkg/solana/chainreader/chain_reader.go | 3 +- pkg/solana/chainreader/chain_reader_test.go | 57 +++++++++++++-------- 2 files changed, 39 insertions(+), 21 deletions(-) diff --git a/pkg/solana/chainreader/chain_reader.go b/pkg/solana/chainreader/chain_reader.go index 43f1bc7e6..5fcb33bcf 100644 --- a/pkg/solana/chainreader/chain_reader.go +++ b/pkg/solana/chainreader/chain_reader.go @@ -118,7 +118,8 @@ func (s *ContractReaderService) Start(ctx context.Context) error { // Start EventReader if it hasn't already been // Lazily starting it here rather than earlier, since nodes running only ordinary DF jobs don't need it err := s.reader.Start(ctx) - if err != nil { + if err != nil && + !strings.Contains(err.Error(), "has already been started") { // in case another thread calls Start() after Ready() returns return fmt.Errorf("%d event filters defined in ChainReader config, but unable to start event reader: %w", len(s.filters), err) } } diff --git a/pkg/solana/chainreader/chain_reader_test.go b/pkg/solana/chainreader/chain_reader_test.go index e58eebcb5..94b9c785b 100644 --- a/pkg/solana/chainreader/chain_reader_test.go +++ b/pkg/solana/chainreader/chain_reader_test.go @@ -15,6 +15,8 @@ import ( "github.com/cometbft/cometbft/libs/service" "github.com/gagliardetto/solana-go" + "github.com/google/uuid" + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/sqltest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -36,6 +38,8 @@ import ( "github.com/smartcontractkit/chainlink-solana/pkg/solana/codec" "github.com/smartcontractkit/chainlink-solana/pkg/solana/codec/testutils" "github.com/smartcontractkit/chainlink-solana/pkg/solana/config" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller" + lpmocks "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller/mocks" ) const ( @@ -67,7 +71,7 @@ func TestSolanaContractReaderService_ServiceCtx(t *testing.T) { t.Parallel() ctx := tests.Context(t) - svc, err := chainreader.NewContractReaderService(logger.Test(t), new(mockedRPCClient), config.ContractReader{}, nil) + svc, err := chainreader.NewContractReaderService(logger.Test(t), new(mockedMultipleAccountGetter), config.ContractReader{}, nil) require.NoError(t, err) require.NotNil(t, svc) @@ -93,9 +97,18 @@ func TestSolanaChainReaderService_Start(t *testing.T) { ctx := tests.Context(t) lggr := logger.Test(t) - rpcClient := new(mockedRPCClient) + rpcClient := lpmocks.NewRPCClient(t) pk := solana.NewWallet().PublicKey() + dbx := sqltest.NewDB(t, sqltest.TestURL(t)) + orm := logpoller.NewORM(uuid.NewString(), dbx, lggr) + lp := logpoller.New(logger.Sugared(lggr), orm, rpcClient) + err := lp.Start(ctx) + require.NoError(t, err) + alreadyStartedErr := lp.Start(ctx) + require.Error(t, alreadyStartedErr) + require.NoError(t, lp.Close()) + accountReadDef := config.ReadDefinition{ ChainSpecificName: "myAccount", ReadType: config.Account, @@ -111,12 +124,14 @@ func TestSolanaChainReaderService_Start(t *testing.T) { ReadDef config.ReadDefinition StartError error RegisterFilterError error + ExpectError bool }{ {Name: "no event reads", ReadDef: accountReadDef}, {Name: "already started", ReadDef: eventReadDef}, {Name: "successful start", ReadDef: eventReadDef}, - {Name: "unsuccessful start", ReadDef: eventReadDef, StartError: fmt.Errorf("failed to start event reader")}, - {Name: "failed to register filter", ReadDef: eventReadDef, RegisterFilterError: fmt.Errorf("failed to register filter")}, + {Name: "unsuccessful start", ReadDef: eventReadDef, StartError: fmt.Errorf("failed to start event reader"), ExpectError: true}, + {Name: "already starting", ReadDef: eventReadDef, StartError: alreadyStartedErr}, + {Name: "failed to register filter", ReadDef: eventReadDef, RegisterFilterError: fmt.Errorf("failed to register filter"), ExpectError: true}, } boolType := codec.IdlType{} @@ -141,10 +156,12 @@ func TestSolanaChainReaderService_Start(t *testing.T) { }, AddressShareGroups: nil, } + + mockedMultipleAccountGetter := new(mockedMultipleAccountGetter) er := mocks.NewEventsReader(t) svc, err := chainreader.NewContractReaderService( lggr, - rpcClient, + mockedMultipleAccountGetter, cfg, er, ) require.NoError(t, err) @@ -158,7 +175,7 @@ func TestSolanaChainReaderService_Start(t *testing.T) { er.On("Start", mock.Anything).Maybe().Return(tt.StartError) er.On("RegisterFilter", mock.Anything, mock.Anything).Maybe().Return(tt.RegisterFilterError) err = svc.Start(ctx) - if tt.StartError != nil || tt.RegisterFilterError != nil { + if tt.ExpectError { assert.Error(t, err) } else { assert.NoError(t, err) @@ -175,7 +192,7 @@ func TestSolanaChainReaderService_Start(t *testing.T) { expectedStartCalls = 0 } er.AssertNumberOfCalls(t, "Start", expectedStartCalls) - if tt.StartError != nil { + if tt.Name == "unsuccessful start" { expectedRegisterFilterCalls = 0 } er.AssertNumberOfCalls(t, "RegisterFilter", expectedRegisterFilterCalls) @@ -197,7 +214,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { require.NoError(t, err) - client := new(mockedRPCClient) + client := new(mockedMultipleAccountGetter) svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) @@ -233,7 +250,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { _, conf := newTestConfAndCodec(t) - client := new(mockedRPCClient) + client := new(mockedMultipleAccountGetter) expectedErr := fmt.Errorf("expected error") svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) @@ -268,7 +285,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { _, conf := newTestConfAndCodec(t) - client := new(mockedRPCClient) + client := new(mockedMultipleAccountGetter) svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) @@ -289,7 +306,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { _, conf := newTestConfAndCodec(t) - client := new(mockedRPCClient) + client := new(mockedMultipleAccountGetter) svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) @@ -310,7 +327,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { _, conf := newTestConfAndCodec(t) - client := new(mockedRPCClient) + client := new(mockedMultipleAccountGetter) svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) @@ -491,7 +508,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { encoded, err := testCodec.Encode(ctx, expected, testutils.TestStructWithNestedStruct) require.NoError(t, err) - client := new(mockedRPCClient) + client := new(mockedMultipleAccountGetter) svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) require.NotNil(t, svc) @@ -541,7 +558,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { } _, conf := newTestConfAndCodecWithInjectibleReadDef(t, PDAAccount, readDef) - client := new(mockedRPCClient) + client := new(mockedMultipleAccountGetter) svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) require.NotNil(t, svc) @@ -645,13 +662,13 @@ type mockedRPCCall struct { } // TODO BCI-3156 use a localnet for testing instead of a mock. -type mockedRPCClient struct { +type mockedMultipleAccountGetter struct { mu sync.Mutex responseByAddress map[string]mockedRPCCall sequence []mockedRPCCall } -func (_m *mockedRPCClient) GetMultipleAccountData(_ context.Context, keys ...solana.PublicKey) ([][]byte, error) { +func (_m *mockedMultipleAccountGetter) GetMultipleAccountData(_ context.Context, keys ...solana.PublicKey) ([][]byte, error) { result := make([][]byte, len(keys)) for idx, key := range keys { @@ -668,7 +685,7 @@ func (_m *mockedRPCClient) GetMultipleAccountData(_ context.Context, keys ...sol return result, nil } -func (_m *mockedRPCClient) SetNext(bts []byte, err error, delay time.Duration) { +func (_m *mockedMultipleAccountGetter) SetNext(bts []byte, err error, delay time.Duration) { _m.mu.Lock() defer _m.mu.Unlock() @@ -679,7 +696,7 @@ func (_m *mockedRPCClient) SetNext(bts []byte, err error, delay time.Duration) { }) } -func (_m *mockedRPCClient) SetForAddress(pk solana.PublicKey, bts []byte, err error, delay time.Duration) { +func (_m *mockedMultipleAccountGetter) SetForAddress(pk solana.PublicKey, bts []byte, err error, delay time.Duration) { _m.mu.Lock() defer _m.mu.Unlock() @@ -779,7 +796,7 @@ func (r *chainReaderInterfaceTester) Setup(t *testing.T) { } func (r *chainReaderInterfaceTester) GetContractReader(t *testing.T) types.ContractReader { - client := new(mockedRPCClient) + client := new(mockedMultipleAccountGetter) svc, err := chainreader.NewContractReaderService(logger.Test(t), client, r.conf, r.eventSource) if err != nil { t.Logf("chain reader service was not able to start: %s", err.Error()) @@ -807,7 +824,7 @@ type wrappedTestChainReader struct { test *testing.T service *chainreader.ContractReaderService - client *mockedRPCClient + client *mockedMultipleAccountGetter tester ChainComponentsInterfaceTester[*testing.T] testStructQueue []*TestStruct }