Skip to content

Commit

Permalink
add retry logic to grpc client
Browse files: browse the repository at this point in the history
  • Loading branch information
vgonkivs committed Feb 6, 2025
1 parent 7eefd98 commit 5164f08
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 19 deletions.
14 changes: 1 addition & 13 deletions core/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,34 +172,25 @@ func (f *BlockFetcher) ValidatorSet(ctx context.Context, height int64) (*types.V
}

func (f *BlockFetcher) runSubscriber() (chan types.EventDataSignedBlock, error) {
var (
subscription coregrpc.BlockAPI_SubscribeNewHeightsClient
err error
)

signedBlockCh := make(chan types.EventDataSignedBlock, 1)

go func() {
defer close(f.doneCh)
defer close(signedBlockCh)
timeout := time.NewTimer(retrySubscriptionDelay)
defer timeout.Stop()
for {
select {
case <-f.ctx.Done():
return
default:
}

subscription, err = f.client.SubscribeNewHeights(f.ctx, &coregrpc.SubscribeNewHeightsRequest{})
subscription, err := f.client.SubscribeNewHeights(f.ctx, &coregrpc.SubscribeNewHeightsRequest{})
switch {
case err == nil:
case errors.Is(err, context.Canceled):
return
default:
log.Errorw("fetcher: failed to subscribe to new block events", "err", err)
timeout.Reset(retrySubscriptionDelay)
<-timeout.C
continue
}

Expand Down Expand Up @@ -229,9 +220,6 @@ func (f *BlockFetcher) receive(
ctxCancel()
if err != nil {
log.Errorw("fetcher: error receiving signed block", "height", resp.Height, "err", err.Error())
// sleeping a bit to avoid retrying instantly and give time for the gRPC connection
// to recover automatically.
time.Sleep(time.Second)
continue
}
select {
Expand Down
5 changes: 1 addition & 4 deletions core/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ import (
"github.com/celestiaorg/celestia-node/store"
)

var (
tracer = otel.Tracer("core/listener")
retrySubscriptionDelay = 5 * time.Second
)
var tracer = otel.Tracer("core/listener")

// Listener is responsible for listening to Core for
// new block events and converting new Core blocks into
Expand Down
24 changes: 22 additions & 2 deletions nodebuilder/core/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ import (
"net"
"os"
"path/filepath"
"time"

grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"go.uber.org/fx"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -30,13 +33,30 @@ func grpcClient(lc fx.Lifecycle, cfg Config) (*grpc.ClientConn, error) {
} else {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

retryInterceptor := grpc_retry.UnaryClientInterceptor(
grpc_retry.WithMax(5),
grpc_retry.WithCodes(codes.Unavailable),
grpc_retry.WithBackoff(
grpc_retry.BackoffExponentialWithJitter(time.Second, 2.0)),
)
retryStreamInterceptor := grpc_retry.StreamClientInterceptor(
grpc_retry.WithMax(5),
grpc_retry.WithCodes(codes.Unavailable),
grpc_retry.WithBackoff(
grpc_retry.BackoffExponentialWithJitter(time.Second, 2.0)),
)

opts = append(opts, grpc.WithUnaryInterceptor(retryInterceptor))
opts = append(opts, grpc.WithStreamInterceptor(retryStreamInterceptor))

if cfg.XTokenPath != "" {
xToken, err := parseTokenPath(cfg.XTokenPath)
if err != nil {
return nil, err
}
opts = append(opts, grpc.WithUnaryInterceptor(authInterceptor(xToken)))
opts = append(opts, grpc.WithStreamInterceptor(authStreamInterceptor(xToken)))
opts = append(opts, grpc.WithChainUnaryInterceptor(authInterceptor(xToken), retryInterceptor))
opts = append(opts, grpc.WithChainStreamInterceptor(authStreamInterceptor(xToken), retryStreamInterceptor))
}

endpoint := net.JoinHostPort(cfg.IP, cfg.Port)
Expand Down

0 comments on commit 5164f08

Please sign in to comment.