Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: Introduce BalanceKeyrange scheduler #8497

Open
wants to merge 44 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
63a7fec
a
CalvinNeo Aug 6, 2024
e49630b
more
CalvinNeo Aug 6, 2024
3e683fe
a
CalvinNeo Aug 6, 2024
cdd686a
fix tests
CalvinNeo Aug 21, 2024
9bfe8b6
fix
CalvinNeo Aug 21, 2024
9afa987
more fix
CalvinNeo Aug 21, 2024
b9e3ca2
state
CalvinNeo Sep 5, 2024
d8df5fd
Merge remote-tracking branch 'upstream/master' into ctl-1
CalvinNeo Nov 6, 2024
60baee3
added some tests
CalvinNeo Nov 13, 2024
9537d03
add missing file
CalvinNeo Nov 13, 2024
ba6d418
enrich tests
CalvinNeo Nov 15, 2024
68402db
add label
CalvinNeo Nov 22, 2024
9f2e849
timeout
CalvinNeo Dec 4, 2024
0b225ef
add tests
CalvinNeo Dec 4, 2024
78b624f
it could run
CalvinNeo Dec 10, 2024
77a4f70
change conf to json directly
CalvinNeo Dec 10, 2024
2327279
at least could work
CalvinNeo Dec 12, 2024
52f564b
fix tests
CalvinNeo Dec 12, 2024
81f2dae
fix runmode
CalvinNeo Dec 16, 2024
9029cdb
add some test
CalvinNeo Dec 17, 2024
815c9d3
Use mutex to protect
CalvinNeo Dec 18, 2024
6edfcab
switch to a common name
CalvinNeo Dec 18, 2024
ca3e808
more name
CalvinNeo Dec 18, 2024
b1165e2
add ddesc
CalvinNeo Dec 18, 2024
664d944
add desc
CalvinNeo Dec 18, 2024
0ec83e1
Merge remote-tracking branch 'upstream/master' into ctl-1
CalvinNeo Dec 18, 2024
1b5eecb
remove some debug
CalvinNeo Dec 19, 2024
feb6ccc
json
CalvinNeo Dec 20, 2024
4a0222f
fix
CalvinNeo Dec 20, 2024
b9763af
fix
CalvinNeo Dec 20, 2024
32770a4
make happy happy
CalvinNeo Dec 20, 2024
c60f3a8
make happy happy again
CalvinNeo Dec 20, 2024
1f2d3f6
keep happy happy
CalvinNeo Dec 20, 2024
3a51d3d
fix some tests
CalvinNeo Dec 20, 2024
bfca499
fix some more checks
CalvinNeo Dec 20, 2024
a5919dd
fix some more checks 2
CalvinNeo Dec 20, 2024
9d7a76a
fix some more checks 3
CalvinNeo Dec 20, 2024
3941048
fix some more checks 4
CalvinNeo Dec 20, 2024
f38d356
manual fix
CalvinNeo Dec 20, 2024
987afc9
gci
CalvinNeo Dec 20, 2024
d586084
Update server/api/region.go
CalvinNeo Dec 25, 2024
22da213
address some comments
CalvinNeo Dec 25, 2024
c04479b
push
CalvinNeo Dec 25, 2024
8e2fa03
Merge branch 'ctl-1' of github.com:CalvinNeo/pd into ctl-1
CalvinNeo Dec 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,8 @@ func (mc *Cluster) AddRegionStoreWithLeader(storeID uint64, regionCount int, lea
}
}

// AddLabelsStore adds store with specified count of region and labels.
func (mc *Cluster) AddLabelsStore(storeID uint64, regionCount int, labels map[string]string) {
// AddLabelsStoreWithLimit adds store with specified count of region and labels and store limit.
func (mc *Cluster) AddLabelsStoreWithLimit(storeID uint64, regionCount int, labels map[string]string, limit float64) {
newLabels := make([]*metapb.StoreLabel, 0, len(labels))
for k, v := range labels {
newLabels = append(newLabels, &metapb.StoreLabel{Key: k, Value: v})
Expand All @@ -388,11 +388,16 @@ func (mc *Cluster) AddLabelsStore(storeID uint64, regionCount int, labels map[st
core.SetRegionSize(int64(regionCount)*defaultRegionSize/units.MiB),
core.SetLastHeartbeatTS(time.Now()),
)
mc.SetStoreLimit(storeID, storelimit.AddPeer, 60)
mc.SetStoreLimit(storeID, storelimit.RemovePeer, 60)
mc.SetStoreLimit(storeID, storelimit.AddPeer, limit)
mc.SetStoreLimit(storeID, storelimit.RemovePeer, limit)
mc.PutStore(store)
}

// AddLabelsStore adds a store with the given region count and labels.
// It delegates to AddLabelsStoreWithLimit, passing a store limit of 60.
func (mc *Cluster) AddLabelsStore(storeID uint64, regionCount int, labels map[string]string) {
	// Untyped constant, so it converts to the callee's limit parameter type
	// exactly as the bare literal would.
	const defaultStoreLimit = 60
	mc.AddLabelsStoreWithLimit(storeID, regionCount, labels, defaultStoreLimit)
}

// AddLabersStoreWithLearnerCount adds store with specified count of region, learner and labels.
func (mc *Cluster) AddLabersStoreWithLearnerCount(storeID uint64, regionCount int, learnerCount int, labels map[string]string) {
mc.AddLabelsStore(storeID, regionCount, labels)
Expand Down
55 changes: 55 additions & 0 deletions pkg/schedule/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/buckets"
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/utils/typeutil"
)

Expand Down Expand Up @@ -1303,6 +1304,60 @@ func (h *Handler) CheckRegionsReplicated(rawStartKey, rawEndKey string) (string,
return state, nil
}

// BalanceKeyrange creates a new balance-keyrange scheduler according to the API request.
func (h *Handler) BalanceKeyrange(data string) (string, error) {
sc, err := h.GetSchedulersController()
if err != nil {
return "", err
}
exist, _ := sc.IsSchedulerExisted(types.BalanceKeyrangeScheduler.String())
if exist {
return "Already existed", errs.ErrSchedulerExisted.FastGenByArgs()
}
oc := h.GetCoordinator().GetOperatorController()
controller := h.GetCoordinator().GetSchedulersController()
cb := func(s string) error {
return controller.RemoveScheduler(s)
}
s, err := schedulers.CreateScheduler(types.BalanceKeyrangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceKeyrangeScheduler, []string{data}), cb)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could PD lose this config if the PD server restarts or the leader is transferred?

if err != nil {
return "Created scheduler failed", err
}
if err = controller.AddScheduler(s); err != nil {
return "Add scheduler failed", err
}
return "Scheduler added successfully", nil
}

// CheckBalanceKeyrangeStatus returns the status of the balance-keyrange scheduler.
func (h *Handler) CheckBalanceKeyrangeStatus() (any, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about defining a response struct instead of using any? I also think the caller may need the key range.

sc, err := h.GetSchedulersController()
makeJSONResp := func(s string) any {
return struct {
Scheduling bool `json:"scheduling"`
ErrMsg string `json:"err_msg"`
}{
Scheduling: false,
ErrMsg: s,
}
}
if err != nil {
return makeJSONResp("get scheduler control error"), err
}
s := sc.GetScheduler(types.BalanceKeyrangeScheduler.String())
if s == nil {
return makeJSONResp("no scheduler found"), nil
}
if s.IsDisable() {
return makeJSONResp("scheduler disabled"), nil
}
st := sc.GetSchedulerStatus(types.BalanceKeyrangeScheduler.String())
if st == nil {
return makeJSONResp("can't get scheduler status"), nil
}
return st, nil
}

// GetRuleManager returns the rule manager.
func (h *Handler) GetRuleManager() (*placement.RuleManager, error) {
c := h.GetCluster()
Expand Down
2 changes: 2 additions & 0 deletions pkg/schedule/operator/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const (
OpWitnessLeader
// Include witness transfer.
OpWitness
// Include keyrange balancer
OpKeyrange
Comment on lines +52 to +53
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we reuse the "OpRange" instead of adding "OpKeyrange"?

Copy link
Member Author

@CalvinNeo CalvinNeo Dec 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for scatter range scheduler. And I have seen some predicates are based on opkind. In this view, I think different schedulers do not interfere with each other.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This operator kind's priority needs to be higher; it is currently positioned behind OpRange.

opMax
)

Expand Down
4 changes: 4 additions & 0 deletions pkg/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/prometheus/client_golang/prometheus"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"

"github.com/tikv/pd/pkg/core"
Expand Down Expand Up @@ -292,6 +293,9 @@ func (o *Operator) IsEnd() bool {

// CheckSuccess checks if all steps are finished, and update the status.
func (o *Operator) CheckSuccess() bool {
failpoint.Inject("forceSucess", func() {
atomic.StoreInt32(&o.currentStep, int32(len(o.steps)))
})
if atomic.LoadInt32(&o.currentStep) >= int32(len(o.steps)) {
return o.status.To(SUCCESS) || o.Status() == SUCCESS
}
Expand Down
12 changes: 11 additions & 1 deletion pkg/schedule/operator/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,16 @@ func (oc *Controller) PromoteWaitingOperator() {
}
}

// GetWopCount returns the count in the waiting list of a kind of scheduler.
func (oc *Controller) GetWopCount(kind string) uint64 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using this function to check the concurrent operators: s.OpController.OperatorCount?

return oc.wopStatus.getCount(kind)
}

// GetSchedulerMaxWaitingOperator returns the waiting queue size.
func (oc *Controller) GetSchedulerMaxWaitingOperator() uint64 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (oc *Controller) GetSchedulerMaxWaitingOperator() uint64 {
func (oc *Controller) GetSchedulerMaxWaitingOperatorLimit() uint64 {

return oc.config.GetSchedulerMaxWaitingOperator()
}

// checkAddOperator checks if the operator can be added.
// There are several situations that cannot be added:
// - There is no such region in the cluster
Expand Down Expand Up @@ -948,7 +958,7 @@ func (o *records) Put(op *Operator) {
o.ttl.Put(id, record)
}

// ExceedStoreLimit returns true if the store exceeds the cost limit after adding the Otherwise, returns false.
// ExceedStoreLimit returns true if the store exceeds the cost limit after adding the operator. Otherwise, returns false.
func (oc *Controller) ExceedStoreLimit(ops ...*Operator) bool {
// The operator with Urgent priority, like admin operators, should ignore the store limit check.
var desc string
Expand Down
Loading
Loading