diff --git a/pkg/common/constants.go b/pkg/common/constants.go index 2b882ff7a..7eae1c79a 100644 --- a/pkg/common/constants.go +++ b/pkg/common/constants.go @@ -21,11 +21,12 @@ package common const ( Empty = "" - Wildcard = "*" - Separator = "," - Space = " " - AnonymousUser = "nobody" - AnonymousGroup = "nogroup" - RecoveryQueue = "@recovery@" - RecoveryQueueFull = "root." + RecoveryQueue + Wildcard = "*" + Separator = "," + Space = " " + AnonymousUser = "nobody" + AnonymousGroup = "nogroup" + RecoveryQueue = "@recovery@" + RecoveryQueueFull = "root." + RecoveryQueue + DefaultPlacementQueue = "root.default" ) diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go index bd1b5900c..1b55d19d0 100644 --- a/pkg/scheduler/partition_test.go +++ b/pkg/scheduler/partition_test.go @@ -923,7 +923,7 @@ func TestAddApp(t *testing.T) { } func TestAddAppForced(t *testing.T) { - partition, err := newBasePartition() + partition, err := newBasePartitionNoRootDefault() assert.NilError(t, err, "partition create failed") // add a new app to an invalid queue @@ -1465,9 +1465,9 @@ func TestUpdateQueues(t *testing.T) { } func TestGetApplication(t *testing.T) { - partition, err := newBasePartition() + partition, err := newBasePartitionNoRootDefault() assert.NilError(t, err, "partition create failed") - app := newApplication(appID1, "default", defQueue) + app := newApplication(appID1, "default", "root.custom") err = partition.AddApplication(app) assert.NilError(t, err, "no error expected while adding the application") assert.Equal(t, partition.GetApplication(appID1), app, "partition failed to add app incorrect app returned") @@ -1479,6 +1479,12 @@ func TestGetApplication(t *testing.T) { if partition.GetApplication(appID2) != nil { t.Fatal("partition added app incorrectly should have failed") } + + partition, err = newBasePartition() + assert.NilError(t, err, "partition create failed") + err = partition.AddApplication(app2) + assert.NilError(t, err, "no error expected while adding the application") + assert.Equal(t, partition.GetApplication(appID2), app2, "partition failed to add app incorrect app returned") } func TestGetQueue(t *testing.T) { diff --git a/pkg/scheduler/placement/placement.go b/pkg/scheduler/placement/placement.go index 08aac0c4e..1d16ec70c 100644 --- a/pkg/scheduler/placement/placement.go +++ b/pkg/scheduler/placement/placement.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap" + "github.com/apache/yunikorn-core/pkg/common" "github.com/apache/yunikorn-core/pkg/common/configs" "github.com/apache/yunikorn-core/pkg/locking" "github.com/apache/yunikorn-core/pkg/log" @@ -115,7 +116,9 @@ func (m *AppPlacementManager) PlaceApplication(app *objects.Application) error { var queueName string var aclCheck bool var err error + var remainingRules = len(m.rules) for _, checkRule := range m.rules { + remainingRules-- log.Log(log.Config).Debug("Executing rule for placing application", zap.String("ruleName", checkRule.getName()), zap.String("application", app.ApplicationID)) @@ -127,6 +130,18 @@ func (m *AppPlacementManager) PlaceApplication(app *objects.Application) error { app.SetQueuePath("") return err } + // if no queue found even after the last rule, try to place in the default queue + if remainingRules == 0 && queueName == "" { + log.Log(log.Config).Info("No rule matched, placing application in default queue", + zap.String("application", app.ApplicationID), + zap.String("defaultQueue", common.DefaultPlacementQueue)) + // get the queue object + queue := m.queueFn(common.DefaultPlacementQueue) + if queue != nil { + // default queue exist + queueName = common.DefaultPlacementQueue + } + } // queueName returned make sure ACL allows access and create the queueName if not exist if queueName != "" { // get the queue object diff --git a/pkg/scheduler/placement/placement_test.go b/pkg/scheduler/placement/placement_test.go index 947fb94f3..ed4d0e3b4 100644 --- a/pkg/scheduler/placement/placement_test.go +++ b/pkg/scheduler/placement/placement_test.go @@ -23,9 +23,11 @@ import ( "gotest.tools/v3/assert" + "github.com/apache/yunikorn-core/pkg/common" "github.com/apache/yunikorn-core/pkg/common/configs" "github.com/apache/yunikorn-core/pkg/common/security" "github.com/apache/yunikorn-core/pkg/scheduler/placement/types" + siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common" ) // basic test to check if no rules leave the manager unusable @@ -256,3 +258,269 @@ partitions: t.Errorf("parent queue: app should not have been placed, queue: '%s', error: %v", queueName, err) } } + +func TestForcePlaceApp(t *testing.T) { + const ( + provided = "provided" + providedQ = "root.provided" + defQ = "root.default" + customDefaultQ = "root.custom" + ) + + // Create the structure for the test + // specifically no acl to allow on root + // root.default - undefined + data := ` +partitions: + - name: default + queues: + - name: root + submitacl: "any-user" + queues: + - name: provided + submitacl: "*" + - name: acldeny + submitacl: " " + - name: parent + parent: true + submitacl: "*" +` + err := initQueueStructure([]byte(data)) + assert.NilError(t, err, "setting up the queue config failed") + // update the manager + rules := []configs.PlacementRule{ + {Name: "provided", + Create: false}, + {Name: "tag", + Value: "namespace", + Create: true}, + } + man := NewPlacementManager(rules, queueFunc) + if man == nil { + t.Fatal("placement manager create failed") + } + + tags := make(map[string]string) + user := security.UserGroup{ + User: "any-user", + Groups: []string{}, + } + deny := security.UserGroup{ + User: "deny-user", + Groups: []string{}, + } + var tests = []struct { + name string + queue string + placed string + tags map[string]string + user security.UserGroup + }{ + {"empty", "", "", tags, user}, + {"provided unqualified", provided, providedQ, tags, user}, + {"provided qualified", providedQ, providedQ, tags, user}, + {"provided not exist", "unknown", "", tags, user}, + {"provided parent", "root.parent", "", tags, user}, + {"acl deny", "root.acldeny", "", tags, deny}, + {"create", "unknown", "root.namespace", map[string]string{"namespace": "namespace"}, user}, + {"deny create", "unknown", "", map[string]string{"namespace": "namespace"}, deny}, + {"forced exist", providedQ, providedQ, map[string]string{siCommon.AppTagCreateForce: "true"}, user}, + {"forced and create", "unknown", "root.namespace", map[string]string{siCommon.AppTagCreateForce: "true", "namespace": "namespace"}, user}, + {"forced and deny create", "unknown", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true", "namespace": "namespace"}, deny}, + {"forced parent", "root.parent", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, user}, + {"forced acl deny", "root.acldeny", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, deny}, + {"forced not exist", "unknown", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, user}, + {"forced not exist acl deny", "unknown", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, deny}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + app := newApplication("app1", "default", tt.queue, tt.user, tt.tags, nil, "") + err = man.PlaceApplication(app) + if tt.placed == "" { + assert.ErrorContains(t, err, "rejected", "unexpected error or no error returned") + } else { + assert.NilError(t, err, "unexpected placement failure") + assert.Equal(t, tt.placed, app.GetQueuePath(), "incorrect queue set") + } + }) + } + + // Update Queue structure + // root.default - defined + data = ` +partitions: + - name: default + queues: + - name: root + submitacl: "any-user" + queues: + - name: default + submitacl: "*" + - name: provided + submitacl: "*" + - name: acldeny + submitacl: " " + - name: parent + parent: true + submitacl: "*" +` + err = initQueueStructure([]byte(data)) + assert.NilError(t, err, "setting up the queue config failed") + + tests = []struct { + name string + queue string + placed string + tags map[string]string + user security.UserGroup + }{ + {"empty | defaulQ defined", "", defQ, tags, user}, + {"provided unqualified | defaulQ defined", provided, providedQ, tags, user}, + {"provided qualified | defaulQ defined", defQ, defQ, tags, user}, + {"provided not exist | defaulQ defined", "unknown", defQ, tags, user}, + {"provided parent | defaulQ defined", "root.parent", defQ, tags, user}, + {"acl deny | defaulQ defined", "root.acldeny", defQ, tags, deny}, + {"create | defaulQ defined", "unknown", "root.namespace", map[string]string{"namespace": "namespace"}, user}, + {"deny create | defaulQ defined", "unknown", defQ, map[string]string{"namespace": "namespace"}, deny}, + {"forced exist | defaulQ defined", defQ, defQ, map[string]string{siCommon.AppTagCreateForce: "true"}, user}, + {"forced and create | defaulQ defined", "unknown", "root.namespace", map[string]string{siCommon.AppTagCreateForce: "true", "namespace": "namespace"}, user}, + {"forced and deny create | defaulQ defined", "unknown", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true", "namespace": "namespace"}, deny}, + {"forced parent | defaulQ defined", "root.parent", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, user}, + {"forced acl deny | defaulQ defined", "root.acldeny", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, deny}, + {"forced not exist | defaulQ defined", "unknown", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, user}, + {"forced not exist acl deny | defaulQ defined", "unknown", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, deny}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + app := newApplication("app1", "default", tt.queue, tt.user, tt.tags, nil, "") + err = man.PlaceApplication(app) + if tt.placed == "" { + assert.ErrorContains(t, err, "rejected", "unexpected error or no error returned") + } else { + assert.NilError(t, err, "unexpected placement failure") + assert.Equal(t, tt.placed, app.GetQueuePath(), "incorrect queue set") + } + }) + } + + // initialize queues with no root.default + // Add fixed placement rule to define custom default queue + data = ` +partitions: + - name: default + queues: + - name: root + submitacl: "any-user" + queues: + - name: custom + submitacl: "*" + - name: provided + submitacl: "*" + - name: acldeny + submitacl: " " + - name: parent + parent: true + submitacl: "*" +` + err = initQueueStructure([]byte(data)) + assert.NilError(t, err, "setting up the queue config failed") + + // update the manager + rules = []configs.PlacementRule{ + {Name: "provided", + Create: false}, + {Name: "tag", + Value: "namespace", + Create: true}, + {Name: "fixed", + Value: "root.custom", + Create: true}, + } + man1 := NewPlacementManager(rules, queueFunc) + if man1 == nil { + t.Fatal("placement manager create failed") + } + + tests = []struct { + name string + queue string + placed string + tags map[string]string + user security.UserGroup + }{ + {"empty | custom defaulQ", "", customDefaultQ, tags, user}, + {"provided unqualified | custom defaulQ", provided, providedQ, tags, user}, + {"provided qualified | custom defaulQ", providedQ, providedQ, tags, user}, + {"provided not exist | custom defaulQ", "unknown", customDefaultQ, tags, user}, + {"provided parent | custom defaulQ", "root.parent", customDefaultQ, tags, user}, + {"acl deny | custom defaulQ", "root.acldeny", customDefaultQ, tags, deny}, + {"create | custom defaulQ", "unknown", "root.namespace", map[string]string{"namespace": "namespace"}, user}, + {"deny create | custom defaulQ", "unknown", customDefaultQ, map[string]string{"namespace": "namespace"}, deny}, + {"forced exist | custom defaulQ", providedQ, providedQ, map[string]string{siCommon.AppTagCreateForce: "true"}, user}, + {"forced and create | custom defaulQ", "unknown", "root.namespace", map[string]string{siCommon.AppTagCreateForce: "true", "namespace": "namespace"}, user}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + app := newApplication("app1", "default", tt.queue, tt.user, tt.tags, nil, "") + err = man1.PlaceApplication(app) + if tt.placed == "" { + assert.ErrorContains(t, err, "rejected", "unexpected error or no error returned") + } else { + assert.NilError(t, err, "unexpected placement failure") + assert.Equal(t, tt.placed, app.GetQueuePath(), "incorrect queue set") + } + }) + } +} + +func TestManagerPlaceApp_Error(t *testing.T) { + // Create the structure for the test + data := ` +partitions: + - name: default + queues: + - name: root + queues: + - name: testparent + submitacl: "*" + queues: + - name: testchild + - name: fixed + submitacl: "other-user " + parent: true +` + err := initQueueStructure([]byte(data)) + assert.NilError(t, err, "setting up the queue config failed") + // basic info without rules, manager should init + man := NewPlacementManager(nil, queueFunc) + if man == nil { + t.Fatal("placement manager create failed") + } + rules := []configs.PlacementRule{ + { + Name: "user", + Create: false, + Parent: &configs.PlacementRule{ + Name: "user", + Create: false, + Parent: &configs.PlacementRule{ + Name: "fixed", + Value: "testparent", + }, + }, + }, + } + user := security.UserGroup{ + User: "testchild", + Groups: []string{}, + } + tags := make(map[string]string) + err = man.UpdateRules(rules) + assert.NilError(t, err, "failed to update existing manager") + app := newApplication("app1", "default", "", user, tags, nil, "") + err = man.PlaceApplication(app) + queueName := app.GetQueuePath() + if err == nil || queueName != "" { + t.Errorf("failed placed app, queue: '%s', error: %v", queueName, err) + } +} diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go index e7f153599..04421ac21 100644 --- a/pkg/scheduler/utilities_test.go +++ b/pkg/scheduler/utilities_test.go @@ -55,6 +55,63 @@ const ( maxapplications = "maxapplications" ) +func newBasePartitionNoRootDefault() (*PartitionContext, error) { + conf := configs.PartitionConfig{ + Name: "test", + Queues: []configs.QueueConfig{ + { + Name: "root", + Parent: true, + SubmitACL: "*", + Queues: []configs.QueueConfig{ + { + Name: "custom", + Parent: false, + Queues: nil, + Limits: []configs.Limit{ + { + Limit: "custom queue limit", + Users: []string{ + "testuser", + }, + Groups: []string{ + "testgroup", + }, + MaxResources: map[string]string{ + "memory": "5", + "vcores": "5", + }, + MaxApplications: 8, + }, + }, + }, + }, + Limits: []configs.Limit{ + { + Limit: "root queue limit", + Users: []string{ + "testuser", + }, + Groups: []string{ + "testgroup", + }, + MaxResources: map[string]string{ + "memory": "10", + "vcores": "10", + }, + MaxApplications: 10, + }, + }, + }, + }, + PlacementRules: nil, + Limits: nil, + NodeSortPolicy: configs.NodeSortingPolicy{}, + } + + return newPartitionContext(conf, rmID, nil) +} + func newBasePartition() (*PartitionContext, error) { conf := configs.PartitionConfig{ Name: "test",