From 18ab2491c17bf6b50400345d6088c0666254496e Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Mon, 3 Feb 2025 17:41:19 +0100 Subject: [PATCH] chore(statesync): chunk request send timeout + retry --- internal/statesync/chunks.go | 4 +--- internal/statesync/syncer.go | 10 +++++++++- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/internal/statesync/chunks.go b/internal/statesync/chunks.go index 2083b64c8..938c3935a 100644 --- a/internal/statesync/chunks.go +++ b/internal/statesync/chunks.go @@ -104,10 +104,8 @@ func (q *chunkQueue) Enqueue(chunkIDs ...[]byte) { func (q *chunkQueue) enqueue(chunkID bytes.HexBytes) { q.requestQueue = append(q.requestQueue, chunkID) - chunk, ok := q.items[chunkID.String()] + _, ok := q.items[chunkID.String()] if ok { - // If the chunk is already in the queue, reset its status to initStatus to retry fetching it. - chunk.status = initStatus return } q.items[chunkID.String()] = &chunkItem{ diff --git a/internal/statesync/syncer.go b/internal/statesync/syncer.go index beeabbb3e..c0a161bfd 100644 --- a/internal/statesync/syncer.go +++ b/internal/statesync/syncer.go @@ -24,12 +24,15 @@ import ( ) const ( + // chunkTimeout is the timeout while waiting for the next chunk from the chunk queue. chunkTimeout = 2 * time.Minute // minimumDiscoveryTime is the lowest allowable time for a // SyncAny discovery time. minimumDiscoveryTime = 5 * time.Second + // chunkRequestSendTimeout is the timeout sending chunk requests to peers. + chunkRequestSendTimeout = 5 * time.Second dequeueChunkIDTimeoutDefault = 2 * time.Second ) @@ -552,6 +555,7 @@ func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, queue *chu } ID, err := queue.Dequeue() if errors.Is(err, errQueueEmpty) { + s.logger.Debug("fetchChunks queue empty, waiting for chunk", "timeout", dequeueChunkIDTimeout, "err", err) continue } s.logger.Info("Fetching snapshot chunk", @@ -561,6 +565,8 @@ func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, queue *chu ticker.Reset(s.retryTimeout) if err := s.requestChunk(ctx, snapshot, ID); err != nil { s.logger.Error("failed to request snapshot chunk", "err", err, "chunkID", ID) + // retry the chunk + s.chunkQueue.Enqueue(ID) return } select { @@ -606,8 +612,10 @@ func (s *syncer) requestChunk(ctx context.Context, snapshot *snapshot, chunkID t ChunkId: chunkID, }, } + sCtx, cancel := context.WithTimeout(ctx, chunkRequestSendTimeout) + defer cancel() - return s.chunkCh.Send(ctx, msg) + return s.chunkCh.Send(sCtx, msg) } // / finalizeSnapshot sends light block to ABCI app after state sync is done