From 06683c1d9172d2a2a21901ef9a4e27f61126ce3c Mon Sep 17 00:00:00 2001 From: Awbrey Hughlett Date: Fri, 31 Jan 2025 11:41:51 -0600 Subject: [PATCH] update to positioned subkeys --- pkg/solana/chainreader/chain_reader.go | 22 ++- pkg/solana/chainreader/event_read_binding.go | 13 +- pkg/solana/codec/encoder_test.go | 2 +- pkg/solana/config/chain_reader.go | 11 +- pkg/solana/logpoller/parser.go | 172 ++++++++----------- pkg/solana/logpoller/parser_test.go | 34 ++-- 6 files changed, 129 insertions(+), 125 deletions(-) diff --git a/pkg/solana/chainreader/chain_reader.go b/pkg/solana/chainreader/chain_reader.go index ab26832c5..d8815f66e 100644 --- a/pkg/solana/chainreader/chain_reader.go +++ b/pkg/solana/chainreader/chain_reader.go @@ -392,18 +392,21 @@ func (s *ContractReaderService) addEventRead( readDefinition config.ReadDefinition, events EventsReader, ) error { - subKeys := [][]string{} - for _, onChain := range readDefinition.IndexedFields { - subKeys = append(subKeys, strings.Split(onChain, ".")) - } + mappedTuples := make(map[string]subkeyTuple) + subKeys := [4][]string{} + + applyIndexedFieldTuple(mappedTuples, subKeys, readDefinition.IndexedField0, 0) + applyIndexedFieldTuple(mappedTuples, subKeys, readDefinition.IndexedField1, 1) + applyIndexedFieldTuple(mappedTuples, subKeys, readDefinition.IndexedField2, 2) + applyIndexedFieldTuple(mappedTuples, subKeys, readDefinition.IndexedField3, 3) - filter := toLPFilter(readDefinition.PollingFilter, contractAddress, subKeys) + filter := toLPFilter(readDefinition.PollingFilter, contractAddress, subKeys[:]) s.filters = append(s.filters, filter) s.bindings.AddReadBinding(namespace, genericName, newEventReadBinding( namespace, genericName, - readDefinition.IndexedFields, + mappedTuples, events, filter.EventSig, )) @@ -462,3 +465,10 @@ func toLPFilter( MaxLogsKept: f.MaxLogsKept, } } + +func applyIndexedFieldTuple(lookup map[string]subkeyTuple, subKeys [4][]string, conf *config.IndexedField, idx uint64) { + if conf != nil { + lookup[conf.OffChainPath] = subkeyTuple{conf.OnChainPath, idx} + subKeys[idx] = strings.Split(conf.OnChainPath, ".") + } +} diff --git a/pkg/solana/chainreader/event_read_binding.go b/pkg/solana/chainreader/event_read_binding.go index 2d06ab85f..dcd70d5ac 100644 --- a/pkg/solana/chainreader/event_read_binding.go +++ b/pkg/solana/chainreader/event_read_binding.go @@ -17,20 +17,25 @@ import ( "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller" ) +type subkeyTuple struct { + string + uint64 +} + type eventReadBinding struct { namespace, genericName string codec types.RemoteCodec modifier commoncodec.Modifier key solana.PublicKey remapper remapHelper - indexedSubkeys map[string]string + indexedSubkeys map[string]subkeyTuple events EventsReader eventSig [logpoller.EventSignatureLength]byte } func newEventReadBinding( namespace, genericName string, - indexedSubkeys map[string]string, + indexedSubkeys map[string]subkeyTuple, events EventsReader, eventSig [logpoller.EventSignatureLength]byte, ) *eventReadBinding { @@ -135,7 +140,7 @@ func (b *eventReadBinding) remapPrimitive(expression query.Expression) (query.Ex } func (b *eventReadBinding) encodeComparator(comparator *primitives.Comparator) (query.Expression, error) { - onChainName, ok := b.indexedSubkeys[comparator.Name] + onChainTuple, ok := b.indexedSubkeys[comparator.Name] if !ok { return query.Expression{}, fmt.Errorf("%w: unknown indexed subkey mapping %s", types.ErrInvalidConfig, comparator.Name) } @@ -152,7 +157,7 @@ func (b *eventReadBinding) encodeComparator(comparator *primitives.Comparator) ( comparator.ValueComparators[idx].Value = newValue } - return logpoller.NewEventSubkeyFilter(strings.Split(onChainName, "."), comparator.ValueComparators), nil + return logpoller.NewEventBySubKeyFilter(onChainTuple.uint64, comparator.ValueComparators) } func (b *eventReadBinding) decodeLogsIntoSequences( diff --git a/pkg/solana/codec/encoder_test.go b/pkg/solana/codec/encoder_test.go index c6078004b..2356ec8eb 100644 --- a/pkg/solana/codec/encoder_test.go +++ b/pkg/solana/codec/encoder_test.go @@ -35,7 +35,7 @@ func (e *testErrEncodeTypeEntry) GetCodecType() commonencodings.TypeCodec { } func TestEncoder_Encode_Errors(t *testing.T) { - someType := "intput.Some.Type" + someType := "input.Some.Type" t.Run("error when item type not found", func(t *testing.T) { _, err := newEncoder(map[string]Entry{}). diff --git a/pkg/solana/config/chain_reader.go b/pkg/solana/config/chain_reader.go index f2b067c4f..de3fa361e 100644 --- a/pkg/solana/config/chain_reader.go +++ b/pkg/solana/config/chain_reader.go @@ -31,7 +31,10 @@ type ReadDefinition struct { InputModifications commoncodec.ModifiersConfig `json:"inputModifications,omitempty"` OutputModifications commoncodec.ModifiersConfig `json:"outputModifications,omitempty"` PDADefiniton codec.PDATypeDef `json:"pdaDefinition,omitempty"` // Only used for PDA account reads - IndexedFields map[string]string `json:"indexedFields,omitempty"` + IndexedField0 *IndexedField `json:"indexedField0"` + IndexedField1 *IndexedField `json:"indexedField1"` + IndexedField2 *IndexedField `json:"indexedField2"` + IndexedField3 *IndexedField `json:"indexedField3"` // This will create a log poller filter for this event. *PollingFilter `json:"pollingFilter,omitempty"` } @@ -54,6 +57,12 @@ func (r ReadType) String() string { } } +type IndexedField struct { + OffChainPath string `json:"offChainPath"` + OnChainPath string `json:"onChainPath"` + IndexPosition int `json:"-"` +} + func (c *ChainContractReader) UnmarshalJSON(bytes []byte) error { rawJSON := make(map[string]json.RawMessage) if err := json.Unmarshal(bytes, &rawJSON); err != nil { diff --git a/pkg/solana/logpoller/parser.go b/pkg/solana/logpoller/parser.go index f0348cc7e..aa6a026ea 100644 --- a/pkg/solana/logpoller/parser.go +++ b/pkg/solana/logpoller/parser.go @@ -21,7 +21,6 @@ const ( txHashFieldName = "tx_hash" addressFieldName = "address" eventSigFieldName = "event_sig" - eventSubkeyFieldName = "event_subkey" defaultSort = "block_number ASC, log_index ASC" subKeyValuesFieldName = "subkey_values" subKeyValueArg = "subkey_value" @@ -44,6 +43,11 @@ var ( "event_idl", "subkey_paths", "retention", "max_logs_kept", "is_deleted", "is_backfilled"} ) +type IndexedValueComparator struct { + Value IndexedValue + Operator primitives.ComparisonOperator +} + // The parser builds SQL expressions piece by piece for each Accept function call and resets the error and expression // values after every call. type pgDSLParser struct { @@ -58,80 +62,6 @@ var _ primitives.Visitor = (*pgDSLParser)(nil) func (v *pgDSLParser) Comparator(_ primitives.Comparator) {} -type IndexedValueComparator struct { - Value IndexedValue - Operator primitives.ComparisonOperator -} - -type eventBySubKeyFilter struct { - SubKeyIndex uint64 - ValueComparers []IndexedValueComparator -} - -func (f *eventBySubKeyFilter) Accept(visitor primitives.Visitor) { - switch v := visitor.(type) { - case *pgDSLParser: - v.VisitEventSubKeysByValueFilter(f) - } -} - -func NewEventBySubKeyFilter(subKeyIndex uint64, valueComparers []primitives.ValueComparator) (query.Expression, error) { - var indexedValueComparators []IndexedValueComparator - for _, cmp := range valueComparers { - iVal, err := newIndexedValue(cmp.Value) - if err != nil { - return query.Expression{}, err - } - iValCmp := IndexedValueComparator{ - Value: iVal, - Operator: cmp.Operator, - } - indexedValueComparators = append(indexedValueComparators, iValCmp) - } - return query.Expression{ - Primitive: &eventBySubKeyFilter{ - SubKeyIndex: subKeyIndex, - ValueComparers: indexedValueComparators, - }, - }, nil -} - -func (v *pgDSLParser) VisitEventSubKeysByValueFilter(p *eventBySubKeyFilter) { - if len(p.ValueComparers) > 0 { - if p.SubKeyIndex > 3 { // For now, maximum # of fields that can be indexed is 4--we can increase this if needed by adding more db indexes - v.err = fmt.Errorf("invalid subKey index: %d", p.SubKeyIndex) - return - } - - // Add 1 since postgresql arrays are 1-indexed. - subKeyIdx := v.args.withIndexedField(subKeyIndexArgName, p.SubKeyIndex+1) - - comps := make([]string, len(p.ValueComparers)) - for idx, comp := range p.ValueComparers { - comps[idx], v.err = makeComp(comp, v.args, subKeyValueArg, subKeyIdx, subKeyValuesFieldName+"[:%s] %s :%s") - if v.err != nil { - return - } - } - - v.expression = strings.Join(comps, " AND ") - } -} - -func makeComp(comp IndexedValueComparator, args *queryArgs, field, subfield, pattern string) (string, error) { - cmp, err := cmpOpToString(comp.Operator) - if err != nil { - return "", err - } - - return fmt.Sprintf( - pattern, - subfield, - cmp, - args.withIndexedField(field, comp.Value), - ), nil -} - func (v *pgDSLParser) Block(prim primitives.Block) { cmp, err := cmpOpToString(prim.Operator) if err != nil { @@ -207,18 +137,38 @@ func (v *pgDSLParser) VisitAddressFilter(p *addressFilter) { ) } -func (v *pgDSLParser) VisitEventSubkeyFilter(p *eventSubkeyFilter) { - // TODO: build a proper expression - // TODO: the value type will be the off-chain field type that a raw IDL codec would decode into - // this value will need to be wrapped in a special type that will encode the value properly for - // direct comparison. +func (v *pgDSLParser) VisitEventSigFilter(p *eventSigFilter) { v.expression = fmt.Sprintf( "%s = :%s", - eventSubkeyFieldName, - v.args.withIndexedField(eventSubkeyFieldName, p.Subkey), + eventSigFieldName, + v.args.withIndexedField(eventSigFieldName, p.eventSig), ) } +func (v *pgDSLParser) VisitEventSubKeysByValueFilter(p *eventBySubKeyFilter) { + if len(p.ValueComparers) > 0 { + // For now, maximum # of fields that can be indexed is 4--we can increase this if needed by adding + // more db indexes. + if p.SubKeyIndex > 3 { + v.err = fmt.Errorf("invalid subKey index: %d", p.SubKeyIndex) + return + } + + // Add 1 since postgresql arrays are 1-indexed. + subKeyIdx := v.args.withIndexedField(subKeyIndexArgName, p.SubKeyIndex+1) + + comps := make([]string, len(p.ValueComparers)) + for idx, comp := range p.ValueComparers { + comps[idx], v.err = makeComp(comp, v.args, subKeyValueArg, subKeyIdx, subKeyValuesFieldName+"[:%s] %s :%s") + if v.err != nil { + return + } + } + + v.expression = strings.Join(comps, " AND ") + } +} + func (v *pgDSLParser) buildQuery( chainID string, expressions []query.Expression, @@ -495,34 +445,54 @@ func (f *eventSigFilter) Accept(visitor primitives.Visitor) { } } -func (v *pgDSLParser) VisitEventSigFilter(p *eventSigFilter) { - v.expression = fmt.Sprintf( - "%s = :%s", - eventSigFieldName, - v.args.withIndexedField(eventSigFieldName, p.eventSig), - ) -} - -type eventSubkeyFilter struct { - Subkey []string - ValueComparers []primitives.ValueComparator +type eventBySubKeyFilter struct { + SubKeyIndex uint64 + ValueComparers []IndexedValueComparator } -func NewEventSubkeyFilter(subkey []string, valueComparers []primitives.ValueComparator) query.Expression { - return query.Expression{Primitive: &eventSubkeyFilter{ - Subkey: subkey, - ValueComparers: valueComparers, - }} +func NewEventBySubKeyFilter(subKeyIndex uint64, valueComparers []primitives.ValueComparator) (query.Expression, error) { + var indexedValueComparators []IndexedValueComparator + for _, cmp := range valueComparers { + iVal, err := newIndexedValue(cmp.Value) + if err != nil { + return query.Expression{}, err + } + iValCmp := IndexedValueComparator{ + Value: iVal, + Operator: cmp.Operator, + } + indexedValueComparators = append(indexedValueComparators, iValCmp) + } + return query.Expression{ + Primitive: &eventBySubKeyFilter{ + SubKeyIndex: subKeyIndex, + ValueComparers: indexedValueComparators, + }, + }, nil } -func (f *eventSubkeyFilter) Accept(visitor primitives.Visitor) { +func (f *eventBySubKeyFilter) Accept(visitor primitives.Visitor) { switch v := visitor.(type) { case *pgDSLParser: - v.VisitEventSubkeyFilter(f) + v.VisitEventSubKeysByValueFilter(f) } } -// MakeContractReaderCursor is exported to ensure cursor structure remains consistent. +// FormatContractReaderCursor is exported to ensure cursor structure remains consistent. func FormatContractReaderCursor(log Log) string { return fmt.Sprintf("%d-%d-%s", log.BlockNumber, log.LogIndex, log.TxHash) } + +func makeComp(comp IndexedValueComparator, args *queryArgs, field, subfield, pattern string) (string, error) { + cmp, err := cmpOpToString(comp.Operator) + if err != nil { + return "", err + } + + return fmt.Sprintf( + pattern, + subfield, + cmp, + args.withIndexedField(field, comp.Value), + ), nil +} diff --git a/pkg/solana/logpoller/parser_test.go b/pkg/solana/logpoller/parser_test.go index f04cd438d..e9e444161 100644 --- a/pkg/solana/logpoller/parser_test.go +++ b/pkg/solana/logpoller/parser_test.go @@ -50,14 +50,18 @@ func TestDSLParser(t *testing.T) { _, _ = rand.Read(pk[:]) + subkey, err := NewEventBySubKeyFilter(0, []primitives.ValueComparator{ + {Value: 42, Operator: primitives.Gte}, + {Value: "test_value", Operator: primitives.Eq}, + }) + + require.NoError(t, err) + parser := &pgDSLParser{} expressions := []query.Expression{ NewAddressFilter(pk), NewEventSigFilter([]byte("test")), - NewEventSubkeyFilter([]string{"test"}, []primitives.ValueComparator{ - {Value: 42, Operator: primitives.Gte}, - {Value: "test_value", Operator: primitives.Eq}, - }), + subkey, query.Confidence(primitives.Unconfirmed), } limiter := query.NewLimitAndSort(query.CursorLimit(fmt.Sprintf("10-5-%s", txHash), query.CursorFollowing, 20)) @@ -65,7 +69,8 @@ func TestDSLParser(t *testing.T) { result, args, err := parser.buildQuery(chainID, expressions, limiter) expected := logsQuery( " WHERE chain_id = :chain_id " + - "AND (address = :address_0 AND event_sig = :event_sig_0 AND event_subkey = :event_subkey_0) " + + "AND (address = :address_0 AND event_sig = :event_sig_0 AND subkey_values[:subkey_index_0] >= :subkey_value_0 " + + "AND subkey_values[:subkey_index_0] = :subkey_value_1) " + "AND (block_number > :cursor_block_number OR (block_number = :cursor_block_number " + "AND log_index > :cursor_log_index)) " + "ORDER BY block_number ASC, log_index ASC, tx_hash ASC LIMIT 20") @@ -73,7 +78,7 @@ func TestDSLParser(t *testing.T) { require.NoError(t, err) assert.Equal(t, expected, result) - assertArgs(t, args, 6) + assertArgs(t, args, 8) }) t.Run("query with limit and no order by", func(t *testing.T) { @@ -83,28 +88,33 @@ func TestDSLParser(t *testing.T) { _, _ = rand.Read(pk[:]) + subkey, err := NewEventBySubKeyFilter(0, []primitives.ValueComparator{ + {Value: 42, Operator: primitives.Gte}, + {Value: "test_value", Operator: primitives.Eq}, + }) + + require.NoError(t, err) + parser := &pgDSLParser{} expressions := []query.Expression{ NewAddressFilter(pk), NewEventSigFilter([]byte("test")), - NewEventSubkeyFilter([]string{"test"}, []primitives.ValueComparator{ - {Value: 42, Operator: primitives.Gte}, - {Value: "test_value", Operator: primitives.Eq}, - }), + subkey, } limiter := query.NewLimitAndSort(query.CountLimit(20)) result, args, err := parser.buildQuery(chainID, expressions, limiter) expected := logsQuery( " WHERE chain_id = :chain_id " + - "AND (address = :address_0 AND event_sig = :event_sig_0 AND event_subkey = :event_subkey_0) " + + "AND (address = :address_0 AND event_sig = :event_sig_0 " + + "AND subkey_values[:subkey_index_0] >= :subkey_value_0 AND subkey_values[:subkey_index_0] = :subkey_value_1) " + "ORDER BY " + defaultSort + " " + "LIMIT 20") require.NoError(t, err) assert.Equal(t, expected, result) - assertArgs(t, args, 4) + assertArgs(t, args, 6) }) t.Run("query with order by sequence no cursor no limit", func(t *testing.T) {