diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go
index 9cb1f95a7b8..229b0b694d8 100644
--- a/pkg/mock/mockcluster/mockcluster.go
+++ b/pkg/mock/mockcluster/mockcluster.go
@@ -54,10 +54,11 @@ type Cluster struct {
 // NewCluster creates a new Cluster
 func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
+    mockQuit := make(chan struct{})
     clus := &Cluster{
         BasicCluster: core.NewBasicCluster(),
         IDAllocator: mockid.NewIDAllocator(),
-        HotStat: statistics.NewHotStat(ctx),
+        HotStat: statistics.NewHotStat(ctx, mockQuit),
         PersistOptions: opts,
         suspectRegions: map[uint64]struct{}{},
         disabledFeatures: make(map[versioninfo.Feature]struct{}),
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index ef29f6c71a6..49280eed74c 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -205,7 +205,7 @@ func (c *RaftCluster) InitCluster(id id.Allocator, opt *config.PersistOptions, s
     c.storage = storage
     c.id = id
     c.labelLevelStats = statistics.NewLabelStatistics()
-    c.hotStat = statistics.NewHotStat(c.ctx)
+    c.hotStat = statistics.NewHotStat(c.ctx, c.quit)
     c.prepareChecker = newPrepareChecker()
     c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit)
     c.suspectRegions = cache.NewIDTTL(c.ctx, time.Minute, 3*time.Minute)
@@ -222,7 +222,7 @@ func (c *RaftCluster) Start(s Server) error {
         log.Warn("raft cluster has already been started")
         return nil
     }
-
+    c.quit = make(chan struct{})
     c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetStorage(), s.GetBasicCluster())
     cluster, err := c.LoadClusterInfo()
     if err != nil {
@@ -254,7 +254,6 @@ func (c *RaftCluster) Start(s Server) error {
     c.coordinator = newCoordinator(c.ctx, cluster, s.GetHBStreams())
     c.regionStats = statistics.NewRegionStatistics(c.opt, c.ruleManager)
     c.limiter = NewStoreLimiter(s.GetPersistOptions())
-    c.quit = make(chan struct{})
     c.wg.Add(4)
     go c.runCoordinator()
@@ -562,45 +561,31 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
             statistics.RegionWriteKeys: 0,
         }
         peerInfo := core.NewPeerInfo(peer, loads, interval)
-        item := statistics.NewPeerInfoItem(peerInfo, region)
-        c.hotStat.CheckReadAsync(item)
+        c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region))
     }
-    collect := statistics.NewUnReportStatsCollect(storeID, regionIDs, interval)
-    c.hotStat.CheckReadAsync(collect)
+    c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regionIDs, interval))
     return nil
 }
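Editorial note: the quit-channel wiring above is order-sensitive — `NewHotStat` captures the channel at construction time, which is why `Start` now creates `c.quit` before `InitCluster` rather than after it. A minimal sketch of that constraint, using simplified stand-in types (not the real `RaftCluster`/`HotStat`):

```go
package main

import "context"

// hotStat stands in for statistics.HotStat; it only keeps the quit channel.
type hotStat struct{ quit <-chan struct{} }

func newHotStat(ctx context.Context, quit <-chan struct{}) *hotStat {
	return &hotStat{quit: quit} // the channel is captured once, here
}

type raftCluster struct {
	ctx  context.Context
	quit chan struct{}
	stat *hotStat
}

func (c *raftCluster) initCluster() {
	// If c.quit were still nil here, goroutines selecting on it would never
	// observe shutdown (a nil channel never becomes ready in a select).
	c.stat = newHotStat(c.ctx, c.quit)
}

func (c *raftCluster) start() {
	c.quit = make(chan struct{}) // created before initCluster, as in the patch
	c.initCluster()
}

func main() {
	c := &raftCluster{ctx: context.Background()}
	c.start()
	close(c.quit) // Stop would close this to unblock all consumers
}
```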
 // processRegionHeartbeat updates the region information.
 func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
     c.RLock()
-    hotStat := c.hotStat
     storage := c.storage
     coreCluster := c.core
+    hotStat := c.hotStat
     c.RUnlock()
     origin, err := coreCluster.PreCheckPutRegion(region)
     if err != nil {
         return err
     }
-
-    expiredStats := hotStat.ExpiredItems(region)
-    // Put expiredStats into read/write queue to update stats
-    if len(expiredStats) > 0 {
-        for _, stat := range expiredStats {
-            item := statistics.NewExpiredStatItem(stat)
-            if stat.Kind == statistics.WriteFlow {
-                hotStat.CheckWriteAsync(item)
-            } else {
-                hotStat.CheckReadAsync(item)
-            }
-        }
-    }
+    hotStat.CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
+    hotStat.CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
     reportInterval := region.GetInterval()
     interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
     for _, peer := range region.GetPeers() {
         peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval)
-        item := statistics.NewPeerInfoItem(peerInfo, region)
-        hotStat.CheckWriteAsync(item)
+        hotStat.CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region))
     }
     // Save to storage if meta is updated.
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index d63a8370ae3..a2ff06ba3c2 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -44,12 +44,23 @@ func Test(t *testing.T) {
 var _ = Suite(&testClusterInfoSuite{})
-type testClusterInfoSuite struct{}
+type testClusterInfoSuite struct {
+    ctx context.Context
+    cancel context.CancelFunc
+}
+
+func (s *testClusterInfoSuite) TearDownTest(c *C) {
+    s.cancel()
+}
+
+func (s *testClusterInfoSuite) SetUpTest(c *C) {
+    s.ctx, s.cancel = context.WithCancel(context.Background())
+}
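Editorial note: the test suites in this patch all adopt the same fixture shape — a per-test context created in `SetUpTest` and cancelled in `TearDownTest`, so goroutines started by `newTestRaftCluster` (the hot-cache dispatchers) exit with the test. A condensed, illustrative gocheck suite showing just that pattern (the suite name is hypothetical; it assumes the package's usual `Test(t)` gocheck hook already exists):

```go
package cluster

import (
	"context"

	. "github.com/pingcap/check"
)

var _ = Suite(&exampleSuite{})

type exampleSuite struct {
	ctx    context.Context
	cancel context.CancelFunc
}

func (s *exampleSuite) SetUpTest(c *C) {
	// Fresh context per test: fixture goroutines select on ctx.Done()
	// and stop as soon as the test finishes.
	s.ctx, s.cancel = context.WithCancel(context.Background())
}

func (s *exampleSuite) TearDownTest(c *C) {
	s.cancel()
}

func (s *exampleSuite) TestSomething(c *C) {
	c.Assert(s.ctx.Err(), IsNil)
}
```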
 func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) {
     _, opt, err := newTestScheduleConfig()
     c.Assert(err, IsNil)
-    cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
+    cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
     n, np := uint64(3), uint64(3)
     stores := newTestStores(n, "2.0.0")
@@ -161,7 +172,7 @@ func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) {
 func (s *testClusterInfoSuite) TestFilterUnhealthyStore(c *C) {
     _, opt, err := newTestScheduleConfig()
     c.Assert(err, IsNil)
-    cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
+    cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
     stores := newTestStores(3, "2.0.0")
     for _, store := range stores {
@@ -193,7 +204,7 @@ func (s *testClusterInfoSuite) TestFilterUnhealthyStore(c *C) {
 func (s *testClusterInfoSuite) TestSetOfflineStore(c *C) {
     _, opt, err := newTestScheduleConfig()
     c.Assert(err, IsNil)
-    cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
+    cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
     // Put 4 stores.
     for _, store := range newTestStores(4, "2.0.0") {
         c.Assert(cluster.PutStore(store.GetMeta()), IsNil)
@@ -239,7 +250,7 @@ func (s *testClusterInfoSuite) TestSetOfflineStore(c *C) {
 func (s *testClusterInfoSuite) TestReuseAddress(c *C) {
     _, opt, err := newTestScheduleConfig()
     c.Assert(err, IsNil)
-    cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
+    cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
     // Put 4 stores.
     for _, store := range newTestStores(4, "2.0.0") {
         c.Assert(cluster.PutStore(store.GetMeta()), IsNil)
@@ -280,7 +291,7 @@ func getTestDeployPath(storeID uint64) string {
 func (s *testClusterInfoSuite) TestUpStore(c *C) {
     _, opt, err := newTestScheduleConfig()
     c.Assert(err, IsNil)
-    cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
+    cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
     // Put 3 stores.
     for _, store := range newTestStores(3, "2.0.0") {
@@ -314,7 +325,7 @@ func (s *testClusterInfoSuite) TestUpStore(c *C) {
 func (s *testClusterInfoSuite) TestDeleteStoreUpdatesClusterVersion(c *C) {
     _, opt, err := newTestScheduleConfig()
     c.Assert(err, IsNil)
-    cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
+    cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
     // Put 3 new 4.0.9 stores.
     for _, store := range newTestStores(3, "4.0.9") {
@@ -337,7 +348,7 @@ func (s *testClusterInfoSuite) TestDeleteStoreUpdatesClusterVersion(c *C) {
 func (s *testClusterInfoSuite) TestRegionHeartbeatHotStat(c *C) {
     _, opt, err := newTestScheduleConfig()
     c.Assert(err, IsNil)
-    cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
+    cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
     newTestStores(4, "2.0.0")
     peers := []*metapb.Peer{
         {
@@ -394,7 +405,7 @@ func (s *testClusterInfoSuite) TestRegionHeartbeatHotStat(c *C) {
 func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
     _, opt, err := newTestScheduleConfig()
     c.Assert(err, IsNil)
-    cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
+    cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
     n, np := uint64(3), uint64(3)
@@ -607,7 +618,7 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
 func (s *testClusterInfoSuite) TestRegionFlowChanged(c *C) {
     _, opt, err := newTestScheduleConfig()
     c.Assert(err, IsNil)
-    cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
+    cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
     regions := []*core.RegionInfo{core.NewTestRegionInfo([]byte{}, []byte{})}
     processRegions := func(regions []*core.RegionInfo) {
         for _, r := range regions {
@@ -627,7 +638,7 @@ func (s *testClusterInfoSuite) TestRegionFlowChanged(c *C) {
 func (s *testClusterInfoSuite) TestConcurrentRegionHeartbeat(c *C) {
     _, opt, err := newTestScheduleConfig()
     c.Assert(err, IsNil)
-    cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
+    cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
     regions := []*core.RegionInfo{core.NewTestRegionInfo([]byte{}, []byte{})}
     regions = core.SplitRegions(regions)
@@ -688,7 +699,7 @@ func heartbeatRegions(c *C, cluster *RaftCluster, regions []*core.RegionInfo) {
 func (s *testClusterInfoSuite) TestHeartbeatSplit(c *C) {
     _, opt, err := newTestScheduleConfig()
     c.Assert(err, IsNil)
-    cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
+    cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
     // 1: [nil, nil)
     region1 := core.NewRegionInfo(&metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil)
@@ -727,7 +738,7 @@ func (s *testClusterInfoSuite) TestHeartbeatSplit(c *C) {
 func (s *testClusterInfoSuite) TestRegionSplitAndMerge(c *C) {
     _, opt, err := newTestScheduleConfig()
     c.Assert(err, IsNil)
-    cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
+    cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
     regions := []*core.RegionInfo{core.NewTestRegionInfo([]byte{}, []byte{})}
@@ -760,7 +771,7 @@ func (s *testClusterInfoSuite) TestRegionSplitAndMerge(c *C) {
 func (s *testClusterInfoSuite) TestOfflineAndMerge(c *C) {
     _, opt, err := newTestScheduleConfig()
     c.Assert(err, IsNil)
-    cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
+    cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
     storage := core.NewStorage(kv.NewMemoryKV())
     cluster.ruleManager = placement.NewRuleManager(storage, cluster)
@@ -822,7 +833,7 @@ func (s *testClusterInfoSuite) TestOfflineAndMerge(c *C) {
 func (s *testClusterInfoSuite) TestUpdateStorePendingPeerCount(c *C) {
     _, opt, err := newTestScheduleConfig()
     c.Assert(err, IsNil)
-    tc := newTestCluster(opt)
+    tc := newTestCluster(s.ctx, opt)
     stores := newTestStores(5, "2.0.0")
     for _, s := range stores {
         c.Assert(tc.putStoreLocked(s), IsNil)
     }
@@ -889,14 +900,25 @@ func (s *testStoresInfoSuite) TestStores(c *C) {
 var _ = Suite(&testRegionsInfoSuite{})
-type testRegionsInfoSuite struct{}
+type testRegionsInfoSuite struct {
+    ctx context.Context
+    cancel context.CancelFunc
+}
+
+func (s *testRegionsInfoSuite) TearDownTest(c *C) {
+    s.cancel()
+}
+
+func (s *testRegionsInfoSuite) SetUpTest(c *C) {
+    s.ctx, s.cancel = context.WithCancel(context.Background())
+}
 func (s *testRegionsInfoSuite) Test(c *C) {
     n, np := uint64(10), uint64(3)
     regions := newTestRegions(n, np)
     _, opts, err := newTestScheduleConfig()
     c.Assert(err, IsNil)
-    tc := newTestRaftCluster(mockid.NewIDAllocator(), opts, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
+    tc := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opts, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
     cache := tc.core.Regions
     for i := uint64(0); i < n; i++ {
@@ -1000,13 +1022,20 @@ func (s *testClusterUtilSuite) TestCheckStaleRegion(c *C) {
 var _ = Suite(&testGetStoresSuite{})
 type testGetStoresSuite struct {
+    ctx context.Context
+    cancel context.CancelFunc
     cluster *RaftCluster
 }
+
+func (s *testGetStoresSuite) TearDownTest(c *C) {
+    s.cancel()
+}
+
 func (s *testGetStoresSuite) SetUpSuite(c *C) {
     _, opt, err := newTestScheduleConfig()
     c.Assert(err, IsNil)
-    cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
+    s.ctx, s.cancel = context.WithCancel(context.Background())
+    cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
     s.cluster = cluster
     stores := newTestStores(200, "2.0.0")
@@ -1038,9 +1067,9 @@ func newTestScheduleConfig() (*config.ScheduleConfig, *config.PersistOptions, er
     return &cfg.Schedule, opt, nil
 }
-func newTestCluster(opt *config.PersistOptions) *testCluster {
+func newTestCluster(ctx context.Context, opt *config.PersistOptions) *testCluster {
     storage := core.NewStorage(kv.NewMemoryKV())
-    rc := newTestRaftCluster(mockid.NewIDAllocator(), opt, storage, core.NewBasicCluster())
+    rc := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage, core.NewBasicCluster())
     rc.ruleManager = placement.NewRuleManager(storage, rc)
     if opt.IsPlacementRulesEnabled() {
         err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels())
@@ -1052,8 +1081,8 @@ func newTestCluster(opt *config.PersistOptions) *testCluster {
     return &testCluster{RaftCluster: rc}
 }
-func newTestRaftCluster(id id.Allocator, opt *config.PersistOptions, storage *core.Storage, basicCluster *core.BasicCluster) *RaftCluster {
-    rc := &RaftCluster{ctx: context.TODO()}
+func newTestRaftCluster(ctx context.Context, id id.Allocator, opt *config.PersistOptions, storage *core.Storage, basicCluster *core.BasicCluster) *RaftCluster {
+    rc := &RaftCluster{ctx: ctx}
     rc.InitCluster(id, opt, storage, basicCluster)
     return rc
 }
diff --git a/server/cluster/cluster_worker_test.go b/server/cluster/cluster_worker_test.go
index 837640959ec..9b8cfdfcffd 100644
--- a/server/cluster/cluster_worker_test.go
+++ b/server/cluster/cluster_worker_test.go
@@ -14,10 +14,11 @@ package cluster
 import (
+    "context"
+
     . "github.com/pingcap/check"
     "github.com/pingcap/kvproto/pkg/metapb"
     "github.com/pingcap/kvproto/pkg/pdpb"
-
     "github.com/tikv/pd/pkg/mock/mockid"
     "github.com/tikv/pd/server/core"
     "github.com/tikv/pd/server/kv"
@@ -26,12 +27,23 @@ import (
 var _ = Suite(&testClusterWorkerSuite{})
-type testClusterWorkerSuite struct{}
+type testClusterWorkerSuite struct {
+    ctx context.Context
+    cancel context.CancelFunc
+}
+
+func (s *testClusterWorkerSuite) TearDownTest(c *C) {
+    s.cancel()
+}
+
+func (s *testClusterWorkerSuite) SetUpTest(c *C) {
+    s.ctx, s.cancel = context.WithCancel(context.Background())
+}
 func (s *testClusterWorkerSuite) TestReportSplit(c *C) {
     _, opt, err := newTestScheduleConfig()
     c.Assert(err, IsNil)
-    cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
+    cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
     left := &metapb.Region{Id: 1, StartKey: []byte("a"), EndKey: []byte("b")}
     right := &metapb.Region{Id: 2, StartKey: []byte("b"), EndKey: []byte("c")}
     _, err = cluster.HandleReportSplit(&pdpb.ReportSplitRequest{Left: left, Right: right})
@@ -43,7 +55,7 @@ func (s *testClusterWorkerSuite) TestReportSplit(c *C) {
 func (s *testClusterWorkerSuite) TestReportBatchSplit(c *C) {
     _, opt, err := newTestScheduleConfig()
     c.Assert(err, IsNil)
-    cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
+    cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
     regions := []*metapb.Region{
         {Id: 1, StartKey: []byte(""), EndKey: []byte("a")},
         {Id: 2, StartKey: []byte("a"), EndKey: []byte("b")},
diff --git a/server/cluster/coordinator_test.go b/server/cluster/coordinator_test.go
index 258557d580d..50ae9aab90a 100644
--- a/server/cluster/coordinator_test.go
+++ b/server/cluster/coordinator_test.go
@@ -283,7 +283,7 @@ func prepare(setCfg func(*config.ScheduleConfig), setTc func(*testCluster), run
     if setCfg != nil {
         setCfg(cfg)
     }
-    tc := newTestCluster(opt)
+    tc := newTestCluster(ctx, opt)
     hbStreams := hbstream.NewTestHeartbeatStreams(ctx, tc.getClusterID(), tc, true /* need to run */)
     if setTc != nil {
         setTc(tc)
     }
@@ -335,7 +335,7 @@ func (s *testCoordinatorSuite) TestCheckRegion(c *C) {
     co.stop()
     co.wg.Wait()
-    tc = newTestCluster(opt)
+    tc = newTestCluster(s.ctx, opt)
     co = newCoordinator(s.ctx, tc.RaftCluster, hbStreams)
     co.run()
diff --git a/server/statistics/hot_cache.go b/server/statistics/hot_cache.go
index 63e4b183a27..e14d645d20f 100644
--- a/server/statistics/hot_cache.go
+++ b/server/statistics/hot_cache.go
@@ -15,7 +15,6 @@ package statistics
 import (
     "context"
-
     "github.com/tikv/pd/server/core"
 )
@@ -23,93 +22,33 @@
 // only turned off by the simulator and the test.
 var Denoising = true
-const queueCap = 1000
+const queueCap = 20000
 // HotCache is a cache hold hot regions.
 type HotCache struct {
-    readFlowQueue chan *FlowItem
-    writeFlowQueue chan *FlowItem
+    ctx context.Context
+    quit <-chan struct{}
+    readFlowQueue chan FlowItemTask
+    writeFlowQueue chan FlowItemTask
     writeFlow *hotPeerCache
     readFlow *hotPeerCache
 }
-// FlowItem indicates the item in the flow, it is a wrapper for peerInfo, expiredItems or coldItem.
-type FlowItem struct {
-    peerInfo *core.PeerInfo
-    regionInfo *core.RegionInfo
-    expiredStat *HotPeerStat
-    unReportStatsCollect *unReportStatsCollect
-}
-
-type unReportStatsCollect struct {
-    storeID uint64
-    regionIDs map[uint64]struct{}
-    interval uint64
-}
-
-// NewUnReportStatsCollect creates information for unreported stats
-func NewUnReportStatsCollect(storeID uint64, regionIDs map[uint64]struct{}, interval uint64) *FlowItem {
-    return &FlowItem{
-        peerInfo: nil,
-        regionInfo: nil,
-        expiredStat: nil,
-        unReportStatsCollect: &unReportStatsCollect{
-            storeID: storeID,
-            regionIDs: regionIDs,
-            interval: interval,
-        },
-    }
-}
-
-// NewPeerInfoItem creates FlowItem for PeerInfo
-func NewPeerInfoItem(peerInfo *core.PeerInfo, regionInfo *core.RegionInfo) *FlowItem {
-    return &FlowItem{
-        peerInfo: peerInfo,
-        regionInfo: regionInfo,
-        expiredStat: nil,
-        unReportStatsCollect: nil,
-    }
-}
-
-// NewExpiredStatItem creates Expired stat
-func NewExpiredStatItem(expiredStat *HotPeerStat) *FlowItem {
-    return &FlowItem{
-        peerInfo: nil,
-        regionInfo: nil,
-        expiredStat: expiredStat,
-        unReportStatsCollect: nil,
-    }
-}
-
 // NewHotCache creates a new hot spot cache.
-func NewHotCache(ctx context.Context) *HotCache {
+func NewHotCache(ctx context.Context, quit <-chan struct{}) *HotCache {
     w := &HotCache{
-        readFlowQueue: make(chan *FlowItem, queueCap),
-        writeFlowQueue: make(chan *FlowItem, queueCap),
+        ctx: ctx,
+        quit: quit,
+        readFlowQueue: make(chan FlowItemTask, queueCap),
+        writeFlowQueue: make(chan FlowItemTask, queueCap),
         writeFlow: NewHotStoresStats(WriteFlow),
         readFlow: NewHotStoresStats(ReadFlow),
     }
-    go w.updateItems(ctx)
+    go w.updateItems(w.readFlowQueue, w.runReadTask)
+    go w.updateItems(w.writeFlowQueue, w.runWriteTask)
     return w
 }
-// ExpiredItems returns the items which are already expired.
-func (w *HotCache) ExpiredItems(region *core.RegionInfo) (expiredItems []*HotPeerStat) {
-    expiredItems = append(expiredItems, w.ExpiredReadItems(region)...)
-    expiredItems = append(expiredItems, w.ExpiredWriteItems(region)...)
-    return
-}
-
-// ExpiredReadItems returns the read items which are already expired.
-func (w *HotCache) ExpiredReadItems(region *core.RegionInfo) []*HotPeerStat {
-    return w.readFlow.CollectExpiredItems(region)
-}
-
-// ExpiredWriteItems returns the write items which are already expired.
-func (w *HotCache) ExpiredWriteItems(region *core.RegionInfo) []*HotPeerStat {
-    return w.writeFlow.CollectExpiredItems(region)
-}
-
 // CheckWritePeerSync checks the write status, returns update items.
 // This is used for mockcluster.
 func (w *HotCache) CheckWritePeerSync(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat {
@@ -123,30 +62,33 @@ func (w *HotCache) CheckReadPeerSync(peer *core.PeerInfo, region *core.RegionInf
 }
 // CheckWriteAsync puts the flowItem into queue, and check it asynchronously
-func (w *HotCache) CheckWriteAsync(item *FlowItem) {
-    w.writeFlowQueue <- item
+func (w *HotCache) CheckWriteAsync(task FlowItemTask) bool {
+    select {
+    case w.writeFlowQueue <- task:
+        return true
+    default:
+        return false
+    }
 }
 // CheckReadAsync puts the flowItem into queue, and check it asynchronously
-func (w *HotCache) CheckReadAsync(item *FlowItem) {
-    w.readFlowQueue <- item
+func (w *HotCache) CheckReadAsync(task FlowItemTask) bool {
+    select {
+    case w.readFlowQueue <- task:
+        return true
+    default:
+        return false
+    }
 }
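Editorial note: `CheckWriteAsync`/`CheckReadAsync` now do a non-blocking send and report whether the task was accepted, so a full queue drops the sample instead of stalling the heartbeat path. A hedged usage sketch of the caller side — `enqueuePeer` is a hypothetical helper, while `HotCache`, `NewCheckPeerTask`, `PeerInfo` and `RegionInfo` come from the patch:

```go
package statistics

import "github.com/tikv/pd/server/core"

// enqueuePeer hands one peer sample to the write-flow queue and tells the
// caller whether it was accepted.
func enqueuePeer(cache *HotCache, peer *core.PeerInfo, region *core.RegionInfo) bool {
	task := NewCheckPeerTask(peer, region)
	if !cache.CheckWriteAsync(task) {
		// The bounded queue (queueCap) is full; the sample is silently dropped,
		// so the caller can count, log, or retry instead of blocking.
		return false
	}
	return true
}
```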
 // Update updates the cache.
+// This is used for mockcluster.
 func (w *HotCache) Update(item *HotPeerStat) {
     switch item.Kind {
     case WriteFlow:
-        w.writeFlow.Update(item)
+        update(item, w.writeFlow)
     case ReadFlow:
-        w.readFlow.Update(item)
-    }
-
-    if item.IsNeedDelete() {
-        w.incMetrics("remove_item", item.StoreID, item.Kind)
-    } else if item.IsNew() {
-        w.incMetrics("add_item", item.StoreID, item.Kind)
-    } else {
-        w.incMetrics("update_item", item.StoreID, item.Kind)
+        update(item, w.readFlow)
     }
 }
@@ -154,9 +96,19 @@ func (w *HotCache) Update(item *HotPeerStat) {
 func (w *HotCache) RegionStats(kind FlowKind, minHotDegree int) map[uint64][]*HotPeerStat {
     switch kind {
     case WriteFlow:
-        return w.writeFlow.RegionStats(minHotDegree)
+        task := newCollectRegionStatsTask(minHotDegree)
+        succ := w.CheckWriteAsync(task)
+        if !succ {
+            return nil
+        }
+        return task.waitRet(w.ctx, w.quit)
     case ReadFlow:
-        return w.readFlow.RegionStats(minHotDegree)
+        task := newCollectRegionStatsTask(minHotDegree)
+        succ := w.CheckReadAsync(task)
+        if !succ {
+            return nil
+        }
+        return task.waitRet(w.ctx, w.quit)
     }
     return nil
 }
@@ -171,14 +123,28 @@ func (w *HotCache) HotRegionsFromStore(storeID uint64, kind FlowKind, minHotDegr
 // IsRegionHot checks if the region is hot.
 func (w *HotCache) IsRegionHot(region *core.RegionInfo, minHotDegree int) bool {
-    return w.writeFlow.isRegionHotWithAnyPeers(region, minHotDegree) ||
-        w.readFlow.isRegionHotWithAnyPeers(region, minHotDegree)
+    writeIsRegionHotTask := newIsRegionHotTask(region, minHotDegree)
+    readIsRegionHotTask := newIsRegionHotTask(region, minHotDegree)
+    succ1 := w.CheckWriteAsync(writeIsRegionHotTask)
+    succ2 := w.CheckReadAsync(readIsRegionHotTask)
+    if succ1 && succ2 {
+        return writeIsRegionHotTask.waitRet(w.ctx, w.quit) || readIsRegionHotTask.waitRet(w.ctx, w.quit)
+    }
+    return false
 }
 // CollectMetrics collects the hot cache metrics.
 func (w *HotCache) CollectMetrics() {
-    w.writeFlow.CollectMetrics("write")
-    w.readFlow.CollectMetrics("read")
+    writeMetricsTask := newCollectMetricsTask("write")
+    readMetricsTask := newCollectMetricsTask("read")
+    succ1 := w.CheckWriteAsync(writeMetricsTask)
+    succ2 := w.CheckReadAsync(readMetricsTask)
+    if succ1 {
+        writeMetricsTask.waitDone(w.ctx, w.quit)
+    }
+    if succ2 {
+        readMetricsTask.waitDone(w.ctx, w.quit)
+    }
 }
 // ResetMetrics resets the hot cache metrics.
@@ -186,7 +152,19 @@ func (w *HotCache) ResetMetrics() {
     hotCacheStatusGauge.Reset()
 }
-func (w *HotCache) incMetrics(name string, storeID uint64, kind FlowKind) {
+// ExpiredReadItems returns the read items which are already expired.
+// This is used for mockcluster.
+func (w *HotCache) ExpiredReadItems(region *core.RegionInfo) []*HotPeerStat {
+    return w.readFlow.CollectExpiredItems(region)
+}
+
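Editorial note: `RegionStats` and `IsRegionHot` above follow a request-reply shape — the query is queued as a task, executed on the cache goroutine, and the answer comes back on a channel guarded by `ctx`/`quit`. A generic, hedged sketch of that pattern (all names here are illustrative stand-ins; the patch's real types are `collectRegionStatsTask` and `isRegionHotTask`):

```go
package main

import "context"

type statsQuery struct {
	minDegree int
	ret       chan int // the real tasks return richer result types
}

// run executes on the single worker goroutine and publishes the answer.
func (q *statsQuery) run(countAbove func(int) int) {
	q.ret <- countAbove(q.minDegree)
}

// waitRet blocks the caller until the answer arrives or shutdown begins.
func (q *statsQuery) waitRet(ctx context.Context, quit <-chan struct{}) int {
	select {
	case <-ctx.Done():
		return 0
	case <-quit:
		return 0
	case v := <-q.ret:
		return v
	}
}

func main() {
	queue := make(chan *statsQuery, 1)
	quit := make(chan struct{})
	go func() { // stand-in for the hot cache worker
		q := <-queue
		q.run(func(min int) int { return 42 - min })
	}()
	q := &statsQuery{minDegree: 2, ret: make(chan int)}
	queue <- q
	_ = q.waitRet(context.Background(), quit)
}
```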
+// ExpiredWriteItems returns the write items which are already expired.
+// This is used for mockcluster.
+func (w *HotCache) ExpiredWriteItems(region *core.RegionInfo) []*HotPeerStat {
+    return w.writeFlow.CollectExpiredItems(region)
+}
+
+func incMetrics(name string, storeID uint64, kind FlowKind) {
     store := storeTag(storeID)
     switch kind {
     case WriteFlow:
@@ -207,38 +185,40 @@ func (w *HotCache) GetFilledPeriod(kind FlowKind) int {
     return 0
 }
-func (w *HotCache) updateItems(ctx context.Context) {
+func (w *HotCache) updateItems(queue <-chan FlowItemTask, runTask func(task FlowItemTask)) {
     for {
         select {
-        case <-ctx.Done():
+        case <-w.ctx.Done():
+            return
+        case <-w.quit:
            return
-        case item, ok := <-w.writeFlowQueue:
-            if ok && item != nil {
-                w.updateItem(item, w.writeFlow)
-            }
-            hotCacheFlowQueueStatusGauge.WithLabelValues(WriteFlow.String()).Set(float64(len(w.writeFlowQueue)))
-        case item, ok := <-w.readFlowQueue:
-            if ok && item != nil {
-                w.updateItem(item, w.readFlow)
-            }
-            hotCacheFlowQueueStatusGauge.WithLabelValues(ReadFlow.String()).Set(float64(len(w.readFlowQueue)))
+        case task := <-queue:
+            runTask(task)
         }
     }
 }
-func (w *HotCache) updateItem(item *FlowItem, flow *hotPeerCache) {
-    if item.peerInfo != nil && item.regionInfo != nil {
-        stat := flow.CheckPeerFlow(item.peerInfo, item.regionInfo)
-        if stat != nil {
-            w.Update(stat)
-        }
-    } else if item.expiredStat != nil {
-        w.Update(item.expiredStat)
-    } else if item.unReportStatsCollect != nil {
-        handle := item.unReportStatsCollect
-        stats := flow.CheckColdPeer(handle.storeID, handle.regionIDs, handle.interval)
-        for _, stat := range stats {
-            w.Update(stat)
-        }
+func (w *HotCache) runReadTask(task FlowItemTask) {
+    if task != nil {
+        task.runTask(w.readFlow)
+        hotCacheFlowQueueStatusGauge.WithLabelValues(ReadFlow.String()).Set(float64(len(w.readFlowQueue)))
+    }
+}
+
+func (w *HotCache) runWriteTask(task FlowItemTask) {
+    if task != nil {
+        task.runTask(w.writeFlow)
+        hotCacheFlowQueueStatusGauge.WithLabelValues(WriteFlow.String()).Set(float64(len(w.writeFlowQueue)))
+    }
+}
+
+func update(item *HotPeerStat, flow *hotPeerCache) {
+    flow.Update(item)
+    if item.IsNeedDelete() {
+        incMetrics("remove_item", item.StoreID, item.Kind)
+    } else if item.IsNew() {
+        incMetrics("add_item", item.StoreID, item.Kind)
+    } else {
+        incMetrics("update_item", item.StoreID, item.Kind)
     }
 }
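Editorial note: `updateItems` is now one dispatch loop per queue, parameterized by the queue and a run function, and it exits on either the context or the quit channel. A condensed, hedged sketch of that dispatcher shape — `task` and `worker` here are stand-ins, not the patch's types:

```go
package main

import "context"

type task interface{ run() }

type worker struct {
	ctx  context.Context
	quit <-chan struct{}
}

// loop drains one queue until shutdown; all state touched inside run()
// is owned by this goroutine, so no locking is needed.
func (w *worker) loop(queue <-chan task) {
	for {
		select {
		case <-w.ctx.Done():
			return
		case <-w.quit:
			return
		case t := <-queue:
			if t != nil {
				t.run()
			}
		}
	}
}

func main() {
	readQ, writeQ := make(chan task, 16), make(chan task, 16)
	w := &worker{ctx: context.Background(), quit: make(chan struct{})}
	go w.loop(readQ)  // one goroutine per queue, mirroring readFlow/writeFlow
	go w.loop(writeQ)
}
```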
diff --git a/server/statistics/hot_cache_task.go b/server/statistics/hot_cache_task.go
new file mode 100644
index 00000000000..1c98d322ed4
--- /dev/null
+++ b/server/statistics/hot_cache_task.go
@@ -0,0 +1,205 @@
+// Copyright 2021 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statistics
+
+import (
+    "context"
+
+    "github.com/tikv/pd/server/core"
+)
+
+type flowItemTaskKind uint32
+
+const (
+    checkPeerTaskType flowItemTaskKind = iota
+    checkExpiredTaskType
+    collectUnReportedPeerTaskType
+    collectRegionStatsTaskType
+    isRegionHotTaskType
+    collectMetricsTaskType
+)
+
+// FlowItemTask indicates the task in flowItem queue
+type FlowItemTask interface {
+    taskType() flowItemTaskKind
+    runTask(flow *hotPeerCache)
+}
+
+type checkPeerTask struct {
+    peerInfo *core.PeerInfo
+    regionInfo *core.RegionInfo
+}
+
+// NewCheckPeerTask creates task to update peerInfo
+func NewCheckPeerTask(peerInfo *core.PeerInfo, regionInfo *core.RegionInfo) FlowItemTask {
+    return &checkPeerTask{
+        peerInfo: peerInfo,
+        regionInfo: regionInfo,
+    }
+}
+
+func (t *checkPeerTask) taskType() flowItemTaskKind {
+    return checkPeerTaskType
+}
+
+func (t *checkPeerTask) runTask(flow *hotPeerCache) {
+    stat := flow.CheckPeerFlow(t.peerInfo, t.regionInfo)
+    if stat != nil {
+        update(stat, flow)
+    }
+}
+
+type checkExpiredTask struct {
+    region *core.RegionInfo
+}
+
+// NewCheckExpiredItemTask creates task to collect expired items
+func NewCheckExpiredItemTask(region *core.RegionInfo) FlowItemTask {
+    return &checkExpiredTask{
+        region: region,
+    }
+}
+
+func (t *checkExpiredTask) taskType() flowItemTaskKind {
+    return checkExpiredTaskType
+}
+
+func (t *checkExpiredTask) runTask(flow *hotPeerCache) {
+    expiredStats := flow.CollectExpiredItems(t.region)
+    for _, stat := range expiredStats {
+        update(stat, flow)
+    }
+}
+
+type collectUnReportedPeerTask struct {
+    storeID uint64
+    regionIDs map[uint64]struct{}
+    interval uint64
+}
+
+// NewCollectUnReportedPeerTask creates task to collect unreported peers
+func NewCollectUnReportedPeerTask(storeID uint64, regionIDs map[uint64]struct{}, interval uint64) FlowItemTask {
+    return &collectUnReportedPeerTask{
+        storeID: storeID,
+        regionIDs: regionIDs,
+        interval: interval,
+    }
+}
+
+func (t *collectUnReportedPeerTask) taskType() flowItemTaskKind {
+    return collectUnReportedPeerTaskType
+}
+
+func (t *collectUnReportedPeerTask) runTask(flow *hotPeerCache) {
+    stats := flow.CheckColdPeer(t.storeID, t.regionIDs, t.interval)
+    for _, stat := range stats {
+        update(stat, flow)
+    }
+}
+
+type collectRegionStatsTask struct {
+    minDegree int
+    ret chan map[uint64][]*HotPeerStat
+}
+
+func newCollectRegionStatsTask(minDegree int) *collectRegionStatsTask {
+    return &collectRegionStatsTask{
+        minDegree: minDegree,
+        ret: make(chan map[uint64][]*HotPeerStat),
+    }
+}
+
+func (t *collectRegionStatsTask) taskType() flowItemTaskKind {
+    return collectRegionStatsTaskType
+}
+
+func (t *collectRegionStatsTask) runTask(flow *hotPeerCache) {
+    t.ret <- flow.RegionStats(t.minDegree)
+}
+
+func (t *collectRegionStatsTask) waitRet(ctx context.Context, quit <-chan struct{}) map[uint64][]*HotPeerStat {
+    select {
+    case <-ctx.Done():
+        return nil
+    case <-quit:
+        return nil
+    case ret := <-t.ret:
+        return ret
+    }
+}
+
+type isRegionHotTask struct {
+    region *core.RegionInfo
+    minHotDegree int
+    ret chan bool
+}
+
+func newIsRegionHotTask(region *core.RegionInfo, minDegree int) *isRegionHotTask {
+    return &isRegionHotTask{
+        region: region,
+        minHotDegree: minDegree,
+        ret: make(chan bool),
+    }
+}
+
+func (t *isRegionHotTask) taskType() flowItemTaskKind {
+    return isRegionHotTaskType
+}
+
+func (t *isRegionHotTask) runTask(flow *hotPeerCache) {
+    t.ret <- flow.isRegionHotWithAnyPeers(t.region, t.minHotDegree)
+}
+
+func (t *isRegionHotTask) waitRet(ctx context.Context, quit <-chan struct{}) bool {
+    select {
+    case <-ctx.Done():
+        return false
+    case <-quit:
+        return false
+    case r := <-t.ret:
+        return r
+    }
+}
+
+type collectMetricsTask struct {
+    typ string
+    done chan struct{}
+}
+
+func newCollectMetricsTask(typ string) *collectMetricsTask {
+    return &collectMetricsTask{
+        typ: typ,
+        done: make(chan struct{}),
+    }
+}
+
+func (t *collectMetricsTask) taskType() flowItemTaskKind {
+    return collectMetricsTaskType
+}
+
+func (t *collectMetricsTask) runTask(flow *hotPeerCache) {
+    flow.CollectMetrics(t.typ)
+    t.done <- struct{}{}
+}
+
+func (t *collectMetricsTask) waitDone(ctx context.Context, quit <-chan struct{}) {
+    select {
+    case <-ctx.Done():
+        return
+    case <-quit:
+        return
+    case <-t.done:
+        return
+    }
+}
diff --git a/server/statistics/hot_peer_cache.go b/server/statistics/hot_peer_cache.go
index 004dc299a3b..ee838796a12 100644
--- a/server/statistics/hot_peer_cache.go
+++ b/server/statistics/hot_peer_cache.go
@@ -15,7 +15,6 @@ package statistics
 import (
     "math"
-    "sync"
     "time"
     "github.com/pingcap/kvproto/pkg/metapb"
@@ -52,7 +51,6 @@ var minHotThresholds = [RegionStatCount]float64{
 // hotPeerCache saves the hot peer's statistics.
 type hotPeerCache struct {
-    sync.RWMutex
     kind FlowKind
     peersOfStore map[uint64]*TopN // storeID -> hot peers
     storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs
@@ -83,8 +81,6 @@ func NewHotStoresStats(kind FlowKind) *hotPeerCache {
 // TODO: rename RegionStats as PeerStats
 // RegionStats returns hot items
 func (f *hotPeerCache) RegionStats(minHotDegree int) map[uint64][]*HotPeerStat {
-    f.RLock()
-    defer f.RUnlock()
     res := make(map[uint64][]*HotPeerStat)
     for storeID, peers := range f.peersOfStore {
         values := peers.GetAll()
@@ -101,8 +97,6 @@ func (f *hotPeerCache) RegionStats(minHotDegree int) map[uint64][]*HotPeerStat {
 // Update updates the items in statistics.
 func (f *hotPeerCache) Update(item *HotPeerStat) {
-    f.Lock()
-    defer f.Unlock()
     if item.IsNeedDelete() {
         f.putInheritItem(item)
         f.removeItem(item)
@@ -135,8 +129,6 @@ func (f *hotPeerCache) collectPeerMetrics(loads []float64, interval uint64) {
 // CollectExpiredItems collects expired items, mark them as needDelete and puts them into inherit items
 func (f *hotPeerCache) CollectExpiredItems(region *core.RegionInfo) []*HotPeerStat {
-    f.RLock()
-    defer f.RUnlock()
     regionID := region.GetID()
     items := make([]*HotPeerStat, 0)
     for _, storeID := range f.getAllStoreIDs(region) {
@@ -155,8 +147,6 @@ func (f *hotPeerCache) CollectExpiredItems(region *core.RegionInfo) []*HotPeerSt
 // Notice: CheckPeerFlow couldn't be used concurrently.
 // CheckPeerFlow will update oldItem's rollingLoads into newItem, thus we should use write lock here.
 func (f *hotPeerCache) CheckPeerFlow(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat {
-    f.Lock()
-    defer f.Unlock()
     interval := peer.GetInterval()
     if Denoising && interval < HotRegionReportMinInterval {
         return nil
     }
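Editorial note: the `sync.RWMutex` and its Lock/RLock pairs can be removed above because every `hotPeerCache` call is now funneled through the single dispatcher goroutine of its flow queue — the state is confined to one goroutine instead of being shared. A tiny, illustrative sketch of that confinement property (the types here are stand-ins, not the patch's):

```go
package main

import "fmt"

// confinedCache is only ever touched by the owner goroutine below, so it
// needs no mutex -- the property the lock removal relies on.
type confinedCache struct{ hits int }

func main() {
	cache := &confinedCache{}
	ops := make(chan func(), 8) // stand-in for the read/write flow queues
	done := make(chan struct{})

	go func() { // single owner goroutine, like updateItems per flow
		for op := range ops {
			op()
		}
		close(done)
	}()

	for i := 0; i < 3; i++ {
		ops <- func() { cache.hits++ } // mutations are serialized by the queue
	}
	close(ops)
	<-done
	fmt.Println(cache.hits) // 3
}
```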
@@ -207,8 +197,6 @@ func (f *hotPeerCache) CheckPeerFlow(peer *core.PeerInfo, region *core.RegionInf
 // CheckColdPeer checks the collect the un-heartbeat peer and maintain it.
 func (f *hotPeerCache) CheckColdPeer(storeID uint64, reportRegions map[uint64]struct{}, interval uint64) (ret []*HotPeerStat) {
-    f.Lock()
-    defer f.Unlock()
     if Denoising && interval < HotRegionReportMinInterval {
         return
     }
@@ -251,8 +239,6 @@ func (f *hotPeerCache) CheckColdPeer(storeID uint64, reportRegions map[uint64]st
 }
 func (f *hotPeerCache) CollectMetrics(typ string) {
-    f.RLock()
-    defer f.RUnlock()
     for storeID, peers := range f.peersOfStore {
         store := storeTag(storeID)
         thresholds := f.calcHotThresholds(storeID)
diff --git a/server/statistics/hotstat.go b/server/statistics/hotstat.go
index 50a97285b90..309fbe23f47 100644
--- a/server/statistics/hotstat.go
+++ b/server/statistics/hotstat.go
@@ -13,7 +13,9 @@
 package statistics
-import "context"
+import (
+    "context"
+)
 // HotStat contains cluster's hotspot statistics.
 type HotStat struct {
@@ -22,9 +24,9 @@ type HotStat struct {
 }
 // NewHotStat creates the container to hold cluster's hotspot statistics.
-func NewHotStat(ctx context.Context) *HotStat {
+func NewHotStat(ctx context.Context, quit <-chan struct{}) *HotStat {
     return &HotStat{
-        HotCache: NewHotCache(ctx),
+        HotCache: NewHotCache(ctx, quit),
         StoresStats: NewStoresStats(),
     }
 }
diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go
index ef3f9b172d1..f84b02f0bb5 100644
--- a/tests/pdctl/scheduler/scheduler_test.go
+++ b/tests/pdctl/scheduler/scheduler_test.go
@@ -35,16 +35,22 @@ func Test(t *testing.T) {
 var _ = Suite(&schedulerTestSuite{})
-type schedulerTestSuite struct{}
+type schedulerTestSuite struct {
+    context context.Context
+    cancel context.CancelFunc
+}
 func (s *schedulerTestSuite) SetUpSuite(c *C) {
     server.EnableZap = true
+    s.context, s.cancel = context.WithCancel(context.Background())
+}
+
+func (s *schedulerTestSuite) TearDownSuite(c *C) {
+    s.cancel()
 }
 func (s *schedulerTestSuite) TestScheduler(c *C) {
-    ctx, cancel := context.WithCancel(context.Background())
-    defer cancel()
-    cluster, err := tests.NewTestCluster(ctx, 1)
+    cluster, err := tests.NewTestCluster(s.context, 1)
     c.Assert(err, IsNil)
     err = cluster.RunInitialServers()
     c.Assert(err, IsNil)
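Editorial note: `CollectMetrics` in the patch uses a completion signal rather than a result value — the caller enqueues a task and blocks on `waitDone` until the worker finishes, unless shutdown intervenes. A hedged sketch of that shape with stand-in names:

```go
package main

import "context"

type doneTask struct{ done chan struct{} }

// run executes on the worker goroutine and signals completion.
func (t *doneTask) run() {
	// ... collect metrics here ...
	t.done <- struct{}{}
}

// waitDone blocks until run finishes, or gives up when ctx/quit fire.
func (t *doneTask) waitDone(ctx context.Context, quit <-chan struct{}) {
	select {
	case <-ctx.Done():
	case <-quit:
	case <-t.done:
	}
}

func main() {
	t := &doneTask{done: make(chan struct{})}
	go t.run()
	t.waitDone(context.Background(), make(chan struct{}))
}
```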