Skip to content

Commit

Permalink
koord-scheduler: add a parameter to mark whether scheduling is allowed on node with expired nodemetric (koordinator-sh#2076)
Browse files Browse the repository at this point in the history

Signed-off-by: lucming <[email protected]>
  • Loading branch information
lucming authored Jun 3, 2024
1 parent c159ea2 commit 1bac4d3
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 1 deletion.
2 changes: 2 additions & 0 deletions pkg/scheduler/apis/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type LoadAwareSchedulingArgs struct {
// When NodeMetrics expired, the node is considered abnormal.
// Default is 180 seconds.
NodeMetricExpirationSeconds *int64
// EnableScheduleWhenNodeMetricsExpired indicates whether pods are allowed to be scheduled onto nodes whose NodeMetric has expired.
// It only takes effect when FilterExpiredNodeMetrics is enabled.
EnableScheduleWhenNodeMetricsExpired *bool
// ResourceWeights indicates the weights of resources.
// The weights of CPU and Memory are both 1 by default.
ResourceWeights map[corev1.ResourceName]int64
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/apis/config/v1beta3/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ func SetDefaults_LoadAwareSchedulingArgs(obj *LoadAwareSchedulingArgs) {
if obj.FilterExpiredNodeMetrics == nil {
obj.FilterExpiredNodeMetrics = pointer.Bool(true)
}
if obj.EnableScheduleWhenNodeMetricsExpired == nil {
obj.EnableScheduleWhenNodeMetricsExpired = pointer.Bool(false)
}
if obj.NodeMetricExpirationSeconds == nil {
obj.NodeMetricExpirationSeconds = pointer.Int64(defaultNodeMetricExpirationSeconds)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/apis/config/v1beta3/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type LoadAwareSchedulingArgs struct {
// When NodeMetrics expired, the node is considered abnormal.
// Default is 180 seconds.
NodeMetricExpirationSeconds *int64 `json:"nodeMetricExpirationSeconds,omitempty"`
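// EnableScheduleWhenNodeMetricsExpired indicates whether pods are allowed to be scheduled onto nodes whose NodeMetric has expired.
// It only takes effect when FilterExpiredNodeMetrics is enabled.
// Default is false.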
EnableScheduleWhenNodeMetricsExpired *bool `json:"enableScheduleWhenNodeMetricsExpired,omitempty"`
// ResourceWeights indicates the weights of resources.
// The weights of CPU and Memory are both 1 by default.
ResourceWeights map[corev1.ResourceName]int64 `json:"resourceWeights,omitempty"`
Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/apis/config/v1beta3/zz_generated.conversion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/scheduler/apis/config/v1beta3/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/scheduler/apis/config/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/scheduler/plugins/loadaware/load_aware.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (

const (
Name = "LoadAwareScheduling"
ErrReasonNodeMetricExpired = "node(s) nodeMetric expired"
ErrReasonUsageExceedThreshold = "node(s) %s usage exceed threshold"
ErrReasonAggregatedUsageExceedThreshold = "node(s) %s aggregated usage exceed threshold"
ErrReasonFailedEstimatePod
Expand Down Expand Up @@ -143,6 +144,9 @@ func (p *Plugin) Filter(ctx context.Context, state *framework.CycleState, pod *c

if p.args.FilterExpiredNodeMetrics != nil && *p.args.FilterExpiredNodeMetrics &&
p.args.NodeMetricExpirationSeconds != nil && isNodeMetricExpired(nodeMetric, *p.args.NodeMetricExpirationSeconds) {
if p.args.EnableScheduleWhenNodeMetricsExpired != nil && !*p.args.EnableScheduleWhenNodeMetricsExpired {
return framework.NewStatus(framework.Unschedulable, ErrReasonNodeMetricExpired)
}
return nil
}

Expand Down
161 changes: 160 additions & 1 deletion pkg/scheduler/plugins/loadaware/load_aware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestFilterExpiredNodeMetric(t *testing.T) {
},
},
},
wantStatus: nil,
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonNodeMetricExpired),
},
{
name: "filter unhealthy nodeMetric with expired updateTime",
Expand All @@ -202,13 +202,172 @@ func TestFilterExpiredNodeMetric(t *testing.T) {
},
},
},
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonNodeMetricExpired),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var v1beta3args v1beta3.LoadAwareSchedulingArgs
v1beta3.SetDefaults_LoadAwareSchedulingArgs(&v1beta3args)
var loadAwareSchedulingArgs config.LoadAwareSchedulingArgs
err := v1beta3.Convert_v1beta3_LoadAwareSchedulingArgs_To_config_LoadAwareSchedulingArgs(&v1beta3args, &loadAwareSchedulingArgs, nil)
assert.NoError(t, err)

koordClientSet := koordfake.NewSimpleClientset()
koordSharedInformerFactory := koordinatorinformers.NewSharedInformerFactory(koordClientSet, 0)
extenderFactory, _ := frameworkext.NewFrameworkExtenderFactory(
frameworkext.WithKoordinatorClientSet(koordClientSet),
frameworkext.WithKoordinatorSharedInformerFactory(koordSharedInformerFactory),
)
proxyNew := frameworkext.PluginFactoryProxy(extenderFactory, New)

cs := kubefake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(cs, 0)

nodes := []*corev1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: tt.nodeMetric.Name,
},
},
}

snapshot := newTestSharedLister(nil, nodes)
registeredPlugins := []schedulertesting.RegisterPluginFunc{
schedulertesting.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
schedulertesting.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
}
fh, err := schedulertesting.NewFramework(context.TODO(), registeredPlugins, "koord-scheduler",
frameworkruntime.WithClientSet(cs),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithSnapshotSharedLister(snapshot),
)
assert.Nil(t, err)

p, err := proxyNew(&loadAwareSchedulingArgs, fh)
assert.NotNil(t, p)
assert.Nil(t, err)

_, err = koordClientSet.SloV1alpha1().NodeMetrics().Create(context.TODO(), tt.nodeMetric, metav1.CreateOptions{})
assert.NoError(t, err)

koordSharedInformerFactory.Start(context.TODO().Done())
koordSharedInformerFactory.WaitForCacheSync(context.TODO().Done())

cycleState := framework.NewCycleState()

nodeInfo, err := snapshot.Get(tt.nodeMetric.Name)
assert.NoError(t, err)
assert.NotNil(t, nodeInfo)

status := p.(*Plugin).Filter(context.TODO(), cycleState, &corev1.Pod{}, nodeInfo)
assert.True(t, tt.wantStatus.Equal(status), "want status: %s, but got %s", tt.wantStatus.Message(), status.Message())
})
}
}

func TestEnableScheduleWhenNodeMetricsExpired(t *testing.T) {
tests := []struct {
name string
nodeMetric *slov1alpha1.NodeMetric
enableScheduleWhenNodeMetricsExpired *bool
wantStatus *framework.Status
}{
{
name: "filter healthy nodeMetrics",
nodeMetric: &slov1alpha1.NodeMetric{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-1",
},
Spec: slov1alpha1.NodeMetricSpec{
CollectPolicy: &slov1alpha1.NodeMetricCollectPolicy{
ReportIntervalSeconds: pointer.Int64(60),
},
},
Status: slov1alpha1.NodeMetricStatus{
UpdateTime: &metav1.Time{
Time: time.Now(),
},
},
},
wantStatus: nil,
},
{
name: "enable scheduling when nodeMetric with nil updateTime",
nodeMetric: &slov1alpha1.NodeMetric{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-1",
},
Spec: slov1alpha1.NodeMetricSpec{
CollectPolicy: &slov1alpha1.NodeMetricCollectPolicy{
ReportIntervalSeconds: pointer.Int64(60),
},
},
},
enableScheduleWhenNodeMetricsExpired: pointer.Bool(true),
wantStatus: nil,
},
{
name: "enable scheduling when nodeMetric with expired updateTime",
nodeMetric: &slov1alpha1.NodeMetric{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-1",
},
Spec: slov1alpha1.NodeMetricSpec{
CollectPolicy: &slov1alpha1.NodeMetricCollectPolicy{
ReportIntervalSeconds: pointer.Int64(60),
},
},
Status: slov1alpha1.NodeMetricStatus{
UpdateTime: &metav1.Time{
Time: time.Now().Add(-180 * time.Second),
},
},
},
enableScheduleWhenNodeMetricsExpired: pointer.Bool(true),
wantStatus: nil,
},
{
name: "disable scheduling when nodeMetric with nil updateTime",
nodeMetric: &slov1alpha1.NodeMetric{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-1",
},
Spec: slov1alpha1.NodeMetricSpec{
CollectPolicy: &slov1alpha1.NodeMetricCollectPolicy{
ReportIntervalSeconds: pointer.Int64(60),
},
},
},
enableScheduleWhenNodeMetricsExpired: pointer.Bool(false),
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonNodeMetricExpired),
},
{
name: "disable scheduling when nodeMetric with expired updateTime",
nodeMetric: &slov1alpha1.NodeMetric{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-1",
},
Spec: slov1alpha1.NodeMetricSpec{
CollectPolicy: &slov1alpha1.NodeMetricCollectPolicy{
ReportIntervalSeconds: pointer.Int64(60),
},
},
Status: slov1alpha1.NodeMetricStatus{
UpdateTime: &metav1.Time{
Time: time.Now().Add(-180 * time.Second),
},
},
},
enableScheduleWhenNodeMetricsExpired: pointer.Bool(false),
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonNodeMetricExpired),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var v1beta3args v1beta3.LoadAwareSchedulingArgs
v1beta3.SetDefaults_LoadAwareSchedulingArgs(&v1beta3args)
v1beta3args.EnableScheduleWhenNodeMetricsExpired = tt.enableScheduleWhenNodeMetricsExpired
var loadAwareSchedulingArgs config.LoadAwareSchedulingArgs
err := v1beta3.Convert_v1beta3_LoadAwareSchedulingArgs_To_config_LoadAwareSchedulingArgs(&v1beta3args, &loadAwareSchedulingArgs, nil)
assert.NoError(t, err)
Expand Down

0 comments on commit 1bac4d3

Please sign in to comment.