From 5164f08607bc9f49f41c2f0652d5b94f36169af4 Mon Sep 17 00:00:00 2001 From: vgonkivs Date: Thu, 6 Feb 2025 20:35:48 +0200 Subject: [PATCH] add retry logic to grpc client --- core/fetcher.go | 14 +------------- core/listener.go | 5 +---- nodebuilder/core/constructors.go | 24 ++++++++++++++++++++++-- 3 files changed, 24 insertions(+), 19 deletions(-) diff --git a/core/fetcher.go b/core/fetcher.go index 96d1f0fbc8..7006507635 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -172,18 +172,11 @@ func (f *BlockFetcher) ValidatorSet(ctx context.Context, height int64) (*types.V } func (f *BlockFetcher) runSubscriber() (chan types.EventDataSignedBlock, error) { - var ( - subscription coregrpc.BlockAPI_SubscribeNewHeightsClient - err error - ) - signedBlockCh := make(chan types.EventDataSignedBlock, 1) go func() { defer close(f.doneCh) defer close(signedBlockCh) - timeout := time.NewTimer(retrySubscriptionDelay) - defer timeout.Stop() for { select { case <-f.ctx.Done(): @@ -191,15 +184,13 @@ func (f *BlockFetcher) runSubscriber() (chan types.EventDataSignedBlock, error) default: } - subscription, err = f.client.SubscribeNewHeights(f.ctx, &coregrpc.SubscribeNewHeightsRequest{}) + subscription, err := f.client.SubscribeNewHeights(f.ctx, &coregrpc.SubscribeNewHeightsRequest{}) switch { case err == nil: case errors.Is(err, context.Canceled): return default: log.Errorw("fetcher: failed to subscribe to new block events", "err", err) - timeout.Reset(retrySubscriptionDelay) - <-timeout.C continue } @@ -229,9 +220,6 @@ func (f *BlockFetcher) receive( ctxCancel() if err != nil { log.Errorw("fetcher: error receiving signed block", "height", resp.Height, "err", err.Error()) - // sleeping a bit to avoid retrying instantly and give time for the gRPC connection - // to recover automatically. - time.Sleep(time.Second) continue } select { diff --git a/core/listener.go b/core/listener.go index 959166f4b7..53594185db 100644 --- a/core/listener.go +++ b/core/listener.go @@ -18,10 +18,7 @@ import ( "github.com/celestiaorg/celestia-node/store" ) -var ( - tracer = otel.Tracer("core/listener") - retrySubscriptionDelay = 5 * time.Second -) +var tracer = otel.Tracer("core/listener") // Listener is responsible for listening to Core for // new block events and converting new Core blocks into diff --git a/nodebuilder/core/constructors.go b/nodebuilder/core/constructors.go index e4e7593e3f..bf43fb3f27 100644 --- a/nodebuilder/core/constructors.go +++ b/nodebuilder/core/constructors.go @@ -8,9 +8,12 @@ import ( "net" "os" "path/filepath" + "time" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" "go.uber.org/fx" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -30,13 +33,30 @@ func grpcClient(lc fx.Lifecycle, cfg Config) (*grpc.ClientConn, error) { } else { opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) } + + retryInterceptor := grpc_retry.UnaryClientInterceptor( + grpc_retry.WithMax(5), + grpc_retry.WithCodes(codes.Unavailable), + grpc_retry.WithBackoff( + grpc_retry.BackoffExponentialWithJitter(time.Second, 2.0)), + ) + retryStreamInterceptor := grpc_retry.StreamClientInterceptor( + grpc_retry.WithMax(5), + grpc_retry.WithCodes(codes.Unavailable), + grpc_retry.WithBackoff( + grpc_retry.BackoffExponentialWithJitter(time.Second, 2.0)), + ) + + opts = append(opts, grpc.WithUnaryInterceptor(retryInterceptor)) + opts = append(opts, grpc.WithStreamInterceptor(retryStreamInterceptor)) + if cfg.XTokenPath != "" { xToken, err := parseTokenPath(cfg.XTokenPath) if err != nil { return nil, err } - opts = append(opts, grpc.WithUnaryInterceptor(authInterceptor(xToken))) - opts = append(opts, grpc.WithStreamInterceptor(authStreamInterceptor(xToken))) + opts = append(opts, grpc.WithChainUnaryInterceptor(authInterceptor(xToken), retryInterceptor)) + opts = append(opts, grpc.WithChainStreamInterceptor(authStreamInterceptor(xToken), retryStreamInterceptor)) } endpoint := net.JoinHostPort(cfg.IP, cfg.Port)