-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathprophet.go
148 lines (130 loc) · 3.53 KB
/
prophet.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package prophet
import (
"context"
"math"
"sync"
"time"
"github.com/fagongzi/goetty"
"go.etcd.io/etcd/clientv3"
)
// RoleChangeHandler is the callback interface for prophet role changes.
type RoleChangeHandler interface {
// ProphetBecomeLeader is the leader-side callback.
// NOTE(review): presumably invoked when this node becomes the prophet
// leader; the call site is not visible here, confirm.
ProphetBecomeLeader()
// ProphetBecomeFollower is the follower-side callback.
// NOTE(review): presumably invoked when this node becomes a follower;
// the call site is not visible here, confirm.
ProphetBecomeFollower()
}
// Adapter is the set of hooks supplied by the caller of NewProphet.
// NewProphet keeps it on the prophet and also hands it to the codec and
// to the store.
type Adapter interface {
// NewResource returns a new resource.
NewResource() Resource
// NewContainer returns a new container.
NewContainer() Container
// FetchLeaderResources fetches the local leader resources as a list of
// uint64 values (presumably resource IDs, confirm).
FetchLeaderResources() []uint64
// FetchResourceHB fetches the heartbeat request for the resource with
// the given id.
FetchResourceHB(id uint64) *ResourceHeartbeatReq
// FetchContainerHB fetches the container heartbeat request.
FetchContainerHB() *ContainerHeartbeatReq
// ResourceHBInterval returns the resource heartbeat interval.
ResourceHBInterval() time.Duration
// ContainerHBInterval returns the container heartbeat interval.
ContainerHBInterval() time.Duration
// HBHandler returns the heartbeat handler.
HBHandler() HeartbeatHandler
}
// Prophet is the distributed scheduler and coordinator.
type Prophet interface {
// Start starts the prophet instance: the request listener, the leader
// election and the heartbeat loops.
Start()
// Stop stops the prophet instance.
Stop()
// GetStore returns the Store.
GetStore() Store
// GetRPC returns the RPC client.
GetRPC() RPC
// GetEtcdClient returns the etcd client used by this instance.
GetEtcdClient() *clientv3.Client
// StorageNode returns true if the current node is a storage node.
StorageNode() bool
}
// defaultProphet is the Prophet implementation returned by NewProphet.
type defaultProphet struct {
// Embedded mutex.
// NOTE(review): which fields it guards is not visible in this part of
// the file.
sync.Mutex
// adapter is the caller-supplied Adapter passed to NewProphet.
adapter Adapter
// opts holds the options built from the Option values given to NewProphet.
opts *options
// cfg is the same pointer as opts.cfg (assigned from it in NewProphet).
cfg *Cfg
// store is built by newEtcdStore in NewProphet and returned by GetStore.
store Store
// rt is not assigned by NewProphet; it is still nil when it is passed to
// newCoordinator there.
rt *Runtime
// coordinator is built by newCoordinator in NewProphet.
coordinator *Coordinator
// tcpL is the goetty server created for cfg.RPCAddr.
tcpL *goetty.Server
// runner is created by NewRunner in NewProphet and stopped in Stop.
runner *Runner
// completeC is an unbuffered channel created in NewProphet.
completeC chan struct{}
// rpc is built by newSimpleRPC in NewProphet and returned by GetRPC.
rpc *simpleRPC
// bizCodec wraps the adapter and backs the encoder/decoder of tcpL.
bizCodec *codec
wn *watcherNotifier
// resourceHBC is created with a buffer of 512 in NewProphet.
// NOTE(review): presumably carries resource IDs to heartbeat, confirm.
resourceHBC chan uint64
// Leader election state.
// node describes this instance: the name given to NewProphet and
// cfg.RPCAddr.
node *Node
// elector is created by NewElector in NewProphet and stopped in Stop.
elector Elector
// electorCancelFunc is not assigned by NewProphet; Stop calls it.
electorCancelFunc context.CancelFunc
leader *Node
// leaderFlag starts at 0.
// NOTE(review): presumably a flag accessed through sync/atomic, confirm.
leaderFlag int64
// signature is the result of node.marshal(), computed once in NewProphet.
signature string
notifyOnce sync.Once
}
// NewProphet returns a prophet instance named name that uses adapter as its
// integration hook.
//
// Each Option in opts is applied, in order, to a fresh options value that
// starts with an empty Cfg; the result is then normalized by adjust. The
// instance is assembled here; call Start to launch the listener, the leader
// election and the heartbeat loops.
//
// NewProphet panics if the elector cannot be created. The function has no
// error return, and the store is built from the elector, so continuing with
// a missing elector would only move the failure to a later, harder to
// diagnose nil dereference.
func NewProphet(name string, adapter Adapter, opts ...Option) Prophet {
	value := &options{cfg: &Cfg{}}
	for _, opt := range opts {
		opt(value)
	}
	value.adjust()

	// leaderFlag and the remaining fields start at their zero values.
	p := &defaultProphet{
		opts:     value,
		cfg:      value.cfg,
		adapter:  adapter,
		bizCodec: &codec{adapter: adapter},
	}

	p.node = &Node{
		Name: name,
		Addr: p.cfg.RPCAddr,
	}
	p.signature = p.node.marshal()

	elector, err := NewElector(p.opts.client, WithLeaderLeaseSeconds(p.opts.cfg.LeaseTTL))
	if err != nil {
		panic("prophet: create elector failed: " + err.Error())
	}
	p.elector = elector

	p.store = newEtcdStore(value.client, adapter, p.signature, p.elector)
	p.runner = NewRunner()
	// NOTE(review): p.rt has not been assigned yet, so the coordinator is
	// handed a nil runtime here; confirm it is replaced before use.
	p.coordinator = newCoordinator(value.cfg, p.runner, p.rt)
	p.tcpL = goetty.NewServer(p.cfg.RPCAddr,
		goetty.WithServerDecoder(goetty.NewIntLengthFieldBasedDecoder(p.bizCodec)),
		goetty.WithServerEncoder(goetty.NewIntLengthFieldBasedEncoder(p.bizCodec)))
	p.completeC = make(chan struct{})
	p.rpc = newSimpleRPC(p)
	p.resourceHBC = make(chan uint64, 512)

	return p
}
// Start runs the prophet's start-up steps in a fixed order: the listener,
// the leader loop, then the resource and container heartbeat loops.
func (p *defaultProphet) Start() {
	steps := [...]func(){
		p.startListen,
		p.startLeaderLoop,
		p.startResourceHeartbeatLoop,
		p.startContainerHeartbeatLoop,
	}
	for _, step := range steps {
		step()
	}
}
// Stop shuts the prophet down: it stops the TCP server and the runner,
// cancels and stops the elector, closes the etcd client and, when one is
// configured in the options, closes opts.etcd as well.
func (p *defaultProphet) Stop() {
	p.tcpL.Stop()
	p.runner.Stop()

	// NewProphet does not assign electorCancelFunc, so it is still nil if
	// Stop runs before it has been set; calling a nil func would panic.
	if p.electorCancelFunc != nil {
		p.electorCancelFunc()
	}
	p.elector.Stop(math.MaxUint64)

	// Stop has no return value, so a Close error cannot be reported to the
	// caller and is dropped.
	p.opts.client.Close()
	if p.opts.etcd != nil {
		p.opts.etcd.Close()
	}
}
// GetStore returns the Store held by this prophet.
func (p *defaultProphet) GetStore() Store {
	store := p.store
	return store
}
// GetRPC exposes the prophet's simpleRPC through the RPC interface.
func (p *defaultProphet) GetRPC() RPC {
	var client RPC = p.rpc
	return client
}
// GetEtcdClient returns the etcd client stored in the prophet's options.
func (p *defaultProphet) GetEtcdClient() *clientv3.Client {
	opts := p.opts
	return opts.client
}
// StorageNode reports the StorageNode setting from the prophet's
// configuration.
func (p *defaultProphet) StorageNode() bool {
	cfg := p.opts.cfg
	return cfg.StorageNode
}