From a19af739bd86c4d42c94c9910b13a6fbfa8aaeb8 Mon Sep 17 00:00:00 2001 From: Xun Jiang Date: Fri, 15 Mar 2024 16:40:26 +0800 Subject: [PATCH 1/2] Code refactor. 1. use function passed-in context. 2. replace the depracated apimachinery/pkg/util/wait method and variable. 3. remove unneeded function parameter. Signed-off-by: Xun Jiang --- internal/backup/pvc_action.go | 10 +++++----- internal/restore/pvc_action.go | 7 +++++-- internal/util/util.go | 12 ++++++------ 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/internal/backup/pvc_action.go b/internal/backup/pvc_action.go index 11b02218..3339cf02 100644 --- a/internal/backup/pvc_action.go +++ b/internal/backup/pvc_action.go @@ -253,7 +253,7 @@ func (p *PVCBackupItemAction) Progress(operationID string, backup *velerov1api.B return progress, biav2.InvalidOperationIDError(operationID) } - dataUpload, err := getDataUpload(context.Background(), backup, p.CRClient, operationID) + dataUpload, err := getDataUpload(context.Background(), p.CRClient, operationID) if err != nil { p.Log.Errorf("fail to get DataUpload for backup %s/%s: %s", backup.Namespace, backup.Name, err.Error()) return progress, err @@ -294,7 +294,7 @@ func (p *PVCBackupItemAction) Cancel(operationID string, backup *velerov1api.Bac return biav2.InvalidOperationIDError(operationID) } - dataUpload, err := getDataUpload(context.Background(), backup, p.CRClient, operationID) + dataUpload, err := getDataUpload(context.Background(), p.CRClient, operationID) if err != nil { p.Log.Errorf("fail to get DataUpload for backup %s/%s: %s", backup.Namespace, backup.Name, err.Error()) return err @@ -365,10 +365,10 @@ func createDataUpload(ctx context.Context, backup *velerov1api.Backup, crClient return dataUpload, err } -func getDataUpload(ctx context.Context, backup *velerov1api.Backup, +func getDataUpload(ctx context.Context, crClient crclient.Client, operationID string) (*velerov2alpha1.DataUpload, error) { dataUploadList := new(velerov2alpha1.DataUploadList) - err := crClient.List(context.Background(), dataUploadList, &crclient.ListOptions{ + err := crClient.List(ctx, dataUploadList, &crclient.ListOptions{ LabelSelector: labels.SelectorFromSet(map[string]string{velerov1api.AsyncOperationIDLabel: operationID}), }) if err != nil { @@ -392,7 +392,7 @@ func cancelDataUpload(ctx context.Context, crClient crclient.Client, updatedDataUpload := dataUpload.DeepCopy() updatedDataUpload.Spec.Cancel = true - err := crClient.Patch(context.Background(), updatedDataUpload, crclient.MergeFrom(dataUpload)) + err := crClient.Patch(ctx, updatedDataUpload, crclient.MergeFrom(dataUpload)) if err != nil { return errors.Wrap(err, "error patch DataUpload") } diff --git a/internal/restore/pvc_action.go b/internal/restore/pvc_action.go index c8f7e670..130bd96b 100644 --- a/internal/restore/pvc_action.go +++ b/internal/restore/pvc_action.go @@ -334,7 +334,10 @@ func getDataUploadResult(ctx context.Context, restore *velerov1api.Restore, pvc func getDataDownload(ctx context.Context, namespace string, operationID string, crClient crclient.Client) (*velerov2alpha1.DataDownload, error) { dataDownloadList := new(velerov2alpha1.DataDownloadList) - err := crClient.List(ctx, dataDownloadList, &crclient.ListOptions{LabelSelector: labels.SelectorFromSet(map[string]string{velerov1api.AsyncOperationIDLabel: operationID})}) + err := crClient.List(ctx, dataDownloadList, &crclient.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{velerov1api.AsyncOperationIDLabel: operationID}), + Namespace: namespace, + }) if err != nil { return nil, errors.Wrap(err, "fail to list DataDownload") } @@ -356,7 +359,7 @@ func cancelDataDownload(ctx context.Context, crClient crclient.Client, updatedDataDownload := dataDownload.DeepCopy() updatedDataDownload.Spec.Cancel = true - err := crClient.Patch(context.Background(), updatedDataDownload, crclient.MergeFrom(dataDownload)) + err := crClient.Patch(ctx, updatedDataDownload, crclient.MergeFrom(dataDownload)) return err } diff --git a/internal/util/util.go b/internal/util/util.go index a96e7c84..5b99fc34 100644 --- a/internal/util/util.go +++ b/internal/util/util.go @@ -236,8 +236,8 @@ func GetVolumeSnapshotContentForVolumeSnapshot(volSnap *snapshotv1api.VolumeSnap interval := 5 * time.Second var snapshotContent *snapshotv1api.VolumeSnapshotContent - err := wait.PollImmediate(interval, timeout, func() (bool, error) { - vs, err := snapshotClient.VolumeSnapshots(volSnap.Namespace).Get(context.TODO(), volSnap.Name, metav1.GetOptions{}) + err := wait.PollUntilContextTimeout(context.Background(), interval, timeout, true, func(ctx context.Context) (bool, error) { + vs, err := snapshotClient.VolumeSnapshots(volSnap.Namespace).Get(ctx, volSnap.Name, metav1.GetOptions{}) if err != nil { return false, errors.Wrapf(err, fmt.Sprintf("failed to get volumesnapshot %s/%s", volSnap.Namespace, volSnap.Name)) } @@ -247,7 +247,7 @@ func GetVolumeSnapshotContentForVolumeSnapshot(volSnap *snapshotv1api.VolumeSnap return false, nil } - snapshotContent, err = snapshotClient.VolumeSnapshotContents().Get(context.TODO(), *vs.Status.BoundVolumeSnapshotContentName, metav1.GetOptions{}) + snapshotContent, err = snapshotClient.VolumeSnapshotContents().Get(ctx, *vs.Status.BoundVolumeSnapshotContentName, metav1.GetOptions{}) if err != nil { return false, errors.Wrapf(err, fmt.Sprintf("failed to get volumesnapshotcontent %s for volumesnapshot %s/%s", *vs.Status.BoundVolumeSnapshotContentName, vs.Namespace, vs.Name)) } @@ -267,7 +267,7 @@ func GetVolumeSnapshotContentForVolumeSnapshot(volSnap *snapshotv1api.VolumeSnap }) if err != nil { - if err == wait.ErrWaitTimeout { + if err == wait.ErrorInterrupted(errors.New("timed out waiting for the condition")) { if snapshotContent != nil && snapshotContent.Status != nil && snapshotContent.Status.Error != nil { log.Errorf("Timed out awaiting reconciliation of volumesnapshot, Volumesnapshotcontent %s has error: %v", snapshotContent.Name, *snapshotContent.Status.Error.Message) return nil, errors.Errorf("CSI got timed out with error: %v", *snapshotContent.Status.Error.Message) @@ -483,8 +483,8 @@ func recreateVolumeSnapshotContent(vsc snapshotv1api.VolumeSnapshotContent, back } // Check VolumeSnapshotContents is already deleted, before re-creating it. - err = wait.PollImmediate(interval, timeout, func() (bool, error) { - _, err := snapshotClient.VolumeSnapshotContents().Get(context.TODO(), vsc.Name, metav1.GetOptions{}) + err = wait.PollUntilContextTimeout(context.Background(), interval, timeout, true, func(ctx context.Context) (bool, error) { + _, err := snapshotClient.VolumeSnapshotContents().Get(ctx, vsc.Name, metav1.GetOptions{}) if err != nil { if apierrors.IsNotFound(err) { return true, nil From 96c47e7b927c707d7c02a335ad6db3877e4bce32 Mon Sep 17 00:00:00 2001 From: Xun Jiang Date: Fri, 15 Mar 2024 17:30:52 +0800 Subject: [PATCH 2/2] Only generate one Async Operation for CSI backup. Signed-off-by: Xun Jiang --- internal/backup/pvc_action_test.go | 4 +- internal/backup/volumesnapshot_action.go | 36 +++++++++- .../backup/volumesnapshotcontent_action.go | 71 +------------------ internal/restore/pvc_action_test.go | 19 +++++ internal/util/util.go | 2 +- 5 files changed, 58 insertions(+), 74 deletions(-) diff --git a/internal/backup/pvc_action_test.go b/internal/backup/pvc_action_test.go index 6f6d8026..26cdcfbe 100644 --- a/internal/backup/pvc_action_test.go +++ b/internal/backup/pvc_action_test.go @@ -172,8 +172,8 @@ func TestExecute(t *testing.T) { if boolptr.IsSetToTrue(tc.backup.Spec.SnapshotMoveData) == true { go func() { var vsList *v1.VolumeSnapshotList - err := wait.PollImmediate(1*time.Second, 10*time.Second, func() (bool, error) { - vsList, err = pvcBIA.SnapshotClient.SnapshotV1().VolumeSnapshots(tc.pvc.Namespace).List(context.Background(), metav1.ListOptions{}) + err := wait.PollUntilContextTimeout(context.Background(), 1*time.Second, 10*time.Second, true, func(ctx context.Context) (bool, error) { + vsList, err = pvcBIA.SnapshotClient.SnapshotV1().VolumeSnapshots(tc.pvc.Namespace).List(ctx, metav1.ListOptions{}) require.NoError(t, err) if err != nil || len(vsList.Items) == 0 { return false, nil diff --git a/internal/backup/volumesnapshot_action.go b/internal/backup/volumesnapshot_action.go index 98f072ce..a0933693 100644 --- a/internal/backup/volumesnapshot_action.go +++ b/internal/backup/volumesnapshot_action.go @@ -197,6 +197,10 @@ func (p *VolumeSnapshotBackupItemAction) Execute(item runtime.Unstructured, back Namespace: vs.Namespace, Name: vs.Name, }, + { + GroupResource: kuberesource.VolumeSnapshotContents, + Name: vsc.Name, + }, } } @@ -229,7 +233,8 @@ func (p *VolumeSnapshotBackupItemAction) Progress(operationID string, backup *ve return progress, errors.WithStack(err) } - vs, err := snapshotClient.SnapshotV1().VolumeSnapshots(operationIDParts[0]).Get(context.Background(), operationIDParts[1], metav1.GetOptions{}) + vs, err := snapshotClient.SnapshotV1().VolumeSnapshots(operationIDParts[0]).Get( + context.Background(), operationIDParts[1], metav1.GetOptions{}) if err != nil { p.Log.Errorf("error getting volumesnapshot %s/%s: %s", operationIDParts[0], operationIDParts[1], err.Error()) return progress, errors.WithStack(err) @@ -241,13 +246,40 @@ func (p *VolumeSnapshotBackupItemAction) Progress(operationID string, backup *ve } if boolptr.IsSetToTrue(vs.Status.ReadyToUse) { - progress.Completed = true + p.Log.Debugf("VolumeSnapshot %s/%s is ReadyToUse. Continue on querying corresponding VolumeSnapshotContent.", + vs.Namespace, vs.Name) } else if vs.Status.Error != nil { errorMessage := "" if vs.Status.Error.Message != nil { errorMessage = *vs.Status.Error.Message } p.Log.Warnf("VolumeSnapshot has a temporary error %s. Snapshot controller will retry later.", errorMessage) + + return progress, nil + } + + if vs.Status != nil && vs.Status.BoundVolumeSnapshotContentName != nil { + vsc, err := snapshotClient.SnapshotV1().VolumeSnapshotContents().Get( + context.Background(), *vs.Status.BoundVolumeSnapshotContentName, metav1.GetOptions{}) + if err != nil { + p.Log.Errorf("error getting VolumeSnapshotContent %s: %s", *vs.Status.BoundVolumeSnapshotContentName, err.Error()) + return progress, errors.WithStack(err) + } + + if vsc.Status == nil { + p.Log.Debugf("VolumeSnapshotContent %s has an empty Status. Skip progress update.", vsc.Name) + return progress, nil + } + + if boolptr.IsSetToTrue(vsc.Status.ReadyToUse) { + progress.Completed = true + } else if vsc.Status.Error != nil { + progress.Completed = true + if vsc.Status.Error.Message != nil { + progress.Err = *vsc.Status.Error.Message + } + p.Log.Warnf("VolumeSnapshotContent meets an error %s.", progress.Err) + } } return progress, nil diff --git a/internal/backup/volumesnapshotcontent_action.go b/internal/backup/volumesnapshotcontent_action.go index 32ac7557..5df8d21d 100644 --- a/internal/backup/volumesnapshotcontent_action.go +++ b/internal/backup/volumesnapshotcontent_action.go @@ -17,26 +17,18 @@ limitations under the License. package backup import ( - "context" "fmt" - "strings" - "time" snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v7/apis/volumesnapshot/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "github.com/vmware-tanzu/velero-plugin-for-csi/internal/util" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" - "github.com/vmware-tanzu/velero/pkg/kuberesource" - "github.com/vmware-tanzu/velero/pkg/label" "github.com/vmware-tanzu/velero/pkg/plugin/velero" - biav2 "github.com/vmware-tanzu/velero/pkg/plugin/velero/backupitemaction/v2" - "github.com/vmware-tanzu/velero/pkg/util/boolptr" ) // VolumeSnapshotContentBackupItemAction is a backup item action plugin to backup @@ -91,25 +83,8 @@ func (p *VolumeSnapshotContentBackupItemAction) Execute(item runtime.Unstructure return nil, nil, "", nil, errors.WithStack(err) } - backupOnGoing := snapCont.GetLabels()[velerov1api.BackupNameLabel] == label.GetValidName(backup.Name) - operationID := "" - var itemToUpdate []velero.ResourceIdentifier - - // Only return Async operation for VSC created for this backup. - if backupOnGoing { - // The operationID is of the form / - operationID = snapCont.Name + "/" + time.Now().Format(time.RFC3339) - itemToUpdate = []velero.ResourceIdentifier{ - { - GroupResource: kuberesource.VolumeSnapshotContents, - Name: snapCont.Name, - }, - } - - } - p.Log.Infof("Returning from VolumeSnapshotContentBackupItemAction with %d additionalItems to backup", len(additionalItems)) - return &unstructured.Unstructured{Object: snapContMap}, additionalItems, operationID, itemToUpdate, nil + return &unstructured.Unstructured{Object: snapContMap}, additionalItems, "", nil, nil } func (p *VolumeSnapshotContentBackupItemAction) Name() string { @@ -117,49 +92,7 @@ func (p *VolumeSnapshotContentBackupItemAction) Name() string { } func (p *VolumeSnapshotContentBackupItemAction) Progress(operationID string, backup *velerov1api.Backup) (velero.OperationProgress, error) { - progress := velero.OperationProgress{} - if operationID == "" { - return progress, biav2.InvalidOperationIDError(operationID) - } - - // The operationId is of the form / - operationsIDParts := strings.Split(operationID, "/") - if len(operationsIDParts) != 2 { - p.Log.WithField("operationID", operationID).Error("Invalid operationID") - return progress, biav2.InvalidOperationIDError(operationID) - } - var err error - if progress.Started, err = time.Parse(time.RFC3339, operationsIDParts[1]); err != nil { - p.Log.Errorf("error parsing operationID's StartedTime part into time %s: %s", operationID, err.Error()) - return progress, errors.WithStack(fmt.Errorf("fail to parse StartedTime: %s", err.Error())) - } - - _, snapshotClient, err := util.GetClients() - if err != nil { - return progress, errors.WithStack(err) - } - - vsc, err := snapshotClient.SnapshotV1().VolumeSnapshotContents().Get(context.Background(), operationsIDParts[0], metav1.GetOptions{}) - if err != nil { - p.Log.Errorf("error getting volumesnapshotcontent %s: %s", operationsIDParts[0], err.Error()) - return progress, errors.WithStack(err) - } - - if vsc.Status == nil { - p.Log.Debugf("VolumeSnapshotContent %s has an empty Status. Skip progress update.", vsc.Name) - return progress, nil - } - - if boolptr.IsSetToTrue(vsc.Status.ReadyToUse) { - progress.Completed = true - } else if vsc.Status.Error != nil { - progress.Completed = true - if vsc.Status.Error.Message != nil { - progress.Err = *vsc.Status.Error.Message - } - } - - return progress, nil + return velero.OperationProgress{}, nil } func (p *VolumeSnapshotContentBackupItemAction) Cancel(operationID string, backup *velerov1api.Backup) error { diff --git a/internal/restore/pvc_action_test.go b/internal/restore/pvc_action_test.go index 60431117..a2008db8 100644 --- a/internal/restore/pvc_action_test.go +++ b/internal/restore/pvc_action_test.go @@ -368,6 +368,25 @@ func TestProgress(t *testing.T) { operationID: "testing", expectedErr: "didn't find DataDownload", }, + { + name: "DataDownload is not in the expected namespace", + restore: builder.ForRestore("velero", "test").Result(), + dataDownload: &velerov2alpha1.DataDownload{ + TypeMeta: metav1.TypeMeta{ + Kind: "DataUpload", + APIVersion: velerov2alpha1.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "invalid-namespace", + Name: "testing", + Labels: map[string]string{ + velerov1api.AsyncOperationIDLabel: "testing", + }, + }, + }, + operationID: "testing", + expectedErr: "didn't find DataDownload", + }, { name: "DataUpload is found", restore: builder.ForRestore("velero", "test").Result(), diff --git a/internal/util/util.go b/internal/util/util.go index 5b99fc34..9b0c67ad 100644 --- a/internal/util/util.go +++ b/internal/util/util.go @@ -418,7 +418,7 @@ func CleanupVolumeSnapshot(volSnap *snapshotv1api.VolumeSnapshot, snapshotClient } } -// deleteVolumeSnapshot is called by deleteVolumeSnapshots and handles the single VolumeSnapshot +// DeleteVolumeSnapshot is called by deleteVolumeSnapshots and handles the single VolumeSnapshot // instance. func DeleteVolumeSnapshot(vs snapshotv1api.VolumeSnapshot, vsc snapshotv1api.VolumeSnapshotContent, backup *velerov1api.Backup, snapshotClient snapshotter.SnapshotV1Interface, logger logrus.FieldLogger) {