Skip to content

Commit d060f96

Browse files
Merge pull request #2283 from mpatlasov/OCPBUGS-55348-Improve-CSILimits-plugin-for-release-4.18
UPSTREAM: 127757 : OCPBUGS-55639: scheduler: Improve CSILimits plugi…
2 parents af98ede + bf06ebe commit d060f96

File tree

5 files changed

+136
-2
lines changed

5 files changed

+136
-2
lines changed

pkg/scheduler/framework/plugins/nodevolumelimits/csi.go

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
v1 "k8s.io/api/core/v1"
2424
storagev1 "k8s.io/api/storage/v1"
2525
apierrors "k8s.io/apimachinery/pkg/api/errors"
26+
"k8s.io/apimachinery/pkg/labels"
2627
"k8s.io/apimachinery/pkg/runtime"
2728
"k8s.io/apimachinery/pkg/util/rand"
2829
corelisters "k8s.io/client-go/listers/core/v1"
@@ -56,6 +57,7 @@ type CSILimits struct {
5657
pvLister corelisters.PersistentVolumeLister
5758
pvcLister corelisters.PersistentVolumeClaimLister
5859
scLister storagelisters.StorageClassLister
60+
vaLister storagelisters.VolumeAttachmentLister
5961

6062
randomVolumeIDPrefix string
6163

@@ -148,6 +150,7 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
148150
logger.V(5).Info("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err)
149151
}
150152

153+
// Count CSI volumes from the new pod
151154
newVolumes := make(map[string]string)
152155
if err := pl.filterAttachableVolumes(logger, pod, csiNode, true /* new pod */, newVolumes); err != nil {
153156
if apierrors.IsNotFound(err) {
@@ -168,6 +171,7 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
168171
return nil
169172
}
170173

174+
// Count CSI volumes from existing pods
171175
attachedVolumes := make(map[string]string)
172176
for _, existingPod := range nodeInfo.Pods {
173177
if err := pl.filterAttachableVolumes(logger, existingPod.Pod, csiNode, false /* existing pod */, attachedVolumes); err != nil {
@@ -182,6 +186,19 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
182186
attachedVolumeCount[volumeLimitKey]++
183187
}
184188

189+
// Count CSI volumes from VolumeAttachments
190+
volumeAttachments, err := pl.getNodeVolumeAttachmentInfo(logger, node.Name)
191+
if err != nil {
192+
return framework.AsStatus(err)
193+
}
194+
195+
for volumeUniqueName, driverName := range volumeAttachments {
196+
// Avoid double-counting volumes already used by existing pods
197+
if _, exists := attachedVolumes[volumeUniqueName]; !exists {
198+
attachedVolumeCount[driverName]++
199+
}
200+
}
201+
185202
newVolumeCount := map[string]int{}
186203
for _, volumeLimitKey := range newVolumes {
187204
newVolumeCount[volumeLimitKey]++
@@ -264,7 +281,7 @@ func (pl *CSILimits) filterAttachableVolumes(
264281
continue
265282
}
266283

267-
volumeUniqueName := fmt.Sprintf("%s/%s", driverName, volumeHandle)
284+
volumeUniqueName := getVolumeUniqueName(driverName, volumeHandle)
268285
volumeLimitKey := volumeutil.GetCSIAttachLimitKey(driverName)
269286
result[volumeUniqueName] = volumeLimitKey
270287
}
@@ -306,7 +323,7 @@ func (pl *CSILimits) checkAttachableInlineVolume(logger klog.Logger, vol *v1.Vol
306323
if translatedPV.Spec.PersistentVolumeSource.CSI == nil {
307324
return nil
308325
}
309-
volumeUniqueName := fmt.Sprintf("%s/%s", driverName, translatedPV.Spec.PersistentVolumeSource.CSI.VolumeHandle)
326+
volumeUniqueName := getVolumeUniqueName(driverName, translatedPV.Spec.PersistentVolumeSource.CSI.VolumeHandle)
310327
volumeLimitKey := volumeutil.GetCSIAttachLimitKey(driverName)
311328
result[volumeUniqueName] = volumeLimitKey
312329
return nil
@@ -416,13 +433,15 @@ func NewCSI(_ context.Context, _ runtime.Object, handle framework.Handle, fts fe
416433
pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
417434
csiNodesLister := informerFactory.Storage().V1().CSINodes().Lister()
418435
scLister := informerFactory.Storage().V1().StorageClasses().Lister()
436+
vaLister := informerFactory.Storage().V1().VolumeAttachments().Lister()
419437
csiTranslator := csitrans.New()
420438

421439
return &CSILimits{
422440
csiNodeLister: csiNodesLister,
423441
pvLister: pvLister,
424442
pvcLister: pvcLister,
425443
scLister: scLister,
444+
vaLister: vaLister,
426445
randomVolumeIDPrefix: rand.String(32),
427446
translator: csiTranslator,
428447
}, nil
@@ -443,3 +462,40 @@ func getVolumeLimits(nodeInfo *framework.NodeInfo, csiNode *storagev1.CSINode) m
443462
}
444463
return nodeVolumeLimits
445464
}
465+
466+
// getNodeVolumeAttachmentInfo returns a map of volumeID to driver name for the given node.
467+
func (pl *CSILimits) getNodeVolumeAttachmentInfo(logger klog.Logger, nodeName string) (map[string]string, error) {
468+
volumeAttachments := make(map[string]string)
469+
vas, err := pl.vaLister.List(labels.Everything())
470+
if err != nil {
471+
return nil, err
472+
}
473+
for _, va := range vas {
474+
if va.Spec.NodeName == nodeName {
475+
if va.Spec.Attacher == "" {
476+
logger.V(5).Info("VolumeAttachment has no attacher", "VolumeAttachment", klog.KObj(va))
477+
continue
478+
}
479+
if va.Spec.Source.PersistentVolumeName == nil {
480+
logger.V(5).Info("VolumeAttachment has no PV name", "VolumeAttachment", klog.KObj(va))
481+
continue
482+
}
483+
pv, err := pl.pvLister.Get(*va.Spec.Source.PersistentVolumeName)
484+
if err != nil {
485+
logger.V(5).Info("Unable to get PV for VolumeAttachment", "VolumeAttachment", klog.KObj(va), "err", err)
486+
continue
487+
}
488+
if pv.Spec.CSI == nil {
489+
logger.V(5).Info("PV is not a CSI volume", "PV", klog.KObj(pv))
490+
continue
491+
}
492+
volumeID := getVolumeUniqueName(va.Spec.Attacher, pv.Spec.CSI.VolumeHandle)
493+
volumeAttachments[volumeID] = volumeutil.GetCSIAttachLimitKey(va.Spec.Attacher)
494+
}
495+
}
496+
return volumeAttachments, nil
497+
}
498+
499+
// getVolumeUniqueName returns the key that identifies a CSI volume when
// attachments are counted: driverName and volumeHandle joined by "/".
//
// Plain concatenation is used rather than fmt.Sprintf because callers invoke
// this once per volume and joining two strings needs no formatting machinery.
func getVolumeUniqueName(driverName, volumeHandle string) string {
	return driverName + "/" + volumeHandle
}

pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ func TestCSILimits(t *testing.T) {
282282
extraClaims []v1.PersistentVolumeClaim
283283
filterName string
284284
maxVols int32
285+
vaCount int
285286
driverNames []string
286287
test string
287288
migrationEnabled bool
@@ -290,6 +291,27 @@ func TestCSILimits(t *testing.T) {
290291
wantStatus *framework.Status
291292
wantPreFilterStatus *framework.Status
292293
}{
294+
{
295+
newPod: csiEBSOneVolPod,
296+
existingPods: []*v1.Pod{},
297+
filterName: "csi",
298+
maxVols: 2,
299+
driverNames: []string{ebsCSIDriverName},
300+
vaCount: 2,
301+
test: "should count VolumeAttachments towards volume limit when no pods exist",
302+
limitSource: "csinode",
303+
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
304+
},
305+
{
306+
newPod: csiEBSOneVolPod,
307+
existingPods: []*v1.Pod{},
308+
filterName: "csi",
309+
maxVols: 2,
310+
driverNames: []string{ebsCSIDriverName},
311+
vaCount: 1,
312+
test: "should schedule pod when VolumeAttachments count does not exceed limit",
313+
limitSource: "csinode",
314+
},
293315
{
294316
newPod: csiEBSOneVolPod,
295317
existingPods: []*v1.Pod{runningPod, csiEBSTwoVolPod},
@@ -623,6 +645,7 @@ func TestCSILimits(t *testing.T) {
623645
pvLister: getFakeCSIPVLister(test.filterName, test.driverNames...),
624646
pvcLister: append(getFakeCSIPVCLister(test.filterName, scName, test.driverNames...), test.extraClaims...),
625647
scLister: getFakeCSIStorageClassLister(scName, test.driverNames[0]),
648+
vaLister: getFakeVolumeAttachmentLister(test.vaCount, test.driverNames...),
626649
randomVolumeIDPrefix: rand.String(32),
627650
translator: csiTranslator,
628651
}
@@ -706,6 +729,28 @@ func TestCSILimitsQHint(t *testing.T) {
706729
}
707730
}
708731

732+
func getFakeVolumeAttachmentLister(count int, driverNames ...string) tf.VolumeAttachmentLister {
733+
vaLister := tf.VolumeAttachmentLister{}
734+
for _, driver := range driverNames {
735+
for j := 0; j < count; j++ {
736+
pvName := fmt.Sprintf("csi-%s-%d", driver, j)
737+
va := storagev1.VolumeAttachment{
738+
ObjectMeta: metav1.ObjectMeta{
739+
Name: fmt.Sprintf("va-%s-%d", driver, j),
740+
},
741+
Spec: storagev1.VolumeAttachmentSpec{
742+
NodeName: "node-for-max-pd-test-1",
743+
Attacher: driver,
744+
Source: storagev1.VolumeAttachmentSource{
745+
PersistentVolumeName: &pvName,
746+
},
747+
},
748+
}
749+
vaLister = append(vaLister, va)
750+
}
751+
}
752+
return vaLister
753+
}
709754
func getFakeCSIPVLister(volumeName string, driverNames ...string) tf.PersistentVolumeLister {
710755
pvLister := tf.PersistentVolumeLister{}
711756
for _, driver := range driverNames {

pkg/scheduler/testing/framework/fake_listers.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,3 +313,27 @@ func (classes StorageClassLister) Get(name string) (*storagev1.StorageClass, err
313313
func (classes StorageClassLister) List(selector labels.Selector) ([]*storagev1.StorageClass, error) {
314314
return nil, fmt.Errorf("not implemented")
315315
}
316+
317+
// VolumeAttachmentLister is a slice of storagev1.VolumeAttachment used as a
// fake lister in tests; its List and Get methods serve the slice's elements.
318+
type VolumeAttachmentLister []storagev1.VolumeAttachment
319+
320+
var _ storagelisters.VolumeAttachmentLister = VolumeAttachmentLister{}
321+
322+
// List lists all VolumeAttachments in the indexer.
323+
func (val VolumeAttachmentLister) List(selector labels.Selector) (ret []*storagev1.VolumeAttachment, err error) {
324+
var list []*storagev1.VolumeAttachment
325+
for i := range val {
326+
list = append(list, &val[i])
327+
}
328+
return list, nil
329+
}
330+
331+
// Get returns a fake VolumeAttachment object from the fake VolumeAttachments by name.
332+
func (val VolumeAttachmentLister) Get(name string) (*storagev1.VolumeAttachment, error) {
333+
for _, va := range val {
334+
if va.Name == name {
335+
return &va, nil
336+
}
337+
}
338+
return nil, errors.NewNotFound(storagev1.Resource("volumeattachments"), name)
339+
}

plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,7 @@ func clusterRoles() []rbacv1.ClusterRole {
581581
rbacv1helpers.NewRule("create").Groups(authorizationGroup).Resources("subjectaccessreviews").RuleOrDie(),
582582
// Needed for volume limits
583583
rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("csinodes").RuleOrDie(),
584+
rbacv1helpers.NewRule("get", "list", "watch").Groups(storageGroup).Resources("volumeattachments").RuleOrDie(),
584585
// Needed for namespaceSelector feature in pod affinity
585586
rbacv1helpers.NewRule(Read...).Groups(legacyGroup).Resources("namespaces").RuleOrDie(),
586587
rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("csidrivers").RuleOrDie(),

plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -852,6 +852,14 @@ items:
852852
- get
853853
- list
854854
- watch
855+
- apiGroups:
856+
- storage.k8s.io
857+
resources:
858+
- volumeattachments
859+
verbs:
860+
- get
861+
- list
862+
- watch
855863
- apiGroups:
856864
- ""
857865
resources:

0 commit comments

Comments
 (0)