Skip to content

Commit 044a32b

Browse files
Add Epoch-Based Lease (#785)
1 parent c19a0b8 commit 044a32b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+5299
-1061
lines changed

aware/shard.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@ type ShardStateAware interface {
2525
Splited(metapb.Shard)
2626
// Destroyed the shard was destroyed on the current store
2727
Destroyed(metapb.Shard)
28+
// SnapshotApplied snapshot applied
29+
SnapshotApplied(metapb.Shard)
2830
// BecomeLeader the shard became the leader on the current store
2931
BecomeLeader(metapb.Shard)
3032
// BecomeFollower the shard became a follower on the current store
3133
BecomeFollower(metapb.Shard)
32-
// SnapshotApplied snapshot applied
33-
SnapshotApplied(metapb.Shard)
34+
// LeaseChanged the lease of the shard is changed.
35+
LeaseChanged(shard metapb.Shard, lease *metapb.EpochLease, replica metapb.Replica)
3436
}
3537

3638
// TestShardStateAware just for test

client/client.go

+7
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,13 @@ func WithReplicaSelectPolicy(policy rpcpb.ReplicaSelectPolicy) Option {
9090
}
9191
}
9292

93+
// WithLease sets the lease for the request
94+
func WithLease(lease *metapb.EpochLease) Option {
95+
return func(req *rpcpb.Request) {
96+
req.Lease = lease
97+
}
98+
}
99+
93100
// Client is a cube client, providing read and write access to the external.
94101
type Client interface {
95102
// Start start the cube client

client/kv_client.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ func (c *kvClient) BatchGet(ctx context.Context, keys [][]byte) *Future {
192192
c.cli.Router().AscendRange(c.shardGroup,
193193
tree.Min(), end,
194194
c.policy,
195-
func(shard raftstore.Shard, replicaStore metapb.Store) bool {
195+
func(shard raftstore.Shard, replicaStore metapb.Store, _ *metapb.EpochLease) bool {
196196
if allocated == tree.Len() {
197197
return false
198198
}
@@ -379,7 +379,7 @@ func (c *kvClient) ParallelScan(ctx context.Context,
379379
c.cli.Router().AscendRange(c.shardGroup,
380380
start, end,
381381
c.policy,
382-
func(shard raftstore.Shard, replicaStore metapb.Store) bool {
382+
func(shard raftstore.Shard, replicaStore metapb.Store, _ *metapb.EpochLease) bool {
383383
if len(ranges) == 0 {
384384
ranges = append(ranges, keyRange{start: start, end: shard.End})
385385
} else {

components/prophet/cluster/cluster.go

+16-1
Original file line numberDiff line numberDiff line change
@@ -553,6 +553,13 @@ func (c *RaftCluster) processShardHeartbeat(res *core.CachedShard) error {
553553
res.GetKeysRead() != origin.GetKeysRead() {
554554
saveCache = true
555555
}
556+
if res.GetLease().GetEpoch() > origin.GetLease().GetEpoch() ||
557+
res.GetLease() == nil {
558+
// If res.GetLease() is nil, all replicas have restarted and there is no lease, so we
559+
// need to reset the shard's lease to nil and wait for scheduling to create a new lease.
560+
// If the lease changed, the new lease epoch needs to be saved to the cache.
561+
saveCache = true
562+
}
556563
}
557564

558565
if !saveKV && !saveCache && !isNew {
@@ -657,7 +664,10 @@ func (c *RaftCluster) processShardHeartbeat(res *core.CachedShard) error {
657664
zap.Uint64("from", from),
658665
zap.Uint64("to", res.GetLeader().GetStoreID()))
659666
}
660-
c.addNotifyLocked(event.NewShardEvent(res.Meta, res.GetLeader().GetID(), false, false))
667+
c.addNotifyLocked(event.NewShardEvent(res.Meta,
668+
res.GetLeader().GetID(),
669+
res.GetLease(),
670+
false, false))
661671
}
662672
if saveCache {
663673
c.addNotifyLocked(event.NewShardStatsEvent(res.GetStat()))
@@ -1230,6 +1240,11 @@ func (c *RaftCluster) AllocID() (uint64, error) {
12301240
return c.storage.AllocID()
12311241
}
12321242

1243+
// NextShardEpoch allocates the next lease epoch for the shard with the given id
1244+
func (c *RaftCluster) NextShardEpoch(id uint64) (uint64, error) {
1245+
return c.storage.AllocShardLeaseEpoch(id)
1246+
}
1247+
12331248
// ChangedEventNotifier changedEventNotifier
12341249
func (c *RaftCluster) ChangedEventNotifier() <-chan rpcpb.EventNotify {
12351250
c.RLock()

components/prophet/cluster/cluster_test.go

+32
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,38 @@ func TestUpStore(t *testing.T) {
225225
assert.True(t, strings.Contains(err.Error(), "not found"))
226226
}
227227

228+
func TestShardHeartbeatWithLease(t *testing.T) {
229+
_, opt, err := newTestScheduleConfig()
230+
assert.NoError(t, err)
231+
232+
cluster := newTestRaftCluster(opt, storage.NewTestStorage(), core.NewBasicCluster(nil))
233+
stores := newTestStores(3, "2.0.0")
234+
for _, store := range stores {
235+
assert.NoError(t, cluster.putStoreLocked(store))
236+
}
237+
238+
shards := newTestShards(1, 3)
239+
shard := shards[0]
240+
241+
assert.NoError(t, cluster.processShardHeartbeat(shard))
242+
assert.Nil(t, cluster.GetShard(shard.Meta.ID).GetLease())
243+
244+
// first lease
245+
newShard := shard.Clone(core.WithLease(&metapb.EpochLease{Epoch: 1, ReplicaID: shard.Meta.Replicas[0].ID}))
246+
assert.NoError(t, cluster.processShardHeartbeat(newShard))
247+
assert.Equal(t, &metapb.EpochLease{Epoch: 1, ReplicaID: shard.Meta.Replicas[0].ID}, cluster.GetShard(shard.Meta.ID).GetLease())
248+
249+
// new lease
250+
newShard = shard.Clone(core.WithLease(&metapb.EpochLease{Epoch: 2, ReplicaID: shard.Meta.Replicas[1].ID}))
251+
assert.NoError(t, cluster.processShardHeartbeat(newShard))
252+
assert.Equal(t, &metapb.EpochLease{Epoch: 2, ReplicaID: shard.Meta.Replicas[1].ID}, cluster.GetShard(shard.Meta.ID).GetLease())
253+
254+
// invalid lease
255+
newShard = shard.Clone(core.WithLease(&metapb.EpochLease{Epoch: 1, ReplicaID: shard.Meta.Replicas[2].ID}))
256+
assert.NoError(t, cluster.processShardHeartbeat(newShard))
257+
assert.Equal(t, &metapb.EpochLease{Epoch: 2, ReplicaID: shard.Meta.Replicas[1].ID}, cluster.GetShard(shard.Meta.ID).GetLease())
258+
}
259+
228260
func TestShardHeartbeat(t *testing.T) {
229261
_, opt, err := newTestScheduleConfig()
230262
assert.NoError(t, err)

components/prophet/cluster/cluster_worker.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ func (c *RaftCluster) HandleRemoveShards(request *rpcpb.ProphetRequest) (*rpcpb.
335335

336336
c.core.AddRemovedShards(request.RemoveShards.IDs...)
337337
for _, shard := range origin {
338-
c.addNotifyLocked(event.NewShardEvent(shard, 0, true, false))
338+
c.addNotifyLocked(event.NewShardEvent(shard, 0, nil, true, false))
339339
}
340340

341341
return &rpcpb.RemoveShardsRsp{}, nil
@@ -425,7 +425,7 @@ func (c *RaftCluster) triggerNotifyCreateShards() {
425425

426426
func (c *RaftCluster) doNotifyCreateShards() {
427427
c.core.ForeachWaitingCreateShards(func(res metapb.Shard) {
428-
c.addNotifyLocked(event.NewShardEvent(res, 0, false, true))
428+
c.addNotifyLocked(event.NewShardEvent(res, 0, nil, false, true))
429429
})
430430
}
431431

components/prophet/core/shard.go

+14
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type CachedShard struct {
4545
learners []metapb.Replica
4646
voters []metapb.Replica
4747
leader *metapb.Replica
48+
lease *metapb.EpochLease
4849
downReplicas replicaStatsSlice
4950
pendingReplicas replicaSlice
5051
stats metapb.ShardStats
@@ -108,6 +109,7 @@ func ShardFromHeartbeat(heartbeat rpcpb.ShardHeartbeatReq, meta metapb.Shard) *C
108109
downReplicas: heartbeat.GetDownReplicas(),
109110
pendingReplicas: heartbeat.GetPendingReplicas(),
110111
stats: heartbeat.Stats,
112+
lease: heartbeat.Lease,
111113
}
112114
shard.stats.ApproximateSize = shardSize
113115

@@ -152,6 +154,13 @@ func (r *CachedShard) Clone(opts ...ShardCreateOption) *CachedShard {
152154
opt(res)
153155
}
154156
fillVoterAndLearner(res)
157+
158+
if r.lease != nil {
159+
res.lease = &metapb.EpochLease{
160+
Epoch: r.lease.Epoch,
161+
ReplicaID: r.lease.ReplicaID,
162+
}
163+
}
155164
return res
156165
}
157166

@@ -168,6 +177,11 @@ func (r *CachedShard) IsDestroyState() bool {
168177
r.Meta.GetState() == metapb.ShardState_Destroying
169178
}
170179

180+
// GetLease returns lease of the shard
181+
func (r *CachedShard) GetLease() *metapb.EpochLease {
182+
return r.lease
183+
}
184+
171185
// GetTerm returns the current term of the shard
172186
func (r *CachedShard) GetTerm() uint64 {
173187
return r.term

components/prophet/core/shard_option.go

+7
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,13 @@ func WithLeader(leader *metapb.Replica) ShardCreateOption {
7272
}
7373
}
7474

75+
// WithLease sets the lease for the shard.
76+
func WithLease(lease *metapb.EpochLease) ShardCreateOption {
77+
return func(res *CachedShard) {
78+
res.lease = lease
79+
}
80+
}
81+
7582
// WithStartKey sets the start key for the shard.
7683
func WithStartKey(key []byte) ShardCreateOption {
7784
return func(res *CachedShard) {

components/prophet/core/store.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,7 @@ func DistinctScore(labels []string, stores []*CachedStore, other *CachedStore) f
450450
// ones.
451451
// ones.
452452
func (cr *CachedStore) MergeLabels(labels []metapb.Label) []metapb.Label {
453-
storeLabels := cr.Meta.GetLabels()
453+
storeLabels := cr.Meta.Clone().GetLabels()
454454
L:
455455
for _, newLabel := range labels {
456456
for idx := range storeLabels {

components/prophet/event/event.go

+17-9
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,10 @@ func TypeName(value uint32) string {
4949

5050
// Snapshot cache snapshot
5151
type Snapshot struct {
52-
Shards []metapb.Shard
53-
Stores []metapb.Store
54-
Leaders map[uint64]uint64
52+
Shards []metapb.Shard
53+
Stores []metapb.Store
54+
LeaderReplicasIDs map[uint64]uint64
55+
Leases map[uint64]*metapb.EpochLease
5556
}
5657

5758
// MatchEvent returns the flag has the target event
@@ -79,14 +80,20 @@ func NewInitEvent(snap Snapshot) (*rpcpb.InitEventData, error) {
7980
}
8081

8182
resp.Shards = append(resp.Shards, data)
82-
resp.Leaders = append(resp.Leaders, snap.Leaders[v.GetID()])
83+
resp.LeaderReplicaIDs = append(resp.LeaderReplicaIDs, snap.LeaderReplicasIDs[v.GetID()])
84+
lease := snap.Leases[v.GetID()]
85+
if nil == lease {
86+
resp.Leases = append(resp.Leases, metapb.EpochLease{})
87+
} else {
88+
resp.Leases = append(resp.Leases, *lease)
89+
}
8390
}
8491

8592
return resp, nil
8693
}
8794

8895
// NewShardEvent create shard event
89-
func NewShardEvent(target metapb.Shard, leaderID uint64, removed bool, create bool) rpcpb.EventNotify {
96+
func NewShardEvent(target metapb.Shard, leaderReplicaID uint64, lease *metapb.EpochLease, removed bool, create bool) rpcpb.EventNotify {
9097
value, err := target.Marshal()
9198
if err != nil {
9299
return rpcpb.EventNotify{}
@@ -95,10 +102,11 @@ func NewShardEvent(target metapb.Shard, leaderID uint64, removed bool, create bo
95102
return rpcpb.EventNotify{
96103
Type: ShardEvent,
97104
ShardEvent: &rpcpb.ShardEventData{
98-
Data: value,
99-
Leader: leaderID,
100-
Removed: removed,
101-
Create: create,
105+
Data: value,
106+
LeaderReplicaID: leaderReplicaID,
107+
Lease: lease,
108+
Removed: removed,
109+
Create: create,
102110
},
103111
}
104112
}

components/prophet/event_notifier.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/matrixorigin/matrixcube/components/log"
2424
"github.com/matrixorigin/matrixcube/components/prophet/cluster"
2525
"github.com/matrixorigin/matrixcube/components/prophet/event"
26+
"github.com/matrixorigin/matrixcube/pb/metapb"
2627
"github.com/matrixorigin/matrixcube/pb/rpcpb"
2728
"github.com/matrixorigin/matrixcube/util/stop"
2829
"go.uber.org/zap"
@@ -73,17 +74,16 @@ func (wn *eventNotifier) handleCreateWatcher(req *rpcpb.ProphetRequest, resp *rp
7374
defer wn.cluster.RUnlock()
7475
if event.MatchEvent(event.InitEvent, req.CreateWatcher.Flag) {
7576
snap := event.Snapshot{
76-
Leaders: make(map[uint64]uint64),
77+
LeaderReplicasIDs: make(map[uint64]uint64),
78+
Leases: make(map[uint64]*metapb.EpochLease),
7779
}
7880
for _, c := range wn.cluster.GetStores() {
7981
snap.Stores = append(snap.Stores, c.Meta)
8082
}
8183
for _, res := range wn.cluster.GetShards() {
8284
snap.Shards = append(snap.Shards, res.Meta)
83-
leader := res.GetLeader()
84-
if leader != nil {
85-
snap.Leaders[res.Meta.GetID()] = leader.ID
86-
}
85+
snap.LeaderReplicasIDs[res.Meta.GetID()] = res.GetLeader().GetID()
86+
snap.Leases[res.Meta.GetID()] = res.GetLease()
8787
}
8888

8989
rsp, err := event.NewInitEvent(snap)

components/prophet/id/id.go

+14-3
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ type Generator interface {
3232
}
3333

3434
const (
35-
idBatch uint64 = 512
3635
UninitializedID uint64 = 0
3736
)
3837

@@ -42,6 +41,7 @@ type etcdGenerator struct {
4241

4342
client *clientv3.Client
4443
leadship *election.Leadership
44+
idBatch uint64
4545
idPath string
4646
base uint64
4747
end uint64
@@ -52,11 +52,22 @@ func NewEtcdGenerator(
5252
rootPath string,
5353
client *clientv3.Client,
5454
leadship *election.Leadership,
55+
) Generator {
56+
return NewEtcdGeneratorWithPathAndBatch(fmt.Sprintf("%s/meta/id", rootPath), 512, client, leadship)
57+
}
58+
59+
// NewEtcdGeneratorWithPathAndBatch returns an ID allocator based on etcd that uses the given ID path and batch size.
60+
func NewEtcdGeneratorWithPathAndBatch(
61+
idPath string,
62+
idBatch uint64,
63+
client *clientv3.Client,
64+
leadship *election.Leadership,
5565
) Generator {
5666
return &etcdGenerator{
67+
idBatch: idBatch,
5768
client: client,
5869
leadship: leadship,
59-
idPath: fmt.Sprintf("%s/meta/id", rootPath),
70+
idPath: idPath,
6071
}
6172
}
6273

@@ -82,7 +93,7 @@ func (alloc *etcdGenerator) preemption() error {
8293
return err
8394

8495
}
85-
end := value + idBatch
96+
end := value + alloc.idBatch
8697

8798
if value == 0 {
8899
err = alloc.createID(end)

components/prophet/id/id_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func TestAllocID(t *testing.T) {
4545
rootPath := "/root"
4646
allocator := NewEtcdGenerator(rootPath, client, ls)
4747

48-
n := idBatch + 1
48+
n := allocator.(*etcdGenerator).idBatch + 1
4949
for i := uint64(1); i <= n; i++ {
5050
id, err := allocator.AllocID()
5151
assert.NoError(t, err)

components/prophet/mock/mockcluster/mockcluster.go

+4
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ func (mc *Cluster) AllocID() (uint64, error) {
9090
return mc.storage.AllocID()
9191
}
9292

93+
func (mc *Cluster) NextShardEpoch(id uint64) (uint64, error) {
94+
return mc.storage.AllocShardLeaseEpoch(id)
95+
}
96+
9397
// ScanShards scans resource with start key, until number greater than limit.
9498
func (mc *Cluster) ScanShards(group uint64, startKey, endKey []byte, limit int) []*core.CachedShard {
9599
return mc.Shards.ScanRange(group, startKey, endKey, limit)

0 commit comments

Comments
 (0)