Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

commit/tokenprice: add async tokenprice observer #668

Merged
merged 6 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/ccip-integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ jobs:
name: "Messaging Test"
- cmd: cd integration-tests/smoke/ccip && go test ccip_batching_test.go -timeout 12m -test.parallel=2 -count=1 -json
name: "Batching Test"
- cmd: cd integration-tests/smoke/ccip && go test ccip_gas_price_updates_test.go -timeout 12m -test.parallel=2 -count=1 -json
name: "Gas Price Updates Test"
# TODO: this can only run in docker for now, switch to in-memory and uncomment
# - cmd: cd integration-tests/smoke/ccip && go test ccip_token_price_updates_test.go -timeout 12m -test.parallel=2 -count=1 -json
# name: "Token Price Updates Test"
- cmd: cd integration-tests/smoke/ccip && go test ccip_reader_test.go -timeout 5m -test.parallel=1 -count=1 -json
name: "CCIPReader Test"

Expand Down
6 changes: 3 additions & 3 deletions commit/chainfee/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ func (p *processor) Observation(
) (Observation, error) {
lggr := logutil.WithContextValues(ctx, p.lggr)

feeComponents := p.obs.getChainsFeeComponents(ctx)
nativeTokenPrices := p.obs.getNativeTokenPrices(ctx)
chainFeeUpdates := p.obs.getChainFeePriceUpdates(ctx)
feeComponents := p.obs.getChainsFeeComponents(ctx, lggr)
nativeTokenPrices := p.obs.getNativeTokenPrices(ctx, lggr)
chainFeeUpdates := p.obs.getChainFeePriceUpdates(ctx, lggr)
fChain := p.observeFChain(lggr)
now := time.Now().UTC()

Expand Down
69 changes: 43 additions & 26 deletions commit/chainfee/observation_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import (
"github.com/smartcontractkit/libocr/commontypes"

"github.com/smartcontractkit/chainlink-ccip/internal/plugincommon"
"github.com/smartcontractkit/chainlink-ccip/pkg/logutil"
ccipreader "github.com/smartcontractkit/chainlink-ccip/pkg/reader"
cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3"
)

type observer interface {
getChainsFeeComponents(ctx context.Context) map[cciptypes.ChainSelector]types.ChainFeeComponents
getNativeTokenPrices(ctx context.Context) map[cciptypes.ChainSelector]cciptypes.BigInt
getChainFeePriceUpdates(ctx context.Context) map[cciptypes.ChainSelector]Update
getChainsFeeComponents(ctx context.Context, lggr logger.Logger) map[cciptypes.ChainSelector]types.ChainFeeComponents
getNativeTokenPrices(ctx context.Context, lggr logger.Logger) map[cciptypes.ChainSelector]cciptypes.BigInt
getChainFeePriceUpdates(ctx context.Context, lggr logger.Logger) map[cciptypes.ChainSelector]Update
close()
}

Expand All @@ -27,11 +28,9 @@ type baseObserver struct {
oracleID commontypes.OracleID
destChain cciptypes.ChainSelector
ccipReader ccipreader.CCIPReader
lggr logger.Logger
}

func newBaseObserver(
lggr logger.Logger,
ccipReader ccipreader.CCIPReader,
destChain cciptypes.ChainSelector,
oracleID commontypes.OracleID,
Expand All @@ -42,34 +41,40 @@ func newBaseObserver(
oracleID: oracleID,
destChain: destChain,
ccipReader: ccipReader,
lggr: lggr,
}
}

func (o *baseObserver) getChainsFeeComponents(
ctx context.Context,
lggr logger.Logger,
) map[cciptypes.ChainSelector]types.ChainFeeComponents {
supportedChains, err := o.getSupportedChains(o.lggr, o.cs, o.oracleID, o.destChain)
supportedChains, err := o.getSupportedChains(lggr, o.cs, o.oracleID, o.destChain)
if err != nil {
o.lggr.Errorw("failed to get supported chains unable to get chains fee components", "err", err)
lggr.Errorw("failed to get supported chains unable to get chains fee components", "err", err)
return map[cciptypes.ChainSelector]types.ChainFeeComponents{}
}
return o.ccipReader.GetChainsFeeComponents(ctx, supportedChains)
}

func (o *baseObserver) getNativeTokenPrices(ctx context.Context) map[cciptypes.ChainSelector]cciptypes.BigInt {
supportedChains, err := o.getSupportedChains(o.lggr, o.cs, o.oracleID, o.destChain)
func (o *baseObserver) getNativeTokenPrices(
ctx context.Context,
lggr logger.Logger,
) map[cciptypes.ChainSelector]cciptypes.BigInt {
supportedChains, err := o.getSupportedChains(lggr, o.cs, o.oracleID, o.destChain)
if err != nil {
o.lggr.Errorw("failed to get supported chains unable to get native token prices", "err", err)
lggr.Errorw("failed to get supported chains unable to get native token prices", "err", err)
return map[cciptypes.ChainSelector]cciptypes.BigInt{}
}
return o.ccipReader.GetWrappedNativeTokenPriceUSD(ctx, supportedChains)
}

func (o *baseObserver) getChainFeePriceUpdates(ctx context.Context) map[cciptypes.ChainSelector]Update {
supportedChains, err := o.getSupportedChains(o.lggr, o.cs, o.oracleID, o.destChain)
func (o *baseObserver) getChainFeePriceUpdates(
ctx context.Context,
lggr logger.Logger,
) map[cciptypes.ChainSelector]Update {
supportedChains, err := o.getSupportedChains(lggr, o.cs, o.oracleID, o.destChain)
if err != nil {
o.lggr.Errorw("failed to get supported chains unable to get chain fee price updates", "err", err)
lggr.Errorw("failed to get supported chains unable to get chain fee price updates", "err", err)
return map[cciptypes.ChainSelector]Update{}
}
return feeUpdatesFromTimestampedBig(o.ccipReader.GetChainFeePriceUpdate(ctx, supportedChains))
Expand Down Expand Up @@ -120,13 +125,13 @@ func newAsyncObserver(

obs := &asyncObserver{
baseObserver: baseObserver,
lggr: lggr,
lggr: logutil.WithComponent(lggr, "chainfeeAsyncObserver"),
cancelFunc: nil,
mu: &sync.RWMutex{},
}

ticker := time.NewTicker(tickDur)
lggr.Debugw("async observer started", "tickDur", tickDur, "syncTimeout", syncTimeout)
lggr.Debugw("async chainfee observer started", "tickDur", tickDur, "syncTimeout", syncTimeout)
obs.start(ctx, ticker.C, syncTimeout)

obs.cancelFunc = func() {
Expand All @@ -151,7 +156,7 @@ func (o *asyncObserver) start(ctx context.Context, ticker <-chan time.Time, sync
}

func (o *asyncObserver) sync(ctx context.Context, syncTimeout time.Duration) {
o.lggr.Debugw("async observer is syncing", "syncTimeout", syncTimeout)
o.lggr.Debugw("async chainfee observer is syncing", "syncTimeout", syncTimeout)
ctxSync, cf := context.WithTimeout(ctx, syncTimeout)
defer cf()

Expand All @@ -162,7 +167,7 @@ func (o *asyncObserver) sync(ctx context.Context, syncTimeout time.Duration) {
{
id: "chainsFeeComponents",
op: func(ctx context.Context) {
chainsFeeComponents := o.baseObserver.getChainsFeeComponents(ctx)
chainsFeeComponents := o.baseObserver.getChainsFeeComponents(ctx, o.lggr)
o.mu.Lock()
o.chainsFeeComponents = chainsFeeComponents
o.mu.Unlock()
Expand All @@ -171,7 +176,7 @@ func (o *asyncObserver) sync(ctx context.Context, syncTimeout time.Duration) {
{
id: "nativeTokenPrices",
op: func(ctx context.Context) {
nativeTokenPrices := o.baseObserver.getNativeTokenPrices(ctx)
nativeTokenPrices := o.baseObserver.getNativeTokenPrices(ctx, o.lggr)
o.mu.Lock()
o.nativeTokenPrices = nativeTokenPrices
o.mu.Unlock()
Expand All @@ -180,7 +185,7 @@ func (o *asyncObserver) sync(ctx context.Context, syncTimeout time.Duration) {
{
id: "chainFeePriceUpdates",
op: func(ctx context.Context) {
chainFeePriceUpdates := o.baseObserver.getChainFeePriceUpdates(ctx)
chainFeePriceUpdates := o.baseObserver.getChainFeePriceUpdates(ctx, o.lggr)
o.mu.Lock()
o.chainFeePriceUpdates = chainFeePriceUpdates
o.mu.Unlock()
Expand All @@ -191,37 +196,49 @@ func (o *asyncObserver) sync(ctx context.Context, syncTimeout time.Duration) {
wg := &sync.WaitGroup{}
wg.Add(len(syncOps))
for _, op := range syncOps {
go o.applySyncOp(ctxSync, o.lggr, op.id, wg, op.op)
go o.applySyncOp(ctxSync, op.id, wg, op.op)
}
wg.Wait()
}

// applySyncOp applies the given operation synchronously.
func (o *asyncObserver) applySyncOp(
ctx context.Context, lggr logger.Logger, id string, wg *sync.WaitGroup, op func(ctx context.Context)) {
ctx context.Context, id string, wg *sync.WaitGroup, op func(ctx context.Context)) {
defer wg.Done()
tStart := time.Now()
o.lggr.Debugw("async observer applying sync operation", "id", id)
op(ctx)
lggr.Debugw("async observer has applied the sync operation",
o.lggr.Debugw("async observer has applied the sync operation",
"id", id, "duration", time.Since(tStart))
}

func (o *asyncObserver) getChainsFeeComponents(_ context.Context) map[cciptypes.ChainSelector]types.ChainFeeComponents {
func (o *asyncObserver) getChainsFeeComponents(
_ context.Context,
lggr logger.Logger,
) map[cciptypes.ChainSelector]types.ChainFeeComponents {
o.mu.RLock()
defer o.mu.RUnlock()
lggr.Debugw("getChainsFeeComponents returning cached value", "numFeeComponents", len(o.chainsFeeComponents))
return o.chainsFeeComponents
}

func (o *asyncObserver) getNativeTokenPrices(_ context.Context) map[cciptypes.ChainSelector]cciptypes.BigInt {
func (o *asyncObserver) getNativeTokenPrices(
_ context.Context,
lggr logger.Logger,
) map[cciptypes.ChainSelector]cciptypes.BigInt {
o.mu.RLock()
defer o.mu.RUnlock()
lggr.Debugw("getNativeTokenPrices returning cached value", "numPrices", len(o.nativeTokenPrices))
return o.nativeTokenPrices
}

func (o *asyncObserver) getChainFeePriceUpdates(_ context.Context) map[cciptypes.ChainSelector]Update {
func (o *asyncObserver) getChainFeePriceUpdates(
_ context.Context,
lggr logger.Logger,
) map[cciptypes.ChainSelector]Update {
o.mu.RLock()
defer o.mu.RUnlock()
lggr.Debugw("getChainFeePriceUpdates returning cached value", "numUpdates", len(o.chainFeePriceUpdates))
return o.chainFeePriceUpdates
}

Expand Down
4 changes: 2 additions & 2 deletions commit/chainfee/observation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func Test_processor_Observation(t *testing.T) {
oracleID: oracleID,
homeChain: homeChain,
metricsReporter: plugincommon2.NoopReporter{},
obs: newBaseObserver(lggr, ccipReader, tc.dstChain, oracleID, cs),
obs: newBaseObserver(ccipReader, tc.dstChain, oracleID, cs),
}

supportedSet := mapset.NewSet(tc.supportedChains...)
Expand Down Expand Up @@ -323,7 +323,7 @@ func Test_unique_chain_filter_in_Observation(t *testing.T) {
oracleID: oracleID,
homeChain: homeChain,
metricsReporter: plugincommon2.NoopReporter{},
obs: newBaseObserver(lggr, ccipReader, tc.dstChain, oracleID, cs),
obs: newBaseObserver(ccipReader, tc.dstChain, oracleID, cs),
}

supportedSet := mapset.NewSet(tc.supportedChains...)
Expand Down
1 change: 0 additions & 1 deletion commit/chainfee/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ func NewProcessor(
) plugincommon.PluginProcessor[Query, Observation, Outcome] {
var obs observer
baseObs := newBaseObserver(
lggr,
ccipReader,
destChain,
oracleID,
Expand Down
1 change: 1 addition & 0 deletions commit/plugin_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,7 @@ func defaultNodeParams(t *testing.T) SetupNodeParams {
InflightPriceCheckRetries: 10,
MerkleRootAsyncObserverDisabled: true, // we want to keep it disabled since this test is deterministic
ChainFeeAsyncObserverDisabled: true,
TokenPriceAsyncObserverDisabled: true,
}

reportingCfg := ocr3types.ReportingPluginConfig{F: 1, ConfigDigest: digest}
Expand Down
Loading
Loading