From 149c88ad499001bb36ab1e362c118c0d0b227bd6 Mon Sep 17 00:00:00 2001 From: ypgohoka Date: Wed, 2 Aug 2023 16:05:21 +0530 Subject: [PATCH] CSI Volume Cloning --- .../providers/oci/instances_test.go | 5 + pkg/csi/driver/bv_controller.go | 131 ++++++---- pkg/csi/driver/bv_controller_test.go | 5 + pkg/csi/driver/bv_node.go | 15 +- pkg/metrics/constants.go | 2 + pkg/oci/client/block_storage.go | 47 +++- pkg/util/disk/iscsi.go | 25 +- pkg/util/disk/mount_helper.go | 68 +++++ pkg/util/disk/paravirtualized.go | 6 +- pkg/volume/provisioner/block/block_test.go | 5 + pkg/volume/provisioner/fss/fss_test.go | 5 + .../cloud-provider-oci/csi_volume_cloning.go | 232 ++++++++++++++++++ .../cloud-provider-oci/csi_volume_creation.go | 3 +- test/e2e/framework/framework.go | 1 + test/e2e/framework/pvc_util.go | 149 ++++++++++- 15 files changed, 618 insertions(+), 81 deletions(-) create mode 100644 test/e2e/cloud-provider-oci/csi_volume_cloning.go diff --git a/pkg/cloudprovider/providers/oci/instances_test.go b/pkg/cloudprovider/providers/oci/instances_test.go index ecb06a22ca..44161976c4 100644 --- a/pkg/cloudprovider/providers/oci/instances_test.go +++ b/pkg/cloudprovider/providers/oci/instances_test.go @@ -611,6 +611,11 @@ func (c *MockNetworkLoadBalancerClient) UpdateNetworkSecurityGroups(ctx context. // MockBlockStorageClient mocks BlockStorage client implementation type MockBlockStorageClient struct{} +// AwaitVolumeCloneAvailableOrTimeout implements client.BlockStorageInterface. +func (*MockBlockStorageClient) AwaitVolumeCloneAvailableOrTimeout(ctx context.Context, id string) (*core.Volume, error) { + return nil, nil +} + func (MockBlockStorageClient) AwaitVolumeAvailableORTimeout(ctx context.Context, id string) (*core.Volume, error) { return nil, nil } diff --git a/pkg/csi/driver/bv_controller.go b/pkg/csi/driver/bv_controller.go index bab6e5f04d..d2ed6592c9 100644 --- a/pkg/csi/driver/bv_controller.go +++ b/pkg/csi/driver/bv_controller.go @@ -235,7 +235,80 @@ func (d *BlockVolumeControllerDriver) CreateVolume(ctx context.Context, req *csi } availableDomainShortName := "" - if req.AccessibilityRequirements != nil && req.AccessibilityRequirements.Preferred != nil { + volumeName := req.Name + + dimensionsMap := make(map[string]string) + dimensionsMap[metrics.ResourceOCIDDimension] = volumeName + + srcSnapshotId := "" + srcVolumeId := "" + volumeContentSource := req.GetVolumeContentSource() + if volumeContentSource != nil { + _, isVolumeContentSource_Snapshot := volumeContentSource.GetType().(*csi.VolumeContentSource_Snapshot) + _, isVolumeContentSource_Volume := volumeContentSource.GetType().(*csi.VolumeContentSource_Volume) + + if !isVolumeContentSource_Snapshot && !isVolumeContentSource_Volume { + log.Error("Unsupported volumeContentSource") + return nil, status.Error(codes.InvalidArgument, "Unsupported volumeContentSource") + } + + if isVolumeContentSource_Snapshot { + srcSnapshot := volumeContentSource.GetSnapshot() + if srcSnapshot == nil { + log.With("volumeSourceType", "snapshot").Error("Error fetching snapshot from the volumeContentSource") + return nil, status.Error(codes.InvalidArgument, "Error fetching snapshot from the volumeContentSource") + } + + id := srcSnapshot.GetSnapshotId() + volumeBackup, err := d.client.BlockStorage().GetVolumeBackup(ctx, id) + if err != nil { + if k8sapierrors.IsNotFound(err) { + log.With("service", "blockstorage", "verb", "get", "resource", "volumeBackup", "statusCode", util.GetHttpStatusCode(err)).Errorf("Failed to get snapshot with ID %v", id) + return nil, 
status.Errorf(codes.NotFound, "Failed to get snapshot with ID %v", id)
+				}
+				log.With("service", "blockstorage", "verb", "get", "resource", "volumeBackup", "statusCode", util.GetHttpStatusCode(err)).Errorf("Failed to fetch snapshot with ID %v with error %v", id, err)
+				return nil, status.Errorf(codes.Internal, "Failed to fetch snapshot with ID %v with error %v", id, err)
+			}
+
+			volumeBackupSize := *volumeBackup.SizeInMBs * client.MiB
+			if volumeBackupSize < size {
+				volumeContext[needResize] = "true"
+				volumeContext[newSize] = strconv.FormatInt(size, 10)
+			}
+
+			srcSnapshotId = id
+		} else {
+			srcVolume := volumeContentSource.GetVolume()
+			if srcVolume == nil {
+				log.With("volumeSourceType", "pvc").Error("Error fetching volume from the volumeContentSource")
+				return nil, status.Error(codes.InvalidArgument, "Error fetching volume from the volumeContentSource")
+			}
+
+			id := srcVolume.GetVolumeId()
+			srcBlockVolume, err := d.client.BlockStorage().GetVolume(ctx, id)
+			if err != nil {
+				if client.IsNotFound(err) {
+					log.With("service", "blockstorage", "verb", "get", "resource", "blockVolume", "statusCode", util.GetHttpStatusCode(err)).Errorf("Failed to get volume with ID %v", id)
+					return nil, status.Errorf(codes.NotFound, "Failed to get volume with ID %v", id)
+				}
+				log.With("service", "blockstorage", "verb", "get", "resource", "blockVolume", "statusCode", util.GetHttpStatusCode(err)).Errorf("Failed to fetch volume with ID %v with error %v", id, err)
+				return nil, status.Errorf(codes.Internal, "Failed to fetch volume with ID %v with error %v", id, err)
+			}
+
+			availableDomainShortName = *srcBlockVolume.AvailabilityDomain
+			log.With("AD", availableDomainShortName).Info("Using availability domain of source volume to provision clone volume.")
+
+			srcBlockVolumeSize := *srcBlockVolume.SizeInMBs * client.MiB
+			if srcBlockVolumeSize < size {
+				// Use the same context keys as the snapshot-restore branch above so
+				// the node driver picks the resize request up from PublishContext.
+				volumeContext[needResize] = "true"
+				volumeContext[newSize] = strconv.FormatInt(size, 10)
+			}
+
+			srcVolumeId = id
+		}
+	}
+
+	if req.AccessibilityRequirements != nil && req.AccessibilityRequirements.Preferred != nil && availableDomainShortName == "" {
 		for _, t := range req.AccessibilityRequirements.Preferred {
 			availableDomainShortName, _ = t.Segments[kubeAPI.LabelZoneFailureDomain]
 			log.With("AD", availableDomainShortName).Info("Using preferred topology for AD.")
@@ -257,50 +330,16 @@ func (d *BlockVolumeControllerDriver) CreateVolume(ctx context.Context, req *csi
 		}
 	}
 
-	volumeName := req.Name
-
-	dimensionsMap := make(map[string]string)
-	dimensionsMap[metrics.ResourceOCIDDimension] = volumeName
-
-	srcSnapshotId := ""
-	volumeContentSource := req.GetVolumeContentSource()
-	if volumeContentSource != nil {
-		if _, ok := volumeContentSource.GetType().(*csi.VolumeContentSource_Snapshot); !ok {
-			log.Error("Unsupported volumeContentSource")
-			return nil, status.Error(codes.InvalidArgument, "Unsupported volumeContentSource")
-		}
-		srcSnapshot := volumeContentSource.GetSnapshot()
-		if srcSnapshot == nil {
-			log.Error("Error fetching snapshot from the volumeContentSource")
-			return nil, status.Error(codes.InvalidArgument, "Error fetching snapshot from the volumeContentSource")
-		}
-
-		id := srcSnapshot.GetSnapshotId()
-		volumeBackup, err := d.client.BlockStorage().GetVolumeBackup(ctx, id)
-		if err != nil {
-			if k8sapierrors.IsNotFound(err) {
-				log.With("service", "blockstorage", "verb", "get", "resource", "volumeBackup", "statusCode", util.GetHttpStatusCode(err)).Errorf("Failed to get snapshot with ID %v", id)
-				return nil, status.Errorf(codes.NotFound, "Failed 
to get snapshot with ID %v", id) - } - log.With("service", "blockstorage", "verb", "get", "resource", "volumeBackup", "statusCode", util.GetHttpStatusCode(err)).Errorf("Failed to fetch snapshot with ID %v with error %v", id, err) - return nil, status.Errorf(codes.Internal, "Failed to fetch snapshot with ID %v with error %v", id, err) - } - - volumeBackupSize := *volumeBackup.SizeInMBs * client.MiB - if volumeBackupSize < size { - volumeContext[needResize] = "true" - volumeContext[newSize] = strconv.FormatInt(size, 10) - } - - srcSnapshotId = id - } - metric := metrics.PVProvision metricType := util.CSIStorageType if srcSnapshotId != "" { metric = metrics.BlockSnapshotRestore metricType = util.CSIStorageType } + if srcVolumeId != "" { + metric = metrics.PVClone + metricType = util.CSIStorageType + } if availableDomainShortName == "" { metricDimension = util.GetMetricDimensionForComponent(util.ErrValidation, metricType) @@ -377,7 +416,7 @@ func (d *BlockVolumeControllerDriver) CreateVolume(ctx context.Context, req *csi bvTags = scTags } - provisionedVolume, err = provision(log, d.client, volumeName, size, *ad.Name, d.config.CompartmentID, srcSnapshotId, + provisionedVolume, err = provision(log, d.client, volumeName, size, *ad.Name, d.config.CompartmentID, srcSnapshotId, srcVolumeId, volumeParams.diskEncryptionKey, volumeParams.vpusPerGB, timeout, bvTags) if err != nil { log.With("Ad name", *ad.Name, "Compartment Id", d.config.CompartmentID).With(zap.Error(err)).Error("New volume creation failed.") @@ -391,7 +430,12 @@ func (d *BlockVolumeControllerDriver) CreateVolume(ctx context.Context, req *csi ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() log.Info("Waiting for volume to become available.") - _, err = d.client.BlockStorage().AwaitVolumeAvailableORTimeout(ctx, *provisionedVolume.Id) + + if srcVolumeId != "" { + _, err = d.client.BlockStorage().AwaitVolumeCloneAvailableOrTimeout(ctx, *provisionedVolume.Id) + } else { + _, err = d.client.BlockStorage().AwaitVolumeAvailableORTimeout(ctx, *provisionedVolume.Id) + } if err != nil { log.With("service", "blockstorage", "verb", "get", "resource", "volume", "statusCode", util.GetHttpStatusCode(err)). 
With("volumeName", volumeName).Error("Create volume failed with time out") @@ -832,6 +876,7 @@ func (d *BlockVolumeControllerDriver) ControllerGetCapabilities(ctx context.Cont csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME, csi.ControllerServiceCapability_RPC_EXPAND_VOLUME, csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT, + csi.ControllerServiceCapability_RPC_CLONE_VOLUME, } { caps = append(caps, newCap(cap)) } @@ -1185,7 +1230,7 @@ func (d *BlockVolumeControllerDriver) ControllerGetVolume(ctx context.Context, r } func provision(log *zap.SugaredLogger, c client.Interface, volName string, volSize int64, availDomainName, compartmentID, - backupID, kmsKeyID string, vpusPerGB int64, timeout time.Duration, bvTags *config.TagConfig) (core.Volume, error) { + backupID, srcVolumeID, kmsKeyID string, vpusPerGB int64, timeout time.Duration, bvTags *config.TagConfig) (core.Volume, error) { ctx := context.Background() @@ -1205,6 +1250,8 @@ func provision(log *zap.SugaredLogger, c client.Interface, volName string, volSi if backupID != "" { volumeDetails.SourceDetails = &core.VolumeSourceFromVolumeBackupDetails{Id: &backupID} + } else if srcVolumeID != "" { + volumeDetails.SourceDetails = &core.VolumeSourceFromVolumeDetails{Id: &srcVolumeID} } if kmsKeyID != "" { diff --git a/pkg/csi/driver/bv_controller_test.go b/pkg/csi/driver/bv_controller_test.go index bf9a73ace6..fceb59d458 100644 --- a/pkg/csi/driver/bv_controller_test.go +++ b/pkg/csi/driver/bv_controller_test.go @@ -85,6 +85,11 @@ func (MockOCIClient) Identity() client.IdentityInterface { type MockBlockStorageClient struct{} +// AwaitVolumeCloneAvailableOrTimeout implements client.BlockStorageInterface. +func (c *MockBlockStorageClient) AwaitVolumeCloneAvailableOrTimeout(ctx context.Context, id string) (*core.Volume, error) { + return &core.Volume{}, nil +} + func (c *MockBlockStorageClient) AwaitVolumeBackupAvailableOrTimeout(ctx context.Context, id string) (*core.VolumeBackup, error) { return &core.VolumeBackup{}, nil } diff --git a/pkg/csi/driver/bv_node.go b/pkg/csi/driver/bv_node.go index f6b642771d..ad945d56d2 100644 --- a/pkg/csi/driver/bv_node.go +++ b/pkg/csi/driver/bv_node.go @@ -30,7 +30,7 @@ import ( "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util/hostutil" - "github.com/oracle/oci-cloud-controller-manager/pkg/csi-util" + csi_util "github.com/oracle/oci-cloud-controller-manager/pkg/csi-util" "github.com/oracle/oci-cloud-controller-manager/pkg/oci/client" "github.com/oracle/oci-cloud-controller-manager/pkg/util/disk" ) @@ -190,6 +190,17 @@ func (d BlockVolumeNodeDriver) NodeStageVolume(ctx context.Context, req *csi.Nod } } + existingFs, err := mountHandler.GetDiskFormat(devicePath) + if err != nil { + logger.With("devicePath", devicePath, zap.Error(err)).Error("GetDiskFormatFailed") + } + + if existingFs != "" && existingFs != fsType { + returnError := fmt.Sprintf("FS Type mismatch detected. The existing fs type on the volume: %q doesn't match the requested fs type: %q. 
Please change fs type in PV to match the existing fs type.", existingFs, fsType) + logger.Error(returnError) + return nil, status.Error(codes.Internal, returnError) + } + logger.With("devicePath", devicePath, "fsType", fsType).Info("mounting the volume to staging path.") err = mountHandler.FormatAndMount(devicePath, req.StagingTargetPath, fsType, options) @@ -394,7 +405,7 @@ func (d BlockVolumeNodeDriver) NodePublishVolume(ctx context.Context, req *csi.N } if needsResize { - logger.Info("Starting to expand volume restored from snapshot") + logger.Info("Starting to expand volume to requested size") requestedSize, err := strconv.ParseInt(req.PublishContext[newSize], 10, 64) if err != nil { diff --git a/pkg/metrics/constants.go b/pkg/metrics/constants.go index 6b26ae0fe4..710dc8a17c 100644 --- a/pkg/metrics/constants.go +++ b/pkg/metrics/constants.go @@ -39,6 +39,8 @@ const ( PVDelete = "PV_DELETE" // PVExpand is the OCI metric suffix for PV Expand PVExpand = "PV_EXPAND" + // PVClone is the OCI metric for PV Clone + PVClone = "PV_CLONE" // FSSProvision is the OCI metric suffix for FSS provision FSSProvision = "FSS_PROVISION" diff --git a/pkg/oci/client/block_storage.go b/pkg/oci/client/block_storage.go index 220f698a06..f1b42bfa4b 100644 --- a/pkg/oci/client/block_storage.go +++ b/pkg/oci/client/block_storage.go @@ -36,8 +36,9 @@ const ( ) const ( - volumePollInterval = 5 * time.Second + volumePollInterval = 5 * time.Second volumeBackupPollInterval = 5 * time.Second + volumeClonePollInterval = 10 * time.Second // OCIVolumeID is the name of the oci volume id. OCIVolumeID = "ociVolumeID" // OCIVolumeBackupID is the name of the oci volume backup id annotation. @@ -51,6 +52,7 @@ const ( // by the volume provisioner. type BlockStorageInterface interface { AwaitVolumeAvailableORTimeout(ctx context.Context, id string) (*core.Volume, error) + AwaitVolumeCloneAvailableOrTimeout(ctx context.Context, id string) (*core.Volume, error) CreateVolume(ctx context.Context, details core.CreateVolumeDetails) (*core.Volume, error) DeleteVolume(ctx context.Context, id string) error GetVolume(ctx context.Context, id string) (*core.Volume, error) @@ -99,7 +101,7 @@ func (c *client) GetVolumeBackup(ctx context.Context, id string) (*core.VolumeBa return &resp.VolumeBackup, nil } -//AwaitVolumeAvailableORTimeout takes context as timeout +// AwaitVolumeAvailableORTimeout takes context as timeout func (c *client) AwaitVolumeAvailableORTimeout(ctx context.Context, id string) (*core.Volume, error) { var vol *core.Volume if err := wait.PollImmediateUntil(volumePollInterval, func() (bool, error) { @@ -128,7 +130,7 @@ func (c *client) AwaitVolumeAvailableORTimeout(ctx context.Context, id string) ( return vol, nil } -//AwaitVolumeBackupAvailableOrTimeout takes context as timeout +// AwaitVolumeBackupAvailableOrTimeout takes context as timeout func (c *client) AwaitVolumeBackupAvailableOrTimeout(ctx context.Context, id string) (*core.VolumeBackup, error) { var volBackup *core.VolumeBackup if err := wait.PollImmediateUntil(volumeBackupPollInterval, func() (bool, error) { @@ -157,6 +159,37 @@ func (c *client) AwaitVolumeBackupAvailableOrTimeout(ctx context.Context, id str return volBackup, nil } +func (c *client) AwaitVolumeCloneAvailableOrTimeout(ctx context.Context, id string) (*core.Volume, error) { + var volClone *core.Volume + if err := wait.PollImmediateUntil(volumeClonePollInterval, func() (bool, error) { + var err error + volClone, err = c.GetVolume(ctx, id) + if err != nil { + if !IsRetryable(err) { + return false, err + 
}
+			return false, nil
+		}
+
+		switch state := volClone.LifecycleState; state {
+		case core.VolumeLifecycleStateAvailable:
+			// IsHydrated is a *bool in the OCI SDK; guard against nil before
+			// dereferencing instead of comparing `*... == true`.
+			if volClone.IsHydrated != nil && *volClone.IsHydrated {
+				return true, nil
+			}
+			return false, nil
+		case core.VolumeLifecycleStateFaulty,
+			core.VolumeLifecycleStateTerminated,
+			core.VolumeLifecycleStateTerminating:
+			return false, errors.Errorf("Clone volume did not become available and hydrated (lifecycleState=%q) (hydrationStatus=%v)", state, volClone.IsHydrated != nil && *volClone.IsHydrated)
+		}
+		return false, nil
+	}, ctx.Done()); err != nil {
+		return nil, err
+	}
+
+	return volClone, nil
+}
+
 func (c *client) CreateVolume(ctx context.Context, details core.CreateVolumeDetails) (*core.Volume, error) {
 	if !c.rateLimiter.Writer.TryAccept() {
 		return nil, RateLimitError(true, "CreateVolume")
@@ -238,7 +271,7 @@ func (c *client) DeleteVolumeBackup(ctx context.Context, id string) error {
 	}
 
 	_, err := c.bs.DeleteVolumeBackup(ctx, core.DeleteVolumeBackupRequest{
-		VolumeBackupId: &id,
+		VolumeBackupId:  &id,
 		RequestMetadata: c.requestMetadata})
 
 	incRequestCounter(err, deleteVerb, snapshotResource)
@@ -306,10 +339,10 @@ func (c *client) GetVolumeBackupsByName(ctx context.Context, snapshotName, compa
 	}
 
 	listVolumeBackupsResponse, err := c.bs.ListVolumeBackups(ctx,
-		core.ListVolumeBackupsRequest {
+		core.ListVolumeBackupsRequest{
 			CompartmentId: &compartmentID,
 			Page:          page,
-			DisplayName: &snapshotName,
+			DisplayName:     &snapshotName,
 			RequestMetadata: c.requestMetadata,
 		})
 
@@ -318,7 +351,7 @@ func (c *client) GetVolumeBackupsByName(ctx context.Context, snapshotName, compa
 	}
 
 	logger := c.logger.With("snapshotName", snapshotName, "CompartmentID", compartmentID,
-		"OpcRequestId",*(listVolumeBackupsResponse.OpcRequestId))
+		"OpcRequestId", *(listVolumeBackupsResponse.OpcRequestId))
 	logger.Info("OPC Request ID recorded while fetching volume backups by name.")
 
 	for _, volumeBackup := range listVolumeBackupsResponse.Items {
diff --git a/pkg/util/disk/iscsi.go b/pkg/util/disk/iscsi.go
index 9f10b5661a..0f827aab33 100644
--- a/pkg/util/disk/iscsi.go
+++ b/pkg/util/disk/iscsi.go
@@ -23,7 +23,6 @@ import (
 	"strconv"
 
 	"go.uber.org/zap"
-	"k8s.io/kubernetes/pkg/volume/util/hostutil"
 	"k8s.io/mount-utils"
 	"k8s.io/utils/exec"
 )
@@ -89,6 +88,8 @@ type Interface interface {
 	UnmountPath(path string) error
 
 	Resize(devicePath string, volumePath string) (bool, error)
+
+	GetDiskFormat(devicePath string) (string, error)
 }
 
 // iSCSIMounter implements Interface.
@@ -103,7 +104,7 @@ type iSCSIMounter struct {
 	logger *zap.SugaredLogger
 }
 
-//Disk interface
+// Disk interface
 type Disk struct {
 	IQN  string
 	IPv4 string
@@ -137,7 +138,7 @@ func New(logger *zap.SugaredLogger, iqn, ipv4 string, port int) Interface {
 	return newWithMounter(logger, mount.New(mountCommand), iqn, ipv4, port)
 }
 
-//NewFromISCSIDisk creates a new iSCSI handler from ISCSIDisk.
+// NewFromISCSIDisk creates a new iSCSI handler from ISCSIDisk.
func NewFromISCSIDisk(logger *zap.SugaredLogger, sd *Disk) Interface { return &iSCSIMounter{ disk: sd, @@ -372,6 +373,10 @@ func formatAndMount(source string, target string, fstype string, options []strin return sm.FormatAndMount(source, target, fstype, options) } +func (c *iSCSIMounter) GetDiskFormat(disk string) (string, error) { + return getDiskFormat(c.runner, disk, c.logger) +} + func (c *iSCSIMounter) Mount(source string, target string, fstype string, options []string) error { safeMounter := &mount.SafeFormatAndMount{ Interface: c.mounter, @@ -388,20 +393,6 @@ func (c *iSCSIMounter) DeviceOpened(pathname string) (bool, error) { return deviceOpened(pathname, c.logger) } -func deviceOpened(pathname string, logger *zap.SugaredLogger) (bool, error) { - hostUtil := hostutil.NewHostUtil() - exists, err := hostUtil.PathExists(pathname) - if err != nil { - logger.With(zap.Error(err)).Errorf("Failed to find is path exists %s", pathname) - return false, err - } - if !exists { - logger.Infof("Path does not exist %s", pathname) - return false, nil - } - return hostUtil.DeviceOpened(pathname) -} - func (c *iSCSIMounter) UnmountPath(path string) error { return UnmountPath(c.logger, path, c.mounter) } diff --git a/pkg/util/disk/mount_helper.go b/pkg/util/disk/mount_helper.go index e71bf861bf..7dc4b512d2 100644 --- a/pkg/util/disk/mount_helper.go +++ b/pkg/util/disk/mount_helper.go @@ -24,7 +24,9 @@ import ( "time" "go.uber.org/zap" + "k8s.io/kubernetes/pkg/volume/util/hostutil" "k8s.io/mount-utils" + utilexec "k8s.io/utils/exec" ) const ( @@ -148,3 +150,69 @@ func UnmountWithEncrypt(logger *zap.SugaredLogger, target string) error { logger.Debugf("unmount output: %v", string(output)) return nil } + +func getDiskFormat(ex utilexec.Interface, disk string, logger *zap.SugaredLogger) (string, error) { + args := []string{"-p", "-s", "TYPE", "-s", "PTTYPE", "-o", "export", disk} + logger.With(disk).Infof("Attempting to determine if disk %q is formatted using blkid with args: %q", disk, args) + dataOut, err := ex.Command("blkid", args...).CombinedOutput() + output := string(dataOut) + logger.Infof("Output: %q", output) + + if err != nil { + if exit, ok := err.(utilexec.ExitError); ok { + if exit.ExitStatus() == 2 { + // Disk device is unformatted. + // For `blkid`, if the specified token (TYPE/PTTYPE, etc) was + // not found, or no (specified) devices could be identified, an + // exit code of 2 is returned. + return "", nil + } + } + logger.With(disk).Errorf("Could not determine if disk %q is formatted (%v)", disk, err) + return "", err + } + + var fstype, pttype string + + lines := strings.Split(output, "\n") + for _, l := range lines { + if len(l) <= 0 { + // Ignore empty line. + continue + } + cs := strings.Split(l, "=") + if len(cs) != 2 { + return "", fmt.Errorf("blkid returns invalid output: %s", output) + } + // TYPE is filesystem type, and PTTYPE is partition table type, according + // to https://www.kernel.org/pub/linux/utils/util-linux/v2.21/libblkid-docs/. + if cs[0] == "TYPE" { + fstype = cs[1] + } else if cs[0] == "PTTYPE" { + pttype = cs[1] + } + } + + if len(pttype) > 0 { + logger.With(disk).Infof("Disk %s detected partition table type: %s", disk, pttype) + // Returns a special non-empty string as filesystem type, then kubelet + // will not format it. 
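+		// (This matches the behavior of GetDiskFormat in k8s.io/mount-utils,
+		// from which this helper appears to be adapted.)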
+ return "unknown data, probably partitions", nil + } + + return fstype, nil +} + +func deviceOpened(pathname string, logger *zap.SugaredLogger) (bool, error) { + hostUtil := hostutil.NewHostUtil() + exists, err := hostUtil.PathExists(pathname) + if err != nil { + logger.With(zap.Error(err)).Errorf("Failed to find is path exists %s", pathname) + return false, err + } + if !exists { + logger.Infof("Path does not exist %s", pathname) + return false, nil + } + return hostUtil.DeviceOpened(pathname) +} diff --git a/pkg/util/disk/paravirtualized.go b/pkg/util/disk/paravirtualized.go index dcb1a97bd5..865a33fbed 100644 --- a/pkg/util/disk/paravirtualized.go +++ b/pkg/util/disk/paravirtualized.go @@ -30,7 +30,7 @@ type pvMounter struct { logger *zap.SugaredLogger } -//NewFromPVDisk creates a new PV handler from PVDisk. +// NewFromPVDisk creates a new PV handler from PVDisk. func NewFromPVDisk(logger *zap.SugaredLogger) Interface { return &pvMounter{ runner: exec.New(), @@ -97,3 +97,7 @@ func (c *pvMounter) Resize(devicePath string, volumePath string) (bool, error) { resizefs := mount.NewResizeFs(c.runner) return resizefs.Resize(devicePath, volumePath) } + +func (c *pvMounter) GetDiskFormat(disk string) (string, error) { + return getDiskFormat(c.runner, disk, c.logger) +} diff --git a/pkg/volume/provisioner/block/block_test.go b/pkg/volume/provisioner/block/block_test.go index eee47e5c77..6f1171478b 100644 --- a/pkg/volume/provisioner/block/block_test.go +++ b/pkg/volume/provisioner/block/block_test.go @@ -55,6 +55,11 @@ type MockBlockStorageClient struct { VolumeState core.VolumeLifecycleStateEnum } +// AwaitVolumeCloneAvailableOrTimeout implements client.BlockStorageInterface. +func (c *MockBlockStorageClient) AwaitVolumeCloneAvailableOrTimeout(ctx context.Context, id string) (*core.Volume, error) { + return &core.Volume{}, nil +} + func (c *MockBlockStorageClient) AwaitVolumeAvailableORTimeout(ctx context.Context, id string) (*core.Volume, error) { return &core.Volume{ Id: &id, diff --git a/pkg/volume/provisioner/fss/fss_test.go b/pkg/volume/provisioner/fss/fss_test.go index 7f348be781..60444deedb 100644 --- a/pkg/volume/provisioner/fss/fss_test.go +++ b/pkg/volume/provisioner/fss/fss_test.go @@ -55,6 +55,11 @@ type MockBlockStorageClient struct { VolumeState core.VolumeLifecycleStateEnum } +// AwaitVolumeCloneAvailableOrTimeout implements client.BlockStorageInterface. +func (*MockBlockStorageClient) AwaitVolumeCloneAvailableOrTimeout(ctx context.Context, id string) (*core.Volume, error) { + return &core.Volume{}, nil +} + func (c *MockBlockStorageClient) AwaitVolumeAvailableORTimeout(ctx context.Context, id string) (*core.Volume, error) { return &core.Volume{ Id: &id, diff --git a/test/e2e/cloud-provider-oci/csi_volume_cloning.go b/test/e2e/cloud-provider-oci/csi_volume_cloning.go new file mode 100644 index 0000000000..1e0ef125c4 --- /dev/null +++ b/test/e2e/cloud-provider-oci/csi_volume_cloning.go @@ -0,0 +1,232 @@ +package e2e + +import ( + "time" + + . 
"github.com/onsi/ginkgo" + + csi_util "github.com/oracle/oci-cloud-controller-manager/pkg/csi-util" + "github.com/oracle/oci-cloud-controller-manager/test/e2e/framework" + v1 "k8s.io/api/core/v1" +) + +var _ = Describe("CSI Volume Creation with PVC datasource", func() { + f := framework.NewDefaultFramework("csi-volume-cloning") + Context("[cloudprovider][storage][csi][cloning]", func() { + It("Create PVC with source PVC name specified in dataSource", func() { + pvcJig := framework.NewPVCTestJig(f.ClientSet, "csi-cloning-basic") + + scName := f.CreateStorageClassOrFail(framework.ClassOCICSI, "blockvolume.csi.oraclecloud.com", nil, pvcJig.Labels, "WaitForFirstConsumer", false, "Delete", nil) + srcPvc := pvcJig.CreateAndAwaitPVCOrFailCSI(f.Namespace.Name, framework.MinVolumeBlock, scName, nil, v1.PersistentVolumeFilesystem, v1.ReadWriteOnce, v1.ClaimPending) + srcPod := pvcJig.NewPodForCSI("app1", f.Namespace.Name, srcPvc.Name, setupF.AdLabel) + + time.Sleep(60 * time.Second) // Wait for data to be written to source PV + + clonePvc := pvcJig.CreateAndAwaitClonePVCOrFailCSI(f.Namespace.Name, framework.MinVolumeBlock, scName, srcPvc.Name, nil, v1.PersistentVolumeFilesystem, v1.ReadWriteOnce, v1.ClaimPending) + + clonePod := pvcJig.NewPodForCSIClone("app2", f.Namespace.Name, clonePvc.Name, setupF.AdLabel) + pvcJig.CheckFileExists(f.Namespace.Name, clonePod, "/data", "testdata.txt") + pvcJig.CheckFileCorruption(f.Namespace.Name, clonePod, "/data", "testdata.txt") + pvcJig.DeleteAndAwaitPod(f.Namespace.Name, srcPod) + pvcJig.DeleteAndAwaitPVC(f.Namespace.Name, srcPvc.Name) + pvcJig.DeleteAndAwaitPod(f.Namespace.Name, clonePod) + pvcJig.DeleteAndAwaitPVC(f.Namespace.Name, clonePvc.Name) + }) + }) + + Context("[cloudprovider][storage][csi][cloning][expand]", func() { + It("Create Clone PVC with size greater than the source PVC", func() { + pvcJig := framework.NewPVCTestJig(f.ClientSet, "csi-volume-size-test") + + scName := f.CreateStorageClassOrFail(framework.ClassOCICSI, "blockvolume.csi.oraclecloud.com", nil, pvcJig.Labels, "WaitForFirstConsumer", false, "Delete", nil) + srcPvc := pvcJig.CreateAndAwaitPVCOrFailCSI(f.Namespace.Name, framework.MinVolumeBlock, scName, nil, v1.PersistentVolumeFilesystem, v1.ReadWriteOnce, v1.ClaimPending) + pvcJig.NewPodForCSI("app1", f.Namespace.Name, srcPvc.Name, setupF.AdLabel) + + time.Sleep(60 * time.Second) // Wait for data to be written to source PV + + clonePvc := pvcJig.CreateAndAwaitClonePVCOrFailCSI(f.Namespace.Name, framework.MaxVolumeBlock, scName, srcPvc.Name, nil, v1.PersistentVolumeFilesystem, v1.ReadWriteOnce, v1.ClaimPending) + + clonePod := pvcJig.NewPodForCSIClone("app2", f.Namespace.Name, clonePvc.Name, setupF.AdLabel) + + pvcJig.CheckFileExists(f.Namespace.Name, clonePod, "/data", "testdata.txt") + pvcJig.CheckVolumeCapacity(framework.MaxVolumeBlock, clonePvc.Name, f.Namespace.Name) + pvcJig.CheckFileCorruption(f.Namespace.Name, clonePod, "/data", "testdata.txt") + pvcJig.CheckExpandedVolumeReadWrite(f.Namespace.Name, clonePod) + pvcJig.CheckUsableVolumeSizeInsidePod(f.Namespace.Name, clonePod, "98") + }) + }) + + Context("[cloudprovider][storage][csi][cloning]", func() { + It("Should be able to create a clone volume with in-transit encryption", func() { + pvcJig := framework.NewPVCTestJig(f.ClientSet, "csi-cloning-cmek-iscsi-in-transit-e2e-tests") + scParameter := map[string]string{ + framework.KmsKey: setupF.CMEKKMSKey, + framework.AttachmentType: framework.AttachmentTypeParavirtualized, + } + scName := 
f.CreateStorageClassOrFail(framework.ClassOCIKMS, "blockvolume.csi.oraclecloud.com", scParameter, pvcJig.Labels, "WaitForFirstConsumer", false, "Delete", nil) + srcPvc := pvcJig.CreateAndAwaitPVCOrFailCSI(f.Namespace.Name, framework.MinVolumeBlock, scName, nil, v1.PersistentVolumeFilesystem, v1.ReadWriteOnce, v1.ClaimPending) + srcPod := pvcJig.NewPodForCSI("app1", f.Namespace.Name, srcPvc.Name, setupF.AdLabel) + pvcJig.CheckCMEKKey(f.Client.BlockStorage(), srcPvc.Name, f.Namespace.Name, setupF.CMEKKMSKey) + pvcJig.CheckAttachmentTypeAndEncryptionType(f.Client.Compute(), srcPvc.Name, f.Namespace.Name, srcPod, framework.AttachmentTypeParavirtualized) + + time.Sleep(60 * time.Second) // Wait for data to be written to source PV + + clonePvc := pvcJig.CreateAndAwaitClonePVCOrFailCSI(f.Namespace.Name, framework.MinVolumeBlock, scName, srcPvc.Name, nil, v1.PersistentVolumeFilesystem, v1.ReadWriteOnce, v1.ClaimPending) + clonePod := pvcJig.NewPodForCSIClone("app2", f.Namespace.Name, clonePvc.Name, setupF.AdLabel) + + pvcJig.CheckFileExists(f.Namespace.Name, clonePod, "/data", "testdata.txt") + pvcJig.CheckFileCorruption(f.Namespace.Name, clonePod, "/data", "testdata.txt") + pvcJig.CheckCMEKKey(f.Client.BlockStorage(), clonePvc.Name, f.Namespace.Name, setupF.CMEKKMSKey) + pvcJig.CheckAttachmentTypeAndEncryptionType(f.Client.Compute(), clonePvc.Name, f.Namespace.Name, clonePod, framework.AttachmentTypeParavirtualized) + + f.VolumeIds = append(f.VolumeIds, srcPvc.Spec.VolumeName, clonePvc.Spec.VolumeName) + _ = f.DeleteStorageClass(framework.ClassOCIKMS) + }) + }) + + Context("[cloudprovider][storage][csi][cloning]", func() { + It("Create PVC with source PVC name specified in dataSource - ISCSI", func() { + pvcJig := framework.NewPVCTestJig(f.ClientSet, "csi-cloning-iscsi-test") + + scParameter := map[string]string{ + framework.KmsKey: setupF.CMEKKMSKey, + framework.AttachmentType: framework.AttachmentTypeISCSI, + } + scName := f.CreateStorageClassOrFail(framework.ClassOCICSI, "blockvolume.csi.oraclecloud.com", scParameter, pvcJig.Labels, "WaitForFirstConsumer", false, "Delete", nil) + srcPvc := pvcJig.CreateAndAwaitPVCOrFailCSI(f.Namespace.Name, framework.MinVolumeBlock, scName, nil, v1.PersistentVolumeFilesystem, v1.ReadWriteOnce, v1.ClaimPending) + pvcJig.NewPodForCSI("app1", f.Namespace.Name, srcPvc.Name, setupF.AdLabel) + + time.Sleep(60 * time.Second) // Wait for data to be written to source PV + + clonePvc := pvcJig.CreateAndAwaitClonePVCOrFailCSI(f.Namespace.Name, framework.MinVolumeBlock, scName, srcPvc.Name, nil, v1.PersistentVolumeFilesystem, v1.ReadWriteOnce, v1.ClaimPending) + + clonePod := pvcJig.NewPodForCSIClone("app2", f.Namespace.Name, clonePvc.Name, setupF.AdLabel) + pvcJig.CheckFileExists(f.Namespace.Name, clonePod, "/data", "testdata.txt") + pvcJig.CheckFileCorruption(f.Namespace.Name, clonePod, "/data", "testdata.txt") + }) + }) + + Context("[cloudprovider][storage][csi][cloning]", func() { + It("Create PVC with source PVC name specified in dataSource - ParaVirtualized", func() { + pvcJig := framework.NewPVCTestJig(f.ClientSet, "csi-cloning-pv") + + scParameter := map[string]string{ + framework.KmsKey: setupF.CMEKKMSKey, + framework.AttachmentType: framework.AttachmentTypeParavirtualized, + } + scName := f.CreateStorageClassOrFail(framework.ClassOCICSI, "blockvolume.csi.oraclecloud.com", scParameter, pvcJig.Labels, "WaitForFirstConsumer", false, "Delete", nil) + srcPvc := pvcJig.CreateAndAwaitPVCOrFailCSI(f.Namespace.Name, framework.MinVolumeBlock, scName, nil, 
v1.PersistentVolumeFilesystem, v1.ReadWriteOnce, v1.ClaimPending) + pvcJig.NewPodForCSI("app1", f.Namespace.Name, srcPvc.Name, setupF.AdLabel) + + time.Sleep(60 * time.Second) // Wait for data to be written to source PV + + clonePvc := pvcJig.CreateAndAwaitClonePVCOrFailCSI(f.Namespace.Name, framework.MinVolumeBlock, scName, srcPvc.Name, nil, v1.PersistentVolumeFilesystem, v1.ReadWriteOnce, v1.ClaimPending) + + clonePod := pvcJig.NewPodForCSIClone("app2", f.Namespace.Name, clonePvc.Name, setupF.AdLabel) + pvcJig.CheckFileExists(f.Namespace.Name, clonePod, "/data", "testdata.txt") + pvcJig.CheckFileCorruption(f.Namespace.Name, clonePod, "/data", "testdata.txt") + }) + }) +}) + +var _ = Describe("CSI Volume Cloning with different storage classes", func() { + f := framework.NewBackupFramework("csi-cloning-sc") + Context("[cloudprovider][storage][csi][cloning]", func() { + It("Should be able to create a clone with different storage class than the source volume - different vpusPerGB", func() { + pvcJig := framework.NewPVCTestJig(f.ClientSet, "csi-volume-sc-test") + + scParameters1 := map[string]string{ + framework.AttachmentType: framework.AttachmentTypeISCSI, + csi_util.VpusPerGB: "20", + } + scName1 := f.CreateStorageClassOrFail(framework.ClassOCICSI, "blockvolume.csi.oraclecloud.com", scParameters1, pvcJig.Labels, "WaitForFirstConsumer", false, "Delete", nil) + srcPvc := pvcJig.CreateAndAwaitPVCOrFailCSI(f.Namespace.Name, framework.MinVolumeBlock, scName1, nil, v1.PersistentVolumeFilesystem, v1.ReadWriteOnce, v1.ClaimPending) + pvcJig.NewPodForCSI("app1", f.Namespace.Name, srcPvc.Name, setupF.AdLabel) + + time.Sleep(60 * time.Second) // Wait for data to be written to source PV + + scParameters2 := map[string]string{ + framework.AttachmentType: framework.AttachmentTypeISCSI, + csi_util.VpusPerGB: "0", + } + scName2 := f.CreateStorageClassOrFail(framework.ClassOCILowCost, "blockvolume.csi.oraclecloud.com", scParameters2, pvcJig.Labels, "Immediate", true, "Delete", nil) + clonePvc := pvcJig.CreateAndAwaitClonePVCOrFailCSI(f.Namespace.Name, framework.MinVolumeBlock, scName2, srcPvc.Name, nil, v1.PersistentVolumeFilesystem, v1.ReadWriteOnce, v1.ClaimPending) + + clonePod := pvcJig.NewPodForCSIClone("app2", f.Namespace.Name, clonePvc.Name, setupF.AdLabel) + pvcJig.CheckFileExists(f.Namespace.Name, clonePod, "/data", "testdata.txt") + pvcJig.CheckFileCorruption(f.Namespace.Name, clonePod, "/data", "testdata.txt") + pvcJig.CheckAttachmentTypeAndEncryptionType(f.Client.Compute(), clonePvc.Name, f.Namespace.Name, clonePod, framework.AttachmentTypeISCSI) + pvcJig.CheckVolumePerformanceLevel(f.BlockStorageClient, clonePvc.Namespace, clonePvc.Name, csi_util.LowCostPerformanceOption) + }) + }) +}) + +var _ = Describe("CSI Volume Cloning with static source Volume", func() { + f := framework.NewBackupFramework("csi-static-cloning") + Context("[cloudprovider][storage][csi][static][cloning]", func() { + It("Create Clone PVC from a statically created source volume", func() { + pvcJig := framework.NewPVCTestJig(f.ClientSet, "csi-static-cloning-test") + + scName := f.CreateStorageClassOrFail(framework.ClassOCICSI, "blockvolume.csi.oraclecloud.com", + nil, pvcJig.Labels, "WaitForFirstConsumer", false, "Delete", nil) + + compartmentId := "" + if setupF.Compartment1 != "" { + compartmentId = setupF.Compartment1 + } else if f.CloudProviderConfig.CompartmentID != "" { + compartmentId = f.CloudProviderConfig.CompartmentID + } else if f.CloudProviderConfig.Auth.CompartmentID != "" { + compartmentId = 
f.CloudProviderConfig.Auth.CompartmentID
+			} else {
+				framework.Failf("Compartment Id undefined.")
+			}
+			srcPvc, volumeId := pvcJig.CreateAndAwaitStaticPVCOrFailCSI(f.BlockStorageClient, f.Namespace.Name, framework.MinVolumeBlock, 10, scName, setupF.AdLocation, compartmentId, nil, v1.PersistentVolumeFilesystem, v1.ReadWriteOnce, v1.ClaimPending)
+			f.VolumeIds = append(f.VolumeIds, srcPvc.Spec.VolumeName)
+			pvcJig.NewPodForCSI("app1", f.Namespace.Name, srcPvc.Name, setupF.AdLabel)
+
+			time.Sleep(90 * time.Second) // wait for the pod to be up and running
+			pvcJig.CheckVolumeCapacity("50Gi", srcPvc.Name, f.Namespace.Name)
+
+			clonePvc := pvcJig.CreateAndAwaitClonePVCOrFailCSI(f.Namespace.Name, framework.MinVolumeBlock, scName, srcPvc.Name, nil, v1.PersistentVolumeFilesystem, v1.ReadWriteOnce, v1.ClaimPending)
+			clonePod := pvcJig.NewPodForCSIClone("app2", f.Namespace.Name, clonePvc.Name, setupF.AdLabel)
+			pvcJig.CheckFileExists(f.Namespace.Name, clonePod, "/data", "testdata.txt")
+			pvcJig.CheckFileCorruption(f.Namespace.Name, clonePod, "/data", "testdata.txt")
+
+			f.VolumeIds = append(f.VolumeIds, volumeId)
+		})
+	})
+})
+
+var _ = Describe("CSI Volume Cloning Performance Level", func() {
+	f := framework.NewBackupFramework("csi-cloning-perf")
+	Context("[cloudprovider][storage][csi][cloning]", func() {
+		It("Create high performance clone from a low performance source volume", func() {
+			pvcJig := framework.NewPVCTestJig(f.ClientSet, "csi-cloning-perf-test")
+
+			scName1 := f.CreateStorageClassOrFail(framework.ClassOCILowCost, "blockvolume.csi.oraclecloud.com",
+				map[string]string{framework.AttachmentType: framework.AttachmentTypeISCSI, csi_util.VpusPerGB: "0"},
+				pvcJig.Labels, "WaitForFirstConsumer", true, "Delete", nil)
+			srcPvc := pvcJig.CreateAndAwaitPVCOrFailCSI(f.Namespace.Name, framework.MinVolumeBlock, scName1, nil, v1.PersistentVolumeFilesystem, v1.ReadWriteOnce, v1.ClaimPending)
+			pvcJig.NewPodForCSI("low-cost-source-pvc-app", f.Namespace.Name, srcPvc.Name, setupF.AdLabel)
+
+			time.Sleep(60 * time.Second) // wait for the pod to be up and running
+
+			pvcJig.CheckVolumePerformanceLevel(f.BlockStorageClient, srcPvc.Namespace, srcPvc.Name, csi_util.LowCostPerformanceOption)
+
+			scName2 := f.CreateStorageClassOrFail(framework.ClassOCIHigh, "blockvolume.csi.oraclecloud.com",
+				map[string]string{framework.AttachmentType: framework.AttachmentTypeISCSI, csi_util.VpusPerGB: "20"},
+				pvcJig.Labels, "WaitForFirstConsumer", true, "Delete", nil)
+			clonePvc := pvcJig.CreateAndAwaitClonePVCOrFailCSI(f.Namespace.Name, framework.MinVolumeBlock, scName2, srcPvc.Name, nil, v1.PersistentVolumeFilesystem, v1.ReadWriteOnce, v1.ClaimPending)
+			clonePod := pvcJig.NewPodForCSIClone("high-cost-clone-pvc-app", f.Namespace.Name, clonePvc.Name, setupF.AdLabel)
+
+			time.Sleep(60 * time.Second) // wait for the pod to be up and running
+
+			pvcJig.CheckFileExists(f.Namespace.Name, clonePod, "/data", "testdata.txt")
+			pvcJig.CheckFileCorruption(f.Namespace.Name, clonePod, "/data", "testdata.txt")
+			pvcJig.CheckVolumePerformanceLevel(f.BlockStorageClient, clonePvc.Namespace, clonePvc.Name, csi_util.HigherPerformanceOption)
+
+			f.VolumeIds = append(f.VolumeIds, srcPvc.Spec.VolumeName)
+			_ = f.DeleteStorageClass(framework.ClassOCILowCost)
+		})
+	})
+})
diff --git a/test/e2e/cloud-provider-oci/csi_volume_creation.go b/test/e2e/cloud-provider-oci/csi_volume_creation.go
index 79fc8a5d76..003c016290 100644
--- a/test/e2e/cloud-provider-oci/csi_volume_creation.go
+++ b/test/e2e/cloud-provider-oci/csi_volume_creation.go
@@ -16,9 +16,10 @@ 
package e2e import ( "fmt" - v1 "k8s.io/api/core/v1" "time" + v1 "k8s.io/api/core/v1" + . "github.com/onsi/ginkgo" csi_util "github.com/oracle/oci-cloud-controller-manager/pkg/csi-util" "github.com/oracle/oci-cloud-controller-manager/test/e2e/framework" diff --git a/test/e2e/framework/framework.go b/test/e2e/framework/framework.go index 8886633c4c..d2f8d8d747 100644 --- a/test/e2e/framework/framework.go +++ b/test/e2e/framework/framework.go @@ -37,6 +37,7 @@ const ( JobCompletionTimeout = 5 * time.Minute deploymentAvailableTimeout = 5 * time.Minute + CloneAvailableTimeout = 10 * time.Minute DefaultClusterKubeconfig = "/tmp/clusterkubeconfig" DefaultCloudConfig = "/tmp/cloudconfig" diff --git a/test/e2e/framework/pvc_util.go b/test/e2e/framework/pvc_util.go index 09ea2ebad2..774434b4ff 100644 --- a/test/e2e/framework/pvc_util.go +++ b/test/e2e/framework/pvc_util.go @@ -44,13 +44,14 @@ import ( ) const ( - KmsKey = "kms-key-id" - AttachmentTypeISCSI = "iscsi" - AttachmentTypeParavirtualized = "paravirtualized" - AttachmentType = "attachment-type" - FstypeKey = "csi.storage.k8s.io/fstype" - DataSourceVolumeSnapshotKind = "VolumeSnapshot" - DataSourceVolumeSnapshotAPIGroup = "snapshot.storage.k8s.io" + KmsKey = "kms-key-id" + AttachmentTypeISCSI = "iscsi" + AttachmentTypeParavirtualized = "paravirtualized" + AttachmentType = "attachment-type" + FstypeKey = "csi.storage.k8s.io/fstype" + DataSourceVolumeSnapshotKind = "VolumeSnapshot" + DataSourceVolumeSnapshotAPIGroup = "snapshot.storage.k8s.io" + DataSourceVolumePVCKind = "PersistentVolumeClaim" ) // PVCTestJig is a jig to help create PVC tests. @@ -137,14 +138,24 @@ func (j *PVCTestJig) pvcAddDataSource(pvc *v1.PersistentVolumeClaim, if pvc != nil { var apiGroupVar = DataSourceVolumeSnapshotAPIGroup pvc.Spec.DataSource = &v1.TypedLocalObjectReference{ - Name: volumeSnapshotName, - Kind: DataSourceVolumeSnapshotKind, + Name: volumeSnapshotName, + Kind: DataSourceVolumeSnapshotKind, APIGroup: &apiGroupVar, } } return pvc } +func (j *PVCTestJig) pvcAddPvcDataSource(pvc *v1.PersistentVolumeClaim, sourcePvcName string) *v1.PersistentVolumeClaim { + if pvc != nil { + pvc.Spec.DataSource = &v1.TypedLocalObjectReference{ + Name: sourcePvcName, + Kind: DataSourceVolumePVCKind, + } + } + return pvc +} + func (j *PVCTestJig) pvcAddVolumeName(pvc *v1.PersistentVolumeClaim, volumeName string) *v1.PersistentVolumeClaim { if pvc != nil { pvc.Spec.VolumeName = volumeName @@ -225,6 +236,15 @@ func (j *PVCTestJig) newPVCTemplateSnapshotSource(namespace, volumeSize, scName return pvc } +func (j *PVCTestJig) newPVCTemplatePVCSource(namespace, volumeSize, scName string, volumeMode v1.PersistentVolumeMode, accessMode v1.PersistentVolumeAccessMode, sourcePvc string) *v1.PersistentVolumeClaim { + pvc := j.CreatePVCTemplate(namespace, volumeSize) + pvc = j.pvcAddAccessMode(pvc, accessMode) + pvc = j.pvcAddStorageClassName(pvc, scName) + pvc = j.pvcAddVolumeMode(pvc, volumeMode) + pvc = j.pvcAddPvcDataSource(pvc, sourcePvc) + return pvc +} + func (j *PVCTestJig) CheckPVCorFail(pvc *v1.PersistentVolumeClaim, tweak func(pvc *v1.PersistentVolumeClaim), namespace, volumeSize string) *v1.PersistentVolumeClaim { if tweak != nil { @@ -278,6 +298,12 @@ func (j *PVCTestJig) CreatePVCorFailCSI(namespace string, volumeSize string, scN return j.CheckPVCorFail(pvc, tweak, namespace, volumeSize) } +func (j *PVCTestJig) CreateClonePVCorFailCSI(namespace, volumeSize, scName, sourcePvc string, + tweak func(pvc *v1.PersistentVolumeClaim), volumeMode v1.PersistentVolumeMode, accessMode 
v1.PersistentVolumeAccessMode) *v1.PersistentVolumeClaim { + pvc := j.newPVCTemplatePVCSource(namespace, volumeSize, scName, volumeMode, accessMode, sourcePvc) + return j.CheckPVCorFail(pvc, tweak, namespace, volumeSize) +} + // CreatePVCorFailStaticFSS creates a new claim based on the jig's // defaults. Callers can provide a function to tweak the claim object // before it is created. @@ -361,6 +387,12 @@ func (j *PVCTestJig) CreateAndAwaitPVCOrFailCSI(namespace, volumeSize, scName st return j.CheckAndAwaitPVCOrFail(pvc, namespace, expectedPVCPhase) } +func (j *PVCTestJig) CreateAndAwaitClonePVCOrFailCSI(namespace, volumeSize, scName, sourcePvc string, + tweak func(pvc *v1.PersistentVolumeClaim), volumeMode v1.PersistentVolumeMode, accessMode v1.PersistentVolumeAccessMode, expectedPVCPhase v1.PersistentVolumeClaimPhase) *v1.PersistentVolumeClaim { + pvc := j.CreateClonePVCorFailCSI(namespace, volumeSize, scName, sourcePvc, tweak, volumeMode, accessMode) + return j.CheckAndAwaitPVCOrFail(pvc, namespace, expectedPVCPhase) +} + // CreateAndAwaitPVCOrFailDynamicFSS creates a new PVC based on the // jig's defaults, waits for it to become ready, and then sanity checks it and // its dependant resources. Callers can provide a function to tweak the @@ -654,6 +686,59 @@ func (j *PVCTestJig) NewPodForCSI(name string, namespace string, claimName strin return pod.Name } +func (j *PVCTestJig) NewPodForCSIClone(name string, namespace string, claimName string, adLabel string) string { + By("Creating a pod with the claiming PVC created by CSI") + + pod, err := j.KubeClient.CoreV1().Pods(namespace).Create(context.Background(), &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + GenerateName: j.Name, + Namespace: namespace, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: name, + Image: nginx, + VolumeMounts: []v1.VolumeMount{ + { + Name: "persistent-storage", + MountPath: "/data", + }, + }, + }, + }, + Volumes: []v1.Volume{ + { + Name: "persistent-storage", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: claimName, + }, + }, + }, + }, + NodeSelector: map[string]string{ + plugin.LabelZoneFailureDomain: adLabel, + }, + }, + }, metav1.CreateOptions{}) + if err != nil { + Failf("Pod %q Create API error: %v", pod.Name, err) + } + + // Waiting for pod to be running + err = j.waitTimeoutForPodRunningInNamespace(pod.Name, namespace, slowPodStartTimeout) + if err != nil { + Failf("Pod %q is not Running: %v", pod.Name, err) + } + zap.S().With(pod.Namespace).With(pod.Name).Info("CSI POD is created.") + return pod.Name +} + func (j *PVCTestJig) NewPodForCSIWithoutWait(name string, namespace string, claimName string, adLabel string) string { By("Creating a pod with the claiming PVC created by CSI") @@ -1333,6 +1418,34 @@ func (j *PVCTestJig) CheckISCSIQueueDepthOnNode(namespace, podName string) { Expect(strings.TrimSpace(output)).To(Equal("node.session.queue_depth = 128")) } +func (j *PVCTestJig) DeleteAndAwaitPVC(namespace, pvcName string) error { + err := j.KubeClient.CoreV1().PersistentVolumeClaims(namespace).Delete(context.Background(), pvcName, metav1.DeleteOptions{}) + if err != nil { + Failf("Error deleting PVC %s: %v", pvcName, err) + } + + return wait.PollImmediate(Poll, 5*time.Minute, j.pvcDeleted(namespace, pvcName)) +} + +func (j *PVCTestJig) DeleteAndAwaitPod(namespace, podName string) error { + err := j.KubeClient.CoreV1().Pods(namespace).Delete(context.Background(), 
podName, metav1.DeleteOptions{}) + if err != nil { + Failf("Error deleting Pod %s: %v", podName, err) + } + + return wait.PollImmediate(Poll, 5*time.Minute, func() (done bool, err error) { + _, err = j.KubeClient.CoreV1().Pods(namespace).Get(context.Background(), podName, metav1.GetOptions{}) + + if apierrors.IsNotFound(err) { + return true, nil + } + if err != nil { + return true, err + } + return false, nil + }) +} + // WaitTimeoutForPVNotFound waits default amount of time for the specified Persistent Volume to be terminated. // If the PV Get api returns IsNotFound then the wait stops and nil is returned. If the Get api returns // an error other than "not found" then that error is returned and the wait stops. @@ -1358,12 +1471,26 @@ func (j *PVCTestJig) CheckPVExists(pvName string) bool { func (j *PVCTestJig) ChangePVReclaimPolicy(pvName string, newReclaimPolicy string) error { Logf("Changing ReclaimPolicy for PV %s to %s.", pvName, newReclaimPolicy) - pvPatchBytes := []byte(fmt.Sprintf("{\"spec\": {\"persistentVolumeReclaimPolicy\": \"%s\"}}",newReclaimPolicy)) - pv, err := j.KubeClient.CoreV1().PersistentVolumes().Patch(context.Background(), pvName,types.StrategicMergePatchType, pvPatchBytes, metav1.PatchOptions{}) + pvPatchBytes := []byte(fmt.Sprintf("{\"spec\": {\"persistentVolumeReclaimPolicy\": \"%s\"}}", newReclaimPolicy)) + pv, err := j.KubeClient.CoreV1().PersistentVolumes().Patch(context.Background(), pvName, types.StrategicMergePatchType, pvPatchBytes, metav1.PatchOptions{}) Logf("ReclaimPolicy for PV %s Updated to %s.", pvName, pv.Spec.PersistentVolumeReclaimPolicy) return err } +func (j *PVCTestJig) pvcDeleted(namespace, pvcName string) wait.ConditionFunc { + return func() (bool, error) { + _, err := j.KubeClient.CoreV1().PersistentVolumeClaims(namespace).Get(context.Background(), pvcName, metav1.GetOptions{}) + + if apierrors.IsNotFound(err) { + return true, nil // done + } + if err != nil { + return true, err // stop wait with error + } + return false, nil + } +} + func (j *PVCTestJig) pvNotFound(pvName string) wait.ConditionFunc { return func() (bool, error) { _, err := j.KubeClient.CoreV1().PersistentVolumes().Get(context.Background(), pvName, metav1.GetOptions{})