Skip to content

Commit

Permalink
[NONEVM-916] logpoller log processing & decoding (#1002)
Browse files Browse the repository at this point in the history
* Add client & collector to LogPoller struct

Also:
- Add GetBlocksWithOpts to Reader
- Update loader.RPClient to be consistent with client.Reader interface

* Re-generate RPClient & ReaderWriter mocks

* Update loader tests for interface

* Add ILogpoller interface and call NewLogPoller() from NewChain()

Note: this will hopefully be renamed to LogPoller later, once we
find a better name for the struct. (logPoller, for consistency with evm?)
and the PR's this depends on have been merged, to avoid merge conflicts

* fix orm_test.go

* Fix lints in log_poller.go & types.go

* Fix lints in log_poller.go

* Add BlockTime & Program to Event data passed to Process()

* WIP

* Use EventTypeProvider interface to represent a codec function for retrieving event types from idl

* remove unimplemented test

* Regenerate mock_orm.go

* Fix lints

* Add IndexedValue type for converting indexable types to be stored in db

* Fill in log.ExpiresAt

* Add sequence # tracking, fix floating point encoding, add tests for NewIndexedValue

* Fix tests

* Remove mock_orm.go

* Use UTC time, and fix lints

* Add tests - WIP

* Refactor to use new codec api, based on review

* Add SeqNum test and fix other tests

Decoder had to be moved from Filter into a separate filters map,
otherwise it caused a lot of issues with assert.Equal for comparing
filters.

* Don't set ExpireAt unless Retention is set

* Move decoder creation before InsertFilter

* Remove unused functions in anchor.go, add validation to TestProess test

* Fix lints

* Remove lp.typeProvider, generate EventIdl for test event

* Remove unused Prefix field from ProgramEvent

* Fix decoding issues

* Add ExtractField function, and fix DecodeSubkeyValues

* MatchingFilters -> matchingFilters, and fix rest of expectedLog fields in TestProcess

* Fix lint errors, rename err2 & err3

* Fix overflows, lints, use default map vals, & add early return to Process

* Remove utils package, add 12-character assertion to Discriminator()

This could only fail if someone updates to a new version of
encoding/base64 which is buggy or breaks backward compatibility with the
current version. Using panic instead of returning an error, since this can
only happen during development and would need to be addressed
immediately.

* Remove TODO, uncomment internal loaders

* Remove ILogPoller interface, add LogPoller interface to chain.go

Also: add GetBlockWithOpts to MultiClient

* Start EncodedLogCollector as soon as filters are loaded

Also:
- Double check that filters are loaded at beginning of Process()
- Add missing return from lp.loadFilters

* Add comment explaining < 12 validation

* Address 2 more PR comments, and remove accidentally added file

* Use ch.multiClient instead of lazy-loader

* revert changes to chain.go

* logpoller.LogPoller -> logpoller.Service

Also: keep retrying if LogCollector fails to start

* Make MatchFiltersForEncodedEvent thread safe

Also: add warnings to methods which are not intended to be called concurrently

---------

Co-authored-by: ilija42 <[email protected]>
  • Loading branch information
reductionista and ilija42 authored Jan 21, 2025
1 parent 90cd5f2 commit b4f7b8c
Show file tree
Hide file tree
Showing 26 changed files with 1,289 additions and 234 deletions.
2 changes: 1 addition & 1 deletion .tool-versions
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ nodejs 18.20.2
yarn 1.22.19
rust 1.59.0
golang 1.23.3
golangci-lint 1.60.1
golangci-lint 1.61.0
actionlint 1.6.22
shellcheck 0.8.0
helm 3.9.4
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ require (
github.com/hashicorp/go-plugin v1.6.2
github.com/jackc/pgx/v4 v4.18.3
github.com/jpillora/backoff v1.0.0
github.com/lib/pq v1.10.9
github.com/pelletier/go-toml/v2 v2.2.0
github.com/prometheus/client_golang v1.17.0
github.com/smartcontractkit/chainlink-common v0.4.2-0.20250116214855-f49c5c27db51
Expand Down Expand Up @@ -76,6 +75,7 @@ require (
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/leodido/go-urn v1.2.0 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/linkedin/goavro/v2 v2.12.0 // indirect
github.com/logrusorgru/aurora v2.0.3+incompatible // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand Down
6 changes: 4 additions & 2 deletions integration-tests/smoke/event_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

contract "github.com/smartcontractkit/chainlink-solana/contracts/generated/log_read_test"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller"

"github.com/smartcontractkit/chainlink-solana/integration-tests/solclient"
Expand All @@ -49,7 +50,8 @@ func TestEventLoader(t *testing.T) {
require.NoError(t, err)

rpcURL, wsURL := setupTestValidator(t, privateKey.PublicKey().String())
rpcClient := rpc.New(rpcURL)
cl, rpcClient, err := client.NewTestClient(rpcURL, config.NewDefault(), 1*time.Second, logger.Nop())
require.NoError(t, err)
wsClient, err := ws.Connect(ctx, wsURL)
require.NoError(t, err)

Expand All @@ -62,7 +64,7 @@ func TestEventLoader(t *testing.T) {
parser := &printParser{t: t}
sender := newLogSender(t, rpcClient, wsClient)
collector := logpoller.NewEncodedLogCollector(
rpcClient,
cl,
parser,
logger.Nop(),
)
Expand Down
16 changes: 16 additions & 0 deletions pkg/solana/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,25 @@ import (
mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/internal"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/txm"
txmutils "github.com/smartcontractkit/chainlink-solana/pkg/solana/txm/utils"
)

type LogPoller interface {
Start(context.Context) error
Close() error
RegisterFilter(ctx context.Context, filter logpoller.Filter) error
UnregisterFilter(ctx context.Context, name string) error
}

type Chain interface {
types.ChainService

ID() string
Config() config.Config
LogPoller() LogPoller
TxManager() TxManager
// Reader returns a new Reader from the available list of nodes (if there are multiple, it will randomly select one)
Reader() (client.Reader, error)
Expand Down Expand Up @@ -90,6 +99,7 @@ type chain struct {
services.StateMachine
id string
cfg *config.TOMLConfig
lp LogPoller
txm *txm.Txm
balanceMonitor services.Service
lggr logger.Logger
Expand Down Expand Up @@ -312,6 +322,8 @@ func newChain(id string, cfg *config.TOMLConfig, ks core.Keystore, lggr logger.L
bc = internal.NewLoader[monitor.BalanceClient](func() (monitor.BalanceClient, error) { return ch.multiNode.SelectRPC() })
}

// TODO: import typeProvider function from codec package and pass to constructor
ch.lp = logpoller.New(logger.Sugared(logger.Named(lggr, "LogPoller")), logpoller.NewORM(ch.ID(), ds, lggr), ch.multiClient)
ch.txm = txm.NewTxm(ch.id, tc, sendTx, cfg, ks, lggr)
ch.balanceMonitor = monitor.NewBalanceMonitor(ch.id, cfg, lggr, ks, bc)
return &ch, nil
Expand Down Expand Up @@ -401,6 +413,10 @@ func (c *chain) Config() config.Config {
return c.cfg
}

func (c *chain) LogPoller() LogPoller {
return c.lp
}

func (c *chain) TxManager() TxManager {
return c.txm
}
Expand Down
25 changes: 21 additions & 4 deletions pkg/solana/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Reader interface {
GetTransaction(ctx context.Context, txHash solana.Signature) (*rpc.GetTransactionResult, error)
GetBlocks(ctx context.Context, startSlot uint64, endSlot *uint64) (rpc.BlocksResult, error)
GetBlocksWithLimit(ctx context.Context, startSlot uint64, limit uint64) (*rpc.BlocksResult, error)
GetBlockWithOpts(context.Context, uint64, *rpc.GetBlockOpts) (*rpc.GetBlockResult, error)
GetBlock(ctx context.Context, slot uint64) (*rpc.GetBlockResult, error)
GetSignaturesForAddressWithOpts(ctx context.Context, addr solana.PublicKey, opts *rpc.GetSignaturesForAddressOpts) ([]*rpc.TransactionSignature, error)
}
Expand Down Expand Up @@ -73,18 +74,25 @@ type Client struct {
requestGroup *singleflight.Group
}

func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration, log logger.Logger) (*Client, error) {
return &Client{
// Return both the client and the underlying rpc client for testing
func NewTestClient(endpoint string, cfg config.Config, requestTimeout time.Duration, log logger.Logger) (*Client, *rpc.Client, error) {
rpcClient := Client{
url: endpoint,
rpc: rpc.New(endpoint),
skipPreflight: cfg.SkipPreflight(),
commitment: cfg.Commitment(),
maxRetries: cfg.MaxRetries(),
txTimeout: cfg.TxTimeout(),
contextDuration: requestTimeout,
log: log,
requestGroup: &singleflight.Group{},
}, nil
}
rpcClient.rpc = rpc.New(endpoint)
return &rpcClient, rpcClient.rpc, nil
}

func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration, log logger.Logger) (*Client, error) {
rpcClient, _, err := NewTestClient(endpoint, cfg, requestTimeout, log)
return rpcClient, err
}

func (c *Client) latency(name string) func() {
Expand Down Expand Up @@ -339,6 +347,15 @@ func (c *Client) GetLatestBlockHeight(ctx context.Context) (uint64, error) {
return v.(uint64), err
}

func (c *Client) GetBlockWithOpts(ctx context.Context, slot uint64, opts *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) {
// get block based on slot with custom options set
done := c.latency("get_block_with_opts")
defer done()
ctx, cancel := context.WithTimeout(ctx, c.txTimeout)
defer cancel()
return c.rpc.GetBlockWithOpts(ctx, slot, opts)
}

func (c *Client) GetBlock(ctx context.Context, slot uint64) (*rpc.GetBlockResult, error) {
done := c.latency("get_block")
defer done()
Expand Down
60 changes: 60 additions & 0 deletions pkg/solana/client/mocks/reader_writer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions pkg/solana/client/multi_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,12 @@ func (m *MultiClient) GetSignaturesForAddressWithOpts(ctx context.Context, addr

return r.GetSignaturesForAddressWithOpts(ctx, addr, opts)
}

func (m *MultiClient) GetBlockWithOpts(ctx context.Context, slot uint64, opts *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) {
r, err := m.getClient()
if err != nil {
return nil, err
}

return r.GetBlockWithOpts(ctx, slot, opts)
}
33 changes: 33 additions & 0 deletions pkg/solana/codec/solana.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,3 +491,36 @@ func saveDependency(refs *codecRefs, parent, child string) {

refs.dependencies[parent] = append(deps, child)
}
func NewIDLEventCodec(idl IDL, builder commonencodings.Builder) (commontypes.RemoteCodec, error) {
typeCodecs := make(commonencodings.LenientCodecFromTypeCodec)
refs := &codecRefs{
builder: builder,
codecs: make(map[string]commonencodings.TypeCodec),
typeDefs: idl.Types,
dependencies: make(map[string][]string),
}

for _, event := range idl.Events {
name, instCodec, err := asStruct(eventFieldsAsStandardFields(event.Fields), refs, event.Name, false, false)
if err != nil {
return nil, err
}

typeCodecs[name] = instCodec
}

return typeCodecs, nil
}

func eventFieldsAsStandardFields(event []IdlEventField) []IdlField {
output := make([]IdlField, len(event))

for idx := range output {
output[idx] = IdlField{
Name: event[idx].Name,
Type: event[idx].Type,
}
}

return output
}
14 changes: 14 additions & 0 deletions pkg/solana/logpoller/discriminator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package logpoller

import (
"crypto/sha256"
"fmt"
)

const DiscriminatorLength = 8

func Discriminator(namespace, name string) [DiscriminatorLength]byte {
h := sha256.New()
h.Write([]byte(fmt.Sprintf("%s:%s", namespace, name)))
return [DiscriminatorLength]byte(h.Sum(nil)[:DiscriminatorLength])
}
Loading

0 comments on commit b4f7b8c

Please sign in to comment.