From 77f005550d019ab1a210e0395f41d5c98acc5c19 Mon Sep 17 00:00:00 2001 From: Awbrey Hughlett Date: Tue, 28 Jan 2025 09:15:47 -0600 Subject: [PATCH] Filter Comparator Remapping This commit remaps chain-agnostic filters into solana specific subkey filters. Event fields can be queried by providing a comparator filter with the off-chain field name and the off-chain field value. Both the field name and value will be converted to their on-chain equivalent. All input modifiers defined in the contract reader config will be applied to the value. --- go.mod | 2 +- go.sum | 4 +- integration-tests/go.mod | 2 +- integration-tests/go.sum | 4 +- .../chainreader/account_read_binding.go | 15 +- pkg/solana/chainreader/bindings.go | 14 +- pkg/solana/chainreader/bindings_test.go | 17 ++ pkg/solana/chainreader/chain_reader.go | 192 +++++++++++--- pkg/solana/chainreader/chain_reader_test.go | 29 ++- pkg/solana/chainreader/event_read_binding.go | 245 ++++++++++++++++++ pkg/solana/chainwriter/chain_writer.go | 4 +- pkg/solana/codec/decoder.go | 43 ++- pkg/solana/codec/encoder.go | 20 +- pkg/solana/codec/parsed_types.go | 57 +++- pkg/solana/codec/solana.go | 4 +- pkg/solana/codec/solana_test.go | 67 ++++- pkg/solana/config/chain_reader.go | 17 +- pkg/solana/logpoller/parser.go | 43 ++- pkg/solana/logpoller/parser_test.go | 16 +- pkg/solana/relay.go | 5 +- 20 files changed, 683 insertions(+), 117 deletions(-) create mode 100644 pkg/solana/chainreader/event_read_binding.go diff --git a/go.mod b/go.mod index 602265192..cd180c673 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/jackc/pgx/v4 v4.18.3 github.com/pelletier/go-toml/v2 v2.2.3 github.com/prometheus/client_golang v1.20.5 - github.com/smartcontractkit/chainlink-ccip v0.0.0-20250128162345-af4c8fd4481a + github.com/smartcontractkit/chainlink-ccip v0.0.0-20250203132120-f0d42463e405 github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20250128162345-af4c8fd4481a github.com/smartcontractkit/chainlink-common v0.4.2-0.20250127125541-a8fa42cc0f36 github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20250203183025-939526523893 diff --git a/go.sum b/go.sum index 038a893d8..d25c9732e 100644 --- a/go.sum +++ b/go.sum @@ -575,8 +575,8 @@ github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMB github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= -github.com/smartcontractkit/chainlink-ccip v0.0.0-20250128162345-af4c8fd4481a h1:xr9/8DDQKWTLcswFJVDquEOuICfsVKEAtJDrzTCknSM= -github.com/smartcontractkit/chainlink-ccip v0.0.0-20250128162345-af4c8fd4481a/go.mod h1:UEnHaxkUsfreeA7rR45LMmua1Uen95tOFUR8/AI9BAo= +github.com/smartcontractkit/chainlink-ccip v0.0.0-20250203132120-f0d42463e405 h1:5QyaPGLmt+rlnvQL7drAE23Wq9rX5hO35kTZirAb97A= +github.com/smartcontractkit/chainlink-ccip v0.0.0-20250203132120-f0d42463e405/go.mod h1:UEnHaxkUsfreeA7rR45LMmua1Uen95tOFUR8/AI9BAo= github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20250128162345-af4c8fd4481a h1:1MrD2OiP/CRfyBSwTQE66R1+gLWBgWcU/SYl/+DmZ/Y= github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20250128162345-af4c8fd4481a/go.mod h1:Bmwq4lNb5tE47sydN0TKetcLEGbgl+VxHEWp4S0LI60= github.com/smartcontractkit/chainlink-common v0.4.2-0.20250127125541-a8fa42cc0f36 h1:dytZPggag6auyzmbhpIDmkHu7KrflIBEhLLec4/xFIk= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index b1eb47507..b1eb9535d 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -342,7 +342,7 @@ require ( github.com/slack-go/slack v0.15.0 // indirect github.com/smartcontractkit/chain-selectors v1.0.37 // indirect github.com/smartcontractkit/chainlink-automation v0.8.1 // indirect - github.com/smartcontractkit/chainlink-ccip v0.0.0-20250203130001-13e2609047e9 // indirect + github.com/smartcontractkit/chainlink-ccip v0.0.0-20250203132120-f0d42463e405 // indirect github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20250128162345-af4c8fd4481a // indirect github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20250130125138-3df261e09ddc // indirect github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250128203428-08031923fbe5 // indirect diff --git a/integration-tests/go.sum b/integration-tests/go.sum index 5fd9e4c64..53c3cb104 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1227,8 +1227,8 @@ github.com/smartcontractkit/chain-selectors v1.0.37 h1:EKVl8wayhOVfnlqfVmEyZ8rXO github.com/smartcontractkit/chain-selectors v1.0.37/go.mod h1:xsKM0aN3YGcQKTPRPDDtPx2l4mlTN1Djmg0VVXV40b8= github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgBc2xpDKBco/Q4h4ydl6+UUU= github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08= -github.com/smartcontractkit/chainlink-ccip v0.0.0-20250203130001-13e2609047e9 h1:+/KEPuWctPObgOoEEBCnli1/H3XnjMdCY3Tn+J32XRM= -github.com/smartcontractkit/chainlink-ccip v0.0.0-20250203130001-13e2609047e9/go.mod h1:UEnHaxkUsfreeA7rR45LMmua1Uen95tOFUR8/AI9BAo= +github.com/smartcontractkit/chainlink-ccip v0.0.0-20250203132120-f0d42463e405 h1:5QyaPGLmt+rlnvQL7drAE23Wq9rX5hO35kTZirAb97A= +github.com/smartcontractkit/chainlink-ccip v0.0.0-20250203132120-f0d42463e405/go.mod h1:UEnHaxkUsfreeA7rR45LMmua1Uen95tOFUR8/AI9BAo= github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20250128162345-af4c8fd4481a h1:1MrD2OiP/CRfyBSwTQE66R1+gLWBgWcU/SYl/+DmZ/Y= github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20250128162345-af4c8fd4481a/go.mod h1:Bmwq4lNb5tE47sydN0TKetcLEGbgl+VxHEWp4S0LI60= github.com/smartcontractkit/chainlink-common v0.4.2-0.20250130202959-6f1f48342e36 h1:bS51NFGHVjkCy7yu9L2Ss4sBsCW6jpa5GuhRAdWWxzM= diff --git a/pkg/solana/chainreader/account_read_binding.go b/pkg/solana/chainreader/account_read_binding.go index b8854b38c..6a164be4e 100644 --- a/pkg/solana/chainreader/account_read_binding.go +++ b/pkg/solana/chainreader/account_read_binding.go @@ -2,11 +2,14 @@ package chainreader import ( "context" + "errors" "fmt" "github.com/gagliardetto/solana-go" + commoncodec "github.com/smartcontractkit/chainlink-common/pkg/codec" "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-solana/pkg/solana/codec" ) @@ -35,6 +38,8 @@ func (b *accountReadBinding) SetCodec(codec types.RemoteCodec) { b.codec = codec } +func (b *accountReadBinding) SetModifier(commoncodec.Modifier) {} + func (b *accountReadBinding) SetAddress(key solana.PublicKey) { b.key = key } @@ -57,11 +62,11 @@ func (b *accountReadBinding) GetAddress(ctx context.Context, params any) (solana } func (b *accountReadBinding) CreateType(forEncoding bool) (any, error) { - return b.codec.CreateType(codec.WrapItemType(forEncoding, b.namespace, b.genericName, codec.ChainConfigTypeAccountDef), forEncoding) + return b.codec.CreateType(codec.WrapItemType(forEncoding, b.namespace, b.genericName), forEncoding) } func (b *accountReadBinding) Decode(ctx context.Context, bts []byte, outVal any) error { - return b.codec.Decode(ctx, bts, outVal, codec.WrapItemType(false, b.namespace, b.genericName, codec.ChainConfigTypeAccountDef)) + return b.codec.Decode(ctx, bts, outVal, codec.WrapItemType(false, b.namespace, b.genericName)) } // buildSeedsSlice encodes and builds the seedslist to calculate the PDA public key @@ -70,7 +75,7 @@ func (b *accountReadBinding) buildSeedsSlice(ctx context.Context, params any) ([ // Append the static prefix string first flattenedSeeds = append(flattenedSeeds, []byte(b.prefix)...) // Encode the seeds provided in the params - encodedParamSeeds, err := b.codec.Encode(ctx, params, codec.WrapItemType(true, b.namespace, b.genericName, "")) + encodedParamSeeds, err := b.codec.Encode(ctx, params, codec.WrapItemType(true, b.namespace, b.genericName)) if err != nil { return nil, fmt.Errorf("failed to encode params into bytes for PDA seeds: %w", err) } @@ -99,3 +104,7 @@ func (b *accountReadBinding) buildSeedsSlice(ctx context.Context, params any) ([ } return seedByteArray, nil } + +func (b *accountReadBinding) QueryKey(_ context.Context, _ query.KeyFilter, _ query.LimitAndSort, _ any) ([]types.Sequence, error) { + return nil, errors.New("unimplemented") +} diff --git a/pkg/solana/chainreader/bindings.go b/pkg/solana/chainreader/bindings.go index 1b927df85..eba7eca15 100644 --- a/pkg/solana/chainreader/bindings.go +++ b/pkg/solana/chainreader/bindings.go @@ -6,15 +6,19 @@ import ( "github.com/gagliardetto/solana-go" + commoncodec "github.com/smartcontractkit/chainlink-common/pkg/codec" "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/query" ) type readBinding interface { SetAddress(solana.PublicKey) GetAddress(context.Context, any) (solana.PublicKey, error) SetCodec(types.RemoteCodec) + SetModifier(commoncodec.Modifier) CreateType(bool) (any, error) Decode(context.Context, []byte, any) error + QueryKey(context.Context, query.KeyFilter, query.LimitAndSort, any) ([]types.Sequence, error) } // key is namespace @@ -72,10 +76,18 @@ func (b namespaceBindings) Bind(binding types.BoundContract) error { return nil } -func (b namespaceBindings) SetCodec(codec types.RemoteCodec) { +func (b namespaceBindings) SetCodecs(codec types.RemoteCodec) { for _, nbs := range b { for _, rb := range nbs { rb.SetCodec(codec) } } } + +func (b namespaceBindings) SetModifiers(modifier commoncodec.Modifier) { + for _, nbs := range b { + for _, rb := range nbs { + rb.SetModifier(modifier) + } + } +} diff --git a/pkg/solana/chainreader/bindings_test.go b/pkg/solana/chainreader/bindings_test.go index 3dec21194..51c583c52 100644 --- a/pkg/solana/chainreader/bindings_test.go +++ b/pkg/solana/chainreader/bindings_test.go @@ -9,7 +9,9 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + commoncodec "github.com/smartcontractkit/chainlink-common/pkg/codec" "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/query" ) func TestBindings_CreateType(t *testing.T) { @@ -54,6 +56,10 @@ func (_m *mockBinding) GetAddress(_ context.Context, _ any) (solana.PublicKey, e return solana.PublicKey{}, nil } +func (_m *mockBinding) SetModifier(a commoncodec.Modifier) { + _m.Called(a) +} + func (_m *mockBinding) CreateType(b bool) (any, error) { ret := _m.Called(b) @@ -63,3 +69,14 @@ func (_m *mockBinding) CreateType(b bool) (any, error) { func (_m *mockBinding) Decode(_ context.Context, _ []byte, _ any) error { return nil } + +func (_m *mockBinding) QueryKey( + a context.Context, + b query.KeyFilter, + c query.LimitAndSort, + d any, +) ([]types.Sequence, error) { + ret := _m.Called(a, b, c, d) + + return ret.Get(0).([]types.Sequence), ret.Error(1) +} diff --git a/pkg/solana/chainreader/chain_reader.go b/pkg/solana/chainreader/chain_reader.go index 41d70161b..ab26832c5 100644 --- a/pkg/solana/chainreader/chain_reader.go +++ b/pkg/solana/chainreader/chain_reader.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "strings" "sync" "github.com/gagliardetto/solana-go" @@ -15,25 +16,34 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" + "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink-solana/pkg/solana/codec" "github.com/smartcontractkit/chainlink-solana/pkg/solana/config" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller" ) -const ServiceName = "SolanaChainReader" +type EventsReader interface { + RegisterFilter(context.Context, logpoller.Filter) error + FilteredLogs(context.Context, []query.Expression, query.LimitAndSort, string) ([]logpoller.Log, error) +} + +const ServiceName = "SolanaContractReader" -type SolanaChainReaderService struct { +type ContractReaderService struct { types.UnimplementedContractReader - // provided values + // provided dependencies lggr logger.Logger client MultipleAccountGetter + events EventsReader // internal values bindings namespaceBindings lookup *lookup parsed *codec.ParsedTypes codec types.RemoteCodec + filters []logpoller.Filter // service state management wg sync.WaitGroup @@ -41,18 +51,24 @@ type SolanaChainReaderService struct { } var ( - _ services.Service = &SolanaChainReaderService{} - _ types.ContractReader = &SolanaChainReaderService{} + _ services.Service = &ContractReaderService{} + _ types.ContractReader = &ContractReaderService{} ) -// NewChainReaderService is a constructor for a new ChainReaderService for Solana. Returns a nil service on error. -func NewChainReaderService(lggr logger.Logger, dataReader MultipleAccountGetter, cfg config.ContractReader) (*SolanaChainReaderService, error) { - svc := &SolanaChainReaderService{ +// NewContractReaderService is a constructor for a new ContractReaderService for Solana. Returns a nil service on error. +func NewContractReaderService( + lggr logger.Logger, + dataReader MultipleAccountGetter, + cfg config.ContractReader, + events EventsReader, +) (*ContractReaderService, error) { + svc := &ContractReaderService{ lggr: logger.Named(lggr, ServiceName), client: dataReader, bindings: namespaceBindings{}, lookup: newLookup(), parsed: &codec.ParsedTypes{EncoderDefs: map[string]codec.Entry{}, DecoderDefs: map[string]codec.Entry{}}, + filters: []logpoller.Filter{}, } if err := svc.init(cfg.Namespaces); err != nil { @@ -66,27 +82,36 @@ func NewChainReaderService(lggr logger.Logger, dataReader MultipleAccountGetter, svc.codec = svcCodec - svc.bindings.SetCodec(svcCodec) + svc.bindings.SetCodecs(svcCodec) + svc.bindings.SetModifiers(svc.parsed.Modifiers) + return svc, nil } // Name implements the services.ServiceCtx interface and returns the logger service name. -func (s *SolanaChainReaderService) Name() string { +func (s *ContractReaderService) Name() string { return s.lggr.Name() } // Start implements the services.ServiceCtx interface and starts necessary background services. // An error is returned if starting any internal services fails. Subsequent calls to Start return // and error. -func (s *SolanaChainReaderService) Start(_ context.Context) error { +func (s *ContractReaderService) Start(ctx context.Context) error { return s.StartOnce(ServiceName, func() error { + // registering filters needs a context so we should be able to use the start function context. + for _, filter := range s.filters { + if err := s.events.RegisterFilter(ctx, filter); err != nil { + return err + } + } + return nil }) } // Close implements the services.ServiceCtx interface and stops all background services and cleans // up used resources. Subsequent calls to Close return an error. -func (s *SolanaChainReaderService) Close() error { +func (s *ContractReaderService) Close() error { return s.StopOnce(ServiceName, func() error { s.wg.Wait() @@ -96,19 +121,19 @@ func (s *SolanaChainReaderService) Close() error { // Ready implements the services.ServiceCtx interface and returns an error if starting the service // encountered any errors or if the service is not ready to serve requests. -func (s *SolanaChainReaderService) Ready() error { +func (s *ContractReaderService) Ready() error { return s.StateMachine.Ready() } // HealthReport implements the services.ServiceCtx interface and returns errors for any internal // function or service that may have failed. -func (s *SolanaChainReaderService) HealthReport() map[string]error { +func (s *ContractReaderService) HealthReport() map[string]error { return map[string]error{s.Name(): s.Healthy()} } // GetLatestValue implements the types.ContractReader interface and requests and parses on-chain // data named by the provided contract, method, and params. -func (s *SolanaChainReaderService) GetLatestValue(ctx context.Context, readIdentifier string, _ primitives.ConfidenceLevel, params any, returnVal any) error { +func (s *ContractReaderService) GetLatestValue(ctx context.Context, readIdentifier string, _ primitives.ConfidenceLevel, params any, returnVal any) error { if err := s.Ready(); err != nil { return err } @@ -147,7 +172,7 @@ func (s *SolanaChainReaderService) GetLatestValue(ctx context.Context, readIdent } // BatchGetLatestValues implements the types.ContractReader interface. -func (s *SolanaChainReaderService) BatchGetLatestValues(ctx context.Context, request types.BatchGetLatestValuesRequest) (types.BatchGetLatestValuesResult, error) { +func (s *ContractReaderService) BatchGetLatestValues(ctx context.Context, request types.BatchGetLatestValuesRequest) (types.BatchGetLatestValuesResult, error) { idxLookup := make(map[types.BoundContract][]int) batch := []call{} @@ -191,13 +216,46 @@ func (s *SolanaChainReaderService) BatchGetLatestValues(ctx context.Context, req } // QueryKey implements the types.ContractReader interface. -func (s *SolanaChainReaderService) QueryKey(_ context.Context, _ types.BoundContract, _ query.KeyFilter, _ query.LimitAndSort, _ any) ([]types.Sequence, error) { - return nil, errors.New("unimplemented") +func (s *ContractReaderService) QueryKey(ctx context.Context, contract types.BoundContract, filter query.KeyFilter, limitAndSort query.LimitAndSort, sequenceDataType any) ([]types.Sequence, error) { + binding, err := s.bindings.GetReadBinding(contract.Name, filter.Key) + if err != nil { + return nil, err + } + + _, isValuePtr := sequenceDataType.(*values.Value) + if !isValuePtr { + return binding.QueryKey(ctx, filter, limitAndSort, sequenceDataType) + } + + dataTypeFromReadIdentifier, err := s.CreateContractType(contract.ReadIdentifier(filter.Key), false) + if err != nil { + return nil, err + } + + sequence, err := binding.QueryKey(ctx, filter, limitAndSort, dataTypeFromReadIdentifier) + if err != nil { + return nil, err + } + + sequenceOfValues := make([]types.Sequence, len(sequence)) + for idx, entry := range sequence { + value, err := values.Wrap(entry.Data) + if err != nil { + return nil, err + } + sequenceOfValues[idx] = types.Sequence{ + Cursor: entry.Cursor, + Head: entry.Head, + Data: &value, + } + } + + return sequenceOfValues, nil } // Bind implements the types.ContractReader interface and allows new contract bindings to be added // to the service. -func (s *SolanaChainReaderService) Bind(_ context.Context, bindings []types.BoundContract) error { +func (s *ContractReaderService) Bind(_ context.Context, bindings []types.BoundContract) error { for _, binding := range bindings { if err := s.bindings.Bind(binding); err != nil { return err @@ -211,7 +269,7 @@ func (s *SolanaChainReaderService) Bind(_ context.Context, bindings []types.Boun // Unbind implements the types.ContractReader interface and allows existing contract bindings to be removed // from the service. -func (s *SolanaChainReaderService) Unbind(_ context.Context, bindings []types.BoundContract) error { +func (s *ContractReaderService) Unbind(_ context.Context, bindings []types.BoundContract) error { for _, binding := range bindings { s.lookup.unbindAddressForContract(binding.Name, binding.Address) } @@ -221,7 +279,7 @@ func (s *SolanaChainReaderService) Unbind(_ context.Context, bindings []types.Bo // CreateContractType implements the ContractTypeProvider interface and allows the chain reader // service to explicitly define the expected type for a grpc server to provide. -func (s *SolanaChainReaderService) CreateContractType(readIdentifier string, forEncoding bool) (any, error) { +func (s *ContractReaderService) CreateContractType(readIdentifier string, forEncoding bool) (any, error) { values, ok := s.lookup.getContractForReadIdentifiers(readIdentifier) if !ok { return nil, fmt.Errorf("%w: no contract for read identifier", types.ErrInvalidConfig) @@ -230,7 +288,7 @@ func (s *SolanaChainReaderService) CreateContractType(readIdentifier string, for return s.bindings.CreateType(values.contract, values.genericName, forEncoding) } -func (s *SolanaChainReaderService) addCodecDef(forEncoding bool, namespace, genericName string, readType codec.ChainConfigType, idl codec.IDL, idlDefinition interface{}, modCfg commoncodec.ModifiersConfig) error { +func (s *ContractReaderService) addCodecDef(forEncoding bool, namespace, genericName string, idl codec.IDL, idlDefinition interface{}, modCfg commoncodec.ModifiersConfig) error { mod, err := modCfg.ToModifier(codec.DecoderHooks...) if err != nil { return err @@ -242,24 +300,25 @@ func (s *SolanaChainReaderService) addCodecDef(forEncoding bool, namespace, gene } if forEncoding { - s.parsed.EncoderDefs[codec.WrapItemType(forEncoding, namespace, genericName, readType)] = cEntry + s.parsed.EncoderDefs[codec.WrapItemType(true, namespace, genericName)] = cEntry } else { - s.parsed.DecoderDefs[codec.WrapItemType(forEncoding, namespace, genericName, readType)] = cEntry + s.parsed.DecoderDefs[codec.WrapItemType(false, namespace, genericName)] = cEntry } return nil } -func (s *SolanaChainReaderService) init(namespaces map[string]config.ChainContractReader) error { +func (s *ContractReaderService) init(namespaces map[string]config.ChainContractReader) error { for namespace, nameSpaceDef := range namespaces { for genericName, read := range nameSpaceDef.Reads { injectAddressModifier(read.InputModifications, read.OutputModifications) - idlDef, err := codec.FindDefinitionFromIDL(codec.ChainConfigTypeAccountDef, read.ChainSpecificName, nameSpaceDef.IDL) - if err != nil { - return err - } switch read.ReadType { case config.Account: + idlDef, err := codec.FindDefinitionFromIDL(codec.ChainConfigTypeAccountDef, read.ChainSpecificName, nameSpaceDef.IDL) + if err != nil { + return err + } + accountIDLDef, isOk := idlDef.(codec.IdlTypeDef) if !isOk { return fmt.Errorf("unexpected type %T from IDL definition for account read: %q, with chainSpecificName: %q, of type: %q", accountIDLDef, genericName, read.ChainSpecificName, read.ReadType) @@ -268,12 +327,25 @@ func (s *SolanaChainReaderService) init(namespaces map[string]config.ChainContra return err } case config.Event: + idlDef, err := codec.FindDefinitionFromIDL(codec.ChainConfigTypeEventDef, read.ChainSpecificName, nameSpaceDef.IDL) + if err != nil { + return err + } + eventIDlDef, isOk := idlDef.(codec.IdlEvent) if !isOk { - return fmt.Errorf("unexpected type %T from IDL definition for log read: %q, with chainSpecificName: %q, of type: %q", eventIDlDef, genericName, read.ChainSpecificName, read.ReadType) + return fmt.Errorf("unexpected type %T from IDL definition for event read: %q, with chainSpecificName: %q, of type: %q", eventIDlDef, genericName, read.ChainSpecificName, read.ReadType) + } + + if err = s.addEventRead( + namespace, genericName, + nameSpaceDef.ContractAddress, + nameSpaceDef.IDL, eventIDlDef, + read, + s.events, + ); err != nil { + return err } - // TODO s.addLogRead() - return fmt.Errorf("implement me") default: return fmt.Errorf("unexpected read type %q for: %q in namespace: %q", read.ReadType, genericName, namespace) } @@ -283,15 +355,18 @@ func (s *SolanaChainReaderService) init(namespaces map[string]config.ChainContra return nil } -func (s *SolanaChainReaderService) addAccountRead(namespace string, genericName string, idl codec.IDL, idlType codec.IdlTypeDef, readDefinition config.ReadDefinition) error { - if err := s.addCodecDef(false, namespace, genericName, codec.ChainConfigTypeAccountDef, idl, idlType, readDefinition.OutputModifications); err != nil { +func (s *ContractReaderService) addAccountRead(namespace string, genericName string, idl codec.IDL, idlType codec.IdlTypeDef, readDefinition config.ReadDefinition) error { + if err := s.addCodecDef(false, namespace, genericName, idl, idlType, readDefinition.OutputModifications); err != nil { return err } s.lookup.addReadNameForContract(namespace, genericName) - var reader readBinding - var inputAccountIDLDef interface{} + var ( + reader readBinding + inputAccountIDLDef interface{} + ) + // Create PDA read binding if PDA prefix or seeds configs are populated if len(readDefinition.PDADefiniton.Prefix) > 0 || len(readDefinition.PDADefiniton.Seeds) > 0 { inputAccountIDLDef = readDefinition.PDADefiniton @@ -300,14 +375,42 @@ func (s *SolanaChainReaderService) addAccountRead(namespace string, genericName inputAccountIDLDef = codec.NilIdlTypeDefTy reader = newAccountReadBinding(namespace, genericName, "", false) } - if err := s.addCodecDef(true, namespace, genericName, codec.ChainConfigTypeAccountDef, idl, inputAccountIDLDef, readDefinition.InputModifications); err != nil { + if err := s.addCodecDef(true, namespace, genericName, idl, inputAccountIDLDef, readDefinition.InputModifications); err != nil { return err } + s.bindings.AddReadBinding(namespace, genericName, reader) return nil } +func (s *ContractReaderService) addEventRead( + namespace, genericName string, + contractAddress solana.PublicKey, + _ codec.IDL, + _ codec.IdlEvent, + readDefinition config.ReadDefinition, + events EventsReader, +) error { + subKeys := [][]string{} + for _, onChain := range readDefinition.IndexedFields { + subKeys = append(subKeys, strings.Split(onChain, ".")) + } + + filter := toLPFilter(readDefinition.PollingFilter, contractAddress, subKeys) + + s.filters = append(s.filters, filter) + s.bindings.AddReadBinding(namespace, genericName, newEventReadBinding( + namespace, + genericName, + readDefinition.IndexedFields, + events, + filter.EventSig, + )) + + return nil +} + // injectAddressModifier injects AddressModifier into OutputModifications. // This is necessary because AddressModifier cannot be serialized and must be applied at runtime. func injectAddressModifier(inputModifications, outputModifications commoncodec.ModifiersConfig) { @@ -344,3 +447,18 @@ func (r *accountDataReader) ReadAll(ctx context.Context, pk solana.PublicKey, op return bts, nil } + +func toLPFilter( + f *config.PollingFilter, + address solana.PublicKey, + subKeyPaths [][]string, +) logpoller.Filter { + return logpoller.Filter{ + Address: logpoller.PublicKey(address), + EventName: f.EventName, + EventSig: logpoller.EventSignature([]byte(f.EventName)[:logpoller.EventSignatureLength]), + SubKeyPaths: logpoller.SubKeyPaths(subKeyPaths), + Retention: f.Retention, + MaxLogsKept: f.MaxLogsKept, + } +} diff --git a/pkg/solana/chainreader/chain_reader_test.go b/pkg/solana/chainreader/chain_reader_test.go index 019ef3e09..77b72a200 100644 --- a/pkg/solana/chainreader/chain_reader_test.go +++ b/pkg/solana/chainreader/chain_reader_test.go @@ -60,11 +60,11 @@ func TestSolanaChainReaderService_ReaderInterface(t *testing.T) { RunContractReaderInterfaceTests(t, lsIt, true, false) } -func TestSolanaChainReaderService_ServiceCtx(t *testing.T) { +func TestSolanaContractReaderService_ServiceCtx(t *testing.T) { t.Parallel() ctx := tests.Context(t) - svc, err := chainreader.NewChainReaderService(logger.Test(t), new(mockedRPCClient), config.ContractReader{}) + svc, err := chainreader.NewContractReaderService(logger.Test(t), new(mockedRPCClient), config.ContractReader{}, nil) require.NoError(t, err) require.NotNil(t, svc) @@ -100,7 +100,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { require.NoError(t, err) client := new(mockedRPCClient) - svc, err := chainreader.NewChainReaderService(logger.Test(t), client, conf) + svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) require.NotNil(t, svc) @@ -137,7 +137,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { client := new(mockedRPCClient) expectedErr := fmt.Errorf("expected error") - svc, err := chainreader.NewChainReaderService(logger.Test(t), client, conf) + svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) require.NotNil(t, svc) @@ -171,7 +171,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { _, conf := newTestConfAndCodec(t) client := new(mockedRPCClient) - svc, err := chainreader.NewChainReaderService(logger.Test(t), client, conf) + svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) require.NotNil(t, svc) @@ -192,7 +192,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { _, conf := newTestConfAndCodec(t) client := new(mockedRPCClient) - svc, err := chainreader.NewChainReaderService(logger.Test(t), client, conf) + svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) require.NotNil(t, svc) @@ -213,7 +213,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { _, conf := newTestConfAndCodec(t) client := new(mockedRPCClient) - svc, err := chainreader.NewChainReaderService(logger.Test(t), client, conf) + svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) require.NotNil(t, svc) @@ -394,7 +394,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { require.NoError(t, err) client := new(mockedRPCClient) - svc, err := chainreader.NewChainReaderService(logger.Test(t), client, conf) + svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) require.NotNil(t, svc) require.NoError(t, svc.Start(ctx)) @@ -444,7 +444,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { _, conf := newTestConfAndCodecWithInjectibleReadDef(t, PDAAccount, readDef) client := new(mockedRPCClient) - svc, err := chainreader.NewChainReaderService(logger.Test(t), client, conf) + svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) require.NotNil(t, svc) require.NoError(t, svc.Start(ctx)) @@ -598,9 +598,10 @@ func (_m *mockedRPCClient) SetForAddress(pk solana.PublicKey, bts []byte, err er type chainReaderInterfaceTester struct { TestSelectionSupport - conf config.ContractReader - address []string - reader *wrappedTestChainReader + conf config.ContractReader + address []string + reader *wrappedTestChainReader + eventSource chainreader.EventsReader } func (r *chainReaderInterfaceTester) GetAccountBytes(i int) []byte { @@ -681,7 +682,7 @@ func (r *chainReaderInterfaceTester) Setup(t *testing.T) { func (r *chainReaderInterfaceTester) GetContractReader(t *testing.T) types.ContractReader { client := new(mockedRPCClient) - svc, err := chainreader.NewChainReaderService(logger.Test(t), client, r.conf) + svc, err := chainreader.NewContractReaderService(logger.Test(t), client, r.conf, r.eventSource) if err != nil { t.Logf("chain reader service was not able to start: %s", err.Error()) t.FailNow() @@ -707,7 +708,7 @@ type wrappedTestChainReader struct { types.UnimplementedContractReader test *testing.T - service *chainreader.SolanaChainReaderService + service *chainreader.ContractReaderService client *mockedRPCClient tester ChainComponentsInterfaceTester[*testing.T] testStructQueue []*TestStruct diff --git a/pkg/solana/chainreader/event_read_binding.go b/pkg/solana/chainreader/event_read_binding.go new file mode 100644 index 000000000..2d06ab85f --- /dev/null +++ b/pkg/solana/chainreader/event_read_binding.go @@ -0,0 +1,245 @@ +package chainreader + +import ( + "context" + "fmt" + "reflect" + "strings" + + "github.com/gagliardetto/solana-go" + + commoncodec "github.com/smartcontractkit/chainlink-common/pkg/codec" + "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/query" + "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" + + "github.com/smartcontractkit/chainlink-solana/pkg/solana/codec" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller" +) + +type eventReadBinding struct { + namespace, genericName string + codec types.RemoteCodec + modifier commoncodec.Modifier + key solana.PublicKey + remapper remapHelper + indexedSubkeys map[string]string + events EventsReader + eventSig [logpoller.EventSignatureLength]byte +} + +func newEventReadBinding( + namespace, genericName string, + indexedSubkeys map[string]string, + events EventsReader, + eventSig [logpoller.EventSignatureLength]byte, +) *eventReadBinding { + binding := &eventReadBinding{ + namespace: namespace, + genericName: genericName, + indexedSubkeys: indexedSubkeys, + events: events, + eventSig: eventSig, + } + + binding.remapper = remapHelper{binding.remapPrimitive} + + return binding +} + +func (b *eventReadBinding) SetAddress(key solana.PublicKey) { + b.key = key +} + +func (b *eventReadBinding) GetAddress(_ context.Context, _ any) (solana.PublicKey, error) { + return b.key, nil +} + +func (b *eventReadBinding) SetCodec(codec types.RemoteCodec) { + b.codec = codec +} + +func (b *eventReadBinding) SetModifier(modifier commoncodec.Modifier) { + b.modifier = modifier +} + +func (b *eventReadBinding) CreateType(forEncoding bool) (any, error) { + itemType := codec.WrapItemType(forEncoding, b.namespace, b.genericName) + + return b.codec.CreateType(itemType, forEncoding) +} + +func (b *eventReadBinding) Decode(ctx context.Context, bts []byte, outVal any) error { + itemType := codec.WrapItemType(false, b.namespace, b.genericName) + + return b.codec.Decode(ctx, bts, outVal, itemType) +} + +func (b *eventReadBinding) QueryKey( + ctx context.Context, + filter query.KeyFilter, + limitAndSort query.LimitAndSort, + sequenceDataType any, +) ([]types.Sequence, error) { + var ( + pubKey solana.PublicKey + err error + ) + + if pubKey, err = b.GetAddress(ctx, nil); err != nil { + return nil, err + } + + if filter, err = b.remapper.remap(filter); err != nil { + return nil, err + } + + // filter should always use the address and event sig + filter.Expressions = append([]query.Expression{ + logpoller.NewAddressFilter(pubKey), + logpoller.NewEventSigFilter(b.eventSig[:]), // TODO: genericName is wrong; need the event sig hash + }, filter.Expressions...) + + itemType := strings.Join([]string{b.namespace, b.genericName}, ".") + + logs, err := b.events.FilteredLogs(ctx, filter.Expressions, limitAndSort, itemType) + if err != nil { + return nil, err + } + + sequences, err := b.decodeLogsIntoSequences(ctx, logs, sequenceDataType) + if err != nil { + return nil, err + } + + return sequences, nil +} + +func (b *eventReadBinding) remapPrimitive(expression query.Expression) (query.Expression, error) { + var ( + comp query.Expression + err error + ) + + switch primitive := expression.Primitive.(type) { + case *primitives.Comparator: + if comp, err = b.encodeComparator(primitive); err != nil { + return query.Expression{}, fmt.Errorf("failed to encode comparator %q: %w", primitive.Name, err) + } + case *primitives.Confidence: + // TODO: maybe use an ignore filter? + // confidence is ignored in solana + } + + return comp, nil +} + +func (b *eventReadBinding) encodeComparator(comparator *primitives.Comparator) (query.Expression, error) { + onChainName, ok := b.indexedSubkeys[comparator.Name] + if !ok { + return query.Expression{}, fmt.Errorf("%w: unknown indexed subkey mapping %s", types.ErrInvalidConfig, comparator.Name) + } + + itemType := strings.Join([]string{b.namespace, b.genericName, comparator.Name}, ".") + + for idx, comp := range comparator.ValueComparators { + // need to do a transform and then extract the value for the subkey + newValue, err := b.modifier.TransformToOnChain(comp.Value, itemType) + if err != nil { + return query.Expression{}, err + } + + comparator.ValueComparators[idx].Value = newValue + } + + return logpoller.NewEventSubkeyFilter(strings.Split(onChainName, "."), comparator.ValueComparators), nil +} + +func (b *eventReadBinding) decodeLogsIntoSequences( + ctx context.Context, + logs []logpoller.Log, + into any, +) ([]types.Sequence, error) { + sequences := make([]types.Sequence, len(logs)) + + for idx := range logs { + sequences[idx] = types.Sequence{ + Cursor: logpoller.FormatContractReaderCursor(logs[idx]), + Head: types.Head{ + Height: fmt.Sprint(logs[idx].BlockNumber), + Hash: solana.PublicKey(logs[idx].BlockHash).Bytes(), + Timestamp: uint64(logs[idx].BlockTimestamp.Unix()), //nolint:gosec + }, + } + + var typeVal reflect.Value + + typeInto := reflect.TypeOf(into) + if typeInto.Kind() == reflect.Pointer { + typeVal = reflect.New(typeInto.Elem()) + } else { + typeVal = reflect.Indirect(reflect.New(typeInto)) + } + + // create a new value of the same type as 'into' for the data to be extracted to + sequences[idx].Data = typeVal.Interface() + + if err := b.decodeLog(ctx, &logs[idx], sequences[idx].Data); err != nil { + return nil, err + } + } + + return sequences, nil +} + +func (b *eventReadBinding) decodeLog(ctx context.Context, log *logpoller.Log, into any) error { + itemType := codec.WrapItemType(false, b.namespace, b.genericName) + + // decode non indexed topics and apply output modifiers + if err := b.codec.Decode(ctx, log.Data, into, itemType); err != nil { + return fmt.Errorf("%w: failed to decode log data: %s", types.ErrInvalidType, err.Error()) + } + + return nil +} + +type remapHelper struct { + primitive func(query.Expression) (query.Expression, error) +} + +func (r remapHelper) remap(filter query.KeyFilter) (query.KeyFilter, error) { + var remapped query.KeyFilter + + for _, expression := range filter.Expressions { + remappedExpression, err := r.remapExpression(filter.Key, expression) + if err != nil { + return query.KeyFilter{}, err + } + + remapped.Expressions = append(remapped.Expressions, remappedExpression) + } + + return remapped, nil +} + +func (r remapHelper) remapExpression(key string, expression query.Expression) (query.Expression, error) { + if !expression.IsPrimitive() { + remappedBoolExpressions := make([]query.Expression, len(expression.BoolExpression.Expressions)) + for i := range expression.BoolExpression.Expressions { + remapped, err := r.remapExpression(key, expression.BoolExpression.Expressions[i]) + if err != nil { + return query.Expression{}, err + } + + remappedBoolExpressions[i] = remapped + } + + if expression.BoolExpression.BoolOperator == query.AND { + return query.And(remappedBoolExpressions...), nil + } + + return query.Or(remappedBoolExpressions...), nil + } + + return r.primitive(expression) +} diff --git a/pkg/solana/chainwriter/chain_writer.go b/pkg/solana/chainwriter/chain_writer.go index 1bf8a3a8b..a914d6aa3 100644 --- a/pkg/solana/chainwriter/chain_writer.go +++ b/pkg/solana/chainwriter/chain_writer.go @@ -106,7 +106,7 @@ func (s *SolanaChainWriterService) parsePrograms(config ChainWriterConfig) error return fmt.Errorf("failed to create codec entry for method %s.%s, error: %w", program, method, err) } - s.parsed.EncoderDefs[codec.WrapItemType(true, program, method, "")] = input + s.parsed.EncoderDefs[codec.WrapItemType(true, program, method)] = input } } @@ -301,7 +301,7 @@ func (s *SolanaChainWriterService) SubmitTransaction(ctx context.Context, contra return errorWithDebugID(fmt.Errorf("error parsing program ID: %w", err), debugID) } - encodedPayload, err := s.encoder.Encode(ctx, args, codec.WrapItemType(true, contractName, method, "")) + encodedPayload, err := s.encoder.Encode(ctx, args, codec.WrapItemType(true, contractName, method)) if err != nil { return errorWithDebugID(fmt.Errorf("error encoding transaction payload: %w", err), debugID) diff --git a/pkg/solana/codec/decoder.go b/pkg/solana/codec/decoder.go index c91805f71..df0ce2584 100644 --- a/pkg/solana/codec/decoder.go +++ b/pkg/solana/codec/decoder.go @@ -4,25 +4,20 @@ import ( "context" "fmt" + commoncodec "github.com/smartcontractkit/chainlink-common/pkg/codec" "github.com/smartcontractkit/chainlink-common/pkg/codec/encodings" commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" ) -// decoder should be initialized with newDecoder type decoder struct { - definitions map[string]Entry - lenientCodecFromTypeCodec encodings.LenientCodecFromTypeCodec + definitions map[string]Entry + lenientFromTypeCodec map[string]encodings.LenientCodecFromTypeCodec } func newDecoder(definitions map[string]Entry) commontypes.Decoder { - lenientCodecFromTypeCodec := make(encodings.LenientCodecFromTypeCodec) - for k, v := range definitions { - lenientCodecFromTypeCodec[k] = v - } - return &decoder{ - definitions: definitions, - lenientCodecFromTypeCodec: lenientCodecFromTypeCodec, + definitions: definitions, + lenientFromTypeCodec: makeCodecFromDefs(definitions), } } @@ -33,11 +28,15 @@ func (d *decoder) Decode(ctx context.Context, raw []byte, into any, itemType str } }() - if d.lenientCodecFromTypeCodec == nil { - return fmt.Errorf("decoder is not properly initialised, underlying lenientCodecFromTypeCodec is nil") + _, itemType = commoncodec.ItemTyper(itemType).Next() + head, tail := commoncodec.ItemTyper(itemType).Next() + + codec, ok := d.lenientFromTypeCodec[head] + if !ok { + return fmt.Errorf("%w: codec not available for itemType: %s", commontypes.ErrInvalidType, itemType) } - return d.lenientCodecFromTypeCodec.Decode(ctx, raw, into, itemType) + return codec.Decode(ctx, raw, into, tail) } func (d *decoder) GetMaxDecodingSize(_ context.Context, n int, itemType string) (int, error) { @@ -51,3 +50,21 @@ func (d *decoder) GetMaxDecodingSize(_ context.Context, n int, itemType string) } return codecEntry.GetCodecType().Size(n) } + +func makeCodecFromDefs(definitions map[string]Entry) map[string]encodings.LenientCodecFromTypeCodec { + // itemType is constructed as a dot-separated string of values that separates contract + // names from itemType names within the contract + lenientFromTypeCodec := make(map[string]encodings.LenientCodecFromTypeCodec) + for key, value := range definitions { + _, key = commoncodec.ItemTyper(key).Next() + head, tail := commoncodec.ItemTyper(key).Next() + + if _, ok := lenientFromTypeCodec[head]; !ok { + lenientFromTypeCodec[head] = make(encodings.LenientCodecFromTypeCodec) + } + + lenientFromTypeCodec[head][tail] = value + } + + return lenientFromTypeCodec +} diff --git a/pkg/solana/codec/encoder.go b/pkg/solana/codec/encoder.go index 9bc519bd0..b7b581782 100644 --- a/pkg/solana/codec/encoder.go +++ b/pkg/solana/codec/encoder.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + commoncodec "github.com/smartcontractkit/chainlink-common/pkg/codec" "github.com/smartcontractkit/chainlink-common/pkg/codec/encodings" commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" ) @@ -11,18 +12,13 @@ import ( // encoder should be initialized with newEncoder type encoder struct { definitions map[string]Entry - lenientCodecFromTypeCodec encodings.LenientCodecFromTypeCodec + lenientCodecFromTypeCodec map[string]encodings.LenientCodecFromTypeCodec } func newEncoder(definitions map[string]Entry) commontypes.Encoder { - lenientCodecFromTypeCodec := make(encodings.LenientCodecFromTypeCodec) - for k, v := range definitions { - lenientCodecFromTypeCodec[k] = v - } - return &encoder{ - lenientCodecFromTypeCodec: lenientCodecFromTypeCodec, definitions: definitions, + lenientCodecFromTypeCodec: makeCodecFromDefs(definitions), } } @@ -33,11 +29,15 @@ func (e *encoder) Encode(ctx context.Context, item any, itemType string) (res [] } }() - if e.lenientCodecFromTypeCodec == nil { - return nil, fmt.Errorf("encoder is not properly initialised, underlying lenientCodecFromTypeCodec is nil") + _, itemType = commoncodec.ItemTyper(itemType).Next() + head, tail := commoncodec.ItemTyper(itemType).Next() + + codec, ok := e.lenientCodecFromTypeCodec[head] + if !ok { + return nil, fmt.Errorf("%w: codec not available for itemType: %s", commontypes.ErrInvalidType, itemType) } - return e.lenientCodecFromTypeCodec.Encode(ctx, item, itemType) + return codec.Encode(ctx, item, tail) } func (e *encoder) GetMaxEncodingSize(_ context.Context, n int, itemType string) (int, error) { diff --git a/pkg/solana/codec/parsed_types.go b/pkg/solana/codec/parsed_types.go index d0d1d4693..a512c96f8 100644 --- a/pkg/solana/codec/parsed_types.go +++ b/pkg/solana/codec/parsed_types.go @@ -11,38 +11,79 @@ import ( type ParsedTypes struct { EncoderDefs map[string]Entry DecoderDefs map[string]Entry + Modifiers commoncodec.Modifier } func (parsed *ParsedTypes) ToCodec() (commontypes.RemoteCodec, error) { - modByTypeName := map[string]commoncodec.Modifier{} + directionalMods := map[string]map[string]map[string]commoncodec.Modifier{} + + modByTypeName := map[string]map[string]commoncodec.Modifier{} if err := AddEntries(parsed.EncoderDefs, modByTypeName); err != nil { return nil, err } + + directionalMods["input"] = modByTypeName + + modByTypeName = map[string]map[string]commoncodec.Modifier{} if err := AddEntries(parsed.DecoderDefs, modByTypeName); err != nil { return nil, err } - mod, err := commoncodec.NewByItemTypeModifier(modByTypeName) + directionalMods["output"] = modByTypeName + + collapsed := map[string]commoncodec.Modifier{} + for direction, dMods := range directionalMods { + dCollapsed := map[string]commoncodec.Modifier{} + + for namespace, mods := range dMods { + mod, err := commoncodec.NewNestableByItemTypeModifier(mods) + if err != nil { + return nil, err + } + + dCollapsed[namespace] = mod + } + + mod, err := commoncodec.NewNestableByItemTypeModifier(dCollapsed) + if err != nil { + return nil, err + } + + collapsed[direction] = mod + } + + mod, err := commoncodec.NewNestableByItemTypeModifier(collapsed) if err != nil { return nil, err } + + parsed.Modifiers = mod underlying := &solanaCodec{ Encoder: newEncoder(parsed.EncoderDefs), Decoder: newDecoder(parsed.DecoderDefs), ParsedTypes: parsed, } + return commoncodec.NewModifierCodec(underlying, mod, DecoderHooks...) } // AddEntries extracts the mods from entry and adds them to modByTypeName use with codec.NewByItemTypeModifier // Since each input/output can have its own modifications, we need to keep track of them by type name -func AddEntries(defs map[string]Entry, modByTypeName map[string]commoncodec.Modifier) error { - for k, def := range defs { - modByTypeName[k] = def.Modifier() - _, err := def.Modifier().RetypeToOffChain(reflect.PointerTo(def.GetType()), k) - if err != nil { - return fmt.Errorf("%w: cannot retype %v: %w", commontypes.ErrInvalidConfig, k, err) +func AddEntries(defs map[string]Entry, modByTypeName map[string]map[string]commoncodec.Modifier) error { + for itemType, def := range defs { + _, tail := commoncodec.ItemTyper(itemType).Next() + head, tail := commoncodec.ItemTyper(tail).Next() + + if _, ok := modByTypeName[head]; !ok { + modByTypeName[head] = make(map[string]commoncodec.Modifier) + } + + modByTypeName[head][tail] = def.Modifier() + + if _, err := def.Modifier().RetypeToOffChain(reflect.PointerTo(def.GetType()), ""); err != nil { + return fmt.Errorf("%w: cannot retype %v: %w", commontypes.ErrInvalidConfig, itemType, err) } } + return nil } diff --git a/pkg/solana/codec/solana.go b/pkg/solana/codec/solana.go index f6556b48d..d93dbc171 100644 --- a/pkg/solana/codec/solana.go +++ b/pkg/solana/codec/solana.go @@ -140,12 +140,12 @@ func FindDefinitionFromIDL(cfgType ChainConfigType, chainSpecificName string, id return nil, fmt.Errorf("unknown type: %q", cfgType) } -func WrapItemType(forEncoding bool, contractName, itemType string, readType ChainConfigType) string { +func WrapItemType(forEncoding bool, contractName, itemType string) string { if forEncoding { return fmt.Sprintf("input.%s.%s", contractName, itemType) } - return fmt.Sprintf("output.%s.%s.%s", readType, contractName, itemType) + return fmt.Sprintf("output.%s.%s", contractName, itemType) } // TODO Deprecate and remove this. diff --git a/pkg/solana/codec/solana_test.go b/pkg/solana/codec/solana_test.go index 4dd116691..8f3c3c393 100644 --- a/pkg/solana/codec/solana_test.go +++ b/pkg/solana/codec/solana_test.go @@ -12,6 +12,7 @@ import ( codeccommon "github.com/smartcontractkit/chainlink-common/pkg/codec" "github.com/smartcontractkit/chainlink-common/pkg/codec/encodings/binary" "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/interfacetests" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/smartcontractkit/chainlink-solana/pkg/solana/codec" @@ -23,14 +24,13 @@ func TestNewIDLAccountCodec(t *testing.T) { t.Parallel() ctx := tests.Context(t) - _, _, entry := newTestIDLAndCodec(t, true) + _, _, entry := newTestIDLAndCodec(t, accountIDLType) expected := testutils.DefaultTestStruct bts, err := entry.Encode(ctx, expected, testutils.TestStructWithNestedStruct) // length of fields + discriminator require.Equal(t, 262, len(bts)) - require.NoError(t, err) var decoded testutils.StructWithNestedStruct @@ -39,12 +39,32 @@ func TestNewIDLAccountCodec(t *testing.T) { require.Equal(t, expected, decoded) } +func TestCodecProperties(t *testing.T) { + t.Parallel() + + tester := &codecInterfaceTester{} + ctx := tests.Context(t) + _, _, entry := newTestIDLAndCodec(t, eventIDLType) + + expected := interfacetests.CreateTestStruct(1, tester) + bts, err := entry.Encode(ctx, expected, interfacetests.TestItemType) + + // length of fields + discriminator + require.Equal(t, 262, len(bts)) + require.NoError(t, err) + + var decoded interfacetests.TestStruct + + require.NoError(t, entry.Decode(ctx, bts, &decoded, interfacetests.TestItemType)) + require.Equal(t, expected, decoded) +} + func TestNewIDLDefinedTypesCodecCodec(t *testing.T) { /// TODO BCI-3155 this should run the codec interface tests t.Parallel() ctx := tests.Context(t) - _, _, entry := newTestIDLAndCodec(t, false) + _, _, entry := newTestIDLAndCodec(t, definedTypesIDLType) expected := testutils.DefaultTestStruct bts, err := entry.Encode(ctx, expected, testutils.TestStructWithNestedStructType) @@ -64,7 +84,7 @@ func TestNewIDLCodec_WithModifiers(t *testing.T) { t.Parallel() ctx := tests.Context(t) - _, _, idlCodec := newTestIDLAndCodec(t, true) + _, _, idlCodec := newTestIDLAndCodec(t, accountIDLType) modConfig := codeccommon.ModifiersConfig{ &codeccommon.RenameModifierConfig{Fields: map[string]string{"Value": "V"}}, } @@ -143,20 +163,45 @@ func TestNewIDLCodec_CircularDependency(t *testing.T) { assert.ErrorIs(t, err, types.ErrInvalidConfig) } -func newTestIDLAndCodec(t *testing.T, account bool) (string, codec.IDL, types.RemoteCodec) { +type idlType string + +const ( + accountIDLType idlType = "account" + definedTypesIDLType idlType = "types" + instructionIDLType idlType = "instruction" + eventIDLType idlType = "event" +) + +func newTestIDLAndCodec(t *testing.T, idlTP idlType) (string, codec.IDL, types.RemoteCodec) { t.Helper() + var idlDef string + + //nolint:exhaustive + switch idlTP { + case accountIDLType, definedTypesIDLType: + idlDef = testutils.JSONIDLWithAllTypes + case eventIDLType: + defs := testutils.CodecDefs[testutils.TestEventItem] + idlDef = defs.IDL + } + var idl codec.IDL - if err := json.Unmarshal([]byte(testutils.JSONIDLWithAllTypes), &idl); err != nil { + if err := json.Unmarshal([]byte(idlDef), &idl); err != nil { t.Logf("failed to unmarshal test IDL: %s", err.Error()) t.FailNow() } - var entry types.RemoteCodec - var err error - if account { + var ( + entry types.RemoteCodec + err error + ) + + //nolint:exhaustive + switch idlTP { + case accountIDLType: entry, err = codec.NewIDLAccountCodec(idl, binary.LittleEndian()) - } else { + case definedTypesIDLType: entry, err = codec.NewIDLDefinedTypesCodec(idl, binary.LittleEndian()) } @@ -167,5 +212,5 @@ func newTestIDLAndCodec(t *testing.T, account bool) (string, codec.IDL, types.Re require.NotNil(t, entry) - return testutils.JSONIDLWithAllTypes, idl, entry + return idlDef, idl, entry } diff --git a/pkg/solana/config/chain_reader.go b/pkg/solana/config/chain_reader.go index ab09e013a..f2b067c4f 100644 --- a/pkg/solana/config/chain_reader.go +++ b/pkg/solana/config/chain_reader.go @@ -3,6 +3,9 @@ package config import ( "encoding/json" "fmt" + "time" + + "github.com/gagliardetto/solana-go" commoncodec "github.com/smartcontractkit/chainlink-common/pkg/codec" commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" @@ -15,7 +18,8 @@ type ContractReader struct { } type ChainContractReader struct { - codec.IDL `json:"anchorIDL"` + codec.IDL `json:"anchorIDL"` + ContractAddress solana.PublicKey `json:"contractAddress"` // Reads key is the off-chain name for this read. Reads map[string]ReadDefinition `json:"reads"` // TODO ContractPollingFilter same as EVM? @@ -27,6 +31,9 @@ type ReadDefinition struct { InputModifications commoncodec.ModifiersConfig `json:"inputModifications,omitempty"` OutputModifications commoncodec.ModifiersConfig `json:"outputModifications,omitempty"` PDADefiniton codec.PDATypeDef `json:"pdaDefinition,omitempty"` // Only used for PDA account reads + IndexedFields map[string]string `json:"indexedFields,omitempty"` + // This will create a log poller filter for this event. + *PollingFilter `json:"pollingFilter,omitempty"` } type ReadType int @@ -67,7 +74,7 @@ func (c *ChainContractReader) UnmarshalJSON(bytes []byte) error { return fmt.Errorf("anchorIDL field is neither a valid JSON string nor a valid IDL object: %w", err) } - if len(c.Accounts) == 0 && len(c.Events) == 0 { + if len(c.IDL.Accounts) == 0 && len(c.IDL.Events) == 0 { return fmt.Errorf("namespace idl must have at least one account or event: %w", commontypes.ErrInvalidConfig) } @@ -81,3 +88,9 @@ func (c *ChainContractReader) UnmarshalJSON(bytes []byte) error { return nil } + +type PollingFilter struct { + EventName string `json:"eventName"` + Retention time.Duration `json:"retention"` // maximum amount of time to retain logs + MaxLogsKept int64 `json:"maxLogsKept"` // maximum number of logs to retain ( 0 = unlimited ) +} diff --git a/pkg/solana/logpoller/parser.go b/pkg/solana/logpoller/parser.go index 78a74d17d..f0348cc7e 100644 --- a/pkg/solana/logpoller/parser.go +++ b/pkg/solana/logpoller/parser.go @@ -21,6 +21,7 @@ const ( txHashFieldName = "tx_hash" addressFieldName = "address" eventSigFieldName = "event_sig" + eventSubkeyFieldName = "event_subkey" defaultSort = "block_number ASC, log_index ASC" subKeyValuesFieldName = "subkey_values" subKeyValueArg = "subkey_value" @@ -206,11 +207,15 @@ func (v *pgDSLParser) VisitAddressFilter(p *addressFilter) { ) } -func (v *pgDSLParser) VisitEventSigFilter(p *eventSigFilter) { +func (v *pgDSLParser) VisitEventSubkeyFilter(p *eventSubkeyFilter) { + // TODO: build a proper expression + // TODO: the value type will be the off-chain field type that a raw IDL codec would decode into + // this value will need to be wrapped in a special type that will encode the value properly for + // direct comparison. v.expression = fmt.Sprintf( "%s = :%s", - eventSigFieldName, - v.args.withIndexedField(eventSigFieldName, p.eventSig), + eventSubkeyFieldName, + v.args.withIndexedField(eventSubkeyFieldName, p.Subkey), ) } @@ -489,3 +494,35 @@ func (f *eventSigFilter) Accept(visitor primitives.Visitor) { v.VisitEventSigFilter(f) } } + +func (v *pgDSLParser) VisitEventSigFilter(p *eventSigFilter) { + v.expression = fmt.Sprintf( + "%s = :%s", + eventSigFieldName, + v.args.withIndexedField(eventSigFieldName, p.eventSig), + ) +} + +type eventSubkeyFilter struct { + Subkey []string + ValueComparers []primitives.ValueComparator +} + +func NewEventSubkeyFilter(subkey []string, valueComparers []primitives.ValueComparator) query.Expression { + return query.Expression{Primitive: &eventSubkeyFilter{ + Subkey: subkey, + ValueComparers: valueComparers, + }} +} + +func (f *eventSubkeyFilter) Accept(visitor primitives.Visitor) { + switch v := visitor.(type) { + case *pgDSLParser: + v.VisitEventSubkeyFilter(f) + } +} + +// MakeContractReaderCursor is exported to ensure cursor structure remains consistent. +func FormatContractReaderCursor(log Log) string { + return fmt.Sprintf("%d-%d-%s", log.BlockNumber, log.LogIndex, log.TxHash) +} diff --git a/pkg/solana/logpoller/parser_test.go b/pkg/solana/logpoller/parser_test.go index 31588d8d8..45ecdc6f9 100644 --- a/pkg/solana/logpoller/parser_test.go +++ b/pkg/solana/logpoller/parser_test.go @@ -54,6 +54,10 @@ func TestDSLParser(t *testing.T) { expressions := []query.Expression{ NewAddressFilter(pk), NewEventSigFilter([]byte("test")), + NewEventSubkeyFilter([]string{"test"}, []primitives.ValueComparator{ + {Value: 42, Operator: primitives.Gte}, + {Value: "test_value", Operator: primitives.Eq}, + }), query.Confidence(primitives.Unconfirmed), } limiter := query.NewLimitAndSort(query.CursorLimit(fmt.Sprintf("10-5-%s", txHash), query.CursorFollowing, 20)) @@ -61,7 +65,7 @@ func TestDSLParser(t *testing.T) { result, args, err := parser.buildQuery(chainID, expressions, limiter) expected := logsQuery( " WHERE chain_id = :chain_id " + - "AND (address = :address_0 AND event_sig = :event_sig_0) " + + "AND (address = :address_0 AND event_sig = :event_sig_0 AND event_subkey = :event_subkey_0) " + "AND (block_number > :cursor_block_number OR (block_number = :cursor_block_number " + "AND log_index > :cursor_log_index)) " + "ORDER BY block_number ASC, log_index ASC, tx_hash ASC LIMIT 20") @@ -83,13 +87,17 @@ func TestDSLParser(t *testing.T) { expressions := []query.Expression{ NewAddressFilter(pk), NewEventSigFilter([]byte("test")), + NewEventSubkeyFilter([]string{"test"}, []primitives.ValueComparator{ + {Value: 42, Operator: primitives.Gte}, + {Value: "test_value", Operator: primitives.Eq}, + }), } limiter := query.NewLimitAndSort(query.CountLimit(20)) result, args, err := parser.buildQuery(chainID, expressions, limiter) expected := logsQuery( " WHERE chain_id = :chain_id " + - "AND (address = :address_0 AND event_sig = :event_sig_0) " + + "AND (address = :address_0 AND event_sig = :event_sig_0 AND event_subkey = :event_subkey_0) " + "ORDER BY " + defaultSort + " " + "LIMIT 20") @@ -278,9 +286,10 @@ func TestDSLParser(t *testing.T) { t.Run("nested query deep", func(t *testing.T) { t.Parallel() - sigFilter := NewEventSigFilter([]byte("test")) parser := &pgDSLParser{} + sigFilter := NewEventSigFilter([]byte("test")) + limiter := query.LimitAndSort{} expressions := []query.Expression{ {BoolExpression: query.BoolExpression{ Expressions: []query.Expression{ @@ -302,7 +311,6 @@ func TestDSLParser(t *testing.T) { BoolOperator: query.AND, }}, } - limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) expected := logsQuery( diff --git a/pkg/solana/relay.go b/pkg/solana/relay.go index 78ff07163..057d71ac6 100644 --- a/pkg/solana/relay.go +++ b/pkg/solana/relay.go @@ -162,7 +162,10 @@ func (r *Relayer) NewContractReader(_ context.Context, chainReaderConfig []byte) return nil, fmt.Errorf("failed to init account reader err: %s", err) } - return chainreader.NewChainReaderService(r.lggr, &chainreader.RPCClientWrapper{AccountReader: accountReader}, cfg) + // TODO: create a logpoller + var logpoller chainreader.EventsReader + + return chainreader.NewContractReaderService(r.lggr, &chainreader.RPCClientWrapper{AccountReader: accountReader}, cfg, logpoller) } func (r *Relayer) NewMedianProvider(ctx context.Context, rargs relaytypes.RelayArgs, pargs relaytypes.PluginArgs) (relaytypes.MedianProvider, error) {