Skip to content

Commit

Permalink
[YUNIKORN-2808] E2E test Verify_preemption_on_priority_queue test is …
Browse files Browse the repository at this point in the history
…flaky
  • Loading branch information
manirajv06 committed Aug 23, 2024
1 parent 375895b commit b380a29
Showing 1 changed file with 19 additions and 0 deletions.
19 changes: 19 additions & 0 deletions pkg/scheduler/objects/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ func (p *Preemptor) initWorkingState() {
p.allocationsByNode = allocationsByNode
p.queueByAlloc = queueByAlloc
p.nodeAvailableMap = nodeAvailableMap
log.Log(log.SchedPreemption).Info("Preemption triggered based on working state", zap.String("ask", p.ask.GetAllocationKey()),
zap.String("ask queue path", p.queuePath),
zap.Int("allocationsByNode", len(p.allocationsByNode)),
zap.Int("queueByAlloc", len(p.queueByAlloc)),
zap.Int("nodeAvailableMap", len(p.nodeAvailableMap)))
}

// checkPreemptionQueueGuarantees verifies that it's possible to free enough resources to fit the given ask
Expand All @@ -198,6 +203,8 @@ func (p *Preemptor) checkPreemptionQueueGuarantees() bool {
snapshot.RemoveAllocation(alloc.GetAllocatedResource())
remaining := currentQueue.GetRemainingGuaranteedResource()
if remaining != nil && resources.StrictlyGreaterThanOrEquals(remaining, resources.Zero) {
log.Log(log.SchedPreemption).Info("Preemption Queue Guarantees check has passed", zap.String("ask", p.ask.GetAllocationKey()),
zap.String("ask queue path", p.queuePath))
return true
}
}
Expand Down Expand Up @@ -530,6 +537,10 @@ func (p *Preemptor) tryNodes() (string, []*Allocation, bool) {
}
// identify which victims and in which order should be tried
if idx, victims := p.calculateVictimsByNode(nodeAvailable, allocations); victims != nil {
log.Log(log.SchedPreemption).Info("Node wise Potential victims collected for preemption", zap.String("ask", p.ask.GetAllocationKey()),
zap.String("ask queue path", p.queuePath),
zap.String("node", nodeID),
zap.Int("victims count", len(victims)))
victimsByNode[nodeID] = victims
keys := make([]string, 0)
for _, victim := range victims {
Expand Down Expand Up @@ -569,13 +580,21 @@ func (p *Preemptor) TryPreemption() (*AllocationResult, bool) {
// no preemption possible
return nil, false
}
log.Log(log.SchedPreemption).Info("Node chosen for preemption", zap.String("ask", p.ask.GetAllocationKey()),
zap.String("ask queue path", p.queuePath),
zap.String("node", nodeID),
zap.Int("victims count", len(victims)))

// look for additional victims in case we have not yet made enough capacity in the queue
extraVictims, ok := p.calculateAdditionalVictims(victims)
if !ok {
// not enough resources were preempted
return nil, false
}
log.Log(log.SchedPreemption).Info("Additional victims chosen for preemption", zap.String("ask", p.ask.GetAllocationKey()),
zap.String("ask queue path", p.queuePath),
zap.String("node", nodeID),
zap.Int("additional victims count", len(extraVictims)))
victims = append(victims, extraVictims...)
if len(victims) == 0 {
return nil, false
Expand Down

0 comments on commit b380a29

Please sign in to comment.