Skip to content

Commit

Permalink
Fix block sync auto restart not working as expected
Browse files Browse the repository at this point in the history
  • Loading branch information
yzang2019 committed Dec 27, 2023
1 parent 016c1b9 commit ac74ccf
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 15 deletions.
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1383,9 +1383,9 @@ func DefaultSelfRemediationConfig() *SelfRemediationConfig {
P2pNoPeersRestarWindowSeconds: 0,
StatesyncNoPeersRestartWindowSeconds: 0,
BlocksBehindThreshold: 0,
BlocksBehindCheckIntervalSeconds: 30,
BlocksBehindCheckIntervalSeconds: 60,
// 10 minutes
RestartCooldownSeconds: 1800,
RestartCooldownSeconds: 600,
}
}

Expand Down
4 changes: 2 additions & 2 deletions internal/blocksync/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ func (pool *BlockPool) IsCaughtUp() bool {
pool.mtx.RLock()
defer pool.mtx.RUnlock()

// Need at least 1 peer to be considered caught up.
if len(pool.peers) == 0 {
// Need at least 2 peers to be considered caught up.
if len(pool.peers) <= 1 {

Check warning on line 214 in internal/blocksync/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/blocksync/pool.go#L213-L214

Added lines #L213 - L214 were not covered by tests
return false
}

Expand Down
37 changes: 26 additions & 11 deletions internal/blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,12 @@ type Reactor struct {
// store
stateStore sm.Store

blockExec *sm.BlockExecutor
store sm.BlockStore
pool *BlockPool
consReactor consensusReactor
blockSync *atomicBool
blockExec *sm.BlockExecutor
store sm.BlockStore
pool *BlockPool
consReactor consensusReactor
blockSync *atomicBool
previousMaxPeerHeight int64

peerEvents p2p.PeerEventSubscriber
peerManager *p2p.PeerManager
Expand All @@ -94,8 +95,10 @@ type Reactor struct {
syncStartTime time.Time

restartCh chan struct{}
lastRestartTime time.Time
blocksBehindThreshold uint64
blocksBehindCheckInterval time.Duration
restartCooldownSeconds uint64
}

// NewReactor returns new reactor instance.
Expand Down Expand Up @@ -125,8 +128,10 @@ func NewReactor(
metrics: metrics,
eventBus: eventBus,
restartCh: restartCh,
lastRestartTime: time.Now(),
blocksBehindThreshold: selfRemediationConfig.BlocksBehindThreshold,
blocksBehindCheckInterval: time.Duration(selfRemediationConfig.BlocksBehindCheckIntervalSeconds) * time.Second,
restartCooldownSeconds: selfRemediationConfig.RestartCooldownSeconds,
}

r.BaseService = *service.NewBaseService(logger, "BlockSync", r)
Expand All @@ -150,6 +155,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {
return err
}
r.initialState = state
r.lastRestartTime = time.Now()

if state.LastBlockHeight != r.store.Height() {
return fmt.Errorf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, r.store.Height())
Expand Down Expand Up @@ -329,8 +335,8 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh *p2p.Chann
// autoRestartIfBehind will check if the node is behind the max peer height by
// a certain threshold. If it is, the node will attempt to restart itself
func (r *Reactor) autoRestartIfBehind(ctx context.Context) {
if r.blocksBehindThreshold == 0 {
r.logger.Info("blocks behind threshold is 0, not checking if node is behind")
if r.blocksBehindThreshold == 0 || r.blocksBehindCheckInterval <= 0 {
r.logger.Info("Auto remediation is disabled")
return
}

Expand All @@ -342,15 +348,24 @@ func (r *Reactor) autoRestartIfBehind(ctx context.Context) {
maxPeerHeight := r.pool.MaxPeerHeight()
threshold := int64(r.blocksBehindThreshold)
behindHeight := maxPeerHeight - selfHeight
// No peer info yet so maxPeerHeight will be 0

blockSyncIsSet := r.blockSync.IsSet()
if maxPeerHeight > r.previousMaxPeerHeight {
r.previousMaxPeerHeight = maxPeerHeight
}

// We do not restart if we are not lagging behind, or we are already in block sync mode
if maxPeerHeight == 0 || behindHeight < threshold || blockSyncIsSet {
r.logger.Debug("does not exceed threshold or is already in block sync mode", "threshold", threshold, "behindHeight", behindHeight, "maxPeerHeight", maxPeerHeight, "selfHeight", selfHeight, "blockSyncIsSet", blockSyncIsSet)
continue
}

r.logger.Info("Blocks behind threshold restarting node", "threshold", threshold, "behindHeight", behindHeight, "maxPeerHeight", maxPeerHeight, "selfHeight", selfHeight)
// Check if we have met cooldown time
if time.Since(r.lastRestartTime).Seconds() < float64(r.restartCooldownSeconds) {
r.logger.Debug("we are lagging behind, going to trigger a restart after cooldown time passes")
continue

Check warning on line 365 in internal/blocksync/reactor.go

View check run for this annotation

Codecov / codecov/patch

internal/blocksync/reactor.go#L364-L365

Added lines #L364 - L365 were not covered by tests
}

r.logger.Info("Blocks behind threshold, restarting node", "threshold", threshold, "behindHeight", behindHeight, "maxPeerHeight", maxPeerHeight, "selfHeight", selfHeight)

// Send signal to restart the node
r.blockSync.Set()
Expand Down Expand Up @@ -544,7 +559,7 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh
)
continue

case r.pool.IsCaughtUp():
case r.pool.IsCaughtUp() && r.previousMaxPeerHeight < r.pool.MaxPeerHeight():

Check warning on line 562 in internal/blocksync/reactor.go

View check run for this annotation

Codecov / codecov/patch

internal/blocksync/reactor.go#L562

Added line #L562 was not covered by tests
r.logger.Info("switching to consensus reactor", "height", height)

case time.Since(lastAdvance) > syncTimeout:
Expand Down

0 comments on commit ac74ccf

Please sign in to comment.