Skip to content

Commit

Permalink
Address race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
reductionista committed Feb 10, 2025
1 parent abe6862 commit 6a35122
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 21 deletions.
3 changes: 2 additions & 1 deletion pkg/solana/chainreader/chain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ func (s *ContractReaderService) Start(ctx context.Context) error {
// Start EventReader if it hasn't already been
// Lazily starting it here rather than earlier, since nodes running only ordinary DF jobs don't need it
err := s.reader.Start(ctx)
if err != nil {
if err != nil &&
!strings.Contains(err.Error(), "has already been started") { // in case another thread calls Start() after Ready() returns
return fmt.Errorf("%d event filters defined in ChainReader config, but unable to start event reader: %w", len(s.filters), err)
}
}
Expand Down
57 changes: 37 additions & 20 deletions pkg/solana/chainreader/chain_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (

"github.com/cometbft/cometbft/libs/service"
"github.com/gagliardetto/solana-go"
"github.com/google/uuid"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil/sqltest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand All @@ -36,6 +38,8 @@ import (
"github.com/smartcontractkit/chainlink-solana/pkg/solana/codec"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/codec/testutils"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller"
lpmocks "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller/mocks"
)

const (
Expand Down Expand Up @@ -67,7 +71,7 @@ func TestSolanaContractReaderService_ServiceCtx(t *testing.T) {
t.Parallel()

ctx := tests.Context(t)
svc, err := chainreader.NewContractReaderService(logger.Test(t), new(mockedRPCClient), config.ContractReader{}, nil)
svc, err := chainreader.NewContractReaderService(logger.Test(t), new(mockedMultipleAccountGetter), config.ContractReader{}, nil)

require.NoError(t, err)
require.NotNil(t, svc)
Expand All @@ -93,9 +97,18 @@ func TestSolanaChainReaderService_Start(t *testing.T) {

ctx := tests.Context(t)
lggr := logger.Test(t)
rpcClient := new(mockedRPCClient)
rpcClient := lpmocks.NewRPCClient(t)
pk := solana.NewWallet().PublicKey()

dbx := sqltest.NewDB(t, sqltest.TestURL(t))
orm := logpoller.NewORM(uuid.NewString(), dbx, lggr)
lp := logpoller.New(logger.Sugared(lggr), orm, rpcClient)
err := lp.Start(ctx)
require.NoError(t, err)
alreadyStartedErr := lp.Start(ctx)
require.Error(t, alreadyStartedErr)
require.NoError(t, lp.Close())

accountReadDef := config.ReadDefinition{
ChainSpecificName: "myAccount",
ReadType: config.Account,
Expand All @@ -111,12 +124,14 @@ func TestSolanaChainReaderService_Start(t *testing.T) {
ReadDef config.ReadDefinition
StartError error
RegisterFilterError error
ExpectError bool
}{
{Name: "no event reads", ReadDef: accountReadDef},
{Name: "already started", ReadDef: eventReadDef},
{Name: "successful start", ReadDef: eventReadDef},
{Name: "unsuccessful start", ReadDef: eventReadDef, StartError: fmt.Errorf("failed to start event reader")},
{Name: "failed to register filter", ReadDef: eventReadDef, RegisterFilterError: fmt.Errorf("failed to register filter")},
{Name: "unsuccessful start", ReadDef: eventReadDef, StartError: fmt.Errorf("failed to start event reader"), ExpectError: true},
{Name: "already starting", ReadDef: eventReadDef, StartError: alreadyStartedErr},
{Name: "failed to register filter", ReadDef: eventReadDef, RegisterFilterError: fmt.Errorf("failed to register filter"), ExpectError: true},
}

boolType := codec.IdlType{}
Expand All @@ -141,10 +156,12 @@ func TestSolanaChainReaderService_Start(t *testing.T) {
},
AddressShareGroups: nil,
}

mockedMultipleAccountGetter := new(mockedMultipleAccountGetter)
er := mocks.NewEventsReader(t)
svc, err := chainreader.NewContractReaderService(
lggr,
rpcClient,
mockedMultipleAccountGetter,
cfg, er,
)
require.NoError(t, err)
Expand All @@ -158,7 +175,7 @@ func TestSolanaChainReaderService_Start(t *testing.T) {
er.On("Start", mock.Anything).Maybe().Return(tt.StartError)
er.On("RegisterFilter", mock.Anything, mock.Anything).Maybe().Return(tt.RegisterFilterError)
err = svc.Start(ctx)
if tt.StartError != nil || tt.RegisterFilterError != nil {
if tt.ExpectError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
Expand All @@ -175,7 +192,7 @@ func TestSolanaChainReaderService_Start(t *testing.T) {
expectedStartCalls = 0
}
er.AssertNumberOfCalls(t, "Start", expectedStartCalls)
if tt.StartError != nil {
if tt.Name == "unsuccessful start" {
expectedRegisterFilterCalls = 0
}
er.AssertNumberOfCalls(t, "RegisterFilter", expectedRegisterFilterCalls)
Expand All @@ -197,7 +214,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) {

require.NoError(t, err)

client := new(mockedRPCClient)
client := new(mockedMultipleAccountGetter)
svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil)

require.NoError(t, err)
Expand Down Expand Up @@ -233,7 +250,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) {

_, conf := newTestConfAndCodec(t)

client := new(mockedRPCClient)
client := new(mockedMultipleAccountGetter)
expectedErr := fmt.Errorf("expected error")
svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil)

Expand Down Expand Up @@ -268,7 +285,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) {

_, conf := newTestConfAndCodec(t)

client := new(mockedRPCClient)
client := new(mockedMultipleAccountGetter)
svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil)

require.NoError(t, err)
Expand All @@ -289,7 +306,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) {

_, conf := newTestConfAndCodec(t)

client := new(mockedRPCClient)
client := new(mockedMultipleAccountGetter)
svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil)

require.NoError(t, err)
Expand All @@ -310,7 +327,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) {

_, conf := newTestConfAndCodec(t)

client := new(mockedRPCClient)
client := new(mockedMultipleAccountGetter)
svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil)

require.NoError(t, err)
Expand Down Expand Up @@ -491,7 +508,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) {
encoded, err := testCodec.Encode(ctx, expected, testutils.TestStructWithNestedStruct)
require.NoError(t, err)

client := new(mockedRPCClient)
client := new(mockedMultipleAccountGetter)
svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil)
require.NoError(t, err)
require.NotNil(t, svc)
Expand Down Expand Up @@ -541,7 +558,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) {
}
_, conf := newTestConfAndCodecWithInjectibleReadDef(t, PDAAccount, readDef)

client := new(mockedRPCClient)
client := new(mockedMultipleAccountGetter)
svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil)
require.NoError(t, err)
require.NotNil(t, svc)
Expand Down Expand Up @@ -645,13 +662,13 @@ type mockedRPCCall struct {
}

// TODO BCI-3156 use a localnet for testing instead of a mock.
type mockedRPCClient struct {
type mockedMultipleAccountGetter struct {
mu sync.Mutex
responseByAddress map[string]mockedRPCCall
sequence []mockedRPCCall
}

func (_m *mockedRPCClient) GetMultipleAccountData(_ context.Context, keys ...solana.PublicKey) ([][]byte, error) {
func (_m *mockedMultipleAccountGetter) GetMultipleAccountData(_ context.Context, keys ...solana.PublicKey) ([][]byte, error) {
result := make([][]byte, len(keys))

for idx, key := range keys {
Expand All @@ -668,7 +685,7 @@ func (_m *mockedRPCClient) GetMultipleAccountData(_ context.Context, keys ...sol
return result, nil
}

func (_m *mockedRPCClient) SetNext(bts []byte, err error, delay time.Duration) {
func (_m *mockedMultipleAccountGetter) SetNext(bts []byte, err error, delay time.Duration) {
_m.mu.Lock()
defer _m.mu.Unlock()

Expand All @@ -679,7 +696,7 @@ func (_m *mockedRPCClient) SetNext(bts []byte, err error, delay time.Duration) {
})
}

func (_m *mockedRPCClient) SetForAddress(pk solana.PublicKey, bts []byte, err error, delay time.Duration) {
func (_m *mockedMultipleAccountGetter) SetForAddress(pk solana.PublicKey, bts []byte, err error, delay time.Duration) {
_m.mu.Lock()
defer _m.mu.Unlock()

Expand Down Expand Up @@ -779,7 +796,7 @@ func (r *chainReaderInterfaceTester) Setup(t *testing.T) {
}

func (r *chainReaderInterfaceTester) GetContractReader(t *testing.T) types.ContractReader {
client := new(mockedRPCClient)
client := new(mockedMultipleAccountGetter)
svc, err := chainreader.NewContractReaderService(logger.Test(t), client, r.conf, r.eventSource)
if err != nil {
t.Logf("chain reader service was not able to start: %s", err.Error())
Expand Down Expand Up @@ -807,7 +824,7 @@ type wrappedTestChainReader struct {

test *testing.T
service *chainreader.ContractReaderService
client *mockedRPCClient
client *mockedMultipleAccountGetter
tester ChainComponentsInterfaceTester[*testing.T]
testStructQueue []*TestStruct
}
Expand Down

0 comments on commit 6a35122

Please sign in to comment.