Skip to content

Commit

Permalink
[YUNIKORN-2367] Core: Add support for resource updates
Browse files Browse the repository at this point in the history
  • Loading branch information
craigcondit committed Sep 3, 2024
1 parent e18603a commit f559cf8
Show file tree
Hide file tree
Showing 14 changed files with 619 additions and 319 deletions.
28 changes: 18 additions & 10 deletions pkg/scheduler/objects/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,17 @@ type Allocation struct {
allowPreemptOther bool
originator bool
tags map[string]string
allocatedResource *resources.Resource
resKeyWithoutNode string // the reservation key without node

// Mutable fields which need protection
allocated bool
allocLog map[string]*AllocationLogEntry
preemptionTriggered bool
preemptCheckTime time.Time
schedulingAttempted bool // whether scheduler core has tried to schedule this allocation
scaleUpTriggered bool // whether this allocation has triggered autoscaling or not
resKeyPerNode map[string]string // reservation key for a given node

allocated bool
allocLog map[string]*AllocationLogEntry
preemptionTriggered bool
preemptCheckTime time.Time
schedulingAttempted bool // whether scheduler core has tried to schedule this allocation
scaleUpTriggered bool // whether this allocation has triggered autoscaling or not
resKeyPerNode map[string]string // reservation key for a given node
allocatedResource *resources.Resource
askEvents *schedEvt.AskEvents
userQuotaCheckFailed bool
headroomCheckFailed bool
Expand Down Expand Up @@ -165,7 +164,7 @@ func (a *Allocation) String() string {
if a == nil {
return "nil allocation"
}
return fmt.Sprintf("allocationKey %s, applicationID %s, Resource %s, Allocated %t", a.allocationKey, a.applicationID, a.allocatedResource, a.IsAllocated())
return fmt.Sprintf("allocationKey %s, applicationID %s, Resource %s, Allocated %t", a.allocationKey, a.applicationID, a.GetAllocatedResource(), a.IsAllocated())
}

// GetAllocationKey returns the allocation key for this allocation.
Expand Down Expand Up @@ -322,9 +321,18 @@ func (a *Allocation) HasRelease() bool {

// GetAllocatedResource hands back the resource object currently stored on this allocation.
// The stored pointer itself is returned, not a copy, so callers must treat the result as read-only.
// The field is read while holding the read lock of the allocation.
func (a *Allocation) GetAllocatedResource() *resources.Resource {
	a.RLock()
	current := a.allocatedResource
	a.RUnlock()
	return current
}

// SetAllocatedResource replaces the resource object stored on this allocation with allocatedResource.
// The passed in pointer is stored as is, no copy is made.
// The field is written while holding the write lock of the allocation.
func (a *Allocation) SetAllocatedResource(allocatedResource *resources.Resource) {
	a.Lock()
	a.allocatedResource = allocatedResource
	a.Unlock()
}

// MarkPreempted marks the allocation as preempted.
func (a *Allocation) MarkPreempted() {
a.Lock()
Expand Down
49 changes: 25 additions & 24 deletions pkg/scheduler/objects/allocation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,12 +247,10 @@ func sortedLog(ask *Allocation) []*AllocationLogEntry {
func TestNewAlloc(t *testing.T) {
res, err := resources.NewResourceFromConf(map[string]string{"first": "1"})
assert.NilError(t, err, "Resource creation failed")
ask := newAllocationAsk("ask-1", "app-1", res)
alloc := markAllocated("node-1", ask)
alloc := newAllocationAsk("ask-1", "app-1", res)
if alloc == nil {
t.Fatal("NewAllocation create failed while it should not")
}
assert.Equal(t, alloc.GetAllocationKey(), "ask-1")
assert.Assert(t, resources.Equals(alloc.GetAllocatedResource(), res), "Allocated resource not set correctly")
assert.Assert(t, !alloc.IsPlaceholder(), "ask should not have been a placeholder")
assert.Equal(t, time.Now().Round(time.Second), alloc.GetCreateTime().Round(time.Second))
Expand All @@ -263,27 +261,32 @@ func TestNewAlloc(t *testing.T) {
expected := "allocationKey ask-1, applicationID app-1, Resource map[first:1], Allocated false"
assert.Equal(t, allocStr, expected, "Strings should have been equal")
assert.Assert(t, !alloc.IsPlaceholderUsed(), fmt.Sprintf("Alloc should not be placeholder replacement by default: got %t, expected %t", alloc.IsPlaceholderUsed(), false))
// check that createTime is properly copied from the ask
tags := make(map[string]string)
tags[siCommon.CreationTime] = strconv.FormatInt(past, 10)
ask.tags = CloneAllocationTags(tags)
ask.createTime = time.Unix(past, 0)
alloc = markAllocated("node-1", ask)
assert.Equal(t, alloc.GetCreateTime(), ask.GetCreateTime(), "createTime was not copied from the ask")
assert.Assert(t, reflect.DeepEqual(ask.tags, ask.GetTagsClone()))
}

// TestSetAllocatedResources checks that SetAllocatedResource replaces the resource that is
// returned by GetAllocatedResource.
func TestSetAllocatedResources(t *testing.T) {
	initial, err := resources.NewResourceFromConf(map[string]string{"first": "1"})
	assert.NilError(t, err, "Resource creation failed")
	replacement, err := resources.NewResourceFromConf(map[string]string{"first": "2"})
	assert.NilError(t, err, "Resource creation failed")

	alloc := newAllocationAsk("ask-1", "app-1", initial)
	if alloc == nil {
		t.Fatal("NewAllocation create failed while it should not")
	}

	// before the update the allocation must report the resource it was created with
	got := alloc.GetAllocatedResource()
	assert.Assert(t, resources.Equals(got, initial), "Allocated resource not set correctly")

	// after the update the allocation must report the replacement resource
	alloc.SetAllocatedResource(replacement)
	got = alloc.GetAllocatedResource()
	assert.Assert(t, resources.Equals(got, replacement), "Allocated resource not set correctly")
}

func TestNewAllocatedAllocationResult(t *testing.T) {
res, err := resources.NewResourceFromConf(map[string]string{"first": "1"})
assert.NilError(t, err, "Resource creation failed")
ask := newAllocationAsk("ask-1", "app-1", res)
alloc := markAllocated("node-1", ask)
result := newAllocatedAllocationResult("node-1", ask)
alloc := newAllocation("ask-1", "app-1", res)
result := newAllocatedAllocationResult("node-1", alloc)
if result == nil {
t.Fatal("NewAllocatedAllocationResult create failed while it should not")
}
assert.Equal(t, result.ResultType, Allocated, "NewAllocatedAllocationResult should have Allocated result type")
assert.Equal(t, ask, result.Request, "wrong ask")
assert.Equal(t, alloc, result.Request, "wrong allocation")
assert.Equal(t, result.NodeID, "node-1", "wrong node id")
}
Expand Down Expand Up @@ -317,14 +320,12 @@ func TestNewUnreservedAllocationResult(t *testing.T) {
func TestNewReplacedAllocationResult(t *testing.T) {
res, err := resources.NewResourceFromConf(map[string]string{"first": "1"})
assert.NilError(t, err, "Resource creation failed")
ask := newAllocationAsk("ask-1", "app-1", res)
alloc := markAllocated("node-1", ask)
result := newReplacedAllocationResult("node-1", ask)
alloc := newAllocationWithKey("ask-1", "app-1", "node-1", res)
result := newReplacedAllocationResult("node-1", alloc)
if result == nil {
t.Fatal("NewReplacedllocationResult create failed while it should not")
}
assert.Equal(t, result.ResultType, Replaced, "NewReplacedAllocationResult should have Allocated result type")
assert.Equal(t, ask, result.Request, "wrong ask")
assert.Equal(t, alloc, result.Request, "wrong allocation")
assert.Equal(t, result.NodeID, "node-1", "wrong node id")
}
Expand Down Expand Up @@ -363,6 +364,7 @@ func TestSIFromNilAlloc(t *testing.T) {
assert.Equal(t, allocSI, nilSI, "Expected nil response from nil allocation")
}

//nolint:staticcheck
func TestSIFromAlloc(t *testing.T) {
res, err := resources.NewResourceFromConf(map[string]string{"first": "1"})
assert.NilError(t, err, "Resource creation failed")
Expand All @@ -377,11 +379,10 @@ func TestSIFromAlloc(t *testing.T) {
AllowPreemptOther: false,
},
}
ask := newAllocationAsk("ask-1", "app-1", res)
ask.originator = true
ask.allowPreemptSelf = false
ask.allowPreemptOther = true
alloc := markAllocated("node-1", ask)
alloc := newAllocationWithKey("ask-1", "app-1", "node-1", res)
alloc.originator = true
alloc.allowPreemptSelf = false
alloc.allowPreemptOther = true
if alloc == nil {
t.Fatal("NewAllocation create failed while it should not")
}
Expand Down
77 changes: 69 additions & 8 deletions pkg/scheduler/objects/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ func (sa *Application) removeAsksInternal(allocKey string, detail si.EventRecord
deltaPendingResource = sa.pending
sa.pending = resources.NewResource()
for _, ask := range sa.requests {
sa.appEvents.SendRemoveAskEvent(sa.ApplicationID, ask.allocationKey, ask.allocatedResource, detail)
sa.appEvents.SendRemoveAskEvent(sa.ApplicationID, ask.allocationKey, ask.GetAllocatedResource(), detail)
}
sa.requests = make(map[string]*Allocation)
sa.sortedRequests = sortedRequests{}
Expand Down Expand Up @@ -591,7 +591,7 @@ func (sa *Application) removeAsksInternal(allocKey string, detail si.EventRecord
}
delete(sa.requests, allocKey)
sa.sortedRequests.remove(ask)
sa.appEvents.SendRemoveAskEvent(sa.ApplicationID, ask.allocationKey, ask.allocatedResource, detail)
sa.appEvents.SendRemoveAskEvent(sa.ApplicationID, ask.allocationKey, ask.GetAllocatedResource(), detail)
if priority := ask.GetPriority(); priority >= sa.askMaxPriority {
sa.updateAskMaxPriority()
}
Expand Down Expand Up @@ -665,11 +665,72 @@ func (sa *Application) AddAllocationAsk(ask *Allocation) error {
zap.Bool("placeholder", ask.IsPlaceholder()),
zap.Stringer("pendingDelta", delta))
sa.sortedRequests.insert(ask)
sa.appEvents.SendNewAskEvent(sa.ApplicationID, ask.allocationKey, ask.allocatedResource)
sa.appEvents.SendNewAskEvent(sa.ApplicationID, ask.allocationKey, ask.GetAllocatedResource())

return nil
}

// UpdateAllocationResources applies a changed resource size for an allocation the application already tracks.
// The tracked allocation is looked up in the requests of the application using the allocation key of alloc.
// The difference between the new and the currently tracked resources is then applied as follows:
//   - tracked allocation is allocated: the delta is added to the allocated resources of the application,
//     the allocated resources of the queue and the user resource usage.
//   - tracked allocation is not allocated: the delta is added to the pending resources of the application
//     and the pending resources of the queue.
//
// Finally the tracked allocation is updated with a clone of the new resources.
// An error is returned if alloc is nil, if alloc has zero resources, or if no allocation with the same key
// is tracked by the application. If the resources did not change nothing is updated and nil is returned.
func (sa *Application) UpdateAllocationResources(alloc *Allocation) error {
	sa.Lock()
	defer sa.Unlock()

	// validate the input before touching any tracked state
	if alloc == nil {
		return fmt.Errorf("alloc cannot be nil when updating resources for app %s", sa.ApplicationID)
	}
	if resources.IsZero(alloc.GetAllocatedResource()) {
		return fmt.Errorf("cannot update alloc with zero resources on app %s: %v", sa.ApplicationID, alloc)
	}
	tracked := sa.requests[alloc.GetAllocationKey()]
	if tracked == nil {
		return fmt.Errorf("existing alloc not found when updating resources on app %s: %v", sa.ApplicationID, alloc)
	}

	// calculate the change using clones of both resources
	updated := alloc.GetAllocatedResource().Clone()
	previous := tracked.GetAllocatedResource().Clone()
	change := resources.Sub(updated, previous)
	if resources.IsZero(change) {
		// same size: nothing to account for
		return nil
	}
	change.Prune()

	// both branches log the same set of fields, only the message differs
	logUpdate := func(msg string) {
		log.Log(log.SchedApplication).Info(msg,
			zap.String("appID", sa.ApplicationID),
			zap.String("user", sa.user.User),
			zap.String("alloc", tracked.GetAllocationKey()),
			zap.Bool("placeholder", tracked.IsPlaceholder()),
			zap.Stringer("existingResources", previous),
			zap.Stringer("updatedResources", updated),
			zap.Stringer("delta", change))
	}

	if !tracked.IsAllocated() {
		// not allocated yet: the change only affects the pending resources
		sa.pending = resources.Add(sa.pending, change)
		sa.pending.Prune()
		sa.queue.incPendingResource(change)
		logUpdate("updated pending resources for application")
	} else {
		// allocated: the change affects application and queue allocated resources
		sa.allocatedResource = resources.Add(sa.allocatedResource, change)
		sa.allocatedResource.Prune()
		sa.queue.IncAllocatedResource(change)

		// the user usage follows the allocated resources
		sa.incUserResourceUsage(change)
		logUpdate("updated allocated resources for application")
	}

	// store the new size on the tracked allocation
	tracked.SetAllocatedResource(updated)
	return nil
}

// Add the ask when a node allocation is recovered.
// Safeguarded against a nil but the recovery generates the ask and should never be nil.
func (sa *Application) RecoverAllocationAsk(alloc *Allocation) {
Expand Down Expand Up @@ -1156,7 +1217,7 @@ func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator,
// release the placeholder and tell the RM
ph.SetReleased(true)
sa.notifyRMAllocationReleased([]*Allocation{ph}, si.TerminationType_TIMEOUT, "cancel placeholder: resource incompatible")
sa.appEvents.SendPlaceholderLargerEvent(ph.taskGroupName, sa.ApplicationID, ph.allocationKey, request.allocatedResource, ph.allocatedResource)
sa.appEvents.SendPlaceholderLargerEvent(ph.taskGroupName, sa.ApplicationID, ph.allocationKey, request.GetAllocatedResource(), ph.GetAllocatedResource())
continue
}
// placeholder is the same or larger continue processing and difference is handled when the placeholder
Expand Down Expand Up @@ -1530,7 +1591,7 @@ func (sa *Application) tryNode(node *Node, ask *Allocation) (*AllocationResult,

// everything OK really allocate
if node.TryAddAllocation(ask) {
if err := sa.queue.IncAllocatedResource(ask.GetAllocatedResource(), false); err != nil {
if err := sa.queue.TryIncAllocatedResource(ask.GetAllocatedResource()); err != nil {
log.Log(log.SchedApplication).DPanic("queue update failed unexpectedly",
zap.Error(err))
// revert the node update
Expand Down Expand Up @@ -1696,7 +1757,7 @@ func (sa *Application) addAllocationInternal(allocType AllocationResultType, all
sa.allocatedResource = resources.Add(sa.allocatedResource, alloc.GetAllocatedResource())
sa.maxAllocatedResource = resources.ComponentWiseMax(sa.allocatedResource, sa.maxAllocatedResource)
}
sa.appEvents.SendNewAllocationEvent(sa.ApplicationID, alloc.allocationKey, alloc.allocatedResource)
sa.appEvents.SendNewAllocationEvent(sa.ApplicationID, alloc.allocationKey, alloc.GetAllocatedResource())
sa.allocations[alloc.GetAllocationKey()] = alloc
}

Expand Down Expand Up @@ -1853,7 +1914,7 @@ func (sa *Application) removeAllocationInternal(allocationKey string, releaseTyp
}
}
delete(sa.allocations, allocationKey)
sa.appEvents.SendRemoveAllocationEvent(sa.ApplicationID, alloc.allocationKey, alloc.allocatedResource, releaseType)
sa.appEvents.SendRemoveAllocationEvent(sa.ApplicationID, alloc.allocationKey, alloc.GetAllocatedResource(), releaseType)
return alloc
}

Expand Down Expand Up @@ -1884,7 +1945,7 @@ func (sa *Application) RemoveAllAllocations() []*Allocation {
allocationsToRelease = append(allocationsToRelease, alloc)
// Aggregate the resources used by this alloc to the application's user resource tracker
sa.trackCompletedResource(alloc)
sa.appEvents.SendRemoveAllocationEvent(sa.ApplicationID, alloc.allocationKey, alloc.allocatedResource, si.TerminationType_STOPPED_BY_RM)
sa.appEvents.SendRemoveAllocationEvent(sa.ApplicationID, alloc.allocationKey, alloc.GetAllocatedResource(), si.TerminationType_STOPPED_BY_RM)
}

// if an app doesn't have any allocations and the user doesn't have other applications,
Expand Down
Loading

0 comments on commit f559cf8

Please sign in to comment.