Skip to content

Commit

Permalink
koord-scheduler: inplace update quota min and max (#1672)
Browse files Browse the repository at this point in the history
Signed-off-by: chuanyun.lcy <[email protected]>
Co-authored-by: chuanyun.lcy <[email protected]>
  • Loading branch information
shaloulcy and chuanyun.lcy authored Sep 21, 2023
1 parent 773eadd commit e70b974
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 12 deletions.
124 changes: 120 additions & 4 deletions pkg/scheduler/plugins/elasticquota/core/group_quota_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,16 +389,19 @@ func (gqm *GroupQuotaManager) UpdateQuota(quota *v1alpha1.ElasticQuota, isDelete
newQuotaInfo := NewQuotaInfoFromQuota(quota)
// update the local quotaInfo's crd
if localQuotaInfo, exist := gqm.quotaInfoMap[quotaName]; exist {
// if the quotaMeta doesn't change, only runtime/used/request change causes update,
// if the quotaMeta doesn't change, only runtime/used/request/min/max/sharedWeight change causes update,
// no need to call updateQuotaGroupConfigNoLock.
if !localQuotaInfo.isQuotaMetaChange(newQuotaInfo) {
gqm.updateQuotaInternalNoLock(newQuotaInfo, localQuotaInfo)
return nil
}
localQuotaInfo.updateQuotaInfoFromRemote(newQuotaInfo)
} else {
gqm.quotaInfoMap[quotaName] = newQuotaInfo
}
}

klog.Infof("reset quota tree %v, for quota %v/%v updated", gqm.treeID, quota.Namespace, quota.Name)
gqm.updateQuotaGroupConfigNoLock()

return nil
Expand All @@ -415,15 +418,15 @@ func (gqm *GroupQuotaManager) updateQuotaGroupConfigNoLock() {
func (gqm *GroupQuotaManager) buildSubParGroupTopoNoLock() {
// rebuild QuotaTopoNodeMap
gqm.quotaTopoNodeMap = make(map[string]*QuotaTopoNode)
rootNode := NewQuotaTopoNode(NewQuotaInfo(false, true, extension.RootQuotaName, extension.RootQuotaName))
rootNode := NewQuotaTopoNode(extension.RootQuotaName, NewQuotaInfo(false, true, extension.RootQuotaName, extension.RootQuotaName))
gqm.quotaTopoNodeMap[extension.RootQuotaName] = rootNode

// add node according to the quotaInfoMap
for quotaName, quotaInfo := range gqm.quotaInfoMap {
if quotaName == extension.SystemQuotaName || quotaName == extension.DefaultQuotaName {
continue
}
gqm.quotaTopoNodeMap[quotaName] = NewQuotaTopoNode(quotaInfo)
gqm.quotaTopoNodeMap[quotaName] = NewQuotaTopoNode(quotaName, quotaInfo)
}

// build tree according to the parGroupName
Expand All @@ -434,7 +437,7 @@ func (gqm *GroupQuotaManager) buildSubParGroupTopoNoLock() {
parQuotaTopoNode := gqm.quotaTopoNodeMap[topoNode.quotaInfo.ParentName]
// incase load child before its parent
if parQuotaTopoNode == nil {
parQuotaTopoNode = NewQuotaTopoNode(&QuotaInfo{
parQuotaTopoNode = NewQuotaTopoNode(topoNode.quotaInfo.ParentName, &QuotaInfo{
Name: topoNode.quotaInfo.ParentName,
})
}
Expand Down Expand Up @@ -888,6 +891,9 @@ func (gqm *GroupQuotaManager) recursiveUpdateGroupTreeWithDeltaAllocated(deltaAl
curQuotaInfo := curToAllParInfos[i]
oldGuaranteed := curQuotaInfo.CalculateInfo.Guaranteed
curQuotaInfo.addAllocatedQuotaNoLock(deltaAllocated)
if curQuotaInfo.Name == extension.RootQuotaName {
return
}

// update the guarantee.
guaranteed := curQuotaInfo.CalculateInfo.Allocated.DeepCopy()
Expand Down Expand Up @@ -915,3 +921,113 @@ func (gqm *GroupQuotaManager) recursiveUpdateGroupTreeWithDeltaAllocated(deltaAl
deltaAllocated = quotav1.Subtract(guaranteed, oldGuaranteed)
}
}

func (gqm *GroupQuotaManager) updateQuotaInternalNoLock(newQuotaInfo, oldQuotaInfo *QuotaInfo) {
// max changed
if !quotav1.Equals(newQuotaInfo.CalculateInfo.Max, oldQuotaInfo.CalculateInfo.Max) {
gqm.doUpdateOneGroupMaxQuotaNoLock(newQuotaInfo.Name, newQuotaInfo.CalculateInfo.Max)
}

// min changed
if !quotav1.Equals(newQuotaInfo.CalculateInfo.Min, oldQuotaInfo.CalculateInfo.Min) {
gqm.doUpdateOneGroupMinQuotaNoLock(newQuotaInfo.Name, newQuotaInfo.CalculateInfo.Min)
}

// sharedweight changed
if !quotav1.Equals(newQuotaInfo.CalculateInfo.SharedWeight, oldQuotaInfo.CalculateInfo.SharedWeight) {
gqm.doUpdateOneGroupSharedWeightNoLock(newQuotaInfo.Name, newQuotaInfo.CalculateInfo.SharedWeight)
}

}

func (gqm *GroupQuotaManager) doUpdateOneGroupMaxQuotaNoLock(quotaName string, newMax v1.ResourceList) {
curToAllParInfos := gqm.getCurToAllParentGroupQuotaInfoNoLock(quotaName)
quotaInfoLen := len(curToAllParInfos)
if quotaInfoLen <= 0 {
return
}

defer gqm.scopedLockForQuotaInfo(curToAllParInfos)()

curQuotaInfo := curToAllParInfos[0]
oldSubLimitReq := curQuotaInfo.getLimitRequestNoLock()
curQuotaInfo.setMaxNoLock(newMax)

if quotaInfoLen > 1 {
parentRuntimeCalculator := gqm.getRuntimeQuotaCalculatorByNameNoLock(curQuotaInfo.ParentName)
if parentRuntimeCalculator == nil {
klog.Errorf("runtimeQuotaCalculator not exist! quotaName:%v parentName:%v", curQuotaInfo.Name, curQuotaInfo.ParentName)
return
}
parentRuntimeCalculator.updateOneGroupMaxQuota(curQuotaInfo)

newSubLimitReq := curQuotaInfo.getLimitRequestNoLock()
deltaRequest := quotav1.Subtract(newSubLimitReq, oldSubLimitReq)
gqm.recursiveUpdateGroupTreeWithDeltaRequest(deltaRequest, curToAllParInfos[1:])
}
}

func (gqm *GroupQuotaManager) doUpdateOneGroupMinQuotaNoLock(quotaName string, newMin v1.ResourceList) {
curToAllParInfos := gqm.getCurToAllParentGroupQuotaInfoNoLock(quotaName)
quotaInfoLen := len(curToAllParInfos)
if quotaInfoLen <= 0 {
return
}

defer gqm.scopedLockForQuotaInfo(curToAllParInfos)()

// Update quota info
curQuotaInfo := curToAllParInfos[0]
curQuotaInfo.setMinNoLock(newMin)
curQuotaInfo.setAutoScaleMinQuotaNoLock(newMin)
gqm.scaleMinQuotaManager.update(curQuotaInfo.ParentName, quotaName, newMin, gqm.scaleMinQuotaEnabled)

// Update request. If the quota not allow to lent resource, the new min will effect the request.
oldSubLimitReq := curQuotaInfo.getLimitRequestNoLock()
realRequest := curQuotaInfo.CalculateInfo.ChildRequest.DeepCopy()
if !curQuotaInfo.AllowLentResource {
realRequest = quotav1.Max(realRequest, curQuotaInfo.CalculateInfo.Min)
}
curQuotaInfo.CalculateInfo.Request = realRequest

if quotaInfoLen > 1 {
// update parent runtime calculator for min changed
parentRuntimeCalculator := gqm.getRuntimeQuotaCalculatorByNameNoLock(curQuotaInfo.ParentName)
if parentRuntimeCalculator == nil {
klog.Errorf("runtimeQuotaCalculator not exist! quotaName:%v parentName:%v", curQuotaInfo.Name, curQuotaInfo.ParentName)
return
}
parentRuntimeCalculator.updateOneGroupMinQuota(curQuotaInfo)

newSubLimitReq := curQuotaInfo.getLimitRequestNoLock()
deltaRequest := quotav1.Subtract(newSubLimitReq, oldSubLimitReq)
gqm.recursiveUpdateGroupTreeWithDeltaRequest(deltaRequest, curToAllParInfos[1:])
}

// update the guarantee.
if utilfeature.DefaultFeatureGate.Enabled(features.ElasticQuotaGuaranteeUsage) {
oldGuaranteed := curQuotaInfo.CalculateInfo.Guaranteed
newGuaranteed := quotav1.Max(curQuotaInfo.CalculateInfo.Allocated, curQuotaInfo.CalculateInfo.Min)
curQuotaInfo.CalculateInfo.Guaranteed = newGuaranteed

if quotaInfoLen > 1 {
parentRuntimeCalculator := gqm.getRuntimeQuotaCalculatorByNameNoLock(curQuotaInfo.ParentName)
if parentRuntimeCalculator == nil {
klog.Errorf("runtimeQuotaCalculator not exist! quotaName:%v parentName:%v", curQuotaInfo.Name, curQuotaInfo.ParentName)
return
}
if parentRuntimeCalculator.needUpdateOneGroupGuaranteed(curQuotaInfo) {
parentRuntimeCalculator.updateOneGroupGuaranteed(curQuotaInfo)
}
deltaAllocated := quotav1.Subtract(newGuaranteed, oldGuaranteed)
gqm.recursiveUpdateGroupTreeWithDeltaAllocated(deltaAllocated, curToAllParInfos[1:])
}
}
}

func (gqm *GroupQuotaManager) doUpdateOneGroupSharedWeightNoLock(quotaName string, newSharedWeight v1.ResourceList) {
quotaInfo := gqm.getQuotaInfoByNameNoLock(quotaName)
quotaInfo.setSharedWeightNoLock(newSharedWeight)

gqm.updateOneGroupSharedWeightNoLock(quotaInfo)
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func TestGroupQuotaManager_UpdateQuotaInternal(t *testing.T) {

func TestGroupQuotaManager_UpdateQuota(t *testing.T) {
gqm := NewGroupQuotaManagerForTest()
quota := CreateQuota("test1", "test-parent", 64, 100*GigaByte, 50, 80*GigaByte, true, false)
quota := CreateQuota("test1", extension.RootQuotaName, 64, 100*GigaByte, 50, 80*GigaByte, true, false)
gqm.UpdateQuota(quota, false)
assert.Equal(t, len(gqm.quotaInfoMap), 4)
}
Expand Down Expand Up @@ -1755,3 +1755,60 @@ func TestGroupQuotaManager_OnPodUpdateUsedForGuarantee(t *testing.T) {
assert.Equal(t, createResourceList(30, 30), gqm.GetQuotaInfoByName("1").GetAllocated())
assert.Equal(t, createResourceList(30, 30), gqm.GetQuotaInfoByName("1").GetGuaranteed())
}

func TestUpdateQuotaInternalNoLock(t *testing.T) {
defer utilfeature.SetFeatureGateDuringTest(t, k8sfeature.DefaultFeatureGate, features.ElasticQuotaGuaranteeUsage, true)()
gqm := NewGroupQuotaManagerForTest()
gqm.scaleMinQuotaEnabled = true
gqm.UpdateClusterTotalResource(createResourceList(50, 50))

// quota1 Max[40, 40] Min[20,20] request[0,0]
// |-- quota2 Max[30, 20] Min[10,10] request[0,0]
// |-- quota3 Max[20, 10] Min[5,5] request[0,0]
qi1 := createQuota("1", extension.RootQuotaName, 40, 40, 20, 20)
qi2 := createQuota("2", "1", 30, 10, 10, 10)
qi3 := createQuota("3", "1", 20, 10, 5, 5)
gqm.UpdateQuota(qi1, false)
gqm.UpdateQuota(qi2, false)
gqm.UpdateQuota(qi3, false)

// add pod1
pod1 := schetesting.MakePod().Name("1").Obj()
pod1.Spec.Containers = []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: createResourceList(5, 5),
},
},
}
pod1.Spec.NodeName = "node1"
gqm.OnPodAdd("2", pod1)
assert.Equal(t, createResourceList(5, 5), gqm.GetQuotaInfoByName("2").GetChildRequest())
assert.Equal(t, createResourceList(10, 10), gqm.GetQuotaInfoByName("2").GetRequest())
assert.Equal(t, createResourceList(5, 5), gqm.GetQuotaInfoByName("2").GetUsed())
assert.Equal(t, createResourceList(5, 5), gqm.GetQuotaInfoByName("2").GetAllocated())
assert.Equal(t, createResourceList(10, 10), gqm.GetQuotaInfoByName("2").GetGuaranteed())

// check parent info
// the parent should guarantee children min
assert.Equal(t, createResourceList(15, 15), gqm.GetQuotaInfoByName("1").GetAllocated())
assert.Equal(t, createResourceList(20, 20), gqm.GetQuotaInfoByName("1").GetGuaranteed())

// update quota2 min
// quota1 Max[40, 40] Min[20,20] request[0,0]
// |-- quota2 Max[30, 20] Min[20,20] request[0,0]
// |-- quota3 Max[20, 10] Min[5,5] request[0,0]
qi2.Spec.Min = createResourceList(20, 20)
gqm.UpdateQuota(qi2, false)

assert.Equal(t, createResourceList(5, 5), gqm.GetQuotaInfoByName("2").GetChildRequest())
assert.Equal(t, createResourceList(20, 20), gqm.GetQuotaInfoByName("2").GetRequest())
assert.Equal(t, createResourceList(5, 5), gqm.GetQuotaInfoByName("2").GetUsed())
assert.Equal(t, createResourceList(5, 5), gqm.GetQuotaInfoByName("2").GetAllocated())
assert.Equal(t, createResourceList(20, 20), gqm.GetQuotaInfoByName("2").GetGuaranteed())

// check parent info
// the parent should guarantee children min
assert.Equal(t, createResourceList(25, 25), gqm.GetQuotaInfoByName("1").GetAllocated())
assert.Equal(t, createResourceList(25, 25), gqm.GetQuotaInfoByName("1").GetGuaranteed())
}
17 changes: 11 additions & 6 deletions pkg/scheduler/plugins/elasticquota/core/quota_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,14 @@ func (qi *QuotaInfo) getLimitRequestNoLock() v1.ResourceList {
return limitRequest
}

func (qi *QuotaInfo) setMaxNoLock(max v1.ResourceList) {
qi.CalculateInfo.Max = max.DeepCopy()
}

func (qi *QuotaInfo) setMinNoLock(min v1.ResourceList) {
qi.CalculateInfo.Min = min.DeepCopy()
}

func (qi *QuotaInfo) addRequestNonNegativeNoLock(delta v1.ResourceList) {
qi.CalculateInfo.Request = quotav1.Add(qi.CalculateInfo.Request, delta)
for _, resName := range quotav1.IsNegative(qi.CalculateInfo.Request) {
Expand Down Expand Up @@ -325,10 +333,7 @@ func (qi *QuotaInfo) isQuotaMetaChange(quotaInfo *QuotaInfo) bool {
qi.lock.Lock()
defer qi.lock.Unlock()

if !quotav1.Equals(qi.CalculateInfo.Max, quotaInfo.CalculateInfo.Max) ||
!quotav1.Equals(qi.CalculateInfo.Min, quotaInfo.CalculateInfo.Min) ||
!quotav1.Equals(qi.CalculateInfo.SharedWeight, quotaInfo.CalculateInfo.SharedWeight) ||
qi.AllowLentResource != quotaInfo.AllowLentResource ||
if qi.AllowLentResource != quotaInfo.AllowLentResource ||
qi.IsParent != quotaInfo.IsParent ||
qi.ParentName != quotaInfo.ParentName {
return true
Expand Down Expand Up @@ -439,9 +444,9 @@ type QuotaTopoNode struct {
childGroupQuotaInfos map[string]*QuotaTopoNode
}

func NewQuotaTopoNode(quotaInfo *QuotaInfo) *QuotaTopoNode {
func NewQuotaTopoNode(name string, quotaInfo *QuotaInfo) *QuotaTopoNode {
return &QuotaTopoNode{
name: quotaInfo.Name,
name: name,
quotaInfo: quotaInfo, // not deepCopy
childGroupQuotaInfos: make(map[string]*QuotaTopoNode),
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/elasticquota/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1097,8 +1097,8 @@ func TestPlugin_Recover(t *testing.T) {
suit.Handle.ClientSet().CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
}
time.Sleep(100 * time.Millisecond)
suit.AddQuota("test1", "test-parent", 100, 1000, 0, 0, 0, 0, false, "")
suit.AddQuota("test-parent", extension.RootQuotaName, 100, 1000, 0, 0, 0, 0, true, "")
suit.AddQuota("test1", "test-parent", 100, 1000, 0, 0, 0, 0, false, "")
time.Sleep(100 * time.Millisecond)
pods := []*corev1.Pod{
defaultCreatePodWithQuotaName("1", "test1", 10, 10, 10),
Expand Down

0 comments on commit e70b974

Please sign in to comment.