Add volume readiness check and corresponding tests for CreateVolume (#317)

* Add 'go mod tidy' to 'test' target

* Add volume readiness check and corresponding tests for 'CreateVolume'

This commit introduces a new helper function, 'waitForVolumeAvailability', called from the 'CreateVolume' implementation of our CSI driver. It ensures that a volume reaches the 'Available' state before the 'CreateVolume' request completes, and corresponding tests validate this behavior. The change improves reliability and provides clearer error messages for debugging.

* Restructure goroutines in CreateVolume call

* refactor: log message for applied volume state

- Change the log message from "Volume applied and is now available" to "Applied volume" with additional information about the volume state.

Signed-off-by: Andreas Fritzler <[email protected]>

---------

Signed-off-by: Andreas Fritzler <[email protected]>
Co-authored-by: Andreas Fritzler <[email protected]>
sujeet01 and afritzler authored Aug 29, 2023
1 parent 468cc51 commit 3b5ad66
Showing 4 changed files with 140 additions and 17 deletions.
1 change: 1 addition & 0 deletions Makefile
@@ -136,6 +136,7 @@ vet: ## Run go vet against code.

test: generate-mocks fmt vet envtest ## Run tests.
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" go test ./... -coverprofile cover.out
go mod tidy

.PHONY: add-license
add-license: addlicense ## Add license headers to all go files.
8 changes: 8 additions & 0 deletions pkg/driver/constants.go
@@ -15,6 +15,8 @@
package driver

import (
"time"

"github.com/onmetal/onmetal-csi-driver/pkg/utils"
"sigs.k8s.io/controller-runtime/pkg/client"
)
@@ -46,4 +48,10 @@ const (
CSIDriverName = "csi.onmetal.de"
topologyKey = "topology." + CSIDriverName + "/zone"
volumeFieldOwner = client.FieldOwner("csi.onmetal.de/volume")

// Constants for the volume polling mechanism.
waitVolumeInitDelay = 1 * time.Second // Initial delay before the first poll of the volume status
waitVolumeFactor = 1.1 // Factor by which the delay grows after each poll attempt
waitVolumeActiveSteps = 5 // Maximum number of polling attempts before the wait times out
)
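
For a rough sense of the timeout these constants imply: the poll interval grows geometrically from the 1s initial delay by a factor of 1.1 over 5 steps, so the intervals sum to roughly 6.1 seconds. Below is a minimal sketch (not part of this diff) that computes that bound; the exact wall-clock behavior depends on how the client-go wait helper interleaves checks and sleeps:

package main

import "fmt"

func main() {
	delay, total := 1.0, 0.0 // waitVolumeInitDelay, in seconds
	for i := 0; i < 5; i++ { // waitVolumeActiveSteps
		total += delay
		delay *= 1.1 // waitVolumeFactor
	}
	fmt.Printf("sum of poll intervals: ~%.2fs\n", total) // prints ~6.11s
}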
35 changes: 33 additions & 2 deletions pkg/driver/controller.go
@@ -31,6 +31,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -129,7 +130,12 @@ func (d *driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
if err := d.onmetalClient.Patch(ctx, volume, client.Apply, volumeFieldOwner, client.ForceOwnership); err != nil {
return nil, status.Errorf(codes.Internal, "Failed to patch volume %s: %v", client.ObjectKeyFromObject(volume), err)
}
klog.InfoS("Applied volume", "Volume", client.ObjectKeyFromObject(volume))

if err := waitForVolumeAvailability(ctx, d.onmetalClient, volume); err != nil {
return nil, fmt.Errorf("failed to confirm availability of the volume: %w", err)
}

klog.InfoS("Applied volume", "Volume", client.ObjectKeyFromObject(volume), "State", storagev1alpha1.VolumeStateAvailable)

return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
@@ -148,6 +154,31 @@ func (d *driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
}, nil
}

// waitForVolumeAvailability is a helper function that waits for a volume to become available.
// It uses an exponential backoff strategy to periodically check the status of the volume.
// The function returns an error if the volume does not become available within the specified number of attempts.
func waitForVolumeAvailability(ctx context.Context, onmetalClient client.Client, volume *storagev1alpha1.Volume) error {
backoff := wait.Backoff{
Duration: waitVolumeInitDelay,
Factor: waitVolumeFactor,
Steps: waitVolumeActiveSteps,
}

err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) {
err := onmetalClient.Get(ctx, client.ObjectKey{Namespace: volume.Namespace, Name: volume.Name}, volume)
if err == nil && volume.Status.State == storagev1alpha1.VolumeStateAvailable {
return true, nil
}
return false, err
})

if wait.Interrupted(err) {
return fmt.Errorf("volume %s did not reach '%s' state within the defined timeout: %w", client.ObjectKeyFromObject(volume), storagev1alpha1.VolumeStateAvailable, err)
}

return err
}
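
A subtlety in the condition function above: returning a non-nil error from the callback aborts wait.ExponentialBackoffWithContext immediately, so a single failing Get ends the wait instead of being retried. If the freshly applied Volume could briefly be invisible to the client (for example through a lagging cache), a variant that treats NotFound as retryable might look like this sketch — hypothetical, not part of this diff:

// waitForVolumeAvailabilityTolerant is a hypothetical variant of the helper
// above that keeps polling through NotFound errors instead of aborting.
func waitForVolumeAvailabilityTolerant(ctx context.Context, onmetalClient client.Client, volume *storagev1alpha1.Volume) error {
	backoff := wait.Backoff{
		Duration: waitVolumeInitDelay,
		Factor:   waitVolumeFactor,
		Steps:    waitVolumeActiveSteps,
	}
	return wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) {
		if err := onmetalClient.Get(ctx, client.ObjectKeyFromObject(volume), volume); err != nil {
			if apierrors.IsNotFound(err) {
				return false, nil // not visible yet; keep polling
			}
			return false, err // any other error still aborts the wait
		}
		return volume.Status.State == storagev1alpha1.VolumeStateAvailable, nil
	})
}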

func (d *driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
klog.InfoS("Deleting volume", "Volume", req.GetVolumeId())
if req.GetVolumeId() == "" {
@@ -162,7 +193,7 @@ func (d *driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest)
if err := d.onmetalClient.Delete(ctx, vol); err != nil {
return nil, status.Errorf(codes.Internal, "Failed to delete volume %s: %v", client.ObjectKeyFromObject(vol), err)
}
klog.InfoS("Deleted deleted volume", "Volume", req.GetVolumeId())
klog.InfoS("Deleted volume", "Volume", req.GetVolumeId())
return &csi.DeleteVolumeResponse{}, nil
}

113 changes: 98 additions & 15 deletions pkg/driver/controller_test.go
@@ -75,6 +75,32 @@ var _ = Describe("Controller", func() {

By("creating a volume through the csi driver")
volSize := int64(5 * 1024 * 1024 * 1024)

// Start a goroutine that patches the created Volume to the Available state,
// since the CreateVolume call now blocks until the Volume becomes Available.
go func() {
defer GinkgoRecover()
By("waiting for the volume to be created")
volume = &storagev1alpha1.Volume{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns.Name,
Name: "volume",
},
}
Eventually(Object(volume)).Should(SatisfyAll(
HaveField("Status.State", storagev1alpha1.VolumeStatePending),
))

By("patching the volume state to make it available")
volumeBase := volume.DeepCopy()
volume.Status.State = storagev1alpha1.VolumeStateAvailable
Expect(k8sClient.Status().Patch(ctx, volume, client.MergeFrom(volumeBase))).To(Succeed())
Eventually(Object(volume)).Should(SatisfyAll(
HaveField("Status.State", storagev1alpha1.VolumeStateAvailable),
))
}()

By("creating a Volume")
res, err := drv.CreateVolume(ctx, &csi.CreateVolumeRequest{
Name: "volume",
CapacityRange: &csi.CapacityRange{RequiredBytes: volSize},
@@ -120,25 +146,35 @@ var _ = Describe("Controller", func() {
HaveKeyWithValue("fstype", "ext4"),
HaveKeyWithValue("creation_time", ContainSubstring(strconv.Itoa(time.Now().Year()))))),
))

By("patching the volume state to be available")
volume = &storagev1alpha1.Volume{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns.Name,
Name: "volume",
},
}
Eventually(Object(volume)).Should(SatisfyAll(
HaveField("Status.State", storagev1alpha1.VolumeStatePending),
))
volumeBase := volume.DeepCopy()
volume.Status.State = storagev1alpha1.VolumeStateAvailable
Expect(k8sClient.Status().Patch(ctx, volume, client.MergeFrom(volumeBase))).To(Succeed())
})

It("should not assign the volume to a volume pool if the pool is not available", func(ctx SpecContext) {
By("creating a volume through the csi driver")
volSize := int64(5 * 1024 * 1024 * 1024)

go func() {
defer GinkgoRecover()
By("waiting for the volume to be created")
volume := &storagev1alpha1.Volume{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns.Name,
Name: "volume-wrong-pool",
},
}
Eventually(Object(volume)).Should(SatisfyAll(
HaveField("Status.State", storagev1alpha1.VolumeStatePending),
))

By("patching the volume state to make it available")
volumeBase := volume.DeepCopy()
volume.Status.State = storagev1alpha1.VolumeStateAvailable
Expect(k8sClient.Status().Patch(ctx, volume, client.MergeFrom(volumeBase))).To(Succeed())
Eventually(Object(volume)).Should(SatisfyAll(
HaveField("Status.State", storagev1alpha1.VolumeStateAvailable),
))
}()

By("creating a Volume")
res, err := drv.CreateVolume(ctx, &csi.CreateVolumeRequest{
Name: "volume-wrong-pool",
CapacityRange: &csi.CapacityRange{RequiredBytes: volSize},
@@ -189,6 +225,29 @@ var _ = Describe("Controller", func() {
It("should delete a volume", func(ctx SpecContext) {
By("creating a volume through the csi driver")
volSize := int64(5 * 1024 * 1024 * 1024)

go func() {
defer GinkgoRecover()
By("waiting for the volume to be created")
volume := &storagev1alpha1.Volume{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns.Name,
Name: "volume-to-delete",
},
}
Eventually(Object(volume)).Should(SatisfyAll(
HaveField("Status.State", storagev1alpha1.VolumeStatePending),
))

By("patching the volume state to make it available")
volumeBase := volume.DeepCopy()
volume.Status.State = storagev1alpha1.VolumeStateAvailable
Expect(k8sClient.Status().Patch(ctx, volume, client.MergeFrom(volumeBase))).To(Succeed())
Eventually(Object(volume)).Should(SatisfyAll(
HaveField("Status.State", storagev1alpha1.VolumeStateAvailable),
))
}()

By("creating a Volume")
_, err := drv.CreateVolume(ctx, &csi.CreateVolumeRequest{
Name: "volume-to-delete",
CapacityRange: &csi.CapacityRange{RequiredBytes: volSize},
@@ -294,6 +353,30 @@ var _ = Describe("Controller", func() {

By("creating a volume with volume class other than expand only")
volSize := int64(5 * 1024 * 1024 * 1024)

go func() {
defer GinkgoRecover()
By("waiting for the volume to be created")
volume := &storagev1alpha1.Volume{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns.Name,
Name: "volume-not-expand",
},
}
Eventually(Object(volume)).Should(SatisfyAll(
HaveField("Status.State", storagev1alpha1.VolumeStatePending),
))

By("patching the volume state to make it available")
volumeBase := volume.DeepCopy()
volume.Status.State = storagev1alpha1.VolumeStateAvailable
Expect(k8sClient.Status().Patch(ctx, volume, client.MergeFrom(volumeBase))).To(Succeed())
Eventually(Object(volume)).Should(SatisfyAll(
HaveField("Status.State", storagev1alpha1.VolumeStateAvailable),
))
}()

By("creating a Volume")
_, err := drv.CreateVolume(ctx, &csi.CreateVolumeRequest{
Name: "volume-not-expand",
CapacityRange: &csi.CapacityRange{RequiredBytes: volSize},
@@ -335,7 +418,7 @@ var _ = Describe("Controller", func() {
RequiredBytes: newVolumeSize,
},
})
Expect((err)).Should(MatchError("volume class resize policy does not allow resizing"))
Expect(err).Should(MatchError("volume class resize policy does not allow resizing"))
})

It("should publish/unpublish a volume on a node", func(ctx SpecContext) {
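
A side note on the test changes: the patch-to-Available goroutine is repeated almost verbatim in four specs. A hypothetical helper along the following lines could consolidate it (a sketch only; it assumes the k8sClient and Object helpers visible in this diff, and Ginkgo v2's GinkgoHelper):

// makeVolumeAvailable is a hypothetical test helper: it waits for the named
// Volume to reach the Pending state, then patches its status to Available.
func makeVolumeAvailable(ctx context.Context, namespace, name string) {
	GinkgoHelper()
	volume := &storagev1alpha1.Volume{
		ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name},
	}
	Eventually(Object(volume)).Should(HaveField("Status.State", storagev1alpha1.VolumeStatePending))

	volumeBase := volume.DeepCopy()
	volume.Status.State = storagev1alpha1.VolumeStateAvailable
	Expect(k8sClient.Status().Patch(ctx, volume, client.MergeFrom(volumeBase))).To(Succeed())
}

Each spec could then shrink its setup to:

go func() {
	defer GinkgoRecover()
	makeVolumeAvailable(ctx, ns.Name, "volume")
}()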
