Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add scheduler metrics #205

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions pkg/apis/core/v1alpha1/types_schedulingprofile.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ const (
WebhookPlugin PluginType = "Webhook"
)

const (
// DefaultSchedulerName is the name of the default scheduler. It is also the
// profile name that (*SchedulingProfile).ProfileName reports when it is
// called on a nil profile.
DefaultSchedulerName = "default-scheduler"
)

// PluginConfig specifies arguments that should be passed to a plugin at the time of initialization.
// A plugin that is invoked at multiple extension points is initialized once. Args can have arbitrary structure.
// It is up to the plugin to process these Args.
Expand All @@ -118,3 +123,10 @@ type PluginConfig struct {
// +optional
Args apiextensionsv1.JSON `json:"args"`
}

// ProfileName returns the name under which this scheduling profile is
// identified.
//
// It is safe to call on a nil receiver: a nil profile yields
// DefaultSchedulerName. For a non-nil profile the object's Name is returned
// unchanged, even when it is empty.
func (s *SchedulingProfile) ProfileName() string {
	if s != nil {
		return s.Name
	}
	// No profile was supplied, so fall back to the default scheduler's name.
	return DefaultSchedulerName
}
11 changes: 11 additions & 0 deletions pkg/controllers/scheduler/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,14 @@ const (
SchedulingTriggersAnnotation = common.DefaultPrefix + "scheduling-triggers"
SchedulingDeferredReasonsAnnotation = common.DefaultPrefix + "scheduling-deferred-reasons"
)

// Event names, one per kind of object whose change is being reported.
// NOTE(review): the consumers of these names are not visible in this section;
// presumably they label what triggered a scheduling run — confirm at the call sites.
const (
// FedObjChanged is the event for a change to a FederatedObject or ClusterFederatedObject.
FedObjChanged = "FedObjChanged"
// PolicyChanged is the event for a change to a PropagationPolicy or ClusterPropagationPolicy.
PolicyChanged = "PolicyChanged"
// ClusterChanged is the event for a change to a cluster.
ClusterChanged = "ClusterChanged"
// FTCChanged is the event for a change to an FTC.
FTCChanged = "FTCChanged"
)
4 changes: 3 additions & 1 deletion pkg/controllers/scheduler/core/generic_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework"
"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework/runtime"
"github.com/kubewharf/kubeadmiral/pkg/stats"
)

type naiveReplicasPlugin struct{}
Expand Down Expand Up @@ -57,7 +58,8 @@ func getFramework() framework.Framework {
DefaultRegistry := runtime.Registry{
"NaiveReplicas": newNaiveReplicas,
}
f, _ := runtime.NewFramework(DefaultRegistry, nil, &fedcore.EnabledPlugins{ReplicasPlugins: []string{"NaiveReplicas"}})
metrics := stats.NewMock("test", "kubeadmiral_controller_manager", false)
f, _ := runtime.NewFramework(DefaultRegistry, nil, &fedcore.EnabledPlugins{ReplicasPlugins: []string{"NaiveReplicas"}}, "", metrics)
return f
}

Expand Down
68 changes: 59 additions & 9 deletions pkg/controllers/scheduler/framework/runtime/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,39 @@ import (
"context"
"fmt"
"reflect"
"time"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"

fedcore "github.com/kubewharf/kubeadmiral/pkg/apis/core"
fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework"
"github.com/kubewharf/kubeadmiral/pkg/stats"
)

// Names of the scheduler framework extension points. They are used as the
// value of the "extension_point" tag on the duration metrics emitted by
// frameworkImpl.
const (
// filter tags measurements taken in RunFilterPlugins and runFilterPlugin.
filter = "Filter"
// score tags measurements taken in RunScorePlugins.
score = "Score"
// selectClusters tags measurements taken in RunSelectClustersPlugin.
selectClusters = "SelectClusters"
// replicas tags measurements taken in RunReplicasPlugin.
replicas = "Replicas"
)

// frameworkImpl is the framework.Framework implementation of this package.
// It holds the plugins enabled for each extension point together with the
// data needed to report timing metrics for them.
type frameworkImpl struct {
// filterPlugins are the filter plugins; RunFilterPlugins runs them in slice order.
filterPlugins []framework.FilterPlugin
// scorePlugins are the score plugins; RunScorePlugins iterates over them in slice order.
scorePlugins []framework.ScorePlugin
// selectPlugins are the cluster-selection plugins.
selectPlugins []framework.SelectPlugin
// replicasPlugins are the replica-distribution plugins.
replicasPlugins []framework.ReplicasPlugin

// profileName is reported as the "profile" tag on extension-point duration metrics.
profileName string
// metrics receives the duration measurements recorded by the Run* methods.
metrics stats.Metrics
}

var _ framework.Framework = &frameworkImpl{}

func NewFramework(registry Registry, handle framework.Handle, enabledPlugins *fedcore.EnabledPlugins) (framework.Framework, error) {
func NewFramework(registry Registry, handle framework.Handle, enabledPlugins *fedcore.EnabledPlugins,
profileName string, metrics stats.Metrics,
) (framework.Framework, error) {
fwk := &frameworkImpl{}

pluginsMap := make(map[string]framework.Plugin)
Expand All @@ -64,6 +78,8 @@ func NewFramework(registry Registry, handle framework.Handle, enabledPlugins *fe
}
}

fwk.profileName = profileName
fwk.metrics = metrics
return fwk, nil
}

Expand Down Expand Up @@ -115,7 +131,14 @@ func (f *frameworkImpl) RunFilterPlugins(
ctx context.Context,
schedulingUnit *framework.SchedulingUnit,
cluster *fedcorev1a1.FederatedCluster,
) *framework.Result {
) (result *framework.Result) {
startTime := time.Now()
defer func() {
f.metrics.Duration("scheduler_framework_extension_point_duration", startTime,
stats.Tag{Name: "extension_point", Value: filter},
stats.Tag{Name: "profile", Value: f.profileName},
stats.Tag{Name: "status", Value: result.Code().String()})
}()
for _, pl := range f.filterPlugins {
pluginResult := f.runFilterPlugin(ctx, pl, schedulingUnit, cluster)
if !pluginResult.IsSuccess() {
Expand All @@ -130,18 +153,31 @@ func (f *frameworkImpl) runFilterPlugin(
pl framework.FilterPlugin,
schedulingUnit *framework.SchedulingUnit,
cluster *fedcorev1a1.FederatedCluster,
) *framework.Result {
// TODO: add some metrics here
result := pl.Filter(ctx, schedulingUnit, cluster)
) (result *framework.Result) {
startTime := time.Now()
defer func() {
f.metrics.Duration("scheduler_plugin_execution_duration", startTime,
stats.Tag{Name: "extension_point", Value: filter},
stats.Tag{Name: "plugin", Value: pl.Name()},
stats.Tag{Name: "status", Value: result.Code().String()})
}()
result = pl.Filter(ctx, schedulingUnit, cluster)
return result
}

func (f *frameworkImpl) RunScorePlugins(
ctx context.Context,
schedulingUnit *framework.SchedulingUnit,
clusters []*fedcorev1a1.FederatedCluster,
) (framework.PluginToClusterScore, *framework.Result) {
result := make(framework.PluginToClusterScore)
) (pluginToClusterScore framework.PluginToClusterScore, result *framework.Result) {
startTime := time.Now()
defer func() {
f.metrics.Duration("scheduler_framework_extension_point_duration", startTime,
stats.Tag{Name: "extension_point", Value: score},
stats.Tag{Name: "profile", Value: f.profileName},
stats.Tag{Name: "status", Value: result.Code().String()})
}()
pluginToClusterScore = make(framework.PluginToClusterScore)

for _, plugin := range f.scorePlugins {
scoreList := make(framework.ClusterScoreList, len(clusters))
Expand Down Expand Up @@ -174,17 +210,24 @@ func (f *frameworkImpl) RunScorePlugins(
}
}

result[plugin.Name()] = scoreList
pluginToClusterScore[plugin.Name()] = scoreList
}

return result, nil
return pluginToClusterScore, nil
}

func (f *frameworkImpl) RunSelectClustersPlugin(
ctx context.Context,
schedulingUnit *framework.SchedulingUnit,
clusterScores framework.ClusterScoreList,
) (clusters []*fedcorev1a1.FederatedCluster, result *framework.Result) {
startTime := time.Now()
defer func() {
f.metrics.Duration("scheduler_framework_extension_point_duration", startTime,
stats.Tag{Name: "extension_point", Value: selectClusters},
stats.Tag{Name: "profile", Value: f.profileName},
stats.Tag{Name: "status", Value: result.Code().String()})
}()
if len(f.selectPlugins) == 0 {
for _, clusterScore := range clusterScores {
clusters = append(clusters, clusterScore.Cluster)
Expand Down Expand Up @@ -213,6 +256,13 @@ func (f *frameworkImpl) RunReplicasPlugin(
schedulingUnit *framework.SchedulingUnit,
clusters []*fedcorev1a1.FederatedCluster,
) (clusterReplicasList framework.ClusterReplicasList, result *framework.Result) {
startTime := time.Now()
defer func() {
f.metrics.Duration("scheduler_framework_extension_point_duration", startTime,
stats.Tag{Name: "extension_point", Value: replicas},
stats.Tag{Name: "profile", Value: f.profileName},
stats.Tag{Name: "status", Value: result.Code().String()})
}()
if len(clusters) == 0 {
return clusterReplicasList, framework.NewResult(
framework.Success,
Expand Down
11 changes: 9 additions & 2 deletions pkg/controllers/scheduler/framework/runtime/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
fedcore "github.com/kubewharf/kubeadmiral/pkg/apis/core"
fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework"
"github.com/kubewharf/kubeadmiral/pkg/stats"
)

type naiveFilterPlugin struct {
Expand Down Expand Up @@ -197,7 +198,8 @@ func TestRunFilterPlugins(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fwk, err := NewFramework(test.plugins, nil, test.enabledPlugins)
metrics := stats.NewMock("test", "kubeadmiral_controller_manager", false)
fwk, err := NewFramework(test.plugins, nil, test.enabledPlugins, "", metrics)
if err != nil {
t.Fatalf("unexpected error when creating framework: %v", err)
}
Expand Down Expand Up @@ -295,6 +297,8 @@ func TestNewFramework(t *testing.T) {
scorePlugins: []framework.ScorePlugin{scorePlugin},
selectPlugins: []framework.SelectPlugin{scoreAndSelectPlugin},
replicasPlugins: []framework.ReplicasPlugin{replicasPlugin},
profileName: "",
metrics: stats.NewMock("test", "kubeadmiral_controller_manager", false),
},
false,
},
Expand All @@ -309,6 +313,8 @@ func TestNewFramework(t *testing.T) {
filterPlugins: []framework.FilterPlugin{filterPlugin, filterAndScorePlugin},
scorePlugins: []framework.ScorePlugin{scorePlugin, scoreAndSelectPlugin},
selectPlugins: []framework.SelectPlugin{scoreAndSelectPlugin, selectPlugin},
profileName: "",
metrics: stats.NewMock("test", "kubeadmiral_controller_manager", false),
},
false,
},
Expand Down Expand Up @@ -349,7 +355,8 @@ func TestNewFramework(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fwk, err := NewFramework(newRegistry(), nil, &test.enabledPlugins)
metrics := stats.NewMock("test", "kubeadmiral_controller_manager", false)
fwk, err := NewFramework(newRegistry(), nil, &test.enabledPlugins, "", metrics)
if test.shouldError {
if err == nil {
t.Fatal("expected error when creating framework but got nil")
Expand Down
15 changes: 15 additions & 0 deletions pkg/controllers/scheduler/framework/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,13 @@ const (
Error
)

// codes holds the name of each Code, indexed by the Code's numeric value.
// This list should be exactly the same as the codes iota defined above, in the
// same order.
var codes = []string{"Success", "Unschedulable", "Error"}

// String returns the name of c, for example "Success".
//
// A Code that falls outside the range covered by codes (for instance a
// constant added to the iota block without extending codes) yields "Unknown"
// instead of panicking with an index-out-of-range error. String is called
// from deferred metric-reporting closures, where a panic would abort the
// scheduling call that is being measured.
func (c Code) String() string {
	if i := int(c); i >= 0 && i < len(codes) {
		return codes[i]
	}
	return "Unknown"
}

// NewResult makes a result out of the given arguments and returns its pointer.
func NewResult(code Code, reasons ...string) *Result {
s := &Result{
Expand All @@ -179,6 +186,14 @@ func NewResult(code Code, reasons ...string) *Result {
return s
}

// Code reports the Code carried by the Result.
// A nil Result is treated as successful, so calling Code on nil returns Success.
func (s *Result) Code() Code {
	if s != nil {
		return s.code
	}
	return Success
}

// IsSuccess returns true if and only if "Result" is nil or Code is "Success".
func (s *Result) IsSuccess() bool {
return s == nil || s.code == Success
Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/scheduler/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,5 +111,7 @@ func (s *Scheduler) createFramework(
registry,
handle,
enabledPlugins,
profile.ProfileName(),
s.metrics,
)
}
Loading