Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add volume readiness check and corresponding tests for CreateVolume #317

Merged
merged 4 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ vet: ## Run go vet against code.

test: generate-mocks fmt vet envtest ## Run tests.
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" go test ./... -coverprofile cover.out
go mod tidy

.PHONY: add-license
add-license: addlicense ## Add license headers to all go files.
Expand Down
8 changes: 8 additions & 0 deletions pkg/driver/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package driver

import (
"time"

"github.com/onmetal/onmetal-csi-driver/pkg/utils"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -46,4 +48,10 @@ const (
CSIDriverName = "csi.onmetal.de"
topologyKey = "topology." + CSIDriverName + "/zone"
volumeFieldOwner = client.FieldOwner("csi.onmetal.de/volume")

// Constants for volume polling mechanism

waitVolumeInitDelay = 1 * time.Second // Initial delay before starting to poll for volume status
waitVolumeFactor = 1.1 // Factor by which the delay increases with each poll attempt
waitVolumeActiveSteps = 5 // Number of consecutive active steps to wait for volume status change
)
35 changes: 33 additions & 2 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -129,7 +130,12 @@ func (d *driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
if err := d.onmetalClient.Patch(ctx, volume, client.Apply, volumeFieldOwner, client.ForceOwnership); err != nil {
return nil, status.Errorf(codes.Internal, "Failed to patch volume %s: %v", client.ObjectKeyFromObject(volume), err)
}
klog.InfoS("Applied volume", "Volume", client.ObjectKeyFromObject(volume))

if err := waitForVolumeAvailability(ctx, d.onmetalClient, volume); err != nil {
return nil, fmt.Errorf("failed to confirm availability of the volume: %w", err)
}

klog.InfoS("Applied volume", "Volume", client.ObjectKeyFromObject(volume), "State", storagev1alpha1.VolumeStateAvailable)

return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand All @@ -148,6 +154,31 @@ func (d *driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
}, nil
}

// waitForVolumeAvailability blocks until the given volume reports the
// Available state, re-reading its status with an exponential backoff
// (initial delay, growth factor, and step count come from the
// waitVolume* constants). It returns nil once the volume is observed
// as available, and an error if the read fails, the context is
// cancelled, or the backoff is exhausted first.
func waitForVolumeAvailability(ctx context.Context, onmetalClient client.Client, volume *storagev1alpha1.Volume) error {
	pollErr := wait.ExponentialBackoffWithContext(ctx, wait.Backoff{
		Duration: waitVolumeInitDelay,
		Factor:   waitVolumeFactor,
		Steps:    waitVolumeActiveSteps,
	}, func(ctx context.Context) (bool, error) {
		if err := onmetalClient.Get(ctx, client.ObjectKey{Namespace: volume.Namespace, Name: volume.Name}, volume); err != nil {
			// A failed read aborts the wait; the error surfaces to the caller.
			return false, err
		}
		// Keep polling until the volume reports the Available state.
		return volume.Status.State == storagev1alpha1.VolumeStateAvailable, nil
	})

	// wait.Interrupted covers both context cancellation and an exhausted
	// backoff; translate it into a message naming the volume and the
	// state it never reached.
	if !wait.Interrupted(pollErr) {
		return pollErr
	}
	return fmt.Errorf("volume %s did not reach '%s' state within the defined timeout: %w", client.ObjectKeyFromObject(volume), storagev1alpha1.VolumeStateAvailable, pollErr)
}

func (d *driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
klog.InfoS("Deleting volume", "Volume", req.GetVolumeId())
if req.GetVolumeId() == "" {
Expand All @@ -162,7 +193,7 @@ func (d *driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest)
if err := d.onmetalClient.Delete(ctx, vol); err != nil {
return nil, status.Errorf(codes.Internal, "Failed to delete volume %s: %v", client.ObjectKeyFromObject(vol), err)
}
klog.InfoS("Deleted deleted volume", "Volume", req.GetVolumeId())
klog.InfoS("Deleted volume", "Volume", req.GetVolumeId())
return &csi.DeleteVolumeResponse{}, nil
}

Expand Down
113 changes: 98 additions & 15 deletions pkg/driver/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,32 @@ var _ = Describe("Controller", func() {

By("creating a volume through the csi driver")
volSize := int64(5 * 1024 * 1024 * 1024)

// Start a go routine to patch the created Volume to an available state in order to succeed
// the CreateVolume call as it waits for a Volume to reach an available state.
go func() {
defer GinkgoRecover()
By("waiting for the volume to be created")
volume = &storagev1alpha1.Volume{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns.Name,
Name: "volume",
},
}
Eventually(Object(volume)).Should(SatisfyAll(
HaveField("Status.State", storagev1alpha1.VolumeStatePending),
))

By("patching the volume state to make it available")
volumeBase := volume.DeepCopy()
volume.Status.State = storagev1alpha1.VolumeStateAvailable
Expect(k8sClient.Status().Patch(ctx, volume, client.MergeFrom(volumeBase))).To(Succeed())
Eventually(Object(volume)).Should(SatisfyAll(
HaveField("Status.State", storagev1alpha1.VolumeStateAvailable),
))
}()

By("creating a Volume")
res, err := drv.CreateVolume(ctx, &csi.CreateVolumeRequest{
Name: "volume",
CapacityRange: &csi.CapacityRange{RequiredBytes: volSize},
Expand Down Expand Up @@ -120,25 +146,35 @@ var _ = Describe("Controller", func() {
HaveKeyWithValue("fstype", "ext4"),
HaveKeyWithValue("creation_time", ContainSubstring(strconv.Itoa(time.Now().Year()))))),
))

By("patching the volume state to be available")
volume = &storagev1alpha1.Volume{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns.Name,
Name: "volume",
},
}
Eventually(Object(volume)).Should(SatisfyAll(
HaveField("Status.State", storagev1alpha1.VolumeStatePending),
))
volumeBase := volume.DeepCopy()
volume.Status.State = storagev1alpha1.VolumeStateAvailable
Expect(k8sClient.Status().Patch(ctx, volume, client.MergeFrom(volumeBase))).To(Succeed())
})

It("should not assign the volume to a volume pool if the pool is not available", func(ctx SpecContext) {
By("creating a volume through the csi driver")
volSize := int64(5 * 1024 * 1024 * 1024)

go func() {
defer GinkgoRecover()
By("waiting for the volume to be created")
volume := &storagev1alpha1.Volume{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns.Name,
Name: "volume-wrong-pool",
},
}
Eventually(Object(volume)).Should(SatisfyAll(
HaveField("Status.State", storagev1alpha1.VolumeStatePending),
))

By("patching the volume state to make it available")
volumeBase := volume.DeepCopy()
volume.Status.State = storagev1alpha1.VolumeStateAvailable
Expect(k8sClient.Status().Patch(ctx, volume, client.MergeFrom(volumeBase))).To(Succeed())
Eventually(Object(volume)).Should(SatisfyAll(
HaveField("Status.State", storagev1alpha1.VolumeStateAvailable),
))
}()

By("creating a Volume")
res, err := drv.CreateVolume(ctx, &csi.CreateVolumeRequest{
Name: "volume-wrong-pool",
CapacityRange: &csi.CapacityRange{RequiredBytes: volSize},
Expand Down Expand Up @@ -189,6 +225,29 @@ var _ = Describe("Controller", func() {
It("should delete a volume", func(ctx SpecContext) {
By("creating a volume through the csi driver")
volSize := int64(5 * 1024 * 1024 * 1024)

go func() {
By("waiting for the volume to be created")
volume := &storagev1alpha1.Volume{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns.Name,
Name: "volume-to-delete",
},
}
Eventually(Object(volume)).Should(SatisfyAll(
HaveField("Status.State", storagev1alpha1.VolumeStatePending),
))

By("patching the volume state to make it available")
volumeBase := volume.DeepCopy()
volume.Status.State = storagev1alpha1.VolumeStateAvailable
Expect(k8sClient.Status().Patch(ctx, volume, client.MergeFrom(volumeBase))).To(Succeed())
Eventually(Object(volume)).Should(SatisfyAll(
HaveField("Status.State", storagev1alpha1.VolumeStateAvailable),
))
}()

By("creating a Volume")
_, err := drv.CreateVolume(ctx, &csi.CreateVolumeRequest{
Name: "volume-to-delete",
CapacityRange: &csi.CapacityRange{RequiredBytes: volSize},
Expand Down Expand Up @@ -294,6 +353,30 @@ var _ = Describe("Controller", func() {

By("creating a volume with volume class other than expand only")
volSize := int64(5 * 1024 * 1024 * 1024)

go func() {
defer GinkgoRecover()
By("waiting for the volume to be created")
volume := &storagev1alpha1.Volume{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns.Name,
Name: "volume-not-expand",
},
}
Eventually(Object(volume)).Should(SatisfyAll(
HaveField("Status.State", storagev1alpha1.VolumeStatePending),
))

By("patching the volume state to make it available")
volumeBase := volume.DeepCopy()
volume.Status.State = storagev1alpha1.VolumeStateAvailable
Expect(k8sClient.Status().Patch(ctx, volume, client.MergeFrom(volumeBase))).To(Succeed())
Eventually(Object(volume)).Should(SatisfyAll(
HaveField("Status.State", storagev1alpha1.VolumeStateAvailable),
))
}()

By("creating a Volume")
_, err := drv.CreateVolume(ctx, &csi.CreateVolumeRequest{
Name: "volume-not-expand",
CapacityRange: &csi.CapacityRange{RequiredBytes: volSize},
Expand Down Expand Up @@ -335,7 +418,7 @@ var _ = Describe("Controller", func() {
RequiredBytes: newVolumeSize,
},
})
Expect((err)).Should(MatchError("volume class resize policy does not allow resizing"))
Expect(err).Should(MatchError("volume class resize policy does not allow resizing"))
})

It("should publish/unpublish a volume on a node", func(ctx SpecContext) {
Expand Down
Loading