Skip to content

Commit

Permalink
Add snapshot data mover logic in PVC RIA v2.
Browse files Browse the repository at this point in the history
Signed-off-by: Xun Jiang <[email protected]>
  • Loading branch information
Xun Jiang committed Jun 15, 2023
1 parent d6d822c commit a1dead2
Show file tree
Hide file tree
Showing 5 changed files with 607 additions and 41 deletions.
2 changes: 1 addition & 1 deletion internal/backup/pvc_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (p *PVCBackupItemAction) Execute(item runtime.Unstructured, backup *velerov
var itemToUpdate []velero.ResourceIdentifier

if boolptr.IsSetToTrue(backup.Spec.SnapshotMoveData) {
operationID = label.GetValidName(string(backup.UID) + "." + string(pvc.UID))
operationID = label.GetValidName(string(util.AsyncOperationIDPrefixDataUpload) + string(backup.UID) + "." + string(pvc.UID))
dataUploadLog := p.Log.WithFields(logrus.Fields{
"Source PVC": fmt.Sprintf("%s/%s", pvc.Namespace, pvc.Name),
"VolumeSnapshot": fmt.Sprintf("%s/%s", upd.Namespace, upd.Name),
Expand Down
6 changes: 3 additions & 3 deletions internal/backup/pvc_action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,19 @@ import (
snapshotfake "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/fake"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"github.com/vmware-tanzu/velero-plugin-for-csi/internal/util"
velerofake "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/fake"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"

"github.com/vmware-tanzu/velero-plugin-for-csi/internal/util"
"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
"github.com/vmware-tanzu/velero/pkg/builder"
velerofake "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/fake"
"github.com/vmware-tanzu/velero/pkg/plugin/velero"
)

Expand Down Expand Up @@ -89,7 +89,7 @@ func TestExecute(t *testing.T) {
velerov1api.BackupNameLabel: "test",
velerov1api.BackupUIDLabel: "",
velerov1api.PVCUIDLabel: "",
util.AsyncOperationIDLabel: ".",
util.AsyncOperationIDLabel: "du-.",
},
OwnerReferences: []metav1.OwnerReference{
{
Expand Down
272 changes: 237 additions & 35 deletions internal/restore/pvc_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@ import (
"encoding/json"
"fmt"

jsonpatch "github.com/evanphx/json-patch"
snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
snapshotterClientSet "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

corev1api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"

"github.com/vmware-tanzu/velero-plugin-for-csi/internal/util"
Expand Down Expand Up @@ -110,12 +111,20 @@ func setPVCStorageResourceRequest(pvc *corev1api.PersistentVolumeClaim, restoreS
// Execute modifies the PVC's spec to use the volumesnapshot object as the data source ensuring that the newly provisioned volume
// can be pre-populated with data from the volumesnapshot.
func (p *PVCRestoreItemAction) Execute(input *velero.RestoreItemActionExecuteInput) (*velero.RestoreItemActionExecuteOutput, error) {
var pvc corev1api.PersistentVolumeClaim

var pvc, pvcFromBackup corev1api.PersistentVolumeClaim
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(input.Item.UnstructuredContent(), &pvc); err != nil {
return nil, errors.WithStack(err)
}
p.Log.Infof("Starting PVCRestoreItemAction for PVC %s/%s", pvc.Namespace, pvc.Name)
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(input.ItemFromBackup.UnstructuredContent(), &pvcFromBackup); err != nil {
return nil, errors.WithStack(err)
}

logger := p.Log.WithFields(logrus.Fields{
"Action": "PVCRestoreItemAction",
"PVC": pvc.Namespace + "/" + pvc.Name,
"Restore": input.Restore.Namespace + "/" + input.Restore.Name,
})
logger.Info("Starting PVCRestoreItemAction for PVC")

removePVCAnnotations(&pvc,
[]string{AnnBindCompleted, AnnBoundByController, AnnStorageProvisioner, AnnBetaStorageProvisioner, AnnSelectedNode})
Expand All @@ -126,56 +135,49 @@ func (p *PVCRestoreItemAction) Execute(input *velero.RestoreItemActionExecuteInp
pvc.SetNamespace(val)
}

volumeSnapshotName, ok := pvc.Annotations[util.VolumeSnapshotLabel]
if !ok {
p.Log.Infof("Skipping PVCRestoreItemAction for PVC %s/%s, PVC does not have a CSI volumesnapshot.", pvc.Namespace, pvc.Name)
return &velero.RestoreItemActionExecuteOutput{
UpdatedItem: input.Item,
}, nil
}
operationID := ""

if boolptr.IsSetToFalse(input.Restore.Spec.RestorePVs) {
p.Log.Infof("Restore did not request for PVs to be restored from snapshot %s/%s.", input.Restore.Namespace, input.Restore.Name)
logger.Info("Restore did not request for PVs to be restored from snapshot")
pvc.Spec.VolumeName = ""
pvc.Spec.DataSource = nil
pvc.Spec.DataSourceRef = nil
} else {
_, snapClient, err := util.GetClients()
backup, err := p.VeleroClient.VeleroV1().Backups(input.Restore.Namespace).Get(context.Background(),
input.Restore.Spec.BackupName, metav1.GetOptions{})
if err != nil {
return nil, errors.WithStack(err)
logger.Error("Fail to get backup for restore.")
return nil, fmt.Errorf("fail to get backup for restore: %s", err.Error())
}

vs, err := snapClient.SnapshotV1().VolumeSnapshots(pvc.Namespace).Get(context.TODO(), volumeSnapshotName, metav1.GetOptions{})
if err != nil {
return nil, errors.Wrapf(err, fmt.Sprintf("Failed to get Volumesnapshot %s/%s to restore PVC %s/%s", pvc.Namespace, volumeSnapshotName, pvc.Namespace, pvc.Name))
}
if boolptr.IsSetToTrue(backup.Spec.SnapshotMoveData) {
logger.Info("Start DataMover restore.")

if _, exists := vs.Annotations[util.VolumeSnapshotRestoreSize]; exists {
restoreSize, err := resource.ParseQuantity(vs.Annotations[util.VolumeSnapshotRestoreSize])
operationID = label.GetValidName(string(util.AsyncOperationIDPrefixDataDownload) + string(input.Restore.UID) + "." + string(pvcFromBackup.UID))
dataDownload, err := restoreFromDataUploadResult(context.Background(), input.Restore, &pvc,
operationID, p.Client, p.VeleroClient)
if err != nil {
return nil, errors.Wrapf(err, fmt.Sprintf("Failed to parse %s from annotation on Volumesnapshot %s/%s into restore size",
vs.Annotations[util.VolumeSnapshotRestoreSize], vs.Namespace, vs.Name))
logger.Errorf("Fail to restore from DataUploadResult: %s", err.Error())
return nil, errors.WithStack(err)
}
logger.Infof("DataDownload %s/%s is created successfully.", dataDownload.Namespace, dataDownload.Name)
} else {
if err := restoreFromVolumeSnapshot(&pvc, p.SnapshotClient, logger); err != nil {
logger.Errorf("Failed to restore PVC from VolumeSnapshot.")
return nil, errors.WithStack(err)
}
// It is possible that the volume provider allocated a larger capacity volume than what was requested in the backed up PVC.
// In this scenario the volumesnapshot of the PVC will endup being larger than its requested storage size.
// Such a PVC, on restore as-is, will be stuck attempting to use a Volumesnapshot as a data source for a PVC that
// is not large enough.
// To counter that, here we set the storage request on the PVC to the larger of the PVC's storage request and the size of the
// VolumeSnapshot
setPVCStorageResourceRequest(&pvc, restoreSize, p.Log)
}

resetPVCSpec(&pvc, volumeSnapshotName)
}

pvcMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&pvc)
if err != nil {
return nil, errors.WithStack(err)
}
p.Log.Infof("Returning from PVCRestoreItemAction for PVC %s/%s", pvc.Namespace, pvc.Name)
logger.Info("Returning from PVCRestoreItemAction for PVC")

return &velero.RestoreItemActionExecuteOutput{
UpdatedItem: &unstructured.Unstructured{Object: pvcMap},
OperationID: operationID,
}, nil
}

Expand All @@ -189,19 +191,77 @@ func (p *PVCRestoreItemAction) Progress(operationID string, restore *velerov1api
if operationID == "" {
return progress, riav2.InvalidOperationIDError(operationID)
}
logger := p.Log.WithFields(logrus.Fields{
"Action": "PVCRestoreItemAction",
"OperationID": operationID,
"Namespace": restore.Namespace,
})

dataDownload, err := getDataDownload(context.Background(), restore.Namespace, operationID, p.VeleroClient)
if err != nil {
logger.Errorf("fail to get DataDownload: %s", err.Error())
return progress, err
}
if dataDownload.Status.Phase == velerov2alpha1.DataDownloadPhaseNew ||
dataDownload.Status.Phase == "" {
logger.Debugf("DataDownload is still not processed yet. Skip progress update.")
return progress, nil
}

progress.Description = string(dataDownload.Status.Phase)
progress.OperationUnits = "Bytes"
progress.NCompleted = dataDownload.Status.Progress.BytesDone
progress.NTotal = dataDownload.Status.Progress.TotalBytes

if dataDownload.Status.StartTimestamp != nil {
progress.Started = dataDownload.Status.StartTimestamp.Time
}

if dataDownload.Status.CompletionTimestamp != nil {
progress.Updated = dataDownload.Status.CompletionTimestamp.Time
}

if dataDownload.Status.Phase == velerov2alpha1.DataDownloadPhaseCompleted {
progress.Completed = true
} else if dataDownload.Status.Phase == velerov2alpha1.DataDownloadPhaseCanceled {
progress.Completed = true
progress.Err = fmt.Sprintf("DataDownload is canceled")
} else if dataDownload.Status.Phase == velerov2alpha1.DataDownloadPhaseFailed {
progress.Completed = true
progress.Err = dataDownload.Status.Message
}

return progress, nil
}

// Cancel requests cancellation of the asynchronous restore operation identified
// by operationID.
//
// It resolves the DataDownload labelled with operationID in the restore's
// namespace and asks for it to be canceled through cancelDataDownload.
//
// It returns riav2.InvalidOperationIDError when operationID is empty, the
// lookup error when the DataDownload cannot be resolved, and the error from
// cancelDataDownload when the cancel request itself fails. Lookup and cancel
// failures are also logged.
func (p *PVCRestoreItemAction) Cancel(operationID string, restore *velerov1api.Restore) error {
	if operationID == "" {
		return riav2.InvalidOperationIDError(operationID)
	}

	logger := p.Log.WithFields(logrus.Fields{
		"Action":      "PVCRestoreItemAction",
		"OperationID": operationID,
		"Namespace":   restore.Namespace,
	})

	dataDownload, err := getDataDownload(context.Background(), restore.Namespace, operationID, p.VeleroClient)
	if err != nil {
		logger.Errorf("fail to get DataDownload: %s", err.Error())
		return err
	}

	if err := cancelDataDownload(context.Background(), p.VeleroClient, dataDownload); err != nil {
		logger.Errorf("fail to cancel DataDownload %s: %s", dataDownload.Name, err.Error())
		return err
	}

	return nil
}

func getDataUploadResult(ctx context.Context, restore *velerov1api.Restore, pvc *corev1api.PersistentVolumeClaim,
kubeClient kubernetes.Interface) (*velerov2alpha1.DataUploadResult, error) {
cmList, err := kubeClient.CoreV1().ConfigMaps(restore.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s,%s=%s", velerov1api.BackupNameLabel, label.GetValidName(restore.Spec.BackupName),
util.PVCNamespaceNameLabel, pvc.Namespace+"/"+pvc.Name),
LabelSelector: fmt.Sprintf("%s=%s,%s=%s", util.PVCNamespaceNameLabel, pvc.Namespace+"."+pvc.Name,
velerov1api.BackupNameLabel, label.GetValidName(restore.Spec.BackupName)),
})
if err != nil {
return nil, errors.Wrapf(err, "error to get DataUpload result cm with name %s", restore.Spec.BackupName)
Expand All @@ -228,3 +288,145 @@ func getDataUploadResult(ctx context.Context, restore *velerov1api.Restore, pvc

return &result, nil
}

// getDataDownload returns the one DataDownload in namespace whose
// util.AsyncOperationIDLabel label equals operationID.
//
// An error is returned when the list call fails, when no DataDownload carries
// the label value, or when more than one does.
func getDataDownload(ctx context.Context, namespace string, operationID string, veleroClient veleroClientSet.Interface) (*velerov2alpha1.DataDownload, error) {
	selector := fmt.Sprintf("%s=%s", util.AsyncOperationIDLabel, operationID)

	matches, err := veleroClient.VeleroV2alpha1().DataDownloads(namespace).List(ctx, metav1.ListOptions{
		LabelSelector: selector,
	})
	if err != nil {
		return nil, errors.Wrap(err, "fail to list DataDownload")
	}

	switch len(matches.Items) {
	case 0:
		return nil, errors.Errorf("didn't find DataDownload")
	case 1:
		return &matches.Items[0], nil
	default:
		return nil, errors.Errorf("find multiple DataDownloads")
	}
}

// cancelDataDownload sets Spec.Cancel to true on the given DataDownload by
// sending a JSON merge patch to the API server.
//
// The patch is the difference between the JSON encoding of dataDownload and
// that of a deep copy with Spec.Cancel set; dataDownload itself is not
// modified. Marshalling and patch-building failures are wrapped with context,
// while the error from the Patch call is returned as-is.
func cancelDataDownload(ctx context.Context, veleroClient veleroClientSet.Interface,
	dataDownload *velerov2alpha1.DataDownload) error {
	before, err := json.Marshal(dataDownload)
	if err != nil {
		return errors.Wrap(err, "fail to marshal origin DataDownload")
	}

	canceled := dataDownload.DeepCopy()
	canceled.Spec.Cancel = true

	after, err := json.Marshal(canceled)
	if err != nil {
		return errors.Wrap(err, "fail to marshal updated DataDownload")
	}

	patch, err := jsonpatch.CreateMergePatch(before, after)
	if err != nil {
		return errors.Wrap(err, "fail to create merge patch for DataDownload")
	}

	client := veleroClient.VeleroV2alpha1().DataDownloads(dataDownload.Namespace)
	if _, err := client.Patch(ctx, dataDownload.Name, types.MergePatchType, patch, metav1.PatchOptions{}); err != nil {
		return err
	}

	return nil
}

// newDataDownload builds (but does not create) a DataDownload object for
// restoring pvc from the given DataUploadResult.
//
// The object is placed in the restore's namespace with a generated name
// prefixed by the restore name, is controller-owned by the restore, and is
// labelled with the restore name, the restore UID and operationID. The spec
// targets pvc and copies the backup storage location, data mover, snapshot ID
// and source namespace from dataUploadResult; the operation timeout comes from
// the restore spec.
func newDataDownload(restore *velerov1api.Restore, dataUploadResult *velerov2alpha1.DataUploadResult,
	pvc *corev1api.PersistentVolumeClaim, operationID string) *velerov2alpha1.DataDownload {
	// The owning Restore, marked as the controller of the new object.
	ownerRef := metav1.OwnerReference{
		APIVersion: velerov1api.SchemeGroupVersion.String(),
		Kind:       "Restore",
		Name:       restore.Name,
		UID:        restore.UID,
		Controller: boolptr.True(),
	}

	// The operation ID label is what getDataDownload selects on later.
	labels := map[string]string{
		velerov1api.RestoreNameLabel: label.GetValidName(restore.Name),
		velerov1api.RestoreUIDLabel:  string(restore.UID),
		util.AsyncOperationIDLabel:   operationID,
	}

	spec := velerov2alpha1.DataDownloadSpec{
		TargetVolume: velerov2alpha1.TargetVolumeSpec{
			PVC:       pvc.Name,
			Namespace: pvc.Namespace,
		},
		BackupStorageLocation: dataUploadResult.BackupStorageLocation,
		DataMover:             dataUploadResult.DataMover,
		SnapshotID:            dataUploadResult.SnapshotID,
		SourceNamespace:       dataUploadResult.SourceNamespace,
		OperationTimeout:      restore.Spec.ItemOperationTimeout,
	}

	return &velerov2alpha1.DataDownload{
		TypeMeta: metav1.TypeMeta{
			APIVersion: velerov2alpha1.SchemeGroupVersion.String(),
			Kind:       "DataDownload",
		},
		ObjectMeta: metav1.ObjectMeta{
			Namespace:       restore.Namespace,
			GenerateName:    restore.Name + "-",
			OwnerReferences: []metav1.OwnerReference{ownerRef},
			Labels:          labels,
		},
		Spec: spec,
	}
}

// restoreFromVolumeSnapshot prepares pvc to be restored from the CSI
// VolumeSnapshot named in its util.VolumeSnapshotLabel annotation.
//
// When the annotation is absent the PVC is left untouched and nil is returned.
// Otherwise the VolumeSnapshot is fetched from the PVC's namespace. If it
// carries a util.VolumeSnapshotRestoreSize annotation, that size is parsed and
// passed to setPVCStorageResourceRequest. Finally resetPVCSpec is called with
// the snapshot name.
//
// An error is returned when the VolumeSnapshot cannot be fetched or when its
// restore-size annotation is not a valid quantity.
func restoreFromVolumeSnapshot(pvc *corev1api.PersistentVolumeClaim, snapClient snapshotterClientSet.Interface,
	logger logrus.FieldLogger) error {
	volumeSnapshotName, ok := pvc.Annotations[util.VolumeSnapshotLabel]
	if !ok {
		logger.Info("Skipping PVCRestoreItemAction for PVC , PVC does not have a CSI volumesnapshot.")
		return nil
	}

	vs, err := snapClient.SnapshotV1().VolumeSnapshots(pvc.Namespace).Get(context.TODO(), volumeSnapshotName, metav1.GetOptions{})
	if err != nil {
		// Pass the arguments to Wrapf directly: pre-formatting the message and
		// using it as the format string would re-interpret any '%' it contains.
		return errors.Wrapf(err, "Failed to get Volumesnapshot %s/%s to restore PVC %s/%s",
			pvc.Namespace, volumeSnapshotName, pvc.Namespace, pvc.Name)
	}

	if rawSize, exists := vs.Annotations[util.VolumeSnapshotRestoreSize]; exists {
		restoreSize, err := resource.ParseQuantity(rawSize)
		if err != nil {
			// rawSize is an arbitrary annotation value, so it must only ever be
			// an argument here, never part of the format string.
			return errors.Wrapf(err, "Failed to parse %s from annotation on Volumesnapshot %s/%s into restore size",
				rawSize, vs.Namespace, vs.Name)
		}
		// It is possible that the volume provider allocated a larger capacity volume than what was requested in the backed up PVC.
		// In this scenario the volumesnapshot of the PVC will end up being larger than its requested storage size.
		// Such a PVC, on restore as-is, will be stuck attempting to use a Volumesnapshot as a data source for a PVC that
		// is not large enough.
		// To counter that, here we set the storage request on the PVC to the larger of the PVC's storage request and the size of the
		// VolumeSnapshot
		setPVCStorageResourceRequest(pvc, restoreSize, logger)
	}

	resetPVCSpec(pvc, volumeSnapshotName)

	return nil
}

// restoreFromDataUploadResult prepares pvc for a data-mover restore and creates
// the DataDownload that drives it.
//
// It looks up the DataUploadResult recorded for the PVC via getDataUploadResult,
// clears pvc.Spec.VolumeName, and adds a match label keyed by
// util.DynamicPVRestoreLabel (valued with the valid-name form of
// "<namespace>.<name>") to the PVC's selector, creating the selector and its
// MatchLabels map when they are nil.
// NOTE(review): the selector label presumably keeps the claim from binding
// until the data mover exposes a matching volume — confirm against the
// DataDownload controller.
//
// It then creates a DataDownload in the restore's namespace and returns the
// object as returned by the API server, so that server-populated fields such as
// the name generated from GenerateName are available to the caller.
//
// An error is returned when the DataUploadResult cannot be found or the
// DataDownload cannot be created. pvc has already been modified when the create
// call fails.
func restoreFromDataUploadResult(ctx context.Context, restore *velerov1api.Restore, pvc *corev1api.PersistentVolumeClaim,
	operationID string, kubeClient kubernetes.Interface, veleroClient veleroClientSet.Interface) (*velerov2alpha1.DataDownload, error) {
	dataUploadResult, err := getDataUploadResult(ctx, restore, pvc, kubeClient)
	if err != nil {
		return nil, errors.Wrapf(err, "fail get DataUploadResult for restore: %s", restore.Name)
	}

	pvc.Spec.VolumeName = ""
	if pvc.Spec.Selector == nil {
		pvc.Spec.Selector = &metav1.LabelSelector{}
	}
	if pvc.Spec.Selector.MatchLabels == nil {
		pvc.Spec.Selector.MatchLabels = make(map[string]string)
	}
	pvc.Spec.Selector.MatchLabels[util.DynamicPVRestoreLabel] = label.GetValidName(fmt.Sprintf("%s.%s", pvc.Namespace, pvc.Name))

	dataDownload := newDataDownload(restore, dataUploadResult, pvc, operationID)
	// Keep the server's response: the request object only has GenerateName set,
	// so its Name is empty and useless for logging or later lookups.
	created, err := veleroClient.VeleroV2alpha1().DataDownloads(restore.Namespace).Create(ctx, dataDownload, metav1.CreateOptions{})
	if err != nil {
		return nil, errors.Wrap(err, "fail to create DataDownload")
	}

	return created, nil
}
Loading

0 comments on commit a1dead2

Please sign in to comment.