diff --git a/go.mod b/go.mod index 4fcaabc0a..f5a877706 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/lib/pq v1.10.9 github.com/pelletier/go-toml/v2 v2.2.0 github.com/prometheus/client_golang v1.17.0 - github.com/smartcontractkit/chainlink-common v0.4.2-0.20250116214855-f49c5c27db51 + github.com/smartcontractkit/chainlink-common v0.4.2-0.20250121155026-c4128edb3e4e github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12 github.com/stretchr/testify v1.9.0 go.uber.org/zap v1.27.0 @@ -29,6 +29,7 @@ require ( require ( contrib.go.opencensus.io/exporter/stackdriver v0.13.4 // indirect filippo.io/edwards25519 v1.1.0 // indirect + github.com/XSAM/otelsql v0.29.0 // indirect github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect github.com/apache/arrow-go/v18 v18.0.0 // indirect github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59 // indirect diff --git a/go.sum b/go.sum index f604bc88e..40ec804f9 100644 --- a/go.sum +++ b/go.sum @@ -44,6 +44,8 @@ github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5 github.com/Microsoft/hcsshim v0.9.4 h1:mnUj0ivWy6UzbB1uLFqKR6F+ZyiDc7j4iGgHTpO+5+I= github.com/Microsoft/hcsshim v0.9.4/go.mod h1:7pLA8lDk46WKDWlVsENo92gC0XFa8rbKfyFRBqxEbCc= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/XSAM/otelsql v0.29.0 h1:pEw9YXXs8ZrGRYfDc0cmArIz9lci5b42gmP5+tA1Huc= +github.com/XSAM/otelsql v0.29.0/go.mod h1:d3/0xGIGC5RVEE+Ld7KotwaLy6zDeaF3fLJHOPpdN2w= github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -543,8 +545,8 @@ github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMB github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= -github.com/smartcontractkit/chainlink-common v0.4.2-0.20250116214855-f49c5c27db51 h1:YdjQiEu5uHWM1ApwdV+nLyJmu1+tt3IeiwPKNGoXwBI= -github.com/smartcontractkit/chainlink-common v0.4.2-0.20250116214855-f49c5c27db51/go.mod h1:yti7e1+G9hhkYhj+L5sVUULn9Bn3bBL5/AxaNqdJ5YQ= +github.com/smartcontractkit/chainlink-common v0.4.2-0.20250121155026-c4128edb3e4e h1:YsUsHCRFK0DmlQOVDz0rElogOPyAFHj8KdZrfgiu2nk= +github.com/smartcontractkit/chainlink-common v0.4.2-0.20250121155026-c4128edb3e4e/go.mod h1:V3BHfvLnQNBUoZ4bGjD29ZPhyzPE++DkYkhvPb9tcRs= github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7 h1:12ijqMM9tvYVEm+nR826WsrNi6zCKpwBhuApq127wHs= github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7/go.mod h1:FX7/bVdoep147QQhsOPkYsPEXhGZjeYx6lBSaSXtZOA= github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12 h1:NzZGjaqez21I3DU7objl3xExTH4fxYvzTqar8DC6360= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index 0cd768e8c..decc2304a 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -14,7 +14,7 @@ require ( github.com/lib/pq v1.10.9 github.com/pelletier/go-toml/v2 v2.2.3 github.com/rs/zerolog v1.33.0 - github.com/smartcontractkit/chainlink-common v0.4.2-0.20250116214855-f49c5c27db51 + github.com/smartcontractkit/chainlink-common v0.4.2-0.20250121155026-c4128edb3e4e github.com/smartcontractkit/chainlink-solana v1.1.1-0.20250116220429-efced748f123 github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.19 github.com/smartcontractkit/chainlink-testing-framework/seth v1.50.9 @@ -54,7 +54,7 @@ require ( github.com/NethermindEth/juno v0.3.1 // indirect github.com/NethermindEth/starknet.go v0.7.1-0.20240401080518-34a506f3cfdb // indirect github.com/VictoriaMetrics/fastcache v1.12.2 // indirect - github.com/XSAM/otelsql v0.27.0 // indirect + github.com/XSAM/otelsql v0.29.0 // indirect github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30 // indirect github.com/andybalholm/brotli v1.1.1 // indirect github.com/armon/go-metrics v0.4.1 // indirect diff --git a/integration-tests/go.sum b/integration-tests/go.sum index f72967d68..b333707da 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -125,8 +125,8 @@ github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjC github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI= github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= -github.com/XSAM/otelsql v0.27.0 h1:i9xtxtdcqXV768a5C6SoT/RkG+ue3JTOgkYInzlTOqs= -github.com/XSAM/otelsql v0.27.0/go.mod h1:0mFB3TvLa7NCuhm/2nU7/b2wEtsczkj8Rey8ygO7V+A= +github.com/XSAM/otelsql v0.29.0 h1:pEw9YXXs8ZrGRYfDc0cmArIz9lci5b42gmP5+tA1Huc= +github.com/XSAM/otelsql v0.29.0/go.mod h1:d3/0xGIGC5RVEE+Ld7KotwaLy6zDeaF3fLJHOPpdN2w= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -1221,8 +1221,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08= github.com/smartcontractkit/chainlink-ccip v0.0.0-20250116220331-a7ce842532c7 h1:Qp7OjFbDz4dOZXi45X3uWlu28dEA1chQZzRYWCQS3lI= github.com/smartcontractkit/chainlink-ccip v0.0.0-20250116220331-a7ce842532c7/go.mod h1:JJZMCB75aVSAiPNW032F9WUKTlLztTd8bbQB5MEaZa4= -github.com/smartcontractkit/chainlink-common v0.4.2-0.20250116214855-f49c5c27db51 h1:YdjQiEu5uHWM1ApwdV+nLyJmu1+tt3IeiwPKNGoXwBI= -github.com/smartcontractkit/chainlink-common v0.4.2-0.20250116214855-f49c5c27db51/go.mod h1:yti7e1+G9hhkYhj+L5sVUULn9Bn3bBL5/AxaNqdJ5YQ= +github.com/smartcontractkit/chainlink-common v0.4.2-0.20250121155026-c4128edb3e4e h1:YsUsHCRFK0DmlQOVDz0rElogOPyAFHj8KdZrfgiu2nk= +github.com/smartcontractkit/chainlink-common v0.4.2-0.20250121155026-c4128edb3e4e/go.mod h1:V3BHfvLnQNBUoZ4bGjD29ZPhyzPE++DkYkhvPb9tcRs= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e h1:PRoeby6ZlTuTkv2f+7tVU4+zboTfRzI+beECynF4JQ0= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e/go.mod h1:mUh5/woemsVaHgTorA080hrYmO3syBCmPdnWc/5dOqk= github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250115135646-ac859d85e7e3 h1:GcPYNVFYjB065CNq0h8nK/VeU08nUkHgBX0cJIEpuHY= diff --git a/pkg/solana/chainreader/account_read_binding.go b/pkg/solana/chainreader/account_read_binding.go index b8854b38c..a0eaec1c8 100644 --- a/pkg/solana/chainreader/account_read_binding.go +++ b/pkg/solana/chainreader/account_read_binding.go @@ -2,11 +2,14 @@ package chainreader import ( "context" + "errors" "fmt" "github.com/gagliardetto/solana-go" + commoncodec "github.com/smartcontractkit/chainlink-common/pkg/codec" "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-solana/pkg/solana/codec" ) @@ -35,6 +38,8 @@ func (b *accountReadBinding) SetCodec(codec types.RemoteCodec) { b.codec = codec } +func (b *accountReadBinding) SetModifier(commoncodec.Modifier) {} + func (b *accountReadBinding) SetAddress(key solana.PublicKey) { b.key = key } @@ -99,3 +104,7 @@ func (b *accountReadBinding) buildSeedsSlice(ctx context.Context, params any) ([ } return seedByteArray, nil } + +func (b *accountReadBinding) QueryKey(_ context.Context, _ query.KeyFilter, _ query.LimitAndSort, _ any) ([]types.Sequence, error) { + return nil, errors.New("unimplemented") +} diff --git a/pkg/solana/chainreader/bindings.go b/pkg/solana/chainreader/bindings.go index 1b927df85..eba7eca15 100644 --- a/pkg/solana/chainreader/bindings.go +++ b/pkg/solana/chainreader/bindings.go @@ -6,15 +6,19 @@ import ( "github.com/gagliardetto/solana-go" + commoncodec "github.com/smartcontractkit/chainlink-common/pkg/codec" "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/query" ) type readBinding interface { SetAddress(solana.PublicKey) GetAddress(context.Context, any) (solana.PublicKey, error) SetCodec(types.RemoteCodec) + SetModifier(commoncodec.Modifier) CreateType(bool) (any, error) Decode(context.Context, []byte, any) error + QueryKey(context.Context, query.KeyFilter, query.LimitAndSort, any) ([]types.Sequence, error) } // key is namespace @@ -72,10 +76,18 @@ func (b namespaceBindings) Bind(binding types.BoundContract) error { return nil } -func (b namespaceBindings) SetCodec(codec types.RemoteCodec) { +func (b namespaceBindings) SetCodecs(codec types.RemoteCodec) { for _, nbs := range b { for _, rb := range nbs { rb.SetCodec(codec) } } } + +func (b namespaceBindings) SetModifiers(modifier commoncodec.Modifier) { + for _, nbs := range b { + for _, rb := range nbs { + rb.SetModifier(modifier) + } + } +} diff --git a/pkg/solana/chainreader/bindings_test.go b/pkg/solana/chainreader/bindings_test.go index 3dec21194..51c583c52 100644 --- a/pkg/solana/chainreader/bindings_test.go +++ b/pkg/solana/chainreader/bindings_test.go @@ -9,7 +9,9 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + commoncodec "github.com/smartcontractkit/chainlink-common/pkg/codec" "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/query" ) func TestBindings_CreateType(t *testing.T) { @@ -54,6 +56,10 @@ func (_m *mockBinding) GetAddress(_ context.Context, _ any) (solana.PublicKey, e return solana.PublicKey{}, nil } +func (_m *mockBinding) SetModifier(a commoncodec.Modifier) { + _m.Called(a) +} + func (_m *mockBinding) CreateType(b bool) (any, error) { ret := _m.Called(b) @@ -63,3 +69,14 @@ func (_m *mockBinding) CreateType(b bool) (any, error) { func (_m *mockBinding) Decode(_ context.Context, _ []byte, _ any) error { return nil } + +func (_m *mockBinding) QueryKey( + a context.Context, + b query.KeyFilter, + c query.LimitAndSort, + d any, +) ([]types.Sequence, error) { + ret := _m.Called(a, b, c, d) + + return ret.Get(0).([]types.Sequence), ret.Error(1) +} diff --git a/pkg/solana/chainreader/chain_reader.go b/pkg/solana/chainreader/chain_reader.go index 41d70161b..e0d52560e 100644 --- a/pkg/solana/chainreader/chain_reader.go +++ b/pkg/solana/chainreader/chain_reader.go @@ -15,25 +15,33 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" + "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink-solana/pkg/solana/codec" "github.com/smartcontractkit/chainlink-solana/pkg/solana/config" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller" ) -const ServiceName = "SolanaChainReader" +type EventSourcer interface { + FilteredLogs(context.Context, []query.Expression, query.LimitAndSort, string) ([]logpoller.Log, error) +} + +const ServiceName = "SolanaContractReader" -type SolanaChainReaderService struct { +type ContractReaderService struct { types.UnimplementedContractReader - // provided values + // provided dependencies lggr logger.Logger client MultipleAccountGetter + events EventSourcer // internal values - bindings namespaceBindings - lookup *lookup - parsed *codec.ParsedTypes - codec types.RemoteCodec + bindings namespaceBindings + lookup *lookup + parsed *codec.ParsedTypes + codec types.RemoteCodec + codecWithoutModifiers types.RemoteCodec // service state management wg sync.WaitGroup @@ -41,13 +49,18 @@ type SolanaChainReaderService struct { } var ( - _ services.Service = &SolanaChainReaderService{} - _ types.ContractReader = &SolanaChainReaderService{} + _ services.Service = &ContractReaderService{} + _ types.ContractReader = &ContractReaderService{} ) -// NewChainReaderService is a constructor for a new ChainReaderService for Solana. Returns a nil service on error. -func NewChainReaderService(lggr logger.Logger, dataReader MultipleAccountGetter, cfg config.ContractReader) (*SolanaChainReaderService, error) { - svc := &SolanaChainReaderService{ +// NewContractReaderService is a constructor for a new ContractReaderService for Solana. Returns a nil service on error. +func NewContractReaderService( + lggr logger.Logger, + dataReader MultipleAccountGetter, + cfg config.ContractReader, + events EventSourcer, +) (*ContractReaderService, error) { + svc := &ContractReaderService{ lggr: logger.Named(lggr, ServiceName), client: dataReader, bindings: namespaceBindings{}, @@ -66,19 +79,21 @@ func NewChainReaderService(lggr logger.Logger, dataReader MultipleAccountGetter, svc.codec = svcCodec - svc.bindings.SetCodec(svcCodec) + svc.bindings.SetCodecs(svcCodec) + svc.bindings.SetModifiers(svc.parsed.Modifiers) + return svc, nil } // Name implements the services.ServiceCtx interface and returns the logger service name. -func (s *SolanaChainReaderService) Name() string { +func (s *ContractReaderService) Name() string { return s.lggr.Name() } // Start implements the services.ServiceCtx interface and starts necessary background services. // An error is returned if starting any internal services fails. Subsequent calls to Start return // and error. -func (s *SolanaChainReaderService) Start(_ context.Context) error { +func (s *ContractReaderService) Start(_ context.Context) error { return s.StartOnce(ServiceName, func() error { return nil }) @@ -86,7 +101,7 @@ func (s *SolanaChainReaderService) Start(_ context.Context) error { // Close implements the services.ServiceCtx interface and stops all background services and cleans // up used resources. Subsequent calls to Close return an error. -func (s *SolanaChainReaderService) Close() error { +func (s *ContractReaderService) Close() error { return s.StopOnce(ServiceName, func() error { s.wg.Wait() @@ -96,19 +111,19 @@ func (s *SolanaChainReaderService) Close() error { // Ready implements the services.ServiceCtx interface and returns an error if starting the service // encountered any errors or if the service is not ready to serve requests. -func (s *SolanaChainReaderService) Ready() error { +func (s *ContractReaderService) Ready() error { return s.StateMachine.Ready() } // HealthReport implements the services.ServiceCtx interface and returns errors for any internal // function or service that may have failed. -func (s *SolanaChainReaderService) HealthReport() map[string]error { +func (s *ContractReaderService) HealthReport() map[string]error { return map[string]error{s.Name(): s.Healthy()} } // GetLatestValue implements the types.ContractReader interface and requests and parses on-chain // data named by the provided contract, method, and params. -func (s *SolanaChainReaderService) GetLatestValue(ctx context.Context, readIdentifier string, _ primitives.ConfidenceLevel, params any, returnVal any) error { +func (s *ContractReaderService) GetLatestValue(ctx context.Context, readIdentifier string, _ primitives.ConfidenceLevel, params any, returnVal any) error { if err := s.Ready(); err != nil { return err } @@ -147,7 +162,7 @@ func (s *SolanaChainReaderService) GetLatestValue(ctx context.Context, readIdent } // BatchGetLatestValues implements the types.ContractReader interface. -func (s *SolanaChainReaderService) BatchGetLatestValues(ctx context.Context, request types.BatchGetLatestValuesRequest) (types.BatchGetLatestValuesResult, error) { +func (s *ContractReaderService) BatchGetLatestValues(ctx context.Context, request types.BatchGetLatestValuesRequest) (types.BatchGetLatestValuesResult, error) { idxLookup := make(map[types.BoundContract][]int) batch := []call{} @@ -191,13 +206,46 @@ func (s *SolanaChainReaderService) BatchGetLatestValues(ctx context.Context, req } // QueryKey implements the types.ContractReader interface. -func (s *SolanaChainReaderService) QueryKey(_ context.Context, _ types.BoundContract, _ query.KeyFilter, _ query.LimitAndSort, _ any) ([]types.Sequence, error) { - return nil, errors.New("unimplemented") +func (s *ContractReaderService) QueryKey(ctx context.Context, contract types.BoundContract, filter query.KeyFilter, limitAndSort query.LimitAndSort, sequenceDataType any) ([]types.Sequence, error) { + binding, err := s.bindings.GetReadBinding(contract.Name, filter.Key) + if err != nil { + return nil, err + } + + _, isValuePtr := sequenceDataType.(*values.Value) + if !isValuePtr { + return binding.QueryKey(ctx, filter, limitAndSort, sequenceDataType) + } + + dataTypeFromReadIdentifier, err := s.CreateContractType(contract.ReadIdentifier(filter.Key), false) + if err != nil { + return nil, err + } + + sequence, err := binding.QueryKey(ctx, filter, limitAndSort, dataTypeFromReadIdentifier) + if err != nil { + return nil, err + } + + sequenceOfValues := make([]types.Sequence, len(sequence)) + for idx, entry := range sequence { + value, err := values.Wrap(entry.Data) + if err != nil { + return nil, err + } + sequenceOfValues[idx] = types.Sequence{ + Cursor: entry.Cursor, + Head: entry.Head, + Data: &value, + } + } + + return sequenceOfValues, nil } // Bind implements the types.ContractReader interface and allows new contract bindings to be added // to the service. -func (s *SolanaChainReaderService) Bind(_ context.Context, bindings []types.BoundContract) error { +func (s *ContractReaderService) Bind(_ context.Context, bindings []types.BoundContract) error { for _, binding := range bindings { if err := s.bindings.Bind(binding); err != nil { return err @@ -211,7 +259,7 @@ func (s *SolanaChainReaderService) Bind(_ context.Context, bindings []types.Boun // Unbind implements the types.ContractReader interface and allows existing contract bindings to be removed // from the service. -func (s *SolanaChainReaderService) Unbind(_ context.Context, bindings []types.BoundContract) error { +func (s *ContractReaderService) Unbind(_ context.Context, bindings []types.BoundContract) error { for _, binding := range bindings { s.lookup.unbindAddressForContract(binding.Name, binding.Address) } @@ -221,7 +269,7 @@ func (s *SolanaChainReaderService) Unbind(_ context.Context, bindings []types.Bo // CreateContractType implements the ContractTypeProvider interface and allows the chain reader // service to explicitly define the expected type for a grpc server to provide. -func (s *SolanaChainReaderService) CreateContractType(readIdentifier string, forEncoding bool) (any, error) { +func (s *ContractReaderService) CreateContractType(readIdentifier string, forEncoding bool) (any, error) { values, ok := s.lookup.getContractForReadIdentifiers(readIdentifier) if !ok { return nil, fmt.Errorf("%w: no contract for read identifier", types.ErrInvalidConfig) @@ -230,7 +278,7 @@ func (s *SolanaChainReaderService) CreateContractType(readIdentifier string, for return s.bindings.CreateType(values.contract, values.genericName, forEncoding) } -func (s *SolanaChainReaderService) addCodecDef(forEncoding bool, namespace, genericName string, readType codec.ChainConfigType, idl codec.IDL, idlDefinition interface{}, modCfg commoncodec.ModifiersConfig) error { +func (s *ContractReaderService) addCodecDef(forEncoding bool, namespace, genericName string, readType codec.ChainConfigType, idl codec.IDL, idlDefinition interface{}, modCfg commoncodec.ModifiersConfig) error { mod, err := modCfg.ToModifier(codec.DecoderHooks...) if err != nil { return err @@ -249,17 +297,18 @@ func (s *SolanaChainReaderService) addCodecDef(forEncoding bool, namespace, gene return nil } -func (s *SolanaChainReaderService) init(namespaces map[string]config.ChainContractReader) error { +func (s *ContractReaderService) init(namespaces map[string]config.ChainContractReader) error { for namespace, nameSpaceDef := range namespaces { for genericName, read := range nameSpaceDef.Reads { injectAddressModifier(read.InputModifications, read.OutputModifications) - idlDef, err := codec.FindDefinitionFromIDL(codec.ChainConfigTypeAccountDef, read.ChainSpecificName, nameSpaceDef.IDL) - if err != nil { - return err - } switch read.ReadType { case config.Account: + idlDef, err := codec.FindDefinitionFromIDL(codec.ChainConfigTypeAccountDef, read.ChainSpecificName, nameSpaceDef.IDL) + if err != nil { + return err + } + accountIDLDef, isOk := idlDef.(codec.IdlTypeDef) if !isOk { return fmt.Errorf("unexpected type %T from IDL definition for account read: %q, with chainSpecificName: %q, of type: %q", accountIDLDef, genericName, read.ChainSpecificName, read.ReadType) @@ -268,12 +317,19 @@ func (s *SolanaChainReaderService) init(namespaces map[string]config.ChainContra return err } case config.Event: + idlDef, err := codec.FindDefinitionFromIDL(codec.ChainConfigTypeEventDef, read.ChainSpecificName, nameSpaceDef.IDL) + if err != nil { + return err + } + eventIDlDef, isOk := idlDef.(codec.IdlEvent) if !isOk { - return fmt.Errorf("unexpected type %T from IDL definition for log read: %q, with chainSpecificName: %q, of type: %q", eventIDlDef, genericName, read.ChainSpecificName, read.ReadType) + return fmt.Errorf("unexpected type %T from IDL definition for event read: %q, with chainSpecificName: %q, of type: %q", eventIDlDef, genericName, read.ChainSpecificName, read.ReadType) + } + + if err = s.addEventRead(namespace, genericName, nameSpaceDef.IDL, eventIDlDef, read); err != nil { + return err } - // TODO s.addLogRead() - return fmt.Errorf("implement me") default: return fmt.Errorf("unexpected read type %q for: %q in namespace: %q", read.ReadType, genericName, namespace) } @@ -283,7 +339,12 @@ func (s *SolanaChainReaderService) init(namespaces map[string]config.ChainContra return nil } -func (s *SolanaChainReaderService) addAccountRead(namespace string, genericName string, idl codec.IDL, idlType codec.IdlTypeDef, readDefinition config.ReadDefinition) error { +func (s *ContractReaderService) addAccountRead( + namespace, genericName string, + idl codec.IDL, + idlType codec.IdlTypeDef, + readDefinition config.ReadDefinition, +) error { if err := s.addCodecDef(false, namespace, genericName, codec.ChainConfigTypeAccountDef, idl, idlType, readDefinition.OutputModifications); err != nil { return err } @@ -308,6 +369,23 @@ func (s *SolanaChainReaderService) addAccountRead(namespace string, genericName return nil } +func (s *ContractReaderService) addEventRead( + namespace, genericName string, + idl codec.IDL, + idlType codec.IdlEvent, + readDefinition config.ReadDefinition, +) error { + // TODO: set log poller filter + + s.bindings.AddReadBinding(namespace, genericName, newEventReadBinding( + namespace, + genericName, + readDefinition.IndexedFields, + )) + + return nil +} + // injectAddressModifier injects AddressModifier into OutputModifications. // This is necessary because AddressModifier cannot be serialized and must be applied at runtime. func injectAddressModifier(inputModifications, outputModifications commoncodec.ModifiersConfig) { diff --git a/pkg/solana/chainreader/chain_reader_test.go b/pkg/solana/chainreader/chain_reader_test.go index 019ef3e09..25a2196f1 100644 --- a/pkg/solana/chainreader/chain_reader_test.go +++ b/pkg/solana/chainreader/chain_reader_test.go @@ -60,11 +60,11 @@ func TestSolanaChainReaderService_ReaderInterface(t *testing.T) { RunContractReaderInterfaceTests(t, lsIt, true, false) } -func TestSolanaChainReaderService_ServiceCtx(t *testing.T) { +func TestSolanaContractReaderService_ServiceCtx(t *testing.T) { t.Parallel() ctx := tests.Context(t) - svc, err := chainreader.NewChainReaderService(logger.Test(t), new(mockedRPCClient), config.ContractReader{}) + svc, err := chainreader.NewContractReaderService(logger.Test(t), new(mockedRPCClient), config.ContractReader{}, nil) require.NoError(t, err) require.NotNil(t, svc) @@ -100,7 +100,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { require.NoError(t, err) client := new(mockedRPCClient) - svc, err := chainreader.NewChainReaderService(logger.Test(t), client, conf) + svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) require.NotNil(t, svc) @@ -137,7 +137,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { client := new(mockedRPCClient) expectedErr := fmt.Errorf("expected error") - svc, err := chainreader.NewChainReaderService(logger.Test(t), client, conf) + svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) require.NotNil(t, svc) @@ -171,7 +171,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { _, conf := newTestConfAndCodec(t) client := new(mockedRPCClient) - svc, err := chainreader.NewChainReaderService(logger.Test(t), client, conf) + svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) require.NotNil(t, svc) @@ -192,7 +192,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { _, conf := newTestConfAndCodec(t) client := new(mockedRPCClient) - svc, err := chainreader.NewChainReaderService(logger.Test(t), client, conf) + svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) require.NotNil(t, svc) @@ -213,7 +213,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { _, conf := newTestConfAndCodec(t) client := new(mockedRPCClient) - svc, err := chainreader.NewChainReaderService(logger.Test(t), client, conf) + svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) require.NotNil(t, svc) @@ -394,7 +394,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { require.NoError(t, err) client := new(mockedRPCClient) - svc, err := chainreader.NewChainReaderService(logger.Test(t), client, conf) + svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) require.NotNil(t, svc) require.NoError(t, svc.Start(ctx)) @@ -444,7 +444,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { _, conf := newTestConfAndCodecWithInjectibleReadDef(t, PDAAccount, readDef) client := new(mockedRPCClient) - svc, err := chainreader.NewChainReaderService(logger.Test(t), client, conf) + svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) require.NotNil(t, svc) require.NoError(t, svc.Start(ctx)) @@ -681,7 +681,7 @@ func (r *chainReaderInterfaceTester) Setup(t *testing.T) { func (r *chainReaderInterfaceTester) GetContractReader(t *testing.T) types.ContractReader { client := new(mockedRPCClient) - svc, err := chainreader.NewChainReaderService(logger.Test(t), client, r.conf) + svc, err := chainreader.NewContractReaderService(logger.Test(t), client, r.conf, nil) if err != nil { t.Logf("chain reader service was not able to start: %s", err.Error()) t.FailNow() @@ -707,7 +707,7 @@ type wrappedTestChainReader struct { types.UnimplementedContractReader test *testing.T - service *chainreader.SolanaChainReaderService + service *chainreader.ContractReaderService client *mockedRPCClient tester ChainComponentsInterfaceTester[*testing.T] testStructQueue []*TestStruct diff --git a/pkg/solana/chainreader/event_read_binding.go b/pkg/solana/chainreader/event_read_binding.go new file mode 100644 index 000000000..a6aba4214 --- /dev/null +++ b/pkg/solana/chainreader/event_read_binding.go @@ -0,0 +1,227 @@ +package chainreader + +import ( + "context" + "fmt" + "reflect" + "strings" + + "github.com/gagliardetto/solana-go" + + commoncodec "github.com/smartcontractkit/chainlink-common/pkg/codec" + "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/query" + "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" + + "github.com/smartcontractkit/chainlink-solana/pkg/solana/codec" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller" +) + +type EventsReader interface { + FilteredLogs(context.Context, []query.Expression, query.LimitAndSort, string) ([]logpoller.Log, error) +} + +type eventReadBinding struct { + namespace, genericName string + codec types.RemoteCodec + modifier commoncodec.Modifier + key solana.PublicKey + remapper remapHelper + indexedSubkeys map[string]string + reader EventsReader +} + +func newEventReadBinding(namespace, genericName string, indexedSubkeys map[string]string) *eventReadBinding { + binding := &eventReadBinding{ + namespace: namespace, + genericName: genericName, + indexedSubkeys: indexedSubkeys, + } + + binding.remapper = remapHelper{binding.remapPrimitive} + + return binding +} + +func (b *eventReadBinding) SetAddress(key solana.PublicKey) { + b.key = key +} + +func (b *eventReadBinding) GetAddress(_ context.Context, _ any) (solana.PublicKey, error) { + return b.key, nil +} + +func (b *eventReadBinding) SetCodec(codec types.RemoteCodec) { + b.codec = codec +} + +func (b *eventReadBinding) SetModifier(modifier commoncodec.Modifier) { + b.modifier = modifier +} + +func (b *eventReadBinding) CreateType(forEncoding bool) (any, error) { + itemType := codec.WrapItemType(forEncoding, b.namespace, b.genericName, codec.ChainConfigTypeEventDef) + + return b.codec.CreateType(itemType, forEncoding) +} + +func (b *eventReadBinding) Decode(ctx context.Context, bts []byte, outVal any) error { + itemType := codec.WrapItemType(false, b.namespace, b.genericName, codec.ChainConfigTypeEventDef) + + return b.codec.Decode(ctx, bts, outVal, itemType) +} + +func (b *eventReadBinding) QueryKey( + ctx context.Context, + filter query.KeyFilter, + limitAndSort query.LimitAndSort, + sequenceDataType any, +) ([]types.Sequence, error) { + var err error + + if filter, err = b.remapper.remap(filter); err != nil { + return nil, err + } + + itemType := strings.Join([]string{b.namespace, b.genericName}, ".") + + logs, err := b.reader.FilteredLogs(ctx, filter.Expressions, limitAndSort, itemType) + if err != nil { + return nil, err + } + + sequences, err := b.decodeLogsIntoSequences(ctx, logs, sequenceDataType) + if err != nil { + return nil, err + } + + return sequences, nil +} + +func (b *eventReadBinding) remapPrimitive(expression query.Expression) (query.Expression, error) { + var ( + comp query.Expression + err error + ) + + switch primitive := expression.Primitive.(type) { + case *primitives.Comparator: + if comp, err = b.encodeComparator(primitive); err != nil { + return query.Expression{}, fmt.Errorf("failed to encode comparator %q: %w", primitive.Name, err) + } + case *primitives.Confidence: + // TODO: maybe use an ignore filter? + // confidence is ignored in solana + } + + return comp, nil +} + +func (b *eventReadBinding) encodeComparator(comparator *primitives.Comparator) (query.Expression, error) { + onChainName, ok := b.indexedSubkeys[comparator.Name] + if !ok { + return query.Expression{}, fmt.Errorf("%w: unknown indexed subkey mapping %s", types.ErrInvalidConfig, comparator.Name) + } + + itemType := strings.Join([]string{b.namespace, b.genericName, comparator.Name}, ".") + + for idx, comp := range comparator.ValueComparators { + newValue, err := b.modifier.TransformToOnChain(comp.Value, itemType) + if err != nil { + return query.Expression{}, err + } + + comparator.ValueComparators[idx].Value = newValue + } + + return logpoller.NewEventSubkeyFilter(strings.Split(onChainName, "."), comparator.ValueComparators), nil +} + +func (b *eventReadBinding) decodeLogsIntoSequences( + ctx context.Context, + logs []logpoller.Log, + into any, +) ([]types.Sequence, error) { + sequences := make([]types.Sequence, len(logs)) + + for idx := range logs { + sequences[idx] = types.Sequence{ + Cursor: logpoller.FormatContractReaderCursor(logs[idx]), + Head: types.Head{ + Height: fmt.Sprint(logs[idx].BlockNumber), + Hash: solana.PublicKey(logs[idx].BlockHash).Bytes(), + Timestamp: uint64(logs[idx].BlockTimestamp.Unix()), + }, + } + + var typeVal reflect.Value + + typeInto := reflect.TypeOf(into) + if typeInto.Kind() == reflect.Pointer { + typeVal = reflect.New(typeInto.Elem()) + } else { + typeVal = reflect.Indirect(reflect.New(typeInto)) + } + + // create a new value of the same type as 'into' for the data to be extracted to + sequences[idx].Data = typeVal.Interface() + + if err := b.decodeLog(ctx, &logs[idx], sequences[idx].Data); err != nil { + return nil, err + } + } + + return sequences, nil +} + +func (b *eventReadBinding) decodeLog(ctx context.Context, log *logpoller.Log, into any) error { + itemType := codec.WrapItemType(false, b.namespace, b.genericName, codec.ChainConfigTypeEventDef) + + // decode non indexed topics and apply output modifiers + if err := b.codec.Decode(ctx, log.Data, into, itemType); err != nil { + return fmt.Errorf("%w: failed to decode log data: %s", types.ErrInvalidType, err.Error()) + } + + return nil +} + +type remapHelper struct { + primitive func(query.Expression) (query.Expression, error) +} + +func (r remapHelper) remap(filter query.KeyFilter) (query.KeyFilter, error) { + var remapped query.KeyFilter + + for _, expression := range filter.Expressions { + remappedExpression, err := r.remapExpression(filter.Key, expression) + if err != nil { + return query.KeyFilter{}, err + } + + remapped.Expressions = append(remapped.Expressions, remappedExpression) + } + + return remapped, nil +} + +func (r remapHelper) remapExpression(key string, expression query.Expression) (query.Expression, error) { + if !expression.IsPrimitive() { + remappedBoolExpressions := make([]query.Expression, len(expression.BoolExpression.Expressions)) + for i := range expression.BoolExpression.Expressions { + remapped, err := r.remapExpression(key, expression.BoolExpression.Expressions[i]) + if err != nil { + return query.Expression{}, err + } + + remappedBoolExpressions[i] = remapped + } + + if expression.BoolExpression.BoolOperator == query.AND { + return query.And(remappedBoolExpressions...), nil + } + + return query.Or(remappedBoolExpressions...), nil + } + + return r.primitive(expression) +} diff --git a/pkg/solana/codec/parsed_types.go b/pkg/solana/codec/parsed_types.go index 3144f6dd0..f9d571401 100644 --- a/pkg/solana/codec/parsed_types.go +++ b/pkg/solana/codec/parsed_types.go @@ -11,38 +11,47 @@ import ( type ParsedTypes struct { EncoderDefs map[string]Entry DecoderDefs map[string]Entry + Modifiers commoncodec.Modifier } func (parsed *ParsedTypes) ToCodec() (commontypes.RemoteCodec, error) { - modByTypeName := map[string]commoncodec.Modifier{} - if err := AddEntries(parsed.EncoderDefs, modByTypeName); err != nil { + var ( + modByTypeName = map[string]commoncodec.Modifier{} + mod commoncodec.Modifier + err error + ) + + if err = AddEntries(parsed.EncoderDefs, modByTypeName); err != nil { return nil, err } - if err := AddEntries(parsed.DecoderDefs, modByTypeName); err != nil { + + if err = AddEntries(parsed.DecoderDefs, modByTypeName); err != nil { return nil, err } - mod, err := commoncodec.NewByItemTypeModifier(modByTypeName) - if err != nil { + if mod, err = commoncodec.NewByItemTypeModifier(modByTypeName); err != nil { return nil, err } + + parsed.Modifiers = mod + underlying := &solanaCodec{ Encoder: &Encoder{definitions: parsed.EncoderDefs}, Decoder: &Decoder{definitions: parsed.DecoderDefs}, ParsedTypes: parsed, } + return commoncodec.NewModifierCodec(underlying, mod, DecoderHooks...) } -// AddEntries extracts the mods from entry and adds them to modByTypeName use with codec.NewByItemTypeModifier -// Since each input/output can have its own modifications, we need to keep track of them by type name func AddEntries(defs map[string]Entry, modByTypeName map[string]commoncodec.Modifier) error { for k, def := range defs { modByTypeName[k] = def.Modifier() - _, err := def.Modifier().RetypeToOffChain(reflect.PointerTo(def.GetType()), k) - if err != nil { + + if _, err := def.Modifier().RetypeToOffChain(reflect.PointerTo(def.GetType()), k); err != nil { return fmt.Errorf("%w: cannot retype %v: %w", commontypes.ErrInvalidConfig, k, err) } } + return nil } diff --git a/pkg/solana/codec/solana.go b/pkg/solana/codec/solana.go index 08ff964a9..c97a0f461 100644 --- a/pkg/solana/codec/solana.go +++ b/pkg/solana/codec/solana.go @@ -174,6 +174,40 @@ func NewIDLInstructionsCodec(idl IDL, builder commonencodings.Builder) (commonty return typeCodecs, nil } +func NewIDLEventCodec(idl IDL, builder commonencodings.Builder) (commontypes.RemoteCodec, error) { + typeCodecs := make(commonencodings.LenientCodecFromTypeCodec) + refs := &codecRefs{ + builder: builder, + codecs: make(map[string]commonencodings.TypeCodec), + typeDefs: idl.Types, + dependencies: make(map[string][]string), + } + + for _, event := range idl.Events { + name, instCodec, err := asStruct(eventFieldsAsStandardFields(event.Fields), refs, event.Name, false, false) + if err != nil { + return nil, err + } + + typeCodecs[name] = instCodec + } + + return typeCodecs, nil +} + +func eventFieldsAsStandardFields(event []IdlEventField) []IdlField { + output := make([]IdlField, len(event)) + + for idx := range output { + output[idx] = IdlField{ + Name: event[idx].Name, + Type: event[idx].Type, + } + } + + return output +} + func NewNamedModifierCodec(original commontypes.RemoteCodec, itemType string, modifier commoncodec.Modifier) (commontypes.RemoteCodec, error) { mod, err := commoncodec.NewByItemTypeModifier(map[string]commoncodec.Modifier{itemType: modifier}) if err != nil { diff --git a/pkg/solana/codec/solana_test.go b/pkg/solana/codec/solana_test.go index 4dd116691..d5d14b453 100644 --- a/pkg/solana/codec/solana_test.go +++ b/pkg/solana/codec/solana_test.go @@ -12,6 +12,7 @@ import ( codeccommon "github.com/smartcontractkit/chainlink-common/pkg/codec" "github.com/smartcontractkit/chainlink-common/pkg/codec/encodings/binary" "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/interfacetests" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/smartcontractkit/chainlink-solana/pkg/solana/codec" @@ -23,14 +24,13 @@ func TestNewIDLAccountCodec(t *testing.T) { t.Parallel() ctx := tests.Context(t) - _, _, entry := newTestIDLAndCodec(t, true) + _, _, entry := newTestIDLAndCodec(t, accountIDLType) expected := testutils.DefaultTestStruct bts, err := entry.Encode(ctx, expected, testutils.TestStructWithNestedStruct) // length of fields + discriminator require.Equal(t, 262, len(bts)) - require.NoError(t, err) var decoded testutils.StructWithNestedStruct @@ -39,12 +39,32 @@ func TestNewIDLAccountCodec(t *testing.T) { require.Equal(t, expected, decoded) } +func TestCodecProperties(t *testing.T) { + t.Parallel() + + tester := &codecInterfaceTester{} + ctx := tests.Context(t) + _, _, entry := newTestIDLAndCodec(t, eventIDLType) + + expected := interfacetests.CreateTestStruct(1, tester) + bts, err := entry.Encode(ctx, expected, interfacetests.TestItemType) + + // length of fields + discriminator + require.Equal(t, 262, len(bts)) + require.NoError(t, err) + + var decoded interfacetests.TestStruct + + require.NoError(t, entry.Decode(ctx, bts, &decoded, interfacetests.TestItemType)) + require.Equal(t, expected, decoded) +} + func TestNewIDLDefinedTypesCodecCodec(t *testing.T) { /// TODO BCI-3155 this should run the codec interface tests t.Parallel() ctx := tests.Context(t) - _, _, entry := newTestIDLAndCodec(t, false) + _, _, entry := newTestIDLAndCodec(t, definedTypesIDLType) expected := testutils.DefaultTestStruct bts, err := entry.Encode(ctx, expected, testutils.TestStructWithNestedStructType) @@ -64,7 +84,7 @@ func TestNewIDLCodec_WithModifiers(t *testing.T) { t.Parallel() ctx := tests.Context(t) - _, _, idlCodec := newTestIDLAndCodec(t, true) + _, _, idlCodec := newTestIDLAndCodec(t, accountIDLType) modConfig := codeccommon.ModifiersConfig{ &codeccommon.RenameModifierConfig{Fields: map[string]string{"Value": "V"}}, } @@ -143,21 +163,48 @@ func TestNewIDLCodec_CircularDependency(t *testing.T) { assert.ErrorIs(t, err, types.ErrInvalidConfig) } -func newTestIDLAndCodec(t *testing.T, account bool) (string, codec.IDL, types.RemoteCodec) { +type idlType string + +const ( + accountIDLType idlType = "account" + definedTypesIDLType idlType = "types" + instructionIDLType idlType = "instruction" + eventIDLType idlType = "event" +) + +func newTestIDLAndCodec(t *testing.T, idlTP idlType) (string, codec.IDL, types.RemoteCodec) { t.Helper() + var idlDef string + + switch idlTP { + case accountIDLType, definedTypesIDLType: + idlDef = testutils.JSONIDLWithAllTypes + case eventIDLType: + defs := testutils.CodecDefs[testutils.TestEventItem] + idlDef = defs.IDL + } + var idl codec.IDL - if err := json.Unmarshal([]byte(testutils.JSONIDLWithAllTypes), &idl); err != nil { + if err := json.Unmarshal([]byte(idlDef), &idl); err != nil { t.Logf("failed to unmarshal test IDL: %s", err.Error()) t.FailNow() } - var entry types.RemoteCodec - var err error - if account { + var ( + entry types.RemoteCodec + err error + ) + + switch idlTP { + case accountIDLType: entry, err = codec.NewIDLAccountCodec(idl, binary.LittleEndian()) - } else { + case definedTypesIDLType: entry, err = codec.NewIDLDefinedTypesCodec(idl, binary.LittleEndian()) + case instructionIDLType: + entry, err = codec.NewIDLInstructionsCodec(idl, binary.LittleEndian()) + case eventIDLType: + entry, err = codec.NewIDLEventCodec(idl, binary.LittleEndian()) } if err != nil { @@ -167,5 +214,5 @@ func newTestIDLAndCodec(t *testing.T, account bool) (string, codec.IDL, types.Re require.NotNil(t, entry) - return testutils.JSONIDLWithAllTypes, idl, entry + return idlDef, idl, entry } diff --git a/pkg/solana/config/chain_reader.go b/pkg/solana/config/chain_reader.go index ab09e013a..a6362ea1c 100644 --- a/pkg/solana/config/chain_reader.go +++ b/pkg/solana/config/chain_reader.go @@ -27,6 +27,7 @@ type ReadDefinition struct { InputModifications commoncodec.ModifiersConfig `json:"inputModifications,omitempty"` OutputModifications commoncodec.ModifiersConfig `json:"outputModifications,omitempty"` PDADefiniton codec.PDATypeDef `json:"pdaDefinition,omitempty"` // Only used for PDA account reads + IndexedFields map[string]string `json:"indexedFields,omitempty"` } type ReadType int diff --git a/pkg/solana/logpoller/parser.go b/pkg/solana/logpoller/parser.go index fcb3a8da9..9c62743e5 100644 --- a/pkg/solana/logpoller/parser.go +++ b/pkg/solana/logpoller/parser.go @@ -129,12 +129,18 @@ func (v *pgDSLParser) VisitAddressFilter(p *addressFilter) { ) } -func (v *pgDSLParser) VisitEventSigFilter(p *eventSigFilter) { - v.expression = fmt.Sprintf( - "%s = :%s", - eventSigFieldName, - v.args.withIndexedField(eventSigFieldName, p.eventSig), - ) +func (v *pgDSLParser) VisitEventSubkeyFilter(p *eventSubkeyFilter) { + // TODO: build a proper expression + // TODO: the value type will be the off-chain field type that a raw IDL codec would decode into + // this value will need to be wrapped in a special type that will encode the value properly for + // direct comparison. + /* + v.expression = fmt.Sprintf( + "%s = :%s", + eventSigFieldName, + v.args.withIndexedField(eventSigFieldName, p.eventSig), + ) + */ } func (v *pgDSLParser) buildQuery( @@ -396,19 +402,26 @@ func (f *addressFilter) Accept(visitor primitives.Visitor) { } } -type eventSigFilter struct { - eventSig []byte +type eventSubkeyFilter struct { + Subkey []string + ValueComparers []primitives.ValueComparator } -func NewEventSigFilter(sig []byte) query.Expression { - return query.Expression{ - Primitive: &eventSigFilter{eventSig: sig}, - } +func NewEventSubkeyFilter(subkey []string, valueComparers []primitives.ValueComparator) query.Expression { + return query.Expression{Primitive: &eventSubkeyFilter{ + Subkey: subkey, + ValueComparers: valueComparers, + }} } -func (f *eventSigFilter) Accept(visitor primitives.Visitor) { +func (f *eventSubkeyFilter) Accept(visitor primitives.Visitor) { switch v := visitor.(type) { case *pgDSLParser: - v.VisitEventSigFilter(f) + v.VisitEventSubkeyFilter(f) } } + +// MakeContractReaderCursor is exported to ensure cursor structure remains consistent. +func FormatContractReaderCursor(log Log) string { + return fmt.Sprintf("%d-%d-%s", log.BlockNumber, log.LogIndex, log.TxHash) +} diff --git a/pkg/solana/logpoller/parser_test.go b/pkg/solana/logpoller/parser_test.go index 96d2d8656..4ca6cccff 100644 --- a/pkg/solana/logpoller/parser_test.go +++ b/pkg/solana/logpoller/parser_test.go @@ -53,7 +53,7 @@ func TestDSLParser(t *testing.T) { parser := &pgDSLParser{} expressions := []query.Expression{ NewAddressFilter(pk), - NewEventSigFilter([]byte("test")), + NewEventSubkeyFilter([]string{"test"}, nil), query.Confidence(primitives.Unconfirmed), } limiter := query.NewLimitAndSort(query.CursorLimit(fmt.Sprintf("10-5-%s", txHash), query.CursorFollowing, 20)) @@ -82,7 +82,7 @@ func TestDSLParser(t *testing.T) { parser := &pgDSLParser{} expressions := []query.Expression{ NewAddressFilter(pk), - NewEventSigFilter([]byte("test")), + NewEventSubkeyFilter([]string{"test"}, nil), } limiter := query.NewLimitAndSort(query.CountLimit(20)) @@ -237,7 +237,7 @@ func TestDSLParser(t *testing.T) { t.Run("nested query deep", func(t *testing.T) { t.Parallel() - sigFilter := NewEventSigFilter([]byte("test")) + sigFilter := NewEventSubkeyFilter([]string{"test"}, nil) parser := &pgDSLParser{} expressions := []query.Expression{