Skip to content

Commit

Permalink
commit/tokenprice: add async tokenprice observer (#668)
Browse files Browse the repository at this point in the history
* commit/tokenprice: add async tokenprice observer

* contextual logging for chainfee and tokenprice

* simplify close

* add gp and tp int tests to ci

* rm tokenprice test

can only run in docker at the moment
  • Loading branch information
makramkd authored Feb 26, 2025
1 parent fdcfd13 commit f8239cb
Show file tree
Hide file tree
Showing 11 changed files with 325 additions and 76 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/ccip-integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ jobs:
name: "Messaging Test"
- cmd: cd integration-tests/smoke/ccip && go test ccip_batching_test.go -timeout 12m -test.parallel=2 -count=1 -json
name: "Batching Test"
- cmd: cd integration-tests/smoke/ccip && go test ccip_gas_price_updates_test.go -timeout 12m -test.parallel=2 -count=1 -json
name: "Gas Price Updates Test"
# TODO: this can only run in docker for now, switch to in-memory and uncomment
# - cmd: cd integration-tests/smoke/ccip && go test ccip_token_price_updates_test.go -timeout 12m -test.parallel=2 -count=1 -json
# name: "Token Price Updates Test"
- cmd: cd integration-tests/smoke/ccip && go test ccip_reader_test.go -timeout 5m -test.parallel=1 -count=1 -json
name: "CCIPReader Test"

Expand Down
6 changes: 3 additions & 3 deletions commit/chainfee/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ func (p *processor) Observation(
) (Observation, error) {
lggr := logutil.WithContextValues(ctx, p.lggr)

feeComponents := p.obs.getChainsFeeComponents(ctx)
nativeTokenPrices := p.obs.getNativeTokenPrices(ctx)
chainFeeUpdates := p.obs.getChainFeePriceUpdates(ctx)
feeComponents := p.obs.getChainsFeeComponents(ctx, lggr)
nativeTokenPrices := p.obs.getNativeTokenPrices(ctx, lggr)
chainFeeUpdates := p.obs.getChainFeePriceUpdates(ctx, lggr)
fChain := p.observeFChain(lggr)
now := time.Now().UTC()

Expand Down
69 changes: 43 additions & 26 deletions commit/chainfee/observation_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import (
"github.com/smartcontractkit/libocr/commontypes"

"github.com/smartcontractkit/chainlink-ccip/internal/plugincommon"
"github.com/smartcontractkit/chainlink-ccip/pkg/logutil"
ccipreader "github.com/smartcontractkit/chainlink-ccip/pkg/reader"
cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3"
)

type observer interface {
getChainsFeeComponents(ctx context.Context) map[cciptypes.ChainSelector]types.ChainFeeComponents
getNativeTokenPrices(ctx context.Context) map[cciptypes.ChainSelector]cciptypes.BigInt
getChainFeePriceUpdates(ctx context.Context) map[cciptypes.ChainSelector]Update
getChainsFeeComponents(ctx context.Context, lggr logger.Logger) map[cciptypes.ChainSelector]types.ChainFeeComponents
getNativeTokenPrices(ctx context.Context, lggr logger.Logger) map[cciptypes.ChainSelector]cciptypes.BigInt
getChainFeePriceUpdates(ctx context.Context, lggr logger.Logger) map[cciptypes.ChainSelector]Update
close()
}

Expand All @@ -27,11 +28,9 @@ type baseObserver struct {
oracleID commontypes.OracleID
destChain cciptypes.ChainSelector
ccipReader ccipreader.CCIPReader
lggr logger.Logger
}

func newBaseObserver(
lggr logger.Logger,
ccipReader ccipreader.CCIPReader,
destChain cciptypes.ChainSelector,
oracleID commontypes.OracleID,
Expand All @@ -42,34 +41,40 @@ func newBaseObserver(
oracleID: oracleID,
destChain: destChain,
ccipReader: ccipReader,
lggr: lggr,
}
}

func (o *baseObserver) getChainsFeeComponents(
ctx context.Context,
lggr logger.Logger,
) map[cciptypes.ChainSelector]types.ChainFeeComponents {
supportedChains, err := o.getSupportedChains(o.lggr, o.cs, o.oracleID, o.destChain)
supportedChains, err := o.getSupportedChains(lggr, o.cs, o.oracleID, o.destChain)
if err != nil {
o.lggr.Errorw("failed to get supported chains unable to get chains fee components", "err", err)
lggr.Errorw("failed to get supported chains unable to get chains fee components", "err", err)
return map[cciptypes.ChainSelector]types.ChainFeeComponents{}
}
return o.ccipReader.GetChainsFeeComponents(ctx, supportedChains)
}

func (o *baseObserver) getNativeTokenPrices(ctx context.Context) map[cciptypes.ChainSelector]cciptypes.BigInt {
supportedChains, err := o.getSupportedChains(o.lggr, o.cs, o.oracleID, o.destChain)
func (o *baseObserver) getNativeTokenPrices(
ctx context.Context,
lggr logger.Logger,
) map[cciptypes.ChainSelector]cciptypes.BigInt {
supportedChains, err := o.getSupportedChains(lggr, o.cs, o.oracleID, o.destChain)
if err != nil {
o.lggr.Errorw("failed to get supported chains unable to get native token prices", "err", err)
lggr.Errorw("failed to get supported chains unable to get native token prices", "err", err)
return map[cciptypes.ChainSelector]cciptypes.BigInt{}
}
return o.ccipReader.GetWrappedNativeTokenPriceUSD(ctx, supportedChains)
}

func (o *baseObserver) getChainFeePriceUpdates(ctx context.Context) map[cciptypes.ChainSelector]Update {
supportedChains, err := o.getSupportedChains(o.lggr, o.cs, o.oracleID, o.destChain)
func (o *baseObserver) getChainFeePriceUpdates(
ctx context.Context,
lggr logger.Logger,
) map[cciptypes.ChainSelector]Update {
supportedChains, err := o.getSupportedChains(lggr, o.cs, o.oracleID, o.destChain)
if err != nil {
o.lggr.Errorw("failed to get supported chains unable to get chain fee price updates", "err", err)
lggr.Errorw("failed to get supported chains unable to get chain fee price updates", "err", err)
return map[cciptypes.ChainSelector]Update{}
}
return feeUpdatesFromTimestampedBig(o.ccipReader.GetChainFeePriceUpdate(ctx, supportedChains))
Expand Down Expand Up @@ -120,13 +125,13 @@ func newAsyncObserver(

obs := &asyncObserver{
baseObserver: baseObserver,
lggr: lggr,
lggr: logutil.WithComponent(lggr, "chainfeeAsyncObserver"),
cancelFunc: nil,
mu: &sync.RWMutex{},
}

ticker := time.NewTicker(tickDur)
lggr.Debugw("async observer started", "tickDur", tickDur, "syncTimeout", syncTimeout)
lggr.Debugw("async chainfee observer started", "tickDur", tickDur, "syncTimeout", syncTimeout)
obs.start(ctx, ticker.C, syncTimeout)

obs.cancelFunc = func() {
Expand All @@ -151,7 +156,7 @@ func (o *asyncObserver) start(ctx context.Context, ticker <-chan time.Time, sync
}

func (o *asyncObserver) sync(ctx context.Context, syncTimeout time.Duration) {
o.lggr.Debugw("async observer is syncing", "syncTimeout", syncTimeout)
o.lggr.Debugw("async chainfee observer is syncing", "syncTimeout", syncTimeout)
ctxSync, cf := context.WithTimeout(ctx, syncTimeout)
defer cf()

Expand All @@ -162,7 +167,7 @@ func (o *asyncObserver) sync(ctx context.Context, syncTimeout time.Duration) {
{
id: "chainsFeeComponents",
op: func(ctx context.Context) {
chainsFeeComponents := o.baseObserver.getChainsFeeComponents(ctx)
chainsFeeComponents := o.baseObserver.getChainsFeeComponents(ctx, o.lggr)
o.mu.Lock()
o.chainsFeeComponents = chainsFeeComponents
o.mu.Unlock()
Expand All @@ -171,7 +176,7 @@ func (o *asyncObserver) sync(ctx context.Context, syncTimeout time.Duration) {
{
id: "nativeTokenPrices",
op: func(ctx context.Context) {
nativeTokenPrices := o.baseObserver.getNativeTokenPrices(ctx)
nativeTokenPrices := o.baseObserver.getNativeTokenPrices(ctx, o.lggr)
o.mu.Lock()
o.nativeTokenPrices = nativeTokenPrices
o.mu.Unlock()
Expand All @@ -180,7 +185,7 @@ func (o *asyncObserver) sync(ctx context.Context, syncTimeout time.Duration) {
{
id: "chainFeePriceUpdates",
op: func(ctx context.Context) {
chainFeePriceUpdates := o.baseObserver.getChainFeePriceUpdates(ctx)
chainFeePriceUpdates := o.baseObserver.getChainFeePriceUpdates(ctx, o.lggr)
o.mu.Lock()
o.chainFeePriceUpdates = chainFeePriceUpdates
o.mu.Unlock()
Expand All @@ -191,37 +196,49 @@ func (o *asyncObserver) sync(ctx context.Context, syncTimeout time.Duration) {
wg := &sync.WaitGroup{}
wg.Add(len(syncOps))
for _, op := range syncOps {
go o.applySyncOp(ctxSync, o.lggr, op.id, wg, op.op)
go o.applySyncOp(ctxSync, op.id, wg, op.op)
}
wg.Wait()
}

// applySyncOp applies the given operation synchronously.
func (o *asyncObserver) applySyncOp(
ctx context.Context, lggr logger.Logger, id string, wg *sync.WaitGroup, op func(ctx context.Context)) {
ctx context.Context, id string, wg *sync.WaitGroup, op func(ctx context.Context)) {
defer wg.Done()
tStart := time.Now()
o.lggr.Debugw("async observer applying sync operation", "id", id)
op(ctx)
lggr.Debugw("async observer has applied the sync operation",
o.lggr.Debugw("async observer has applied the sync operation",
"id", id, "duration", time.Since(tStart))
}

func (o *asyncObserver) getChainsFeeComponents(_ context.Context) map[cciptypes.ChainSelector]types.ChainFeeComponents {
func (o *asyncObserver) getChainsFeeComponents(
_ context.Context,
lggr logger.Logger,
) map[cciptypes.ChainSelector]types.ChainFeeComponents {
o.mu.RLock()
defer o.mu.RUnlock()
lggr.Debugw("getChainsFeeComponents returning cached value", "numFeeComponents", len(o.chainsFeeComponents))
return o.chainsFeeComponents
}

func (o *asyncObserver) getNativeTokenPrices(_ context.Context) map[cciptypes.ChainSelector]cciptypes.BigInt {
func (o *asyncObserver) getNativeTokenPrices(
_ context.Context,
lggr logger.Logger,
) map[cciptypes.ChainSelector]cciptypes.BigInt {
o.mu.RLock()
defer o.mu.RUnlock()
lggr.Debugw("getNativeTokenPrices returning cached value", "numPrices", len(o.nativeTokenPrices))
return o.nativeTokenPrices
}

func (o *asyncObserver) getChainFeePriceUpdates(_ context.Context) map[cciptypes.ChainSelector]Update {
func (o *asyncObserver) getChainFeePriceUpdates(
_ context.Context,
lggr logger.Logger,
) map[cciptypes.ChainSelector]Update {
o.mu.RLock()
defer o.mu.RUnlock()
lggr.Debugw("getChainFeePriceUpdates returning cached value", "numUpdates", len(o.chainFeePriceUpdates))
return o.chainFeePriceUpdates
}

Expand Down
4 changes: 2 additions & 2 deletions commit/chainfee/observation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func Test_processor_Observation(t *testing.T) {
oracleID: oracleID,
homeChain: homeChain,
metricsReporter: plugincommon2.NoopReporter{},
obs: newBaseObserver(lggr, ccipReader, tc.dstChain, oracleID, cs),
obs: newBaseObserver(ccipReader, tc.dstChain, oracleID, cs),
}

supportedSet := mapset.NewSet(tc.supportedChains...)
Expand Down Expand Up @@ -323,7 +323,7 @@ func Test_unique_chain_filter_in_Observation(t *testing.T) {
oracleID: oracleID,
homeChain: homeChain,
metricsReporter: plugincommon2.NoopReporter{},
obs: newBaseObserver(lggr, ccipReader, tc.dstChain, oracleID, cs),
obs: newBaseObserver(ccipReader, tc.dstChain, oracleID, cs),
}

supportedSet := mapset.NewSet(tc.supportedChains...)
Expand Down
1 change: 0 additions & 1 deletion commit/chainfee/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ func NewProcessor(
) plugincommon.PluginProcessor[Query, Observation, Outcome] {
var obs observer
baseObs := newBaseObserver(
lggr,
ccipReader,
destChain,
oracleID,
Expand Down
1 change: 1 addition & 0 deletions commit/plugin_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,7 @@ func defaultNodeParams(t *testing.T) SetupNodeParams {
InflightPriceCheckRetries: 10,
MerkleRootAsyncObserverDisabled: true, // we want to keep it disabled since this test is deterministic
ChainFeeAsyncObserverDisabled: true,
TokenPriceAsyncObserverDisabled: true,
}

reportingCfg := ocr3types.ReportingPluginConfig{F: 1, ConfigDigest: digest}
Expand Down
Loading

0 comments on commit f8239cb

Please sign in to comment.