From f9c3ffde4ad8fe4ac5d252957ca3d7da2cfc38f7 Mon Sep 17 00:00:00 2001 From: Xun Jiang Date: Tue, 13 Jun 2023 20:43:11 +0800 Subject: [PATCH] Add snapshot data mover logic in PVC RIA v2. Signed-off-by: Xun Jiang --- go.mod | 2 +- go.sum | 4 +- internal/backup/pvc_action.go | 2 +- internal/backup/pvc_action_test.go | 6 +- internal/restore/pvc_action.go | 288 ++++++++++++++++++---- internal/restore/pvc_action_test.go | 357 +++++++++++++++++++++++++++- internal/util/labels_annotations.go | 11 + 7 files changed, 620 insertions(+), 50 deletions(-) diff --git a/go.mod b/go.mod index 170183a0..41a20845 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.2 // TODO: need to use velero v1.12.0 after releasing. - github.com/vmware-tanzu/velero v0.0.0-20230612131245-c86018a0f891 + github.com/vmware-tanzu/velero v0.0.0-20230620063525-6f3adcf7282b k8s.io/api v0.25.6 k8s.io/apimachinery v0.25.6 k8s.io/client-go v0.25.6 diff --git a/go.sum b/go.sum index a65f72d8..414bb8df 100644 --- a/go.sum +++ b/go.sum @@ -658,8 +658,8 @@ github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69 github.com/tg123/go-htpasswd v1.2.1 h1:i4wfsX1KvvkyoMiHZzjS0VzbAPWfxzI8INcZAKtutoU= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= -github.com/vmware-tanzu/velero v0.0.0-20230612131245-c86018a0f891 h1:74uNecHF4T8FIQS9b39jgV4zPtb3fsKIjXudwTF7wS8= -github.com/vmware-tanzu/velero v0.0.0-20230612131245-c86018a0f891/go.mod h1:URz7drKyNV58T6MuVdDH/iATs3/DorJI+IEVPZwdHZE= +github.com/vmware-tanzu/velero v0.0.0-20230620063525-6f3adcf7282b h1:mrsiuap4DAaF5ukPvJrFAiELcuHmLF2QVswJpIyVSP8= +github.com/vmware-tanzu/velero v0.0.0-20230620063525-6f3adcf7282b/go.mod h1:URz7drKyNV58T6MuVdDH/iATs3/DorJI+IEVPZwdHZE= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= diff --git a/internal/backup/pvc_action.go b/internal/backup/pvc_action.go index bd014fab..2ffa2b1c 100644 --- a/internal/backup/pvc_action.go +++ b/internal/backup/pvc_action.go @@ -172,7 +172,7 @@ func (p *PVCBackupItemAction) Execute(item runtime.Unstructured, backup *velerov var itemToUpdate []velero.ResourceIdentifier if boolptr.IsSetToTrue(backup.Spec.SnapshotMoveData) { - operationID = label.GetValidName(string(backup.UID) + "." + string(pvc.UID)) + operationID = label.GetValidName(string(util.AsyncOperationIDPrefixDataUpload) + string(backup.UID) + "." + string(pvc.UID)) dataUploadLog := p.Log.WithFields(logrus.Fields{ "Source PVC": fmt.Sprintf("%s/%s", pvc.Namespace, pvc.Name), "VolumeSnapshot": fmt.Sprintf("%s/%s", upd.Namespace, upd.Name), diff --git a/internal/backup/pvc_action_test.go b/internal/backup/pvc_action_test.go index 90573e79..2c905330 100644 --- a/internal/backup/pvc_action_test.go +++ b/internal/backup/pvc_action_test.go @@ -26,8 +26,6 @@ import ( snapshotfake "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/fake" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" - "github.com/vmware-tanzu/velero-plugin-for-csi/internal/util" - velerofake "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/fake" corev1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -35,10 +33,12 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" + "github.com/vmware-tanzu/velero-plugin-for-csi/internal/util" "github.com/vmware-tanzu/velero/pkg/apis/velero/shared" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" velerov2alpha1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" "github.com/vmware-tanzu/velero/pkg/builder" + velerofake "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/fake" "github.com/vmware-tanzu/velero/pkg/plugin/velero" ) @@ -89,7 +89,7 @@ func TestExecute(t *testing.T) { velerov1api.BackupNameLabel: "test", velerov1api.BackupUIDLabel: "", velerov1api.PVCUIDLabel: "", - util.AsyncOperationIDLabel: ".", + util.AsyncOperationIDLabel: "du-.", }, OwnerReferences: []metav1.OwnerReference{ { diff --git a/internal/restore/pvc_action.go b/internal/restore/pvc_action.go index a8a7c44a..e5a21e99 100644 --- a/internal/restore/pvc_action.go +++ b/internal/restore/pvc_action.go @@ -21,16 +21,18 @@ import ( "encoding/json" "fmt" + jsonpatch "github.com/evanphx/json-patch" snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" snapshotterClientSet "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned" "github.com/pkg/errors" "github.com/sirupsen/logrus" - corev1api "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + utilrand "k8s.io/apimachinery/pkg/util/rand" "k8s.io/client-go/kubernetes" "github.com/vmware-tanzu/velero-plugin-for-csi/internal/util" @@ -51,6 +53,10 @@ const ( AnnSelectedNode = "volume.kubernetes.io/selected-node" ) +const ( + GenerateNameRandomLength = 5 +) + // PVCRestoreItemAction is a restore item action plugin for Velero type PVCRestoreItemAction struct { Log logrus.FieldLogger @@ -110,12 +116,20 @@ func setPVCStorageResourceRequest(pvc *corev1api.PersistentVolumeClaim, restoreS // Execute modifies the PVC's spec to use the volumesnapshot object as the data source ensuring that the newly provisioned volume // can be pre-populated with data from the volumesnapshot. func (p *PVCRestoreItemAction) Execute(input *velero.RestoreItemActionExecuteInput) (*velero.RestoreItemActionExecuteOutput, error) { - var pvc corev1api.PersistentVolumeClaim - + var pvc, pvcFromBackup corev1api.PersistentVolumeClaim if err := runtime.DefaultUnstructuredConverter.FromUnstructured(input.Item.UnstructuredContent(), &pvc); err != nil { return nil, errors.WithStack(err) } - p.Log.Infof("Starting PVCRestoreItemAction for PVC %s/%s", pvc.Namespace, pvc.Name) + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(input.ItemFromBackup.UnstructuredContent(), &pvcFromBackup); err != nil { + return nil, errors.WithStack(err) + } + + logger := p.Log.WithFields(logrus.Fields{ + "Action": "PVCRestoreItemAction", + "PVC": pvc.Namespace + "/" + pvc.Name, + "Restore": input.Restore.Namespace + "/" + input.Restore.Name, + }) + logger.Info("Starting PVCRestoreItemAction for PVC") removePVCAnnotations(&pvc, []string{AnnBindCompleted, AnnBoundByController, AnnStorageProvisioner, AnnBetaStorageProvisioner, AnnSelectedNode}) @@ -126,56 +140,49 @@ func (p *PVCRestoreItemAction) Execute(input *velero.RestoreItemActionExecuteInp pvc.SetNamespace(val) } - volumeSnapshotName, ok := pvc.Annotations[util.VolumeSnapshotLabel] - if !ok { - p.Log.Infof("Skipping PVCRestoreItemAction for PVC %s/%s, PVC does not have a CSI volumesnapshot.", pvc.Namespace, pvc.Name) - return &velero.RestoreItemActionExecuteOutput{ - UpdatedItem: input.Item, - }, nil - } + operationID := "" if boolptr.IsSetToFalse(input.Restore.Spec.RestorePVs) { - p.Log.Infof("Restore did not request for PVs to be restored from snapshot %s/%s.", input.Restore.Namespace, input.Restore.Name) + logger.Info("Restore did not request for PVs to be restored from snapshot") pvc.Spec.VolumeName = "" pvc.Spec.DataSource = nil pvc.Spec.DataSourceRef = nil } else { - _, snapClient, err := util.GetClients() + backup, err := p.VeleroClient.VeleroV1().Backups(input.Restore.Namespace).Get(context.Background(), + input.Restore.Spec.BackupName, metav1.GetOptions{}) if err != nil { - return nil, errors.WithStack(err) + logger.Error("Fail to get backup for restore.") + return nil, fmt.Errorf("fail to get backup for restore: %s", err.Error()) } - vs, err := snapClient.SnapshotV1().VolumeSnapshots(pvc.Namespace).Get(context.TODO(), volumeSnapshotName, metav1.GetOptions{}) - if err != nil { - return nil, errors.Wrapf(err, fmt.Sprintf("Failed to get Volumesnapshot %s/%s to restore PVC %s/%s", pvc.Namespace, volumeSnapshotName, pvc.Namespace, pvc.Name)) - } + if boolptr.IsSetToTrue(backup.Spec.SnapshotMoveData) { + logger.Info("Start DataMover restore.") - if _, exists := vs.Annotations[util.VolumeSnapshotRestoreSize]; exists { - restoreSize, err := resource.ParseQuantity(vs.Annotations[util.VolumeSnapshotRestoreSize]) + operationID = label.GetValidName(string(util.AsyncOperationIDPrefixDataDownload) + string(input.Restore.UID) + "." + string(pvcFromBackup.UID)) + dataDownload, err := restoreFromDataUploadResult(context.Background(), input.Restore, &pvc, + operationID, pvcFromBackup.Namespace, p.Client, p.VeleroClient) if err != nil { - return nil, errors.Wrapf(err, fmt.Sprintf("Failed to parse %s from annotation on Volumesnapshot %s/%s into restore size", - vs.Annotations[util.VolumeSnapshotRestoreSize], vs.Namespace, vs.Name)) + logger.Errorf("Fail to restore from DataUploadResult: %s", err.Error()) + return nil, errors.WithStack(err) + } + logger.Infof("DataDownload %s/%s is created successfully.", dataDownload.Namespace, dataDownload.Name) + } else { + if err := restoreFromVolumeSnapshot(&pvc, p.SnapshotClient, logger); err != nil { + logger.Errorf("Failed to restore PVC from VolumeSnapshot.") + return nil, errors.WithStack(err) } - // It is possible that the volume provider allocated a larger capacity volume than what was requested in the backed up PVC. - // In this scenario the volumesnapshot of the PVC will endup being larger than its requested storage size. - // Such a PVC, on restore as-is, will be stuck attempting to use a Volumesnapshot as a data source for a PVC that - // is not large enough. - // To counter that, here we set the storage request on the PVC to the larger of the PVC's storage request and the size of the - // VolumeSnapshot - setPVCStorageResourceRequest(&pvc, restoreSize, p.Log) } - - resetPVCSpec(&pvc, volumeSnapshotName) } pvcMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&pvc) if err != nil { return nil, errors.WithStack(err) } - p.Log.Infof("Returning from PVCRestoreItemAction for PVC %s/%s", pvc.Namespace, pvc.Name) + logger.Info("Returning from PVCRestoreItemAction for PVC") return &velero.RestoreItemActionExecuteOutput{ UpdatedItem: &unstructured.Unstructured{Object: pvcMap}, + OperationID: operationID, }, nil } @@ -189,12 +196,70 @@ func (p *PVCRestoreItemAction) Progress(operationID string, restore *velerov1api if operationID == "" { return progress, riav2.InvalidOperationIDError(operationID) } + logger := p.Log.WithFields(logrus.Fields{ + "Action": "PVCRestoreItemAction", + "OperationID": operationID, + "Namespace": restore.Namespace, + }) + + dataDownload, err := getDataDownload(context.Background(), restore.Namespace, operationID, p.VeleroClient) + if err != nil { + logger.Errorf("fail to get DataDownload: %s", err.Error()) + return progress, err + } + if dataDownload.Status.Phase == velerov2alpha1.DataDownloadPhaseNew || + dataDownload.Status.Phase == "" { + logger.Debugf("DataDownload is still not processed yet. Skip progress update.") + return progress, nil + } + + progress.Description = string(dataDownload.Status.Phase) + progress.OperationUnits = "Bytes" + progress.NCompleted = dataDownload.Status.Progress.BytesDone + progress.NTotal = dataDownload.Status.Progress.TotalBytes + + if dataDownload.Status.StartTimestamp != nil { + progress.Started = dataDownload.Status.StartTimestamp.Time + } + + if dataDownload.Status.CompletionTimestamp != nil { + progress.Updated = dataDownload.Status.CompletionTimestamp.Time + } + + if dataDownload.Status.Phase == velerov2alpha1.DataDownloadPhaseCompleted { + progress.Completed = true + } else if dataDownload.Status.Phase == velerov2alpha1.DataDownloadPhaseCanceled { + progress.Completed = true + progress.Err = fmt.Sprintf("DataDownload is canceled") + } else if dataDownload.Status.Phase == velerov2alpha1.DataDownloadPhaseFailed { + progress.Completed = true + progress.Err = dataDownload.Status.Message + } return progress, nil } func (p *PVCRestoreItemAction) Cancel(operationID string, restore *velerov1api.Restore) error { - return nil + if operationID == "" { + return riav2.InvalidOperationIDError(operationID) + } + logger := p.Log.WithFields(logrus.Fields{ + "Action": "PVCRestoreItemAction", + "OperationID": operationID, + "Namespace": restore.Namespace, + }) + + dataDownload, err := getDataDownload(context.Background(), restore.Namespace, operationID, p.VeleroClient) + if err != nil { + logger.Errorf("fail to get DataDownload: %s", err.Error()) + return err + } + + err = cancelDataDownload(context.Background(), p.VeleroClient, dataDownload) + if err != nil { + logger.Errorf("fail to cancel DataDownload %s: %s", dataDownload.Name, err.Error()) + } + return err } func (p *PVCRestoreItemAction) AreAdditionalItemsReady(additionalItems []velero.ResourceIdentifier, restore *velerov1api.Restore) (bool, error) { @@ -202,21 +267,20 @@ func (p *PVCRestoreItemAction) AreAdditionalItemsReady(additionalItems []velero. } func getDataUploadResult(ctx context.Context, restore *velerov1api.Restore, pvc *corev1api.PersistentVolumeClaim, - kubeClient kubernetes.Interface) (*velerov2alpha1.DataUploadResult, error) { - cmList, err := kubeClient.CoreV1().ConfigMaps(restore.Namespace).List(ctx, metav1.ListOptions{ - LabelSelector: fmt.Sprintf("%s=%s,%s=%s", velerov1api.BackupNameLabel, label.GetValidName(restore.Spec.BackupName), - util.PVCNamespaceNameLabel, pvc.Namespace+"/"+pvc.Name), - }) + sourceNamespace string, kubeClient kubernetes.Interface) (*velerov2alpha1.DataUploadResult, error) { + labelSelector := fmt.Sprintf("%s=%s,%s=%s", util.PVCNamespaceNameLabel, sourceNamespace+"."+pvc.Name, + velerov1api.RestoreUIDLabel, label.GetValidName(string(restore.UID))) + cmList, err := kubeClient.CoreV1().ConfigMaps(restore.Namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector}) if err != nil { - return nil, errors.Wrapf(err, "error to get DataUpload result cm with name %s", restore.Spec.BackupName) + return nil, errors.Wrapf(err, "error to get DataUpload result cm with labels %s", labelSelector) } if len(cmList.Items) == 0 { - return nil, errors.Errorf("no DataUpload result cm found for %s", restore.Spec.BackupName) + return nil, errors.Errorf("no DataUpload result cm found with labels %s", labelSelector) } if len(cmList.Items) > 1 { - return nil, errors.Errorf("multiple DataUpload result cms found for %s", restore.Spec.BackupName) + return nil, errors.Errorf("multiple DataUpload result cms found with labels %s", labelSelector) } jsonBytes, exist := cmList.Items[0].Data[string(restore.UID)] @@ -232,3 +296,145 @@ func getDataUploadResult(ctx context.Context, restore *velerov1api.Restore, pvc return &result, nil } + +func getDataDownload(ctx context.Context, namespace string, operationID string, veleroClient veleroClientSet.Interface) (*velerov2alpha1.DataDownload, error) { + dataDownloadList, err := veleroClient.VeleroV2alpha1().DataDownloads(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", util.AsyncOperationIDLabel, operationID), + }) + if err != nil { + return nil, errors.Wrap(err, "fail to list DataDownload") + } + + if len(dataDownloadList.Items) == 0 { + return nil, errors.Errorf("didn't find DataDownload") + } + + if len(dataDownloadList.Items) > 1 { + return nil, errors.Errorf("find multiple DataDownloads") + } + + return &dataDownloadList.Items[0], nil +} + +func cancelDataDownload(ctx context.Context, veleroClient veleroClientSet.Interface, + dataDownload *velerov2alpha1.DataDownload) error { + oldData, err := json.Marshal(dataDownload) + if err != nil { + return errors.Wrap(err, "fail to marshal origin DataDownload") + } + + updatedDataDownload := dataDownload.DeepCopy() + updatedDataDownload.Spec.Cancel = true + + newData, err := json.Marshal(updatedDataDownload) + if err != nil { + return errors.Wrap(err, "fail to marshal updated DataDownload") + } + + patchData, err := jsonpatch.CreateMergePatch(oldData, newData) + if err != nil { + return errors.Wrap(err, "fail to create merge patch for DataDownload") + } + + _, err = veleroClient.VeleroV2alpha1().DataDownloads(dataDownload.Namespace).Patch(ctx, dataDownload.Name, + types.MergePatchType, patchData, metav1.PatchOptions{}) + return err +} + +func newDataDownload(restore *velerov1api.Restore, dataUploadResult *velerov2alpha1.DataUploadResult, + pvc *corev1api.PersistentVolumeClaim, operationID string) *velerov2alpha1.DataDownload { + dataDownload := &velerov2alpha1.DataDownload{ + TypeMeta: metav1.TypeMeta{ + APIVersion: velerov2alpha1.SchemeGroupVersion.String(), + Kind: "DataDownload", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: restore.Namespace, + GenerateName: restore.Name + "-", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: velerov1api.SchemeGroupVersion.String(), + Kind: "Restore", + Name: restore.Name, + UID: restore.UID, + Controller: boolptr.True(), + }, + }, + Labels: map[string]string{ + velerov1api.RestoreNameLabel: label.GetValidName(restore.Name), + velerov1api.RestoreUIDLabel: string(restore.UID), + util.AsyncOperationIDLabel: operationID, + }, + }, + Spec: velerov2alpha1.DataDownloadSpec{ + TargetVolume: velerov2alpha1.TargetVolumeSpec{ + PVC: pvc.Name, + Namespace: pvc.Namespace, + }, + BackupStorageLocation: dataUploadResult.BackupStorageLocation, + DataMover: dataUploadResult.DataMover, + SnapshotID: dataUploadResult.SnapshotID, + SourceNamespace: dataUploadResult.SourceNamespace, + OperationTimeout: restore.Spec.ItemOperationTimeout, + }, + } + + return dataDownload +} + +func restoreFromVolumeSnapshot(pvc *corev1api.PersistentVolumeClaim, snapClient snapshotterClientSet.Interface, + logger logrus.FieldLogger) error { + volumeSnapshotName, ok := pvc.Annotations[util.VolumeSnapshotLabel] + if !ok { + logger.Info("Skipping PVCRestoreItemAction for PVC , PVC does not have a CSI volumesnapshot.") + return nil + } + + vs, err := snapClient.SnapshotV1().VolumeSnapshots(pvc.Namespace).Get(context.TODO(), volumeSnapshotName, metav1.GetOptions{}) + if err != nil { + return errors.Wrapf(err, fmt.Sprintf("Failed to get Volumesnapshot %s/%s to restore PVC %s/%s", pvc.Namespace, volumeSnapshotName, pvc.Namespace, pvc.Name)) + } + + if _, exists := vs.Annotations[util.VolumeSnapshotRestoreSize]; exists { + restoreSize, err := resource.ParseQuantity(vs.Annotations[util.VolumeSnapshotRestoreSize]) + if err != nil { + return errors.Wrapf(err, fmt.Sprintf("Failed to parse %s from annotation on Volumesnapshot %s/%s into restore size", + vs.Annotations[util.VolumeSnapshotRestoreSize], vs.Namespace, vs.Name)) + } + // It is possible that the volume provider allocated a larger capacity volume than what was requested in the backed up PVC. + // In this scenario the volumesnapshot of the PVC will end being larger than its requested storage size. + // Such a PVC, on restore as-is, will be stuck attempting to use a Volumesnapshot as a data source for a PVC that + // is not large enough. + // To counter that, here we set the storage request on the PVC to the larger of the PVC's storage request and the size of the + // VolumeSnapshot + setPVCStorageResourceRequest(pvc, restoreSize, logger) + } + + resetPVCSpec(pvc, volumeSnapshotName) + + return nil +} + +func restoreFromDataUploadResult(ctx context.Context, restore *velerov1api.Restore, pvc *corev1api.PersistentVolumeClaim, + operationID string, sourceNamespace string, kubeClient kubernetes.Interface, veleroClient veleroClientSet.Interface) (*velerov2alpha1.DataDownload, error) { + dataUploadResult, err := getDataUploadResult(ctx, restore, pvc, sourceNamespace, kubeClient) + if err != nil { + return nil, errors.Wrapf(err, "fail get DataUploadResult for restore: %s", restore.Name) + } + pvc.Spec.VolumeName = "" + if pvc.Spec.Selector == nil { + pvc.Spec.Selector = &metav1.LabelSelector{} + } + if pvc.Spec.Selector.MatchLabels == nil { + pvc.Spec.Selector.MatchLabels = make(map[string]string) + } + pvc.Spec.Selector.MatchLabels[util.DynamicPVRestoreLabel] = label.GetValidName(fmt.Sprintf("%s.%s.%s", pvc.Namespace, pvc.Name, utilrand.String(GenerateNameRandomLength))) + + dataDownload := newDataDownload(restore, dataUploadResult, pvc, operationID) + _, err = veleroClient.VeleroV2alpha1().DataDownloads(restore.Namespace).Create(ctx, dataDownload, metav1.CreateOptions{}) + if err != nil { + return nil, errors.Wrapf(err, "fail to create DataDownload") + } + + return dataDownload, nil +} diff --git a/internal/restore/pvc_action_test.go b/internal/restore/pvc_action_test.go index 5572d085..ffc1050c 100644 --- a/internal/restore/pvc_action_test.go +++ b/internal/restore/pvc_action_test.go @@ -17,15 +17,30 @@ limitations under the License. package restore import ( + "context" "testing" + "time" + snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" + snapshotfake "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/fake" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" - "github.com/vmware-tanzu/velero-plugin-for-csi/internal/util" - + "github.com/stretchr/testify/require" corev1api "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" + + "github.com/vmware-tanzu/velero-plugin-for-csi/internal/util" + "github.com/vmware-tanzu/velero/pkg/apis/velero/shared" + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + velerov2alpha1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" + "github.com/vmware-tanzu/velero/pkg/builder" + velerofake "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/fake" + "github.com/vmware-tanzu/velero/pkg/plugin/velero" + "github.com/vmware-tanzu/velero/pkg/util/boolptr" ) func TestRemovePVCAnnotations(t *testing.T) { @@ -331,3 +346,341 @@ func TestResetPVCResourceRequest(t *testing.T) { }) } } + +func TestProgress(t *testing.T) { + currentTime := time.Now() + tests := []struct { + name string + restore *velerov1api.Restore + dataDownload *velerov2alpha1.DataDownload + operationID string + expectedErr string + expectedProgress velero.OperationProgress + }{ + { + name: "DataDownload cannot be found", + restore: builder.ForRestore("velero", "test").Result(), + operationID: "testing", + expectedErr: "didn't find DataDownload", + }, + { + name: "DataUpload is found", + restore: builder.ForRestore("velero", "test").Result(), + dataDownload: &velerov2alpha1.DataDownload{ + TypeMeta: metav1.TypeMeta{ + Kind: "DataUpload", + APIVersion: "v2alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "velero", + Name: "testing", + Labels: map[string]string{ + util.AsyncOperationIDLabel: "testing", + }, + }, + Status: velerov2alpha1.DataDownloadStatus{ + Phase: velerov2alpha1.DataDownloadPhaseFailed, + Progress: shared.DataMoveOperationProgress{ + BytesDone: 1000, + TotalBytes: 1000, + }, + StartTimestamp: &metav1.Time{Time: currentTime}, + CompletionTimestamp: &metav1.Time{Time: currentTime}, + Message: "Testing error", + }, + }, + operationID: "testing", + expectedProgress: velero.OperationProgress{ + Completed: true, + Err: "Testing error", + NCompleted: 1000, + NTotal: 1000, + OperationUnits: "Bytes", + Description: "Failed", + Started: currentTime, + Updated: currentTime, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(*testing.T) { + pvcRIA := PVCRestoreItemAction{ + Log: logrus.New(), + VeleroClient: velerofake.NewSimpleClientset(), + } + if tc.dataDownload != nil { + _, err := pvcRIA.VeleroClient.VeleroV2alpha1().DataDownloads(tc.dataDownload.Namespace).Create(context.Background(), tc.dataDownload, metav1.CreateOptions{}) + require.NoError(t, err) + } + + progress, err := pvcRIA.Progress(tc.operationID, tc.restore) + if tc.expectedErr != "" { + require.Equal(t, tc.expectedErr, err.Error()) + return + } + + require.NoError(t, err) + require.Equal(t, tc.expectedProgress, progress) + }) + } +} + +func TestCancel(t *testing.T) { + tests := []struct { + name string + restore *velerov1api.Restore + dataDownload *velerov2alpha1.DataDownload + operationID string + expectedErr string + expectedDataDownload velerov2alpha1.DataDownload + }{ + { + name: "Cancel DataUpload", + restore: builder.ForRestore("velero", "test").Result(), + dataDownload: &velerov2alpha1.DataDownload{ + TypeMeta: metav1.TypeMeta{ + Kind: "DataDownload", + APIVersion: "v2alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "velero", + Name: "testing", + Labels: map[string]string{ + util.AsyncOperationIDLabel: "testing", + }, + }, + }, + operationID: "testing", + expectedErr: "", + expectedDataDownload: velerov2alpha1.DataDownload{ + TypeMeta: metav1.TypeMeta{ + Kind: "DataDownload", + APIVersion: "v2alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "velero", + Name: "testing", + Labels: map[string]string{ + util.AsyncOperationIDLabel: "testing", + }, + }, + Spec: velerov2alpha1.DataDownloadSpec{ + Cancel: true, + }, + }, + }, + { + name: "Cannot find DataUpload", + restore: builder.ForRestore("velero", "test").Result(), + dataDownload: nil, + operationID: "testing", + expectedErr: "didn't find DataDownload", + expectedDataDownload: velerov2alpha1.DataDownload{ + TypeMeta: metav1.TypeMeta{ + Kind: "DataDownload", + APIVersion: "v2alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "velero", + Name: "testing", + Labels: map[string]string{ + util.AsyncOperationIDLabel: "testing", + }, + }, + Spec: velerov2alpha1.DataDownloadSpec{ + Cancel: true, + }, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(*testing.T) { + pvcRIA := PVCRestoreItemAction{ + Log: logrus.New(), + VeleroClient: velerofake.NewSimpleClientset(), + } + if tc.dataDownload != nil { + _, err := pvcRIA.VeleroClient.VeleroV2alpha1().DataDownloads(tc.dataDownload.Namespace).Create(context.Background(), tc.dataDownload, metav1.CreateOptions{}) + require.NoError(t, err) + } + + err := pvcRIA.Cancel(tc.operationID, tc.restore) + if tc.expectedErr != "" { + require.Equal(t, tc.expectedErr, err.Error()) + return + } + require.NoError(t, err) + + resultDataDownload, err := pvcRIA.VeleroClient.VeleroV2alpha1().DataDownloads(tc.dataDownload.Namespace).Get(context.Background(), tc.dataDownload.Name, metav1.GetOptions{}) + require.NoError(t, err) + + require.Equal(t, tc.expectedDataDownload, *resultDataDownload) + }) + } +} + +func TestExecute(t *testing.T) { + tests := []struct { + name string + backup *velerov1api.Backup + restore *velerov1api.Restore + pvc *corev1api.PersistentVolumeClaim + vs *snapshotv1api.VolumeSnapshot + dataUploadResult *corev1api.ConfigMap + expectedErr string + expectedDataDownload *velerov2alpha1.DataDownload + expectedPVC *corev1api.PersistentVolumeClaim + }{ + { + name: "Don't restore PV", + restore: builder.ForRestore("velero", "testRestore").Backup("testBackup").RestorePVs(false).Result(), + pvc: builder.ForPersistentVolumeClaim("velero", "testPVC").Result(), + expectedPVC: builder.ForPersistentVolumeClaim("velero", "testPVC").VolumeName("").Result(), + }, + { + name: "restore's backup cannot be found", + restore: builder.ForRestore("velero", "testRestore").Backup("testBackup").Result(), + pvc: builder.ForPersistentVolumeClaim("velero", "testPVC").Result(), + expectedErr: "fail to get backup for restore: backups.velero.io \"testBackup\" not found", + }, + { + name: "VolumeSnapshot cannot be found", + backup: builder.ForBackup("velero", "testBackup").Result(), + restore: builder.ForRestore("velero", "testRestore").Backup("testBackup").Result(), + pvc: builder.ForPersistentVolumeClaim("velero", "testPVC").ObjectMeta(builder.WithAnnotations(util.VolumeSnapshotLabel, "testVS")).Result(), + expectedErr: "Failed to get Volumesnapshot velero/testVS to restore PVC velero/testPVC: volumesnapshots.snapshot.storage.k8s.io \"testVS\" not found", + }, + { + name: "Restore from VolumeSnapshot", + backup: builder.ForBackup("velero", "testBackup").Result(), + restore: builder.ForRestore("velero", "testRestore").Backup("testBackup").Result(), + pvc: builder.ForPersistentVolumeClaim("velero", "testPVC").ObjectMeta(builder.WithAnnotations(util.VolumeSnapshotLabel, "testVS")).Result(), + vs: builder.ForVolumeSnapshot("velero", "testVS").ObjectMeta(builder.WithAnnotations(util.VolumeSnapshotRestoreSize, "10Gi")).Result(), + expectedPVC: builder.ForPersistentVolumeClaim("velero", "testPVC").ObjectMeta(builder.WithAnnotations("velero.io/volume-snapshot-name", "testVS")).Result(), + }, + { + name: "DataUploadResult cannot be found", + backup: builder.ForBackup("velero", "testBackup").SnapshotMoveData(true).Result(), + restore: builder.ForRestore("velero", "testRestore").Backup("testBackup").Result(), + pvc: builder.ForPersistentVolumeClaim("velero", "testPVC").ObjectMeta(builder.WithAnnotations(util.VolumeSnapshotRestoreSize, "10Gi")).Result(), + expectedPVC: builder.ForPersistentVolumeClaim("velero", "testPVC").Result(), + expectedErr: "fail get DataUploadResult for restore: testRestore: no DataUpload result cm found with labels velero.io/pvc-namespace-name=velero.testPVC,velero.io/restore-uid=", + }, + { + name: "Restore from DataUploadResult", + backup: builder.ForBackup("velero", "testBackup").SnapshotMoveData(true).Result(), + restore: builder.ForRestore("velero", "testRestore").Backup("testBackup").ObjectMeta(builder.WithUID("uid")).Result(), + pvc: builder.ForPersistentVolumeClaim("velero", "testPVC").ObjectMeta(builder.WithAnnotations(util.VolumeSnapshotRestoreSize, "10Gi")).Result(), + dataUploadResult: builder.ForConfigMap("velero", "testCM").Data("uid", "{}").ObjectMeta(builder.WithLabels(velerov1api.RestoreUIDLabel, "uid", util.PVCNamespaceNameLabel, "velero.testPVC")).Result(), + expectedPVC: builder.ForPersistentVolumeClaim("velero", "testPVC").ObjectMeta(builder.WithAnnotations("velero.io/vsi-volumesnapshot-restore-size", "10Gi")).Result(), + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(*testing.T) { + pvcRIA := PVCRestoreItemAction{ + Log: logrus.New(), + Client: fake.NewSimpleClientset(), + SnapshotClient: snapshotfake.NewSimpleClientset(), + VeleroClient: velerofake.NewSimpleClientset(), + } + input := new(velero.RestoreItemActionExecuteInput) + + // TODO: after needed Velero builder methods are added, need to remove this block. + if tc.name == "Restore from VolumeSnapshot" { + requestMemory, _ := resource.ParseQuantity("10Gi") + tc.expectedPVC.Spec.Resources.Requests = make(map[corev1api.ResourceName]resource.Quantity) + tc.expectedPVC.Spec.Resources.Requests[corev1api.ResourceStorage] = requestMemory + tc.expectedPVC.Spec.DataSource = &corev1api.TypedLocalObjectReference{ + APIGroup: &snapshotv1api.SchemeGroupVersion.Group, + Kind: util.VolumeSnapshotKindName, + Name: "testVS", + } + tc.expectedPVC.Spec.DataSourceRef = &corev1api.TypedLocalObjectReference{ + APIGroup: &snapshotv1api.SchemeGroupVersion.Group, + Kind: util.VolumeSnapshotKindName, + Name: "testVS", + } + } else if tc.name == "Restore from DataUploadResult" { + tc.expectedDataDownload = &velerov2alpha1.DataDownload{ + TypeMeta: metav1.TypeMeta{ + Kind: "DataDownload", + APIVersion: velerov2alpha1.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + GenerateName: tc.restore.Name + "-", + Namespace: "velero", + Labels: map[string]string{ + util.AsyncOperationIDLabel: "dd-uid.", + velerov1api.RestoreNameLabel: tc.restore.Name, + velerov1api.RestoreUIDLabel: string(tc.restore.UID), + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: velerov1api.SchemeGroupVersion.String(), + Kind: "Restore", + Name: tc.restore.Name, + UID: tc.restore.UID, + Controller: boolptr.True(), + }, + }, + }, + Spec: velerov2alpha1.DataDownloadSpec{ + TargetVolume: velerov2alpha1.TargetVolumeSpec{ + PVC: tc.pvc.Name, + Namespace: tc.pvc.Namespace, + }, + }, + } + } + + if tc.pvc != nil { + pvcMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(tc.pvc) + require.NoError(t, err) + + input.Item = &unstructured.Unstructured{Object: pvcMap} + input.ItemFromBackup = &unstructured.Unstructured{Object: pvcMap} + input.Restore = tc.restore + } + + if tc.backup != nil { + _, err := pvcRIA.VeleroClient.VeleroV1().Backups(tc.backup.Namespace).Create(context.Background(), tc.backup, metav1.CreateOptions{}) + require.NoError(t, err) + } + + if tc.vs != nil { + _, err := pvcRIA.SnapshotClient.SnapshotV1().VolumeSnapshots(tc.vs.Namespace).Create(context.Background(), tc.vs, metav1.CreateOptions{}) + require.NoError(t, err) + } + + if tc.dataUploadResult != nil { + _, err := pvcRIA.Client.CoreV1().ConfigMaps(tc.dataUploadResult.Namespace).Create(context.Background(), tc.dataUploadResult, metav1.CreateOptions{}) + require.NoError(t, err) + } + + output, err := pvcRIA.Execute(input) + if tc.expectedErr != "" { + require.Equal(t, tc.expectedErr, err.Error()) + return + } + require.NoError(t, err) + + if tc.expectedPVC != nil { + pvc := new(corev1api.PersistentVolumeClaim) + err := runtime.DefaultUnstructuredConverter.FromUnstructured(output.UpdatedItem.UnstructuredContent(), pvc) + require.NoError(t, err) + require.Equal(t, tc.expectedPVC.GetObjectMeta(), pvc.GetObjectMeta()) + if pvc.Spec.Selector != nil && pvc.Spec.Selector.MatchLabels != nil { + require.Contains(t, pvc.Spec.Selector.MatchLabels[util.DynamicPVRestoreLabel], tc.pvc.Namespace+"."+tc.pvc.Name) + } + } + if tc.expectedDataDownload != nil { + dataDownload, err := pvcRIA.VeleroClient.VeleroV2alpha1().DataDownloads(tc.expectedDataDownload.Namespace).Get(context.Background(), tc.expectedDataDownload.Name, metav1.GetOptions{}) + require.NoError(t, err) + require.Equal(t, tc.expectedDataDownload, dataDownload) + } + }) + } +} diff --git a/internal/util/labels_annotations.go b/internal/util/labels_annotations.go index 59db96df..f824c2c9 100644 --- a/internal/util/labels_annotations.go +++ b/internal/util/labels_annotations.go @@ -50,4 +50,15 @@ const ( // PVCNameLabel is the label key used to identify the the PVC's namespace and name. // The format is /. PVCNamespaceNameLabel = "velero.io/pvc-namespace-name" + + // DynamicPVRestoreLabel is the label key for dynamic PV restore + DynamicPVRestoreLabel = "velero.io/dynamic-pv-restore" +) + +// TODO: need to use Velero server side type after it's added. +type AsyncOperationIDPrefix string + +const ( + AsyncOperationIDPrefixDataDownload AsyncOperationIDPrefix = "dd-" + AsyncOperationIDPrefixDataUpload AsyncOperationIDPrefix = "du-" )