Skip to content

Commit

Permalink
Add assignedNodeId field to JobSchedulingContext (#3678)
Browse files Browse the repository at this point in the history
* Add assignedNodeId field to JobSchedulingContext

This makes it far faster to look up the currently assigned node for a given evicted job
 - Currently it uses a map look up which is comparatively slow

Signed-off-by: JamesMurkin <[email protected]>

* Add test for Get/Set AssignedNodeId

Signed-off-by: JamesMurkin <[email protected]>

* Fix check for assigned node

Signed-off-by: JamesMurkin <[email protected]>

* Fix unit test

Signed-off-by: JamesMurkin <[email protected]>

---------

Signed-off-by: JamesMurkin <[email protected]>
  • Loading branch information
JamesMurkin authored Jun 20, 2024
1 parent 69ab1f5 commit 98c4dec
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 24 deletions.
24 changes: 15 additions & 9 deletions internal/scheduler/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
armadamaps "github.com/armadaproject/armada/internal/common/maps"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/common/types"
schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/fairness"
"github.com/armadaproject/armada/internal/scheduler/interfaces"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
Expand Down Expand Up @@ -623,6 +624,9 @@ type JobSchedulingContext struct {
// GangInfo holds all the information that is necessary to schedule a gang,
// such as the lower and upper bounds on its size.
GangInfo
// This is the node the pod is assigned to.
// This is only set for evicted jobs and is set alongside adding an additionalNodeSelector for the node
AssignedNodeId string
}

func (jctx *JobSchedulingContext) String() string {
Expand Down Expand Up @@ -663,6 +667,17 @@ func (jctx *JobSchedulingContext) Fail(unschedulableReason string) {
}
}

func (jctx *JobSchedulingContext) GetAssignedNodeId() string {
return jctx.AssignedNodeId
}

func (jctx *JobSchedulingContext) SetAssignedNodeId(assignedNodeId string) {
if assignedNodeId != "" {
jctx.AssignedNodeId = assignedNodeId
jctx.AddNodeSelector(schedulerconfig.NodeIdLabel, assignedNodeId)
}
}

func (jctx *JobSchedulingContext) AddNodeSelector(key, value string) {
if jctx.AdditionalNodeSelectors == nil {
jctx.AdditionalNodeSelectors = map[string]string{key: value}
Expand All @@ -671,15 +686,6 @@ func (jctx *JobSchedulingContext) AddNodeSelector(key, value string) {
}
}

func (jctx *JobSchedulingContext) GetNodeSelector(key string) (string, bool) {
if value, ok := jctx.AdditionalNodeSelectors[key]; ok {
return value, true
} else if value, ok := jctx.PodRequirements.NodeSelector[key]; ok {
return value, true
}
return "", false
}

type GangInfo struct {
Id string
Cardinality int
Expand Down
18 changes: 18 additions & 0 deletions internal/scheduler/context/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"

armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/fairness"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/internal/scheduler/testfixtures"
Expand Down Expand Up @@ -96,3 +97,20 @@ func testSmallCpuJobSchedulingContext(queue, priorityClassName string) *JobSched
GangInfo: EmptyGangInfo(job),
}
}

func TestJobSchedulingContext_SetAssignedNodeId(t *testing.T) {
jctx := &JobSchedulingContext{}

assert.Equal(t, "", jctx.GetAssignedNodeId())
assert.Empty(t, jctx.AdditionalNodeSelectors)

// Will not add a node selector if input is empty
jctx.SetAssignedNodeId("")
assert.Equal(t, "", jctx.GetAssignedNodeId())
assert.Empty(t, jctx.AdditionalNodeSelectors)

jctx.SetAssignedNodeId("node1")
assert.Equal(t, "node1", jctx.GetAssignedNodeId())
assert.Len(t, jctx.AdditionalNodeSelectors, 1)
assert.Equal(t, map[string]string{configuration.NodeIdLabel: "node1"}, jctx.AdditionalNodeSelectors)
}
8 changes: 4 additions & 4 deletions internal/scheduler/nodedb/nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ func (nodeDb *NodeDb) SelectNodeForJobWithTxn(txn *memdb.Txn, jctx *schedulercon
}()

// If the nodeIdLabel selector is set, consider only that node.
if nodeId, ok := jctx.GetNodeSelector(configuration.NodeIdLabel); ok {
if nodeId := jctx.GetAssignedNodeId(); nodeId != "" {
if it, err := txn.Get("nodes", "id", nodeId); err != nil {
return nil, errors.WithStack(err)
} else {
Expand Down Expand Up @@ -821,9 +821,9 @@ func (nodeDb *NodeDb) selectNodeForJobWithFairPreemption(txn *memdb.Txn, jctx *s
for obj := it.Next(); obj != nil && selectedNode == nil; obj = it.Next() {
evictedJobSchedulingContext := obj.(*EvictedJobSchedulingContext)
evictedJctx := evictedJobSchedulingContext.JobSchedulingContext
nodeId, ok := evictedJctx.GetNodeSelector(configuration.NodeIdLabel)
if !ok {
return nil, errors.Errorf("evicted job %s does not have a nodeIdLabel", evictedJctx.JobId)
nodeId := evictedJctx.GetAssignedNodeId()
if nodeId == "" {
return nil, errors.Errorf("evicted job %s does not have an assigned nodeId", evictedJctx.JobId)
}
node, ok := nodesById[nodeId]
if !ok {
Expand Down
12 changes: 4 additions & 8 deletions internal/scheduler/nodedb/nodedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,11 @@ func TestSelectNodeForPod_NodeIdLabel_Success(t *testing.T) {
require.NotEmpty(t, nodeId)
db, err := newNodeDbWithNodes(nodes)
require.NoError(t, err)
jobs := testfixtures.WithNodeSelectorJobs(
map[string]string{schedulerconfig.NodeIdLabel: nodeId},
testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1),
)
jobs := testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)
jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, jobs)
for _, jctx := range jctxs {
txn := db.Txn(false)
jctx.SetAssignedNodeId(nodeId)
node, err := db.SelectNodeForJobWithTxn(txn, jctx)
txn.Abort()
require.NoError(t, err)
Expand All @@ -96,13 +94,11 @@ func TestSelectNodeForPod_NodeIdLabel_Failure(t *testing.T) {
require.NotEmpty(t, nodeId)
db, err := newNodeDbWithNodes(nodes)
require.NoError(t, err)
jobs := testfixtures.WithNodeSelectorJobs(
map[string]string{schedulerconfig.NodeIdLabel: "this node does not exist"},
testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1),
)
jobs := testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)
jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, jobs)
for _, jctx := range jctxs {
txn := db.Txn(false)
jctx.SetAssignedNodeId("non-existent node")
node, err := db.SelectNodeForJobWithTxn(txn, jctx)
txn.Abort()
if !assert.NoError(t, err) {
Expand Down
5 changes: 2 additions & 3 deletions internal/scheduler/preempting_queue_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
armadamaps "github.com/armadaproject/armada/internal/common/maps"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/common/types"
schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/constraints"
schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
"github.com/armadaproject/armada/internal/scheduler/fairness"
Expand Down Expand Up @@ -436,7 +435,7 @@ func (sch *PreemptingQueueScheduler) evictionAssertions(evictorResult *EvictorRe
if !jctx.IsEvicted {
return errors.New("evicted job %s is not marked as such")
}
if nodeId, ok := jctx.GetNodeSelector(schedulerconfig.NodeIdLabel); ok {
if nodeId := jctx.GetAssignedNodeId(); nodeId != "" {
if _, ok := evictorResult.AffectedNodesById[nodeId]; !ok {
return errors.Errorf("node id %s targeted by job %s is not marked as affected", nodeId, jobId)
}
Expand Down Expand Up @@ -858,7 +857,7 @@ func (evi *Evictor) Evict(ctx *armadacontext.Context, nodeDbTxn *memdb.Txn) (*Ev
// TODO(albin): We can remove the checkOnlyDynamicRequirements flag in the nodeDb now that we've added the tolerations.
jctx := schedulercontext.JobSchedulingContextFromJob(job)
jctx.IsEvicted = true
jctx.AddNodeSelector(schedulerconfig.NodeIdLabel, node.GetId())
jctx.SetAssignedNodeId(node.GetId())
evictedJctxsByJobId[job.Id()] = jctx
jctx.AdditionalTolerations = append(jctx.AdditionalTolerations, node.GetTolerationsForTaints()...)

Expand Down

0 comments on commit 98c4dec

Please sign in to comment.