Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only generate one Async Operation for CSI backup and some code refactor #231

Merged
merged 2 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions internal/backup/pvc_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (p *PVCBackupItemAction) Progress(operationID string, backup *velerov1api.B
return progress, biav2.InvalidOperationIDError(operationID)
}

dataUpload, err := getDataUpload(context.Background(), backup, p.CRClient, operationID)
dataUpload, err := getDataUpload(context.Background(), p.CRClient, operationID)
if err != nil {
p.Log.Errorf("fail to get DataUpload for backup %s/%s: %s", backup.Namespace, backup.Name, err.Error())
return progress, err
Expand Down Expand Up @@ -294,7 +294,7 @@ func (p *PVCBackupItemAction) Cancel(operationID string, backup *velerov1api.Bac
return biav2.InvalidOperationIDError(operationID)
}

dataUpload, err := getDataUpload(context.Background(), backup, p.CRClient, operationID)
dataUpload, err := getDataUpload(context.Background(), p.CRClient, operationID)
if err != nil {
p.Log.Errorf("fail to get DataUpload for backup %s/%s: %s", backup.Namespace, backup.Name, err.Error())
return err
Expand Down Expand Up @@ -365,10 +365,10 @@ func createDataUpload(ctx context.Context, backup *velerov1api.Backup, crClient
return dataUpload, err
}

func getDataUpload(ctx context.Context, backup *velerov1api.Backup,
func getDataUpload(ctx context.Context,
crClient crclient.Client, operationID string) (*velerov2alpha1.DataUpload, error) {
dataUploadList := new(velerov2alpha1.DataUploadList)
err := crClient.List(context.Background(), dataUploadList, &crclient.ListOptions{
err := crClient.List(ctx, dataUploadList, &crclient.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{velerov1api.AsyncOperationIDLabel: operationID}),
})
if err != nil {
Expand All @@ -392,7 +392,7 @@ func cancelDataUpload(ctx context.Context, crClient crclient.Client,
updatedDataUpload := dataUpload.DeepCopy()
updatedDataUpload.Spec.Cancel = true

err := crClient.Patch(context.Background(), updatedDataUpload, crclient.MergeFrom(dataUpload))
err := crClient.Patch(ctx, updatedDataUpload, crclient.MergeFrom(dataUpload))
if err != nil {
return errors.Wrap(err, "error patch DataUpload")
}
Expand Down
4 changes: 2 additions & 2 deletions internal/backup/pvc_action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ func TestExecute(t *testing.T) {
if boolptr.IsSetToTrue(tc.backup.Spec.SnapshotMoveData) == true {
go func() {
var vsList *v1.VolumeSnapshotList
err := wait.PollImmediate(1*time.Second, 10*time.Second, func() (bool, error) {
vsList, err = pvcBIA.SnapshotClient.SnapshotV1().VolumeSnapshots(tc.pvc.Namespace).List(context.Background(), metav1.ListOptions{})
err := wait.PollUntilContextTimeout(context.Background(), 1*time.Second, 10*time.Second, true, func(ctx context.Context) (bool, error) {
vsList, err = pvcBIA.SnapshotClient.SnapshotV1().VolumeSnapshots(tc.pvc.Namespace).List(ctx, metav1.ListOptions{})
require.NoError(t, err)
if err != nil || len(vsList.Items) == 0 {
return false, nil
Expand Down
36 changes: 34 additions & 2 deletions internal/backup/volumesnapshot_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ func (p *VolumeSnapshotBackupItemAction) Execute(item runtime.Unstructured, back
Namespace: vs.Namespace,
Name: vs.Name,
},
{
GroupResource: kuberesource.VolumeSnapshotContents,
Name: vsc.Name,
},
}
}

Expand Down Expand Up @@ -229,7 +233,8 @@ func (p *VolumeSnapshotBackupItemAction) Progress(operationID string, backup *ve
return progress, errors.WithStack(err)
}

vs, err := snapshotClient.SnapshotV1().VolumeSnapshots(operationIDParts[0]).Get(context.Background(), operationIDParts[1], metav1.GetOptions{})
vs, err := snapshotClient.SnapshotV1().VolumeSnapshots(operationIDParts[0]).Get(
context.Background(), operationIDParts[1], metav1.GetOptions{})
if err != nil {
p.Log.Errorf("error getting volumesnapshot %s/%s: %s", operationIDParts[0], operationIDParts[1], err.Error())
return progress, errors.WithStack(err)
Expand All @@ -241,13 +246,40 @@ func (p *VolumeSnapshotBackupItemAction) Progress(operationID string, backup *ve
}

if boolptr.IsSetToTrue(vs.Status.ReadyToUse) {
progress.Completed = true
p.Log.Debugf("VolumeSnapshot %s/%s is ReadyToUse. Continue on querying corresponding VolumeSnapshotContent.",
vs.Namespace, vs.Name)
} else if vs.Status.Error != nil {
errorMessage := ""
if vs.Status.Error.Message != nil {
errorMessage = *vs.Status.Error.Message
}
p.Log.Warnf("VolumeSnapshot has a temporary error %s. Snapshot controller will retry later.", errorMessage)

return progress, nil
}

if vs.Status != nil && vs.Status.BoundVolumeSnapshotContentName != nil {
vsc, err := snapshotClient.SnapshotV1().VolumeSnapshotContents().Get(
context.Background(), *vs.Status.BoundVolumeSnapshotContentName, metav1.GetOptions{})
if err != nil {
p.Log.Errorf("error getting VolumeSnapshotContent %s: %s", *vs.Status.BoundVolumeSnapshotContentName, err.Error())
return progress, errors.WithStack(err)
}

if vsc.Status == nil {
p.Log.Debugf("VolumeSnapshotContent %s has an empty Status. Skip progress update.", vsc.Name)
return progress, nil
}

if boolptr.IsSetToTrue(vsc.Status.ReadyToUse) {
progress.Completed = true
} else if vsc.Status.Error != nil {
progress.Completed = true
if vsc.Status.Error.Message != nil {
progress.Err = *vsc.Status.Error.Message
}
p.Log.Warnf("VolumeSnapshotContent meets an error %s.", progress.Err)
}
}

return progress, nil
Expand Down
71 changes: 2 additions & 69 deletions internal/backup/volumesnapshotcontent_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,18 @@ limitations under the License.
package backup

import (
"context"
"fmt"
"strings"
"time"

snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v7/apis/volumesnapshot/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/vmware-tanzu/velero-plugin-for-csi/internal/util"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/kuberesource"
"github.com/vmware-tanzu/velero/pkg/label"
"github.com/vmware-tanzu/velero/pkg/plugin/velero"
biav2 "github.com/vmware-tanzu/velero/pkg/plugin/velero/backupitemaction/v2"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
)

// VolumeSnapshotContentBackupItemAction is a backup item action plugin to backup
Expand Down Expand Up @@ -91,75 +83,16 @@ func (p *VolumeSnapshotContentBackupItemAction) Execute(item runtime.Unstructure
return nil, nil, "", nil, errors.WithStack(err)
}

backupOnGoing := snapCont.GetLabels()[velerov1api.BackupNameLabel] == label.GetValidName(backup.Name)
operationID := ""
var itemToUpdate []velero.ResourceIdentifier

// Only return Async operation for VSC created for this backup.
if backupOnGoing {
// The operationID is of the form <volumesnapshotcontent-name>/<started-time>
operationID = snapCont.Name + "/" + time.Now().Format(time.RFC3339)
itemToUpdate = []velero.ResourceIdentifier{
{
GroupResource: kuberesource.VolumeSnapshotContents,
Name: snapCont.Name,
},
}

}

reasonerjt marked this conversation as resolved.
Show resolved Hide resolved
p.Log.Infof("Returning from VolumeSnapshotContentBackupItemAction with %d additionalItems to backup", len(additionalItems))
return &unstructured.Unstructured{Object: snapContMap}, additionalItems, operationID, itemToUpdate, nil
return &unstructured.Unstructured{Object: snapContMap}, additionalItems, "", nil, nil
}

func (p *VolumeSnapshotContentBackupItemAction) Name() string {
return "VolumeSnapshotContentBackupItemAction"
}

func (p *VolumeSnapshotContentBackupItemAction) Progress(operationID string, backup *velerov1api.Backup) (velero.OperationProgress, error) {
progress := velero.OperationProgress{}
if operationID == "" {
return progress, biav2.InvalidOperationIDError(operationID)
}

// The operationId is of the form <volumesnapshotcontent-name>/<started-time>
operationsIDParts := strings.Split(operationID, "/")
reasonerjt marked this conversation as resolved.
Show resolved Hide resolved
if len(operationsIDParts) != 2 {
p.Log.WithField("operationID", operationID).Error("Invalid operationID")
return progress, biav2.InvalidOperationIDError(operationID)
}
var err error
if progress.Started, err = time.Parse(time.RFC3339, operationsIDParts[1]); err != nil {
p.Log.Errorf("error parsing operationID's StartedTime part into time %s: %s", operationID, err.Error())
return progress, errors.WithStack(fmt.Errorf("fail to parse StartedTime: %s", err.Error()))
}

_, snapshotClient, err := util.GetClients()
if err != nil {
return progress, errors.WithStack(err)
}

vsc, err := snapshotClient.SnapshotV1().VolumeSnapshotContents().Get(context.Background(), operationsIDParts[0], metav1.GetOptions{})
if err != nil {
p.Log.Errorf("error getting volumesnapshotcontent %s: %s", operationsIDParts[0], err.Error())
return progress, errors.WithStack(err)
}

if vsc.Status == nil {
p.Log.Debugf("VolumeSnapshotContent %s has an empty Status. Skip progress update.", vsc.Name)
return progress, nil
}

if boolptr.IsSetToTrue(vsc.Status.ReadyToUse) {
progress.Completed = true
} else if vsc.Status.Error != nil {
progress.Completed = true
if vsc.Status.Error.Message != nil {
progress.Err = *vsc.Status.Error.Message
}
}

return progress, nil
return velero.OperationProgress{}, nil
}

func (p *VolumeSnapshotContentBackupItemAction) Cancel(operationID string, backup *velerov1api.Backup) error {
Expand Down
7 changes: 5 additions & 2 deletions internal/restore/pvc_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,10 @@ func getDataUploadResult(ctx context.Context, restore *velerov1api.Restore, pvc

func getDataDownload(ctx context.Context, namespace string, operationID string, crClient crclient.Client) (*velerov2alpha1.DataDownload, error) {
dataDownloadList := new(velerov2alpha1.DataDownloadList)
err := crClient.List(ctx, dataDownloadList, &crclient.ListOptions{LabelSelector: labels.SelectorFromSet(map[string]string{velerov1api.AsyncOperationIDLabel: operationID})})
err := crClient.List(ctx, dataDownloadList, &crclient.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{velerov1api.AsyncOperationIDLabel: operationID}),
Namespace: namespace,
})
if err != nil {
return nil, errors.Wrap(err, "fail to list DataDownload")
}
Expand All @@ -356,7 +359,7 @@ func cancelDataDownload(ctx context.Context, crClient crclient.Client,
updatedDataDownload := dataDownload.DeepCopy()
updatedDataDownload.Spec.Cancel = true

err := crClient.Patch(context.Background(), updatedDataDownload, crclient.MergeFrom(dataDownload))
err := crClient.Patch(ctx, updatedDataDownload, crclient.MergeFrom(dataDownload))
return err
}

Expand Down
19 changes: 19 additions & 0 deletions internal/restore/pvc_action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,25 @@ func TestProgress(t *testing.T) {
operationID: "testing",
expectedErr: "didn't find DataDownload",
},
{
name: "DataDownload is not in the expected namespace",
restore: builder.ForRestore("velero", "test").Result(),
dataDownload: &velerov2alpha1.DataDownload{
TypeMeta: metav1.TypeMeta{
Kind: "DataUpload",
APIVersion: velerov2alpha1.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "invalid-namespace",
Name: "testing",
Labels: map[string]string{
velerov1api.AsyncOperationIDLabel: "testing",
},
},
},
operationID: "testing",
expectedErr: "didn't find DataDownload",
},
{
name: "DataUpload is found",
restore: builder.ForRestore("velero", "test").Result(),
Expand Down
14 changes: 7 additions & 7 deletions internal/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ func GetVolumeSnapshotContentForVolumeSnapshot(volSnap *snapshotv1api.VolumeSnap
interval := 5 * time.Second
var snapshotContent *snapshotv1api.VolumeSnapshotContent

err := wait.PollImmediate(interval, timeout, func() (bool, error) {
vs, err := snapshotClient.VolumeSnapshots(volSnap.Namespace).Get(context.TODO(), volSnap.Name, metav1.GetOptions{})
err := wait.PollUntilContextTimeout(context.Background(), interval, timeout, true, func(ctx context.Context) (bool, error) {
vs, err := snapshotClient.VolumeSnapshots(volSnap.Namespace).Get(ctx, volSnap.Name, metav1.GetOptions{})
if err != nil {
return false, errors.Wrapf(err, fmt.Sprintf("failed to get volumesnapshot %s/%s", volSnap.Namespace, volSnap.Name))
}
Expand All @@ -247,7 +247,7 @@ func GetVolumeSnapshotContentForVolumeSnapshot(volSnap *snapshotv1api.VolumeSnap
return false, nil
}

snapshotContent, err = snapshotClient.VolumeSnapshotContents().Get(context.TODO(), *vs.Status.BoundVolumeSnapshotContentName, metav1.GetOptions{})
snapshotContent, err = snapshotClient.VolumeSnapshotContents().Get(ctx, *vs.Status.BoundVolumeSnapshotContentName, metav1.GetOptions{})
if err != nil {
return false, errors.Wrapf(err, fmt.Sprintf("failed to get volumesnapshotcontent %s for volumesnapshot %s/%s", *vs.Status.BoundVolumeSnapshotContentName, vs.Namespace, vs.Name))
}
Expand All @@ -267,7 +267,7 @@ func GetVolumeSnapshotContentForVolumeSnapshot(volSnap *snapshotv1api.VolumeSnap
})

if err != nil {
if err == wait.ErrWaitTimeout {
if err == wait.ErrorInterrupted(errors.New("timed out waiting for the condition")) {
if snapshotContent != nil && snapshotContent.Status != nil && snapshotContent.Status.Error != nil {
log.Errorf("Timed out awaiting reconciliation of volumesnapshot, Volumesnapshotcontent %s has error: %v", snapshotContent.Name, *snapshotContent.Status.Error.Message)
return nil, errors.Errorf("CSI got timed out with error: %v", *snapshotContent.Status.Error.Message)
Expand Down Expand Up @@ -418,7 +418,7 @@ func CleanupVolumeSnapshot(volSnap *snapshotv1api.VolumeSnapshot, snapshotClient
}
}

// deleteVolumeSnapshot is called by deleteVolumeSnapshots and handles the single VolumeSnapshot
// DeleteVolumeSnapshot is called by deleteVolumeSnapshots and handles the single VolumeSnapshot
// instance.
func DeleteVolumeSnapshot(vs snapshotv1api.VolumeSnapshot, vsc snapshotv1api.VolumeSnapshotContent,
backup *velerov1api.Backup, snapshotClient snapshotter.SnapshotV1Interface, logger logrus.FieldLogger) {
Expand Down Expand Up @@ -483,8 +483,8 @@ func recreateVolumeSnapshotContent(vsc snapshotv1api.VolumeSnapshotContent, back
}

// Check VolumeSnapshotContents is already deleted, before re-creating it.
err = wait.PollImmediate(interval, timeout, func() (bool, error) {
_, err := snapshotClient.VolumeSnapshotContents().Get(context.TODO(), vsc.Name, metav1.GetOptions{})
err = wait.PollUntilContextTimeout(context.Background(), interval, timeout, true, func(ctx context.Context) (bool, error) {
_, err := snapshotClient.VolumeSnapshotContents().Get(ctx, vsc.Name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return true, nil
Expand Down
Loading