-
Notifications
You must be signed in to change notification settings - Fork 1
/
node.go
147 lines (130 loc) · 3.61 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package raft
const (
// 来自 Leader 的日志复制请求
AppendEntryRpc rpcType = iota
// 来自 Candidate 的投票请求
RequestVoteRpc
// 来自 Leader 的安装快照请求
InstallSnapshotRpc
// 来自客户端的安装命令请求
ApplyCommandRpc
// 来自客户端的成员变更请求
ChangeConfigRpc
// 来自客户端的领导权转移请求
TransferLeadershipRpc
// 来自客户端的添加 Learner 节点请求
AddLearnerRpc
)
type rpc struct {
rpcType rpcType
req interface{}
res chan rpcReply
}
type rpcReply struct {
res interface{}
err error
}
// 代表了一个当前节点
type Node struct {
raft *raft
config Config // 节点配置对象
rpcCh chan rpc
}
func NewNode(config Config) *Node {
return &Node{
raft: newRaft(config),
config: config,
rpcCh: make(chan rpc),
}
}
func (nd *Node) Run() {
// 开启 raft 循环
nd.raft.raftRun(nd.rpcCh)
}
// 客户端查询当前节点是否是 Leader 节点
func (nd *Node) IsLeader() bool {
return nd.raft.isLeader()
}
// 客户端添加角色变更观察器
func (nd *Node) AddRoleObserver(ob chan RoleStage) {
nd.raft.addRoleObserver(ob)
}
// 客户端查询集群 Leader 地址
func (nd *Node) GetLeader() NodeAddr {
return nd.raft.peerState.getLeader().Addr
}
// Follower 和 Candidate 开放的 rpc接口,由 Leader 调用
// 客户端接收到请求后,调用此方法
func (nd *Node) AppendEntries(args AppendEntry, res *AppendEntryReply) error {
if msg := nd.sendRpc(AppendEntryRpc, args); msg.err != nil {
return msg.err
} else {
*res = msg.res.(AppendEntryReply)
return nil
}
}
// Follower 和 Candidate 开放的 rpc 接口,由 Candidate 调用
// 客户端接收到请求后,调用此方法
func (nd *Node) RequestVote(args RequestVote, res *RequestVoteReply) error {
if msg := nd.sendRpc(RequestVoteRpc, args); msg.err != nil {
return msg.err
} else {
*res = msg.res.(RequestVoteReply)
return nil
}
}
// Follower 开放的 rpc 接口,由 Leader 调用
// 客户端接收到请求后,调用此方法
func (nd *Node) InstallSnapshot(args InstallSnapshot, res *InstallSnapshotReply) error {
if msg := nd.sendRpc(InstallSnapshotRpc, args); msg.err != nil {
return msg.err
} else {
*res = msg.res.(InstallSnapshotReply)
return nil
}
}
// Leader 开放的 rpc 接口,由客户端调用
func (nd *Node) ApplyCommand(args ApplyCommand, res *ApplyCommandReply) error {
if msg := nd.sendRpc(ApplyCommandRpc, args); msg.err != nil {
return msg.err
} else {
*res = msg.res.(ApplyCommandReply)
return nil
}
}
// Leader 开放的 rpc 接口,由客户端调用,添加新配置
func (nd *Node) ChangeConfig(args ChangeConfig, res *ChangeConfigReply) error {
if msg := nd.sendRpc(ChangeConfigRpc, args); msg.err != nil {
return msg.err
} else {
*res = msg.res.(ChangeConfigReply)
return nil
}
}
// Leader 开放的 rpc 接口,由客户端调用,转移领导权
func (nd *Node) TransferLeadership(args TransferLeadership, res *TransferLeadershipReply) error {
if msg := nd.sendRpc(TransferLeadershipRpc, args); msg.err != nil {
return msg.err
} else {
*res = msg.res.(TransferLeadershipReply)
return nil
}
}
// Leader 开放的 rpc 接口,由客户端调用,添加新的 Learner 节点
func (nd *Node) AddLearner(args AddLearner, res *AddLearnerReply) error {
if msg := nd.sendRpc(AddLearnerRpc, args); msg.err != nil {
return msg.err
} else {
*res = msg.res.(AddLearnerReply)
return nil
}
}
func (nd *Node) sendRpc(rpcType rpcType, args interface{}) rpcReply {
rpcMsg := rpc{
rpcType: rpcType,
req: args,
res: make(chan rpcReply),
}
nd.rpcCh <- rpcMsg
return <- rpcMsg.res
}