diff --git a/polygon/sync/execution_client.go b/polygon/sync/execution_client.go index 5dd27281acd..705f88f1d2b 100644 --- a/polygon/sync/execution_client.go +++ b/polygon/sync/execution_client.go @@ -23,17 +23,20 @@ import ( "math/big" "time" + "github.com/cenkalti/backoff/v4" "google.golang.org/protobuf/types/known/emptypb" "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/gointerfaces" "github.com/erigontech/erigon-lib/gointerfaces/executionproto" + "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon/core/types" eth1utils "github.com/erigontech/erigon/turbo/execution/eth1/eth1_utils" ) var ErrForkChoiceUpdateFailure = errors.New("fork choice update failure") var ErrForkChoiceUpdateBadBlock = errors.New("fork choice update bad block") +var ErrExecutionClientBusy = errors.New("execution client busy") type ExecutionClient interface { Prepare(ctx context.Context) error @@ -44,11 +47,15 @@ type ExecutionClient interface { GetTd(ctx context.Context, blockNum uint64, blockHash common.Hash) (*big.Int, error) } -func newExecutionClient(client executionproto.ExecutionClient) *executionClient { - return &executionClient{client} +func newExecutionClient(logger log.Logger, client executionproto.ExecutionClient) *executionClient { + return &executionClient{ + logger: logger, + client: client, + } } type executionClient struct { + logger log.Logger client executionproto.ExecutionClient } @@ -71,7 +78,7 @@ func (e *executionClient) InsertBlocks(ctx context.Context, blocks []*types.Bloc Blocks: eth1utils.ConvertBlocksToRPC(blocks), } - for { + return e.retryBusy(ctx, "insertBlocks", func() error { response, err := e.client.InsertBlocks(ctx, request) if err != nil { return err @@ -82,20 +89,11 @@ func (e *executionClient) InsertBlocks(ctx context.Context, blocks []*types.Bloc case executionproto.ExecutionStatus_Success: return nil case executionproto.ExecutionStatus_Busy: - // retry after sleep - func() { - delayTimer := time.NewTimer(100 * time.Millisecond) - defer delayTimer.Stop() - - select { - case <-delayTimer.C: - case <-ctx.Done(): - } - }() + return ErrExecutionClientBusy // gets retried default: - return fmt.Errorf("executionClient.InsertBlocks failed with response status: %s", status.String()) + return fmt.Errorf("executionClient.InsertBlocks failure status: %s", status.String()) } - } + }) } func (e *executionClient) UpdateForkChoice(ctx context.Context, tip *types.Header, finalizedHeader *types.Header) (common.Hash, error) { @@ -108,34 +106,30 @@ func (e *executionClient) UpdateForkChoice(ctx context.Context, tip *types.Heade Timeout: 0, } - response, err := e.client.UpdateForkChoice(ctx, &request) - if err != nil { - return common.Hash{}, err - } - var latestValidHash common.Hash - if response.LatestValidHash != nil { - latestValidHash = gointerfaces.ConvertH256ToHash(response.LatestValidHash) - } + err := e.retryBusy(ctx, "updateForkChoice", func() error { + r, err := e.client.UpdateForkChoice(ctx, &request) + if err != nil { + return err + } - switch response.Status { - case executionproto.ExecutionStatus_Success: - return latestValidHash, nil - case executionproto.ExecutionStatus_BadBlock: - return latestValidHash, fmt.Errorf( - "%w: status=%d, validationErr='%s'", - ErrForkChoiceUpdateBadBlock, - response.Status, - response.ValidationError, - ) - default: - return latestValidHash, fmt.Errorf( - "%w: status=%d, validationErr='%s'", - ErrForkChoiceUpdateFailure, - response.Status, - response.ValidationError, - ) - } + if r.LatestValidHash != nil { + latestValidHash = gointerfaces.ConvertH256ToHash(r.LatestValidHash) + } + + switch r.Status { + case executionproto.ExecutionStatus_Success: + return nil + case executionproto.ExecutionStatus_BadBlock: + return fmt.Errorf("%w: status=%d, validationErr='%s'", ErrForkChoiceUpdateBadBlock, r.Status, r.ValidationError) + case executionproto.ExecutionStatus_Busy: + return ErrExecutionClientBusy // gets retried + default: + return fmt.Errorf("%w: status=%d, validationErr='%s'", ErrForkChoiceUpdateFailure, r.Status, r.ValidationError) + } + }) + + return latestValidHash, err } func (e *executionClient) CurrentHeader(ctx context.Context) (*types.Header, error) { @@ -179,3 +173,28 @@ func (e *executionClient) GetTd(ctx context.Context, blockNum uint64, blockHash return eth1utils.ConvertBigIntFromRpc(response.GetTd()), nil } + +func (e *executionClient) retryBusy(ctx context.Context, label string, f func() error) error { + backOff := 50 * time.Millisecond + logEvery := 5 * time.Second + logEveryXAttempt := int64(logEvery / backOff) + attempt := int64(1) + operation := func() error { + err := f() + if err == nil { + return nil + } + + if errors.Is(err, ErrExecutionClientBusy) { + if attempt%logEveryXAttempt == 1 { + e.logger.Debug("execution client busy - retrying", "in", backOff, "label", label, "attempt", attempt) + } + attempt++ + return err + } + + return backoff.Permanent(err) + } + + return backoff.Retry(operation, backoff.WithContext(backoff.NewConstantBackOff(backOff), ctx)) +} diff --git a/polygon/sync/service.go b/polygon/sync/service.go index 06406b7c111..d71a60e1c7e 100644 --- a/polygon/sync/service.go +++ b/polygon/sync/service.go @@ -54,7 +54,7 @@ func NewService( milestoneVerifier := VerifyMilestoneHeaders blocksVerifier := VerifyBlocks p2pService := p2p.NewService(logger, maxPeers, sentryClient, statusDataProvider.GetStatusData) - execution := newExecutionClient(executionClient) + execution := newExecutionClient(logger, executionClient) signaturesCache, err := lru.NewARC[common.Hash, common.Address](InMemorySignatures) if err != nil {