statistics: remove mutex in hotPeerCache (#3721)
* try to remove mutex

* fix lint

* address review comments

* fix tests

* remove useless code

* fix data races

* Revert "address the comment"

This reverts commit 16a3cbf.

* fix goroutine stuck

* use leaderCtx

* Revert "use leaderCtx"

This reverts commit 7327c64.

* use quit channel

* add stale read case

Signed-off-by: yisaer <[email protected]>
Co-authored-by: Ti Chi Robot <[email protected]>
Yisaer and ti-chi-bot authored Jun 7, 2021
1 parent 6c52d75 commit d26c09a
Showing 10 changed files with 399 additions and 193 deletions.
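
Before the per-file diffs, a minimal sketch of the pattern this commit moves toward may help: instead of guarding hotPeerCache with a mutex, reads and writes are expressed as tasks drained by a single goroutine that owns the cache outright and exits via a quit channel. All names below are illustrative, not PD's actual definitions.

package main

import "fmt"

// task is an illustrative stand-in for PD's flow-queue task types.
type task interface {
	run(cache map[uint64]float64)
}

type updateTask struct {
	regionID uint64
	load     float64
}

func (t updateTask) run(cache map[uint64]float64) { cache[t.regionID] = t.load }

type readTask struct {
	regionID uint64
	result   chan float64
}

func (t readTask) run(cache map[uint64]float64) { t.result <- cache[t.regionID] }

// runCache is the single consumer: the map is touched by no other goroutine,
// so no mutex is needed. The quit channel mirrors c.quit in the diff below.
func runCache(tasks <-chan task, quit <-chan struct{}) {
	cache := make(map[uint64]float64)
	for {
		select {
		case t := <-tasks:
			t.run(cache)
		case <-quit:
			return
		}
	}
}

func main() {
	tasks := make(chan task, 1024)
	quit := make(chan struct{})
	go runCache(tasks, quit)

	tasks <- updateTask{regionID: 1, load: 42} // analogous to CheckWriteAsync
	res := make(chan float64, 1)
	tasks <- readTask{regionID: 1, result: res}
	fmt.Println(<-res) // prints 42
	close(quit) // stops the consumer, as closing c.quit does in RaftCluster
}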
3 changes: 2 additions & 1 deletion pkg/mock/mockcluster/mockcluster.go
@@ -54,10 +54,11 @@ type Cluster struct {

// NewCluster creates a new Cluster
func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
mockQuit := make(chan struct{})
clus := &Cluster{
BasicCluster: core.NewBasicCluster(),
IDAllocator: mockid.NewIDAllocator(),
HotStat: statistics.NewHotStat(ctx),
HotStat: statistics.NewHotStat(ctx, mockQuit),
PersistOptions: opts,
suspectRegions: map[uint64]struct{}{},
disabledFeatures: make(map[versioninfo.Feature]struct{}),
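The mock-cluster change above exists because NewHotStat now takes a quit channel in addition to a context; the channel is what stops the new background flow-queue goroutines. A hedged sketch of the wiring as the diff implies it, with error handling and the rest of the server omitted:

package main

import (
	"context"

	"github.com/tikv/pd/server/statistics"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// The quit channel lives as long as the cluster; RaftCluster creates
	// it in Start (see the server/cluster/cluster.go hunks below).
	quit := make(chan struct{})
	hotStat := statistics.NewHotStat(ctx, quit)
	_ = hotStat

	// Closing quit lets HotStat's async read/write queues exit cleanly
	// instead of leaking goroutines.
	close(quit)
}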
31 changes: 8 additions & 23 deletions server/cluster/cluster.go
@@ -205,7 +205,7 @@ func (c *RaftCluster) InitCluster(id id.Allocator, opt *config.PersistOptions, s
c.storage = storage
c.id = id
c.labelLevelStats = statistics.NewLabelStatistics()
c.hotStat = statistics.NewHotStat(c.ctx)
c.hotStat = statistics.NewHotStat(c.ctx, c.quit)
c.prepareChecker = newPrepareChecker()
c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit)
c.suspectRegions = cache.NewIDTTL(c.ctx, time.Minute, 3*time.Minute)
@@ -222,7 +222,7 @@ func (c *RaftCluster) Start(s Server) error {
log.Warn("raft cluster has already been started")
return nil
}

c.quit = make(chan struct{})
c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetStorage(), s.GetBasicCluster())
cluster, err := c.LoadClusterInfo()
if err != nil {
@@ -254,7 +254,6 @@ func (c *RaftCluster) Start(s Server) error {
c.coordinator = newCoordinator(c.ctx, cluster, s.GetHBStreams())
c.regionStats = statistics.NewRegionStatistics(c.opt, c.ruleManager)
c.limiter = NewStoreLimiter(s.GetPersistOptions())
c.quit = make(chan struct{})

c.wg.Add(4)
go c.runCoordinator()
@@ -562,45 +561,31 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
statistics.RegionWriteKeys: 0,
}
peerInfo := core.NewPeerInfo(peer, loads, interval)
item := statistics.NewPeerInfoItem(peerInfo, region)
c.hotStat.CheckReadAsync(item)
c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region))
}
collect := statistics.NewUnReportStatsCollect(storeID, regionIDs, interval)
c.hotStat.CheckReadAsync(collect)
c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regionIDs, interval))
return nil
}

// processRegionHeartbeat updates the region information.
func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
c.RLock()
hotStat := c.hotStat
storage := c.storage
coreCluster := c.core
hotStat := c.hotStat
c.RUnlock()

origin, err := coreCluster.PreCheckPutRegion(region)
if err != nil {
return err
}

expiredStats := hotStat.ExpiredItems(region)
// Put expiredStats into read/write queue to update stats
if len(expiredStats) > 0 {
for _, stat := range expiredStats {
item := statistics.NewExpiredStatItem(stat)
if stat.Kind == statistics.WriteFlow {
hotStat.CheckWriteAsync(item)
} else {
hotStat.CheckReadAsync(item)
}
}
}
hotStat.CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
hotStat.CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
for _, peer := range region.GetPeers() {
peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval)
item := statistics.NewPeerInfoItem(peerInfo, region)
hotStat.CheckWriteAsync(item)
hotStat.CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region))
}

// Save to storage if meta is updated.
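The commit message's "fix goroutine stuck" and "use quit" steps point at a classic hazard of this queue-based design: once reads go through the task channel, a caller waiting on a reply can hang forever if the consumer has already shut down. A minimal sketch of the guard, with illustrative names only (this is not PD's actual code):

package main

import "fmt"

func main() {
	tasks := make(chan func())
	quit := make(chan struct{})
	done := make(chan struct{}) // closed when the consumer goroutine exits

	go func() {
		defer close(done)
		for {
			select {
			case t := <-tasks:
				t()
			case <-quit:
				return
			}
		}
	}()

	// query enqueues a read task and waits for the reply, but never blocks
	// if the consumer is gone; this is the shape of the "use quit" fix.
	query := func() (float64, bool) {
		result := make(chan float64, 1)
		task := func() { result <- 42 } // stand-in for reading hotPeerCache
		select {
		case tasks <- task:
			return <-result, true
		case <-done:
			return 0, false
		}
	}

	if v, ok := query(); ok {
		fmt.Println("read:", v)
	}
	close(quit)
	<-done
	if _, ok := query(); !ok {
		fmt.Println("query after shutdown fails fast instead of blocking forever")
	}
}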
73 changes: 51 additions & 22 deletions server/cluster/cluster_test.go
@@ -44,12 +44,23 @@ func Test(t *testing.T) {

var _ = Suite(&testClusterInfoSuite{})

type testClusterInfoSuite struct{}
type testClusterInfoSuite struct {
ctx context.Context
cancel context.CancelFunc
}

func (s *testClusterInfoSuite) TearDownTest(c *C) {
s.cancel()
}

func (s *testClusterInfoSuite) SetUpTest(c *C) {
s.ctx, s.cancel = context.WithCancel(context.Background())
}

func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())

n, np := uint64(3), uint64(3)
stores := newTestStores(n, "2.0.0")
@@ -161,7 +172,7 @@ func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) {
func (s *testClusterInfoSuite) TestFilterUnhealthyStore(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())

stores := newTestStores(3, "2.0.0")
for _, store := range stores {
@@ -193,7 +204,7 @@ func (s *testClusterInfoSuite) TestFilterUnhealthyStore(c *C) {
func (s *testClusterInfoSuite) TestSetOfflineStore(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
// Put 4 stores.
for _, store := range newTestStores(4, "2.0.0") {
c.Assert(cluster.PutStore(store.GetMeta()), IsNil)
@@ -239,7 +250,7 @@ func (s *testClusterInfoSuite) TestSetOfflineStore(c *C) {
func (s *testClusterInfoSuite) TestReuseAddress(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
// Put 4 stores.
for _, store := range newTestStores(4, "2.0.0") {
c.Assert(cluster.PutStore(store.GetMeta()), IsNil)
@@ -280,7 +291,7 @@ func getTestDeployPath(storeID uint64) string {
func (s *testClusterInfoSuite) TestUpStore(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())

// Put 3 stores.
for _, store := range newTestStores(3, "2.0.0") {
@@ -314,7 +325,7 @@ func (s *testClusterInfoSuite) TestUpStore(c *C) {
func (s *testClusterInfoSuite) TestDeleteStoreUpdatesClusterVersion(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())

// Put 3 new 4.0.9 stores.
for _, store := range newTestStores(3, "4.0.9") {
@@ -337,7 +348,7 @@ func (s *testClusterInfoSuite) TestDeleteStoreUpdatesClusterVersion(c *C) {
func (s *testClusterInfoSuite) TestRegionHeartbeatHotStat(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
newTestStores(4, "2.0.0")
peers := []*metapb.Peer{
{
@@ -394,7 +405,7 @@ func (s *testClusterInfoSuite) TestRegionHeartbeatHotStat(c *C) {
func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())

n, np := uint64(3), uint64(3)

@@ -607,7 +618,7 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
func (s *testClusterInfoSuite) TestRegionFlowChanged(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
regions := []*core.RegionInfo{core.NewTestRegionInfo([]byte{}, []byte{})}
processRegions := func(regions []*core.RegionInfo) {
for _, r := range regions {
@@ -627,7 +638,7 @@ func (s *testClusterInfoSuite) TestRegionFlowChanged(c *C) {
func (s *testClusterInfoSuite) TestConcurrentRegionHeartbeat(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())

regions := []*core.RegionInfo{core.NewTestRegionInfo([]byte{}, []byte{})}
regions = core.SplitRegions(regions)
@@ -688,7 +699,7 @@ func heartbeatRegions(c *C, cluster *RaftCluster, regions []*core.RegionInfo) {
func (s *testClusterInfoSuite) TestHeartbeatSplit(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())

// 1: [nil, nil)
region1 := core.NewRegionInfo(&metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil)
@@ -727,7 +738,7 @@ func (s *testClusterInfoSuite) TestHeartbeatSplit(c *C) {
func (s *testClusterInfoSuite) TestRegionSplitAndMerge(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())

regions := []*core.RegionInfo{core.NewTestRegionInfo([]byte{}, []byte{})}

@@ -760,7 +771,7 @@ func (s *testClusterInfoSuite) TestRegionSplitAndMerge(c *C) {
func (s *testClusterInfoSuite) TestOfflineAndMerge(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())

storage := core.NewStorage(kv.NewMemoryKV())
cluster.ruleManager = placement.NewRuleManager(storage, cluster)
@@ -822,7 +833,7 @@ func (s *testClusterInfoSuite) TestOfflineAndMerge(c *C) {
func (s *testClusterInfoSuite) TestUpdateStorePendingPeerCount(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestCluster(opt)
tc := newTestCluster(s.ctx, opt)
stores := newTestStores(5, "2.0.0")
for _, s := range stores {
c.Assert(tc.putStoreLocked(s), IsNil)
@@ -889,14 +900,25 @@ func (s *testStoresInfoSuite) TestStores(c *C) {

var _ = Suite(&testRegionsInfoSuite{})

type testRegionsInfoSuite struct{}
type testRegionsInfoSuite struct {
ctx context.Context
cancel context.CancelFunc
}

func (s *testRegionsInfoSuite) TearDownTest(c *C) {
s.cancel()
}

func (s *testRegionsInfoSuite) SetUpTest(c *C) {
s.ctx, s.cancel = context.WithCancel(context.Background())
}

func (s *testRegionsInfoSuite) Test(c *C) {
n, np := uint64(10), uint64(3)
regions := newTestRegions(n, np)
_, opts, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestRaftCluster(mockid.NewIDAllocator(), opts, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
tc := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opts, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
cache := tc.core.Regions

for i := uint64(0); i < n; i++ {
@@ -1000,13 +1022,20 @@ func (s *testClusterUtilSuite) TestCheckStaleRegion(c *C) {
var _ = Suite(&testGetStoresSuite{})

type testGetStoresSuite struct {
ctx context.Context
cancel context.CancelFunc
cluster *RaftCluster
}

func (s *testGetStoresSuite) TearDownTest(c *C) {
s.cancel()
}

func (s *testGetStoresSuite) SetUpSuite(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
s.ctx, s.cancel = context.WithCancel(context.Background())
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
s.cluster = cluster

stores := newTestStores(200, "2.0.0")
@@ -1038,9 +1067,9 @@ func newTestScheduleConfig() (*config.ScheduleConfig, *config.PersistOptions, er
return &cfg.Schedule, opt, nil
}

func newTestCluster(opt *config.PersistOptions) *testCluster {
func newTestCluster(ctx context.Context, opt *config.PersistOptions) *testCluster {
storage := core.NewStorage(kv.NewMemoryKV())
rc := newTestRaftCluster(mockid.NewIDAllocator(), opt, storage, core.NewBasicCluster())
rc := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage, core.NewBasicCluster())
rc.ruleManager = placement.NewRuleManager(storage, rc)
if opt.IsPlacementRulesEnabled() {
err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels())
@@ -1052,8 +1081,8 @@ func newTestCluster(opt *config.PersistOptions) *testCluster {
return &testCluster{RaftCluster: rc}
}

func newTestRaftCluster(id id.Allocator, opt *config.PersistOptions, storage *core.Storage, basicCluster *core.BasicCluster) *RaftCluster {
rc := &RaftCluster{ctx: context.TODO()}
func newTestRaftCluster(ctx context.Context, id id.Allocator, opt *config.PersistOptions, storage *core.Storage, basicCluster *core.BasicCluster) *RaftCluster {
rc := &RaftCluster{ctx: ctx}
rc.InitCluster(id, opt, storage, basicCluster)
return rc
}
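The repeated SetUpTest/TearDownTest additions above all follow one pattern: each test owns a cancellable context, and cancelling it in teardown stops whatever goroutines the cluster spawned under that context, so they cannot leak into the next test. A stripped-down sketch of the pattern in isolation (gocheck-style, outside PD; suite and test names are made up):

package cluster_test

import (
	"context"
	"testing"

	. "github.com/pingcap/check"
)

func TestSuites(t *testing.T) { TestingT(t) }

var _ = Suite(&testLifecycleSuite{})

type testLifecycleSuite struct {
	ctx    context.Context
	cancel context.CancelFunc
}

func (s *testLifecycleSuite) SetUpTest(c *C) {
	// Fresh context per test; pass s.ctx to anything that starts
	// goroutines, as newTestRaftCluster(s.ctx, ...) does above.
	s.ctx, s.cancel = context.WithCancel(context.Background())
}

func (s *testLifecycleSuite) TearDownTest(c *C) {
	s.cancel() // tears down background workers started under s.ctx
}

func (s *testLifecycleSuite) TestContextIsLive(c *C) {
	c.Assert(s.ctx.Err(), IsNil)
}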
20 changes: 16 additions & 4 deletions server/cluster/cluster_worker_test.go
@@ -14,10 +14,11 @@
package cluster

import (
"context"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"

"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/kv"
@@ -26,12 +27,23 @@ import (

var _ = Suite(&testClusterWorkerSuite{})

type testClusterWorkerSuite struct{}
type testClusterWorkerSuite struct {
ctx context.Context
cancel context.CancelFunc
}

func (s *testClusterWorkerSuite) TearDownTest(c *C) {
s.cancel()
}

func (s *testClusterWorkerSuite) SetUpTest(c *C) {
s.ctx, s.cancel = context.WithCancel(context.Background())
}

func (s *testClusterWorkerSuite) TestReportSplit(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
left := &metapb.Region{Id: 1, StartKey: []byte("a"), EndKey: []byte("b")}
right := &metapb.Region{Id: 2, StartKey: []byte("b"), EndKey: []byte("c")}
_, err = cluster.HandleReportSplit(&pdpb.ReportSplitRequest{Left: left, Right: right})
@@ -43,7 +55,7 @@ func (s *testClusterWorkerSuite) TestReportSplit(c *C) {
func (s *testClusterWorkerSuite) TestReportBatchSplit(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
regions := []*metapb.Region{
{Id: 1, StartKey: []byte(""), EndKey: []byte("a")},
{Id: 2, StartKey: []byte("a"), EndKey: []byte("b")},
4 changes: 2 additions & 2 deletions server/cluster/coordinator_test.go
@@ -283,7 +283,7 @@ func prepare(setCfg func(*config.ScheduleConfig), setTc func(*testCluster), run
if setCfg != nil {
setCfg(cfg)
}
tc := newTestCluster(opt)
tc := newTestCluster(ctx, opt)
hbStreams := hbstream.NewTestHeartbeatStreams(ctx, tc.getClusterID(), tc, true /* need to run */)
if setTc != nil {
setTc(tc)
@@ -335,7 +335,7 @@ func (s *testCoordinatorSuite) TestCheckRegion(c *C) {
co.stop()
co.wg.Wait()

tc = newTestCluster(opt)
tc = newTestCluster(s.ctx, opt)
co = newCoordinator(s.ctx, tc.RaftCluster, hbStreams)
co.run()
