Skip to content

Commit

Permalink
[YUNIKORN-2703] Core: Fallback to default queue if no placement rules…
Browse files Browse the repository at this point in the history
… match
  • Loading branch information
craigcondit committed Jul 11, 2024
1 parent 4834b19 commit 5454d7d
Show file tree
Hide file tree
Showing 5 changed files with 357 additions and 10 deletions.
15 changes: 8 additions & 7 deletions pkg/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ package common
const (
Empty = ""

Wildcard = "*"
Separator = ","
Space = " "
AnonymousUser = "nobody"
AnonymousGroup = "nogroup"
RecoveryQueue = "@recovery@"
RecoveryQueueFull = "root." + RecoveryQueue
Wildcard = "*"
Separator = ","
Space = " "
AnonymousUser = "nobody"
AnonymousGroup = "nogroup"
RecoveryQueue = "@recovery@"
RecoveryQueueFull = "root." + RecoveryQueue
DefaultPlacementQueue = "root.default"
)
12 changes: 9 additions & 3 deletions pkg/scheduler/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,7 @@ func TestAddApp(t *testing.T) {
}

func TestAddAppForced(t *testing.T) {
partition, err := newBasePartition()
partition, err := newBasePartitionNoRootDefault()
assert.NilError(t, err, "partition create failed")

// add a new app to an invalid queue
Expand Down Expand Up @@ -1465,9 +1465,9 @@ func TestUpdateQueues(t *testing.T) {
}

func TestGetApplication(t *testing.T) {
partition, err := newBasePartition()
partition, err := newBasePartitionNoRootDefault()
assert.NilError(t, err, "partition create failed")
app := newApplication(appID1, "default", defQueue)
app := newApplication(appID1, "default", "root.custom")
err = partition.AddApplication(app)
assert.NilError(t, err, "no error expected while adding the application")
assert.Equal(t, partition.GetApplication(appID1), app, "partition failed to add app incorrect app returned")
Expand All @@ -1479,6 +1479,12 @@ func TestGetApplication(t *testing.T) {
if partition.GetApplication(appID2) != nil {
t.Fatal("partition added app incorrectly should have failed")
}

partition, err = newBasePartition()
assert.NilError(t, err, "partition create failed")
err = partition.AddApplication(app2)
assert.NilError(t, err, "no error expected while adding the application")
assert.Equal(t, partition.GetApplication(appID2), app2, "partition failed to add app incorrect app returned")
}

func TestGetQueue(t *testing.T) {
Expand Down
15 changes: 15 additions & 0 deletions pkg/scheduler/placement/placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"go.uber.org/zap"

"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
Expand Down Expand Up @@ -115,7 +116,9 @@ func (m *AppPlacementManager) PlaceApplication(app *objects.Application) error {
var queueName string
var aclCheck bool
var err error
var remainingRules = len(m.rules)
for _, checkRule := range m.rules {
remainingRules--
log.Log(log.Config).Debug("Executing rule for placing application",
zap.String("ruleName", checkRule.getName()),
zap.String("application", app.ApplicationID))
Expand All @@ -127,6 +130,18 @@ func (m *AppPlacementManager) PlaceApplication(app *objects.Application) error {
app.SetQueuePath("")
return err
}
// if no queue found even after the last rule, try to place in the default queue
if remainingRules == 0 && queueName == "" {
log.Log(log.Config).Info("No rule matched, placing application in default queue",
zap.String("application", app.ApplicationID),
zap.String("defaultQueue", common.DefaultPlacementQueue))
// get the queue object
queue := m.queueFn(common.DefaultPlacementQueue)
if queue != nil {
// default queue exist
queueName = common.DefaultPlacementQueue
}
}
// queueName returned make sure ACL allows access and create the queueName if not exist
if queueName != "" {
// get the queue object
Expand Down
268 changes: 268 additions & 0 deletions pkg/scheduler/placement/placement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (

"gotest.tools/v3/assert"

"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/security"
"github.com/apache/yunikorn-core/pkg/scheduler/placement/types"
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
)

// basic test to check if no rules leave the manager unusable
Expand Down Expand Up @@ -256,3 +258,269 @@ partitions:
t.Errorf("parent queue: app should not have been placed, queue: '%s', error: %v", queueName, err)
}
}

func TestForcePlaceApp(t *testing.T) {
const (
provided = "provided"
providedQ = "root.provided"
defQ = "root.default"
customDefaultQ = "root.custom"
)

// Create the structure for the test
// specifically no acl to allow on root
// root.default - undefined
data := `
partitions:
- name: default
queues:
- name: root
submitacl: "any-user"
queues:
- name: provided
submitacl: "*"
- name: acldeny
submitacl: " "
- name: parent
parent: true
submitacl: "*"
`
err := initQueueStructure([]byte(data))
assert.NilError(t, err, "setting up the queue config failed")
// update the manager
rules := []configs.PlacementRule{
{Name: "provided",
Create: false},
{Name: "tag",
Value: "namespace",
Create: true},
}
man := NewPlacementManager(rules, queueFunc)
if man == nil {
t.Fatal("placement manager create failed")
}

tags := make(map[string]string)
user := security.UserGroup{
User: "any-user",
Groups: []string{},
}
deny := security.UserGroup{
User: "deny-user",
Groups: []string{},
}
var tests = []struct {
name string
queue string
placed string
tags map[string]string
user security.UserGroup
}{
{"empty", "", "", tags, user},
{"provided unqualified", provided, providedQ, tags, user},
{"provided qualified", providedQ, providedQ, tags, user},
{"provided not exist", "unknown", "", tags, user},
{"provided parent", "root.parent", "", tags, user},
{"acl deny", "root.acldeny", "", tags, deny},
{"create", "unknown", "root.namespace", map[string]string{"namespace": "namespace"}, user},
{"deny create", "unknown", "", map[string]string{"namespace": "namespace"}, deny},
{"forced exist", providedQ, providedQ, map[string]string{siCommon.AppTagCreateForce: "true"}, user},
{"forced and create", "unknown", "root.namespace", map[string]string{siCommon.AppTagCreateForce: "true", "namespace": "namespace"}, user},
{"forced and deny create", "unknown", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true", "namespace": "namespace"}, deny},
{"forced parent", "root.parent", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, user},
{"forced acl deny", "root.acldeny", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, deny},
{"forced not exist", "unknown", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, user},
{"forced not exist acl deny", "unknown", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, deny},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
app := newApplication("app1", "default", tt.queue, tt.user, tt.tags, nil, "")
err = man.PlaceApplication(app)
if tt.placed == "" {
assert.ErrorContains(t, err, "rejected", "unexpected error or no error returned")
} else {
assert.NilError(t, err, "unexpected placement failure")
assert.Equal(t, tt.placed, app.GetQueuePath(), "incorrect queue set")
}
})
}

// Update Queue structure
// root.default - defined
data = `
partitions:
- name: default
queues:
- name: root
submitacl: "any-user"
queues:
- name: default
submitacl: "*"
- name: provided
submitacl: "*"
- name: acldeny
submitacl: " "
- name: parent
parent: true
submitacl: "*"
`
err = initQueueStructure([]byte(data))
assert.NilError(t, err, "setting up the queue config failed")

tests = []struct {
name string
queue string
placed string
tags map[string]string
user security.UserGroup
}{
{"empty | defaulQ defined", "", defQ, tags, user},
{"provided unqualified | defaulQ defined", provided, providedQ, tags, user},
{"provided qualified | defaulQ defined", defQ, defQ, tags, user},
{"provided not exist | defaulQ defined", "unknown", defQ, tags, user},
{"provided parent | defaulQ defined", "root.parent", defQ, tags, user},
{"acl deny | defaulQ defined", "root.acldeny", defQ, tags, deny},
{"create | defaulQ defined", "unknown", "root.namespace", map[string]string{"namespace": "namespace"}, user},
{"deny create | defaulQ defined", "unknown", defQ, map[string]string{"namespace": "namespace"}, deny},
{"forced exist | defaulQ defined", defQ, defQ, map[string]string{siCommon.AppTagCreateForce: "true"}, user},
{"forced and create | defaulQ defined", "unknown", "root.namespace", map[string]string{siCommon.AppTagCreateForce: "true", "namespace": "namespace"}, user},
{"forced and deny create | defaulQ defined", "unknown", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true", "namespace": "namespace"}, deny},
{"forced parent | defaulQ defined", "root.parent", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, user},
{"forced acl deny | defaulQ defined", "root.acldeny", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, deny},
{"forced not exist | defaulQ defined", "unknown", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, user},
{"forced not exist acl deny | defaulQ defined", "unknown", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, deny},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
app := newApplication("app1", "default", tt.queue, tt.user, tt.tags, nil, "")
err = man.PlaceApplication(app)
if tt.placed == "" {
assert.ErrorContains(t, err, "rejected", "unexpected error or no error returned")
} else {
assert.NilError(t, err, "unexpected placement failure")
assert.Equal(t, tt.placed, app.GetQueuePath(), "incorrect queue set")
}
})
}

// initialize queues with no root.default
// Add fixed placement rule to define custom default queue
data = `
partitions:
- name: default
queues:
- name: root
submitacl: "any-user"
queues:
- name: custom
submitacl: "*"
- name: provided
submitacl: "*"
- name: acldeny
submitacl: " "
- name: parent
parent: true
submitacl: "*"
`
err = initQueueStructure([]byte(data))
assert.NilError(t, err, "setting up the queue config failed")

// update the manager
rules = []configs.PlacementRule{
{Name: "provided",
Create: false},
{Name: "tag",
Value: "namespace",
Create: true},
{Name: "fixed",
Value: "root.custom",
Create: true},
}
man1 := NewPlacementManager(rules, queueFunc)
if man1 == nil {
t.Fatal("placement manager create failed")
}

tests = []struct {
name string
queue string
placed string
tags map[string]string
user security.UserGroup
}{
{"empty | custom defaulQ", "", customDefaultQ, tags, user},
{"provided unqualified | custom defaulQ", provided, providedQ, tags, user},
{"provided qualified | custom defaulQ", providedQ, providedQ, tags, user},
{"provided not exist | custom defaulQ", "unknown", customDefaultQ, tags, user},
{"provided parent | custom defaulQ", "root.parent", customDefaultQ, tags, user},
{"acl deny | custom defaulQ", "root.acldeny", customDefaultQ, tags, deny},
{"create | custom defaulQ", "unknown", "root.namespace", map[string]string{"namespace": "namespace"}, user},
{"deny create | custom defaulQ", "unknown", customDefaultQ, map[string]string{"namespace": "namespace"}, deny},
{"forced exist | custom defaulQ", providedQ, providedQ, map[string]string{siCommon.AppTagCreateForce: "true"}, user},
{"forced and create | custom defaulQ", "unknown", "root.namespace", map[string]string{siCommon.AppTagCreateForce: "true", "namespace": "namespace"}, user},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
app := newApplication("app1", "default", tt.queue, tt.user, tt.tags, nil, "")
err = man1.PlaceApplication(app)
if tt.placed == "" {
assert.ErrorContains(t, err, "rejected", "unexpected error or no error returned")
} else {
assert.NilError(t, err, "unexpected placement failure")
assert.Equal(t, tt.placed, app.GetQueuePath(), "incorrect queue set")
}
})
}
}

func TestManagerPlaceApp_Error(t *testing.T) {
// Create the structure for the test
data := `
partitions:
- name: default
queues:
- name: root
queues:
- name: testparent
submitacl: "*"
queues:
- name: testchild
- name: fixed
submitacl: "other-user "
parent: true
`
err := initQueueStructure([]byte(data))
assert.NilError(t, err, "setting up the queue config failed")
// basic info without rules, manager should init
man := NewPlacementManager(nil, queueFunc)
if man == nil {
t.Fatal("placement manager create failed")
}
rules := []configs.PlacementRule{
{
Name: "user",
Create: false,
Parent: &configs.PlacementRule{
Name: "user",
Create: false,
Parent: &configs.PlacementRule{
Name: "fixed",
Value: "testparent",
},
},
},
}
user := security.UserGroup{
User: "testchild",
Groups: []string{},
}
tags := make(map[string]string)
err = man.UpdateRules(rules)
assert.NilError(t, err, "failed to update existing manager")
app := newApplication("app1", "default", "", user, tags, nil, "")
err = man.PlaceApplication(app)
queueName := app.GetQueuePath()
if err == nil || queueName != "" {
t.Errorf("failed placed app, queue: '%s', error: %v", queueName, err)
}
}
Loading

0 comments on commit 5454d7d

Please sign in to comment.