Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/develop' into will/price-only-tr…
Browse files Browse the repository at this point in the history
…ansmit
  • Loading branch information
winder committed Feb 26, 2025
2 parents 8156451 + 39d0909 commit 70faf30
Show file tree
Hide file tree
Showing 19 changed files with 449 additions and 49 deletions.
5 changes: 5 additions & 0 deletions .changeset/neat-penguins-report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#added add exponential backoff retry to feeds.SyncNodeInfo()
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ require (
github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250207205350-420ccacab78a // indirect
github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20250211162441-3d6cea220efb // indirect
github.com/smartcontractkit/chainlink-protos/job-distributor v0.9.0 // indirect
github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0 // indirect
github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0 // indirect
github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 // indirect
github.com/smartcontractkit/chainlink-protos/svr v0.0.0-20250123084029-58cce9b32112 // indirect
github.com/smartcontractkit/chainlink-solana v1.1.2-0.20250213203720-e15b1333a14a // indirect
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1107,8 +1107,8 @@ github.com/smartcontractkit/chainlink-integrations/evm v0.0.0-20250213145514-41d
github.com/smartcontractkit/chainlink-integrations/evm v0.0.0-20250213145514-41d874782c02/go.mod h1:7DbPnG0E39eZaX1CXKxRiJ1NOWHwTZYDWR9ys3kZZuU=
github.com/smartcontractkit/chainlink-protos/job-distributor v0.9.0 h1:hfMRj2ny6oNHd8w1rhJHdoX3YkoWJtCkBK6wTlCE4+c=
github.com/smartcontractkit/chainlink-protos/job-distributor v0.9.0/go.mod h1:/dVVLXrsp+V0AbcYGJo3XMzKg3CkELsweA/TTopCsKE=
github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0 h1:ZBat8EBvE2LpSQR9U1gEbRV6PfAkiFdINmQ8nVnXIAQ=
github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0/go.mod h1:m/A3lqD7ms/RsQ9BT5P2uceYY0QX5mIt4KQxT2G6qEo=
github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0 h1:xRgu/kMkxcY4LeDKMBhaXU4khgya7v2wyb4Sa5Nzb+Y=
github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0/go.mod h1:m/A3lqD7ms/RsQ9BT5P2uceYY0QX5mIt4KQxT2G6qEo=
github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 h1:L6KJ4kGv/yNNoCk8affk7Y1vAY0qglPMXC/hevV/IsA=
github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6/go.mod h1:FRwzI3hGj4CJclNS733gfcffmqQ62ONCkbGi49s658w=
github.com/smartcontractkit/chainlink-protos/svr v0.0.0-20250123084029-58cce9b32112 h1:c77Gi/APraqwbBO8fbd/5JY2wW+MSIpYg8Uma9MEZFE=
Expand Down
166 changes: 144 additions & 22 deletions core/services/feeds/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ import (
"database/sql"
"encoding/hex"
"fmt"
"strings"
"sync"
"time"

"github.com/avast/retry-go/v4"
"github.com/ethereum/go-ethereum/common"
"github.com/google/uuid"
"github.com/lib/pq"
Expand Down Expand Up @@ -77,6 +81,10 @@ var (
// Job Proposal status
"status",
})

defaultSyncMinDelay = 10 * time.Second
defaultSyncMaxDelay = 30 * time.Minute
defaultSyncMaxAttempts = uint(48 + 8) // 30m * 48 =~ 24h; plus the initial 8 shorter retries
)

// Service represents a behavior of the feeds service
Expand Down Expand Up @@ -142,6 +150,10 @@ type service struct {
lggr logger.Logger
version string
loopRegistrarConfig plugins.RegistrarConfig
syncNodeInfoCancel atomicCancelFns
syncMinDelay time.Duration
syncMaxDelay time.Duration
syncMaxAttempts uint
}

// NewService constructs a new feeds service
Expand All @@ -161,6 +173,7 @@ func NewService(
lggr logger.Logger,
version string,
rc plugins.RegistrarConfig,
opts ...ServiceOption,
) *service {
lggr = lggr.Named("Feeds")
svc := &service{
Expand All @@ -184,6 +197,14 @@ func NewService(
lggr: lggr,
version: version,
loopRegistrarConfig: rc,
syncNodeInfoCancel: atomicCancelFns{fns: map[int64]context.CancelFunc{}},
syncMinDelay: defaultSyncMinDelay,
syncMaxDelay: defaultSyncMaxDelay,
syncMaxAttempts: defaultSyncMaxAttempts,
}

for _, opt := range opts {
opt(svc)
}

return svc
Expand Down Expand Up @@ -255,8 +276,43 @@ func (s *service) RegisterManager(ctx context.Context, params RegisterManagerPar
return id, nil
}

// SyncNodeInfo syncs the node's information with FMS
// syncNodeInfoWithRetry syncs the node's information with the FMS identified by
// id. The work happens in a background goroutine; this method returns as soon
// as that goroutine has been started.
//
// Failed attempts are retried with an exponential backoff that starts at
// s.syncMinDelay, is capped at s.syncMaxDelay and gives up after
// s.syncMaxAttempts attempts (the package defaults are sized for roughly 24h).
//
// At most one sync is kept running per id: starting a new one cancels the one
// that is currently in flight for the same id.
func (s *service) syncNodeInfoWithRetry(id int64) {
	ctx, cancel := context.WithCancel(context.Background())

	// cancel the previous context -- and, by extension, the existing goroutine --
	// so that we can start anew
	s.syncNodeInfoCancel.callAndSwap(id, cancel)

	retryOpts := []retry.Option{
		retry.Context(ctx),
		retry.DelayType(retry.BackOffDelay),
		retry.Delay(s.syncMinDelay),
		retry.MaxDelay(s.syncMaxDelay),
		retry.Attempts(s.syncMaxAttempts),
		retry.LastErrorOnly(true),
		retry.OnRetry(func(attempt uint, err error) {
			s.lggr.Infow("failed to sync node info", "attempt", attempt, "err", err.Error())
		}),
	}

	go func() {
		err := retry.Do(func() error { return s.SyncNodeInfo(ctx, id) }, retryOpts...)
		if err != nil {
			s.lggr.Errorw("failed to sync node info; aborting", "err", err)
		} else {
			s.lggr.Info("successfully synced node info")
		}

		// Release our registration, but only if it is still ours.
		//
		// ctx has no parent and no deadline, so it can only have been cancelled
		// through the cancel func we registered above, i.e. by a newer
		// syncNodeInfoWithRetry call for the same id or by Close. Both do so
		// while holding the registry mutex and then replace/clear the entry.
		// Therefore, while we hold the mutex, a live ctx means the entry for id
		// is still our own cancel func. If ctx is already cancelled, the entry
		// (if any) belongs to a newer sync and must be left untouched --
		// unconditionally swapping in nil here would cancel that newer sync.
		cancels := &s.syncNodeInfoCancel
		cancels.mutex.Lock()
		defer cancels.mutex.Unlock()

		if ctx.Err() != nil {
			return
		}

		cancel() // free the resources held by our context
		delete(cancels.fns, id)
	}()
}

func (s *service) SyncNodeInfo(ctx context.Context, id int64) error {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

// Get the FMS RPC client
fmsClient, err := s.connMgr.GetClient(id)
if err != nil {
Expand All @@ -281,12 +337,22 @@ func (s *service) SyncNodeInfo(ctx context.Context, id int64) error {
}

workflowKey := s.getWorkflowPublicKey()
if _, err = fmsClient.UpdateNode(ctx, &pb.UpdateNodeRequest{

resp, err := fmsClient.UpdateNode(ctx, &pb.UpdateNodeRequest{
Version: s.version,
ChainConfigs: cfgMsgs,
WorkflowKey: &workflowKey,
}); err != nil {
return err
})
if err != nil {
return errors.Wrap(err, "SyncNodeInfo.UpdateNode call failed")
}
if len(resp.ChainConfigErrors) > 0 {
errMsgs := make([]string, 0, len(resp.ChainConfigErrors))
for _, ccErr := range resp.ChainConfigErrors {
errMsgs = append(errMsgs, ccErr.Message)
}

return errors.Errorf("SyncNodeInfo.UpdateNode call partially failed: %s", strings.Join(errMsgs, "; "))
}

return nil
Expand Down Expand Up @@ -402,9 +468,7 @@ func (s *service) CreateChainConfig(ctx context.Context, cfg ChainConfig) (int64
return 0, errors.Wrap(err, "CreateChainConfig: failed to fetch manager")
}

if err := s.SyncNodeInfo(ctx, mgr.ID); err != nil {
s.lggr.Infof("FMS: Unable to sync node info: %v", err)
}
s.syncNodeInfoWithRetry(mgr.ID)

return id, nil
}
Expand All @@ -426,9 +490,7 @@ func (s *service) DeleteChainConfig(ctx context.Context, id int64) (int64, error
return 0, errors.Wrap(err, "DeleteChainConfig: failed to fetch manager")
}

if err := s.SyncNodeInfo(ctx, mgr.ID); err != nil {
s.lggr.Infof("FMS: Unable to sync node info: %v", err)
}
s.syncNodeInfoWithRetry(mgr.ID)

return id, nil
}
Expand Down Expand Up @@ -467,9 +529,7 @@ func (s *service) UpdateChainConfig(ctx context.Context, cfg ChainConfig) (int64
return 0, errors.Wrap(err, "UpdateChainConfig failed: could not get chain config")
}

if err := s.SyncNodeInfo(ctx, ccfg.FeedsManagerID); err != nil {
s.lggr.Infof("FMS: Unable to sync node info: %v", err)
}
s.syncNodeInfoWithRetry(ccfg.FeedsManagerID)

return id, nil
}
Expand Down Expand Up @@ -1031,9 +1091,7 @@ func (s *service) CancelSpec(ctx context.Context, id int64) error {
)

err = s.transact(ctx, func(tx datasources) error {
var (
txerr error
)
var txerr error

if txerr = tx.orm.CancelSpec(ctx, id); txerr != nil {
return txerr
Expand Down Expand Up @@ -1153,6 +1211,8 @@ func (s *service) Start(ctx context.Context) error {
// Close shuts down the service
func (s *service) Close() error {
return s.StopOnce("FeedsService", func() error {
s.syncNodeInfoCancel.callAllAndClear()

// This blocks until it finishes
s.connMgr.Close()

Expand All @@ -1173,10 +1233,7 @@ func (s *service) connectFeedManager(ctx context.Context, mgr FeedsManager, priv
},
OnConnect: func(pb.FeedsManagerClient) {
// Sync the node's information with FMS once connected
err := s.SyncNodeInfo(ctx, mgr.ID)
if err != nil {
s.lggr.Infof("Error syncing node info: %v", err)
}
s.syncNodeInfoWithRetry(mgr.ID)
},
})
}
Expand Down Expand Up @@ -1220,8 +1277,10 @@ func (s *service) observeJobProposalCounts(ctx context.Context) error {
metrics := counts.toMetrics()

// Set the prometheus gauge metrics.
for _, status := range []JobProposalStatus{JobProposalStatusPending, JobProposalStatusApproved,
JobProposalStatusCancelled, JobProposalStatusRejected, JobProposalStatusDeleted, JobProposalStatusRevoked} {
for _, status := range []JobProposalStatus{
JobProposalStatusPending, JobProposalStatusApproved,
JobProposalStatusCancelled, JobProposalStatusRejected, JobProposalStatusDeleted, JobProposalStatusRevoked,
} {
status := status

promJobProposalCounts.With(prometheus.Labels{"status": string(status)}).Set(metrics[status])
Expand Down Expand Up @@ -1565,6 +1624,49 @@ func (s *service) isRevokable(propStatus JobProposalStatus, specStatus SpecStatu
return propStatus != JobProposalStatusDeleted && (specStatus == SpecStatusPending || specStatus == SpecStatusCancelled)
}

// atomicCancelFns is a mutex-guarded registry of cancel functions keyed by an
// int64 id. The zero value is ready to use. Because it embeds a sync.Mutex it
// must be used through a pointer and not copied once in use.
type atomicCancelFns struct {
	fns   map[int64]context.CancelFunc
	mutex sync.Mutex
}

// callAndSwap invokes the cancel function currently registered under id, if
// there is one, and then registers other in its place. Passing a nil other
// removes the entry for id instead of storing a nil placeholder.
func (f *atomicCancelFns) callAndSwap(id int64, other func()) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	// Reading from a nil map is safe and yields a nil func.
	if fn := f.fns[id]; fn != nil {
		fn()
	}

	if other == nil {
		// delete is a no-op on a nil map or a missing key.
		delete(f.fns, id)
		return
	}

	// Allocate lazily so that a zero-value atomicCancelFns does not panic on
	// its first registration.
	if f.fns == nil {
		f.fns = make(map[int64]context.CancelFunc)
	}
	f.fns[id] = other
}

// callAllAndClear invokes every registered cancel function and then empties
// the registry. It is safe to call repeatedly and on a zero value.
func (f *atomicCancelFns) callAllAndClear() {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	for _, fn := range f.fns {
		if fn != nil {
			fn()
		}
	}
	clear(f.fns)
}

// ServiceOption customizes a service. NewService applies the supplied options,
// in order, after the default values have been set.
type ServiceOption func(*service)

// WithSyncMinDelay sets the base delay used by the node-info sync retry backoff.
func WithSyncMinDelay(delay time.Duration) ServiceOption {
	return func(svc *service) {
		svc.syncMinDelay = delay
	}
}

// WithSyncMaxDelay sets the upper bound on the delay between node-info sync
// retries.
func WithSyncMaxDelay(delay time.Duration) ServiceOption {
	return func(svc *service) {
		svc.syncMaxDelay = delay
	}
}

// WithSyncMaxAttempts sets the number of attempts handed to the node-info sync
// retry loop.
func WithSyncMaxAttempts(attempts uint) ServiceOption {
	return func(svc *service) {
		svc.syncMaxAttempts = attempts
	}
}

var _ Service = &NullService{}

// NullService defines an implementation of the Feeds Service that is used
Expand All @@ -1577,75 +1679,95 @@ func (ns NullService) Close() error { return nil }
// ApproveSpec always returns ErrFeedsManagerDisabled.
func (ns NullService) ApproveSpec(ctx context.Context, id int64, force bool) error {
return ErrFeedsManagerDisabled
}

// CountJobProposalsByStatus always returns a nil result and ErrFeedsManagerDisabled.
func (ns NullService) CountJobProposalsByStatus(ctx context.Context) (*JobProposalCounts, error) {
return nil, ErrFeedsManagerDisabled
}

// CancelSpec always returns ErrFeedsManagerDisabled.
func (ns NullService) CancelSpec(ctx context.Context, id int64) error {
return ErrFeedsManagerDisabled
}

// GetJobProposal always returns a nil result and ErrFeedsManagerDisabled.
func (ns NullService) GetJobProposal(ctx context.Context, id int64) (*JobProposal, error) {
return nil, ErrFeedsManagerDisabled
}

// ListSpecsByJobProposalIDs always returns a nil slice and ErrFeedsManagerDisabled.
func (ns NullService) ListSpecsByJobProposalIDs(ctx context.Context, ids []int64) ([]JobProposalSpec, error) {
return nil, ErrFeedsManagerDisabled
}

// GetManager always returns a nil result and ErrFeedsManagerDisabled.
func (ns NullService) GetManager(ctx context.Context, id int64) (*FeedsManager, error) {
return nil, ErrFeedsManagerDisabled
}

// ListManagersByIDs always returns a nil slice and ErrFeedsManagerDisabled.
func (ns NullService) ListManagersByIDs(ctx context.Context, ids []int64) ([]FeedsManager, error) {
return nil, ErrFeedsManagerDisabled
}

// GetSpec always returns a nil result and ErrFeedsManagerDisabled.
func (ns NullService) GetSpec(ctx context.Context, id int64) (*JobProposalSpec, error) {
return nil, ErrFeedsManagerDisabled
}
// ListManagers returns a nil slice and a nil error (it does not report
// ErrFeedsManagerDisabled).
func (ns NullService) ListManagers(ctx context.Context) ([]FeedsManager, error) { return nil, nil }
// CreateChainConfig always returns 0 and ErrFeedsManagerDisabled.
func (ns NullService) CreateChainConfig(ctx context.Context, cfg ChainConfig) (int64, error) {
return 0, ErrFeedsManagerDisabled
}

// GetChainConfig always returns a nil result and ErrFeedsManagerDisabled.
func (ns NullService) GetChainConfig(ctx context.Context, id int64) (*ChainConfig, error) {
return nil, ErrFeedsManagerDisabled
}

// DeleteChainConfig always returns 0 and ErrFeedsManagerDisabled.
func (ns NullService) DeleteChainConfig(ctx context.Context, id int64) (int64, error) {
return 0, ErrFeedsManagerDisabled
}

// ListChainConfigsByManagerIDs always returns a nil slice and ErrFeedsManagerDisabled.
func (ns NullService) ListChainConfigsByManagerIDs(ctx context.Context, mgrIDs []int64) ([]ChainConfig, error) {
return nil, ErrFeedsManagerDisabled
}

// UpdateChainConfig always returns 0 and ErrFeedsManagerDisabled.
func (ns NullService) UpdateChainConfig(ctx context.Context, cfg ChainConfig) (int64, error) {
return 0, ErrFeedsManagerDisabled
}
// ListJobProposals returns a nil slice and a nil error (it does not report
// ErrFeedsManagerDisabled).
func (ns NullService) ListJobProposals(ctx context.Context) ([]JobProposal, error) { return nil, nil }
// ListJobProposalsByManagersIDs always returns a nil slice and ErrFeedsManagerDisabled.
func (ns NullService) ListJobProposalsByManagersIDs(ctx context.Context, ids []int64) ([]JobProposal, error) {
return nil, ErrFeedsManagerDisabled
}

// ProposeJob always returns 0 and ErrFeedsManagerDisabled.
func (ns NullService) ProposeJob(ctx context.Context, args *ProposeJobArgs) (int64, error) {
return 0, ErrFeedsManagerDisabled
}

// DeleteJob always returns 0 and ErrFeedsManagerDisabled.
func (ns NullService) DeleteJob(ctx context.Context, args *DeleteJobArgs) (int64, error) {
return 0, ErrFeedsManagerDisabled
}

// RevokeJob always returns 0 and ErrFeedsManagerDisabled.
func (ns NullService) RevokeJob(ctx context.Context, args *RevokeJobArgs) (int64, error) {
return 0, ErrFeedsManagerDisabled
}

// RegisterManager always returns 0 and ErrFeedsManagerDisabled.
func (ns NullService) RegisterManager(ctx context.Context, params RegisterManagerParams) (int64, error) {
return 0, ErrFeedsManagerDisabled
}

// RejectSpec always returns ErrFeedsManagerDisabled.
func (ns NullService) RejectSpec(ctx context.Context, id int64) error {
return ErrFeedsManagerDisabled
}
// SyncNodeInfo is a no-op that returns a nil error.
func (ns NullService) SyncNodeInfo(ctx context.Context, id int64) error { return nil }
// UpdateManager always returns ErrFeedsManagerDisabled.
func (ns NullService) UpdateManager(ctx context.Context, mgr FeedsManager) error {
return ErrFeedsManagerDisabled
}

// EnableManager always returns a nil result and ErrFeedsManagerDisabled.
func (ns NullService) EnableManager(ctx context.Context, id int64) (*FeedsManager, error) {
return nil, ErrFeedsManagerDisabled
}

// DisableManager always returns a nil result and ErrFeedsManagerDisabled.
func (ns NullService) DisableManager(ctx context.Context, id int64) (*FeedsManager, error) {
return nil, ErrFeedsManagerDisabled
}

// IsJobManaged always reports false with a nil error.
func (ns NullService) IsJobManaged(ctx context.Context, jobID int64) (bool, error) {
return false, nil
}

// UpdateSpecDefinition always returns ErrFeedsManagerDisabled.
func (ns NullService) UpdateSpecDefinition(ctx context.Context, id int64, spec string) error {
return ErrFeedsManagerDisabled
}
Expand Down
Loading

0 comments on commit 70faf30

Please sign in to comment.