-
Notifications
You must be signed in to change notification settings - Fork 728
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
scheduler: Introduce BalaceKeyrange scheduler #8497
base: master
Are you sure you want to change the base?
Changes from all commits
63a7fec
e49630b
3e683fe
cdd686a
9bfe8b6
9afa987
b9e3ca2
d8df5fd
60baee3
9537d03
ba6d418
68402db
9f2e849
0b225ef
78b624f
77a4f70
2327279
52f564b
81f2dae
9029cdb
815c9d3
6edfcab
ca3e808
b1165e2
664d944
0ec83e1
1b5eecb
feb6ccc
4a0222f
b9763af
32770a4
c60f3a8
1f2d3f6
3a51d3d
bfca499
a5919dd
9d7a76a
3941048
f38d356
987afc9
d586084
22da213
c04479b
8e2fa03
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -45,6 +45,7 @@ import ( | |
"github.com/tikv/pd/pkg/statistics" | ||
"github.com/tikv/pd/pkg/statistics/buckets" | ||
"github.com/tikv/pd/pkg/statistics/utils" | ||
"github.com/tikv/pd/pkg/storage" | ||
"github.com/tikv/pd/pkg/utils/typeutil" | ||
) | ||
|
||
|
@@ -1303,6 +1304,60 @@ func (h *Handler) CheckRegionsReplicated(rawStartKey, rawEndKey string) (string, | |
return state, nil | ||
} | ||
|
||
// BalanceKeyrange create a new balance-keyrange scheduler according to API request. | ||
func (h *Handler) BalanceKeyrange(data string) (string, error) { | ||
sc, err := h.GetSchedulersController() | ||
if err != nil { | ||
return "", err | ||
} | ||
exist, _ := sc.IsSchedulerExisted(types.BalanceKeyrangeScheduler.String()) | ||
if exist { | ||
return "Already existed", errs.ErrSchedulerExisted.FastGenByArgs() | ||
} | ||
oc := h.GetCoordinator().GetOperatorController() | ||
controller := h.GetCoordinator().GetSchedulersController() | ||
cb := func(s string) error { | ||
return controller.RemoveScheduler(s) | ||
} | ||
s, err := schedulers.CreateScheduler(types.BalanceKeyrangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceKeyrangeScheduler, []string{data}), cb) | ||
if err != nil { | ||
return "Created scheduler failed", err | ||
} | ||
if err = controller.AddScheduler(s); err != nil { | ||
return "Add scheduler failed", err | ||
} | ||
return "Scheduler added successfully", nil | ||
} | ||
|
||
// CheckBalanceKeyrangeStatus returns the status of the balance-keyrange scheduler. | ||
func (h *Handler) CheckBalanceKeyrangeStatus() (any, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how about defining a response struct and not using any? and I think the caller may need the key range. . |
||
sc, err := h.GetSchedulersController() | ||
makeJSONResp := func(s string) any { | ||
return struct { | ||
Scheduling bool `json:"scheduling"` | ||
ErrMsg string `json:"err_msg"` | ||
}{ | ||
Scheduling: false, | ||
ErrMsg: s, | ||
} | ||
} | ||
if err != nil { | ||
return makeJSONResp("get scheduler control error"), err | ||
} | ||
s := sc.GetScheduler(types.BalanceKeyrangeScheduler.String()) | ||
if s == nil { | ||
return makeJSONResp("no scheduler found"), nil | ||
} | ||
if s.IsDisable() { | ||
return makeJSONResp("scheduler disabled"), nil | ||
} | ||
st := sc.GetSchedulerStatus(types.BalanceKeyrangeScheduler.String()) | ||
if st == nil { | ||
return makeJSONResp("can't get scheduler status"), nil | ||
} | ||
return st, nil | ||
} | ||
|
||
// GetRuleManager returns the rule manager. | ||
func (h *Handler) GetRuleManager() (*placement.RuleManager, error) { | ||
c := h.GetCluster() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,6 +49,8 @@ const ( | |
OpWitnessLeader | ||
// Include witness transfer. | ||
OpWitness | ||
// Include keyrange balancer | ||
OpKeyrange | ||
Comment on lines
+52
to
+53
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we reuse the "OpRange" instead of adding "OpKeyrange"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is for scatter range scheduler. And I have seen some predicates are based on opkind. In this view, I think different schedulers do not interfere with each other. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This operator kind's priority needs to be higher, it is currently positioned behind the OpRange. |
||
opMax | ||
) | ||
|
||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -418,6 +418,16 @@ func (oc *Controller) PromoteWaitingOperator() { | |||||
} | ||||||
} | ||||||
|
||||||
// GetWopCount returns the count in the waiting list of a kind of scheduler. | ||||||
func (oc *Controller) GetWopCount(kind string) uint64 { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about using this function to check the concurrent operators ? |
||||||
return oc.wopStatus.getCount(kind) | ||||||
} | ||||||
|
||||||
// GetSchedulerMaxWaitingOperator returns the waiting queue side. | ||||||
func (oc *Controller) GetSchedulerMaxWaitingOperator() uint64 { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
return oc.config.GetSchedulerMaxWaitingOperator() | ||||||
} | ||||||
|
||||||
// checkAddOperator checks if the operator can be added. | ||||||
// There are several situations that cannot be added: | ||||||
// - There is no such region in the cluster | ||||||
|
@@ -948,7 +958,7 @@ func (o *records) Put(op *Operator) { | |||||
o.ttl.Put(id, record) | ||||||
} | ||||||
|
||||||
// ExceedStoreLimit returns true if the store exceeds the cost limit after adding the Otherwise, returns false. | ||||||
// ExceedStoreLimit returns true if the store exceeds the cost limit after adding the opertor. Otherwise, returns false. | ||||||
func (oc *Controller) ExceedStoreLimit(ops ...*Operator) bool { | ||||||
// The operator with Urgent priority, like admin operators, should ignore the store limit check. | ||||||
var desc string | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PD may lose the config if the pd server restarts or transfers leader?