-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathprophet_leader.go
120 lines (95 loc) · 2.17 KB
/
prophet_leader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package prophet
import (
"context"
"encoding/json"
"math"
"sync/atomic"
"time"
)
var (
loopInterval = 100 * time.Millisecond
)
// Node is prophet info
type Node struct {
Name string `json:"name"`
Addr string `json:"addr"`
}
func mustUnmarshal(data []byte) *Node {
value := &Node{}
err := json.Unmarshal(data, value)
if err != nil {
log.Fatalf("unmarshal leader node failed with %+v", err)
}
return value
}
func (n *Node) marshal() string {
data, _ := json.Marshal(n)
return string(data)
}
func (p *defaultProphet) startLeaderLoop() {
leaderSignature := ""
if p.opts.cfg.StorageNode {
leaderSignature = p.signature
}
ctx, cancel := context.WithCancel(context.Background())
p.electorCancelFunc = cancel
go p.elector.ElectionLoop(ctx,
math.MaxUint64,
leaderSignature,
p.enableLeader,
p.disableLeader)
<-p.completeC
}
func (p *defaultProphet) enableLeader() {
log.Infof("********become to leader now********")
p.leader = p.node
p.rt = newRuntime(p)
p.rt.load()
p.coordinator = newCoordinator(p.cfg, p.runner, p.rt)
p.coordinator.start()
p.wn = newWatcherNotifier(p.rt)
p.wn.start()
// now, we are leader
atomic.StoreInt64(&p.leaderFlag, 1)
p.notifyElectionComplete()
p.cfg.Handler.ProphetBecomeLeader()
}
func (p *defaultProphet) disableLeader() {
atomic.StoreInt64(&p.leaderFlag, 0)
log.Infof("********become to follower now********")
p.maybeLoadLeader()
// now, we are not leader
if p.coordinator != nil {
p.coordinator.stop()
p.rt = nil
}
if p.wn != nil {
p.wn.stop()
}
p.notifyElectionComplete()
p.cfg.Handler.ProphetBecomeFollower()
}
func (p *defaultProphet) maybeLoadLeader() {
value, err := p.elector.CurrentLeader(math.MaxUint64)
if err != nil {
log.Errorf("get current leader failed with %+v", err)
}
if len(value) == 0 {
return
}
if len(value) > 0 {
p.leader = mustUnmarshal([]byte(value))
}
}
func (p *defaultProphet) isLeader() bool {
return 1 == atomic.LoadInt64(&p.leaderFlag)
}
func (p *defaultProphet) notifyElectionComplete() {
p.notifyOnce.Do(func() {
close(p.completeC)
})
}
func (p *defaultProphet) isMatchLeader(leaderNode *Node) bool {
return leaderNode != nil &&
p.node.Name == leaderNode.Name
}