From 8ab3e65dc20125c4f457860ef3d2ac7d78191243 Mon Sep 17 00:00:00 2001 From: haorenhui Date: Fri, 6 Dec 2024 11:28:26 +0800 Subject: [PATCH 1/4] [YUNIKORN-2991] The queue in Draining state does not accept new applications --- pkg/scheduler/partition.go | 4 ++++ pkg/scheduler/partition_test.go | 15 +++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go index 7db7e4fe8..c89bca39a 100644 --- a/pkg/scheduler/partition.go +++ b/pkg/scheduler/partition.go @@ -342,6 +342,10 @@ func (pc *PartitionContext) AddApplication(app *objects.Application) error { return fmt.Errorf("failed to find queue %s for application %s", queueName, appID) } + if queue.IsDraining() { + return fmt.Errorf("queue %s is draining, cannot add application %s", queueName, appID) + } + guaranteedRes := app.GetGuaranteedResource() maxRes := app.GetMaxResource() maxApps := app.GetMaxApps() diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go index 75e529eb6..65ed538ab 100644 --- a/pkg/scheduler/partition_test.go +++ b/pkg/scheduler/partition_test.go @@ -906,6 +906,21 @@ func TestAddApp(t *testing.T) { scheduleApplicationsNew, err = metrics.GetSchedulerMetrics().GetTotalApplicationsNew() assert.NilError(t, err, "get scheduler metrics failed") assert.Equal(t, scheduleApplicationsNew, 1) + + // mark queue stopped, no new application can be added + err = partition.handlePartitionEvent(objects.Start) + assert.NilError(t, err, "partition state change failed unexpectedly") + partition.GetQueue(defQueue).MarkQueueForRemoval() + err = partition.AddApplication(app) + if err == nil || partition.getApplication(appID3) != nil { + t.Errorf("add application on draining queue should have failed but did not") + } + queueApplicationsNew, err = metrics.GetQueueMetrics(defQueue).GetQueueApplicationsNew() + assert.NilError(t, err, "get queue metrics failed") + assert.Equal(t, queueApplicationsNew, 1) + scheduleApplicationsNew, err = metrics.GetSchedulerMetrics().GetTotalApplicationsNew() + assert.NilError(t, err, "get scheduler metrics failed") + assert.Equal(t, scheduleApplicationsNew, 1) } func TestAddAppForced(t *testing.T) { From 36ff74495d084278510537c0a31bcbb7b73fd3f9 Mon Sep 17 00:00:00 2001 From: haorenhui Date: Wed, 25 Dec 2024 17:33:03 +0800 Subject: [PATCH 2/4] Revert "[YUNIKORN-2991] The queue in Draining state does not accept new applications" This reverts commit 8ab3e65dc20125c4f457860ef3d2ac7d78191243. --- pkg/scheduler/partition.go | 4 ---- pkg/scheduler/partition_test.go | 15 --------------- 2 files changed, 19 deletions(-) diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go index c89bca39a..7db7e4fe8 100644 --- a/pkg/scheduler/partition.go +++ b/pkg/scheduler/partition.go @@ -342,10 +342,6 @@ func (pc *PartitionContext) AddApplication(app *objects.Application) error { return fmt.Errorf("failed to find queue %s for application %s", queueName, appID) } - if queue.IsDraining() { - return fmt.Errorf("queue %s is draining, cannot add application %s", queueName, appID) - } - guaranteedRes := app.GetGuaranteedResource() maxRes := app.GetMaxResource() maxApps := app.GetMaxApps() diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go index 65ed538ab..75e529eb6 100644 --- a/pkg/scheduler/partition_test.go +++ b/pkg/scheduler/partition_test.go @@ -906,21 +906,6 @@ func TestAddApp(t *testing.T) { scheduleApplicationsNew, err = metrics.GetSchedulerMetrics().GetTotalApplicationsNew() assert.NilError(t, err, "get scheduler metrics failed") assert.Equal(t, scheduleApplicationsNew, 1) - - // mark queue stopped, no new application can be added - err = partition.handlePartitionEvent(objects.Start) - assert.NilError(t, err, "partition state change failed unexpectedly") - partition.GetQueue(defQueue).MarkQueueForRemoval() - err = partition.AddApplication(app) - if err == nil || partition.getApplication(appID3) != nil { - t.Errorf("add application on draining queue should have failed but did not") - } - queueApplicationsNew, err = metrics.GetQueueMetrics(defQueue).GetQueueApplicationsNew() - assert.NilError(t, err, "get queue metrics failed") - assert.Equal(t, queueApplicationsNew, 1) - scheduleApplicationsNew, err = metrics.GetSchedulerMetrics().GetTotalApplicationsNew() - assert.NilError(t, err, "get scheduler metrics failed") - assert.Equal(t, scheduleApplicationsNew, 1) } func TestAddAppForced(t *testing.T) { From f3d2d180caf7ecdee1acbbc062bb7a26e56e21db Mon Sep 17 00:00:00 2001 From: haorenhui Date: Wed, 25 Dec 2024 17:33:53 +0800 Subject: [PATCH 3/4] [YUNIKORN-2991] The queue in Draining state should not accept new applications --- pkg/scheduler/placement/placement.go | 10 ++++++++++ pkg/scheduler/placement/placement_test.go | 9 +++++++++ 2 files changed, 19 insertions(+) diff --git a/pkg/scheduler/placement/placement.go b/pkg/scheduler/placement/placement.go index 10e3e663a..93bec9584 100644 --- a/pkg/scheduler/placement/placement.go +++ b/pkg/scheduler/placement/placement.go @@ -183,6 +183,16 @@ func (m *AppPlacementManager) PlaceApplication(app *objects.Application) error { queueName = "" continue } + // Check if the queue in Draining state, and if so, proceed to the next rule + if queue.IsDraining() { + log.Log(log.SchedApplication).Debug("Cannot Placing application in draining queue", + zap.String("queueName", queueName), + zap.String("ruleName", checkRule.getName()), + zap.String("application", app.ApplicationID)) + // reset the queue name for the last rule in the chain + queueName = "" + continue + } } // we have a queue that allows submitting and can be created: app placed log.Log(log.SchedApplication).Info("Rule result for placing application", diff --git a/pkg/scheduler/placement/placement_test.go b/pkg/scheduler/placement/placement_test.go index 9ea73a789..811aaec72 100644 --- a/pkg/scheduler/placement/placement_test.go +++ b/pkg/scheduler/placement/placement_test.go @@ -323,6 +323,15 @@ partitions: if err == nil || queueName != "" { t.Errorf("parent queue: app should not have been placed, queue: '%s', error: %v", queueName, err) } + + // provided rule (2nd): submit to draining queue + app = newApplication("app1", "default", "root.testparent.testchild", user, nil, nil, "") + man.queueFn("root.testparent.testchild").MarkQueueForRemoval() + err = man.PlaceApplication(app) + queueName = app.GetQueuePath() + if err == nil || queueName != "" { + t.Errorf("Cannot Placing application in draining queue: '%s', error: %v", queueName, err) + } } //nolint:funlen From 0b42475bb5211803f43fdfbc566c237d317e17c8 Mon Sep 17 00:00:00 2001 From: haorenhui Date: Tue, 31 Dec 2024 17:02:49 +0800 Subject: [PATCH 4/4] [YUNIKORN-2991] update text --- pkg/scheduler/placement/placement.go | 2 +- pkg/scheduler/placement/placement_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/scheduler/placement/placement.go b/pkg/scheduler/placement/placement.go index 93bec9584..7cc8fed43 100644 --- a/pkg/scheduler/placement/placement.go +++ b/pkg/scheduler/placement/placement.go @@ -185,7 +185,7 @@ func (m *AppPlacementManager) PlaceApplication(app *objects.Application) error { } // Check if the queue in Draining state, and if so, proceed to the next rule if queue.IsDraining() { - log.Log(log.SchedApplication).Debug("Cannot Placing application in draining queue", + log.Log(log.SchedApplication).Debug("Cannot place application in draining queue", zap.String("queueName", queueName), zap.String("ruleName", checkRule.getName()), zap.String("application", app.ApplicationID)) diff --git a/pkg/scheduler/placement/placement_test.go b/pkg/scheduler/placement/placement_test.go index 811aaec72..f0bef0040 100644 --- a/pkg/scheduler/placement/placement_test.go +++ b/pkg/scheduler/placement/placement_test.go @@ -330,7 +330,7 @@ partitions: err = man.PlaceApplication(app) queueName = app.GetQueuePath() if err == nil || queueName != "" { - t.Errorf("Cannot Placing application in draining queue: '%s', error: %v", queueName, err) + t.Errorf("draining queue: app should not have been placed, queue: '%s', error: %v", queueName, err) } }