-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.go
228 lines (196 loc) · 4.74 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
package raft
import (
"fmt"
"log"
"math/rand"
"net"
"net/rpc"
"os"
"sync"
"time"
)
// Server bundles a ConsensusModule with the net/rpc server, the listener and
// the RPC clients used to reach peers.
// (Original note: implements the two RPCs from the paper.)
type Server struct {
// mu is locked by Serve during setup and by the peer-client methods and
// GetListenAddr around their accesses to peerClients and listener.
mu sync.Mutex
// serverId and peerIds are handed to NewConsensusModule in Serve.
serverId int
peerIds []int
// Cm is created in Serve and stopped in Shutdown.
Cm *ConsensusModule
storage Storage
// RPC service
rpcServer *rpc.Server
listener net.Listener
rpcProxy *RPCProxy
// peerClients maps a peer id to its RPC client; a nil entry means the peer
// is not currently connected.
peerClients map[int]*rpc.Client
// ready is passed through to NewConsensusModule.
ready <-chan interface{}
// quit is closed by Shutdown; the accept loop checks it to tell a shutdown
// from a genuine Accept error.
quit chan interface{}
// commitChan is passed through to NewConsensusModule.
commitChan chan<- CommitEntry
// wg tracks the accept loop and the per-connection goroutines started by
// Serve; Shutdown waits on it.
wg sync.WaitGroup
}
// NewServer returns a Server configured with the given id, peer ids, storage
// and channels. It only populates the struct: the consensus module, the RPC
// server and the listener are created later by Serve.
func NewServer(serverId int, peerIds []int, storage Storage, ready <-chan interface{}, commitChan chan<- CommitEntry) *Server {
	// The mutex and the wait group are ready to use at their zero values, so
	// only the remaining fields need to be filled in.
	srv := new(Server)
	srv.serverId = serverId
	srv.peerIds = peerIds
	srv.storage = storage
	srv.ready = ready
	srv.commitChan = commitChan
	srv.peerClients = make(map[int]*rpc.Client)
	srv.quit = make(chan interface{})
	return srv
}
// Serve starts the server. It creates the ConsensusModule, registers an
// RPCProxy for it under the service name "ConsensusModule", starts listening
// on a TCP port chosen by the system (":0"), and launches a goroutine that
// accepts connections and serves each one in its own goroutine.
//
// Serve returns as soon as the accept loop has been started. Failing to
// register the RPC service or to open the listener terminates the process
// via log.Fatalln.
func (s *Server) Serve() {
	s.mu.Lock()
	s.Cm = NewConsensusModule(s.serverId, s.peerIds, s, s.storage, s.ready, s.commitChan)

	// Start an RPC server that forwards calls to the consensus module.
	s.rpcServer = rpc.NewServer()
	s.rpcProxy = &RPCProxy{cm: s.Cm}
	// A failed registration would leave the server unable to answer any RPC,
	// so treat it like a failed Listen instead of ignoring it.
	if err := s.rpcServer.RegisterName("ConsensusModule", s.rpcProxy); err != nil {
		log.Fatalln(err)
	}

	var err error
	s.listener, err = net.Listen("tcp", ":0")
	if err != nil {
		log.Fatalln(err)
	}
	log.Printf("[%v] listening at %s", s.serverId, s.listener.Addr())
	s.mu.Unlock()

	s.wg.Add(1)
	go func() {
		defer s.wg.Done()

		for {
			conn, err := s.listener.Accept()
			if err != nil {
				select {
				case <-s.quit:
					// Shutdown closed quit before closing the listener, so
					// this error is the expected end of the accept loop.
					return
				default:
					log.Fatal("accept error:", err)
				}
			}
			s.wg.Add(1)
			go func() {
				// Deferred so the wait group stays balanced even if
				// ServeConn panics.
				defer s.wg.Done()
				// Handle RPC requests on this connection until it closes.
				s.rpcServer.ServeConn(conn)
			}()
		}
	}()
}
// Call invokes serviceMethod on the peer identified by id, using the RPC
// client stored for that peer. It returns an error if no client is currently
// stored for id; otherwise it returns the result of the RPC call.
func (s *Server) Call(id int, serviceMethod string, args interface{}, reply interface{}) error {
	// Only the map lookup is done under the lock; the RPC itself runs after
	// the lock has been released.
	s.mu.Lock()
	client := s.peerClients[id]
	s.mu.Unlock()

	if client == nil {
		return fmt.Errorf("call client %d after it's closed", id)
	}
	return client.Call(serviceMethod, args, reply)
}
// DisconnectAll closes every open client connection to a peer and resets the
// corresponding entries in peerClients to nil.
func (s *Server) DisconnectAll() {
	s.mu.Lock()
	defer s.mu.Unlock()

	for id, client := range s.peerClients {
		if client == nil {
			// Already disconnected.
			continue
		}
		// The error returned by Close is not checked here.
		client.Close()
		s.peerClients[id] = nil
	}
}
// Shutdown closes the server and waits for it to shut down properly.
//
// It stops the consensus module, closes the quit channel, closes the listener
// and then blocks until every goroutine tracked by wg has finished.
func (s *Server) Shutdown() {
s.Cm.Stop()
// quit must be closed before the listener: the accept loop started by Serve
// checks quit when Accept fails and returns quietly instead of calling
// log.Fatal.
close(s.quit)
s.listener.Close()
s.wg.Wait()
}
// GetListenAddr returns the network address of the server's listener. The
// listener is read while holding the server mutex.
func (s *Server) GetListenAddr() net.Addr {
	s.mu.Lock()
	defer s.mu.Unlock()

	addr := s.listener.Addr()
	return addr
}
// ConnectToPeer dials the peer at addr and stores the resulting RPC client
// under peerId. If a client is already stored for peerId it does nothing and
// returns nil. A dial failure is returned and leaves peerClients unchanged.
func (s *Server) ConnectToPeer(peerId int, addr net.Addr) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.peerClients[peerId] != nil {
		// Already connected.
		return nil
	}

	client, err := rpc.Dial(addr.Network(), addr.String())
	if err == nil {
		s.peerClients[peerId] = client
	}
	return err
}
// DisconnectPeer closes the client connection to the peer identified by
// peerId and resets its entry in peerClients to nil. It returns the error
// from closing the client, or nil if no client was stored for peerId.
func (s *Server) DisconnectPeer(peerId int) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	client := s.peerClients[peerId]
	if client == nil {
		return nil
	}

	err := client.Close()
	s.peerClients[peerId] = nil
	return err
}
// RPCProxy is an RPC proxy for ConsensusModule; it proxies the two RPC
// methods, RequestVote and AppendEntries, and forwards them to cm.
type RPCProxy struct {
cm *ConsensusModule
}
// RequestVote forwards a RequestVote RPC to the consensus module after
// simulating network conditions.
//
// When the RAFT_UNRELIABLE_RPC environment variable is non-empty (test
// mode), roughly one call in ten is dropped with an error and one in ten is
// delayed by 75ms. Otherwise every call is delayed by a random 1-5ms.
func (rpp *RPCProxy) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error {
	if os.Getenv("RAFT_UNRELIABLE_RPC") == "" {
		time.Sleep(time.Duration(1+rand.Intn(5)) * time.Millisecond)
		return rpp.cm.RequestVote(args, reply)
	}

	switch rand.Intn(10) {
	case 9:
		// Drop this RPC request.
		rpp.cm.debugLog("drop RequestVote")
		return fmt.Errorf("RPC failed")
	case 8:
		// Delay this RPC before forwarding it.
		rpp.cm.debugLog("delay RequestVote")
		time.Sleep(75 * time.Millisecond)
	}
	return rpp.cm.RequestVote(args, reply)
}
// AppendEntries forwards an AppendEntries RPC to the consensus module after
// simulating network conditions.
//
// When the RAFT_UNRELIABLE_RPC environment variable is non-empty, roughly
// one call in ten is dropped with an error and one in ten is delayed by
// 75ms. Otherwise every call is delayed by a random 1-5ms.
func (rpp *RPCProxy) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) error {
	if os.Getenv("RAFT_UNRELIABLE_RPC") == "" {
		time.Sleep(time.Duration(1+rand.Intn(5)) * time.Millisecond)
		return rpp.cm.AppendEntries(args, reply)
	}

	switch rand.Intn(10) {
	case 9:
		// Drop this RPC request.
		rpp.cm.debugLog("drop AppendEntries")
		return fmt.Errorf("RPC failed")
	case 8:
		// Delay this RPC before forwarding it.
		rpp.cm.debugLog("delay AppendEntries")
		time.Sleep(75 * time.Millisecond)
	}
	return rpp.cm.AppendEntries(args, reply)
}
// RequestVoteArgs is the argument type of the RequestVote RPC, following the
// paper.
type RequestVoteArgs struct {
Term int // the candidate's term
CandidateId int // the candidate's id
LastLogIndex int // the latest index in the log
LastLogTerm int // the latest term in the log
}
// RequestVoteReply is the reply type of the RequestVote RPC.
type RequestVoteReply struct {
Term int // the responder's own term
VoteGranted bool // whether the vote was granted
}
// AppendEntriesArgs is the argument type of the AppendEntries RPC, following
// the paper.
type AppendEntriesArgs struct {
Term int
LeaderId int
// Every heartbeat carries this probe information.
PrevLogIndex int
PrevLogTerm int
Entries []LogEntry
LeaderCommit int
}
// AppendEntriesReply is the reply type of the AppendEntries RPC.
// NOTE(review): presumably Term is the responder's term and Success reports
// whether the entries were accepted, as in the paper; confirm against the
// ConsensusModule implementation.
type AppendEntriesReply struct {
Term int
Success bool
}