Skip to content

Commit

Permalink
scheduler: add quota inplace (#2313)
Browse files Browse the repository at this point in the history
Signed-off-by: chuanyun.lcy <[email protected]>
Co-authored-by: chuanyun.lcy <[email protected]>
  • Loading branch information
shaloulcy and chuanyun.lcy authored Jan 9, 2025
1 parent 71c9682 commit 8d0e6ce
Show file tree
Hide file tree
Showing 7 changed files with 336 additions and 11 deletions.
32 changes: 32 additions & 0 deletions pkg/scheduler/frameworkext/helper/forcesync_eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,35 @@ func ForceSyncFromInformer(stopCh <-chan struct{}, cacheSyncer CacheSyncer, info
syncEventHandler.syncDone()
return registration, err
}

func ForceSyncFromInformerWithReplace(stopCh <-chan struct{}, cacheSyncer CacheSyncer, informer cache.SharedInformer, handler cache.ResourceEventHandler, replaceHandler func([]interface{}) error, options ...Option) (cache.ResourceEventHandlerRegistration, error) {
syncEventHandler := newForceSyncEventHandler(handler, options...)
registration, err := informer.AddEventHandlerWithResyncPeriod(syncEventHandler, syncEventHandler.resyncPeriod)
if err != nil {
return nil, err
}

if cacheSyncer != nil {
cacheSyncer.Start(stopCh)
cacheSyncer.WaitForCacheSync(stopCh)
}

allObjects := informer.GetStore().List()
// record object uid.
for _, obj := range allObjects {
if metaAccessor, ok := obj.(metav1.ObjectMetaAccessor); ok {
objectMeta := metaAccessor.GetObjectMeta()
resourceVersion, err := strconv.ParseInt(objectMeta.GetResourceVersion(), 10, 64)
if err == nil {
syncEventHandler.objects[objectMeta.GetUID()] = resourceVersion
}
}
}
// replace objects
err = replaceHandler(allObjects)
if err != nil {
return nil, err
}
syncEventHandler.syncDone()
return registration, err
}
49 changes: 49 additions & 0 deletions pkg/scheduler/frameworkext/helper/forcesync_eventhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,52 @@ func TestSyncedEventHandler(t *testing.T) {
}, wait.NeverStop)
assert.NoError(t, err)
}

func TestSyncedEventHandlerWithReplace(t *testing.T) {
var objects []runtime.Object
for i := 0; i < 10; i++ {
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
UID: uuid.NewUUID(),
Name: fmt.Sprintf("node-%d", i),
ResourceVersion: fmt.Sprintf("%d", i+1),
},
}
objects = append(objects, node)
}
fakeClientSet := kubefake.NewSimpleClientset(objects...)
sharedInformerFactory := informers.NewSharedInformerFactory(fakeClientSet, 0)
nodeInformer := sharedInformerFactory.Core().V1().Nodes()
addTimes := map[string]int{}
var wg sync.WaitGroup
wg.Add(10)
ForceSyncFromInformerWithReplace(context.TODO().Done(), sharedInformerFactory, nodeInformer.Informer(), cache.ResourceEventHandlerFuncs{}, func(objs []interface{}) error {
for _, obj := range objs {
node := obj.(*corev1.Node)
addTimes[node.Name]++
wg.Done()
}
return nil
})
wg.Wait()
for _, v := range addTimes {
if v > 1 {
t.Errorf("unexpected add times, want 1 but got %d", v)
break
}
}
node, err := nodeInformer.Lister().Get("node-0")
assert.NoError(t, err)
assert.NotNil(t, node)
node = node.DeepCopy()
node.ResourceVersion = "100"
_, err = fakeClientSet.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{})
assert.NoError(t, err)
err = wait.PollUntil(1*time.Second, func() (done bool, err error) {
node, err := nodeInformer.Lister().Get("node-0")
assert.NoError(t, err)
assert.NotNil(t, node)
return node.ResourceVersion == "100", nil
}, wait.NeverStop)
assert.NoError(t, err)
}
93 changes: 85 additions & 8 deletions pkg/scheduler/plugins/elasticquota/core/group_quota_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ func NewGroupQuotaManager(treeID string, systemGroupMax, defaultGroupMax v1.Reso
quotaManager.quotaInfoMap[extension.DefaultQuotaName].setMaxQuotaNoLock(defaultGroupMax)
}

quotaManager.quotaInfoMap[extension.RootQuotaName] = NewQuotaInfo(true, false, extension.RootQuotaName, "")
rootQuotaInfo := NewQuotaInfo(true, false, extension.RootQuotaName, "")
quotaManager.quotaInfoMap[extension.RootQuotaName] = rootQuotaInfo
quotaManager.quotaTopoNodeMap[extension.RootQuotaName] = NewQuotaTopoNode(extension.RootQuotaName, rootQuotaInfo)
quotaManager.runtimeQuotaCalculatorMap[extension.RootQuotaName] = NewRuntimeQuotaCalculator(extension.RootQuotaName)
quotaManager.setScaleMinQuotaEnabled(true)
return quotaManager
Expand Down Expand Up @@ -401,8 +403,8 @@ func (gqm *GroupQuotaManager) UpdateQuota(quota *v1alpha1.ElasticQuota, isDelete
}
localQuotaInfo.updateQuotaInfoFromRemote(newQuotaInfo)
} else {
// TODO: inplace add
gqm.quotaInfoMap[quotaName] = newQuotaInfo
gqm.updateQuotaInternalNoLock(newQuotaInfo, nil)
return nil
}
}

Expand All @@ -412,10 +414,25 @@ func (gqm *GroupQuotaManager) UpdateQuota(quota *v1alpha1.ElasticQuota, isDelete
return nil
}

func (gqm *GroupQuotaManager) UpdateQuotaInfo(quota *v1alpha1.ElasticQuota) {
gqm.hierarchyUpdateLock.Lock()
defer gqm.hierarchyUpdateLock.Unlock()

newQuotaInfo := NewQuotaInfoFromQuota(quota)
gqm.quotaInfoMap[quota.Name] = newQuotaInfo
}

func (gqm *GroupQuotaManager) ResetQuota() {
gqm.hierarchyUpdateLock.Lock()
defer gqm.hierarchyUpdateLock.Unlock()

gqm.resetQuotaNoLock()
}

func (gqm *GroupQuotaManager) resetQuotaNoLock() {
start := time.Now()
defer func() {
klog.Infof("reset quota tree %v take %v", time.Since(start))
klog.Infof("reset quota tree %v take %v", gqm.treeID, time.Since(start))
}()
// rebuild gqm.quotaTopoNodeMap
gqm.rebuildQuotaTopoNodeMapNoLock()
Expand All @@ -427,7 +444,7 @@ func (gqm *GroupQuotaManager) resetQuotaNoLock() {
func (gqm *GroupQuotaManager) rebuildQuotaTopoNodeMapNoLock() {
// rebuild QuotaTopoNodeMap
gqm.quotaTopoNodeMap = make(map[string]*QuotaTopoNode)
rootNode := NewQuotaTopoNode(extension.RootQuotaName, NewQuotaInfo(true, true, extension.RootQuotaName, extension.RootQuotaName))
rootNode := NewQuotaTopoNode(extension.RootQuotaName, NewQuotaInfo(true, false, extension.RootQuotaName, ""))
gqm.quotaTopoNodeMap[extension.RootQuotaName] = rootNode

// add node according to the quotaInfoMap
Expand All @@ -449,6 +466,7 @@ func (gqm *GroupQuotaManager) rebuildQuotaTopoNodeMapNoLock() {
parQuotaTopoNode = NewQuotaTopoNode(topoNode.quotaInfo.ParentName, &QuotaInfo{
Name: topoNode.quotaInfo.ParentName,
})
gqm.quotaTopoNodeMap[topoNode.quotaInfo.ParentName] = parQuotaTopoNode
}
topoNode.parQuotaTopoNode = parQuotaTopoNode
parQuotaTopoNode.addChildGroupQuotaInfo(topoNode)
Expand Down Expand Up @@ -976,23 +994,82 @@ func (gqm *GroupQuotaManager) recursiveUpdateGroupTreeWithDeltaAllocated(deltaAl
}

func (gqm *GroupQuotaManager) updateQuotaInternalNoLock(newQuotaInfo, oldQuotaInfo *QuotaInfo) {
// update topogy node map
gqm.updateQuotaTopoNodeNoLock(newQuotaInfo, oldQuotaInfo)

// update quota info map
if oldQuotaInfo == nil {
gqm.runtimeQuotaCalculatorMap[newQuotaInfo.Name] = NewRuntimeQuotaCalculator(newQuotaInfo.Name)
if gqm.runtimeQuotaCalculatorMap[newQuotaInfo.ParentName] == nil {
gqm.runtimeQuotaCalculatorMap[newQuotaInfo.ParentName] = NewRuntimeQuotaCalculator(newQuotaInfo.ParentName)
}
gqm.quotaInfoMap[newQuotaInfo.Name] = NewQuotaInfo(newQuotaInfo.IsParent, newQuotaInfo.AllowLentResource, newQuotaInfo.Name, newQuotaInfo.ParentName)
}

oldMax := v1.ResourceList{}
if oldQuotaInfo != nil {
oldMax = oldQuotaInfo.CalculateInfo.Max
}
// max changed
if !quotav1.Equals(newQuotaInfo.CalculateInfo.Max, oldQuotaInfo.CalculateInfo.Max) {
if !quotav1.Equals(newQuotaInfo.CalculateInfo.Max, oldMax) {
klog.V(4).Infof("[updateQuotaInternalNoLock] quota %v max change, oldMax: %v, newMax: %v",
newQuotaInfo.Name, util.DumpJSON(oldMax), util.DumpJSON(newQuotaInfo.CalculateInfo.Max))
gqm.doUpdateOneGroupMaxQuotaNoLock(newQuotaInfo.Name, newQuotaInfo.CalculateInfo.Max)
}

// update resource keys
gqm.updateResourceKeyNoLock()

oldMin := v1.ResourceList{}
if oldQuotaInfo != nil {
oldMin = oldQuotaInfo.CalculateInfo.Min
}
// min changed
if !quotav1.Equals(newQuotaInfo.CalculateInfo.Min, oldQuotaInfo.CalculateInfo.Min) {
if !quotav1.Equals(newQuotaInfo.CalculateInfo.Min, oldMin) {
klog.V(4).Infof("[updateQuotaInternalNoLock] quota %v min change, oldMin: %v, newMin: %v",
newQuotaInfo.Name, util.DumpJSON(oldMin), util.DumpJSON(newQuotaInfo.CalculateInfo.Min))
gqm.doUpdateOneGroupMinQuotaNoLock(newQuotaInfo.Name, newQuotaInfo.CalculateInfo.Min)
}

oldSharedWeight := v1.ResourceList{}
if oldQuotaInfo != nil {
oldSharedWeight = oldQuotaInfo.CalculateInfo.SharedWeight
}
// sharedweight changed
if !quotav1.Equals(newQuotaInfo.CalculateInfo.SharedWeight, oldQuotaInfo.CalculateInfo.SharedWeight) {
if !quotav1.Equals(newQuotaInfo.CalculateInfo.SharedWeight, oldSharedWeight) {
klog.V(4).Infof("[updateQuotaInternalNoLock] quota %v sharedWeight change, oldMin: %v, newMin: %v",
newQuotaInfo.Name, util.DumpJSON(oldSharedWeight), util.DumpJSON(newQuotaInfo.CalculateInfo.SharedWeight))
gqm.doUpdateOneGroupSharedWeightNoLock(newQuotaInfo.Name, newQuotaInfo.CalculateInfo.SharedWeight)
}

}

func (gqm *GroupQuotaManager) updateQuotaTopoNodeNoLock(newQuotaInfo, oldQuotaInfo *QuotaInfo) {
if oldQuotaInfo != nil {
parentNode, ok := gqm.quotaTopoNodeMap[oldQuotaInfo.ParentName]
if ok {
delete(parentNode.childGroupQuotaInfos, oldQuotaInfo.Name)
}
}

node, ok := gqm.quotaTopoNodeMap[newQuotaInfo.Name]
if !ok {
node = NewQuotaTopoNode(newQuotaInfo.Name, newQuotaInfo)
gqm.quotaTopoNodeMap[newQuotaInfo.Name] = node
} else {
node.quotaInfo = newQuotaInfo
}

parentNode, ok := gqm.quotaTopoNodeMap[newQuotaInfo.ParentName]
if !ok {
parentNode = NewQuotaTopoNode(newQuotaInfo.ParentName, &QuotaInfo{
Name: newQuotaInfo.ParentName,
})
gqm.quotaTopoNodeMap[newQuotaInfo.ParentName] = parentNode
}
parentNode.childGroupQuotaInfos[newQuotaInfo.Name] = node
}

func (gqm *GroupQuotaManager) doUpdateOneGroupMaxQuotaNoLock(quotaName string, newMax v1.ResourceList) {
curToAllParInfos := gqm.getCurToAllParentGroupQuotaInfoNoLock(quotaName)
quotaInfoLen := len(curToAllParInfos)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1390,7 +1390,9 @@ func NewGroupQuotaManagerForTest() *GroupQuotaManager {
}
quotaManager.quotaInfoMap[extension.SystemQuotaName] = systemQuotaInfo
quotaManager.quotaInfoMap[extension.DefaultQuotaName] = defaultQuotaInfo
quotaManager.quotaInfoMap[extension.RootQuotaName] = NewQuotaInfo(true, false, extension.RootQuotaName, "")
rootQuotaInfo := NewQuotaInfo(true, false, extension.RootQuotaName, "")
quotaManager.quotaInfoMap[extension.RootQuotaName] = rootQuotaInfo
quotaManager.quotaTopoNodeMap[extension.RootQuotaName] = NewQuotaTopoNode(extension.RootQuotaName, rootQuotaInfo)
quotaManager.runtimeQuotaCalculatorMap[extension.RootQuotaName] = NewRuntimeQuotaCalculator(extension.RootQuotaName)
return quotaManager
}
Expand Down Expand Up @@ -2169,3 +2171,96 @@ func TestGroupQuotaManager_ImmediateIgnoreTerminatingPod(t *testing.T) {
assert.Equal(t, createResourceList(0, 0), gqm.GetQuotaInfoByName("1").GetRequest())
assert.Equal(t, createResourceList(0, 0), gqm.GetQuotaInfoByName("1").GetUsed())
}

func TestGroupQuotaManager_UpdateQuotaInternalNoLock(t *testing.T) {
gqm := NewGroupQuotaManagerForTest()
gqm.UpdateClusterTotalResource(createResourceList(1000, 1000))
assert.Equal(t, 1, len(gqm.runtimeQuotaCalculatorMap))
assert.Equal(t, 1, len(gqm.quotaTopoNodeMap))
assert.Equal(t, 3, len(gqm.quotaInfoMap))

quota := CreateQuota("test1", extension.RootQuotaName, 64, 100, 50, 80, true, false)
quotaInfo := NewQuotaInfoFromQuota(quota)

// quota not exist, add quota info
// rootQuota requests[0,0]
// |-- test1 Max[64, 100] Min[50,80] request[0,0]
gqm.updateQuotaInternalNoLock(quotaInfo, nil)
assert.Equal(t, 2, len(gqm.runtimeQuotaCalculatorMap))
assert.Equal(t, 2, len(gqm.quotaTopoNodeMap))
assert.Equal(t, 4, len(gqm.quotaInfoMap))

quotaInfo = gqm.getQuotaInfoByNameNoLock("test1")
assert.True(t, quotaInfo != nil)
assert.False(t, quotaInfo.IsParent)
assert.True(t, quotaInfo.AllowLentResource)
assert.Equal(t, createResourceList(64, 100), quotaInfo.CalculateInfo.Max)
assert.Equal(t, createResourceList(50, 80), quotaInfo.CalculateInfo.Min)
assert.Equal(t, createResourceList(50, 80), quotaInfo.CalculateInfo.AutoScaleMin)
assert.Equal(t, v1.ResourceList{}, quotaInfo.CalculateInfo.ChildRequest)
assert.Equal(t, v1.ResourceList{}, quotaInfo.CalculateInfo.Request)

rootQuotaInfo := gqm.getQuotaInfoByNameNoLock(extension.RootQuotaName)
assert.Equal(t, v1.ResourceList{}, rootQuotaInfo.CalculateInfo.Request)

// add requests
// rootQuota requests[64,100]
// |-- test1 Max[64, 100] Min[50,80] request[100,100]
request := createResourceList(100, 100)
gqm.updateGroupDeltaRequestNoLock("test1", request, request, 0)
quotaInfo = gqm.getQuotaInfoByNameNoLock("test1")
assert.Equal(t, createResourceList(100, 100), quotaInfo.CalculateInfo.ChildRequest)
assert.Equal(t, createResourceList(100, 100), quotaInfo.CalculateInfo.Request)

rootQuotaInfo = gqm.getQuotaInfoByNameNoLock(extension.RootQuotaName)
assert.Equal(t, createResourceList(64, 100), rootQuotaInfo.CalculateInfo.Request)

// change test1 min and max
// rootQuota requests[100,100]
// |-- test1 Max[200, 200] Min[60,100] request[100,100]
quota = CreateQuota("test1", extension.RootQuotaName, 200, 200, 60, 100, true, false)
newQuotaInfo := NewQuotaInfoFromQuota(quota)
gqm.updateQuotaInternalNoLock(newQuotaInfo, quotaInfo)
quotaInfo = gqm.getQuotaInfoByNameNoLock("test1")
assert.Equal(t, createResourceList(200, 200), quotaInfo.CalculateInfo.Max)
assert.Equal(t, createResourceList(60, 100), quotaInfo.CalculateInfo.Min)
assert.Equal(t, createResourceList(60, 100), quotaInfo.CalculateInfo.AutoScaleMin)
assert.Equal(t, createResourceList(100, 100), quotaInfo.CalculateInfo.ChildRequest)
assert.Equal(t, createResourceList(100, 100), quotaInfo.CalculateInfo.Request)

rootQuotaInfo = gqm.getQuotaInfoByNameNoLock(extension.RootQuotaName)
assert.Equal(t, createResourceList(100, 100), rootQuotaInfo.CalculateInfo.Request)
}

func TestGroupQuotaManager_UpdateQuotaTopoNodeNoLock(t *testing.T) {
gqm := NewGroupQuotaManagerForTest()

test1 := NewQuotaInfo(true, true, "test1", extension.RootQuotaName)
test2 := NewQuotaInfo(true, true, "test2", extension.RootQuotaName)
test11 := NewQuotaInfo(false, true, "test11", "test1")

// add test1
gqm.updateQuotaTopoNodeNoLock(test1, nil)
assert.Equal(t, 2, len(gqm.quotaTopoNodeMap))
assert.Equal(t, 1, len(gqm.quotaTopoNodeMap[extension.RootQuotaName].childGroupQuotaInfos))

// add test2
gqm.updateQuotaTopoNodeNoLock(test2, nil)
assert.Equal(t, 3, len(gqm.quotaTopoNodeMap))
assert.Equal(t, 2, len(gqm.quotaTopoNodeMap[extension.RootQuotaName].childGroupQuotaInfos))

// add test11
gqm.updateQuotaTopoNodeNoLock(test11, nil)
assert.Equal(t, 4, len(gqm.quotaTopoNodeMap))
assert.Equal(t, 2, len(gqm.quotaTopoNodeMap[extension.RootQuotaName].childGroupQuotaInfos))
assert.Equal(t, 1, len(gqm.quotaTopoNodeMap["test1"].childGroupQuotaInfos))
assert.Equal(t, 0, len(gqm.quotaTopoNodeMap["test2"].childGroupQuotaInfos))

// change test11 parent to test2
test12 := NewQuotaInfo(false, true, "test11", "test2")
gqm.updateQuotaTopoNodeNoLock(test12, test11)
assert.Equal(t, 4, len(gqm.quotaTopoNodeMap))
assert.Equal(t, 2, len(gqm.quotaTopoNodeMap[extension.RootQuotaName].childGroupQuotaInfos))
assert.Equal(t, 0, len(gqm.quotaTopoNodeMap["test1"].childGroupQuotaInfos))
assert.Equal(t, 1, len(gqm.quotaTopoNodeMap["test2"].childGroupQuotaInfos))
}
7 changes: 5 additions & 2 deletions pkg/scheduler/plugins/elasticquota/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,14 @@ func New(args runtime.Object, handle framework.Handle) (framework.Plugin, error)
elasticQuota.createRootQuotaIfNotPresent()
elasticQuota.createSystemQuotaIfNotPresent()
elasticQuota.createDefaultQuotaIfNotPresent()
frameworkexthelper.ForceSyncFromInformer(ctx.Done(), scheSharedInformerFactory, informer, cache.ResourceEventHandlerFuncs{
_, err := frameworkexthelper.ForceSyncFromInformerWithReplace(ctx.Done(), scheSharedInformerFactory, informer, cache.ResourceEventHandlerFuncs{
AddFunc: elasticQuota.OnQuotaAdd,
UpdateFunc: elasticQuota.OnQuotaUpdate,
DeleteFunc: elasticQuota.OnQuotaDelete,
})
}, elasticQuota.ReplaceQuotas)
if err != nil {
return nil, err
}

nodeInformer := handle.SharedInformerFactory().Core().V1().Nodes().Informer()
frameworkexthelper.ForceSyncFromInformer(ctx.Done(), handle.SharedInformerFactory(), nodeInformer, cache.ResourceEventHandlerFuncs{
Expand Down
Loading

0 comments on commit 8d0e6ce

Please sign in to comment.