Skip to content

Commit

Permalink
update to positioned subkeys
Browse files Browse the repository at this point in the history
  • Loading branch information
EasterTheBunny committed Feb 3, 2025
1 parent 2817e8b commit 06683c1
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 125 deletions.
22 changes: 16 additions & 6 deletions pkg/solana/chainreader/chain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,18 +392,21 @@ func (s *ContractReaderService) addEventRead(
readDefinition config.ReadDefinition,
events EventsReader,
) error {
subKeys := [][]string{}
for _, onChain := range readDefinition.IndexedFields {
subKeys = append(subKeys, strings.Split(onChain, "."))
}
mappedTuples := make(map[string]subkeyTuple)
subKeys := [4][]string{}

applyIndexedFieldTuple(mappedTuples, subKeys, readDefinition.IndexedField0, 0)
applyIndexedFieldTuple(mappedTuples, subKeys, readDefinition.IndexedField1, 1)
applyIndexedFieldTuple(mappedTuples, subKeys, readDefinition.IndexedField2, 2)
applyIndexedFieldTuple(mappedTuples, subKeys, readDefinition.IndexedField3, 3)

filter := toLPFilter(readDefinition.PollingFilter, contractAddress, subKeys)
filter := toLPFilter(readDefinition.PollingFilter, contractAddress, subKeys[:])

s.filters = append(s.filters, filter)
s.bindings.AddReadBinding(namespace, genericName, newEventReadBinding(
namespace,
genericName,
readDefinition.IndexedFields,
mappedTuples,
events,
filter.EventSig,
))
Expand Down Expand Up @@ -462,3 +465,10 @@ func toLPFilter(
MaxLogsKept: f.MaxLogsKept,
}
}

func applyIndexedFieldTuple(lookup map[string]subkeyTuple, subKeys [4][]string, conf *config.IndexedField, idx uint64) {
if conf != nil {
lookup[conf.OffChainPath] = subkeyTuple{conf.OnChainPath, idx}
subKeys[idx] = strings.Split(conf.OnChainPath, ".")
}
}
13 changes: 9 additions & 4 deletions pkg/solana/chainreader/event_read_binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,25 @@ import (
"github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller"
)

type subkeyTuple struct {
string
uint64
}

type eventReadBinding struct {
namespace, genericName string
codec types.RemoteCodec
modifier commoncodec.Modifier
key solana.PublicKey
remapper remapHelper
indexedSubkeys map[string]string
indexedSubkeys map[string]subkeyTuple
events EventsReader
eventSig [logpoller.EventSignatureLength]byte
}

func newEventReadBinding(
namespace, genericName string,
indexedSubkeys map[string]string,
indexedSubkeys map[string]subkeyTuple,
events EventsReader,
eventSig [logpoller.EventSignatureLength]byte,
) *eventReadBinding {
Expand Down Expand Up @@ -135,7 +140,7 @@ func (b *eventReadBinding) remapPrimitive(expression query.Expression) (query.Ex
}

func (b *eventReadBinding) encodeComparator(comparator *primitives.Comparator) (query.Expression, error) {
onChainName, ok := b.indexedSubkeys[comparator.Name]
onChainTuple, ok := b.indexedSubkeys[comparator.Name]
if !ok {
return query.Expression{}, fmt.Errorf("%w: unknown indexed subkey mapping %s", types.ErrInvalidConfig, comparator.Name)
}
Expand All @@ -152,7 +157,7 @@ func (b *eventReadBinding) encodeComparator(comparator *primitives.Comparator) (
comparator.ValueComparators[idx].Value = newValue
}

return logpoller.NewEventSubkeyFilter(strings.Split(onChainName, "."), comparator.ValueComparators), nil
return logpoller.NewEventBySubKeyFilter(onChainTuple.uint64, comparator.ValueComparators)
}

func (b *eventReadBinding) decodeLogsIntoSequences(
Expand Down
2 changes: 1 addition & 1 deletion pkg/solana/codec/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (e *testErrEncodeTypeEntry) GetCodecType() commonencodings.TypeCodec {
}

func TestEncoder_Encode_Errors(t *testing.T) {
someType := "intput.Some.Type"
someType := "input.Some.Type"

t.Run("error when item type not found", func(t *testing.T) {
_, err := newEncoder(map[string]Entry{}).
Expand Down
11 changes: 10 additions & 1 deletion pkg/solana/config/chain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ type ReadDefinition struct {
InputModifications commoncodec.ModifiersConfig `json:"inputModifications,omitempty"`
OutputModifications commoncodec.ModifiersConfig `json:"outputModifications,omitempty"`
PDADefiniton codec.PDATypeDef `json:"pdaDefinition,omitempty"` // Only used for PDA account reads
IndexedFields map[string]string `json:"indexedFields,omitempty"`
IndexedField0 *IndexedField `json:"indexedField0"`
IndexedField1 *IndexedField `json:"indexedField1"`
IndexedField2 *IndexedField `json:"indexedField2"`
IndexedField3 *IndexedField `json:"indexedField3"`
// This will create a log poller filter for this event.
*PollingFilter `json:"pollingFilter,omitempty"`
}
Expand All @@ -54,6 +57,12 @@ func (r ReadType) String() string {
}
}

type IndexedField struct {
OffChainPath string `json:"offChainPath"`
OnChainPath string `json:"onChainPath"`
IndexPosition int `json:"-"`
}

func (c *ChainContractReader) UnmarshalJSON(bytes []byte) error {
rawJSON := make(map[string]json.RawMessage)
if err := json.Unmarshal(bytes, &rawJSON); err != nil {
Expand Down
172 changes: 71 additions & 101 deletions pkg/solana/logpoller/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ const (
txHashFieldName = "tx_hash"
addressFieldName = "address"
eventSigFieldName = "event_sig"
eventSubkeyFieldName = "event_subkey"
defaultSort = "block_number ASC, log_index ASC"
subKeyValuesFieldName = "subkey_values"
subKeyValueArg = "subkey_value"
Expand All @@ -44,6 +43,11 @@ var (
"event_idl", "subkey_paths", "retention", "max_logs_kept", "is_deleted", "is_backfilled"}
)

type IndexedValueComparator struct {
Value IndexedValue
Operator primitives.ComparisonOperator
}

// The parser builds SQL expressions piece by piece for each Accept function call and resets the error and expression
// values after every call.
type pgDSLParser struct {
Expand All @@ -58,80 +62,6 @@ var _ primitives.Visitor = (*pgDSLParser)(nil)

func (v *pgDSLParser) Comparator(_ primitives.Comparator) {}

type IndexedValueComparator struct {
Value IndexedValue
Operator primitives.ComparisonOperator
}

type eventBySubKeyFilter struct {
SubKeyIndex uint64
ValueComparers []IndexedValueComparator
}

func (f *eventBySubKeyFilter) Accept(visitor primitives.Visitor) {
switch v := visitor.(type) {
case *pgDSLParser:
v.VisitEventSubKeysByValueFilter(f)
}
}

func NewEventBySubKeyFilter(subKeyIndex uint64, valueComparers []primitives.ValueComparator) (query.Expression, error) {
var indexedValueComparators []IndexedValueComparator
for _, cmp := range valueComparers {
iVal, err := newIndexedValue(cmp.Value)
if err != nil {
return query.Expression{}, err
}
iValCmp := IndexedValueComparator{
Value: iVal,
Operator: cmp.Operator,
}
indexedValueComparators = append(indexedValueComparators, iValCmp)
}
return query.Expression{
Primitive: &eventBySubKeyFilter{
SubKeyIndex: subKeyIndex,
ValueComparers: indexedValueComparators,
},
}, nil
}

func (v *pgDSLParser) VisitEventSubKeysByValueFilter(p *eventBySubKeyFilter) {
if len(p.ValueComparers) > 0 {
if p.SubKeyIndex > 3 { // For now, maximum # of fields that can be indexed is 4--we can increase this if needed by adding more db indexes
v.err = fmt.Errorf("invalid subKey index: %d", p.SubKeyIndex)
return
}

// Add 1 since postgresql arrays are 1-indexed.
subKeyIdx := v.args.withIndexedField(subKeyIndexArgName, p.SubKeyIndex+1)

comps := make([]string, len(p.ValueComparers))
for idx, comp := range p.ValueComparers {
comps[idx], v.err = makeComp(comp, v.args, subKeyValueArg, subKeyIdx, subKeyValuesFieldName+"[:%s] %s :%s")
if v.err != nil {
return
}
}

v.expression = strings.Join(comps, " AND ")
}
}

func makeComp(comp IndexedValueComparator, args *queryArgs, field, subfield, pattern string) (string, error) {
cmp, err := cmpOpToString(comp.Operator)
if err != nil {
return "", err
}

return fmt.Sprintf(
pattern,
subfield,
cmp,
args.withIndexedField(field, comp.Value),
), nil
}

func (v *pgDSLParser) Block(prim primitives.Block) {
cmp, err := cmpOpToString(prim.Operator)
if err != nil {
Expand Down Expand Up @@ -207,18 +137,38 @@ func (v *pgDSLParser) VisitAddressFilter(p *addressFilter) {
)
}

func (v *pgDSLParser) VisitEventSubkeyFilter(p *eventSubkeyFilter) {
// TODO: build a proper expression
// TODO: the value type will be the off-chain field type that a raw IDL codec would decode into
// this value will need to be wrapped in a special type that will encode the value properly for
// direct comparison.
func (v *pgDSLParser) VisitEventSigFilter(p *eventSigFilter) {
v.expression = fmt.Sprintf(
"%s = :%s",
eventSubkeyFieldName,
v.args.withIndexedField(eventSubkeyFieldName, p.Subkey),
eventSigFieldName,
v.args.withIndexedField(eventSigFieldName, p.eventSig),
)
}

func (v *pgDSLParser) VisitEventSubKeysByValueFilter(p *eventBySubKeyFilter) {
if len(p.ValueComparers) > 0 {
// For now, maximum # of fields that can be indexed is 4--we can increase this if needed by adding
// more db indexes.
if p.SubKeyIndex > 3 {
v.err = fmt.Errorf("invalid subKey index: %d", p.SubKeyIndex)
return
}

// Add 1 since postgresql arrays are 1-indexed.
subKeyIdx := v.args.withIndexedField(subKeyIndexArgName, p.SubKeyIndex+1)

comps := make([]string, len(p.ValueComparers))
for idx, comp := range p.ValueComparers {
comps[idx], v.err = makeComp(comp, v.args, subKeyValueArg, subKeyIdx, subKeyValuesFieldName+"[:%s] %s :%s")
if v.err != nil {
return
}
}

v.expression = strings.Join(comps, " AND ")
}
}

func (v *pgDSLParser) buildQuery(
chainID string,
expressions []query.Expression,
Expand Down Expand Up @@ -495,34 +445,54 @@ func (f *eventSigFilter) Accept(visitor primitives.Visitor) {
}
}

func (v *pgDSLParser) VisitEventSigFilter(p *eventSigFilter) {
v.expression = fmt.Sprintf(
"%s = :%s",
eventSigFieldName,
v.args.withIndexedField(eventSigFieldName, p.eventSig),
)
}

type eventSubkeyFilter struct {
Subkey []string
ValueComparers []primitives.ValueComparator
type eventBySubKeyFilter struct {
SubKeyIndex uint64
ValueComparers []IndexedValueComparator
}

func NewEventSubkeyFilter(subkey []string, valueComparers []primitives.ValueComparator) query.Expression {
return query.Expression{Primitive: &eventSubkeyFilter{
Subkey: subkey,
ValueComparers: valueComparers,
}}
func NewEventBySubKeyFilter(subKeyIndex uint64, valueComparers []primitives.ValueComparator) (query.Expression, error) {
var indexedValueComparators []IndexedValueComparator
for _, cmp := range valueComparers {
iVal, err := newIndexedValue(cmp.Value)
if err != nil {
return query.Expression{}, err
}
iValCmp := IndexedValueComparator{
Value: iVal,
Operator: cmp.Operator,
}
indexedValueComparators = append(indexedValueComparators, iValCmp)
}
return query.Expression{
Primitive: &eventBySubKeyFilter{
SubKeyIndex: subKeyIndex,
ValueComparers: indexedValueComparators,
},
}, nil
}

func (f *eventSubkeyFilter) Accept(visitor primitives.Visitor) {
func (f *eventBySubKeyFilter) Accept(visitor primitives.Visitor) {
switch v := visitor.(type) {
case *pgDSLParser:
v.VisitEventSubkeyFilter(f)
v.VisitEventSubKeysByValueFilter(f)
}
}

// MakeContractReaderCursor is exported to ensure cursor structure remains consistent.
// FormatContractReaderCursor is exported to ensure cursor structure remains consistent.
func FormatContractReaderCursor(log Log) string {
return fmt.Sprintf("%d-%d-%s", log.BlockNumber, log.LogIndex, log.TxHash)
}

func makeComp(comp IndexedValueComparator, args *queryArgs, field, subfield, pattern string) (string, error) {
cmp, err := cmpOpToString(comp.Operator)
if err != nil {
return "", err
}

return fmt.Sprintf(
pattern,
subfield,
cmp,
args.withIndexedField(field, comp.Value),
), nil
}
Loading

0 comments on commit 06683c1

Please sign in to comment.