From f8239cbbbcad0fd006417c57f903d5b5277a073e Mon Sep 17 00:00:00 2001 From: Makram Date: Wed, 26 Feb 2025 20:53:25 +0200 Subject: [PATCH] commit/tokenprice: add async tokenprice observer (#668) * commit/tokenprice: add async tokenprice observer * contextual logging for chainfee and tokenprice * simplify close * add gp and tp int tests to ci * rm tokenprice test can only run in docker at the moment --- .github/workflows/ccip-integration-test.yml | 5 + commit/chainfee/observation.go | 6 +- commit/chainfee/observation_async.go | 69 ++++--- commit/chainfee/observation_test.go | 4 +- commit/chainfee/processor.go | 1 - commit/plugin_e2e_test.go | 1 + commit/tokenprice/observation.go | 202 ++++++++++++++++++-- commit/tokenprice/observation_test.go | 57 +++--- commit/tokenprice/processor.go | 21 ++ pluginconfig/commit.go | 18 ++ pluginconfig/commit_test.go | 17 ++ 11 files changed, 325 insertions(+), 76 deletions(-) diff --git a/.github/workflows/ccip-integration-test.yml b/.github/workflows/ccip-integration-test.yml index 602c9ae78..c8f3ec1d9 100644 --- a/.github/workflows/ccip-integration-test.yml +++ b/.github/workflows/ccip-integration-test.yml @@ -29,6 +29,11 @@ jobs: name: "Messaging Test" - cmd: cd integration-tests/smoke/ccip && go test ccip_batching_test.go -timeout 12m -test.parallel=2 -count=1 -json name: "Batching Test" + - cmd: cd integration-tests/smoke/ccip && go test ccip_gas_price_updates_test.go -timeout 12m -test.parallel=2 -count=1 -json + name: "Gas Price Updates Test" + # TODO: this can only run in docker for now, switch to in-memory and uncomment + # - cmd: cd integration-tests/smoke/ccip && go test ccip_token_price_updates_test.go -timeout 12m -test.parallel=2 -count=1 -json + # name: "Token Price Updates Test" - cmd: cd integration-tests/smoke/ccip && go test ccip_reader_test.go -timeout 5m -test.parallel=1 -count=1 -json name: "CCIPReader Test" diff --git a/commit/chainfee/observation.go b/commit/chainfee/observation.go index 22acb5816..85e74183a 100644 --- a/commit/chainfee/observation.go +++ b/commit/chainfee/observation.go @@ -29,9 +29,9 @@ func (p *processor) Observation( ) (Observation, error) { lggr := logutil.WithContextValues(ctx, p.lggr) - feeComponents := p.obs.getChainsFeeComponents(ctx) - nativeTokenPrices := p.obs.getNativeTokenPrices(ctx) - chainFeeUpdates := p.obs.getChainFeePriceUpdates(ctx) + feeComponents := p.obs.getChainsFeeComponents(ctx, lggr) + nativeTokenPrices := p.obs.getNativeTokenPrices(ctx, lggr) + chainFeeUpdates := p.obs.getChainFeePriceUpdates(ctx, lggr) fChain := p.observeFChain(lggr) now := time.Now().UTC() diff --git a/commit/chainfee/observation_async.go b/commit/chainfee/observation_async.go index 60ecf93da..466470abf 100644 --- a/commit/chainfee/observation_async.go +++ b/commit/chainfee/observation_async.go @@ -11,14 +11,15 @@ import ( "github.com/smartcontractkit/libocr/commontypes" "github.com/smartcontractkit/chainlink-ccip/internal/plugincommon" + "github.com/smartcontractkit/chainlink-ccip/pkg/logutil" ccipreader "github.com/smartcontractkit/chainlink-ccip/pkg/reader" cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3" ) type observer interface { - getChainsFeeComponents(ctx context.Context) map[cciptypes.ChainSelector]types.ChainFeeComponents - getNativeTokenPrices(ctx context.Context) map[cciptypes.ChainSelector]cciptypes.BigInt - getChainFeePriceUpdates(ctx context.Context) map[cciptypes.ChainSelector]Update + getChainsFeeComponents(ctx context.Context, lggr logger.Logger) map[cciptypes.ChainSelector]types.ChainFeeComponents + getNativeTokenPrices(ctx context.Context, lggr logger.Logger) map[cciptypes.ChainSelector]cciptypes.BigInt + getChainFeePriceUpdates(ctx context.Context, lggr logger.Logger) map[cciptypes.ChainSelector]Update close() } @@ -27,11 +28,9 @@ type baseObserver struct { oracleID commontypes.OracleID destChain cciptypes.ChainSelector ccipReader ccipreader.CCIPReader - lggr logger.Logger } func newBaseObserver( - lggr logger.Logger, ccipReader ccipreader.CCIPReader, destChain cciptypes.ChainSelector, oracleID commontypes.OracleID, @@ -42,34 +41,40 @@ func newBaseObserver( oracleID: oracleID, destChain: destChain, ccipReader: ccipReader, - lggr: lggr, } } func (o *baseObserver) getChainsFeeComponents( ctx context.Context, + lggr logger.Logger, ) map[cciptypes.ChainSelector]types.ChainFeeComponents { - supportedChains, err := o.getSupportedChains(o.lggr, o.cs, o.oracleID, o.destChain) + supportedChains, err := o.getSupportedChains(lggr, o.cs, o.oracleID, o.destChain) if err != nil { - o.lggr.Errorw("failed to get supported chains unable to get chains fee components", "err", err) + lggr.Errorw("failed to get supported chains unable to get chains fee components", "err", err) return map[cciptypes.ChainSelector]types.ChainFeeComponents{} } return o.ccipReader.GetChainsFeeComponents(ctx, supportedChains) } -func (o *baseObserver) getNativeTokenPrices(ctx context.Context) map[cciptypes.ChainSelector]cciptypes.BigInt { - supportedChains, err := o.getSupportedChains(o.lggr, o.cs, o.oracleID, o.destChain) +func (o *baseObserver) getNativeTokenPrices( + ctx context.Context, + lggr logger.Logger, +) map[cciptypes.ChainSelector]cciptypes.BigInt { + supportedChains, err := o.getSupportedChains(lggr, o.cs, o.oracleID, o.destChain) if err != nil { - o.lggr.Errorw("failed to get supported chains unable to get native token prices", "err", err) + lggr.Errorw("failed to get supported chains unable to get native token prices", "err", err) return map[cciptypes.ChainSelector]cciptypes.BigInt{} } return o.ccipReader.GetWrappedNativeTokenPriceUSD(ctx, supportedChains) } -func (o *baseObserver) getChainFeePriceUpdates(ctx context.Context) map[cciptypes.ChainSelector]Update { - supportedChains, err := o.getSupportedChains(o.lggr, o.cs, o.oracleID, o.destChain) +func (o *baseObserver) getChainFeePriceUpdates( + ctx context.Context, + lggr logger.Logger, +) map[cciptypes.ChainSelector]Update { + supportedChains, err := o.getSupportedChains(lggr, o.cs, o.oracleID, o.destChain) if err != nil { - o.lggr.Errorw("failed to get supported chains unable to get chain fee price updates", "err", err) + lggr.Errorw("failed to get supported chains unable to get chain fee price updates", "err", err) return map[cciptypes.ChainSelector]Update{} } return feeUpdatesFromTimestampedBig(o.ccipReader.GetChainFeePriceUpdate(ctx, supportedChains)) @@ -120,13 +125,13 @@ func newAsyncObserver( obs := &asyncObserver{ baseObserver: baseObserver, - lggr: lggr, + lggr: logutil.WithComponent(lggr, "chainfeeAsyncObserver"), cancelFunc: nil, mu: &sync.RWMutex{}, } ticker := time.NewTicker(tickDur) - lggr.Debugw("async observer started", "tickDur", tickDur, "syncTimeout", syncTimeout) + lggr.Debugw("async chainfee observer started", "tickDur", tickDur, "syncTimeout", syncTimeout) obs.start(ctx, ticker.C, syncTimeout) obs.cancelFunc = func() { @@ -151,7 +156,7 @@ func (o *asyncObserver) start(ctx context.Context, ticker <-chan time.Time, sync } func (o *asyncObserver) sync(ctx context.Context, syncTimeout time.Duration) { - o.lggr.Debugw("async observer is syncing", "syncTimeout", syncTimeout) + o.lggr.Debugw("async chainfee observer is syncing", "syncTimeout", syncTimeout) ctxSync, cf := context.WithTimeout(ctx, syncTimeout) defer cf() @@ -162,7 +167,7 @@ func (o *asyncObserver) sync(ctx context.Context, syncTimeout time.Duration) { { id: "chainsFeeComponents", op: func(ctx context.Context) { - chainsFeeComponents := o.baseObserver.getChainsFeeComponents(ctx) + chainsFeeComponents := o.baseObserver.getChainsFeeComponents(ctx, o.lggr) o.mu.Lock() o.chainsFeeComponents = chainsFeeComponents o.mu.Unlock() @@ -171,7 +176,7 @@ func (o *asyncObserver) sync(ctx context.Context, syncTimeout time.Duration) { { id: "nativeTokenPrices", op: func(ctx context.Context) { - nativeTokenPrices := o.baseObserver.getNativeTokenPrices(ctx) + nativeTokenPrices := o.baseObserver.getNativeTokenPrices(ctx, o.lggr) o.mu.Lock() o.nativeTokenPrices = nativeTokenPrices o.mu.Unlock() @@ -180,7 +185,7 @@ func (o *asyncObserver) sync(ctx context.Context, syncTimeout time.Duration) { { id: "chainFeePriceUpdates", op: func(ctx context.Context) { - chainFeePriceUpdates := o.baseObserver.getChainFeePriceUpdates(ctx) + chainFeePriceUpdates := o.baseObserver.getChainFeePriceUpdates(ctx, o.lggr) o.mu.Lock() o.chainFeePriceUpdates = chainFeePriceUpdates o.mu.Unlock() @@ -191,37 +196,49 @@ func (o *asyncObserver) sync(ctx context.Context, syncTimeout time.Duration) { wg := &sync.WaitGroup{} wg.Add(len(syncOps)) for _, op := range syncOps { - go o.applySyncOp(ctxSync, o.lggr, op.id, wg, op.op) + go o.applySyncOp(ctxSync, op.id, wg, op.op) } wg.Wait() } // applySyncOp applies the given operation synchronously. func (o *asyncObserver) applySyncOp( - ctx context.Context, lggr logger.Logger, id string, wg *sync.WaitGroup, op func(ctx context.Context)) { + ctx context.Context, id string, wg *sync.WaitGroup, op func(ctx context.Context)) { defer wg.Done() tStart := time.Now() o.lggr.Debugw("async observer applying sync operation", "id", id) op(ctx) - lggr.Debugw("async observer has applied the sync operation", + o.lggr.Debugw("async observer has applied the sync operation", "id", id, "duration", time.Since(tStart)) } -func (o *asyncObserver) getChainsFeeComponents(_ context.Context) map[cciptypes.ChainSelector]types.ChainFeeComponents { +func (o *asyncObserver) getChainsFeeComponents( + _ context.Context, + lggr logger.Logger, +) map[cciptypes.ChainSelector]types.ChainFeeComponents { o.mu.RLock() defer o.mu.RUnlock() + lggr.Debugw("getChainsFeeComponents returning cached value", "numFeeComponents", len(o.chainsFeeComponents)) return o.chainsFeeComponents } -func (o *asyncObserver) getNativeTokenPrices(_ context.Context) map[cciptypes.ChainSelector]cciptypes.BigInt { +func (o *asyncObserver) getNativeTokenPrices( + _ context.Context, + lggr logger.Logger, +) map[cciptypes.ChainSelector]cciptypes.BigInt { o.mu.RLock() defer o.mu.RUnlock() + lggr.Debugw("getNativeTokenPrices returning cached value", "numPrices", len(o.nativeTokenPrices)) return o.nativeTokenPrices } -func (o *asyncObserver) getChainFeePriceUpdates(_ context.Context) map[cciptypes.ChainSelector]Update { +func (o *asyncObserver) getChainFeePriceUpdates( + _ context.Context, + lggr logger.Logger, +) map[cciptypes.ChainSelector]Update { o.mu.RLock() defer o.mu.RUnlock() + lggr.Debugw("getChainFeePriceUpdates returning cached value", "numUpdates", len(o.chainFeePriceUpdates)) return o.chainFeePriceUpdates } diff --git a/commit/chainfee/observation_test.go b/commit/chainfee/observation_test.go index 731d24653..b83f08b58 100644 --- a/commit/chainfee/observation_test.go +++ b/commit/chainfee/observation_test.go @@ -124,7 +124,7 @@ func Test_processor_Observation(t *testing.T) { oracleID: oracleID, homeChain: homeChain, metricsReporter: plugincommon2.NoopReporter{}, - obs: newBaseObserver(lggr, ccipReader, tc.dstChain, oracleID, cs), + obs: newBaseObserver(ccipReader, tc.dstChain, oracleID, cs), } supportedSet := mapset.NewSet(tc.supportedChains...) @@ -323,7 +323,7 @@ func Test_unique_chain_filter_in_Observation(t *testing.T) { oracleID: oracleID, homeChain: homeChain, metricsReporter: plugincommon2.NoopReporter{}, - obs: newBaseObserver(lggr, ccipReader, tc.dstChain, oracleID, cs), + obs: newBaseObserver(ccipReader, tc.dstChain, oracleID, cs), } supportedSet := mapset.NewSet(tc.supportedChains...) diff --git a/commit/chainfee/processor.go b/commit/chainfee/processor.go index 8942850c3..6d267c2be 100644 --- a/commit/chainfee/processor.go +++ b/commit/chainfee/processor.go @@ -40,7 +40,6 @@ func NewProcessor( ) plugincommon.PluginProcessor[Query, Observation, Outcome] { var obs observer baseObs := newBaseObserver( - lggr, ccipReader, destChain, oracleID, diff --git a/commit/plugin_e2e_test.go b/commit/plugin_e2e_test.go index 39b9e9dce..c96a907bb 100644 --- a/commit/plugin_e2e_test.go +++ b/commit/plugin_e2e_test.go @@ -979,6 +979,7 @@ func defaultNodeParams(t *testing.T) SetupNodeParams { InflightPriceCheckRetries: 10, MerkleRootAsyncObserverDisabled: true, // we want to keep it disabled since this test is deterministic ChainFeeAsyncObserverDisabled: true, + TokenPriceAsyncObserverDisabled: true, } reportingCfg := ocr3types.ReportingPluginConfig{F: 1, ConfigDigest: digest} diff --git a/commit/tokenprice/observation.go b/commit/tokenprice/observation.go index 8e02af8b1..0b82ddea5 100644 --- a/commit/tokenprice/observation.go +++ b/commit/tokenprice/observation.go @@ -3,15 +3,20 @@ package tokenprice import ( "context" "sort" + "sync" "time" "golang.org/x/exp/maps" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/libocr/commontypes" + "github.com/smartcontractkit/chainlink-ccip/internal/plugincommon" "github.com/smartcontractkit/chainlink-ccip/internal/plugintypes" "github.com/smartcontractkit/chainlink-ccip/pkg/logutil" + pkgreader "github.com/smartcontractkit/chainlink-ccip/pkg/reader" cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3" + "github.com/smartcontractkit/chainlink-ccip/pluginconfig" ) func (p *processor) Observation( @@ -21,13 +26,13 @@ func (p *processor) Observation( ) (Observation, error) { lggr := logutil.WithContextValues(ctx, p.lggr) - fChain := p.ObserveFChain(lggr) + fChain := p.observeFChain(lggr) if len(fChain) == 0 { return Observation{}, nil } - feedTokenPrices := p.ObserveFeedTokenPrices(ctx, lggr) - feeQuoterUpdates := p.ObserveFeeQuoterTokenUpdates(ctx, lggr) + feedTokenPrices := p.obs.observeFeedTokenPrices(ctx, lggr) + feeQuoterUpdates := p.obs.observeFeeQuoterTokenUpdates(ctx, lggr) now := time.Now().UTC() lggr.Infow( "observed token prices", @@ -45,7 +50,7 @@ func (p *processor) Observation( return obs, nil } -func (p *processor) ObserveFChain(lggr logger.Logger) map[cciptypes.ChainSelector]int { +func (p *processor) observeFChain(lggr logger.Logger) map[cciptypes.ChainSelector]int { fChain, err := p.homeChain.GetFChain() if err != nil { lggr.Errorw("call to GetFChain failed", "err", err) @@ -54,26 +59,58 @@ func (p *processor) ObserveFChain(lggr logger.Logger) map[cciptypes.ChainSelecto return fChain } -func (p *processor) ObserveFeedTokenPrices(ctx context.Context, lggr logger.Logger) cciptypes.TokenPriceMap { - if p.tokenPriceReader == nil { +type observer interface { + observeFeedTokenPrices(ctx context.Context, lggr logger.Logger) cciptypes.TokenPriceMap + observeFeeQuoterTokenUpdates( + ctx context.Context, + lggr logger.Logger) map[cciptypes.UnknownEncodedAddress]plugintypes.TimestampedBig + close() +} + +type baseObserver struct { + oracleID commontypes.OracleID + tokenPriceReader pkgreader.PriceReader + chainSupport plugincommon.ChainSupport + offChainCfg pluginconfig.CommitOffchainConfig + destChain cciptypes.ChainSelector +} + +func newBaseObserver( + tokenPriceReader pkgreader.PriceReader, + destChain cciptypes.ChainSelector, + oracleID commontypes.OracleID, + chainSupport plugincommon.ChainSupport, + offchainCfg pluginconfig.CommitOffchainConfig, +) *baseObserver { + return &baseObserver{ + oracleID: oracleID, + tokenPriceReader: tokenPriceReader, + chainSupport: chainSupport, + destChain: destChain, + offChainCfg: offchainCfg, + } +} + +func (b *baseObserver) observeFeedTokenPrices(ctx context.Context, lggr logger.Logger) cciptypes.TokenPriceMap { + if b.tokenPriceReader == nil { lggr.Debugw("no token price reader available") return cciptypes.TokenPriceMap{} } - supportedChains, err := p.chainSupport.SupportedChains(p.oracleID) + supportedChains, err := b.chainSupport.SupportedChains(b.oracleID) if err != nil { lggr.Warnw("call to SupportedChains failed", "err", err) return cciptypes.TokenPriceMap{} } - if !supportedChains.Contains(p.offChainCfg.PriceFeedChainSelector) { - lggr.Debugf("oracle does not support feed chain %d", p.offChainCfg.PriceFeedChainSelector) + if !supportedChains.Contains(b.offChainCfg.PriceFeedChainSelector) { + lggr.Debugf("oracle does not support feed chain %d", b.offChainCfg.PriceFeedChainSelector) return cciptypes.TokenPriceMap{} } - tokensToQuery := maps.Keys(p.offChainCfg.TokenInfo) + tokensToQuery := maps.Keys(b.offChainCfg.TokenInfo) lggr.Infow("observing feed token prices", "tokens", tokensToQuery) - tokenPrices, err := p.tokenPriceReader.GetFeedPricesUSD(ctx, tokensToQuery) + tokenPrices, err := b.tokenPriceReader.GetFeedPricesUSD(ctx, tokensToQuery) if err != nil { lggr.Errorw("call to GetFeedPricesUSD failed", "err", err) @@ -83,16 +120,15 @@ func (p *processor) ObserveFeedTokenPrices(ctx context.Context, lggr logger.Logg return tokenPrices } -func (p *processor) ObserveFeeQuoterTokenUpdates( +func (b *baseObserver) observeFeeQuoterTokenUpdates( ctx context.Context, - lggr logger.Logger, -) map[cciptypes.UnknownEncodedAddress]plugintypes.TimestampedBig { - if p.tokenPriceReader == nil { + lggr logger.Logger) map[cciptypes.UnknownEncodedAddress]plugintypes.TimestampedBig { + if b.tokenPriceReader == nil { lggr.Debugw("no token price reader available") return map[cciptypes.UnknownEncodedAddress]plugintypes.TimestampedBig{} } - supportsDestChain, err := p.chainSupport.SupportsDestChain(p.oracleID) + supportsDestChain, err := b.chainSupport.SupportsDestChain(b.oracleID) if err != nil { lggr.Warnw("call to SupportsDestChain failed", "err", err) return map[cciptypes.UnknownEncodedAddress]plugintypes.TimestampedBig{} @@ -102,11 +138,11 @@ func (p *processor) ObserveFeeQuoterTokenUpdates( return map[cciptypes.UnknownEncodedAddress]plugintypes.TimestampedBig{} } - tokensToQuery := maps.Keys(p.offChainCfg.TokenInfo) + tokensToQuery := maps.Keys(b.offChainCfg.TokenInfo) // sort tokens to query to ensure deterministic order sort.Slice(tokensToQuery, func(i, j int) bool { return tokensToQuery[i] < tokensToQuery[j] }) lggr.Infow("observing fee quoter token updates") - priceUpdates, err := p.tokenPriceReader.GetFeeQuoterTokenUpdates(ctx, tokensToQuery, p.destChain) + priceUpdates, err := b.tokenPriceReader.GetFeeQuoterTokenUpdates(ctx, tokensToQuery, b.destChain) if err != nil { lggr.Errorw("call to GetFeeQuoterTokenUpdates failed", "err", err) return map[cciptypes.UnknownEncodedAddress]plugintypes.TimestampedBig{} @@ -123,3 +159,133 @@ func (p *processor) ObserveFeeQuoterTokenUpdates( return tokenUpdates } + +func (b *baseObserver) close() {} + +// asyncObserver wraps baseObserver and periodically syncs the tokenPriceMap and tokenUpdates. +// It is used to avoid blocking the processor when querying the tokenPriceReader. +type asyncObserver struct { + lggr logger.Logger + base *baseObserver + cancelFunc func() + mu sync.RWMutex + + // cached values, only ever read thru mutex. + tokenPriceMap cciptypes.TokenPriceMap + tokenUpdates map[cciptypes.UnknownEncodedAddress]plugintypes.TimestampedBig +} + +func newAsyncObserver( + lggr logger.Logger, + base *baseObserver, + tickDuration, syncTimeout time.Duration, +) *asyncObserver { + ctx, cancel := context.WithCancel(context.Background()) + + obs := &asyncObserver{ + lggr: logutil.WithComponent(lggr, "tokenpriceAsyncObserver"), + base: base, + mu: sync.RWMutex{}, + } + + ticker := time.NewTicker(tickDuration) + lggr.Debugw("async tokenprice observer started", "tickDuration", tickDuration, "syncTimeout", syncTimeout) + obs.start(ctx, ticker.C, syncTimeout) + + obs.cancelFunc = func() { + cancel() + ticker.Stop() + } + + return obs +} + +func (a *asyncObserver) start(ctx context.Context, tickerC <-chan time.Time, syncTimeout time.Duration) { + go func() { + for { + select { + case <-ctx.Done(): + return + case <-tickerC: + a.sync(ctx, syncTimeout) + } + } + }() +} + +func (a *asyncObserver) sync(ctx context.Context, syncTimeout time.Duration) { + a.lggr.Debugw("async tokenprice observer is syncing") + ctxSync, cancel := context.WithTimeout(ctx, syncTimeout) + defer cancel() + + syncOps := []struct { + id string + op func(context.Context) + }{ + { + id: "feedTokenPrices", + op: func(ctx context.Context) { + tokenPriceMap := a.base.observeFeedTokenPrices(ctx, a.lggr) + a.mu.Lock() + a.tokenPriceMap = tokenPriceMap + a.mu.Unlock() + }, + }, + { + id: "feeQuoterTokenUpdates", + op: func(ctx context.Context) { + tokenUpdates := a.base.observeFeeQuoterTokenUpdates(ctx, a.lggr) + a.mu.Lock() + a.tokenUpdates = tokenUpdates + a.mu.Unlock() + }, + }, + } + + wg := &sync.WaitGroup{} + wg.Add(len(syncOps)) + for _, op := range syncOps { + go a.applySyncOp(ctxSync, op.id, wg, op.op) + } + wg.Wait() +} + +// applySyncOp applies the given operation synchronously. +func (a *asyncObserver) applySyncOp( + ctx context.Context, id string, wg *sync.WaitGroup, op func(ctx context.Context)) { + defer wg.Done() + tStart := time.Now() + a.lggr.Debugw("async observer applying sync operation", "id", id) + op(ctx) + a.lggr.Debugw("async observer has applied the sync operation", + "id", id, "duration", time.Since(tStart)) +} + +// observeFeeQuoterTokenUpdates implements observer by returning the cached tokenUpdates. +func (a *asyncObserver) observeFeeQuoterTokenUpdates( + ctx context.Context, + lggr logger.Logger, +) map[cciptypes.UnknownEncodedAddress]plugintypes.TimestampedBig { + a.mu.RLock() + defer a.mu.RUnlock() + lggr.Debugw("observeFeeQuoterTokenUpdates returning cached value", "numUpdates", len(a.tokenUpdates)) + return a.tokenUpdates +} + +// observeFeedTokenPrices implements observer by returning the cached tokenPriceMap. +func (a *asyncObserver) observeFeedTokenPrices( + ctx context.Context, + lggr logger.Logger, +) cciptypes.TokenPriceMap { + a.mu.RLock() + defer a.mu.RUnlock() + lggr.Debugw("observeFeedTokenPrices returning cached value", "numPrices", len(a.tokenPriceMap)) + return a.tokenPriceMap +} + +func (a *asyncObserver) close() { + a.cancelFunc() +} + +var _ observer = &asyncObserver{} +var _ observer = &baseObserver{} diff --git a/commit/tokenprice/observation_test.go b/commit/tokenprice/observation_test.go index 6c3ca6218..b37a13f89 100644 --- a/commit/tokenprice/observation_test.go +++ b/commit/tokenprice/observation_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/libocr/commontypes" "github.com/smartcontractkit/chainlink-ccip/internal/plugincommon" "github.com/smartcontractkit/chainlink-ccip/internal/plugintypes" @@ -37,19 +38,21 @@ func Test_Observation(t *testing.T) { tokenA: plugintypes.NewTimestampedBig(bi100.Int64(), timestamp), tokenB: plugintypes.NewTimestampedBig(bi200.Int64(), timestamp), } + oracleID := commontypes.OracleID(1) + lggr := logger.Test(t) testCases := []struct { name string - getProcessor func(t *testing.T) *processor + getProcessor func(t *testing.T) plugincommon.PluginProcessor[Query, Observation, Outcome] expObs Observation expErr error }{ { name: "Successful observation", - getProcessor: func(t *testing.T) *processor { + getProcessor: func(t *testing.T) plugincommon.PluginProcessor[Query, Observation, Outcome] { chainSupport := common_mock.NewMockChainSupport(t) chainSupport.EXPECT().SupportedChains(mock.Anything).Return( - mapset.NewSet[cciptypes.ChainSelector](feedChainSel, destChainSel), nil, + mapset.NewSet(feedChainSel, destChainSel), nil, ) chainSupport.EXPECT().SupportsDestChain(mock.Anything).Return(true, nil).Maybe() @@ -78,17 +81,17 @@ func Test_Observation(t *testing.T) { nil, ) - return &processor{ - oracleID: 1, - lggr: logger.Test(t), - chainSupport: chainSupport, - tokenPriceReader: tokenPriceReader, - homeChain: homeChain, - offChainCfg: defaultCfg, - destChain: destChainSel, - fRoleDON: f, - metricsReporter: plugincommon.NoopReporter{}, - } + return NewProcessor( + oracleID, + lggr, + defaultCfg, + destChainSel, + chainSupport, + tokenPriceReader, + homeChain, + f, + plugincommon.NoopReporter{}, + ) }, expObs: Observation{ FeedTokenPrices: feedTokenPrices, @@ -99,24 +102,24 @@ func Test_Observation(t *testing.T) { }, { name: "Failed to get FDestChain", - getProcessor: func(t *testing.T) *processor { + getProcessor: func(t *testing.T) plugincommon.PluginProcessor[Query, Observation, Outcome] { homeChain := readermock.NewMockHomeChain(t) homeChain.EXPECT().GetFChain().Return(nil, errors.New("failed to get FChain")) chainSupport := common_mock.NewMockChainSupport(t) tokenPriceReader := readerpkg_mock.NewMockPriceReader(t) - return &processor{ - oracleID: 1, - lggr: logger.Test(t), - chainSupport: chainSupport, - tokenPriceReader: tokenPriceReader, - homeChain: homeChain, - destChain: destChainSel, - offChainCfg: defaultCfg, - fRoleDON: f, - metricsReporter: plugincommon.NoopReporter{}, - } + return NewProcessor( + oracleID, + lggr, + defaultCfg, + destChainSel, + chainSupport, + tokenPriceReader, + homeChain, + f, + plugincommon.NoopReporter{}, + ) }, expObs: Observation{}, }, @@ -155,4 +158,6 @@ var defaultCfg = pluginconfig.CommitOffchainConfig{ DeviationPPB: cciptypes.BigInt{Int: big.NewInt(1)}}, }, PriceFeedChainSelector: feedChainSel, + // Have this disabled for testing purposes + TokenPriceAsyncObserverDisabled: true, } diff --git a/commit/tokenprice/processor.go b/commit/tokenprice/processor.go index cead6a220..c1ebea6bf 100644 --- a/commit/tokenprice/processor.go +++ b/commit/tokenprice/processor.go @@ -26,6 +26,7 @@ type processor struct { homeChain reader.HomeChain metricsReporter plugincommon.MetricsReporter fRoleDON int + obs observer } func NewProcessor( @@ -39,6 +40,24 @@ func NewProcessor( fRoleDON int, metricsReporter plugincommon.MetricsReporter, ) plugincommon.PluginProcessor[Query, Observation, Outcome] { + var obs observer + baseObs := newBaseObserver( + tokenPriceReader, + destChain, + oracleID, + chainSupport, + offChainCfg, + ) + if !offChainCfg.TokenPriceAsyncObserverDisabled { + obs = newAsyncObserver( + lggr, + baseObs, + offChainCfg.TokenPriceAsyncObserverSyncFreq.Duration(), + offChainCfg.TokenPriceAsyncObserverSyncTimeout.Duration(), + ) + } else { + obs = baseObs + } p := &processor{ oracleID: oracleID, lggr: lggr, @@ -49,6 +68,7 @@ func NewProcessor( homeChain: homeChain, fRoleDON: fRoleDON, metricsReporter: metricsReporter, + obs: obs, } return plugincommon.NewTrackedProcessor(lggr, p, processorsLabel, metricsReporter) } @@ -93,6 +113,7 @@ func (p *processor) Outcome( } func (p *processor) Close() error { + p.obs.close() return nil } diff --git a/pluginconfig/commit.go b/pluginconfig/commit.go index c85a15334..6ba5b6b71 100644 --- a/pluginconfig/commit.go +++ b/pluginconfig/commit.go @@ -146,6 +146,15 @@ type CommitOffchainConfig struct { // ChainFeeAsyncObserverSyncTimeout defines the timeout for a single sync operation (e.g. fetch token prices). ChainFeeAsyncObserverSyncTimeout time.Duration `json:"chainFeeAsyncObserverSyncTimeout"` + + // TokenPriceAsyncObserverDisabled defines whether the async observer should be disabled. Default it is enabled. + TokenPriceAsyncObserverDisabled bool `json:"tokenPriceAsyncObserverDisabled"` + + // TokenPriceAsyncObserverSyncFreq defines how frequently the async token price observer should sync. + TokenPriceAsyncObserverSyncFreq commonconfig.Duration `json:"tokenPriceAsyncObserverSyncFreq"` + + // TokenPriceAsyncObserverSyncTimeout defines the timeout for a single sync operation (e.g. fetch token prices). + TokenPriceAsyncObserverSyncTimeout commonconfig.Duration `json:"tokenPriceAsyncObserverSyncTimeout"` } //nolint:gocyclo // it is considered ok since we don't have complicated logic here @@ -200,6 +209,15 @@ func (c *CommitOffchainConfig) applyDefaults() { c.ChainFeeAsyncObserverSyncTimeout = defaultAsyncObserverSyncTimeout } } + + if !c.TokenPriceAsyncObserverDisabled { + if c.TokenPriceAsyncObserverSyncFreq.Duration() == 0 { + c.TokenPriceAsyncObserverSyncFreq = *commonconfig.MustNewDuration(defaultAsyncObserverSyncFreq) + } + if c.TokenPriceAsyncObserverSyncTimeout.Duration() == 0 { + c.TokenPriceAsyncObserverSyncTimeout = *commonconfig.MustNewDuration(defaultAsyncObserverSyncTimeout) + } + } } //nolint:gocyclo // it is considered ok since we don't have complicated logic here diff --git a/pluginconfig/commit_test.go b/pluginconfig/commit_test.go index 363ddabe0..5be278374 100644 --- a/pluginconfig/commit_test.go +++ b/pluginconfig/commit_test.go @@ -131,6 +131,9 @@ func TestCommitOffchainConfig_Validate(t *testing.T) { ChainFeeAsyncObserverDisabled bool ChainFeeAsyncObserverSyncFreq time.Duration ChainFeeAsyncObserverSyncTimeout time.Duration + TokenPriceAsyncObservedDisabled bool + TokenPriceAsyncObserverSyncFreq commonconfig.Duration + TokenPriceAsyncObserverSyncTimeout commonconfig.Duration } remoteTokenAddress := rand.RandomAddress() aggregatorAddress := rand.RandomAddress() @@ -160,6 +163,8 @@ func TestCommitOffchainConfig_Validate(t *testing.T) { MerkleRootAsyncObserverSyncFreq: defaultAsyncObserverSyncFreq, ChainFeeAsyncObserverSyncFreq: defaultAsyncObserverSyncFreq, ChainFeeAsyncObserverSyncTimeout: defaultAsyncObserverSyncTimeout, + TokenPriceAsyncObserverSyncFreq: *commonconfig.MustNewDuration(defaultAsyncObserverSyncFreq), + TokenPriceAsyncObserverSyncTimeout: *commonconfig.MustNewDuration(defaultAsyncObserverSyncTimeout), }, false, }, @@ -177,6 +182,8 @@ func TestCommitOffchainConfig_Validate(t *testing.T) { MerkleRootAsyncObserverSyncFreq: defaultAsyncObserverSyncFreq, ChainFeeAsyncObserverSyncFreq: defaultAsyncObserverSyncFreq, ChainFeeAsyncObserverSyncTimeout: defaultAsyncObserverSyncTimeout, + TokenPriceAsyncObserverSyncFreq: *commonconfig.MustNewDuration(defaultAsyncObserverSyncFreq), + TokenPriceAsyncObserverSyncTimeout: *commonconfig.MustNewDuration(defaultAsyncObserverSyncTimeout), }, false, }, @@ -389,6 +396,8 @@ func TestCommitOffchainConfig_ApplyDefaults(t *testing.T) { MerkleRootAsyncObserverSyncTimeout: defaultAsyncObserverSyncTimeout, ChainFeeAsyncObserverSyncFreq: defaultAsyncObserverSyncFreq, ChainFeeAsyncObserverSyncTimeout: defaultAsyncObserverSyncTimeout, + TokenPriceAsyncObserverSyncFreq: *commonconfig.MustNewDuration(defaultAsyncObserverSyncFreq), + TokenPriceAsyncObserverSyncTimeout: *commonconfig.MustNewDuration(defaultAsyncObserverSyncTimeout), }, }, { @@ -397,6 +406,7 @@ func TestCommitOffchainConfig_ApplyDefaults(t *testing.T) { RMNEnabled: true, MerkleRootAsyncObserverDisabled: true, ChainFeeAsyncObserverDisabled: true, + TokenPriceAsyncObserverDisabled: true, }, expected: CommitOffchainConfig{ RMNEnabled: true, @@ -410,6 +420,7 @@ func TestCommitOffchainConfig_ApplyDefaults(t *testing.T) { InflightPriceCheckRetries: defaultInflightPriceCheckRetries, MerkleRootAsyncObserverDisabled: true, ChainFeeAsyncObserverDisabled: true, + TokenPriceAsyncObserverDisabled: true, }, }, { @@ -423,6 +434,8 @@ func TestCommitOffchainConfig_ApplyDefaults(t *testing.T) { TransmissionDelayMultiplier: 20, InflightPriceCheckRetries: 5, ChainFeeAsyncObserverSyncFreq: 12 * time.Minute, + TokenPriceAsyncObserverSyncFreq: *commonconfig.MustNewDuration(10 * time.Minute), + TokenPriceAsyncObserverSyncTimeout: *commonconfig.MustNewDuration(10 * time.Second), }, expected: CommitOffchainConfig{ RMNEnabled: true, @@ -438,6 +451,8 @@ func TestCommitOffchainConfig_ApplyDefaults(t *testing.T) { MerkleRootAsyncObserverSyncFreq: defaultAsyncObserverSyncFreq, ChainFeeAsyncObserverSyncFreq: 12 * time.Minute, ChainFeeAsyncObserverSyncTimeout: defaultAsyncObserverSyncTimeout, + TokenPriceAsyncObserverSyncFreq: *commonconfig.MustNewDuration(10 * time.Minute), + TokenPriceAsyncObserverSyncTimeout: *commonconfig.MustNewDuration(10 * time.Second), }, }, { @@ -463,6 +478,8 @@ func TestCommitOffchainConfig_ApplyDefaults(t *testing.T) { MerkleRootAsyncObserverSyncTimeout: 10 * time.Minute, ChainFeeAsyncObserverSyncFreq: defaultAsyncObserverSyncFreq, ChainFeeAsyncObserverSyncTimeout: defaultAsyncObserverSyncTimeout, + TokenPriceAsyncObserverSyncFreq: *commonconfig.MustNewDuration(defaultAsyncObserverSyncFreq), + TokenPriceAsyncObserverSyncTimeout: *commonconfig.MustNewDuration(defaultAsyncObserverSyncTimeout), }, }, }