From 714911b65d8fd9d752724ce27e7d1f9e2b702260 Mon Sep 17 00:00:00 2001 From: shentiecheng Date: Thu, 17 Aug 2023 15:42:24 +0800 Subject: [PATCH] add scheduler metrics Signed-off-by: shentiecheng --- .../core/v1alpha1/types_schedulingprofile.go | 12 ++++ pkg/controllers/scheduler/constants.go | 11 +++ .../scheduler/core/generic_scheduler_test.go | 4 +- .../scheduler/framework/runtime/framework.go | 68 ++++++++++++++++--- .../framework/runtime/framework_test.go | 11 ++- pkg/controllers/scheduler/framework/types.go | 15 ++++ pkg/controllers/scheduler/profile.go | 2 + pkg/controllers/scheduler/scheduler.go | 52 ++++++++++++-- .../deployment/util/deployment_util_test.go | 2 - pkg/util/fedobjectadapters/adapters.go | 1 - .../federatedinformermanager.go | 8 +-- test/e2e/framework/cluster.go | 4 +- 12 files changed, 164 insertions(+), 26 deletions(-) diff --git a/pkg/apis/core/v1alpha1/types_schedulingprofile.go b/pkg/apis/core/v1alpha1/types_schedulingprofile.go index 255fce8e..d3d7b4b2 100644 --- a/pkg/apis/core/v1alpha1/types_schedulingprofile.go +++ b/pkg/apis/core/v1alpha1/types_schedulingprofile.go @@ -108,6 +108,11 @@ const ( WebhookPlugin PluginType = "Webhook" ) +const ( + // DefaultSchedulerName defines the name of default scheduler. + DefaultSchedulerName = "default-scheduler" +) + // PluginConfig specifies arguments that should be passed to a plugin at the time of initialization. // A plugin that is invoked at multiple extension points is initialized once. Args can have arbitrary structure. // It is up to the plugin to process these Args. 
@@ -118,3 +123,10 @@ type PluginConfig struct { // +optional Args apiextensionsv1.JSON `json:"args"` } + +func (s *SchedulingProfile) ProfileName() string { + if s == nil { + return DefaultSchedulerName + } + return s.Name +} diff --git a/pkg/controllers/scheduler/constants.go b/pkg/controllers/scheduler/constants.go index 0a1e85d5..165ae304 100644 --- a/pkg/controllers/scheduler/constants.go +++ b/pkg/controllers/scheduler/constants.go @@ -49,3 +49,14 @@ const ( SchedulingTriggersAnnotation = common.DefaultPrefix + "scheduling-triggers" SchedulingDeferredReasonsAnnotation = common.DefaultPrefix + "scheduling-deferred-reasons" ) + +const ( + // FedObjChanged is the event when FederatedObject/ClusterFederatedObject changes. + FedObjChanged = "FedObjChanged" + // PolicyChanged is the event when PropagationPolicy/ClusterPropagationPolicy changes. + PolicyChanged = "PolicyChanged" + // ClusterChanged is the event when cluster changes. + ClusterChanged = "ClusterChanged" + // FTCChanged is the event when FTC changes. 
+ FTCChanged = "FTCChanged" +) diff --git a/pkg/controllers/scheduler/core/generic_scheduler_test.go b/pkg/controllers/scheduler/core/generic_scheduler_test.go index f38cdb2c..a882e8de 100644 --- a/pkg/controllers/scheduler/core/generic_scheduler_test.go +++ b/pkg/controllers/scheduler/core/generic_scheduler_test.go @@ -29,6 +29,7 @@ import ( fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework" "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework/runtime" + "github.com/kubewharf/kubeadmiral/pkg/stats" ) type naiveReplicasPlugin struct{} @@ -57,7 +58,8 @@ func getFramework() framework.Framework { DefaultRegistry := runtime.Registry{ "NaiveReplicas": newNaiveReplicas, } - f, _ := runtime.NewFramework(DefaultRegistry, nil, &fedcore.EnabledPlugins{ReplicasPlugins: []string{"NaiveReplicas"}}) + metrics := stats.NewMock("test", "kubeadmiral_controller_manager", false) + f, _ := runtime.NewFramework(DefaultRegistry, nil, &fedcore.EnabledPlugins{ReplicasPlugins: []string{"NaiveReplicas"}}, "", metrics) return f } diff --git a/pkg/controllers/scheduler/framework/runtime/framework.go b/pkg/controllers/scheduler/framework/runtime/framework.go index 8da7e857..ee06e79a 100644 --- a/pkg/controllers/scheduler/framework/runtime/framework.go +++ b/pkg/controllers/scheduler/framework/runtime/framework.go @@ -24,6 +24,7 @@ import ( "context" "fmt" "reflect" + "time" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" @@ -31,6 +32,14 @@ import ( fedcore "github.com/kubewharf/kubeadmiral/pkg/apis/core" fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework" + "github.com/kubewharf/kubeadmiral/pkg/stats" +) + +const ( + filter = "Filter" + score = "Score" + selectClusters = "SelectClusters" + replicas = "Replicas" ) type frameworkImpl struct { @@ -38,11 +47,16 @@ type frameworkImpl struct { 
scorePlugins []framework.ScorePlugin selectPlugins []framework.SelectPlugin replicasPlugins []framework.ReplicasPlugin + + profileName string + metrics stats.Metrics } var _ framework.Framework = &frameworkImpl{} -func NewFramework(registry Registry, handle framework.Handle, enabledPlugins *fedcore.EnabledPlugins) (framework.Framework, error) { +func NewFramework(registry Registry, handle framework.Handle, enabledPlugins *fedcore.EnabledPlugins, + profileName string, metrics stats.Metrics, +) (framework.Framework, error) { fwk := &frameworkImpl{} pluginsMap := make(map[string]framework.Plugin) @@ -64,6 +78,8 @@ func NewFramework(registry Registry, handle framework.Handle, enabledPlugins *fe } } + fwk.profileName = profileName + fwk.metrics = metrics return fwk, nil } @@ -115,7 +131,14 @@ func (f *frameworkImpl) RunFilterPlugins( ctx context.Context, schedulingUnit *framework.SchedulingUnit, cluster *fedcorev1a1.FederatedCluster, -) *framework.Result { +) (result *framework.Result) { + startTime := time.Now() + defer func() { + f.metrics.Duration("scheduler_framework_extension_point_duration", startTime, + stats.Tag{Name: "extension_point", Value: filter}, + stats.Tag{Name: "profile", Value: f.profileName}, + stats.Tag{Name: "status", Value: result.Code().String()}) + }() for _, pl := range f.filterPlugins { pluginResult := f.runFilterPlugin(ctx, pl, schedulingUnit, cluster) if !pluginResult.IsSuccess() { @@ -130,9 +153,15 @@ func (f *frameworkImpl) runFilterPlugin( pl framework.FilterPlugin, schedulingUnit *framework.SchedulingUnit, cluster *fedcorev1a1.FederatedCluster, -) *framework.Result { - // TODO: add some metrics here - result := pl.Filter(ctx, schedulingUnit, cluster) +) (result *framework.Result) { + startTime := time.Now() + defer func() { + f.metrics.Duration("scheduler_plugin_execution_duration", startTime, + stats.Tag{Name: "extension_point", Value: filter}, + stats.Tag{Name: "plugin", Value: pl.Name()}, + stats.Tag{Name: "status", Value: 
result.Code().String()}) + }() + result = pl.Filter(ctx, schedulingUnit, cluster) return result } @@ -140,8 +169,15 @@ func (f *frameworkImpl) RunScorePlugins( ctx context.Context, schedulingUnit *framework.SchedulingUnit, clusters []*fedcorev1a1.FederatedCluster, -) (framework.PluginToClusterScore, *framework.Result) { - result := make(framework.PluginToClusterScore) +) (pluginToClusterScore framework.PluginToClusterScore, result *framework.Result) { + startTime := time.Now() + defer func() { + f.metrics.Duration("scheduler_framework_extension_point_duration", startTime, + stats.Tag{Name: "extension_point", Value: score}, + stats.Tag{Name: "profile", Value: f.profileName}, + stats.Tag{Name: "status", Value: result.Code().String()}) + }() + pluginToClusterScore = make(framework.PluginToClusterScore) for _, plugin := range f.scorePlugins { scoreList := make(framework.ClusterScoreList, len(clusters)) @@ -174,10 +210,10 @@ func (f *frameworkImpl) RunScorePlugins( } } - result[plugin.Name()] = scoreList + pluginToClusterScore[plugin.Name()] = scoreList } - return result, nil + return pluginToClusterScore, nil } func (f *frameworkImpl) RunSelectClustersPlugin( @@ -185,6 +221,13 @@ func (f *frameworkImpl) RunSelectClustersPlugin( schedulingUnit *framework.SchedulingUnit, clusterScores framework.ClusterScoreList, ) (clusters []*fedcorev1a1.FederatedCluster, result *framework.Result) { + startTime := time.Now() + defer func() { + f.metrics.Duration("scheduler_framework_extension_point_duration", startTime, + stats.Tag{Name: "extension_point", Value: selectClusters}, + stats.Tag{Name: "profile", Value: f.profileName}, + stats.Tag{Name: "status", Value: result.Code().String()}) + }() if len(f.selectPlugins) == 0 { for _, clusterScore := range clusterScores { clusters = append(clusters, clusterScore.Cluster) @@ -213,6 +256,13 @@ func (f *frameworkImpl) RunReplicasPlugin( schedulingUnit *framework.SchedulingUnit, clusters []*fedcorev1a1.FederatedCluster, ) (clusterReplicasList 
framework.ClusterReplicasList, result *framework.Result) { + startTime := time.Now() + defer func() { + f.metrics.Duration("scheduler_framework_extension_point_duration", startTime, + stats.Tag{Name: "extension_point", Value: replicas}, + stats.Tag{Name: "profile", Value: f.profileName}, + stats.Tag{Name: "status", Value: result.Code().String()}) + }() if len(clusters) == 0 { return clusterReplicasList, framework.NewResult( framework.Success, diff --git a/pkg/controllers/scheduler/framework/runtime/framework_test.go b/pkg/controllers/scheduler/framework/runtime/framework_test.go index d4e2baf5..25353feb 100644 --- a/pkg/controllers/scheduler/framework/runtime/framework_test.go +++ b/pkg/controllers/scheduler/framework/runtime/framework_test.go @@ -25,6 +25,7 @@ import ( fedcore "github.com/kubewharf/kubeadmiral/pkg/apis/core" fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework" + "github.com/kubewharf/kubeadmiral/pkg/stats" ) type naiveFilterPlugin struct { @@ -197,7 +198,8 @@ func TestRunFilterPlugins(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - fwk, err := NewFramework(test.plugins, nil, test.enabledPlugins) + metrics := stats.NewMock("test", "kubeadmiral_controller_manager", false) + fwk, err := NewFramework(test.plugins, nil, test.enabledPlugins, "", metrics) if err != nil { t.Fatalf("unexpected error when creating framework: %v", err) } @@ -295,6 +297,8 @@ func TestNewFramework(t *testing.T) { scorePlugins: []framework.ScorePlugin{scorePlugin}, selectPlugins: []framework.SelectPlugin{scoreAndSelectPlugin}, replicasPlugins: []framework.ReplicasPlugin{replicasPlugin}, + profileName: "", + metrics: stats.NewMock("test", "kubeadmiral_controller_manager", false), }, false, }, @@ -309,6 +313,8 @@ func TestNewFramework(t *testing.T) { filterPlugins: []framework.FilterPlugin{filterPlugin, filterAndScorePlugin}, scorePlugins: 
[]framework.ScorePlugin{scorePlugin, scoreAndSelectPlugin}, selectPlugins: []framework.SelectPlugin{scoreAndSelectPlugin, selectPlugin}, + profileName: "", + metrics: stats.NewMock("test", "kubeadmiral_controller_manager", false), }, false, }, @@ -349,7 +355,8 @@ func TestNewFramework(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - fwk, err := NewFramework(newRegistry(), nil, &test.enabledPlugins) + metrics := stats.NewMock("test", "kubeadmiral_controller_manager", false) + fwk, err := NewFramework(newRegistry(), nil, &test.enabledPlugins, "", metrics) if test.shouldError { if err == nil { t.Fatal("expected error when creating framework but got nil") diff --git a/pkg/controllers/scheduler/framework/types.go b/pkg/controllers/scheduler/framework/types.go index 3b1abbde..1dab9f31 100644 --- a/pkg/controllers/scheduler/framework/types.go +++ b/pkg/controllers/scheduler/framework/types.go @@ -167,6 +167,19 @@ const ( Error ) +// This list should be exactly the same as the codes iota defined above in the same order. +var codes = []string{"Success", "Unschedulable", "Error"} + +// String returns the name of the code, as used for the "status" metric tag. +// A code outside the range covered by codes yields "Unknown" instead of an index-out-of-range panic. +func (c Code) String() string { + idx := int(c) + if idx < 0 || idx >= len(codes) { + return "Unknown" + } + return codes[idx] +} + // NewResult makes a result out of the given arguments and returns its pointer. func NewResult(code Code, reasons ...string) *Result { s := &Result{ @@ -179,6 +192,14 @@ func NewResult(code Code, reasons ...string) *Result { return s } +// Code returns code of the Result. +func (s *Result) Code() Code { + if s == nil { + return Success + } + return s.code +} + // IsSuccess returns true if and only if "Result" is nil or Code is "Success". 
func (s *Result) IsSuccess() bool { return s == nil || s.code == Success diff --git a/pkg/controllers/scheduler/profile.go b/pkg/controllers/scheduler/profile.go index 3c6f8c44..4e377fb8 100644 --- a/pkg/controllers/scheduler/profile.go +++ b/pkg/controllers/scheduler/profile.go @@ -109,5 +109,7 @@ func (s *Scheduler) createFramework( registry, handle, enabledPlugins, + profile.ProfileName(), + s.metrics, ) } diff --git a/pkg/controllers/scheduler/scheduler.go b/pkg/controllers/scheduler/scheduler.go index 17ba2fad..771d6146 100644 --- a/pkg/controllers/scheduler/scheduler.go +++ b/pkg/controllers/scheduler/scheduler.go @@ -55,6 +55,12 @@ import ( "github.com/kubewharf/kubeadmiral/pkg/util/worker" ) +const ( // values of the "result" metric tag; constants so they cannot be reassigned at run time + ScheduledResult = "scheduled" + UnschedulableResult = "unschedulable" + ErrorResult = "error" +) + const ( SchedulerName = "scheduler" PropagationPolicyNameLabel = common.DefaultPrefix + "propagation-policy-name" @@ -138,11 +144,19 @@ func NewScheduler( fedObjectInformer.Informer().AddEventHandler(eventhandlers.NewTriggerOnAllChangesWithTransform( common.NewQualifiedName, - s.worker.Enqueue, + func(name common.QualifiedName) { + s.worker.Enqueue(name) + s.metrics.Counter("queue_incoming_federated_object_total", 1, + stats.Tag{Name: "event", Value: FedObjChanged}) + }, )) clusterFedObjectInformer.Informer().AddEventHandler(eventhandlers.NewTriggerOnAllChangesWithTransform( common.NewQualifiedName, - s.worker.Enqueue, + func(name common.QualifiedName) { + s.worker.Enqueue(name) + s.metrics.Counter("queue_incoming_federated_object_total", 1, + stats.Tag{Name: "event", Value: FedObjChanged}) + }, )) propagationPolicyInformer.Informer().AddEventHandler( @@ -300,7 +314,7 @@ func (s *Scheduler) reconcile(ctx context.Context, key common.QualifiedName) (st } result, earlyReturnWorkerResult := s.schedule(ctx, ftc, fedObject, policy, schedulingProfile, clusters) - if earlyReturnWorkerResult != nil { + if earlyReturnWorkerResult != nil && !earlyReturnWorkerResult.Success { 
return *earlyReturnWorkerResult } @@ -480,7 +494,7 @@ func (s *Scheduler) schedule( policy fedcorev1a1.GenericPropagationPolicy, schedulingProfile *fedcorev1a1.SchedulingProfile, clusters []*fedcorev1a1.FederatedCluster, -) (*core.ScheduleResult, *worker.Result) { +) (scheduleResult *core.ScheduleResult, workerResult *worker.Result) { logger := klog.FromContext(ctx) if policy == nil { @@ -496,6 +510,11 @@ func (s *Scheduler) schedule( return &core.ScheduleResult{SuggestedClusters: make(map[string]*int64)}, nil } + startTime := time.Now() + defer func() { + s.federatedObjectSchedule(workerResult, schedulingProfile.ProfileName(), startTime) + }() + // schedule according to matched policy logger.V(2).Info("Matched policy found, start scheduling") s.eventRecorder.Eventf( @@ -547,7 +566,7 @@ func (s *Scheduler) schedule( return nil, &worker.StatusError } - return &result, nil + return &result, &worker.StatusAllOK } func (s *Scheduler) persistSchedulingResult( @@ -769,6 +788,8 @@ func (s *Scheduler) enqueueFederatedObjectsForPolicy(policy metav1.Object) { continue } else if policyKey.Name == policyAccessor.GetName() && policyKey.Namespace == policyAccessor.GetNamespace() { s.worker.Enqueue(common.NewQualifiedName(obj)) + s.metrics.Counter("queue_incoming_federated_object_total", 1, + stats.Tag{Name: "event", Value: PolicyChanged}) } } } @@ -812,6 +833,8 @@ func (s *Scheduler) enqueueFederatedObjectsForCluster(cluster *fedcorev1a1.Feder for obj := range allObjects { s.worker.Enqueue(obj) + s.metrics.Counter("queue_incoming_federated_object_total", 1, + stats.Tag{Name: "event", Value: ClusterChanged}) } } @@ -846,6 +869,8 @@ func (s *Scheduler) enqueueFederatedObjectsForFTC(ftc *fedcorev1a1.FederatedType } if templateMetadata.GroupVersionKind() == ftc.GetSourceTypeGVK() { s.worker.Enqueue(common.NewQualifiedName(obj)) + s.metrics.Counter("queue_incoming_federated_object_total", 1, + stats.Tag{Name: "event", Value: FTCChanged}) } } } @@ -857,3 +882,20 @@ func (s *Scheduler) 
policyFromStore(qualifiedName common.QualifiedName) (fedcore } return s.clusterPropagationPolicyInformer.Lister().Get(qualifiedName.Name) } + +func (s *Scheduler) federatedObjectSchedule(result *worker.Result, profileName string, startTime time.Time) { + if result != nil && result.Success { + s.observeScheduleAttemptAndLatency(ScheduledResult, profileName, startTime) + } else { // a nil result (e.g. this runs from schedule's defer while it is panicking) is recorded as an error + s.observeScheduleAttemptAndLatency(ErrorResult, profileName, startTime) + } +} + +func (s *Scheduler) observeScheduleAttemptAndLatency(result, profileName string, startTime time.Time) { + s.metrics.Counter("schedule_attempts_total", 1, + stats.Tag{Name: "profile", Value: profileName}, + stats.Tag{Name: "result", Value: result}) + s.metrics.Duration("scheduling_attempt_duration", startTime, + stats.Tag{Name: "profile", Value: profileName}, + stats.Tag{Name: "result", Value: result}) +} diff --git a/pkg/lifted/kubernetes/pkg/controller/deployment/util/deployment_util_test.go b/pkg/lifted/kubernetes/pkg/controller/deployment/util/deployment_util_test.go index 96256c6b..21069788 100644 --- a/pkg/lifted/kubernetes/pkg/controller/deployment/util/deployment_util_test.go +++ b/pkg/lifted/kubernetes/pkg/controller/deployment/util/deployment_util_test.go @@ -267,8 +267,6 @@ func TestFindNewReplicaSet(t *testing.T) { } } - - func newString(s string) *string { return &s } diff --git a/pkg/util/fedobjectadapters/adapters.go b/pkg/util/fedobjectadapters/adapters.go index 7741a7d7..bf550028 100644 --- a/pkg/util/fedobjectadapters/adapters.go +++ b/pkg/util/fedobjectadapters/adapters.go @@ -138,4 +138,3 @@ func Delete( return fedv1a1Client.FederatedObjects(namespace).Delete(ctx, name, opts) } } - diff --git a/pkg/util/informermanager/federatedinformermanager.go b/pkg/util/informermanager/federatedinformermanager.go index 81b6b727..250d48f1 100644 --- a/pkg/util/informermanager/federatedinformermanager.go +++ b/pkg/util/informermanager/federatedinformermanager.go @@ -105,10 +105,10 @@ func NewFederatedInformerManager( 
queue: workqueue.NewRateLimitingQueue( workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 10*time.Second), ), - podListerSemaphore: semaphore.NewWeighted(3), // TODO: make this configurable - initialClusters: sets.New[string](), - podEventHandlers: []*ResourceEventHandlerWithClusterFuncs{}, - podEventRegistrations: map[string]map[*ResourceEventHandlerWithClusterFuncs]cache.ResourceEventHandlerRegistration{}, + podListerSemaphore: semaphore.NewWeighted(3), // TODO: make this configurable + initialClusters: sets.New[string](), + podEventHandlers: []*ResourceEventHandlerWithClusterFuncs{}, + podEventRegistrations: map[string]map[*ResourceEventHandlerWithClusterFuncs]cache.ResourceEventHandlerRegistration{}, } clusterInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ diff --git a/test/e2e/framework/cluster.go b/test/e2e/framework/cluster.go index b4366060..ab0fb390 100644 --- a/test/e2e/framework/cluster.go +++ b/test/e2e/framework/cluster.go @@ -22,7 +22,7 @@ import ( corev1 "k8s.io/api/core/v1" fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" - "github.com/kubewharf/kubeadmiral/pkg/controllers/util" + "github.com/kubewharf/kubeadmiral/pkg/util/cascadingdeletion" ) const ( @@ -51,7 +51,7 @@ func WithCascadingDelete(c *fedcorev1a1.FederatedCluster) { if c.Annotations == nil { c.Annotations = map[string]string{} } - c.Annotations[util.AnnotationCascadingDelete] = "true" + c.Annotations[cascadingdeletion.AnnotationCascadingDelete] = "true" } func WithTaints(c *fedcorev1a1.FederatedCluster) {