Skip to content

Commit

Permalink
chore(statesync): chunk request send timeout + retry
Browse files Browse the repository at this point in the history
  • Loading branch information
lklimek committed Feb 3, 2025
1 parent 3d39ee0 commit 18ab249
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
4 changes: 1 addition & 3 deletions internal/statesync/chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,8 @@ func (q *chunkQueue) Enqueue(chunkIDs ...[]byte) {

func (q *chunkQueue) enqueue(chunkID bytes.HexBytes) {
q.requestQueue = append(q.requestQueue, chunkID)
chunk, ok := q.items[chunkID.String()]
_, ok := q.items[chunkID.String()]
if ok {
// If the chunk is already in the queue, reset its status to initStatus to retry fetching it.
chunk.status = initStatus
return
}
q.items[chunkID.String()] = &chunkItem{
Expand Down
10 changes: 9 additions & 1 deletion internal/statesync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ import (
)

const (
	// chunkTimeout bounds how long to wait for the next chunk to arrive
	// from the chunk queue.
	chunkTimeout = 2 * time.Minute

	// chunkRequestSendTimeout bounds how long sending a single chunk
	// request to a peer may take.
	chunkRequestSendTimeout = 5 * time.Second

	// minimumDiscoveryTime is the shortest discovery time that SyncAny
	// is allowed to use.
	minimumDiscoveryTime = 5 * time.Second

	// dequeueChunkIDTimeoutDefault is presumably the default wait applied
	// when dequeuing a chunk ID from the queue; its use is not visible
	// here - TODO confirm against the caller.
	dequeueChunkIDTimeoutDefault = 2 * time.Second
)
Expand Down Expand Up @@ -552,6 +555,7 @@ func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, queue *chu
}
ID, err := queue.Dequeue()
if errors.Is(err, errQueueEmpty) {
s.logger.Debug("fetchChunks queue empty, waiting for chunk", "timeout", dequeueChunkIDTimeout, "err", err)
continue
}
s.logger.Info("Fetching snapshot chunk",
Expand All @@ -561,6 +565,8 @@ func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, queue *chu
ticker.Reset(s.retryTimeout)
if err := s.requestChunk(ctx, snapshot, ID); err != nil {
s.logger.Error("failed to request snapshot chunk", "err", err, "chunkID", ID)
// retry the chunk
s.chunkQueue.Enqueue(ID)
return
}
select {
Expand Down Expand Up @@ -606,8 +612,10 @@ func (s *syncer) requestChunk(ctx context.Context, snapshot *snapshot, chunkID t
ChunkId: chunkID,
},
}
sCtx, cancel := context.WithTimeout(ctx, chunkRequestSendTimeout)
defer cancel()

return s.chunkCh.Send(ctx, msg)
return s.chunkCh.Send(sCtx, msg)
}

// finalizeSnapshot sends the light block to the ABCI app after state sync is done.
Expand Down

0 comments on commit 18ab249

Please sign in to comment.