From 05d22f4d701ac34808d65d14d26fe3d25e872db3 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Fri, 31 Jan 2025 13:17:16 +0000 Subject: [PATCH 1/3] polygon/sync: retry ufc on busy response --- polygon/sync/execution_client.go | 102 ++++++++++++++++++------------- polygon/sync/service.go | 2 +- 2 files changed, 62 insertions(+), 42 deletions(-) diff --git a/polygon/sync/execution_client.go b/polygon/sync/execution_client.go index 5dd27281acd..9154a8b355d 100644 --- a/polygon/sync/execution_client.go +++ b/polygon/sync/execution_client.go @@ -23,17 +23,20 @@ import ( "math/big" "time" + "github.com/cenkalti/backoff/v4" "google.golang.org/protobuf/types/known/emptypb" "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/gointerfaces" "github.com/erigontech/erigon-lib/gointerfaces/executionproto" + "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon/core/types" eth1utils "github.com/erigontech/erigon/turbo/execution/eth1/eth1_utils" ) var ErrForkChoiceUpdateFailure = errors.New("fork choice update failure") var ErrForkChoiceUpdateBadBlock = errors.New("fork choice update bad block") +var ErrExecutionClientBusy = errors.New("execution client busy") type ExecutionClient interface { Prepare(ctx context.Context) error @@ -44,11 +47,15 @@ type ExecutionClient interface { GetTd(ctx context.Context, blockNum uint64, blockHash common.Hash) (*big.Int, error) } -func newExecutionClient(client executionproto.ExecutionClient) *executionClient { - return &executionClient{client} +func newExecutionClient(logger log.Logger, client executionproto.ExecutionClient) *executionClient { + return &executionClient{ + logger: logger, + client: client, + } } type executionClient struct { + logger log.Logger client executionproto.ExecutionClient } @@ -71,7 +78,7 @@ func (e *executionClient) InsertBlocks(ctx context.Context, blocks []*types.Bloc Blocks: eth1utils.ConvertBlocksToRPC(blocks), } - for { + return e.retryBusy("insertBlocks", func() error { response, err := e.client.InsertBlocks(ctx, request) if err != nil { return err @@ -82,20 +89,11 @@ func (e *executionClient) InsertBlocks(ctx context.Context, blocks []*types.Bloc case executionproto.ExecutionStatus_Success: return nil case executionproto.ExecutionStatus_Busy: - // retry after sleep - func() { - delayTimer := time.NewTimer(100 * time.Millisecond) - defer delayTimer.Stop() - - select { - case <-delayTimer.C: - case <-ctx.Done(): - } - }() + return ErrExecutionClientBusy // gets retried default: - return fmt.Errorf("executionClient.InsertBlocks failed with response status: %s", status.String()) + return backoff.Permanent(fmt.Errorf("executionClient.InsertBlocks failure status: %s", status.String())) } - } + }) } func (e *executionClient) UpdateForkChoice(ctx context.Context, tip *types.Header, finalizedHeader *types.Header) (common.Hash, error) { @@ -108,34 +106,31 @@ func (e *executionClient) UpdateForkChoice(ctx context.Context, tip *types.Heade Timeout: 0, } - response, err := e.client.UpdateForkChoice(ctx, &request) - if err != nil { - return common.Hash{}, err - } - var latestValidHash common.Hash - if response.LatestValidHash != nil { - latestValidHash = gointerfaces.ConvertH256ToHash(response.LatestValidHash) - } + err := e.retryBusy("updateForkChoice", func() error { + r, err := e.client.UpdateForkChoice(ctx, &request) + if err != nil { + return err + } - switch response.Status { - case executionproto.ExecutionStatus_Success: - return latestValidHash, nil - case executionproto.ExecutionStatus_BadBlock: - return latestValidHash, fmt.Errorf( - "%w: status=%d, validationErr='%s'", - ErrForkChoiceUpdateBadBlock, - response.Status, - response.ValidationError, - ) - default: - return latestValidHash, fmt.Errorf( - "%w: status=%d, validationErr='%s'", - ErrForkChoiceUpdateFailure, - response.Status, - response.ValidationError, - ) - } + if r.LatestValidHash != nil { + latestValidHash = gointerfaces.ConvertH256ToHash(r.LatestValidHash) + } + + switch r.Status { + case executionproto.ExecutionStatus_Success: + return nil + case executionproto.ExecutionStatus_BadBlock: + return fmt.Errorf("%w: status=%d, validationErr='%s'", ErrForkChoiceUpdateBadBlock, r.Status, r.ValidationError) + case executionproto.ExecutionStatus_Busy: + return ErrExecutionClientBusy // gets retried + default: + return fmt.Errorf( + "%w: status=%d, validationErr='%s'", ErrForkChoiceUpdateFailure, r.Status, r.ValidationError) + } + }) + + return latestValidHash, err } func (e *executionClient) CurrentHeader(ctx context.Context) (*types.Header, error) { @@ -179,3 +174,28 @@ func (e *executionClient) GetTd(ctx context.Context, blockNum uint64, blockHash return eth1utils.ConvertBigIntFromRpc(response.GetTd()), nil } + +func (e *executionClient) retryBusy(label string, f func() error) error { + backOff := 50 * time.Millisecond + logEvery := 5 * time.Second + logEveryXAttempt := int64(logEvery / backOff) + attempt := int64(1) + operation := func() error { + err := f() + if err == nil { + return nil + } + + if errors.Is(err, ErrExecutionClientBusy) { + if attempt%logEveryXAttempt == 1 { + e.logger.Debug("execution client busy - retrying", "in", backOff, "label", label, "attempt", attempt) + } + attempt++ + return err + } + + return backoff.Permanent(err) + } + + return backoff.Retry(operation, backoff.NewConstantBackOff(backOff)) +} diff --git a/polygon/sync/service.go b/polygon/sync/service.go index 06406b7c111..d71a60e1c7e 100644 --- a/polygon/sync/service.go +++ b/polygon/sync/service.go @@ -54,7 +54,7 @@ func NewService( milestoneVerifier := VerifyMilestoneHeaders blocksVerifier := VerifyBlocks p2pService := p2p.NewService(logger, maxPeers, sentryClient, statusDataProvider.GetStatusData) - execution := newExecutionClient(executionClient) + execution := newExecutionClient(logger, executionClient) signaturesCache, err := lru.NewARC[common.Hash, common.Address](InMemorySignatures) if err != nil { From de34533b9ff68efefa2f342bc3865bc62fcf3f18 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Fri, 31 Jan 2025 14:03:06 +0000 Subject: [PATCH 2/3] tidy --- polygon/sync/execution_client.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/polygon/sync/execution_client.go b/polygon/sync/execution_client.go index 9154a8b355d..0cec6ac5075 100644 --- a/polygon/sync/execution_client.go +++ b/polygon/sync/execution_client.go @@ -91,7 +91,7 @@ func (e *executionClient) InsertBlocks(ctx context.Context, blocks []*types.Bloc case executionproto.ExecutionStatus_Busy: return ErrExecutionClientBusy // gets retried default: - return backoff.Permanent(fmt.Errorf("executionClient.InsertBlocks failure status: %s", status.String())) + return fmt.Errorf("executionClient.InsertBlocks failure status: %s", status.String()) } }) } @@ -125,8 +125,7 @@ func (e *executionClient) UpdateForkChoice(ctx context.Context, tip *types.Heade case executionproto.ExecutionStatus_Busy: return ErrExecutionClientBusy // gets retried default: - return fmt.Errorf( - "%w: status=%d, validationErr='%s'", ErrForkChoiceUpdateFailure, r.Status, r.ValidationError) + return fmt.Errorf("%w: status=%d, validationErr='%s'", ErrForkChoiceUpdateFailure, r.Status, r.ValidationError) } }) From 5d94d31f5a5293c20f23af032538dd83f29f4394 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Fri, 31 Jan 2025 14:06:17 +0000 Subject: [PATCH 3/3] use backoff with ctx --- polygon/sync/execution_client.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/polygon/sync/execution_client.go b/polygon/sync/execution_client.go index 0cec6ac5075..705f88f1d2b 100644 --- a/polygon/sync/execution_client.go +++ b/polygon/sync/execution_client.go @@ -78,7 +78,7 @@ func (e *executionClient) InsertBlocks(ctx context.Context, blocks []*types.Bloc Blocks: eth1utils.ConvertBlocksToRPC(blocks), } - return e.retryBusy("insertBlocks", func() error { + return e.retryBusy(ctx, "insertBlocks", func() error { response, err := e.client.InsertBlocks(ctx, request) if err != nil { return err @@ -107,7 +107,7 @@ func (e *executionClient) UpdateForkChoice(ctx context.Context, tip *types.Heade } var latestValidHash common.Hash - err := e.retryBusy("updateForkChoice", func() error { + err := e.retryBusy(ctx, "updateForkChoice", func() error { r, err := e.client.UpdateForkChoice(ctx, &request) if err != nil { return err @@ -174,7 +174,7 @@ func (e *executionClient) GetTd(ctx context.Context, blockNum uint64, blockHash return eth1utils.ConvertBigIntFromRpc(response.GetTd()), nil } -func (e *executionClient) retryBusy(label string, f func() error) error { +func (e *executionClient) retryBusy(ctx context.Context, label string, f func() error) error { backOff := 50 * time.Millisecond logEvery := 5 * time.Second logEveryXAttempt := int64(logEvery / backOff) @@ -196,5 +196,5 @@ func (e *executionClient) retryBusy(label string, f func() error) error { return backoff.Permanent(err) } - return backoff.Retry(operation, backoff.NewConstantBackOff(backOff)) + return backoff.Retry(operation, backoff.WithContext(backoff.NewConstantBackOff(backOff), ctx)) }