Skip to content

Commit

Permalink
improve error reconnection handling
Browse files Browse the repository at this point in the history
  • Loading branch information
peterargue committed Jan 29, 2025
1 parent d68f2ba commit 823e965
Showing 1 changed file with 40 additions and 22 deletions.
62 changes: 40 additions & 22 deletions services/ingestion/block_tracking_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"time"

"github.com/onflow/flow-evm-gateway/models"
errs "github.com/onflow/flow-evm-gateway/models/errors"
Expand Down Expand Up @@ -114,12 +115,21 @@ func (r *RPCBlockTrackingSubscriber) Subscribe(ctx context.Context) <-chan model
func (r *RPCBlockTrackingSubscriber) subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents {
eventsChan := make(chan models.BlockEvents)

blockHeadersChan, errChan, err := r.client.SubscribeBlockHeadersFromStartHeight(
ctx,
height,
flow.BlockStatusFinalized,
)
if err != nil {
var blockHeadersChan <-chan flow.BlockHeader
var errChan <-chan error

lastReceivedHeight := height
connect := func(height uint64) error {
var err error
blockHeadersChan, errChan, err = r.client.SubscribeBlockHeadersFromStartHeight(
ctx,
height,
flow.BlockStatusFinalized,
)
return err
}

if err := connect(lastReceivedHeight); err != nil {
eventsChan <- models.NewBlockEventsError(
fmt.Errorf(
"failed to subscribe for finalized block headers on height: %d, with: %w",
Expand All @@ -130,7 +140,6 @@ func (r *RPCBlockTrackingSubscriber) subscribe(ctx context.Context, height uint6
close(eventsChan)
return eventsChan
}
lastReceivedHeight := height

go func() {
defer func() {
Expand All @@ -145,6 +154,7 @@ func (r *RPCBlockTrackingSubscriber) subscribe(ctx context.Context, height uint6

case blockHeader, ok := <-blockHeadersChan:
if !ok {
// typically we receive an error in the errChan before the channels are closes
var err error
err = errs.ErrDisconnected
if ctx.Err() != nil {
Expand Down Expand Up @@ -180,6 +190,7 @@ func (r *RPCBlockTrackingSubscriber) subscribe(ctx context.Context, height uint6

case err, ok := <-errChan:
if !ok {
// typically we receive an error in the errChan before the channels are closes
var err error
err = errs.ErrDisconnected
if ctx.Err() != nil {
Expand All @@ -189,26 +200,33 @@ func (r *RPCBlockTrackingSubscriber) subscribe(ctx context.Context, height uint6
return
}

if status.Code(err) == codes.DeadlineExceeded || status.Code(err) == codes.Internal {
blockHeadersChan, errChan, err = r.client.SubscribeBlockHeadersFromStartHeight(
ctx,
lastReceivedHeight+1,
flow.BlockStatusFinalized,
)
if err != nil {
eventsChan <- models.NewBlockEventsError(
fmt.Errorf(
"failed to subscribe for finalized block headers on height: %d, with: %w",
height,
err,
),
)
switch status.Code(err) {
case codes.NotFound:
// we can get not found when reconnecting after a disconnect/restart before the
// next block is finalized. just wait briefly and try again
select {
case <-ctx.Done():
return
case <-time.After(200 * time.Millisecond):
}
} else {
case codes.DeadlineExceeded, codes.Internal:
// these are sometimes returned when the stream is disconnected by a middleware or the server
default:
// skip reconnect on all other errors
eventsChan <- models.NewBlockEventsError(fmt.Errorf("%w: %w", errs.ErrDisconnected, err))
return
}

if err := connect(lastReceivedHeight + 1); err != nil {
eventsChan <- models.NewBlockEventsError(
fmt.Errorf(
"failed to resubscribe for finalized block headers on height: %d, with: %w",
lastReceivedHeight+1,
err,
),
)
return
}
}
}
}()
Expand Down

0 comments on commit 823e965

Please sign in to comment.