Skip to content

Commit

Permalink
checker: replace down check with disconnect check when fixing orphan …
Browse files Browse the repository at this point in the history
…peer (#7294)

close #7249

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
lhy1024 and ti-chi-bot[bot] authored Nov 3, 2023
1 parent 01fb56b commit 689fcbe
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 25 deletions.
3 changes: 3 additions & 0 deletions pkg/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,9 @@ var (
// tikv's store heartbeat for a short time, maybe caused by process restart or
// temporary network failure.
func (s *StoreInfo) IsDisconnected() bool {
if s == nil {
return true
}
return s.DownTime() > storeDisconnectDuration
}

Expand Down
60 changes: 37 additions & 23 deletions pkg/schedule/checker/rule_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func (c *RuleChecker) fixOrphanPeers(region *core.RegionInfo, fit *placement.Reg
if len(fit.OrphanPeers) == 0 {
return nil, nil
}
var pinDownPeer *metapb.Peer

isUnhealthyPeer := func(id uint64) bool {
for _, downPeer := range region.GetDownPeers() {
if downPeer.Peer.GetId() == id {
Expand All @@ -461,31 +461,41 @@ func (c *RuleChecker) fixOrphanPeers(region *core.RegionInfo, fit *placement.Reg
}
return false
}

isDisconnectedPeer := func(p *metapb.Peer) bool {
// avoid to meet down store when fix orphan peers,
// Isdisconnected is more strictly than IsUnhealthy.
return c.cluster.GetStore(p.GetStoreId()).IsDisconnected()
}

checkDownPeer := func(peers []*metapb.Peer) (*metapb.Peer, bool) {
for _, p := range peers {
if isUnhealthyPeer(p.GetId()) {
// make sure is down peer.
if region.GetDownPeer(p.GetId()) != nil {
return p, true
}
return nil, true
}
if isDisconnectedPeer(p) {
return p, true
}
}
return nil, false
}

// remove orphan peers only when all rules are satisfied (count+role) and all peers selected
// by RuleFits is not pending or down.
var pinDownPeer *metapb.Peer
hasUnhealthyFit := false
loopFits:
for _, rf := range fit.RuleFits {
if !rf.IsSatisfied() {
hasUnhealthyFit = true
break
}
for _, p := range rf.Peers {
if isUnhealthyPeer(p.GetId()) {
// make sure is down peer.
if region.GetDownPeer(p.GetId()) != nil {
pinDownPeer = p
}
hasUnhealthyFit = true
break loopFits
}
// avoid to meet down store when fix orpahn peers,
// Isdisconnected is more strictly than IsUnhealthy.
if c.cluster.GetStore(p.GetStoreId()).IsDisconnected() {
hasUnhealthyFit = true
pinDownPeer = p
break loopFits
}
pinDownPeer, hasUnhealthyFit = checkDownPeer(rf.Peers)
if hasUnhealthyFit {
break
}
}

Expand All @@ -502,15 +512,15 @@ loopFits:
continue
}
// make sure the orphan peer is healthy.
if isUnhealthyPeer(orphanPeer.GetId()) {
if isUnhealthyPeer(orphanPeer.GetId()) || isDisconnectedPeer(orphanPeer) {
continue
}
// no consider witness in this path.
if pinDownPeer.GetIsWitness() || orphanPeer.GetIsWitness() {
continue
}
// down peer's store should be down.
if !c.isStoreDownTimeHitMaxDownTime(pinDownPeer.GetStoreId()) {
// down peer's store should be disconnected
if !isDisconnectedPeer(pinDownPeer) {
continue
}
// check if down peer can replace with orphan peer.
Expand All @@ -525,7 +535,7 @@ loopFits:
case orphanPeerRole == metapb.PeerRole_Voter && destRole == metapb.PeerRole_Learner:
return operator.CreateDemoteLearnerOperatorAndRemovePeer("replace-down-peer-with-orphan-peer", c.cluster, region, orphanPeer, pinDownPeer)
case orphanPeerRole == metapb.PeerRole_Voter && destRole == metapb.PeerRole_Voter &&
c.cluster.GetStore(pinDownPeer.GetStoreId()).IsDisconnected() && !dstStore.IsDisconnected():
isDisconnectedPeer(pinDownPeer) && !dstStore.IsDisconnected():
return operator.CreateRemovePeerOperator("remove-replaced-orphan-peer", c.cluster, 0, region, pinDownPeer.GetStoreId())
default:
// destRole should not same with orphanPeerRole. if role is same, it fit with orphanPeer should be better than now.
Expand All @@ -542,7 +552,11 @@ loopFits:
for _, orphanPeer := range fit.OrphanPeers {
if isUnhealthyPeer(orphanPeer.GetId()) {
ruleCheckerRemoveOrphanPeerCounter.Inc()
return operator.CreateRemovePeerOperator("remove-orphan-peer", c.cluster, 0, region, orphanPeer.StoreId)
return operator.CreateRemovePeerOperator("remove-unhealthy-orphan-peer", c.cluster, 0, region, orphanPeer.StoreId)
}
if isDisconnectedPeer(orphanPeer) {
ruleCheckerRemoveOrphanPeerCounter.Inc()
return operator.CreateRemovePeerOperator("remove-disconnected-orphan-peer", c.cluster, 0, region, orphanPeer.StoreId)
}
if hasHealthPeer {
// there already exists a healthy orphan peer, so we can remove other orphan Peers.
Expand Down
176 changes: 174 additions & 2 deletions pkg/schedule/checker/rule_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (suite *ruleCheckerTestSuite) TestFixToManyOrphanPeers() {
suite.cluster.PutRegion(region)
op = suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("remove-orphan-peer", op.Desc())
suite.Equal("remove-unhealthy-orphan-peer", op.Desc())
suite.Equal(uint64(4), op.Step(0).(operator.RemovePeer).FromStore)
}

Expand Down Expand Up @@ -702,7 +702,7 @@ func (suite *ruleCheckerTestSuite) TestPriorityFixOrphanPeer() {
suite.cluster.PutRegion(testRegion)
op = suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("remove-orphan-peer", op.Desc())
suite.Equal("remove-unhealthy-orphan-peer", op.Desc())
suite.IsType(remove, op.Step(0))
// Ref #3521
suite.cluster.SetStoreOffline(2)
Expand All @@ -723,6 +723,178 @@ func (suite *ruleCheckerTestSuite) TestPriorityFixOrphanPeer() {
suite.Equal("remove-orphan-peer", op.Desc())
}

// Ref https://github.com/tikv/pd/issues/7249 https://github.com/tikv/tikv/issues/15799
func (suite *ruleCheckerTestSuite) TestFixOrphanPeerWithDisconnectedStoreAndRuleChanged() {
// init cluster with 5 replicas
suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})
suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"})
suite.cluster.AddLabelsStore(5, 1, map[string]string{"host": "host5"})
storeIDs := []uint64{1, 2, 3, 4, 5}
suite.cluster.AddLeaderRegionWithRange(1, "", "", storeIDs[0], storeIDs[1:]...)
rule := &placement.Rule{
GroupID: "pd",
ID: "default",
Role: placement.Voter,
Count: 5,
StartKey: []byte{},
EndKey: []byte{},
}
suite.ruleManager.SetRule(rule)
op := suite.rc.Check(suite.cluster.GetRegion(1))
suite.Nil(op)

// set store 1, 2 to disconnected
suite.cluster.SetStoreDisconnect(1)
suite.cluster.SetStoreDisconnect(2)

// change rule to 3 replicas
rule = &placement.Rule{
GroupID: "pd",
ID: "default",
Role: placement.Voter,
Count: 3,
StartKey: []byte{},
EndKey: []byte{},
Override: true,
}
suite.ruleManager.SetRule(rule)

// remove store 1 from region 1
op = suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("remove-replaced-orphan-peer", op.Desc())
suite.Equal(op.Len(), 2)
newLeaderID := op.Step(0).(operator.TransferLeader).ToStore
removedPeerID := op.Step(1).(operator.RemovePeer).FromStore
r1 := suite.cluster.GetRegion(1)
r1 = r1.Clone(
core.WithLeader(r1.GetPeer(newLeaderID)),
core.WithRemoveStorePeer(removedPeerID))
suite.cluster.PutRegion(r1)
r1 = suite.cluster.GetRegion(1)
suite.Len(r1.GetPeers(), 4)

// remove store 2 from region 1
op = suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("remove-replaced-orphan-peer", op.Desc())
suite.Equal(op.Len(), 1)
removedPeerID = op.Step(0).(operator.RemovePeer).FromStore
r1 = r1.Clone(core.WithRemoveStorePeer(removedPeerID))
suite.cluster.PutRegion(r1)
r1 = suite.cluster.GetRegion(1)
suite.Len(r1.GetPeers(), 3)
for _, p := range r1.GetPeers() {
suite.NotEqual(p.GetStoreId(), 1)
suite.NotEqual(p.GetStoreId(), 2)
}
}

// Ref https://github.com/tikv/pd/issues/7249 https://github.com/tikv/tikv/issues/15799
func (suite *ruleCheckerTestSuite) TestFixOrphanPeerWithDisconnectedStoreAndRuleChanged2() {
// init cluster with 5 voters and 1 learner
suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})
suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"})
suite.cluster.AddLabelsStore(5, 1, map[string]string{"host": "host5"})
suite.cluster.AddLabelsStore(6, 1, map[string]string{"host": "host6"})
storeIDs := []uint64{1, 2, 3, 4, 5}
suite.cluster.AddLeaderRegionWithRange(1, "", "", storeIDs[0], storeIDs[1:]...)
r1 := suite.cluster.GetRegion(1)
r1 = r1.Clone(core.WithAddPeer(&metapb.Peer{Id: 6, StoreId: 6, Role: metapb.PeerRole_Learner}))
suite.cluster.PutRegion(r1)
err := suite.ruleManager.SetRules([]*placement.Rule{
{
GroupID: "pd",
ID: "default",
Index: 100,
Override: true,
Role: placement.Voter,
Count: 5,
IsWitness: false,
},
{
GroupID: "pd",
ID: "r1",
Index: 100,
Override: false,
Role: placement.Learner,
Count: 1,
IsWitness: false,
},
})
suite.NoError(err)

op := suite.rc.Check(suite.cluster.GetRegion(1))
suite.Nil(op)

// set store 1, 2 to disconnected
suite.cluster.SetStoreDisconnect(1)
suite.cluster.SetStoreDisconnect(2)
suite.cluster.SetStoreDisconnect(3)

// change rule to 3 replicas
suite.ruleManager.DeleteRuleGroup("pd")
suite.ruleManager.SetRule(&placement.Rule{
GroupID: "pd",
ID: "default",
Role: placement.Voter,
Count: 2,
StartKey: []byte{},
EndKey: []byte{},
Override: true,
})

// remove store 1 from region 1
op = suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("remove-replaced-orphan-peer", op.Desc())
suite.Equal(op.Len(), 2)
newLeaderID := op.Step(0).(operator.TransferLeader).ToStore
removedPeerID := op.Step(1).(operator.RemovePeer).FromStore
r1 = suite.cluster.GetRegion(1)
r1 = r1.Clone(
core.WithLeader(r1.GetPeer(newLeaderID)),
core.WithRemoveStorePeer(removedPeerID))
suite.cluster.PutRegion(r1)
r1 = suite.cluster.GetRegion(1)
suite.Len(r1.GetPeers(), 5)

// remove store 2 from region 1
op = suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("remove-replaced-orphan-peer", op.Desc())
suite.Equal(op.Len(), 1)
removedPeerID = op.Step(0).(operator.RemovePeer).FromStore
r1 = r1.Clone(core.WithRemoveStorePeer(removedPeerID))
suite.cluster.PutRegion(r1)
r1 = suite.cluster.GetRegion(1)
suite.Len(r1.GetPeers(), 4)
for _, p := range r1.GetPeers() {
fmt.Println(p.GetStoreId(), p.Role.String())
}

// remove store 3 from region 1
op = suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("remove-replaced-orphan-peer", op.Desc())
suite.Equal(op.Len(), 1)
removedPeerID = op.Step(0).(operator.RemovePeer).FromStore
r1 = r1.Clone(core.WithRemoveStorePeer(removedPeerID))
suite.cluster.PutRegion(r1)
r1 = suite.cluster.GetRegion(1)
suite.Len(r1.GetPeers(), 3)

for _, p := range r1.GetPeers() {
suite.NotEqual(p.GetStoreId(), 1)
suite.NotEqual(p.GetStoreId(), 2)
suite.NotEqual(p.GetStoreId(), 3)
}
}

func (suite *ruleCheckerTestSuite) TestPriorityFitHealthWithDifferentRole1() {
suite.cluster.SetEnableUseJointConsensus(true)
suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
Expand Down

0 comments on commit 689fcbe

Please sign in to comment.