Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix block sync auto restart not working as expected #175

Merged
merged 2 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1383,9 +1383,9 @@ func DefaultSelfRemediationConfig() *SelfRemediationConfig {
P2pNoPeersRestarWindowSeconds: 0,
StatesyncNoPeersRestartWindowSeconds: 0,
BlocksBehindThreshold: 0,
BlocksBehindCheckIntervalSeconds: 30,
BlocksBehindCheckIntervalSeconds: 60,
// 30 minutes
RestartCooldownSeconds: 1800,
RestartCooldownSeconds: 600,
}
}

Expand Down
4 changes: 2 additions & 2 deletions internal/blocksync/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ func (pool *BlockPool) IsCaughtUp() bool {
pool.mtx.RLock()
defer pool.mtx.RUnlock()

// Need at least 1 peer to be considered caught up.
if len(pool.peers) == 0 {
// Need at least 2 peers to be considered caught up.
if len(pool.peers) <= 1 {
return false
}

Expand Down
37 changes: 26 additions & 11 deletions internal/blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,12 @@ type Reactor struct {
// store
stateStore sm.Store

blockExec *sm.BlockExecutor
store sm.BlockStore
pool *BlockPool
consReactor consensusReactor
blockSync *atomicBool
blockExec *sm.BlockExecutor
store sm.BlockStore
pool *BlockPool
consReactor consensusReactor
blockSync *atomicBool
previousMaxPeerHeight int64

peerEvents p2p.PeerEventSubscriber
peerManager *p2p.PeerManager
Expand All @@ -94,8 +95,10 @@ type Reactor struct {
syncStartTime time.Time

restartCh chan struct{}
lastRestartTime time.Time
blocksBehindThreshold uint64
blocksBehindCheckInterval time.Duration
restartCooldownSeconds uint64
}

// NewReactor returns new reactor instance.
Expand Down Expand Up @@ -125,8 +128,10 @@ func NewReactor(
metrics: metrics,
eventBus: eventBus,
restartCh: restartCh,
lastRestartTime: time.Now(),
blocksBehindThreshold: selfRemediationConfig.BlocksBehindThreshold,
blocksBehindCheckInterval: time.Duration(selfRemediationConfig.BlocksBehindCheckIntervalSeconds) * time.Second,
restartCooldownSeconds: selfRemediationConfig.RestartCooldownSeconds,
}

r.BaseService = *service.NewBaseService(logger, "BlockSync", r)
Expand All @@ -150,6 +155,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {
return err
}
r.initialState = state
r.lastRestartTime = time.Now()

if state.LastBlockHeight != r.store.Height() {
return fmt.Errorf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, r.store.Height())
Expand Down Expand Up @@ -329,8 +335,8 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh *p2p.Chann
// autoRestartIfBehind will check if the node is behind the max peer height by
// a certain threshold. If it is, the node will attempt to restart itself
func (r *Reactor) autoRestartIfBehind(ctx context.Context) {
if r.blocksBehindThreshold == 0 {
r.logger.Info("blocks behind threshold is 0, not checking if node is behind")
if r.blocksBehindThreshold == 0 || r.blocksBehindCheckInterval <= 0 {
r.logger.Info("Auto remediation is disabled")
return
}

Expand All @@ -342,15 +348,24 @@ func (r *Reactor) autoRestartIfBehind(ctx context.Context) {
maxPeerHeight := r.pool.MaxPeerHeight()
threshold := int64(r.blocksBehindThreshold)
behindHeight := maxPeerHeight - selfHeight
// No peer info yet so maxPeerHeight will be 0

blockSyncIsSet := r.blockSync.IsSet()
if maxPeerHeight > r.previousMaxPeerHeight {
r.previousMaxPeerHeight = maxPeerHeight
}

// We do not restart if we are not lagging behind, or we are already in block sync mode
if maxPeerHeight == 0 || behindHeight < threshold || blockSyncIsSet {
r.logger.Debug("does not exceed threshold or is already in block sync mode", "threshold", threshold, "behindHeight", behindHeight, "maxPeerHeight", maxPeerHeight, "selfHeight", selfHeight, "blockSyncIsSet", blockSyncIsSet)
continue
}

r.logger.Info("Blocks behind threshold restarting node", "threshold", threshold, "behindHeight", behindHeight, "maxPeerHeight", maxPeerHeight, "selfHeight", selfHeight)
// Check if we have met cooldown time
if time.Since(r.lastRestartTime).Seconds() < float64(r.restartCooldownSeconds) {
r.logger.Debug("we are lagging behind, going to trigger a restart after cooldown time passes")
continue
}

r.logger.Info("Blocks behind threshold, restarting node", "threshold", threshold, "behindHeight", behindHeight, "maxPeerHeight", maxPeerHeight, "selfHeight", selfHeight)

// Send signal to restart the node
r.blockSync.Set()
Expand Down Expand Up @@ -544,7 +559,7 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh
)
continue

case r.pool.IsCaughtUp():
case r.pool.IsCaughtUp() && r.previousMaxPeerHeight < r.pool.MaxPeerHeight():
r.logger.Info("switching to consensus reactor", "height", height)

case time.Since(lastAdvance) > syncTimeout:
Expand Down