Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log NewReportingPlugin errors for Commit and Exec plugins #1187

Merged
merged 1 commit into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions core/services/ocr2/plugins/ccip/ccipcommit/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,11 @@ type reportingPluginAndInfo struct {
func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.ReportingPluginConfig) (types.ReportingPlugin, types.ReportingPluginInfo, error) {
initialRetryDelay := rf.config.newReportingPluginRetryConfig.InitialDelay
maxDelay := rf.config.newReportingPluginRetryConfig.MaxDelay
maxRetries := rf.config.newReportingPluginRetryConfig.MaxRetries

pluginAndInfo, err := ccipcommon.RetryUntilSuccess(rf.NewReportingPluginFn(config), initialRetryDelay, maxDelay)
pluginAndInfo, err := ccipcommon.RetryUntilSuccess(
rf.NewReportingPluginFn(config), initialRetryDelay, maxDelay, maxRetries,
)
if err != nil {
return nil, types.ReportingPluginInfo{}, err
}
Expand All @@ -86,35 +89,35 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin
// retried via RetryUntilSuccess. NewReportingPlugin must return successfully in order for the Commit plugin to
// function, hence why we can only keep retrying it until it succeeds.
func (rf *CommitReportingPluginFactory) NewReportingPluginFn(config types.ReportingPluginConfig) func() (reportingPluginAndInfo, error) {
return func() (reportingPluginAndInfo, error) {
newReportingPluginFn := func() (reportingPluginAndInfo, error) {
ctx := context.Background() // todo: consider adding some timeout

destPriceReg, err := rf.config.commitStore.ChangeConfig(ctx, config.OnchainConfig, config.OffchainConfig)
if err != nil {
return reportingPluginAndInfo{}, err
return reportingPluginAndInfo{}, fmt.Errorf("commitStore.ChangeConfig error: %w", err)
}

priceRegEvmAddr, err := ccipcalc.GenericAddrToEvm(destPriceReg)
if err != nil {
return reportingPluginAndInfo{}, err
return reportingPluginAndInfo{}, fmt.Errorf("GenericAddrToEvm error: %w", err)
}
if err = rf.UpdateDynamicReaders(ctx, priceRegEvmAddr); err != nil {
return reportingPluginAndInfo{}, err
return reportingPluginAndInfo{}, fmt.Errorf("UpdateDynamicReaders error: %w", err)
}

pluginOffChainConfig, err := rf.config.commitStore.OffchainConfig(ctx)
if err != nil {
return reportingPluginAndInfo{}, err
return reportingPluginAndInfo{}, fmt.Errorf("commitStore.OffchainConfig error: %w", err)
}

gasPriceEstimator, err := rf.config.commitStore.GasPriceEstimator(ctx)
if err != nil {
return reportingPluginAndInfo{}, err
return reportingPluginAndInfo{}, fmt.Errorf("commitStore.GasPriceEstimator error: %w", err)
}

err = rf.config.priceService.UpdateDynamicConfig(ctx, gasPriceEstimator, rf.destPriceRegReader)
if err != nil {
return reportingPluginAndInfo{}, err
return reportingPluginAndInfo{}, fmt.Errorf("priceService.UpdateDynamicConfig error: %w", err)
}

lggr := rf.config.lggr.Named("CommitReportingPlugin")
Expand Down Expand Up @@ -147,4 +150,14 @@ func (rf *CommitReportingPluginFactory) NewReportingPluginFn(config types.Report

return reportingPluginAndInfo{plugin, pluginInfo}, nil
}

return func() (reportingPluginAndInfo, error) {
result, err := newReportingPluginFn()
if err != nil {
rf.config.lggr.Errorw("NewReportingPlugin failed", "err", err)
Copy link
Contributor

@winder winder Jul 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider wrapping the errors returned by newReportingPluginFn with additional context similar to how the exec plugin's version of this function does it.

Also, should this change be made there as well?

Copy link
Contributor Author

@rstout rstout Jul 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, should this change be made there as well?

This change is in the exec plugin as well, or did you mean something else?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, missed that.

rf.config.metricsCollector.NewReportingPluginError()
}

return result, err
}
}
3 changes: 3 additions & 0 deletions core/services/ocr2/plugins/ccip/ccipcommit/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/types/ccip"
"github.com/smartcontractkit/chainlink/v2/core/logger"
ccip2 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
ccipdataprovidermocks "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/ccipdataprovider/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks"
Expand All @@ -24,6 +25,8 @@ import (
// retries a sufficient number of times to get through the transient errors and eventually succeed.
func TestNewReportingPluginRetriesUntilSuccess(t *testing.T) {
commitConfig := CommitPluginStaticConfig{}
commitConfig.lggr = logger.TestLogger(t)
commitConfig.metricsCollector = ccip2.NoopMetricsCollector

// For this unit test, ensure that there is no delay between retries
commitConfig.newReportingPluginRetryConfig = ccipdata.RetryConfig{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,13 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
)

var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{InitialDelay: time.Second, MaxDelay: 5 * time.Minute}
var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{
InitialDelay: time.Second,
MaxDelay: 10 * time.Minute,
// Retry for approximately 4hrs (MaxDelay of 10m = 6 times per hour, times 4 hours, plus 10 because the first
// 10 retries only take 20 minutes due to an initial retry of 1s and exponential backoff)
MaxRetries: (6 * 4) + 10,
}

func NewCommitServices(ctx context.Context, ds sqlutil.DataSource, srcProvider commontypes.CCIPCommitProvider, dstProvider commontypes.CCIPCommitProvider, chainSet legacyevm.LegacyChainContainer, jb job.Job, lggr logger.Logger, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, new bool, sourceChainID int64, destChainID int64, logError func(string)) ([]job.ServiceCtx, error) {
spec := jb.OCR2OracleSpec
Expand Down
17 changes: 15 additions & 2 deletions core/services/ocr2/plugins/ccip/ccipexec/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,11 @@ type reportingPluginAndInfo struct {
func (rf *ExecutionReportingPluginFactory) NewReportingPlugin(config types.ReportingPluginConfig) (types.ReportingPlugin, types.ReportingPluginInfo, error) {
initialRetryDelay := rf.config.newReportingPluginRetryConfig.InitialDelay
maxDelay := rf.config.newReportingPluginRetryConfig.MaxDelay
maxRetries := rf.config.newReportingPluginRetryConfig.MaxRetries

pluginAndInfo, err := ccipcommon.RetryUntilSuccess(rf.NewReportingPluginFn(config), initialRetryDelay, maxDelay)
pluginAndInfo, err := ccipcommon.RetryUntilSuccess(
rf.NewReportingPluginFn(config), initialRetryDelay, maxDelay, maxRetries,
)
if err != nil {
return nil, types.ReportingPluginInfo{}, err
}
Expand All @@ -83,7 +86,7 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPlugin(config types.Repor
// retried via RetryUntilSuccess. NewReportingPlugin must return successfully in order for the Exec plugin to function,
// hence why we can only keep retrying it until it succeeds.
func (rf *ExecutionReportingPluginFactory) NewReportingPluginFn(config types.ReportingPluginConfig) func() (reportingPluginAndInfo, error) {
return func() (reportingPluginAndInfo, error) {
newReportingPluginFn := func() (reportingPluginAndInfo, error) {
ctx := context.Background() // todo: consider setting a timeout

destPriceRegistry, destWrappedNative, err := rf.config.offRampReader.ChangeConfig(ctx, config.OnchainConfig, config.OffchainConfig)
Expand Down Expand Up @@ -161,4 +164,14 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPluginFn(config types.Rep

return reportingPluginAndInfo{plugin, pluginInfo}, nil
}

return func() (reportingPluginAndInfo, error) {
result, err := newReportingPluginFn()
if err != nil {
rf.config.lggr.Errorw("NewReportingPlugin failed", "err", err)
rf.config.metricsCollector.NewReportingPluginError()
}

return result, err
}
}
3 changes: 3 additions & 0 deletions core/services/ocr2/plugins/ccip/ccipexec/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/types/ccip"
"github.com/smartcontractkit/chainlink/v2/core/logger"
ccip2 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
ccipdataprovidermocks "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/ccipdataprovider/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks"
Expand All @@ -23,6 +24,8 @@ import (
// retries a sufficient number of times to get through the transient errors and eventually succeed.
func TestNewReportingPluginRetriesUntilSuccess(t *testing.T) {
execConfig := ExecutionPluginStaticConfig{}
execConfig.lggr = logger.TestLogger(t)
execConfig.metricsCollector = ccip2.NoopMetricsCollector

// For this unit test, ensure that there is no delay between retries
execConfig.newReportingPluginRetryConfig = ccipdata.RetryConfig{
Expand Down
8 changes: 7 additions & 1 deletion core/services/ocr2/plugins/ccip/ccipexec/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,13 @@ var (
tokenDataWorkerNumWorkers = 5
)

var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{InitialDelay: time.Second, MaxDelay: 5 * time.Minute}
var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{
InitialDelay: time.Second,
MaxDelay: 10 * time.Minute,
// Retry for approximately 4hrs (MaxDelay of 10m = 6 times per hour, times 4 hours, plus 10 because the first
// 10 retries only take 20 minutes due to an initial retry of 1s and exponential backoff)
MaxRetries: (6 * 4) + 10,
}

func NewExecServices(ctx context.Context, lggr logger.Logger, jb job.Job, srcProvider types.CCIPExecProvider, dstProvider types.CCIPExecProvider, srcChainID int64, dstChainID int64, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string)) ([]job.ServiceCtx, error) {
if jb.OCR2OracleSpec == nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,19 @@ func SelectorToBytes(chainSelector uint64) [16]byte {
return b
}

// RetryUntilSuccess repeatedly calls fn until it returns a nil error. After each failed call there is an exponential
// backoff applied, between initialDelay and maxDelay.
func RetryUntilSuccess[T any](fn func() (T, error), initialDelay time.Duration, maxDelay time.Duration) (T, error) {
// RetryUntilSuccess repeatedly calls fn until it returns a nil error or retries have been exhausted. After each failed
// call there is an exponential backoff applied, between initialDelay and maxDelay.
func RetryUntilSuccess[T any](
fn func() (T, error),
initialDelay time.Duration,
maxDelay time.Duration,
maxRetries uint,
) (T, error) {
return retry.DoWithData(
fn,
retry.Delay(initialDelay),
retry.MaxDelay(maxDelay),
retry.DelayType(retry.BackOffDelay),
retry.UntilSucceeded(),
retry.Attempts(maxRetries),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,20 @@ func TestRetryUntilSuccess(t *testing.T) {
}

// Assert that RetryUntilSuccess returns the expected value when fn returns success on the 5th attempt
numCalls, err := RetryUntilSuccess(fn, initialDelay, maxDelay)
numCalls, err := RetryUntilSuccess(fn, initialDelay, maxDelay, 10)
assert.Nil(t, err)
assert.Equal(t, 5, numCalls)

// Assert that RetryUntilSuccess returns the expected value when fn returns success on the 8th attempt
numAttempts = 8
numCalls = 0
numCalls, err = RetryUntilSuccess(fn, initialDelay, maxDelay)
numCalls, err = RetryUntilSuccess(fn, initialDelay, maxDelay, 10)
assert.Nil(t, err)
assert.Equal(t, 8, numCalls)

// Assert that RetryUntilSuccess exhausts retries
numAttempts = 8
numCalls = 0
numCalls, err = RetryUntilSuccess(fn, initialDelay, maxDelay, 2)
assert.NotNil(t, err)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package ccipdata

import "time"

// RetryConfig configures an initial delay between retries and a max delay between retries
// RetryConfig configures an initial delay between retries, a max delay between retries, and a maximum number of
// times to retry
type RetryConfig struct {
InitialDelay time.Duration
MaxDelay time.Duration
MaxRetries uint
}
14 changes: 14 additions & 0 deletions core/services/ocr2/plugins/ccip/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ var (
Name: "ccip_sequence_number_counter",
Help: "Sequence number of the last message processed by the plugin",
}, []string{"plugin", "source", "dest", "ocrPhase"})
newReportingPluginErrorCounter = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "ccip_new_reporting_plugin_error_counter",
Help: "The count of the number of errors when calling NewReportingPlugin",
}, []string{"plugin"})
)

type ocrPhase string
Expand All @@ -35,6 +39,7 @@ type PluginMetricsCollector interface {
NumberOfMessagesBasedOnInterval(phase ocrPhase, seqNrMin, seqNrMax uint64)
UnexpiredCommitRoots(count int)
SequenceNumber(phase ocrPhase, seqNr uint64)
NewReportingPluginError()
}

type pluginMetricsCollector struct {
Expand Down Expand Up @@ -79,6 +84,12 @@ func (p *pluginMetricsCollector) SequenceNumber(phase ocrPhase, seqNr uint64) {
Set(float64(seqNr))
}

func (p *pluginMetricsCollector) NewReportingPluginError() {
newReportingPluginErrorCounter.
WithLabelValues(p.pluginName).
Inc()
}

var (
// NoopMetricsCollector is a no-op implementation of PluginMetricsCollector
NoopMetricsCollector PluginMetricsCollector = noop{}
Expand All @@ -97,3 +108,6 @@ func (d noop) UnexpiredCommitRoots(int) {

func (d noop) SequenceNumber(ocrPhase, uint64) {
}

func (d noop) NewReportingPluginError() {
}
Loading