Scheduler: log more per-queue info (#3669)
robertdavidsmith authored Jun 13, 2024
1 parent 8b38477 commit d5f5002
Showing 3 changed files with 108 additions and 19 deletions.
33 changes: 31 additions & 2 deletions internal/scheduler/preempting_queue_scheduler.go
@@ -1,6 +1,7 @@
package scheduler

import (
"fmt"
"reflect"
"time"

@@ -275,6 +276,9 @@ func (sch *PreemptingQueueScheduler) evict(ctx *armadacontext.Context, evictor *
if err != nil {
return nil, nil, err
}

ctx.Infof("Evicting for pool %s (most may get re-scheduled this cycle so they won't necessarily be preempted) %s", sch.schedulingContext.Pool, result.SummaryString())

if err := sch.nodeDb.UpsertManyWithTxn(txn, maps.Values(result.AffectedNodesById)); err != nil {
return nil, nil, err
}
@@ -347,7 +351,13 @@ func (sch *PreemptingQueueScheduler) evictGangs(ctx *armadacontext.Context, txn
// No gangs to evict.
return &EvictorResult{}, nil
}
return evictor.Evict(ctx, txn)

result, err := evictor.Evict(ctx, txn)
if err == nil {
ctx.Infof("Evicting remains of partially evicted gangs for pool %s (most may get re-scheduled this cycle so they won't necessarily be preempted) %s", sch.schedulingContext.Pool, result.SummaryString())
}

return result, err
}

// Collect job ids for any gangs that were partially evicted and the ids of nodes those jobs are on.
@@ -496,7 +506,7 @@ func addEvictedJobsToNodeDb(_ *armadacontext.Context, sctx *schedulercontext.Sch
defer txn.Abort()
i := 0
for {
if gctx, err := candidateGangIterator.Peek(); err != nil {
if gctx, _, err := candidateGangIterator.Peek(); err != nil {
return err
} else if gctx == nil {
break
@@ -694,6 +704,24 @@ type EvictorResult struct {
NodeIdByJobId map[string]string
}

func (er *EvictorResult) SummaryString() string {
type queueStats struct {
evictedJobCount int
evictedResources internaltypes.ResourceList
}
statsPerQueue := map[string]queueStats{}
for _, jctx := range er.EvictedJctxsByJobId {
queue := jctx.Job.Queue()
stats := statsPerQueue[queue]
stats.evictedJobCount++
stats.evictedResources = stats.evictedResources.Add(jctx.Job.EfficientResourceRequirements())
statsPerQueue[queue] = stats
}
return fmt.Sprintf("%v", armadamaps.MapValues(statsPerQueue, func(s queueStats) string {
return fmt.Sprintf("{evictedJobCount=%d, evictedResources={%s}}", s.evictedJobCount, s.evictedResources.String())
}))
}

func NewNodeEvictor(
jobRepo JobRepository,
nodeDb *nodedb.NodeDb,
@@ -845,5 +873,6 @@ func (evi *Evictor) Evict(ctx *armadacontext.Context, nodeDbTxn *memdb.Txn) (*Ev
AffectedNodesById: affectedNodesById,
NodeIdByJobId: nodeIdByJobId,
}

return result, nil
}
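
For context, the new `SummaryString` groups evicted jobs by queue and renders one compact `{evictedJobCount, evictedResources}` entry per queue. Below is a minimal, self-contained sketch of the same aggregation pattern under stand-in types: a plain `map[string]int64` takes the place of Armada's `internaltypes.ResourceList`, and an explicit loop takes the place of `armadamaps.MapValues`, so the names here are illustrative, not the project's actual API.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// evictedJob stands in for one evicted job's scheduling context; in the real
// code the queue and resource requests come from jctx.Job.
type evictedJob struct {
	queue     string
	resources map[string]int64 // e.g. millicores, bytes
}

// summaryString aggregates per-queue eviction counts and resource totals,
// mirroring what EvictorResult.SummaryString does with Armada's own types.
func summaryString(evicted []evictedJob) string {
	type queueStats struct {
		evictedJobCount  int
		evictedResources map[string]int64
	}
	statsPerQueue := map[string]*queueStats{}
	for _, j := range evicted {
		stats, ok := statsPerQueue[j.queue]
		if !ok {
			stats = &queueStats{evictedResources: map[string]int64{}}
			statsPerQueue[j.queue] = stats
		}
		stats.evictedJobCount++
		for res, qty := range j.resources {
			stats.evictedResources[res] += qty
		}
	}
	// Render one entry per queue, sorted so the example output is stable.
	queues := make([]string, 0, len(statsPerQueue))
	for q := range statsPerQueue {
		queues = append(queues, q)
	}
	sort.Strings(queues)
	parts := make([]string, 0, len(queues))
	for _, q := range queues {
		s := statsPerQueue[q]
		parts = append(parts, fmt.Sprintf("%s:{evictedJobCount=%d, evictedResources=%v}",
			q, s.evictedJobCount, s.evictedResources))
	}
	return "map[" + strings.Join(parts, " ") + "]"
}

func main() {
	fmt.Println(summaryString([]evictedJob{
		{queue: "alpha", resources: map[string]int64{"cpu": 2000, "memory": 4 << 30}},
		{queue: "alpha", resources: map[string]int64{"cpu": 1000, "memory": 2 << 30}},
		{queue: "beta", resources: map[string]int64{"cpu": 500, "memory": 1 << 30}},
	}))
}
```

The sketch sorts queue names only to keep the example output deterministic; the real method delegates rendering of the whole map to `fmt.Sprintf("%v", ...)`.
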
92 changes: 76 additions & 16 deletions internal/scheduler/queue_scheduler.go
@@ -64,19 +64,29 @@ func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*SchedulerResul

nodeIdByJobId := make(map[string]string)
additionalAnnotationsByJobId := make(map[string]map[string]string)
ctx.Info("Looping through candidate gangs...")
ctx.Infof("Looping through candidate gangs for pool %s...", sch.schedulingContext.Pool)

type queueStats struct {
gangCount int
jobCount int
time time.Duration
gangsConsidered int
jobsConsidered int
gangsScheduled int
firstGangConsideredSampleJobId string
firstGangConsideredResult string
firstGangConsideredQueuePosition int
lastGangScheduledSampleJobId string
lastGangScheduledQueuePosition int
lastGangScheduledQueueCost float64
lastGangScheduledResources schedulerobjects.ResourceList
lastGangScheduledQueueResources schedulerobjects.ResourceList
time time.Duration
}

statsPerQueue := map[string]queueStats{}
loopNumber := 0
for {
// Peek() returns the next gang to try to schedule. Call Clear() before calling Peek() again.
// Calling Clear() after (failing to) schedule ensures we get the next gang in order of smallest fair share.
gctx, err := sch.candidateGangIterator.Peek()
gctx, queueCostInclGang, err := sch.candidateGangIterator.Peek()
if err != nil {
sch.schedulingContext.TerminationReason = err.Error()
return nil, err
@@ -100,9 +110,10 @@ }
}

start := time.Now()
if ok, unschedulableReason, err := sch.gangScheduler.Schedule(ctx, gctx); err != nil {
scheduledOk, unschedulableReason, err := sch.gangScheduler.Schedule(ctx, gctx)
if err != nil {
return nil, err
} else if ok {
} else if scheduledOk {
for _, jctx := range gctx.JobSchedulingContexts {
if pctx := jctx.PodSchedulingContext; pctx.IsSuccessful() {
scheduledJobs = append(scheduledJobs, jctx)
@@ -122,23 +133,71 @@

duration := time.Now().Sub(start)
stats := statsPerQueue[gctx.Queue]
stats.gangCount++
stats.jobCount += gctx.Cardinality()

stats.gangsConsidered++
stats.jobsConsidered += gctx.Cardinality()
if scheduledOk {
stats.gangsScheduled++
}

if stats.firstGangConsideredSampleJobId == "" {
stats.firstGangConsideredSampleJobId = gctx.JobIds()[0]
stats.firstGangConsideredQueuePosition = loopNumber
if scheduledOk {
stats.firstGangConsideredResult = "scheduled"
} else {
stats.firstGangConsideredResult = unschedulableReason
}
}

if scheduledOk {
stats.lastGangScheduledSampleJobId = gctx.JobIds()[0]
stats.lastGangScheduledQueueCost = queueCostInclGang
stats.lastGangScheduledQueuePosition = loopNumber
queue, queueOK := sch.candidateGangIterator.queueRepository.GetQueue(gctx.Queue)
if queueOK {
stats.lastGangScheduledResources = gctx.TotalResourceRequests.DeepCopy()
stats.lastGangScheduledQueueResources = queue.GetAllocation().DeepCopy()
} else {
stats.lastGangScheduledResources = schedulerobjects.NewResourceListWithDefaultSize()
stats.lastGangScheduledQueueResources = schedulerobjects.NewResourceListWithDefaultSize()
}
}

stats.time += duration
statsPerQueue[gctx.Queue] = stats
if duration.Seconds() > 1 {
ctx.Infof("Slow schedule: queue %s, gang cardinality %d, first job id %s, time %fs", gctx.Queue, gctx.Cardinality(), gctx.JobIds()[0], duration.Seconds())
ctx.Infof("Slow schedule: queue %s, gang cardinality %d, sample job id %s, time %fs", gctx.Queue, gctx.Cardinality(), gctx.JobIds()[0], duration.Seconds())
}

// Clear() to get the next gang in order of smallest fair share.
// Calling clear here ensures the gang scheduled in this iteration is accounted for.
if err := sch.candidateGangIterator.Clear(); err != nil {
return nil, err
}
}
ctx.Infof("Finished looping through candidate gangs: details %v", armadamaps.MapValues(statsPerQueue, func(s queueStats) string {
return fmt.Sprintf("{gangs=%d, jobs=%d, time=%fs}", s.gangCount, s.jobCount, s.time.Seconds())

loopNumber++
}

ctx.Infof("Finished %d loops through candidate gangs for pool %s: details %v", loopNumber, sch.schedulingContext.Pool, armadamaps.MapValues(statsPerQueue, func(s queueStats) string {
return fmt.Sprintf("{gangsConsidered=%d, jobsConsidered=%d, gangsScheduled=%d, "+
"firstGangConsideredSampleJobId=%s, firstGangConsideredResult=%s, firstGangConsideredQueuePosition=%d, "+
"lastGangScheduledSampleJobId=%s, lastGangScheduledQueuePosition=%d, lastGangScheduledQueueCost=%f,"+
"lastGangScheduledResources=%s, lastGangScheduledQueueResources=%s, time=%fs}",
s.gangsConsidered,
s.jobsConsidered,
s.gangsScheduled,
s.firstGangConsideredSampleJobId,
s.firstGangConsideredResult,
s.firstGangConsideredQueuePosition,
s.lastGangScheduledSampleJobId,
s.lastGangScheduledQueuePosition,
s.lastGangScheduledQueueCost,
s.lastGangScheduledResources.CompactString(),
s.lastGangScheduledQueueResources.CompactString(),
s.time.Seconds())
}))

if sch.schedulingContext.TerminationReason == "" {
sch.schedulingContext.TerminationReason = "no remaining candidate jobs"
}
@@ -342,12 +401,13 @@ func (it *CandidateGangIterator) Clear() error {
return nil
}

func (it *CandidateGangIterator) Peek() (*schedulercontext.GangSchedulingContext, error) {
func (it *CandidateGangIterator) Peek() (*schedulercontext.GangSchedulingContext, float64, error) {
if len(it.pq) == 0 {
// No queued jobs left.
return nil, nil
return nil, 0.0, nil
}
return it.pq[0].gctx, nil
first := it.pq[0]
return first.gctx, first.queueCost, nil
}

func (it *CandidateGangIterator) newPQItem(queue string, queueIt *QueuedGangIterator) *QueueCandidateGangIteratorItem {
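
The widened `Peek` signature returns the candidate gang together with that queue's cost including the gang, and per the comments in `Schedule` the caller must still call `Clear()` after each scheduling attempt so the iterator re-orders queues with that attempt accounted for. A minimal sketch of the consumer-side protocol follows, using a toy slice-backed iterator and a placeholder `tryToSchedule` as hypothetical stand-ins for Armada's `CandidateGangIterator` and `gangScheduler.Schedule`.

```go
package main

import "fmt"

// gang is a stand-in for schedulercontext.GangSchedulingContext.
type gang struct {
	queue  string
	jobIds []string
}

// sliceIterator is a toy stand-in for CandidateGangIterator. The real
// iterator pops from a priority queue ordered by queue cost; here the gangs
// are simply pre-ordered so only the Peek/Clear call pattern is preserved.
type sliceIterator struct {
	gangs []*gang
	costs []float64
}

// Peek returns the next gang plus that queue's cost including the gang,
// mirroring the new three-value signature; a nil gang means none remain.
func (it *sliceIterator) Peek() (*gang, float64, error) {
	if len(it.gangs) == 0 {
		return nil, 0.0, nil
	}
	return it.gangs[0], it.costs[0], nil
}

// Clear advances the iterator. In the real scheduler it must be called after
// each attempt so the next Peek reflects updated queue costs.
func (it *sliceIterator) Clear() error {
	it.gangs, it.costs = it.gangs[1:], it.costs[1:]
	return nil
}

// tryToSchedule is a placeholder for gangScheduler.Schedule.
func tryToSchedule(g *gang) (bool, string) { return len(g.jobIds) > 0, "empty gang" }

func main() {
	it := &sliceIterator{
		gangs: []*gang{
			{queue: "alpha", jobIds: []string{"job-1", "job-2"}},
			{queue: "beta", jobIds: []string{"job-3"}},
		},
		costs: []float64{0.25, 0.40},
	}
	for {
		g, queueCostInclGang, err := it.Peek()
		if err != nil || g == nil {
			break
		}
		if ok, reason := tryToSchedule(g); ok {
			fmt.Printf("scheduled gang from queue %s (cost incl. gang %.2f), sample job %s\n",
				g.queue, queueCostInclGang, g.jobIds[0])
		} else {
			fmt.Printf("gang from queue %s not scheduled: %s\n", g.queue, reason)
		}
		// Clear after every attempt, scheduled or not.
		if err := it.Clear(); err != nil {
			break
		}
	}
}
```
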
2 changes: 1 addition & 1 deletion internal/scheduler/scheduling_algo.go
@@ -168,7 +168,7 @@ func (l *FairSchedulingAlgo) Schedule(
pool := executorGroup[0].Pool
minimumJobSize := executorGroup[0].MinimumJobSize
ctx.Infof(
"scheduling on executor group %s with capacity %s",
"Scheduling on executor group %s with capacity %s",
executorGroupLabel, fsctx.totalCapacityByPool[pool].CompactString(),
)

