From 823e965cda7f7c21e0948285e6ed698dfa40d528 Mon Sep 17 00:00:00 2001 From: Peter Argue <89119817+peterargue@users.noreply.github.com> Date: Wed, 29 Jan 2025 13:55:18 -0800 Subject: [PATCH] improve error reconnection handling --- .../ingestion/block_tracking_subscriber.go | 62 ++++++++++++------- 1 file changed, 40 insertions(+), 22 deletions(-) diff --git a/services/ingestion/block_tracking_subscriber.go b/services/ingestion/block_tracking_subscriber.go index 0a635905..0e269486 100644 --- a/services/ingestion/block_tracking_subscriber.go +++ b/services/ingestion/block_tracking_subscriber.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" "github.com/onflow/flow-evm-gateway/models" errs "github.com/onflow/flow-evm-gateway/models/errors" @@ -114,12 +115,21 @@ func (r *RPCBlockTrackingSubscriber) Subscribe(ctx context.Context) <-chan model func (r *RPCBlockTrackingSubscriber) subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents { eventsChan := make(chan models.BlockEvents) - blockHeadersChan, errChan, err := r.client.SubscribeBlockHeadersFromStartHeight( - ctx, - height, - flow.BlockStatusFinalized, - ) - if err != nil { + var blockHeadersChan <-chan flow.BlockHeader + var errChan <-chan error + + lastReceivedHeight := height + connect := func(height uint64) error { + var err error + blockHeadersChan, errChan, err = r.client.SubscribeBlockHeadersFromStartHeight( + ctx, + height, + flow.BlockStatusFinalized, + ) + return err + } + + if err := connect(lastReceivedHeight); err != nil { eventsChan <- models.NewBlockEventsError( fmt.Errorf( "failed to subscribe for finalized block headers on height: %d, with: %w", @@ -130,7 +140,6 @@ func (r *RPCBlockTrackingSubscriber) subscribe(ctx context.Context, height uint6 close(eventsChan) return eventsChan } - lastReceivedHeight := height go func() { defer func() { @@ -145,6 +154,7 @@ func (r *RPCBlockTrackingSubscriber) subscribe(ctx context.Context, height uint6 case blockHeader, ok := <-blockHeadersChan: if !ok { + // typically we receive an error in the errChan before the channels are closes var err error err = errs.ErrDisconnected if ctx.Err() != nil { @@ -180,6 +190,7 @@ func (r *RPCBlockTrackingSubscriber) subscribe(ctx context.Context, height uint6 case err, ok := <-errChan: if !ok { + // typically we receive an error in the errChan before the channels are closes var err error err = errs.ErrDisconnected if ctx.Err() != nil { @@ -189,26 +200,33 @@ func (r *RPCBlockTrackingSubscriber) subscribe(ctx context.Context, height uint6 return } - if status.Code(err) == codes.DeadlineExceeded || status.Code(err) == codes.Internal { - blockHeadersChan, errChan, err = r.client.SubscribeBlockHeadersFromStartHeight( - ctx, - lastReceivedHeight+1, - flow.BlockStatusFinalized, - ) - if err != nil { - eventsChan <- models.NewBlockEventsError( - fmt.Errorf( - "failed to subscribe for finalized block headers on height: %d, with: %w", - height, - err, - ), - ) + switch status.Code(err) { + case codes.NotFound: + // we can get not found when reconnecting after a disconnect/restart before the + // next block is finalized. just wait briefly and try again + select { + case <-ctx.Done(): return + case <-time.After(200 * time.Millisecond): } - } else { + case codes.DeadlineExceeded, codes.Internal: + // these are sometimes returned when the stream is disconnected by a middleware or the server + default: + // skip reconnect on all other errors eventsChan <- models.NewBlockEventsError(fmt.Errorf("%w: %w", errs.ErrDisconnected, err)) return } + + if err := connect(lastReceivedHeight + 1); err != nil { + eventsChan <- models.NewBlockEventsError( + fmt.Errorf( + "failed to resubscribe for finalized block headers on height: %d, with: %w", + lastReceivedHeight+1, + err, + ), + ) + return + } } } }()