Skip to content

Commit

Permalink
raft: extract progress tracking into own component
Browse files Browse the repository at this point in the history
The Progress maps contain both the active configuration and information
about the replication status. Pulling them into their own component makes
this state easier to unit test and also clarifies the code, which will see
changes as etcd-io#7625 is addressed.

More functionality will move into `prs` in self-contained follow-up commits.
  • Loading branch information
tbg committed May 21, 2019
1 parent 71881a4 commit dbac67e
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 152 deletions.
46 changes: 45 additions & 1 deletion raft/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

package raft

import "fmt"
import (
"fmt"
"sort"
)

const (
ProgressStateProbe ProgressStateType = iota
Expand Down Expand Up @@ -283,3 +286,44 @@ func (in *inflights) reset() {
in.count = 0
in.start = 0
}

// prs tracks the currently active configuration and the information known about
// the nodes and learners in it. In particular, it tracks the match index for
// each peer which in turn allows reasoning about the committed index.
type prs struct {
// nodes maps a peer ID to its Progress. This is the only map consulted by
// quorum() and committed().
nodes map[uint64]*Progress
// learners maps a learner ID to its Progress. Learners are not counted by
// quorum() or committed().
learners map[uint64]*Progress
// matchBuf is a scratch slice reused by committed() across calls to avoid
// allocating; its contents are overwritten on every call.
matchBuf uint64Slice
}

// makePRS returns a prs whose node and learner maps are allocated and empty,
// so entries can be inserted right away. matchBuf is left nil.
func makePRS() prs {
	var p prs
	p.nodes = make(map[uint64]*Progress)
	p.learners = make(map[uint64]*Progress)
	return p
}

// quorum reports the size of a strict majority of the entries in p.nodes,
// i.e. len(p.nodes)/2 + 1 using integer division. Learners are not counted.
func (p *prs) quorum() int {
	voters := len(p.nodes)
	return voters/2 + 1
}

// committed collects the Match index of every entry in p.nodes, sorts them,
// and returns the element quorum() positions from the end of the sorted
// slice. Learners are ignored.
//
// NOTE(review): uint64Slice is assumed to sort in ascending order, which
// makes the result the largest index acknowledged by a quorum of nodes.
//
// If p.nodes is empty there is nothing a quorum could have acknowledged, so
// 0 is returned. Without this guard the final index expression would be
// 0 - quorum() == -1 and panic.
func (p *prs) committed() uint64 {
	n := len(p.nodes)
	if n == 0 {
		return 0
	}

	// Preserving matchBuf across calls is an optimization
	// used to avoid allocating a new slice on each call.
	if cap(p.matchBuf) < n {
		p.matchBuf = make(uint64Slice, n)
	}
	p.matchBuf = p.matchBuf[:n]

	idx := 0
	for _, pr := range p.nodes {
		p.matchBuf[idx] = pr.Match
		idx++
	}
	sort.Sort(&p.matchBuf)

	// quorum() is in [1, n] for n >= 1, so the index below is always in range.
	return p.matchBuf[n-p.quorum()]
}

// removeAny drops the Progress stored under id from both the learner map and
// the node map. It is a no-op for a map that has no entry for id.
func (p *prs) removeAny(id uint64) {
	// The two deletions are independent; delete on a missing key does nothing.
	delete(p.learners, id)
	delete(p.nodes, id)
}
90 changes: 34 additions & 56 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,7 @@ type raft struct {
maxMsgSize uint64
maxUncommittedSize uint64
maxInflight int
prs map[uint64]*Progress
learnerPrs map[uint64]*Progress
matchBuf uint64Slice
prs prs

state StateType

Expand Down Expand Up @@ -350,8 +348,7 @@ func newRaft(c *Config) *raft {
maxMsgSize: c.MaxSizePerMsg,
maxInflight: c.MaxInflightMsgs,
maxUncommittedSize: c.MaxUncommittedEntriesSize,
prs: make(map[uint64]*Progress),
learnerPrs: make(map[uint64]*Progress),
prs: makePRS(),
electionTimeout: c.ElectionTick,
heartbeatTimeout: c.HeartbeatTick,
logger: c.Logger,
Expand All @@ -361,13 +358,13 @@ func newRaft(c *Config) *raft {
disableProposalForwarding: c.DisableProposalForwarding,
}
for _, p := range peers {
r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)}
r.prs.nodes[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)}
}
for _, p := range learners {
if _, ok := r.prs[p]; ok {
if _, ok := r.prs.nodes[p]; ok {
panic(fmt.Sprintf("node %x is in both learner and peer list", p))
}
r.learnerPrs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight), IsLearner: true}
r.prs.learners[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight), IsLearner: true}
if r.id == p {
r.isLearner = true
}
Expand Down Expand Up @@ -403,20 +400,18 @@ func (r *raft) hardState() pb.HardState {
}
}

func (r *raft) quorum() int { return len(r.prs)/2 + 1 }

func (r *raft) nodes() []uint64 {
nodes := make([]uint64, 0, len(r.prs))
for id := range r.prs {
nodes := make([]uint64, 0, len(r.prs.nodes))
for id := range r.prs.nodes {
nodes = append(nodes, id)
}
sort.Sort(uint64Slice(nodes))
return nodes
}

func (r *raft) learnerNodes() []uint64 {
nodes := make([]uint64, 0, len(r.learnerPrs))
for id := range r.learnerPrs {
nodes := make([]uint64, 0, len(r.prs.learners))
for id := range r.prs.learners {
nodes = append(nodes, id)
}
sort.Sort(uint64Slice(nodes))
Expand Down Expand Up @@ -458,11 +453,11 @@ func (r *raft) send(m pb.Message) {
}

func (r *raft) getProgress(id uint64) *Progress {
if pr, ok := r.prs[id]; ok {
if pr, ok := r.prs.nodes[id]; ok {
return pr
}

return r.learnerPrs[id]
return r.prs.learners[id]
}

// sendAppend sends an append RPC with new entries (if any) and the
Expand Down Expand Up @@ -558,11 +553,11 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
}

func (r *raft) forEachProgress(f func(id uint64, pr *Progress)) {
for id, pr := range r.prs {
for id, pr := range r.prs.nodes {
f(id, pr)
}

for id, pr := range r.learnerPrs {
for id, pr := range r.prs.learners {
f(id, pr)
}
}
Expand Down Expand Up @@ -602,19 +597,7 @@ func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
// the commit index changed (in which case the caller should call
// r.bcastAppend).
func (r *raft) maybeCommit() bool {
// Preserving matchBuf across calls is an optimization
// used to avoid allocating a new slice on each call.
if cap(r.matchBuf) < len(r.prs) {
r.matchBuf = make(uint64Slice, len(r.prs))
}
r.matchBuf = r.matchBuf[:len(r.prs)]
idx := 0
for _, p := range r.prs {
r.matchBuf[idx] = p.Match
idx++
}
sort.Sort(&r.matchBuf)
mci := r.matchBuf[len(r.matchBuf)-r.quorum()]
mci := r.prs.committed()
return r.raftLog.maybeCommit(mci, r.Term)
}

Expand Down Expand Up @@ -755,7 +738,7 @@ func (r *raft) becomeLeader() {
// (perhaps after having received a snapshot as a result). The leader is
// trivially in this state. Note that r.reset() has initialized this
// progress with the last index already.
r.prs[r.id].becomeReplicate()
r.prs.nodes[r.id].becomeReplicate()

// Conservatively set the pendingConfIndex to the last index in the
// log. There may or may not be a pending config change, but it's
Expand Down Expand Up @@ -790,7 +773,7 @@ func (r *raft) campaign(t CampaignType) {
voteMsg = pb.MsgVote
term = r.Term
}
if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
if r.prs.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
// We won the election after voting for ourselves (which must mean that
// this is a single-node cluster). Advance to the next state.
if t == campaignPreElection {
Expand All @@ -800,7 +783,7 @@ func (r *raft) campaign(t CampaignType) {
}
return
}
for id := range r.prs {
for id := range r.prs.nodes {
if id == r.id {
continue
}
Expand Down Expand Up @@ -994,7 +977,7 @@ func stepLeader(r *raft, m pb.Message) error {
if len(m.Entries) == 0 {
r.logger.Panicf("%x stepped empty MsgProp", r.id)
}
if _, ok := r.prs[r.id]; !ok {
if _, ok := r.prs.nodes[r.id]; !ok {
// If we are not currently a member of the range (i.e. this node
// was removed from the configuration while serving as leader),
// drop any new proposals.
Expand Down Expand Up @@ -1024,7 +1007,7 @@ func stepLeader(r *raft, m pb.Message) error {
r.bcastAppend()
return nil
case pb.MsgReadIndex:
if r.quorum() > 1 {
if r.prs.quorum() > 1 {
if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
// Reject read only request when this leader has not committed any log entry at its term.
return nil
Expand Down Expand Up @@ -1134,7 +1117,7 @@ func stepLeader(r *raft, m pb.Message) error {
}

ackCount := r.readOnly.recvAck(m)
if ackCount < r.quorum() {
if ackCount < r.prs.quorum() {
return nil
}

Expand Down Expand Up @@ -1232,8 +1215,8 @@ func stepCandidate(r *raft, m pb.Message) error {
r.handleSnapshot(m)
case myVoteRespType:
gr := r.poll(m.From, m.Type, !m.Reject)
r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.quorum(), gr, m.Type, len(r.votes)-gr)
switch r.quorum() {
r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.prs.quorum(), gr, m.Type, len(r.votes)-gr)
switch r.prs.quorum() {
case gr:
if r.state == StatePreCandidate {
r.campaign(campaignElection)
Expand Down Expand Up @@ -1370,8 +1353,8 @@ func (r *raft) restore(s pb.Snapshot) bool {
r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)

r.raftLog.restore(s)
r.prs = make(map[uint64]*Progress)
r.learnerPrs = make(map[uint64]*Progress)
r.prs.nodes = make(map[uint64]*Progress)
r.prs.learners = make(map[uint64]*Progress)
r.restoreNode(s.Metadata.ConfState.Nodes, false)
r.restoreNode(s.Metadata.ConfState.Learners, true)
return true
Expand All @@ -1392,7 +1375,7 @@ func (r *raft) restoreNode(nodes []uint64, isLearner bool) {
// promotable indicates whether state machine can be promoted to leader,
// which is true when its own id is in progress list.
func (r *raft) promotable() bool {
_, ok := r.prs[r.id]
_, ok := r.prs.nodes[r.id]
return ok
}

Expand Down Expand Up @@ -1422,9 +1405,9 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
}

// change Learner to Voter, use origin Learner progress
delete(r.learnerPrs, id)
delete(r.prs.learners, id)
pr.IsLearner = false
r.prs[id] = pr
r.prs.nodes[id] = pr
}

if r.id == id {
Expand All @@ -1439,10 +1422,10 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
}

func (r *raft) removeNode(id uint64) {
r.delProgress(id)
r.prs.removeAny(id)

// do not try to commit or abort transferring if there is no nodes in the cluster.
if len(r.prs) == 0 && len(r.learnerPrs) == 0 {
if len(r.prs.nodes) == 0 && len(r.prs.learners) == 0 {
return
}

Expand All @@ -1459,20 +1442,15 @@ func (r *raft) removeNode(id uint64) {

func (r *raft) setProgress(id, match, next uint64, isLearner bool) {
if !isLearner {
delete(r.learnerPrs, id)
r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)}
delete(r.prs.learners, id)
r.prs.nodes[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)}
return
}

if _, ok := r.prs[id]; ok {
if _, ok := r.prs.nodes[id]; ok {
panic(fmt.Sprintf("%x unexpected changing from voter to learner for %x", r.id, id))
}
r.learnerPrs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight), IsLearner: true}
}

func (r *raft) delProgress(id uint64) {
delete(r.prs, id)
delete(r.learnerPrs, id)
r.prs.learners[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight), IsLearner: true}
}

func (r *raft) loadState(state pb.HardState) {
Expand Down Expand Up @@ -1515,7 +1493,7 @@ func (r *raft) checkQuorumActive() bool {
pr.RecentActive = false
})

return act >= r.quorum()
return act >= r.prs.quorum()
}

func (r *raft) sendTimeoutNow(to uint64) {
Expand Down
6 changes: 3 additions & 3 deletions raft/raft_flow_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestMsgAppFlowControlFull(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()

pr2 := r.prs[2]
pr2 := r.prs.nodes[2]
// force the progress to be in replicate state
pr2.becomeReplicate()
// fill in the inflights window
Expand Down Expand Up @@ -65,7 +65,7 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()

pr2 := r.prs[2]
pr2 := r.prs.nodes[2]
// force the progress to be in replicate state
pr2.becomeReplicate()
// fill in the inflights window
Expand Down Expand Up @@ -110,7 +110,7 @@ func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()

pr2 := r.prs[2]
pr2 := r.prs.nodes[2]
// force the progress to be in replicate state
pr2.becomeReplicate()
// fill in the inflights window
Expand Down
Loading

0 comments on commit dbac67e

Please sign in to comment.