Skip to content

Commit

Permalink
[YUNIKORN-2924] [core] Remove occupiedResource handling logic
Browse files Browse the repository at this point in the history
  • Loading branch information
pbacsko committed Oct 16, 2024
1 parent 44705ae commit 2b99742
Show file tree
Hide file tree
Showing 10 changed files with 20 additions and 142 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ module github.com/apache/yunikorn-core
go 1.21

require (
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0
github.com/apache/yunikorn-scheduler-interface v0.0.0-20241016105739-f0e241aa0146
github.com/google/btree v1.1.2
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0 h1:/9j0YXuifvoOl4YVEbO0r+DPkkYLzaQ+/ac+xCc7SY8=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0/go.mod h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20241016105739-f0e241aa0146 h1:CZ4U7y19YSxNJVBNox3DahhuoxDL++naBl/kj+kqVFc=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20241016105739-f0e241aa0146/go.mod h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
Expand Down
3 changes: 0 additions & 3 deletions pkg/scheduler/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,9 +661,6 @@ func (cc *ClusterContext) updateNode(nodeInfo *si.NodeInfo) {
if sr := nodeInfo.SchedulableResource; sr != nil {
partition.updatePartitionResource(node.SetCapacity(resources.NewResourceFromProto(sr)))
}
if or := nodeInfo.OccupiedResource; or != nil {
node.SetOccupiedResource(resources.NewResourceFromProto(or))
}
case si.NodeInfo_DRAIN_NODE:
if node.IsSchedulable() {
// set the state to not schedulable
Expand Down
7 changes: 0 additions & 7 deletions pkg/scheduler/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ func TestContext_UpdateNode(t *testing.T) {
SchedulableResource: &si.Resource{Resources: map[string]*si.Quantity{"first": {Value: 10}}},
}
full := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
half := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})
partition := context.GetPartition(pName)
if partition == nil {
t.Fatalf("partition should have been found")
Expand All @@ -121,15 +120,9 @@ func TestContext_UpdateNode(t *testing.T) {
assert.Assert(t, resources.Equals(full, partition.GetTotalPartitionResource()), "partition resource should be updated")
// try to update: fail due to unknown action
n.SchedulableResource = &si.Resource{Resources: map[string]*si.Quantity{"first": {Value: 5}}}
n.OccupiedResource = &si.Resource{Resources: map[string]*si.Quantity{"first": {Value: 5}}}
context.updateNode(n)
node := partition.GetNode("test-1")
assert.Assert(t, resources.Equals(full, node.GetAvailableResource()), "node available resource should not be updated")
n.Action = si.NodeInfo_UPDATE
context.updateNode(n)
assert.Assert(t, resources.Equals(half, partition.GetTotalPartitionResource()), "partition resource should be updated")
assert.Assert(t, resources.IsZero(node.GetAvailableResource()), "node available should have been updated to zero")
assert.Assert(t, resources.Equals(half, node.GetOccupiedResource()), "node occupied should have been updated")

// other actions
n = &si.NodeInfo{
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/objects/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func NewNode(proto *si.NodeInfo) *Node {
reservations: make(map[string]*reservation),
totalResource: resources.NewResourceFromProto(proto.SchedulableResource),
allocatedResource: resources.NewResource(),
occupiedResource: resources.NewResourceFromProto(proto.OccupiedResource),
occupiedResource: resources.NewResource(),
allocations: make(map[string]*Allocation),
schedulable: true,
listeners: make([]NodeListener, 0),
Expand Down
25 changes: 9 additions & 16 deletions pkg/scheduler/objects/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ func TestNewNode(t *testing.T) {
if node != nil {
t.Error("node not returned correctly: node is nul or incorrect name")
}
proto := newProto(testNode, nil, nil, nil)
proto := newProto(testNode, nil, nil)
node = NewNode(proto)
if node == nil || node.NodeID != testNode {
t.Error("node not returned correctly: node is nul or incorrect name")
}

totalRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 100, "second": 100})
proto = newProto(testNode, totalRes, nil, map[string]string{})
proto = newProto(testNode, totalRes, map[string]string{})
node = NewNode(proto)
if node == nil || node.NodeID != testNode {
t.Fatal("node not returned correctly: node is nul or incorrect name")
Expand All @@ -74,25 +74,20 @@ func TestNewNode(t *testing.T) {
assert.Equal(t, "rack1", node.Rackname)
assert.Equal(t, "partition1", node.Partition)

// test capacity/available/occupied resources
// test capacity/available resources
totalResources := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 100, "second": 100})
occupiedResources := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 30, "second": 20})
availableResources := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 70, "second": 80})
proto = newProto(testNode, totalResources, occupiedResources, map[string]string{})
proto = newProto(testNode, totalResources, map[string]string{})
node = NewNode(proto)
assert.Equal(t, node.NodeID, testNode, "node not returned correctly: node is nul or incorrect name")
if !resources.Equals(node.GetCapacity(), totalResources) {
t.Errorf("node total resources not set correctly: %v expected got %v",
totalResources, node.GetCapacity())
}
if !resources.Equals(node.GetAvailableResource(), availableResources) {
if !resources.Equals(node.GetAvailableResource(), totalResources) {
t.Errorf("node available resources not set correctly: %v expected got %v",
availableResources, node.GetAvailableResource())
}
if !resources.Equals(node.GetOccupiedResource(), occupiedResources) {
t.Errorf("node occupied resources not set correctly: %v expected got %v",
occupiedResources, node.GetOccupiedResource())
}
}

func TestCheckConditions(t *testing.T) {
Expand Down Expand Up @@ -328,7 +323,7 @@ func TestAttributes(t *testing.T) {
testname := fmt.Sprintf("Attributes in the node %d", index)
t.Run(testname, func(t *testing.T) {
nodename := fmt.Sprintf("%s-%d", testNode, index)
node := NewNode(newProto(nodename, nil, nil, tt.inputs))
node := NewNode(newProto(nodename, nil, tt.inputs))
if node == nil || node.NodeID != nodename {
t.Error("node not returned correctly: node is nul or incorrect name")
}
Expand Down Expand Up @@ -363,7 +358,7 @@ func TestAttributes(t *testing.T) {
}

func TestGetInstanceType(t *testing.T) {
proto := newProto(testNode, nil, nil, map[string]string{
proto := newProto(testNode, nil, map[string]string{
common.NodePartition: "partition1",
"label1": "key1",
"label2": "key2",
Expand Down Expand Up @@ -789,8 +784,7 @@ func TestAddRemoveListener(t *testing.T) {
func TestNodeEvents(t *testing.T) {
mockEvents := evtMock.NewEventSystem()
total := resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 100, "memory": 100})
occupied := resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 10, "memory": 10})
proto := newProto(testNode, total, occupied, map[string]string{
proto := newProto(testNode, total, map[string]string{
"ready": "true",
})
node := NewNode(proto)
Expand Down Expand Up @@ -905,8 +899,7 @@ func TestPreconditions(t *testing.T) {

plugins.RegisterSchedulerPlugin(mock.NewPredicatePlugin(true, map[string]int{}))
total := resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 100, "memory": 100})
occupied := resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 10, "memory": 10})
proto := newProto(testNode, total, occupied, map[string]string{
proto := newProto(testNode, total, map[string]string{
"ready": "true",
})
res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
Expand Down
10 changes: 5 additions & 5 deletions pkg/scheduler/objects/nodesorting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,13 @@ func TestSortPolicyWeighting(t *testing.T) {
nc.SetNodeSortingPolicy(fair)
totalRes := resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 2000, "memory": 16000})

proto1 := newProto("test1", totalRes, nil, map[string]string{})
proto1 := newProto("test1", totalRes, map[string]string{})
node1 := NewNode(proto1)
if err := nc.AddNode(node1); err != nil {
t.Fatal("Failed to add node1")
}

proto2 := newProto("test2", totalRes, nil, map[string]string{})
proto2 := newProto("test2", totalRes, map[string]string{})
node2 := NewNode(proto2)
if err := nc.AddNode(node2); err != nil {
t.Fatal("Failed to add node2")
Expand Down Expand Up @@ -232,13 +232,13 @@ func TestSortPolicy(t *testing.T) {
nc.SetNodeSortingPolicy(bp)
totalRes := resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 2000, "memory": 4000})

proto1 := newProto("test1", totalRes, nil, map[string]string{})
proto1 := newProto("test1", totalRes, map[string]string{})
node1 := NewNode(proto1)
if err := nc.AddNode(node1); err != nil {
t.Fatal("Failed to add node1")
}

proto2 := newProto("test2", totalRes, nil, map[string]string{})
proto2 := newProto("test2", totalRes, map[string]string{})
node2 := NewNode(proto2)
if err := nc.AddNode(node2); err != nil {
t.Fatal("Failed to add node2")
Expand Down Expand Up @@ -313,7 +313,7 @@ func TestAbsResourceUsage(t *testing.T) {
nc.SetNodeSortingPolicy(fair)
totalRes := resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 0, "memory": 16000})

proto1 := newProto("test1", totalRes, nil, map[string]string{})
proto1 := newProto("test1", totalRes, map[string]string{})
node1 := NewNode(proto1)
if err := nc.AddNode(node1); err != nil {
t.Fatal("Failed to add node1")
Expand Down
11 changes: 1 addition & 10 deletions pkg/scheduler/objects/utilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func newNodeInternal(nodeID string, total, occupied *resources.Resource) *Node {
return sn
}

func newProto(nodeID string, totalResource, occupiedResource *resources.Resource, attributes map[string]string) *si.NodeInfo {
func newProto(nodeID string, totalResource *resources.Resource, attributes map[string]string) *si.NodeInfo {
proto := si.NodeInfo{
NodeID: nodeID,
Attributes: attributes,
Expand All @@ -204,15 +204,6 @@ func newProto(nodeID string, totalResource, occupiedResource *resources.Resource
}
}

if occupiedResource != nil {
proto.OccupiedResource = &si.Resource{
Resources: map[string]*si.Quantity{},
}
for name, value := range occupiedResource.Resources {
quantity := si.Quantity{Value: int64(value)}
proto.OccupiedResource.Resources[name] = &quantity
}
}
return &proto
}

Expand Down
91 changes: 0 additions & 91 deletions pkg/scheduler/tests/operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,97 +564,6 @@ partitions:
}
}

func TestUpdateNodeOccupiedResources(t *testing.T) {
// Register RM
configData := `
partitions:
-
name: default
queues:
- name: root
submitacl: "*"
queues:
- name: a
resources:
max:
memory: 150
vcore: 20
`
// Start all tests
ms := &mockScheduler{}
defer ms.Stop()

err := ms.Init(configData, false, false)
assert.NilError(t, err, "RegisterResourceManager failed")

// Check queues of cache and scheduler.
partitionInfo := ms.scheduler.GetClusterContext().GetPartition("[rm:123]default")
assert.Assert(t, partitionInfo.GetTotalPartitionResource() == nil, "partition info max resource nil")

// Register a node
err = ms.proxy.UpdateNode(&si.NodeRequest{
Nodes: []*si.NodeInfo{
{
NodeID: "node-1:1234",
Attributes: map[string]string{},
SchedulableResource: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 100},
"vcore": {Value: 10},
},
},
Action: si.NodeInfo_CREATE,
},
},
RmID: "rm:123",
})

assert.NilError(t, err, "NodeRequest failed")

// Wait until node is registered
context := ms.scheduler.GetClusterContext()
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
waitForNewNode(t, context, "node-1:1234", "[rm:123]default", 1000)

// verify node capacity
assert.Equal(t, len(partitionInfo.GetNodes()), 1)
node1 := partitionInfo.GetNode("node-1:1234")
assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]), int64(100))
schedulingNode1 := ms.scheduler.GetClusterContext().
GetNode("node-1:1234", "[rm:123]default")
assert.Equal(t, int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(100))

// update node capacity
err = ms.proxy.UpdateNode(&si.NodeRequest{
Nodes: []*si.NodeInfo{
{
NodeID: "node-1:1234",
Attributes: map[string]string{},
OccupiedResource: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 80},
"vcore": {Value: 5},
},
},
Action: si.NodeInfo_UPDATE,
},
},
RmID: "rm:123",
})

assert.NilError(t, err, "NodeRequest failed")

waitForAvailableNodeResource(t, ms.scheduler.GetClusterContext(), "[rm:123]default",
[]string{"node-1:1234"}, 20, 1000)
assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]), int64(100))
assert.Equal(t, int64(node1.GetCapacity().Resources[common.CPU]), int64(10))
assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.Memory]), int64(80))
assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.CPU]), int64(5))
assert.Equal(t, int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(20))
}

func TestForeignPodResourceUsage(t *testing.T) {
// Register RM
configData := `
Expand Down
7 changes: 1 addition & 6 deletions pkg/scheduler/utilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,20 +647,15 @@ func newAllocationAskPreempt(allocKey, appID string, prio int32, res *resources.
})
}

func newNodeWithResources(nodeID string, max, occupied *resources.Resource) *objects.Node {
func newNodeMaxResource(nodeID string, max *resources.Resource) *objects.Node {
proto := &si.NodeInfo{
NodeID: nodeID,
Attributes: map[string]string{},
SchedulableResource: max.ToProto(),
OccupiedResource: occupied.ToProto(),
}
return objects.NewNode(proto)
}

func newNodeMaxResource(nodeID string, max *resources.Resource) *objects.Node {
return newNodeWithResources(nodeID, max, nil)
}

// partition with an expected basic queue hierarchy
// root -> parent -> leaf1
//
Expand Down

0 comments on commit 2b99742

Please sign in to comment.