Skip to content

Commit

Permalink
Evict pods in preempting scheduler benchmarks (#2606)
Browse files Browse the repository at this point in the history
  • Loading branch information
zuqq authored Jun 26, 2023
1 parent 5bf48cb commit fac9411
Showing 1 changed file with 58 additions and 39 deletions.
97 changes: 58 additions & 39 deletions internal/scheduler/preempting_queue_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1379,25 +1379,25 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
MaxPriorityFactor int
}{
"1 node 1 queue 320 jobs": {
SchedulingConfig: testfixtures.WithNodeOversubscriptionEvictionProbabilityConfig(
0,
testfixtures.WithNodeEvictionProbabilityConfig(
0.1,
testfixtures.TestSchedulingConfig(),
),
),
SchedulingConfig: testfixtures.TestSchedulingConfig(),
Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities),
JobFunc: testfixtures.N1CpuJobs,
NumQueues: 1,
NumJobsPerQueue: 320,
MinPriorityFactor: 1,
MaxPriorityFactor: 1,
},
"1 node 10 queues 320 jobs": {
SchedulingConfig: testfixtures.TestSchedulingConfig(),
Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities),
JobFunc: testfixtures.N1CpuJobs,
NumQueues: 10,
NumJobsPerQueue: 320,
MinPriorityFactor: 1,
MaxPriorityFactor: 1,
},
"10 nodes 1 queue 3200 jobs": {
SchedulingConfig: testfixtures.WithNodeEvictionProbabilityConfig(
0.1,
testfixtures.TestSchedulingConfig(),
),
SchedulingConfig: testfixtures.TestSchedulingConfig(),
Nodes: testfixtures.N32CpuNodes(10, testfixtures.TestPriorities),
JobFunc: testfixtures.N1CpuJobs,
NumQueues: 1,
Expand All @@ -1406,10 +1406,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
MaxPriorityFactor: 1,
},
"10 nodes 10 queues 3200 jobs": {
SchedulingConfig: testfixtures.WithNodeEvictionProbabilityConfig(
0.1,
testfixtures.TestSchedulingConfig(),
),
SchedulingConfig: testfixtures.TestSchedulingConfig(),
Nodes: testfixtures.N32CpuNodes(10, testfixtures.TestPriorities),
JobFunc: testfixtures.N1CpuJobs,
NumQueues: 10,
Expand All @@ -1418,29 +1415,41 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
MaxPriorityFactor: 1,
},
"100 nodes 1 queue 32000 jobs": {
SchedulingConfig: testfixtures.WithNodeEvictionProbabilityConfig(
0.1,
testfixtures.TestSchedulingConfig(),
),
SchedulingConfig: testfixtures.TestSchedulingConfig(),
Nodes: testfixtures.N32CpuNodes(100, testfixtures.TestPriorities),
JobFunc: testfixtures.N1CpuJobs,
NumQueues: 1,
NumJobsPerQueue: 32000,
MinPriorityFactor: 1,
MaxPriorityFactor: 1,
},
"100 nodes 10 queues 32000 jobs": {
SchedulingConfig: testfixtures.TestSchedulingConfig(),
Nodes: testfixtures.N32CpuNodes(100, testfixtures.TestPriorities),
JobFunc: testfixtures.N1CpuJobs,
NumQueues: 10,
NumJobsPerQueue: 32000,
MinPriorityFactor: 1,
MaxPriorityFactor: 1,
},
"1000 nodes 1 queue 320000 jobs": {
SchedulingConfig: testfixtures.WithNodeEvictionProbabilityConfig(
0.1,
testfixtures.TestSchedulingConfig(),
),
SchedulingConfig: testfixtures.TestSchedulingConfig(),
Nodes: testfixtures.N32CpuNodes(1000, testfixtures.TestPriorities),
JobFunc: testfixtures.N1CpuJobs,
NumQueues: 1,
NumJobsPerQueue: 320000,
MinPriorityFactor: 1,
MaxPriorityFactor: 1,
},
"1000 nodes 10 queues 320000 jobs": {
SchedulingConfig: testfixtures.TestSchedulingConfig(),
Nodes: testfixtures.N32CpuNodes(1000, testfixtures.TestPriorities),
JobFunc: testfixtures.N1CpuJobs,
NumQueues: 1,
NumJobsPerQueue: 32000,
MinPriorityFactor: 1,
MaxPriorityFactor: 1,
},
}
for name, tc := range tests {
b.Run(name, func(b *testing.B) {
Expand All @@ -1454,16 +1463,15 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {

nodeDb, err := CreateNodeDb(tc.Nodes)
require.NoError(b, err)
repo := NewInMemoryJobRepository(testfixtures.TestPriorityClasses)
allocatedByQueueAndPriorityClass := make(map[string]schedulerobjects.QuantityByTAndResourceType[string])
jobRepo := NewInMemoryJobRepository(testfixtures.TestPriorityClasses)

jobs := make([]interfaces.LegacySchedulerJob, 0)
for _, queueJobs := range jobsByQueue {
for _, job := range queueJobs {
jobs = append(jobs, job)
}
}
repo.EnqueueMany(jobs)
jobRepo.EnqueueMany(jobs)

sctx := schedulercontext.NewSchedulingContext(
"executor",
Expand All @@ -1474,7 +1482,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
nodeDb.TotalResources(),
)
for queue, priorityFactor := range priorityFactorByQueue {
err := sctx.AddQueueSchedulingContext(queue, priorityFactor, allocatedByQueueAndPriorityClass[queue])
err := sctx.AddQueueSchedulingContext(queue, priorityFactor, make(schedulerobjects.QuantityByTAndResourceType[string]))
require.NoError(b, err)
}
constraints := schedulerconstraints.SchedulingConstraintsFromSchedulingConfig(
Expand All @@ -1488,7 +1496,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
constraints,
tc.SchedulingConfig.Preemption.NodeEvictionProbability,
tc.SchedulingConfig.Preemption.NodeOversubscriptionEvictionProbability,
repo,
jobRepo,
nodeDb,
nil,
nil,
Expand All @@ -1498,19 +1506,30 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
require.NoError(b, err)
require.Equal(b, 0, len(result.PreemptedJobs))

// Create a new job repo without the scheduled jobs.
scheduledJobsById := make(map[string]interfaces.LegacySchedulerJob)
scheduledJobs := make(map[string]bool)
for _, job := range result.ScheduledJobs {
scheduledJobsById[job.GetId()] = job
scheduledJobs[job.GetId()] = true
}
unscheduledJobs := make([]interfaces.LegacySchedulerJob, 0)
for _, job := range jobs {
if _, ok := scheduledJobsById[job.GetId()]; !ok {
unscheduledJobs = append(unscheduledJobs, job)
}
for queue, jobs := range jobRepo.jobsByQueue {
jobRepo.jobsByQueue[queue] = armadaslices.Filter(jobs, func(job interfaces.LegacySchedulerJob) bool { return scheduledJobs[job.GetId()] })
}
repo = NewInMemoryJobRepository(testfixtures.TestPriorityClasses)
repo.EnqueueMany(unscheduledJobs)

nodesById := make(map[string]*schedulerobjects.Node)
for _, node := range tc.Nodes {
nodesById[node.Id] = node
}
for _, job := range result.ScheduledJobs {
nodeId := result.NodeIdByJobId[job.GetId()]
node := nodesById[nodeId]
podRequirements := PodRequirementFromLegacySchedulerJob(job, tc.SchedulingConfig.Preemption.PriorityClasses)
node, err = nodedb.BindPodToNode(podRequirements, node)
require.NoError(b, err)
nodesById[nodeId] = node
}
nodeDb, err = CreateNodeDb(maps.Values(nodesById))
require.NoError(b, err)

allocatedByQueueAndPriorityClass := sctx.AllocatedByQueueAndPriority()

b.ResetTimer()
for n := 0; n < b.N; n++ {
Expand All @@ -1531,7 +1550,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
constraints,
tc.SchedulingConfig.Preemption.NodeEvictionProbability,
tc.SchedulingConfig.Preemption.NodeOversubscriptionEvictionProbability,
repo,
jobRepo,
nodeDb,
nil,
nil,
Expand Down

0 comments on commit fac9411

Please sign in to comment.