Skip to content

Commit

Permalink
Merge pull request #28 from danl5/fix_issue27
Browse files Browse the repository at this point in the history
Resolve the issue of failing to correctly elect a leader.
  • Loading branch information
danl5 authored Jul 13, 2024
2 parents 681ace0 + 9a2bc1e commit 8fb6291
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 59 deletions.
4 changes: 3 additions & 1 deletion examples/onenode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ func newElect() (*goelect.Elect, error) {
peerNodes = append(peerNodes, goelect.Node{Address: pa, ID: pa})
}

logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
}))

// rpc transport
rpcTransport, err := rpc.NewRPC(logger)
Expand Down
48 changes: 48 additions & 0 deletions examples/onenode/node.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#!/bin/bash

node_addrs=("127.0.0.1:9981" "127.0.0.1:9982" "127.0.0.1:9983")
peers="127.0.0.1:9981,127.0.0.1:9982,127.0.0.1:9983"

elect_binary="node"
pid_file="./node_pids"

start_nodes() {
echo "Starting nodes..."
for node_addr in "${node_addrs[@]}"; do
echo "Starting node with address $node_addr"
log_file="${node_addr//:/_}.log"
./$elect_binary --nodeaddr=$node_addr --peers=$peers 2>&1 | tee "$log_file" &
echo $! >> $pid_file
done
echo "All nodes started."
}

stop_nodes() {
echo "Stopping nodes..."
if [ -f $pid_file ]; then
while read -r pid; do
echo "Stopping process $pid"
kill -9 $pid
done < $pid_file
rm $pid_file
echo "All nodes stopped."
else
echo "No nodes are running."
fi
}

show_help() {
echo "Usage: $0 {start|stop}"
}

case "$1" in
start)
start_nodes
;;
stop)
stop_nodes
;;
*)
show_help
;;
esac
152 changes: 115 additions & 37 deletions pkg/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,19 @@ type Consensus struct {
candidateChan chan struct{}
// shutdownChan is used to send shutdown event
shutdownChan chan struct{}

// preEventState holds the node state before an event is processed.
// This allows for comparison and analysis of state changes after the event handling.
preEventState model.NodeState

// inLeaderState indicates whether the current node is in the leader state.
inLeaderState bool
// inFollowerState indicates whether the current node is in the follower state.
inFollowerState bool
// inCandidateState indicates whether the current node is in the candidate state.
inCandidateState bool
// inDownState indicates whether the current node is in a down state.
inDownState bool
}

// Run starts the consensus
Expand Down Expand Up @@ -157,6 +170,7 @@ func (c *Consensus) HeartBeat(args *model.HeartBeatRequest, reply *model.HeartBe
c.logger.Debug("receive heartbeat", "from", args.NodeId)

if c.term > args.Term {
c.logger.Info("peer term is behind self", "peer term", args.Term, "self term", c.term)
// term in the request is behind this node
model.HBResponse(reply, false, common.HeartbeatExpired.String())
return nil
Expand All @@ -165,17 +179,17 @@ func (c *Consensus) HeartBeat(args *model.HeartBeatRequest, reply *model.HeartBe
// update term of this node
c.setTerm(args.Term)

switch model.NodeState(c.fsm.Current()) {
case model.NodeStateLeader:
switch {
case c.ensureState(model.NodeStateLeader):
// leave leader state
c.sendEvent(model.EventLeaveLeader)
case model.NodeStateFollower:
c.sendEvent(model.NodeStateLeader, model.EventLeaveLeader)
case c.ensureState(model.NodeStateFollower):
// send heartbeat to handler
c.followerChan <- struct{}{}
case model.NodeStateCandidate:
case c.ensureState(model.NodeStateCandidate):
// receive a new leader
c.sendEvent(model.EventNewLeader)
case model.NodeStateDown:
c.sendEvent(model.NodeStateCandidate, model.EventNewLeader)
case c.ensureState(model.NodeStateDown):
}

model.HBResponse(reply, true, common.HeartbeatOk.String())
Expand All @@ -191,27 +205,27 @@ func (c *Consensus) RequestVote(args *model.RequestVoteRequest, reply *model.Req
return nil
}

switch model.NodeState(c.fsm.Current()) {
case model.NodeStateLeader:
switch {
case c.ensureState(model.NodeStateLeader):
if args.Term <= c.term {
model.VoteResponse(reply, c.node.Node, false, common.VoteLeaderExist.String())
return nil
}
// term in the request is newer, leaves leader state
c.sendEvent(model.EventLeaveLeader)
case model.NodeStateFollower:
c.sendEvent(model.NodeStateLeader, model.EventLeaveLeader)
case c.ensureState(model.NodeStateFollower):
if args.Term < c.term {
model.VoteResponse(reply, c.node.Node, false, common.VoteTermExpired.String())
return nil
}
case model.NodeStateCandidate:
case c.ensureState(model.NodeStateCandidate):
if args.Term <= c.term {
model.VoteResponse(reply, c.node.Node, false, common.VoteHaveVoted.String())
return nil
}
// term in the request is newer, vote and switch to follower state
c.sendEvent(model.EventNewTerm)
case model.NodeStateDown:
c.sendEvent(model.NodeStateCandidate, model.EventNewTerm)
case c.ensureState(model.NodeStateDown):
}

// update term cache
Expand Down Expand Up @@ -358,9 +372,43 @@ func (c *Consensus) buildHeaders() model.Header {
return model.Header{Node: c.node.Node}
}

func (c *Consensus) ensureState(state model.NodeState) bool {
if model.NodeState(c.fsm.Current()) != state {
return false
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

for {
select {
case <-ctx.Done():
c.logger.Error("wait for state ready timeout", "state", state)
return false
default:
}
stateReady := false
switch state {
case model.NodeStateLeader:
stateReady = c.inLeaderState
case model.NodeStateFollower:
stateReady = c.inFollowerState
case model.NodeStateCandidate:
stateReady = c.inCandidateState
case model.NodeStateDown:
stateReady = c.inDownState
}

if stateReady {
break
}
time.Sleep(500 * time.Microsecond)
}

return true
}

func (c *Consensus) enterLeader(ctx context.Context, ev *fsm.Event) {
c.logger.Info("become leader")
c.sendNodeStateTransition(model.NodeState(ev.Dst), model.NodeState(ev.Src), model.TransitionTypeEnter)
c.leaderChan = make(chan struct{}, 1)
go func() {
err := c.runLeader(ctx)
Expand All @@ -369,19 +417,28 @@ func (c *Consensus) enterLeader(ctx context.Context, ev *fsm.Event) {
return
}
}()
c.sendNodeStateTransition(model.NodeState(ev.Dst), model.NodeState(ev.Src), model.TransitionTypeEnter)
c.inLeaderState = true
}

func (c *Consensus) runLeader(_ context.Context) error {
tk := time.NewTicker(c.cfg.HeartBeatInterval)
defer tk.Stop()

for {
select {
case <-c.leaderChan:
c.logger.Info("leave leader")
return nil
default:
}

var errCount int
// send heartbeat to followers
c.sendHeartBeat(&errCount)
// leaves leader state if the number of errors is more than half
if errCount >= c.countVoteNode()/2+1 {
c.sendEvent(model.EventLeaveLeader)
c.sendEvent(model.NodeStateLeader, model.EventLeaveLeader)
}

select {
Expand All @@ -397,11 +454,11 @@ func (c *Consensus) leaveLeader(_ context.Context, ev *fsm.Event) {
c.logger.Info("leave leader")
c.sendNodeStateTransition(model.NodeState(ev.Src), model.NodeState(ev.Dst), model.TransitionTypeLeave)
close(c.leaderChan)
c.inLeaderState = false
}

func (c *Consensus) enterFollower(ctx context.Context, ev *fsm.Event) {
c.logger.Info("become follower")
c.sendNodeStateTransition(model.NodeState(ev.Dst), model.NodeState(ev.Src), model.TransitionTypeEnter)
c.followerChan = make(chan struct{}, 1)
go func() {
err := c.runFollower(ctx)
Expand All @@ -410,6 +467,8 @@ func (c *Consensus) enterFollower(ctx context.Context, ev *fsm.Event) {
return
}
}()
c.inFollowerState = true
c.sendNodeStateTransition(model.NodeState(ev.Dst), model.NodeState(ev.Src), model.TransitionTypeEnter)
}

func (c *Consensus) runFollower(_ context.Context) error {
Expand All @@ -436,7 +495,7 @@ func (c *Consensus) runFollower(_ context.Context) error {
case <-ts.C:
c.logger.Info("leave follower state due to heartbeat timeout")
// heartbeat timeout
c.sendEvent(model.EventHeartbeatTimeout)
c.sendEvent(model.NodeStateFollower, model.EventHeartbeatTimeout)
return nil
}
}
Expand All @@ -446,11 +505,11 @@ func (c *Consensus) leaveFollower(_ context.Context, ev *fsm.Event) {
c.logger.Info("leave follower")
c.sendNodeStateTransition(model.NodeState(ev.Src), model.NodeState(ev.Dst), model.TransitionTypeLeave)
close(c.followerChan)
c.inFollowerState = false
}

func (c *Consensus) enterCandidate(ctx context.Context, ev *fsm.Event) {
c.logger.Info("become candidate")
c.sendNodeStateTransition(model.NodeState(ev.Dst), model.NodeState(ev.Src), model.TransitionTypeEnter)
c.candidateChan = make(chan struct{}, 1)
go func() {
err := c.runCandidate(ctx)
Expand All @@ -459,6 +518,8 @@ func (c *Consensus) enterCandidate(ctx context.Context, ev *fsm.Event) {
return
}
}()
c.inCandidateState = true
c.sendNodeStateTransition(model.NodeState(ev.Dst), model.NodeState(ev.Src), model.TransitionTypeEnter)
}

func (c *Consensus) runCandidate(ctx context.Context) error {
Expand Down Expand Up @@ -507,7 +568,7 @@ func (c *Consensus) tryToBecomeLeader(_ context.Context) error {
// become a leader when receive more than half of the votes
if voteCount >= c.countVoteNode()/2+1 {
c.logger.Info("received more than half of the votes, try to become leader")
c.sendEvent(model.EventMajorityVotes)
c.sendEvent(model.NodeStateCandidate, model.EventMajorityVotes)
return nil
}
select {
Expand All @@ -520,7 +581,7 @@ func (c *Consensus) tryToBecomeLeader(_ context.Context) error {
voteCount += 1
if voteCount >= c.countVoteNode()/2+1 {
c.logger.Info("received more than half of the votes, become leader")
c.sendEvent(model.EventMajorityVotes)
c.sendEvent(model.NodeStateCandidate, model.EventMajorityVotes)
return nil
}
case <-c.candidateChan:
Expand All @@ -543,41 +604,54 @@ func (c *Consensus) leaveCandidate(_ context.Context, ev *fsm.Event) {
c.logger.Info("leave candidate")
c.sendNodeStateTransition(model.NodeState(ev.Src), model.NodeState(ev.Dst), model.TransitionTypeLeave)
close(c.candidateChan)
c.inCandidateState = false
}

func (c *Consensus) enterShutdown(_ context.Context, ev *fsm.Event) {
c.logger.Info("become shutdown")
c.inDownState = true
c.sendNodeStateTransition(model.NodeState(ev.Dst), model.NodeState(ev.Src), model.TransitionTypeEnter)
}

func (c *Consensus) leaveShutdown(_ context.Context, ev *fsm.Event) {
c.logger.Info("leave shutdown")
c.sendNodeStateTransition(model.NodeState(ev.Src), model.NodeState(ev.Dst), model.TransitionTypeLeave)
close(c.shutdownChan)
c.inDownState = false
}

func (c *Consensus) sendEvent(ev model.NodeEvent) {
func (c *Consensus) sendEvent(currentState model.NodeState, ev model.NodeEvent) {
if currentState == c.preEventState {
c.logger.Warn("event occurring simultaneously under the same state,ignore it",
"state", currentState, "event", ev)
return
}
c.preEventState = currentState
c.eventChan <- ev
c.logger.Debug("node event", "event", ev.String())
}

func (c *Consensus) runEventHandler() {

handler := func(ev model.NodeEvent) {
// check if the event is legal
ok := c.fsm.Can(ev.String())
if !ok {
c.logger.Error("wrong event", "current state", c.fsm.Current(), "event", ev.String())
// faulty state migration is unacceptable
panic("unrecoverable error: wrong state transition")
}

err := c.fsm.Event(context.TODO(), ev.String())
if err != nil {
c.logger.Error("error state transition", "current state", c.fsm.Current(), "event", ev.String())
// faulty state migration is unacceptable
panic("unrecoverable error: wrong state transition")
}
}
go func() {
for ev := range c.eventChan {
// check if the event is legal
ok := c.fsm.Can(ev.String())
if !ok {
c.logger.Error("wrong event", "current state", c.fsm.Current(), "event", ev.String())
// faulty state migration is unacceptable
panic("unrecoverable error: wrong state transition")
}

err := c.fsm.Event(context.TODO(), ev.String())
if err != nil {
c.logger.Error("error state transition", "current state", c.fsm.Current(), "event", ev.String())
// faulty state migration is unacceptable
panic("unrecoverable error: wrong state transition")
}
handler(ev)
}
}()
}
Expand Down Expand Up @@ -638,14 +712,18 @@ func (c *Consensus) sendRequestVote(voteChan chan model.Node) error {

peerID := peer.ID
g.Go(func() error {
if !c.ensureState(model.NodeStateCandidate) {
return nil
}

c.logger.Info("send vote request to peer", "peer", peerID)
resp := &model.Response{}
// send vote request
err := c.transport.SendRequest(peerID, &model.Request{
Header: c.buildHeaders(),
CommandCode: model.RequestVote,
Command: model.RequestVoteRequest{
NodeId: peerID,
NodeId: c.node.ID,
Term: c.term,
NodeAddr: c.node.Address,
},
Expand Down
Loading

0 comments on commit 8fb6291

Please sign in to comment.