From b7b99cfeee5d513dd1d0fb0820f210ed5b5d916c Mon Sep 17 00:00:00 2001 From: Gustavo Gama Date: Fri, 20 Dec 2024 01:33:00 -0300 Subject: [PATCH 1/4] feat(job-distributor): add exp. backoff retry to feeds.SyncNodeInfo() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit There’s a behavior that we’ve observed for some time on the NOP side where they will add/update a chain configuration of the Job Distributor panel but the change is not reflected on the service itself. This leads to inefficiencies as NOPs are unaware of this and thus need to be notified so that they may "reapply" the configuration. After some investigation, we suspect that this is due to connectivity issues between the nodes and the job distributor instance, which causes the message with the update to be lost. This PR attempts to solve this by adding a "retry" wrapper on top of the existing `SyncNodeInfo` method. We rely on `avast/retry-go` to implement the bulk of the retry logic. It's configured with a minimal delay of 10 seconds, maximum delay of 30 minutes and retry a total of 56 times -- which adds up to a bit more than 24 hours. Ticket Number: DPA-1371 --- .changeset/neat-penguins-report.md | 5 +++ core/services/feeds/service.go | 62 +++++++++++++++++++++++------- 2 files changed, 53 insertions(+), 14 deletions(-) create mode 100644 .changeset/neat-penguins-report.md diff --git a/.changeset/neat-penguins-report.md b/.changeset/neat-penguins-report.md new file mode 100644 index 00000000000..053faa00178 --- /dev/null +++ b/.changeset/neat-penguins-report.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#added add exponential backoff retry to feeds.SyncNodeInfo() diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index 99bbb2e0cbb..6e658620fb2 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -5,7 +5,9 @@ import ( "database/sql" "encoding/hex" "fmt" + "time" + "github.com/avast/retry-go/v4" "github.com/ethereum/go-ethereum/common" "github.com/google/uuid" "github.com/lib/pq" @@ -142,6 +144,7 @@ type service struct { lggr logger.Logger version string loopRegistrarConfig plugins.RegistrarConfig + syncNodeInfoCancel context.CancelFunc } // NewService constructs a new feeds service @@ -184,6 +187,7 @@ func NewService( lggr: lggr, version: version, loopRegistrarConfig: rc, + syncNodeInfoCancel: func() {}, } return svc @@ -255,8 +259,45 @@ func (s *service) RegisterManager(ctx context.Context, params RegisterManagerPar return id, nil } -// SyncNodeInfo syncs the node's information with FMS +// syncNodeInfoWithRetry syncs the node's information with FMS using a goroutine. +// In case of failures, it retries with an exponential backoff for up to 24h. +func (s *service) syncNodeInfoWithRetry(id int64) { + // cancel the previous context -- and, by extension, the existing goroutine -- + // so that we can start anew + s.syncNodeInfoCancel() + + var ctx context.Context + ctx, s.syncNodeInfoCancel = context.WithCancel(context.Background()) + + retryOpts := []retry.Option{ + retry.Context(ctx), + retry.Delay(5 * time.Second), + retry.Delay(10 * time.Second), + retry.MaxDelay(30 * time.Minute), + retry.Attempts(48 + 8), // 30m * 48 =~ 24h; plus the initial 8 shorter retries + retry.LastErrorOnly(true), + retry.OnRetry(func(attempt uint, err error) { + s.lggr.Info("failed to sync node info", "attempt", attempt, "err", err) + }), + } + + go func() { + err := retry.Do(func() error { return s.SyncNodeInfo(ctx, id) }, retryOpts...) + if err != nil { + s.lggr.Errorw("failed to sync node info; aborting", "err", err) + } else { + s.lggr.Info("successfully synced node info") + } + + s.syncNodeInfoCancel() + s.syncNodeInfoCancel = func() {} + }() +} + func (s *service) SyncNodeInfo(ctx context.Context, id int64) error { + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + // Get the FMS RPC client fmsClient, err := s.connMgr.GetClient(id) if err != nil { @@ -402,9 +443,7 @@ func (s *service) CreateChainConfig(ctx context.Context, cfg ChainConfig) (int64 return 0, errors.Wrap(err, "CreateChainConfig: failed to fetch manager") } - if err := s.SyncNodeInfo(ctx, mgr.ID); err != nil { - s.lggr.Infof("FMS: Unable to sync node info: %v", err) - } + s.syncNodeInfoWithRetry(mgr.ID) return id, nil } @@ -426,9 +465,7 @@ func (s *service) DeleteChainConfig(ctx context.Context, id int64) (int64, error return 0, errors.Wrap(err, "DeleteChainConfig: failed to fetch manager") } - if err := s.SyncNodeInfo(ctx, mgr.ID); err != nil { - s.lggr.Infof("FMS: Unable to sync node info: %v", err) - } + s.syncNodeInfoWithRetry(mgr.ID) return id, nil } @@ -467,9 +504,7 @@ func (s *service) UpdateChainConfig(ctx context.Context, cfg ChainConfig) (int64 return 0, errors.Wrap(err, "UpdateChainConfig failed: could not get chain config") } - if err := s.SyncNodeInfo(ctx, ccfg.FeedsManagerID); err != nil { - s.lggr.Infof("FMS: Unable to sync node info: %v", err) - } + s.syncNodeInfoWithRetry(ccfg.FeedsManagerID) return id, nil } @@ -1156,6 +1191,8 @@ func (s *service) Close() error { // This blocks until it finishes s.connMgr.Close() + s.syncNodeInfoCancel() + return nil }) } @@ -1173,10 +1210,7 @@ func (s *service) connectFeedManager(ctx context.Context, mgr FeedsManager, priv }, OnConnect: func(pb.FeedsManagerClient) { // Sync the node's information with FMS once connected - err := s.SyncNodeInfo(ctx, mgr.ID) - if err != nil { - s.lggr.Infof("Error syncing node info: %v", err) - } + s.syncNodeInfoWithRetry(mgr.ID) }, }) } From 7d631a757971d8d100abc4b806b49dce9cf5b2ea Mon Sep 17 00:00:00 2001 From: Gustavo Gama Date: Thu, 26 Dec 2024 23:32:01 -0300 Subject: [PATCH 2/4] review: protect cancel func access with a mutex to avoid race conditions --- core/services/feeds/service.go | 62 +++++++++++++++++++++++++--------- 1 file changed, 46 insertions(+), 16 deletions(-) diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index 6e658620fb2..e5f53c41d9f 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -5,6 +5,7 @@ import ( "database/sql" "encoding/hex" "fmt" + "sync" "time" "github.com/avast/retry-go/v4" @@ -144,7 +145,7 @@ type service struct { lggr logger.Logger version string loopRegistrarConfig plugins.RegistrarConfig - syncNodeInfoCancel context.CancelFunc + syncNodeInfoCancel AtomicCancelFunc } // NewService constructs a new feeds service @@ -187,7 +188,7 @@ func NewService( lggr: lggr, version: version, loopRegistrarConfig: rc, - syncNodeInfoCancel: func() {}, + syncNodeInfoCancel: AtomicCancelFunc{fn: func() {}}, } return svc @@ -262,20 +263,18 @@ func (s *service) RegisterManager(ctx context.Context, params RegisterManagerPar // syncNodeInfoWithRetry syncs the node's information with FMS using a goroutine. // In case of failures, it retries with an exponential backoff for up to 24h. func (s *service) syncNodeInfoWithRetry(id int64) { + ctx, cancel := context.WithCancel(context.Background()) + // cancel the previous context -- and, by extension, the existing goroutine -- // so that we can start anew - s.syncNodeInfoCancel() - - var ctx context.Context - ctx, s.syncNodeInfoCancel = context.WithCancel(context.Background()) + s.syncNodeInfoCancel.CallAndSwap(cancel) retryOpts := []retry.Option{ retry.Context(ctx), - retry.Delay(5 * time.Second), + retry.DelayType(retry.BackOffDelay), retry.Delay(10 * time.Second), retry.MaxDelay(30 * time.Minute), retry.Attempts(48 + 8), // 30m * 48 =~ 24h; plus the initial 8 shorter retries - retry.LastErrorOnly(true), retry.OnRetry(func(attempt uint, err error) { s.lggr.Info("failed to sync node info", "attempt", attempt, "err", err) }), @@ -289,8 +288,7 @@ func (s *service) syncNodeInfoWithRetry(id int64) { s.lggr.Info("successfully synced node info") } - s.syncNodeInfoCancel() - s.syncNodeInfoCancel = func() {} + s.syncNodeInfoCancel.CallAndSwap(func(){}) }() } @@ -1066,9 +1064,7 @@ func (s *service) CancelSpec(ctx context.Context, id int64) error { ) err = s.transact(ctx, func(tx datasources) error { - var ( - txerr error - ) + var txerr error if txerr = tx.orm.CancelSpec(ctx, id); txerr != nil { return txerr @@ -1191,7 +1187,7 @@ func (s *service) Close() error { // This blocks until it finishes s.connMgr.Close() - s.syncNodeInfoCancel() + s.syncNodeInfoCancel.CallAndSwap(func(){}) return nil }) @@ -1254,8 +1250,10 @@ func (s *service) observeJobProposalCounts(ctx context.Context) error { metrics := counts.toMetrics() // Set the prometheus gauge metrics. - for _, status := range []JobProposalStatus{JobProposalStatusPending, JobProposalStatusApproved, - JobProposalStatusCancelled, JobProposalStatusRejected, JobProposalStatusDeleted, JobProposalStatusRevoked} { + for _, status := range []JobProposalStatus{ + JobProposalStatusPending, JobProposalStatusApproved, + JobProposalStatusCancelled, JobProposalStatusRejected, JobProposalStatusDeleted, JobProposalStatusRevoked, + } { status := status promJobProposalCounts.With(prometheus.Labels{"status": string(status)}).Set(metrics[status]) @@ -1599,6 +1597,18 @@ func (s *service) isRevokable(propStatus JobProposalStatus, specStatus SpecStatu return propStatus != JobProposalStatusDeleted && (specStatus == SpecStatusPending || specStatus == SpecStatusCancelled) } +type AtomicCancelFunc struct { + fn context.CancelFunc + mutex sync.Mutex +} + +func (f *AtomicCancelFunc) CallAndSwap(other func()) { + f.mutex.Lock() + defer f.mutex.Unlock() + f.fn() + f.fn = other +} + var _ Service = &NullService{} // NullService defines an implementation of the Feeds Service that is used @@ -1611,24 +1621,31 @@ func (ns NullService) Close() error { return nil } func (ns NullService) ApproveSpec(ctx context.Context, id int64, force bool) error { return ErrFeedsManagerDisabled } + func (ns NullService) CountJobProposalsByStatus(ctx context.Context) (*JobProposalCounts, error) { return nil, ErrFeedsManagerDisabled } + func (ns NullService) CancelSpec(ctx context.Context, id int64) error { return ErrFeedsManagerDisabled } + func (ns NullService) GetJobProposal(ctx context.Context, id int64) (*JobProposal, error) { return nil, ErrFeedsManagerDisabled } + func (ns NullService) ListSpecsByJobProposalIDs(ctx context.Context, ids []int64) ([]JobProposalSpec, error) { return nil, ErrFeedsManagerDisabled } + func (ns NullService) GetManager(ctx context.Context, id int64) (*FeedsManager, error) { return nil, ErrFeedsManagerDisabled } + func (ns NullService) ListManagersByIDs(ctx context.Context, ids []int64) ([]FeedsManager, error) { return nil, ErrFeedsManagerDisabled } + func (ns NullService) GetSpec(ctx context.Context, id int64) (*JobProposalSpec, error) { return nil, ErrFeedsManagerDisabled } @@ -1636,15 +1653,19 @@ func (ns NullService) ListManagers(ctx context.Context) ([]FeedsManager, error) func (ns NullService) CreateChainConfig(ctx context.Context, cfg ChainConfig) (int64, error) { return 0, ErrFeedsManagerDisabled } + func (ns NullService) GetChainConfig(ctx context.Context, id int64) (*ChainConfig, error) { return nil, ErrFeedsManagerDisabled } + func (ns NullService) DeleteChainConfig(ctx context.Context, id int64) (int64, error) { return 0, ErrFeedsManagerDisabled } + func (ns NullService) ListChainConfigsByManagerIDs(ctx context.Context, mgrIDs []int64) ([]ChainConfig, error) { return nil, ErrFeedsManagerDisabled } + func (ns NullService) UpdateChainConfig(ctx context.Context, cfg ChainConfig) (int64, error) { return 0, ErrFeedsManagerDisabled } @@ -1652,18 +1673,23 @@ func (ns NullService) ListJobProposals(ctx context.Context) ([]JobProposal, erro func (ns NullService) ListJobProposalsByManagersIDs(ctx context.Context, ids []int64) ([]JobProposal, error) { return nil, ErrFeedsManagerDisabled } + func (ns NullService) ProposeJob(ctx context.Context, args *ProposeJobArgs) (int64, error) { return 0, ErrFeedsManagerDisabled } + func (ns NullService) DeleteJob(ctx context.Context, args *DeleteJobArgs) (int64, error) { return 0, ErrFeedsManagerDisabled } + func (ns NullService) RevokeJob(ctx context.Context, args *RevokeJobArgs) (int64, error) { return 0, ErrFeedsManagerDisabled } + func (ns NullService) RegisterManager(ctx context.Context, params RegisterManagerParams) (int64, error) { return 0, ErrFeedsManagerDisabled } + func (ns NullService) RejectSpec(ctx context.Context, id int64) error { return ErrFeedsManagerDisabled } @@ -1671,15 +1697,19 @@ func (ns NullService) SyncNodeInfo(ctx context.Context, id int64) error { return func (ns NullService) UpdateManager(ctx context.Context, mgr FeedsManager) error { return ErrFeedsManagerDisabled } + func (ns NullService) EnableManager(ctx context.Context, id int64) (*FeedsManager, error) { return nil, ErrFeedsManagerDisabled } + func (ns NullService) DisableManager(ctx context.Context, id int64) (*FeedsManager, error) { return nil, ErrFeedsManagerDisabled } + func (ns NullService) IsJobManaged(ctx context.Context, jobID int64) (bool, error) { return false, nil } + func (ns NullService) UpdateSpecDefinition(ctx context.Context, id int64, spec string) error { return ErrFeedsManagerDisabled } From d7e6aa9dbc162b33b41f5b491c98e7f2095b6944 Mon Sep 17 00:00:00 2001 From: Gustavo Gama Date: Sat, 11 Jan 2025 03:47:33 -0300 Subject: [PATCH 3/4] review: trigger retry on partial failures and support multiple job distributors --- core/scripts/go.mod | 2 +- core/scripts/go.sum | 4 +- core/services/feeds/service.go | 96 ++++++++++--- core/services/feeds/service_test.go | 201 +++++++++++++++++++++++++++- deployment/go.mod | 2 +- deployment/go.sum | 4 +- go.mod | 2 +- go.sum | 4 +- integration-tests/go.mod | 2 +- integration-tests/go.sum | 4 +- integration-tests/load/go.mod | 2 +- integration-tests/load/go.sum | 4 +- system-tests/lib/go.mod | 2 +- system-tests/lib/go.sum | 4 +- system-tests/tests/go.mod | 2 +- system-tests/tests/go.sum | 4 +- 16 files changed, 294 insertions(+), 45 deletions(-) diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 7fb73eed291..78972619211 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -346,7 +346,7 @@ require ( github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250207205350-420ccacab78a // indirect github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20250211162441-3d6cea220efb // indirect github.com/smartcontractkit/chainlink-protos/job-distributor v0.9.0 // indirect - github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0 // indirect + github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0 // indirect github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 // indirect github.com/smartcontractkit/chainlink-protos/svr v0.0.0-20250123084029-58cce9b32112 // indirect github.com/smartcontractkit/chainlink-solana v1.1.2-0.20250213203720-e15b1333a14a // indirect diff --git a/core/scripts/go.sum b/core/scripts/go.sum index fbe0b40ff8c..26e32970337 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1104,8 +1104,8 @@ github.com/smartcontractkit/chainlink-integrations/evm v0.0.0-20250213145514-41d github.com/smartcontractkit/chainlink-integrations/evm v0.0.0-20250213145514-41d874782c02/go.mod h1:7DbPnG0E39eZaX1CXKxRiJ1NOWHwTZYDWR9ys3kZZuU= github.com/smartcontractkit/chainlink-protos/job-distributor v0.9.0 h1:hfMRj2ny6oNHd8w1rhJHdoX3YkoWJtCkBK6wTlCE4+c= github.com/smartcontractkit/chainlink-protos/job-distributor v0.9.0/go.mod h1:/dVVLXrsp+V0AbcYGJo3XMzKg3CkELsweA/TTopCsKE= -github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0 h1:ZBat8EBvE2LpSQR9U1gEbRV6PfAkiFdINmQ8nVnXIAQ= -github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0/go.mod h1:m/A3lqD7ms/RsQ9BT5P2uceYY0QX5mIt4KQxT2G6qEo= +github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0 h1:xRgu/kMkxcY4LeDKMBhaXU4khgya7v2wyb4Sa5Nzb+Y= +github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0/go.mod h1:m/A3lqD7ms/RsQ9BT5P2uceYY0QX5mIt4KQxT2G6qEo= github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 h1:L6KJ4kGv/yNNoCk8affk7Y1vAY0qglPMXC/hevV/IsA= github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6/go.mod h1:FRwzI3hGj4CJclNS733gfcffmqQ62ONCkbGi49s658w= github.com/smartcontractkit/chainlink-protos/svr v0.0.0-20250123084029-58cce9b32112 h1:c77Gi/APraqwbBO8fbd/5JY2wW+MSIpYg8Uma9MEZFE= diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index e5f53c41d9f..c30164009dc 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -5,6 +5,7 @@ import ( "database/sql" "encoding/hex" "fmt" + "strings" "sync" "time" @@ -80,6 +81,10 @@ var ( // Job Proposal status "status", }) + + defaultSyncMinDelay = 10 * time.Second + defaultSyncMaxDelay = 30 * time.Minute + defaultSyncMaxAttempts = uint(48 + 8) // 30m * 48 =~ 24h; plus the initial 8 shorter retries ) // Service represents a behavior of the feeds service @@ -145,7 +150,10 @@ type service struct { lggr logger.Logger version string loopRegistrarConfig plugins.RegistrarConfig - syncNodeInfoCancel AtomicCancelFunc + syncNodeInfoCancel atomicCancelFns + syncMinDelay time.Duration + syncMaxDelay time.Duration + syncMaxAttempts uint } // NewService constructs a new feeds service @@ -165,6 +173,7 @@ func NewService( lggr logger.Logger, version string, rc plugins.RegistrarConfig, + opts ...ServiceOption, ) *service { lggr = lggr.Named("Feeds") svc := &service{ @@ -188,7 +197,14 @@ func NewService( lggr: lggr, version: version, loopRegistrarConfig: rc, - syncNodeInfoCancel: AtomicCancelFunc{fn: func() {}}, + syncNodeInfoCancel: atomicCancelFns{fns: map[int64]context.CancelFunc{}}, + syncMinDelay: defaultSyncMinDelay, + syncMaxDelay: defaultSyncMaxDelay, + syncMaxAttempts: defaultSyncMaxAttempts, + } + + for _, opt := range opts { + opt(svc) } return svc @@ -260,23 +276,24 @@ func (s *service) RegisterManager(ctx context.Context, params RegisterManagerPar return id, nil } -// syncNodeInfoWithRetry syncs the node's information with FMS using a goroutine. -// In case of failures, it retries with an exponential backoff for up to 24h. +// syncNodeInfoWithRetry syncs the node's information with FMS. In case of failures, +// it retries with an exponential backoff for up to 24h. func (s *service) syncNodeInfoWithRetry(id int64) { ctx, cancel := context.WithCancel(context.Background()) // cancel the previous context -- and, by extension, the existing goroutine -- // so that we can start anew - s.syncNodeInfoCancel.CallAndSwap(cancel) + s.syncNodeInfoCancel.callAndSwap(id, cancel) retryOpts := []retry.Option{ retry.Context(ctx), retry.DelayType(retry.BackOffDelay), - retry.Delay(10 * time.Second), - retry.MaxDelay(30 * time.Minute), - retry.Attempts(48 + 8), // 30m * 48 =~ 24h; plus the initial 8 shorter retries + retry.Delay(s.syncMinDelay), + retry.MaxDelay(s.syncMaxDelay), + retry.Attempts(s.syncMaxAttempts), + retry.LastErrorOnly(true), retry.OnRetry(func(attempt uint, err error) { - s.lggr.Info("failed to sync node info", "attempt", attempt, "err", err) + s.lggr.Infow("failed to sync node info", "attempt", attempt, "err", err.Error()) }), } @@ -288,7 +305,7 @@ func (s *service) syncNodeInfoWithRetry(id int64) { s.lggr.Info("successfully synced node info") } - s.syncNodeInfoCancel.CallAndSwap(func(){}) + s.syncNodeInfoCancel.callAndSwap(id, nil) }() } @@ -320,12 +337,22 @@ func (s *service) SyncNodeInfo(ctx context.Context, id int64) error { } workflowKey := s.getWorkflowPublicKey() - if _, err = fmsClient.UpdateNode(ctx, &pb.UpdateNodeRequest{ + + resp, err := fmsClient.UpdateNode(ctx, &pb.UpdateNodeRequest{ Version: s.version, ChainConfigs: cfgMsgs, WorkflowKey: &workflowKey, - }); err != nil { - return err + }) + if err != nil { + return errors.Wrap(err, "SyncNodeInfo.UpdateNode call failed") + } + if len(resp.ChainConfigErrors) > 0 { + errMsgs := make([]string, 0, len(resp.ChainConfigErrors)) + for _, ccErr := range resp.ChainConfigErrors { + errMsgs = append(errMsgs, ccErr.Message) + } + + return errors.Errorf("SyncNodeInfo.UpdateNode call partially failed: %s", strings.Join(errMsgs, "; ")) } return nil @@ -1187,7 +1214,7 @@ func (s *service) Close() error { // This blocks until it finishes s.connMgr.Close() - s.syncNodeInfoCancel.CallAndSwap(func(){}) + s.syncNodeInfoCancel.callAllAndClear() return nil }) @@ -1597,16 +1624,47 @@ func (s *service) isRevokable(propStatus JobProposalStatus, specStatus SpecStatu return propStatus != JobProposalStatusDeleted && (specStatus == SpecStatusPending || specStatus == SpecStatusCancelled) } -type AtomicCancelFunc struct { - fn context.CancelFunc +type atomicCancelFns struct { + fns map[int64]context.CancelFunc mutex sync.Mutex } -func (f *AtomicCancelFunc) CallAndSwap(other func()) { +func (f *atomicCancelFns) callAndSwap(id int64, other func()) { f.mutex.Lock() defer f.mutex.Unlock() - f.fn() - f.fn = other + + fn, found := f.fns[id] + if found && fn != nil { + fn() + } + + f.fns[id] = other +} + +func (f *atomicCancelFns) callAllAndClear() { + f.mutex.Lock() + defer f.mutex.Unlock() + + for _, fn := range f.fns { + if fn != nil { + fn() + } + } + clear(f.fns) +} + +type ServiceOption func(*service) + +func WithSyncMinDelay(delay time.Duration) ServiceOption { + return func(s *service) { s.syncMinDelay = delay } +} + +func WithSyncMaxDelay(delay time.Duration) ServiceOption { + return func(s *service) { s.syncMaxDelay = delay } +} + +func WithSyncMaxAttempts(attempts uint) ServiceOption { + return func(s *service) { s.syncMaxAttempts = attempts } } var _ Service = &NullService{} diff --git a/core/services/feeds/service_test.go b/core/services/feeds/service_test.go index ce0e933df49..1cf14b00ef5 100644 --- a/core/services/feeds/service_test.go +++ b/core/services/feeds/service_test.go @@ -5,7 +5,9 @@ import ( "database/sql" "encoding/hex" "fmt" + "maps" "math/big" + "slices" "testing" "time" @@ -16,6 +18,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" "gopkg.in/guregu/null.v4" commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" @@ -196,15 +200,18 @@ type TestService struct { ocr2Keystore *ksmocks.OCR2 workflowKeystore *ksmocks.Workflow legacyChains legacyevm.LegacyChainContainer + logs *observer.ObservedLogs } -func setupTestService(t *testing.T) *TestService { +func setupTestService(t *testing.T, opts ...feeds.ServiceOption) *TestService { t.Helper() - return setupTestServiceCfg(t, nil) + return setupTestServiceCfg(t, nil, opts...) } -func setupTestServiceCfg(t *testing.T, overrideCfg func(c *chainlink.Config, s *chainlink.Secrets)) *TestService { +func setupTestServiceCfg( + t *testing.T, overrideCfg func(c *chainlink.Config, s *chainlink.Secrets), opts ...feeds.ServiceOption, +) *TestService { t.Helper() var ( @@ -220,7 +227,7 @@ func setupTestServiceCfg(t *testing.T, overrideCfg func(c *chainlink.Config, s * workflowKeystore = ksmocks.NewWorkflow(t) ) - lggr := logger.TestLogger(t) + lggr, observedLogs := logger.TestLoggerObserved(t, zap.DebugLevel) db := pgtest.NewSqlxDB(t) gcfg := configtest.NewGeneralConfig(t, overrideCfg) @@ -241,7 +248,8 @@ func setupTestServiceCfg(t *testing.T, overrideCfg func(c *chainlink.Config, s * keyStore.On("OCR").Return(ocr1Keystore) keyStore.On("OCR2").Return(ocr2Keystore) keyStore.On("Workflow").Return(workflowKeystore) - svc := feeds.NewService(orm, jobORM, db, spawner, keyStore, gcfg, gcfg.Feature(), gcfg.Insecure(), gcfg.JobPipeline(), gcfg.OCR(), gcfg.OCR2(), legacyChains, lggr, "1.0.0", nil) + svc := feeds.NewService(orm, jobORM, db, spawner, keyStore, gcfg, gcfg.Feature(), gcfg.Insecure(), + gcfg.JobPipeline(), gcfg.OCR(), gcfg.OCR2(), legacyChains, lggr, "1.0.0", nil, opts...) svc.SetConnectionsManager(connMgr) return &TestService{ @@ -257,6 +265,7 @@ func setupTestServiceCfg(t *testing.T, overrideCfg func(c *chainlink.Config, s * ocr2Keystore: ocr2Keystore, workflowKeystore: workflowKeystore, legacyChains: legacyChains, + logs: observedLogs, } } @@ -1856,6 +1865,170 @@ func Test_Service_SyncNodeInfo(t *testing.T) { } } +func Test_Service_syncNodeInfoWithRetry(t *testing.T) { + t.Parallel() + + mgr := feeds.FeedsManager{ID: 1} + nodeVersion := &versioning.NodeVersion{Version: "1.0.0"} + cfg := feeds.ChainConfig{ + FeedsManagerID: mgr.ID, + ChainID: "42", + ChainType: feeds.ChainTypeEVM, + AccountAddress: "0x0000000000000000000000000000000000000000", + AccountAddressPublicKey: null.StringFrom("0x0000000000000000000000000000000000000002"), + AdminAddress: "0x0000000000000000000000000000000000000001", + FluxMonitorConfig: feeds.FluxMonitorConfig{Enabled: true}, + OCR1Config: feeds.OCR1Config{Enabled: false}, + OCR2Config: feeds.OCR2ConfigModel{Enabled: false}, + } + workflowKey, err := workflowkey.New() + require.NoError(t, err) + + request := &proto.UpdateNodeRequest{ + Version: nodeVersion.Version, + ChainConfigs: []*proto.ChainConfig{ + { + Chain: &proto.Chain{ + Id: cfg.ChainID, + Type: proto.ChainType_CHAIN_TYPE_EVM, + }, + AccountAddress: cfg.AccountAddress, + AccountAddressPublicKey: &cfg.AccountAddressPublicKey.String, + AdminAddress: cfg.AdminAddress, + FluxMonitorConfig: &proto.FluxMonitorConfig{Enabled: true}, + Ocr1Config: &proto.OCR1Config{Enabled: false}, + Ocr2Config: &proto.OCR2Config{Enabled: false}, + }, + }, + WorkflowKey: func(s string) *string { return &s }(workflowKey.ID()), + } + successResponse := &proto.UpdateNodeResponse{ChainConfigErrors: map[string]*proto.ChainConfigError{}} + failureResponse := func(chainID string) *proto.UpdateNodeResponse { + return &proto.UpdateNodeResponse{ + ChainConfigErrors: map[string]*proto.ChainConfigError{chainID: {Message: "error chain " + chainID}}, + } + } + + tests := []struct { + name string + setup func(t *testing.T, svc *TestService) + run func(svc *TestService) (any, error) + wantLogs []string + }{ + { + name: "create chain", + setup: func(t *testing.T, svc *TestService) { + svc.workflowKeystore.EXPECT().GetAll().Return([]workflowkey.Key{workflowKey}, nil) + svc.orm.EXPECT().CreateChainConfig(mock.Anything, cfg).Return(int64(1), nil) + svc.orm.EXPECT().GetManager(mock.Anything, mgr.ID).Return(&mgr, nil) + svc.orm.EXPECT().ListChainConfigsByManagerIDs(mock.Anything, []int64{mgr.ID}).Return([]feeds.ChainConfig{cfg}, nil) + svc.connMgr.EXPECT().GetClient(mgr.ID).Return(svc.fmsClient, nil) + svc.fmsClient.EXPECT().UpdateNode(mock.Anything, request).Return(nil, errors.New("error-0")).Once() + svc.fmsClient.EXPECT().UpdateNode(mock.Anything, request).Return(failureResponse("1"), nil).Once() + svc.fmsClient.EXPECT().UpdateNode(mock.Anything, request).Return(failureResponse("2"), nil).Once() + svc.fmsClient.EXPECT().UpdateNode(mock.Anything, request).Return(successResponse, nil).Once() + }, + run: func(svc *TestService) (any, error) { + return svc.CreateChainConfig(testutils.Context(t), cfg) + }, + wantLogs: []string{ + `failed to sync node info attempt="0" err="SyncNodeInfo.UpdateNode call failed: error-0"`, + `failed to sync node info attempt="1" err="SyncNodeInfo.UpdateNode call partially failed: error chain 1"`, + `failed to sync node info attempt="2" err="SyncNodeInfo.UpdateNode call partially failed: error chain 2"`, + `successfully synced node info`, + }, + }, + { + name: "update chain", + setup: func(t *testing.T, svc *TestService) { + svc.workflowKeystore.EXPECT().GetAll().Return([]workflowkey.Key{workflowKey}, nil) + svc.orm.EXPECT().UpdateChainConfig(mock.Anything, cfg).Return(int64(1), nil) + svc.orm.EXPECT().GetChainConfig(mock.Anything, cfg.ID).Return(&cfg, nil) + svc.orm.EXPECT().ListChainConfigsByManagerIDs(mock.Anything, []int64{mgr.ID}).Return([]feeds.ChainConfig{cfg}, nil) + svc.connMgr.EXPECT().GetClient(mgr.ID).Return(svc.fmsClient, nil) + svc.fmsClient.EXPECT().UpdateNode(mock.Anything, request).Return(failureResponse("3"), nil).Once() + svc.fmsClient.EXPECT().UpdateNode(mock.Anything, request).Return(nil, errors.New("error-4")).Once() + svc.fmsClient.EXPECT().UpdateNode(mock.Anything, request).Return(failureResponse("5"), nil).Once() + svc.fmsClient.EXPECT().UpdateNode(mock.Anything, request).Return(successResponse, nil).Once() + }, + run: func(svc *TestService) (any, error) { + return svc.UpdateChainConfig(testutils.Context(t), cfg) + }, + wantLogs: []string{ + `failed to sync node info attempt="0" err="SyncNodeInfo.UpdateNode call partially failed: error chain 3"`, + `failed to sync node info attempt="1" err="SyncNodeInfo.UpdateNode call failed: error-4"`, + `failed to sync node info attempt="2" err="SyncNodeInfo.UpdateNode call partially failed: error chain 5"`, + `successfully synced node info`, + }, + }, + { + name: "delete chain", + setup: func(t *testing.T, svc *TestService) { + svc.workflowKeystore.EXPECT().GetAll().Return([]workflowkey.Key{workflowKey}, nil) + svc.orm.EXPECT().GetChainConfig(mock.Anything, cfg.ID).Return(&cfg, nil) + svc.orm.EXPECT().DeleteChainConfig(mock.Anything, cfg.ID).Return(cfg.ID, nil) + svc.orm.EXPECT().GetManager(mock.Anything, mgr.ID).Return(&mgr, nil) + svc.orm.EXPECT().ListChainConfigsByManagerIDs(mock.Anything, []int64{mgr.ID}).Return([]feeds.ChainConfig{cfg}, nil) + svc.connMgr.EXPECT().GetClient(mgr.ID).Return(svc.fmsClient, nil) + svc.fmsClient.EXPECT().UpdateNode(mock.Anything, request).Return(failureResponse("6"), nil).Once() + svc.fmsClient.EXPECT().UpdateNode(mock.Anything, request).Return(failureResponse("7"), nil).Once() + svc.fmsClient.EXPECT().UpdateNode(mock.Anything, request).Return(nil, errors.New("error-8")).Once() + svc.fmsClient.EXPECT().UpdateNode(mock.Anything, request).Return(successResponse, nil).Once() + }, + run: func(svc *TestService) (any, error) { + return svc.DeleteChainConfig(testutils.Context(t), cfg.ID) + }, + wantLogs: []string{ + `failed to sync node info attempt="0" err="SyncNodeInfo.UpdateNode call partially failed: error chain 6"`, + `failed to sync node info attempt="1" err="SyncNodeInfo.UpdateNode call partially failed: error chain 7"`, + `failed to sync node info attempt="2" err="SyncNodeInfo.UpdateNode call failed: error-8"`, + `successfully synced node info`, + }, + }, + { + name: "more errors than MaxAttempts", + setup: func(t *testing.T, svc *TestService) { + svc.workflowKeystore.EXPECT().GetAll().Return([]workflowkey.Key{workflowKey}, nil) + svc.orm.EXPECT().CreateChainConfig(mock.Anything, cfg).Return(int64(1), nil) + svc.orm.EXPECT().GetManager(mock.Anything, mgr.ID).Return(&mgr, nil) + svc.orm.EXPECT().ListChainConfigsByManagerIDs(mock.Anything, []int64{mgr.ID}).Return([]feeds.ChainConfig{cfg}, nil) + svc.connMgr.EXPECT().GetClient(mgr.ID).Return(svc.fmsClient, nil) + svc.fmsClient.EXPECT().UpdateNode(mock.Anything, request).Return(failureResponse("9"), nil).Once() + svc.fmsClient.EXPECT().UpdateNode(mock.Anything, request).Return(failureResponse("10"), nil).Once() + svc.fmsClient.EXPECT().UpdateNode(mock.Anything, request).Return(nil, errors.New("error-11")).Once() + svc.fmsClient.EXPECT().UpdateNode(mock.Anything, request).Return(failureResponse("12"), nil).Once() + }, + run: func(svc *TestService) (any, error) { + return svc.CreateChainConfig(testutils.Context(t), cfg) + }, + wantLogs: []string{ + `failed to sync node info attempt="0" err="SyncNodeInfo.UpdateNode call partially failed: error chain 9"`, + `failed to sync node info attempt="1" err="SyncNodeInfo.UpdateNode call partially failed: error chain 10"`, + `failed to sync node info attempt="2" err="SyncNodeInfo.UpdateNode call failed: error-11"`, + `failed to sync node info attempt="3" err="SyncNodeInfo.UpdateNode call partially failed: error chain 12"`, + `failed to sync node info; aborting err="SyncNodeInfo.UpdateNode call partially failed: error chain 12"`, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + svc := setupTestService(t, feeds.WithSyncMinDelay(5*time.Millisecond), + feeds.WithSyncMaxDelay(50*time.Millisecond), feeds.WithSyncMaxAttempts(4)) + + tt.setup(t, svc) + _, err := tt.run(svc) + + require.NoError(t, err) + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + assert.Equal(collect, tt.wantLogs, logMessages(svc.logs.All())) + }, 1*time.Second, 50*time.Millisecond) + }) + } +} + func Test_Service_IsJobManaged(t *testing.T) { t.Parallel() @@ -4751,3 +4924,21 @@ func Test_Service_StartStop(t *testing.T) { }) } } + +func logMessages(logEntries []observer.LoggedEntry) []string { + messages := make([]string, 0, len(logEntries)) + for _, entry := range logEntries { + messageWithContext := entry.Message + contextMap := entry.ContextMap() + for _, key := range slices.Sorted(maps.Keys(contextMap)) { + if key == "version" || key == "errVerbose" { + continue + } + messageWithContext += fmt.Sprintf(" %v=\"%v\"", key, entry.ContextMap()[key]) + } + + messages = append(messages, messageWithContext) + } + + return messages +} diff --git a/deployment/go.mod b/deployment/go.mod index f1a0f6fb3f3..60d68646329 100644 --- a/deployment/go.mod +++ b/deployment/go.mod @@ -356,7 +356,7 @@ require ( github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250128203428-08031923fbe5 // indirect github.com/smartcontractkit/chainlink-feeds v0.1.1 // indirect github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250207205350-420ccacab78a // indirect - github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0 // indirect + github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0 // indirect github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 // indirect github.com/smartcontractkit/chainlink-protos/svr v0.0.0-20250123084029-58cce9b32112 // indirect github.com/smartcontractkit/chainlink-testing-framework/seth v1.50.10 // indirect diff --git a/deployment/go.sum b/deployment/go.sum index df1bf1dc209..8812b2c6837 100644 --- a/deployment/go.sum +++ b/deployment/go.sum @@ -1152,8 +1152,8 @@ github.com/smartcontractkit/chainlink-integrations/evm v0.0.0-20250213145514-41d github.com/smartcontractkit/chainlink-integrations/evm v0.0.0-20250213145514-41d874782c02/go.mod h1:7DbPnG0E39eZaX1CXKxRiJ1NOWHwTZYDWR9ys3kZZuU= github.com/smartcontractkit/chainlink-protos/job-distributor v0.9.0 h1:hfMRj2ny6oNHd8w1rhJHdoX3YkoWJtCkBK6wTlCE4+c= github.com/smartcontractkit/chainlink-protos/job-distributor v0.9.0/go.mod h1:/dVVLXrsp+V0AbcYGJo3XMzKg3CkELsweA/TTopCsKE= -github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0 h1:ZBat8EBvE2LpSQR9U1gEbRV6PfAkiFdINmQ8nVnXIAQ= -github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0/go.mod h1:m/A3lqD7ms/RsQ9BT5P2uceYY0QX5mIt4KQxT2G6qEo= +github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0 h1:xRgu/kMkxcY4LeDKMBhaXU4khgya7v2wyb4Sa5Nzb+Y= +github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0/go.mod h1:m/A3lqD7ms/RsQ9BT5P2uceYY0QX5mIt4KQxT2G6qEo= github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 h1:L6KJ4kGv/yNNoCk8affk7Y1vAY0qglPMXC/hevV/IsA= github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6/go.mod h1:FRwzI3hGj4CJclNS733gfcffmqQ62ONCkbGi49s658w= github.com/smartcontractkit/chainlink-protos/svr v0.0.0-20250123084029-58cce9b32112 h1:c77Gi/APraqwbBO8fbd/5JY2wW+MSIpYg8Uma9MEZFE= diff --git a/go.mod b/go.mod index bdd5ee85fff..98740230043 100644 --- a/go.mod +++ b/go.mod @@ -84,7 +84,7 @@ require ( github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250207205350-420ccacab78a github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20250211162441-3d6cea220efb github.com/smartcontractkit/chainlink-integrations/evm v0.0.0-20250213145514-41d874782c02 - github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0 + github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0 github.com/smartcontractkit/chainlink-solana v1.1.2-0.20250213203720-e15b1333a14a github.com/smartcontractkit/libocr v0.0.0-20250220133800-f3b940c4f298 github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20241009055228-33d0c0bf38de diff --git a/go.sum b/go.sum index bde89e1c8a3..0e432088eca 100644 --- a/go.sum +++ b/go.sum @@ -1034,8 +1034,8 @@ github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20250211162441- github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20250211162441-3d6cea220efb/go.mod h1:4JqpgFy01LaqG1yM2iFTzwX3ZgcAvW9WdstBZQgPHzU= github.com/smartcontractkit/chainlink-integrations/evm v0.0.0-20250213145514-41d874782c02 h1:3icYNFldKQbs6Qrfai2LE+tKbNcE4tfgPRELF30mnEA= github.com/smartcontractkit/chainlink-integrations/evm v0.0.0-20250213145514-41d874782c02/go.mod h1:7DbPnG0E39eZaX1CXKxRiJ1NOWHwTZYDWR9ys3kZZuU= -github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0 h1:ZBat8EBvE2LpSQR9U1gEbRV6PfAkiFdINmQ8nVnXIAQ= -github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0/go.mod h1:m/A3lqD7ms/RsQ9BT5P2uceYY0QX5mIt4KQxT2G6qEo= +github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0 h1:xRgu/kMkxcY4LeDKMBhaXU4khgya7v2wyb4Sa5Nzb+Y= +github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0/go.mod h1:m/A3lqD7ms/RsQ9BT5P2uceYY0QX5mIt4KQxT2G6qEo= github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 h1:L6KJ4kGv/yNNoCk8affk7Y1vAY0qglPMXC/hevV/IsA= github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6/go.mod h1:FRwzI3hGj4CJclNS733gfcffmqQ62ONCkbGi49s658w= github.com/smartcontractkit/chainlink-protos/svr v0.0.0-20250123084029-58cce9b32112 h1:c77Gi/APraqwbBO8fbd/5JY2wW+MSIpYg8Uma9MEZFE= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index 47986b51869..5d695189cbd 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -433,7 +433,7 @@ require ( github.com/smartcontractkit/chainlink-feeds v0.1.1 // indirect github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250207205350-420ccacab78a // indirect github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20250211162441-3d6cea220efb // indirect - github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0 // indirect + github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0 // indirect github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 // indirect github.com/smartcontractkit/chainlink-protos/svr v0.0.0-20250123084029-58cce9b32112 // indirect github.com/smartcontractkit/chainlink-solana v1.1.2-0.20250213203720-e15b1333a14a // indirect diff --git a/integration-tests/go.sum b/integration-tests/go.sum index a8bf643a088..9540ee09e3b 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1402,8 +1402,8 @@ github.com/smartcontractkit/chainlink-integrations/evm v0.0.0-20250213145514-41d github.com/smartcontractkit/chainlink-integrations/evm v0.0.0-20250213145514-41d874782c02/go.mod h1:7DbPnG0E39eZaX1CXKxRiJ1NOWHwTZYDWR9ys3kZZuU= github.com/smartcontractkit/chainlink-protos/job-distributor v0.9.0 h1:hfMRj2ny6oNHd8w1rhJHdoX3YkoWJtCkBK6wTlCE4+c= github.com/smartcontractkit/chainlink-protos/job-distributor v0.9.0/go.mod h1:/dVVLXrsp+V0AbcYGJo3XMzKg3CkELsweA/TTopCsKE= -github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0 h1:ZBat8EBvE2LpSQR9U1gEbRV6PfAkiFdINmQ8nVnXIAQ= -github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0/go.mod h1:m/A3lqD7ms/RsQ9BT5P2uceYY0QX5mIt4KQxT2G6qEo= +github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0 h1:xRgu/kMkxcY4LeDKMBhaXU4khgya7v2wyb4Sa5Nzb+Y= +github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0/go.mod h1:m/A3lqD7ms/RsQ9BT5P2uceYY0QX5mIt4KQxT2G6qEo= github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 h1:L6KJ4kGv/yNNoCk8affk7Y1vAY0qglPMXC/hevV/IsA= github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6/go.mod h1:FRwzI3hGj4CJclNS733gfcffmqQ62ONCkbGi49s658w= github.com/smartcontractkit/chainlink-protos/svr v0.0.0-20250123084029-58cce9b32112 h1:c77Gi/APraqwbBO8fbd/5JY2wW+MSIpYg8Uma9MEZFE= diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index e699c548a0a..ca630bcdee0 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -424,7 +424,7 @@ require ( github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250207205350-420ccacab78a // indirect github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20250211162441-3d6cea220efb // indirect github.com/smartcontractkit/chainlink-protos/job-distributor v0.9.0 // indirect - github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0 // indirect + github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0 // indirect github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 // indirect github.com/smartcontractkit/chainlink-protos/svr v0.0.0-20250123084029-58cce9b32112 // indirect github.com/smartcontractkit/chainlink-solana v1.1.2-0.20250213203720-e15b1333a14a // indirect diff --git a/integration-tests/load/go.sum b/integration-tests/load/go.sum index 04569532d2f..ea06d1014cd 100644 --- a/integration-tests/load/go.sum +++ b/integration-tests/load/go.sum @@ -1387,8 +1387,8 @@ github.com/smartcontractkit/chainlink-integrations/evm v0.0.0-20250213145514-41d github.com/smartcontractkit/chainlink-integrations/evm v0.0.0-20250213145514-41d874782c02/go.mod h1:7DbPnG0E39eZaX1CXKxRiJ1NOWHwTZYDWR9ys3kZZuU= github.com/smartcontractkit/chainlink-protos/job-distributor v0.9.0 h1:hfMRj2ny6oNHd8w1rhJHdoX3YkoWJtCkBK6wTlCE4+c= github.com/smartcontractkit/chainlink-protos/job-distributor v0.9.0/go.mod h1:/dVVLXrsp+V0AbcYGJo3XMzKg3CkELsweA/TTopCsKE= -github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0 h1:ZBat8EBvE2LpSQR9U1gEbRV6PfAkiFdINmQ8nVnXIAQ= -github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0/go.mod h1:m/A3lqD7ms/RsQ9BT5P2uceYY0QX5mIt4KQxT2G6qEo= +github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0 h1:xRgu/kMkxcY4LeDKMBhaXU4khgya7v2wyb4Sa5Nzb+Y= +github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0/go.mod h1:m/A3lqD7ms/RsQ9BT5P2uceYY0QX5mIt4KQxT2G6qEo= github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 h1:L6KJ4kGv/yNNoCk8affk7Y1vAY0qglPMXC/hevV/IsA= github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6/go.mod h1:FRwzI3hGj4CJclNS733gfcffmqQ62ONCkbGi49s658w= github.com/smartcontractkit/chainlink-protos/svr v0.0.0-20250123084029-58cce9b32112 h1:c77Gi/APraqwbBO8fbd/5JY2wW+MSIpYg8Uma9MEZFE= diff --git a/system-tests/lib/go.mod b/system-tests/lib/go.mod index 8868a5e7f6e..f7fd452efb2 100644 --- a/system-tests/lib/go.mod +++ b/system-tests/lib/go.mod @@ -345,7 +345,7 @@ require ( github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250207205350-420ccacab78a // indirect github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20250211162441-3d6cea220efb // indirect github.com/smartcontractkit/chainlink-integrations/evm v0.0.0-20250213145514-41d874782c02 // indirect - github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0 // indirect + github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0 // indirect github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 // indirect github.com/smartcontractkit/chainlink-protos/svr v0.0.0-20250123084029-58cce9b32112 // indirect github.com/smartcontractkit/chainlink-solana v1.1.2-0.20250213203720-e15b1333a14a // indirect diff --git a/system-tests/lib/go.sum b/system-tests/lib/go.sum index a52fa0c717d..8b8188a35e2 100644 --- a/system-tests/lib/go.sum +++ b/system-tests/lib/go.sum @@ -1140,8 +1140,8 @@ github.com/smartcontractkit/chainlink-integrations/evm v0.0.0-20250213145514-41d github.com/smartcontractkit/chainlink-integrations/evm v0.0.0-20250213145514-41d874782c02/go.mod h1:7DbPnG0E39eZaX1CXKxRiJ1NOWHwTZYDWR9ys3kZZuU= github.com/smartcontractkit/chainlink-protos/job-distributor v0.9.0 h1:hfMRj2ny6oNHd8w1rhJHdoX3YkoWJtCkBK6wTlCE4+c= github.com/smartcontractkit/chainlink-protos/job-distributor v0.9.0/go.mod h1:/dVVLXrsp+V0AbcYGJo3XMzKg3CkELsweA/TTopCsKE= -github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0 h1:ZBat8EBvE2LpSQR9U1gEbRV6PfAkiFdINmQ8nVnXIAQ= -github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0/go.mod h1:m/A3lqD7ms/RsQ9BT5P2uceYY0QX5mIt4KQxT2G6qEo= +github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0 h1:xRgu/kMkxcY4LeDKMBhaXU4khgya7v2wyb4Sa5Nzb+Y= +github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0/go.mod h1:m/A3lqD7ms/RsQ9BT5P2uceYY0QX5mIt4KQxT2G6qEo= github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 h1:L6KJ4kGv/yNNoCk8affk7Y1vAY0qglPMXC/hevV/IsA= github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6/go.mod h1:FRwzI3hGj4CJclNS733gfcffmqQ62ONCkbGi49s658w= github.com/smartcontractkit/chainlink-protos/svr v0.0.0-20250123084029-58cce9b32112 h1:c77Gi/APraqwbBO8fbd/5JY2wW+MSIpYg8Uma9MEZFE= diff --git a/system-tests/tests/go.mod b/system-tests/tests/go.mod index c46809f5f61..c54feb0bf24 100644 --- a/system-tests/tests/go.mod +++ b/system-tests/tests/go.mod @@ -350,7 +350,7 @@ require ( github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20250211162441-3d6cea220efb // indirect github.com/smartcontractkit/chainlink-integrations/evm v0.0.0-20250213145514-41d874782c02 // indirect github.com/smartcontractkit/chainlink-protos/job-distributor v0.9.0 // indirect - github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0 // indirect + github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0 // indirect github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 // indirect github.com/smartcontractkit/chainlink-protos/svr v0.0.0-20250123084029-58cce9b32112 // indirect github.com/smartcontractkit/chainlink-solana v1.1.2-0.20250213203720-e15b1333a14a // indirect diff --git a/system-tests/tests/go.sum b/system-tests/tests/go.sum index 57a42423877..14a5f729c5d 100644 --- a/system-tests/tests/go.sum +++ b/system-tests/tests/go.sum @@ -1140,8 +1140,8 @@ github.com/smartcontractkit/chainlink-integrations/evm v0.0.0-20250213145514-41d github.com/smartcontractkit/chainlink-integrations/evm v0.0.0-20250213145514-41d874782c02/go.mod h1:7DbPnG0E39eZaX1CXKxRiJ1NOWHwTZYDWR9ys3kZZuU= github.com/smartcontractkit/chainlink-protos/job-distributor v0.9.0 h1:hfMRj2ny6oNHd8w1rhJHdoX3YkoWJtCkBK6wTlCE4+c= github.com/smartcontractkit/chainlink-protos/job-distributor v0.9.0/go.mod h1:/dVVLXrsp+V0AbcYGJo3XMzKg3CkELsweA/TTopCsKE= -github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0 h1:ZBat8EBvE2LpSQR9U1gEbRV6PfAkiFdINmQ8nVnXIAQ= -github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0/go.mod h1:m/A3lqD7ms/RsQ9BT5P2uceYY0QX5mIt4KQxT2G6qEo= +github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0 h1:xRgu/kMkxcY4LeDKMBhaXU4khgya7v2wyb4Sa5Nzb+Y= +github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0/go.mod h1:m/A3lqD7ms/RsQ9BT5P2uceYY0QX5mIt4KQxT2G6qEo= github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 h1:L6KJ4kGv/yNNoCk8affk7Y1vAY0qglPMXC/hevV/IsA= github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6/go.mod h1:FRwzI3hGj4CJclNS733gfcffmqQ62ONCkbGi49s658w= github.com/smartcontractkit/chainlink-protos/svr v0.0.0-20250123084029-58cce9b32112 h1:c77Gi/APraqwbBO8fbd/5JY2wW+MSIpYg8Uma9MEZFE= From a42860e96a0d1029f5486f457bf4ec3455b1a49e Mon Sep 17 00:00:00 2001 From: Gustavo Gama Date: Mon, 24 Feb 2025 16:49:11 -0300 Subject: [PATCH 4/4] review: clear contexts before closing the connection manager --- core/services/feeds/service.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index c30164009dc..06c964130cf 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -1211,11 +1211,11 @@ func (s *service) Start(ctx context.Context) error { // Close shuts down the service func (s *service) Close() error { return s.StopOnce("FeedsService", func() error { + s.syncNodeInfoCancel.callAllAndClear() + // This blocks until it finishes s.connMgr.Close() - s.syncNodeInfoCancel.callAllAndClear() - return nil }) }