Skip to content

Commit

Permalink
CSI Volume Cloning
Browse files Browse the repository at this point in the history
  • Loading branch information
YashwantGohokar committed Oct 2, 2023
1 parent e43e445 commit 149c88a
Show file tree
Hide file tree
Showing 15 changed files with 618 additions and 81 deletions.
5 changes: 5 additions & 0 deletions pkg/cloudprovider/providers/oci/instances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,11 @@ func (c *MockNetworkLoadBalancerClient) UpdateNetworkSecurityGroups(ctx context.
// MockBlockStorageClient mocks BlockStorage client implementation
type MockBlockStorageClient struct{}

// AwaitVolumeCloneAvailableOrTimeout implements client.BlockStorageInterface.
func (*MockBlockStorageClient) AwaitVolumeCloneAvailableOrTimeout(ctx context.Context, id string) (*core.Volume, error) {
return nil, nil
}

func (MockBlockStorageClient) AwaitVolumeAvailableORTimeout(ctx context.Context, id string) (*core.Volume, error) {
return nil, nil
}
Expand Down
131 changes: 89 additions & 42 deletions pkg/csi/driver/bv_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,80 @@ func (d *BlockVolumeControllerDriver) CreateVolume(ctx context.Context, req *csi
}

availableDomainShortName := ""
if req.AccessibilityRequirements != nil && req.AccessibilityRequirements.Preferred != nil {
volumeName := req.Name

dimensionsMap := make(map[string]string)
dimensionsMap[metrics.ResourceOCIDDimension] = volumeName

srcSnapshotId := ""
srcVolumeId := ""
volumeContentSource := req.GetVolumeContentSource()
if volumeContentSource != nil {
_, isVolumeContentSource_Snapshot := volumeContentSource.GetType().(*csi.VolumeContentSource_Snapshot)
_, isVolumeContentSource_Volume := volumeContentSource.GetType().(*csi.VolumeContentSource_Volume)

if !isVolumeContentSource_Snapshot && !isVolumeContentSource_Volume {
log.Error("Unsupported volumeContentSource")
return nil, status.Error(codes.InvalidArgument, "Unsupported volumeContentSource")
}

if isVolumeContentSource_Snapshot {
srcSnapshot := volumeContentSource.GetSnapshot()
if srcSnapshot == nil {
log.With("volumeSourceType", "snapshot").Error("Error fetching snapshot from the volumeContentSource")
return nil, status.Error(codes.InvalidArgument, "Error fetching snapshot from the volumeContentSource")
}

id := srcSnapshot.GetSnapshotId()
volumeBackup, err := d.client.BlockStorage().GetVolumeBackup(ctx, id)
if err != nil {
if k8sapierrors.IsNotFound(err) {
log.With("service", "blockstorage", "verb", "get", "resource", "volumeBackup", "statusCode", util.GetHttpStatusCode(err)).Errorf("Failed to get snapshot with ID %v", id)
return nil, status.Errorf(codes.NotFound, "Failed to get snapshot with ID %v", id)
}
log.With("service", "blockstorage", "verb", "get", "resource", "volumeBackup", "statusCode", util.GetHttpStatusCode(err)).Errorf("Failed to fetch snapshot with ID %v with error %v", id, err)
return nil, status.Errorf(codes.Internal, "Failed to fetch snapshot with ID %v with error %v", id, err)
}

volumeBackupSize := *volumeBackup.SizeInMBs * client.MiB
if volumeBackupSize < size {
volumeContext[needResize] = "true"
volumeContext[newSize] = strconv.FormatInt(size, 10)
}

srcSnapshotId = id
} else {
srcVolume := volumeContentSource.GetVolume()
if srcVolume == nil {
log.With("volumeSourceType", "pvc").Error("Error fetching volume from the volumeContentSource")
return nil, status.Error(codes.InvalidArgument, "Error fetching volume from the volumeContentSource")
}

id := srcVolume.GetVolumeId()
srcBlockVolume, err := d.client.BlockStorage().GetVolume(ctx, id)
if err != nil {
if client.IsNotFound(err) {
log.With("service", "blockstorage", "verb", "get", "resource", "blockVolume", "statusCode", util.GetHttpStatusCode(err)).Errorf("Failed to get volume with ID %v", id)
return nil, status.Errorf(codes.NotFound, "Failed to get volume with ID %v", id)
}
log.With("service", "blockstorage", "verb", "get", "resource", "blockVolume", "statusCode", util.GetHttpStatusCode(err)).Errorf("Failed to fetch volume with ID %v with error %v", id, err)
return nil, status.Errorf(codes.Internal, "Failed to fetch volume with ID %v with error %v", id, err)
}

availableDomainShortName = *srcBlockVolume.AvailabilityDomain
log.With("AD", availableDomainShortName).Info("Using availability domain of source volume to provision clone volume.")

srcBlockVolumeSize := *srcBlockVolume.SizeInMBs * client.MiB
if srcBlockVolumeSize < size {
volumeContext["needResize"] = "true"
volumeContext["newSize"] = strconv.FormatInt(size, 10)
}

srcVolumeId = id
}
}

if req.AccessibilityRequirements != nil && req.AccessibilityRequirements.Preferred != nil && availableDomainShortName == "" {
for _, t := range req.AccessibilityRequirements.Preferred {
availableDomainShortName, _ = t.Segments[kubeAPI.LabelZoneFailureDomain]
log.With("AD", availableDomainShortName).Info("Using preferred topology for AD.")
Expand All @@ -257,50 +330,16 @@ func (d *BlockVolumeControllerDriver) CreateVolume(ctx context.Context, req *csi
}
}

volumeName := req.Name

dimensionsMap := make(map[string]string)
dimensionsMap[metrics.ResourceOCIDDimension] = volumeName

srcSnapshotId := ""
volumeContentSource := req.GetVolumeContentSource()
if volumeContentSource != nil {
if _, ok := volumeContentSource.GetType().(*csi.VolumeContentSource_Snapshot); !ok {
log.Error("Unsupported volumeContentSource")
return nil, status.Error(codes.InvalidArgument, "Unsupported volumeContentSource")
}
srcSnapshot := volumeContentSource.GetSnapshot()
if srcSnapshot == nil {
log.Error("Error fetching snapshot from the volumeContentSource")
return nil, status.Error(codes.InvalidArgument, "Error fetching snapshot from the volumeContentSource")
}

id := srcSnapshot.GetSnapshotId()
volumeBackup, err := d.client.BlockStorage().GetVolumeBackup(ctx, id)
if err != nil {
if k8sapierrors.IsNotFound(err) {
log.With("service", "blockstorage", "verb", "get", "resource", "volumeBackup", "statusCode", util.GetHttpStatusCode(err)).Errorf("Failed to get snapshot with ID %v", id)
return nil, status.Errorf(codes.NotFound, "Failed to get snapshot with ID %v", id)
}
log.With("service", "blockstorage", "verb", "get", "resource", "volumeBackup", "statusCode", util.GetHttpStatusCode(err)).Errorf("Failed to fetch snapshot with ID %v with error %v", id, err)
return nil, status.Errorf(codes.Internal, "Failed to fetch snapshot with ID %v with error %v", id, err)
}

volumeBackupSize := *volumeBackup.SizeInMBs * client.MiB
if volumeBackupSize < size {
volumeContext[needResize] = "true"
volumeContext[newSize] = strconv.FormatInt(size, 10)
}

srcSnapshotId = id
}

metric := metrics.PVProvision
metricType := util.CSIStorageType
if srcSnapshotId != "" {
metric = metrics.BlockSnapshotRestore
metricType = util.CSIStorageType
}
if srcVolumeId != "" {
metric = metrics.PVClone
metricType = util.CSIStorageType
}

if availableDomainShortName == "" {
metricDimension = util.GetMetricDimensionForComponent(util.ErrValidation, metricType)
Expand Down Expand Up @@ -377,7 +416,7 @@ func (d *BlockVolumeControllerDriver) CreateVolume(ctx context.Context, req *csi
bvTags = scTags
}

provisionedVolume, err = provision(log, d.client, volumeName, size, *ad.Name, d.config.CompartmentID, srcSnapshotId,
provisionedVolume, err = provision(log, d.client, volumeName, size, *ad.Name, d.config.CompartmentID, srcSnapshotId, srcVolumeId,
volumeParams.diskEncryptionKey, volumeParams.vpusPerGB, timeout, bvTags)
if err != nil {
log.With("Ad name", *ad.Name, "Compartment Id", d.config.CompartmentID).With(zap.Error(err)).Error("New volume creation failed.")
Expand All @@ -391,7 +430,12 @@ func (d *BlockVolumeControllerDriver) CreateVolume(ctx context.Context, req *csi
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
log.Info("Waiting for volume to become available.")
_, err = d.client.BlockStorage().AwaitVolumeAvailableORTimeout(ctx, *provisionedVolume.Id)

if srcVolumeId != "" {
_, err = d.client.BlockStorage().AwaitVolumeCloneAvailableOrTimeout(ctx, *provisionedVolume.Id)
} else {
_, err = d.client.BlockStorage().AwaitVolumeAvailableORTimeout(ctx, *provisionedVolume.Id)
}
if err != nil {
log.With("service", "blockstorage", "verb", "get", "resource", "volume", "statusCode", util.GetHttpStatusCode(err)).
With("volumeName", volumeName).Error("Create volume failed with time out")
Expand Down Expand Up @@ -832,6 +876,7 @@ func (d *BlockVolumeControllerDriver) ControllerGetCapabilities(ctx context.Cont
csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
} {
caps = append(caps, newCap(cap))
}
Expand Down Expand Up @@ -1185,7 +1230,7 @@ func (d *BlockVolumeControllerDriver) ControllerGetVolume(ctx context.Context, r
}

func provision(log *zap.SugaredLogger, c client.Interface, volName string, volSize int64, availDomainName, compartmentID,
backupID, kmsKeyID string, vpusPerGB int64, timeout time.Duration, bvTags *config.TagConfig) (core.Volume, error) {
backupID, srcVolumeID, kmsKeyID string, vpusPerGB int64, timeout time.Duration, bvTags *config.TagConfig) (core.Volume, error) {

ctx := context.Background()

Expand All @@ -1205,6 +1250,8 @@ func provision(log *zap.SugaredLogger, c client.Interface, volName string, volSi

if backupID != "" {
volumeDetails.SourceDetails = &core.VolumeSourceFromVolumeBackupDetails{Id: &backupID}
} else if srcVolumeID != "" {
volumeDetails.SourceDetails = &core.VolumeSourceFromVolumeDetails{Id: &srcVolumeID}
}

if kmsKeyID != "" {
Expand Down
5 changes: 5 additions & 0 deletions pkg/csi/driver/bv_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ func (MockOCIClient) Identity() client.IdentityInterface {

type MockBlockStorageClient struct{}

// AwaitVolumeCloneAvailableOrTimeout implements client.BlockStorageInterface.
func (c *MockBlockStorageClient) AwaitVolumeCloneAvailableOrTimeout(ctx context.Context, id string) (*core.Volume, error) {
return &core.Volume{}, nil
}

func (c *MockBlockStorageClient) AwaitVolumeBackupAvailableOrTimeout(ctx context.Context, id string) (*core.VolumeBackup, error) {
return &core.VolumeBackup{}, nil
}
Expand Down
15 changes: 13 additions & 2 deletions pkg/csi/driver/bv_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/hostutil"

"github.com/oracle/oci-cloud-controller-manager/pkg/csi-util"
csi_util "github.com/oracle/oci-cloud-controller-manager/pkg/csi-util"
"github.com/oracle/oci-cloud-controller-manager/pkg/oci/client"
"github.com/oracle/oci-cloud-controller-manager/pkg/util/disk"
)
Expand Down Expand Up @@ -190,6 +190,17 @@ func (d BlockVolumeNodeDriver) NodeStageVolume(ctx context.Context, req *csi.Nod
}
}

existingFs, err := mountHandler.GetDiskFormat(devicePath)
if err != nil {
logger.With("devicePath", devicePath, zap.Error(err)).Error("GetDiskFormatFailed")
}

if existingFs != "" && existingFs != fsType {
returnError := fmt.Sprintf("FS Type mismatch detected. The existing fs type on the volume: %q doesn't match the requested fs type: %q. Please change fs type in PV to match the existing fs type.", existingFs, fsType)
logger.Error(returnError)
return nil, status.Error(codes.Internal, returnError)
}

logger.With("devicePath", devicePath,
"fsType", fsType).Info("mounting the volume to staging path.")
err = mountHandler.FormatAndMount(devicePath, req.StagingTargetPath, fsType, options)
Expand Down Expand Up @@ -394,7 +405,7 @@ func (d BlockVolumeNodeDriver) NodePublishVolume(ctx context.Context, req *csi.N
}

if needsResize {
logger.Info("Starting to expand volume restored from snapshot")
logger.Info("Starting to expand volume to requested size")

requestedSize, err := strconv.ParseInt(req.PublishContext[newSize], 10, 64)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/metrics/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const (
PVDelete = "PV_DELETE"
// PVExpand is the OCI metric suffix for PV Expand
PVExpand = "PV_EXPAND"
// PVClone is the OCI metric for PV Clone
PVClone = "PV_CLONE"

// FSSProvision is the OCI metric suffix for FSS provision
FSSProvision = "FSS_PROVISION"
Expand Down
47 changes: 40 additions & 7 deletions pkg/oci/client/block_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ const (
)

const (
volumePollInterval = 5 * time.Second
volumePollInterval = 5 * time.Second
volumeBackupPollInterval = 5 * time.Second
volumeClonePollInterval = 10 * time.Second
// OCIVolumeID is the name of the oci volume id.
OCIVolumeID = "ociVolumeID"
// OCIVolumeBackupID is the name of the oci volume backup id annotation.
Expand All @@ -51,6 +52,7 @@ const (
// by the volume provisioner.
type BlockStorageInterface interface {
AwaitVolumeAvailableORTimeout(ctx context.Context, id string) (*core.Volume, error)
AwaitVolumeCloneAvailableOrTimeout(ctx context.Context, id string) (*core.Volume, error)
CreateVolume(ctx context.Context, details core.CreateVolumeDetails) (*core.Volume, error)
DeleteVolume(ctx context.Context, id string) error
GetVolume(ctx context.Context, id string) (*core.Volume, error)
Expand Down Expand Up @@ -99,7 +101,7 @@ func (c *client) GetVolumeBackup(ctx context.Context, id string) (*core.VolumeBa
return &resp.VolumeBackup, nil
}

//AwaitVolumeAvailableORTimeout takes context as timeout
// AwaitVolumeAvailableORTimeout takes context as timeout
func (c *client) AwaitVolumeAvailableORTimeout(ctx context.Context, id string) (*core.Volume, error) {
var vol *core.Volume
if err := wait.PollImmediateUntil(volumePollInterval, func() (bool, error) {
Expand Down Expand Up @@ -128,7 +130,7 @@ func (c *client) AwaitVolumeAvailableORTimeout(ctx context.Context, id string) (
return vol, nil
}

//AwaitVolumeBackupAvailableOrTimeout takes context as timeout
// AwaitVolumeBackupAvailableOrTimeout takes context as timeout
func (c *client) AwaitVolumeBackupAvailableOrTimeout(ctx context.Context, id string) (*core.VolumeBackup, error) {
var volBackup *core.VolumeBackup
if err := wait.PollImmediateUntil(volumeBackupPollInterval, func() (bool, error) {
Expand Down Expand Up @@ -157,6 +159,37 @@ func (c *client) AwaitVolumeBackupAvailableOrTimeout(ctx context.Context, id str
return volBackup, nil
}

func (c *client) AwaitVolumeCloneAvailableOrTimeout(ctx context.Context, id string) (*core.Volume, error) {
var volClone *core.Volume
if err := wait.PollImmediateUntil(volumeClonePollInterval, func() (bool, error) {
var err error
volClone, err = c.GetVolume(ctx, id)
if err != nil {
if !IsRetryable(err) {
return false, err
}
return false, nil
}

switch state := volClone.LifecycleState; state {
case core.VolumeLifecycleStateAvailable:
if *volClone.IsHydrated == true {
return true, nil
}
return false, nil
case core.VolumeLifecycleStateFaulty,
core.VolumeLifecycleStateTerminated,
core.VolumeLifecycleStateTerminating:
return false, errors.Errorf("Clone volume did not become available and hydrated (lifecycleState=%q) (hydrationStatus=%v)", state, *volClone.IsHydrated)
}
return false, nil
}, ctx.Done()); err != nil {
return nil, err
}

return volClone, nil
}

func (c *client) CreateVolume(ctx context.Context, details core.CreateVolumeDetails) (*core.Volume, error) {
if !c.rateLimiter.Writer.TryAccept() {
return nil, RateLimitError(true, "CreateVolume")
Expand Down Expand Up @@ -238,7 +271,7 @@ func (c *client) DeleteVolumeBackup(ctx context.Context, id string) error {
}

_, err := c.bs.DeleteVolumeBackup(ctx, core.DeleteVolumeBackupRequest{
VolumeBackupId: &id,
VolumeBackupId: &id,
RequestMetadata: c.requestMetadata})
incRequestCounter(err, deleteVerb, snapshotResource)

Expand Down Expand Up @@ -306,10 +339,10 @@ func (c *client) GetVolumeBackupsByName(ctx context.Context, snapshotName, compa
}

listVolumeBackupsResponse, err := c.bs.ListVolumeBackups(ctx,
core.ListVolumeBackupsRequest {
core.ListVolumeBackupsRequest{
CompartmentId: &compartmentID,
Page: page,
DisplayName: &snapshotName,
DisplayName: &snapshotName,
RequestMetadata: c.requestMetadata,
})

Expand All @@ -318,7 +351,7 @@ func (c *client) GetVolumeBackupsByName(ctx context.Context, snapshotName, compa
}

logger := c.logger.With("snapshotName", snapshotName, "CompartmentID", compartmentID,
"OpcRequestId",*(listVolumeBackupsResponse.OpcRequestId))
"OpcRequestId", *(listVolumeBackupsResponse.OpcRequestId))
logger.Info("OPC Request ID recorded while fetching volume backups by name.")

for _, volumeBackup := range listVolumeBackupsResponse.Items {
Expand Down
Loading

0 comments on commit 149c88a

Please sign in to comment.