Skip to content

Commit

Permalink
fix(core/fetcher): resubscribe if consensus node goes offline
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs committed Feb 7, 2025
1 parent 87bc6c8 commit b616d20
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion core/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
coregrpc "github.com/tendermint/tendermint/rpc/grpc"
"github.com/tendermint/tendermint/types"
"google.golang.org/grpc"
"google.golang.org/grpc/status"

libhead "github.com/celestiaorg/go-header"
)
Expand Down Expand Up @@ -168,14 +169,17 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types
}
ctx, cancel := context.WithCancel(ctx)
f.cancel = cancel
f.doneCh = make(chan struct{})
f.isListeningForBlocks.Store(true)

subscription, err := f.client.SubscribeNewHeights(ctx, &coregrpc.SubscribeNewHeightsRequest{})
if err != nil {
f.isListeningForBlocks.Store(false)
return nil, err
}

log.Debug("created a subscription. Start listening for new blocks...")

f.doneCh = make(chan struct{})
signedBlockCh := make(chan types.EventDataSignedBlock)
go func() {
defer close(f.doneCh)
Expand All @@ -189,6 +193,12 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types
resp, err := subscription.Recv()
if err != nil {
log.Errorw("fetcher: error receiving new height", "err", err.Error())
_, ok := status.FromError(err) // parsing the gRPC error
if ok {
// ok means that err contains a gRPC status error.
// move on another round of resubscribing.
return
}
continue
}
withTimeout, ctxCancel := context.WithTimeout(ctx, 10*time.Second)
Expand Down

0 comments on commit b616d20

Please sign in to comment.