Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YUNIKORN-2367] Core: Add support for resource updates #959

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions pkg/scheduler/objects/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,17 @@ type Allocation struct {
allowPreemptOther bool
originator bool
tags map[string]string
allocatedResource *resources.Resource
resKeyWithoutNode string // the reservation key without node

// Mutable fields which need protection
allocated bool
allocLog map[string]*AllocationLogEntry
preemptionTriggered bool
preemptCheckTime time.Time
schedulingAttempted bool // whether scheduler core has tried to schedule this allocation
scaleUpTriggered bool // whether this allocation has triggered autoscaling or not
resKeyPerNode map[string]string // reservation key for a given node

allocated bool
allocLog map[string]*AllocationLogEntry
preemptionTriggered bool
preemptCheckTime time.Time
schedulingAttempted bool // whether scheduler core has tried to schedule this allocation
scaleUpTriggered bool // whether this allocation has triggered autoscaling or not
resKeyPerNode map[string]string // reservation key for a given node
allocatedResource *resources.Resource
askEvents *schedEvt.AskEvents
userQuotaCheckFailed bool
headroomCheckFailed bool
Expand Down Expand Up @@ -165,7 +164,7 @@ func (a *Allocation) String() string {
if a == nil {
return "nil allocation"
}
return fmt.Sprintf("allocationKey %s, applicationID %s, Resource %s, Allocated %t", a.allocationKey, a.applicationID, a.allocatedResource, a.IsAllocated())
return fmt.Sprintf("allocationKey %s, applicationID %s, Resource %s, Allocated %t", a.allocationKey, a.applicationID, a.GetAllocatedResource(), a.IsAllocated())
}

// GetAllocationKey returns the allocation key for this allocation.
Expand Down Expand Up @@ -322,9 +321,18 @@ func (a *Allocation) HasRelease() bool {

// GetAllocatedResource returns the resource currently recorded for this allocation.
// The returned pointer is the stored object itself, not a copy, so callers must treat it as read-only.
func (a *Allocation) GetAllocatedResource() *resources.Resource {
	// hold the read lock only while the field is being read
	a.RLock()
	current := a.allocatedResource
	a.RUnlock()
	return current
}

// SetAllocatedResource replaces the resource recorded for this allocation.
// The pointer is stored exactly as passed in; no copy is made here.
func (a *Allocation) SetAllocatedResource(allocatedResource *resources.Resource) {
	// the write lock covers just the single field assignment
	a.Lock()
	a.allocatedResource = allocatedResource
	a.Unlock()
}

// MarkPreempted marks the allocation as preempted.
func (a *Allocation) MarkPreempted() {
a.Lock()
Expand Down
49 changes: 25 additions & 24 deletions pkg/scheduler/objects/allocation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,12 +247,10 @@ func sortedLog(ask *Allocation) []*AllocationLogEntry {
func TestNewAlloc(t *testing.T) {
res, err := resources.NewResourceFromConf(map[string]string{"first": "1"})
assert.NilError(t, err, "Resource creation failed")
ask := newAllocationAsk("ask-1", "app-1", res)
alloc := markAllocated("node-1", ask)
alloc := newAllocationAsk("ask-1", "app-1", res)
if alloc == nil {
t.Fatal("NewAllocation create failed while it should not")
}
assert.Equal(t, alloc.GetAllocationKey(), "ask-1")
assert.Assert(t, resources.Equals(alloc.GetAllocatedResource(), res), "Allocated resource not set correctly")
assert.Assert(t, !alloc.IsPlaceholder(), "ask should not have been a placeholder")
assert.Equal(t, time.Now().Round(time.Second), alloc.GetCreateTime().Round(time.Second))
Expand All @@ -263,27 +261,32 @@ func TestNewAlloc(t *testing.T) {
expected := "allocationKey ask-1, applicationID app-1, Resource map[first:1], Allocated false"
assert.Equal(t, allocStr, expected, "Strings should have been equal")
assert.Assert(t, !alloc.IsPlaceholderUsed(), fmt.Sprintf("Alloc should not be placeholder replacement by default: got %t, expected %t", alloc.IsPlaceholderUsed(), false))
// check that createTime is properly copied from the ask
tags := make(map[string]string)
tags[siCommon.CreationTime] = strconv.FormatInt(past, 10)
ask.tags = CloneAllocationTags(tags)
ask.createTime = time.Unix(past, 0)
alloc = markAllocated("node-1", ask)
assert.Equal(t, alloc.GetCreateTime(), ask.GetCreateTime(), "createTime was not copied from the ask")
assert.Assert(t, reflect.DeepEqual(ask.tags, ask.GetTagsClone()))
}

// TestSetAllocatedResources verifies that an allocation reports the resource it was created
// with, and reports the replacement once SetAllocatedResource has been called.
func TestSetAllocatedResources(t *testing.T) {
	initial, err := resources.NewResourceFromConf(map[string]string{"first": "1"})
	assert.NilError(t, err, "Resource creation failed")
	replacement, err := resources.NewResourceFromConf(map[string]string{"first": "2"})
	assert.NilError(t, err, "Resource creation failed")

	alloc := newAllocationAsk("ask-1", "app-1", initial)
	if alloc == nil {
		t.Fatal("NewAllocation create failed while it should not")
	}

	// before any update the getter must hand back the creation-time resource
	got := alloc.GetAllocatedResource()
	assert.Assert(t, resources.Equals(got, initial), "Allocated resource not set correctly")

	// after the setter runs the getter must reflect the replacement
	alloc.SetAllocatedResource(replacement)
	got = alloc.GetAllocatedResource()
	assert.Assert(t, resources.Equals(got, replacement), "Allocated resource not set correctly")
}

// TestNewAllocatedAllocationResult checks that newAllocatedAllocationResult returns a non-nil
// result whose ResultType is Allocated and which carries the request and node ID passed in.
//
// NOTE(review): alloc and result are each declared twice with := below, which would not compile
// as written. This looks like the removed and added lines of a diff shown together without
// markers — confirm against the real file before relying on this listing.
func TestNewAllocatedAllocationResult(t *testing.T) {
res, err := resources.NewResourceFromConf(map[string]string{"first": "1"})
assert.NilError(t, err, "Resource creation failed")
ask := newAllocationAsk("ask-1", "app-1", res)
alloc := markAllocated("node-1", ask)
result := newAllocatedAllocationResult("node-1", ask)
alloc := newAllocation("ask-1", "app-1", res)
result := newAllocatedAllocationResult("node-1", alloc)
if result == nil {
t.Fatal("NewAllocatedAllocationResult create failed while it should not")
}
// the result must be typed Allocated and echo back the request and node it was built from
assert.Equal(t, result.ResultType, Allocated, "NewAllocatedAllocationResult should have Allocated result type")
assert.Equal(t, ask, result.Request, "wrong ask")
assert.Equal(t, alloc, result.Request, "wrong allocation")
assert.Equal(t, result.NodeID, "node-1", "wrong node id")
}
Expand Down Expand Up @@ -317,14 +320,12 @@ func TestNewUnreservedAllocationResult(t *testing.T) {
// TestNewReplacedAllocationResult checks that newReplacedAllocationResult returns a non-nil
// result whose ResultType is Replaced and which carries the request and node ID passed in.
//
// NOTE(review): alloc and result are each declared twice with := below, which would not compile
// as written. This looks like the removed and added lines of a diff shown together without
// markers — confirm against the real file before relying on this listing.
func TestNewReplacedAllocationResult(t *testing.T) {
res, err := resources.NewResourceFromConf(map[string]string{"first": "1"})
assert.NilError(t, err, "Resource creation failed")
ask := newAllocationAsk("ask-1", "app-1", res)
alloc := markAllocated("node-1", ask)
result := newReplacedAllocationResult("node-1", ask)
alloc := newAllocationWithKey("ask-1", "app-1", "node-1", res)
result := newReplacedAllocationResult("node-1", alloc)
if result == nil {
// NOTE(review): the message below spells the function name "NewReplacedllocationResult" (missing "A")
t.Fatal("NewReplacedllocationResult create failed while it should not")
}
// NOTE(review): the assertion compares against Replaced while its failure message says "Allocated"
assert.Equal(t, result.ResultType, Replaced, "NewReplacedAllocationResult should have Allocated result type")
assert.Equal(t, ask, result.Request, "wrong ask")
assert.Equal(t, alloc, result.Request, "wrong allocation")
assert.Equal(t, result.NodeID, "node-1", "wrong node id")
}
Expand Down Expand Up @@ -363,6 +364,7 @@ func TestSIFromNilAlloc(t *testing.T) {
assert.Equal(t, allocSI, nilSI, "Expected nil response from nil allocation")
}

//nolint:staticcheck
func TestSIFromAlloc(t *testing.T) {
res, err := resources.NewResourceFromConf(map[string]string{"first": "1"})
assert.NilError(t, err, "Resource creation failed")
Expand All @@ -377,11 +379,10 @@ func TestSIFromAlloc(t *testing.T) {
AllowPreemptOther: false,
},
}
ask := newAllocationAsk("ask-1", "app-1", res)
ask.originator = true
ask.allowPreemptSelf = false
ask.allowPreemptOther = true
alloc := markAllocated("node-1", ask)
alloc := newAllocationWithKey("ask-1", "app-1", "node-1", res)
alloc.originator = true
alloc.allowPreemptSelf = false
alloc.allowPreemptOther = true
if alloc == nil {
t.Fatal("NewAllocation create failed while it should not")
}
Expand Down
85 changes: 75 additions & 10 deletions pkg/scheduler/objects/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ func (sa *Application) removeAsksInternal(allocKey string, detail si.EventRecord
deltaPendingResource = sa.pending
sa.pending = resources.NewResource()
for _, ask := range sa.requests {
sa.appEvents.SendRemoveAskEvent(sa.ApplicationID, ask.allocationKey, ask.allocatedResource, detail)
sa.appEvents.SendRemoveAskEvent(sa.ApplicationID, ask.allocationKey, ask.GetAllocatedResource(), detail)
}
sa.requests = make(map[string]*Allocation)
sa.sortedRequests = sortedRequests{}
Expand Down Expand Up @@ -591,7 +591,7 @@ func (sa *Application) removeAsksInternal(allocKey string, detail si.EventRecord
}
delete(sa.requests, allocKey)
sa.sortedRequests.remove(ask)
sa.appEvents.SendRemoveAskEvent(sa.ApplicationID, ask.allocationKey, ask.allocatedResource, detail)
sa.appEvents.SendRemoveAskEvent(sa.ApplicationID, ask.allocationKey, ask.GetAllocatedResource(), detail)
if priority := ask.GetPriority(); priority >= sa.askMaxPriority {
sa.updateAskMaxPriority()
}
Expand Down Expand Up @@ -665,11 +665,72 @@ func (sa *Application) AddAllocationAsk(ask *Allocation) error {
zap.Bool("placeholder", ask.IsPlaceholder()),
zap.Stringer("pendingDelta", delta))
sa.sortedRequests.insert(ask)
sa.appEvents.SendNewAskEvent(sa.ApplicationID, ask.allocationKey, ask.allocatedResource)
sa.appEvents.SendNewAskEvent(sa.ApplicationID, ask.allocationKey, ask.GetAllocatedResource())

return nil
}

// UpdateAllocationResources applies a changed resource size for an allocation that this
// application already tracks in its requests map.
//
// The difference between the resource carried by alloc and the resource of the tracked entry is
// computed. If the tracked entry is allocated, the difference is added to the application's
// allocated resources and passed on to the queue and the user resource usage. Otherwise it is
// added to the application's pending resources and passed on to the queue's pending resources.
// Finally the tracked entry is given a copy of the new resource.
//
// An error is returned when alloc is nil, when alloc carries zero resources, or when no entry with
// the same allocation key is tracked. A zero difference is a no-op and returns nil.
func (sa *Application) UpdateAllocationResources(alloc *Allocation) error {
	sa.Lock()
	defer sa.Unlock()

	// reject unusable input before any tracked state is touched
	if alloc == nil {
		return fmt.Errorf("alloc cannot be nil when updating resources for app %s", sa.ApplicationID)
	}
	if resources.IsZero(alloc.GetAllocatedResource()) {
		return fmt.Errorf("cannot update alloc with zero resources on app %s: %v", sa.ApplicationID, alloc)
	}
	tracked, found := sa.requests[alloc.GetAllocationKey()]
	if !found || tracked == nil {
		return fmt.Errorf("existing alloc not found when updating resources on app %s: %v", sa.ApplicationID, alloc)
	}

	// take copies of both sizes and work out how much the allocation changed by
	updated := alloc.GetAllocatedResource().Clone()
	previous := tracked.GetAllocatedResource().Clone()
	delta := resources.Sub(updated, previous)
	if resources.IsZero(delta) {
		return nil
	}
	delta.Prune()

	// apply the delta to whichever set of counters the tracked entry currently belongs to
	var message string
	switch {
	case tracked.IsAllocated():
		// allocated: application total, queue and user usage all move by the delta
		sa.allocatedResource = resources.Add(sa.allocatedResource, delta)
		sa.allocatedResource.Prune()
		sa.queue.IncAllocatedResource(delta)
		sa.incUserResourceUsage(delta)
		message = "updated allocated resources for application"
	default:
		// not yet allocated: only the pending counters of the application and queue move
		sa.pending = resources.Add(sa.pending, delta)
		sa.pending.Prune()
		sa.queue.incPendingResource(delta)
		message = "updated pending resources for application"
	}
	log.Log(log.SchedApplication).Info(message,
		zap.String("appID", sa.ApplicationID),
		zap.String("user", sa.user.User),
		zap.String("alloc", tracked.GetAllocationKey()),
		zap.Bool("placeholder", tracked.IsPlaceholder()),
		zap.Stringer("existingResources", previous),
		zap.Stringer("updatedResources", updated),
		zap.Stringer("delta", delta))

	// last step: record the new size on the tracked entry itself
	tracked.SetAllocatedResource(updated)
	return nil
}

// Add the ask when a node allocation is recovered.
// Safeguarded against a nil but the recovery generates the ask and should never be nil.
func (sa *Application) RecoverAllocationAsk(alloc *Allocation) {
Expand Down Expand Up @@ -1156,7 +1217,7 @@ func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator,
// release the placeholder and tell the RM
ph.SetReleased(true)
sa.notifyRMAllocationReleased([]*Allocation{ph}, si.TerminationType_TIMEOUT, "cancel placeholder: resource incompatible")
sa.appEvents.SendPlaceholderLargerEvent(ph.taskGroupName, sa.ApplicationID, ph.allocationKey, request.allocatedResource, ph.allocatedResource)
sa.appEvents.SendPlaceholderLargerEvent(ph.taskGroupName, sa.ApplicationID, ph.allocationKey, request.GetAllocatedResource(), ph.GetAllocatedResource())
continue
}
// placeholder is the same or larger continue processing and difference is handled when the placeholder
Expand Down Expand Up @@ -1530,7 +1591,7 @@ func (sa *Application) tryNode(node *Node, ask *Allocation) (*AllocationResult,

// everything OK really allocate
if node.TryAddAllocation(ask) {
if err := sa.queue.IncAllocatedResource(ask.GetAllocatedResource(), false); err != nil {
if err := sa.queue.TryIncAllocatedResource(ask.GetAllocatedResource()); err != nil {
log.Log(log.SchedApplication).DPanic("queue update failed unexpectedly",
zap.Error(err))
// revert the node update
Expand Down Expand Up @@ -1696,7 +1757,7 @@ func (sa *Application) addAllocationInternal(allocType AllocationResultType, all
sa.allocatedResource = resources.Add(sa.allocatedResource, alloc.GetAllocatedResource())
sa.maxAllocatedResource = resources.ComponentWiseMax(sa.allocatedResource, sa.maxAllocatedResource)
}
sa.appEvents.SendNewAllocationEvent(sa.ApplicationID, alloc.allocationKey, alloc.allocatedResource)
sa.appEvents.SendNewAllocationEvent(sa.ApplicationID, alloc.allocationKey, alloc.GetAllocatedResource())
sa.allocations[alloc.GetAllocationKey()] = alloc
}

Expand Down Expand Up @@ -1853,7 +1914,7 @@ func (sa *Application) removeAllocationInternal(allocationKey string, releaseTyp
}
}
delete(sa.allocations, allocationKey)
sa.appEvents.SendRemoveAllocationEvent(sa.ApplicationID, alloc.allocationKey, alloc.allocatedResource, releaseType)
sa.appEvents.SendRemoveAllocationEvent(sa.ApplicationID, alloc.allocationKey, alloc.GetAllocatedResource(), releaseType)
return alloc
}

Expand Down Expand Up @@ -1884,7 +1945,7 @@ func (sa *Application) RemoveAllAllocations() []*Allocation {
allocationsToRelease = append(allocationsToRelease, alloc)
// Aggregate the resources used by this alloc to the application's user resource tracker
sa.trackCompletedResource(alloc)
sa.appEvents.SendRemoveAllocationEvent(sa.ApplicationID, alloc.allocationKey, alloc.allocatedResource, si.TerminationType_STOPPED_BY_RM)
sa.appEvents.SendRemoveAllocationEvent(sa.ApplicationID, alloc.allocationKey, alloc.GetAllocatedResource(), si.TerminationType_STOPPED_BY_RM)
}

// if an app doesn't have any allocations and the user doesn't have other applications,
Expand Down Expand Up @@ -2151,11 +2212,15 @@ func (sa *Application) GetMaxApps() uint64 {
}

func (sa *Application) getUint64Tag(tag string) uint64 {
uintValue, err := strconv.ParseUint(sa.GetTag(tag), 10, 64)
value := sa.GetTag(tag)
if value == "" {
return 0
}
uintValue, err := strconv.ParseUint(value, 10, 64)
if err != nil {
log.Log(log.SchedApplication).Warn("application tag conversion failure",
zap.String("tag", tag),
zap.String("json string", sa.GetTag(tag)),
zap.String("json string", value),
zap.Error(err))
return 0
}
Expand Down
Loading
Loading