Skip to content

Commit

Permalink
[YUNIKORN-2204] Use ask unique id for allocation (apache#740)
Browse files Browse the repository at this point in the history
Do not generate a new UUID for an allocation when it gets added. Re-use
the ask key which is the same as the K8s pod UID to improve on the
troubleshooting experience.
The new allocationID will be the ask key followed by the current repeat
counter, separated by a dash '-'.

The UUID is renamed to allocationID on the allocation. Scheduler
interface changes will follow. Replaced GetUUID with GetAllocationID in
already modified lines

Closes: apache#740

Signed-off-by: Wilfred Spiegelenburg <[email protected]>
  • Loading branch information
manirajv06 authored and wilfred-s committed Nov 30, 2023
1 parent 84aac67 commit b516ca1
Show file tree
Hide file tree
Showing 17 changed files with 235 additions and 208 deletions.
4 changes: 2 additions & 2 deletions pkg/scheduler/health_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func TestGetSchedulerHealthStatusContext(t *testing.T) {

// add orphan allocation to a node
node := schedulerContext.partitions[partName].nodes.GetNode("node")
alloc := objects.NewAllocation(allocID, "node", newAllocationAsk("key", "appID", resources.NewResource()))
alloc := objects.NewAllocation("node", newAllocationAsk("key", "appID", resources.NewResource()))
node.AddAllocation(alloc)
healthInfo = GetSchedulerHealthStatus(schedulerMetrics, schedulerContext)
assert.Assert(t, !healthInfo.Healthy, "Scheduler should not be healthy")
Expand All @@ -216,7 +216,7 @@ func TestGetSchedulerHealthStatusContext(t *testing.T) {
assert.Assert(t, healthInfo.HealthChecks[9].Succeeded, "The orphan allocation check on the node should be successful")

// remove the allocation from the node, so we will have an orphan allocation assigned to the app
node.RemoveAllocation(allocID)
node.RemoveAllocation("key-0")
healthInfo = GetSchedulerHealthStatus(schedulerMetrics, schedulerContext)
assert.Assert(t, healthInfo.HealthChecks[9].Succeeded, "The orphan allocation check on the node should be successful")
assert.Assert(t, !healthInfo.HealthChecks[10].Succeeded, "The orphan allocation check on the app should not be successful")
Expand Down
37 changes: 26 additions & 11 deletions pkg/scheduler/objects/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type Allocation struct {
taskGroupName string // task group this allocation belongs to
placeholder bool // is this a placeholder allocation
nodeID string
uuid string
allocationID string
priority int32
tags map[string]string
allocatedResource *resources.Resource
Expand All @@ -77,7 +77,7 @@ type Allocation struct {
sync.RWMutex
}

func NewAllocation(uuid, nodeID string, ask *AllocationAsk) *Allocation {
func NewAllocation(nodeID string, ask *AllocationAsk) *Allocation {
var createTime time.Time
if ask.GetTag(siCommon.CreationTime) == "" {
createTime = time.Now()
Expand All @@ -92,7 +92,7 @@ func NewAllocation(uuid, nodeID string, ask *AllocationAsk) *Allocation {
bindTime: time.Now(),
nodeID: nodeID,
partitionName: common.GetPartitionNameWithoutClusterID(ask.GetPartitionName()),
uuid: uuid,
allocationID: ask.allocationKey + "-" + strconv.Itoa(ask.completedPendingAsk()),
tags: ask.GetTagsClone(),
priority: ask.GetPriority(),
allocatedResource: ask.GetAllocatedResource().Clone(),
Expand All @@ -103,14 +103,16 @@ func NewAllocation(uuid, nodeID string, ask *AllocationAsk) *Allocation {
}

func newReservedAllocation(nodeID string, ask *AllocationAsk) *Allocation {
alloc := NewAllocation("", nodeID, ask)
alloc := NewAllocation(nodeID, ask)
alloc.allocationID = ""
alloc.SetBindTime(time.Time{})
alloc.SetResult(Reserved)
return alloc
}

func newUnreservedAllocation(nodeID string, ask *AllocationAsk) *Allocation {
alloc := NewAllocation("", nodeID, ask)
alloc := NewAllocation(nodeID, ask)
alloc.allocationID = ""
alloc.SetBindTime(time.Time{})
alloc.SetResult(Unreserved)
return alloc
Expand Down Expand Up @@ -152,7 +154,9 @@ func NewAllocationFromSI(alloc *si.Allocation) *Allocation {
createTime: time.Unix(creationTime, 0),
allocLog: make(map[string]*AllocationLogEntry),
}
return NewAllocation(alloc.UUID, alloc.NodeID, ask)
newAlloc := NewAllocation(alloc.NodeID, ask)
newAlloc.allocationID = alloc.UUID
return newAlloc
}

// Convert the Allocation into a SI object. This is a limited set of values that gets copied into the SI.
Expand Down Expand Up @@ -180,11 +184,11 @@ func (a *Allocation) String() string {
}
a.RLock()
defer a.RUnlock()
uuid := a.uuid
allocationID := a.allocationID
if a.result == Reserved || a.result == Unreserved {
uuid = "N/A"
allocationID = "N/A"
}
return fmt.Sprintf("applicationID=%s, uuid=%s, allocationKey=%s, Node=%s, result=%s", a.applicationID, uuid, a.allocationKey, a.nodeID, a.result.String())
return fmt.Sprintf("applicationID=%s, uuid=%s, allocationKey=%s, Node=%s, result=%s", a.applicationID, allocationID, a.allocationKey, a.nodeID, a.result.String())
}

// GetAsk returns the ask associated with this allocation
Expand Down Expand Up @@ -290,9 +294,20 @@ func (a *Allocation) GetInstanceType() string {
return a.instType
}

// GetUUID returns the uuid for this allocation
// GetAllocationID returns the allocationID for this allocation.
func (a *Allocation) GetAllocationID() string {
return a.allocationID
}

// GetUUID returns the allocationID for this allocation
func (a *Allocation) GetUUID() string {
return a.uuid
return a.allocationID
}

// SetAllocationID sets the allocationID for this allocation.
// Only for tests.
func (a *Allocation) SetAllocationID(allocationID string) {
a.allocationID = allocationID
}

// GetPriority returns the priority of this allocation
Expand Down
7 changes: 7 additions & 0 deletions pkg/scheduler/objects/allocation_ask.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,3 +291,10 @@ func (aa *AllocationAsk) LessThan(other *AllocationAsk) bool {

return aa.priority < other.priority
}

// completedPendingAsk returns how many pending asks have been completed or processed so far.
// The value is maxAllocations minus pendingAskRepeat, read while holding the ask's read lock.
func (aa *AllocationAsk) completedPendingAsk() int {
aa.RLock()
defer aa.RUnlock()
return int(aa.maxAllocations - aa.pendingAskRepeat)
}
11 changes: 6 additions & 5 deletions pkg/scheduler/objects/allocation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ func TestNewAlloc(t *testing.T) {
res, err := resources.NewResourceFromConf(map[string]string{"first": "1"})
assert.NilError(t, err, "Resource creation failed")
ask := newAllocationAsk("ask-1", "app-1", res)
alloc := NewAllocation("test-uuid", "node-1", ask)
alloc := NewAllocation("node-1", ask)
if alloc == nil {
t.Fatal("NewAllocation create failed while it should not")
}
assert.Equal(t, alloc.GetAllocationID(), "ask-1-0")
assert.Equal(t, alloc.GetResult(), Allocated, "New alloc should default to result Allocated")
assert.Assert(t, resources.Equals(alloc.GetAllocatedResource(), res), "Allocated resource not set correctly")
assert.Assert(t, !alloc.IsPlaceholder(), "ask should not have been a placeholder")
Expand All @@ -62,7 +63,7 @@ func TestNewAlloc(t *testing.T) {
alloc.SetInstanceType(instType1)
assert.Equal(t, alloc.GetInstanceType(), instType1, "Instance type not set as expected")
allocStr := alloc.String()
expected := "applicationID=app-1, uuid=test-uuid, allocationKey=ask-1, Node=node-1, result=Allocated"
expected := "applicationID=app-1, uuid=ask-1-0, allocationKey=ask-1, Node=node-1, result=Allocated"
assert.Equal(t, allocStr, expected, "Strings should have been equal")
assert.Assert(t, !alloc.IsPlaceholderUsed(), fmt.Sprintf("Alloc should not be placeholder replacement by default: got %t, expected %t", alloc.IsPlaceholderUsed(), false))
created := alloc.GetCreateTime()
Expand All @@ -77,7 +78,7 @@ func TestNewAlloc(t *testing.T) {
tags[siCommon.CreationTime] = strconv.FormatInt(past, 10)
ask.tags = CloneAllocationTags(tags)
ask.createTime = time.Unix(past, 0)
alloc = NewAllocation("test-uuid", "node-1", ask)
alloc = NewAllocation("node-1", ask)
assert.Equal(t, alloc.GetCreateTime(), ask.GetCreateTime(), "createTime was not copied from the ask")
}

Expand Down Expand Up @@ -131,13 +132,13 @@ func TestSIFromAlloc(t *testing.T) {
assert.NilError(t, err, "Resource creation failed")
expectedSI := &si.Allocation{
AllocationKey: "ask-1",
UUID: "test-uuid",
UUID: "ask-1-0",
NodeID: "node-1",
ApplicationID: "app-1",
ResourcePerAlloc: res.ToProto(),
}
ask := newAllocationAsk("ask-1", "app-1", res)
alloc := NewAllocation("test-uuid", "node-1", ask)
alloc := NewAllocation("node-1", ask)
if alloc == nil {
t.Fatal("NewAllocation create failed while it should not")
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/scheduler/objects/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,6 @@ func (sa *Application) AddAllocationAsk(ask *AllocationAsk) error {
zap.String("ask", ask.GetAllocationKey()),
zap.Bool("placeholder", ask.IsPlaceholder()),
zap.Stringer("pendingDelta", delta))

sa.sortedRequests.insert(ask)
sa.appEvents.sendNewAskEvent(ask)

Expand Down Expand Up @@ -1133,7 +1132,7 @@ func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator,
// got the node run same checks as for reservation (all but fits)
// resource usage should not change anyway between placeholder and real one at this point
if node != nil && node.preReserveConditions(request) {
alloc := NewAllocation(common.GetNewUUID(), node.NodeID, request)
alloc := NewAllocation(node.NodeID, request)
// double link to make it easier to find
// alloc (the real one) releases points to the placeholder in the releases list
alloc.SetRelease(ph)
Expand Down Expand Up @@ -1175,7 +1174,7 @@ func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator,
return true
}
// allocation worked: on a non placeholder node update result and return
alloc := NewAllocation(common.GetNewUUID(), node.NodeID, reqFit)
alloc := NewAllocation(node.NodeID, reqFit)
// double link to make it easier to find
// alloc (the real one) releases points to the placeholder in the releases list
alloc.SetRelease(phFit)
Expand Down Expand Up @@ -1471,7 +1470,7 @@ func (sa *Application) tryNode(node *Node, ask *AllocationAsk) *Allocation {
}

// everything OK really allocate
alloc := NewAllocation(common.GetNewUUID(), node.NodeID, ask)
alloc := NewAllocation(node.NodeID, ask)
if node.AddAllocation(alloc) {
if err := sa.queue.IncAllocatedResource(alloc.GetAllocatedResource(), false); err != nil {
log.Log(log.SchedApplication).Warn("queue update failed unexpectedly",
Expand Down
12 changes: 6 additions & 6 deletions pkg/scheduler/objects/application_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestSendNewAllocationEvent(t *testing.T) {
appEvents.sendNewAllocationEvent(&Allocation{
applicationID: appID0,
allocationKey: aKey,
uuid: aUUID,
allocationID: aUUID,
})
assert.Equal(t, 1, len(mock.events), "event was not generated")
assert.Equal(t, si.EventRecord_APP, mock.events[0].Type, "event type is not expected")
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestSendRemoveAllocationEvent(t *testing.T) {
name: "remove allocation cause of node removal",
eventSystemMock: newEventSystemMock(),
terminationType: si.TerminationType_UNKNOWN_TERMINATION_TYPE,
allocation: &Allocation{applicationID: appID0, allocationKey: aKey, uuid: aUUID},
allocation: &Allocation{applicationID: appID0, allocationKey: aKey, allocationID: aUUID},
expectedEventCnt: 1,
expectedType: si.EventRecord_APP,
expectedChangeType: si.EventRecord_REMOVE,
Expand All @@ -200,7 +200,7 @@ func TestSendRemoveAllocationEvent(t *testing.T) {
name: "remove allocation cause of resource manager cancel",
eventSystemMock: newEventSystemMock(),
terminationType: si.TerminationType_STOPPED_BY_RM,
allocation: &Allocation{applicationID: appID0, allocationKey: aKey, uuid: aUUID},
allocation: &Allocation{applicationID: appID0, allocationKey: aKey, allocationID: aUUID},
expectedEventCnt: 1,
expectedType: si.EventRecord_APP,
expectedChangeType: si.EventRecord_REMOVE,
Expand All @@ -212,7 +212,7 @@ func TestSendRemoveAllocationEvent(t *testing.T) {
name: "remove allocation cause of timeout",
eventSystemMock: newEventSystemMock(),
terminationType: si.TerminationType_TIMEOUT,
allocation: &Allocation{applicationID: appID0, allocationKey: aKey, uuid: aUUID},
allocation: &Allocation{applicationID: appID0, allocationKey: aKey, allocationID: aUUID},
expectedEventCnt: 1,
expectedType: si.EventRecord_APP,
expectedChangeType: si.EventRecord_REMOVE,
Expand All @@ -224,7 +224,7 @@ func TestSendRemoveAllocationEvent(t *testing.T) {
name: "remove allocation cause of preemption",
eventSystemMock: newEventSystemMock(),
terminationType: si.TerminationType_PREEMPTED_BY_SCHEDULER,
allocation: &Allocation{applicationID: appID0, allocationKey: aKey, uuid: aUUID},
allocation: &Allocation{applicationID: appID0, allocationKey: aKey, allocationID: aUUID},
expectedEventCnt: 1,
expectedType: si.EventRecord_APP,
expectedChangeType: si.EventRecord_REMOVE,
Expand All @@ -236,7 +236,7 @@ func TestSendRemoveAllocationEvent(t *testing.T) {
name: "remove allocation cause of replacement",
eventSystemMock: newEventSystemMock(),
terminationType: si.TerminationType_PLACEHOLDER_REPLACED,
allocation: &Allocation{applicationID: appID0, allocationKey: aKey, uuid: aUUID},
allocation: &Allocation{applicationID: appID0, allocationKey: aKey, allocationID: aUUID},
expectedEventCnt: 1,
expectedType: si.EventRecord_APP,
expectedChangeType: si.EventRecord_REMOVE,
Expand Down
Loading

0 comments on commit b516ca1

Please sign in to comment.