From a51cb8e0102b3eecd118173d10a8ac880b61047e Mon Sep 17 00:00:00 2001 From: Caleb Warren Date: Fri, 27 Sep 2024 15:40:23 -0700 Subject: [PATCH] improve snapshot creation and validation --- extensions/clusters/clusters.go | 68 ++++++-- extensions/etcdsnapshot/etcdsnapshot.go | 200 ++++++++++-------------- 2 files changed, 140 insertions(+), 128 deletions(-) diff --git a/extensions/clusters/clusters.go b/extensions/clusters/clusters.go index 2996b2ac..86cca917 100644 --- a/extensions/clusters/clusters.go +++ b/extensions/clusters/clusters.go @@ -3,7 +3,7 @@ package clusters import ( "context" "fmt" - "strings" + "slices" "time" "github.com/pkg/errors" @@ -400,7 +400,12 @@ func WaitClusterToBeInUpgrade(client *rancher.Client, clusterID string) (err err if err != nil { return } + checkFuncWaitToBeInUpgrade := func(event watch.Event) (bool, error) { + acceptableErrorMessages := []string{ + "Cluster health check failed: Failed to communicate with API server during namespace check", + "the object has been modified", + } clusterUnstructured := event.Object.(*unstructured.Unstructured) summarizedCluster := summary.Summarize(clusterUnstructured) @@ -408,9 +413,9 @@ func WaitClusterToBeInUpgrade(client *rancher.Client, clusterID string) (err err if summarizedCluster.Transitioning && !summarizedCluster.Error && (summarizedCluster.State == clusterStateUpdating || summarizedCluster.State == clusterStateUpgrading) { return true, nil - } else if summarizedCluster.Error && isClusterInaccessible(summarizedCluster.Message) { + } else if summarizedCluster.Error && isClusterInaccessible(summarizedCluster.Message, acceptableErrorMessages) { return false, nil - } else if summarizedCluster.Error && !isClusterInaccessible(summarizedCluster.Message) { + } else if summarizedCluster.Error && !isClusterInaccessible(summarizedCluster.Message, acceptableErrorMessages) { return false, errors.Wrap(err, clusterErrorStateMessage) } @@ -440,6 +445,10 @@ func WaitClusterUntilUpgrade(client *rancher.Client, clusterID string) (err erro return } checkFuncWaitUpgrade := func(event watch.Event) (bool, error) { + acceptableErrorMessages := []string{ + "Cluster health check failed: Failed to communicate with API server during namespace check", + "the object has been modified", + } clusterUnstructured := event.Object.(*unstructured.Unstructured) summarizedCluster := summary.Summarize(clusterUnstructured) @@ -447,9 +456,9 @@ func WaitClusterUntilUpgrade(client *rancher.Client, clusterID string) (err erro if summarizedCluster.IsReady() { return true, nil - } else if summarizedCluster.Error && isClusterInaccessible(summarizedCluster.Message) { + } else if summarizedCluster.Error && isClusterInaccessible(summarizedCluster.Message, acceptableErrorMessages) { return false, nil - } else if summarizedCluster.Error && !isClusterInaccessible(summarizedCluster.Message) { + } else if summarizedCluster.Error && !isClusterInaccessible(summarizedCluster.Message, acceptableErrorMessages) { return false, errors.Wrap(err, clusterErrorStateMessage) } @@ -483,12 +492,53 @@ func WaitClusterToBeUpgraded(client *rancher.Client, clusterID string) (err erro return } -func isClusterInaccessible(messages []string) (isInaccessible bool) { - clusterCPErrorMessage := "Cluster health check failed: Failed to communicate with API server during namespace check" // For GKE - clusterModifiedErrorMessage := "the object has been modified" // For provisioning node driver K3s and RKE2 +// WaitOnClusterAfterSnapshot waits for a cluster to finish taking a snapshot and return to an active state. +func WaitOnClusterAfterSnapshot(client *rancher.Client, clusterID string) error { + cluster, err := client.Steve.SteveType(ProvisioningSteveResourceType).ByID(clusterID) + if err != nil { + return err + } + + isTransitioning := cluster.State == nil || cluster.State.Transitioning + + if !isTransitioning { + err = kwait.PollUntilContextTimeout(context.TODO(), defaults.FiveHundredMillisecondTimeout, defaults.OneMinuteTimeout, true, func(ctx context.Context) (bool, error) { + cluster, err := client.Steve.SteveType(ProvisioningSteveResourceType).ByID(clusterID) + if err != nil { + return false, err + } + + // note, this intentionally ignores cluster.State.Error, as that can sometimes appear during an upgrade during snapshots. + if cluster.State == nil { + return false, nil + } + return cluster.State.Transitioning, nil + }) + if err != nil { + return err + } + } + + err = kwait.PollUntilContextTimeout(context.TODO(), 1*time.Second, defaults.FifteenMinuteTimeout, true, func(ctx context.Context) (bool, error) { + cluster, err := client.Steve.SteveType(ProvisioningSteveResourceType).ByID(clusterID) + if err != nil { + return false, err + } + + if cluster.State == nil { + return false, nil + } + // note, this intentionally ignores cluster.State.Error, as that can sometimes appear during an upgrade during snapshots. + + return cluster.State.Name == active, nil + }) + + return err +} +func isClusterInaccessible(messages, acceptableErrors []string) (isInaccessible bool) { for _, message := range messages { - if strings.Contains(message, clusterCPErrorMessage) || strings.Contains(message, clusterModifiedErrorMessage) { + if slices.Contains(acceptableErrors, message) { isInaccessible = true break } diff --git a/extensions/etcdsnapshot/etcdsnapshot.go b/extensions/etcdsnapshot/etcdsnapshot.go index 7a03612f..3c4148aa 100644 --- a/extensions/etcdsnapshot/etcdsnapshot.go +++ b/extensions/etcdsnapshot/etcdsnapshot.go @@ -2,8 +2,9 @@ package etcdsnapshot import ( "context" - "sort" - "strings" + "errors" + "fmt" + "net/url" "time" "github.com/rancher/norman/types" @@ -14,102 +15,44 @@ import ( rancherv1 "github.com/rancher/shepherd/clients/rancher/v1" "github.com/rancher/shepherd/extensions/clusters" "github.com/rancher/shepherd/extensions/defaults" - "github.com/rancher/shepherd/extensions/defaults/stevetypes" "github.com/sirupsen/logrus" kwait "k8s.io/apimachinery/pkg/util/wait" ) const ( ProvisioningSteveResouceType = "provisioning.cattle.io.cluster" + SnapshotSteveResourceType = "rke.cattle.io.etcdsnapshot" + SnapshotClusterNameLabel = "rke.cattle.io/cluster-name" fleetNamespace = "fleet-default" localClusterName = "local" active = "active" readyStatus = "Resource is ready" ) -// GetRKE1Snapshots is a helper function to get the existing snapshots for a downstream RKE1 cluster. -func GetRKE1Snapshots(client *rancher.Client, clusterName string) ([]management.EtcdBackup, error) { - clusterID, err := clusters.GetClusterIDByName(client, clusterName) - if err != nil { - return nil, err - } - - snapshotSteveObjList, err := client.Management.EtcdBackup.ListAll(&types.ListOpts{ - Filters: map[string]interface{}{ - "clusterId": clusterID, - }, - }) - if err != nil { - return nil, err - } - - snapshots := []management.EtcdBackup{} - - for _, snapshot := range snapshotSteveObjList.Data { - if strings.Contains(snapshot.Name, clusterID) { - snapshots = append(snapshots, snapshot) - } - } - - sort.Slice(snapshots, func(i, j int) bool { - return snapshots[i].Created > snapshots[j].Created - }) - - return snapshots, nil -} - -// GetRKE2K3SSnapshots is a helper function to get the existing snapshots for a downstream RKE2/K3S cluster. -func GetRKE2K3SSnapshots(client *rancher.Client, clusterName string) ([]rancherv1.SteveAPIObject, error) { - localclusterID, err := clusters.GetClusterIDByName(client, localClusterName) - if err != nil { - return nil, err - } - - steveclient, err := client.Steve.ProxyDownstream(localclusterID) - if err != nil { - return nil, err - } - - snapshotSteveObjList, err := steveclient.SteveType(stevetypes.EtcdSnapshot).List(nil) - if err != nil { - return nil, err - } - - snapshots := []rancherv1.SteveAPIObject{} - - for _, snapshot := range snapshotSteveObjList.Data { - if strings.Contains(snapshot.ObjectMeta.Name, clusterName) { - snapshots = append(snapshots, snapshot) - } - } - - sort.Slice(snapshots, func(i, j int) bool { - return snapshots[i].ObjectMeta.CreationTimestamp.Before(&snapshots[j].ObjectMeta.CreationTimestamp) - }) +// CreateRKE1Snapshot is a helper function to create a snapshot on an RKE1 cluster. Returns error if any. +func CreateRKE1Snapshot(client *rancher.Client, clusterName string) ([]management.EtcdBackup, error) { - return snapshots, nil -} + updateTimestamp := time.Now().UTC() -// CreateRKE1Snapshot is a helper function to create a snapshot on an RKE1 cluster. Returns error if any. -func CreateRKE1Snapshot(client *rancher.Client, clusterName string) error { clusterID, err := clusters.GetClusterIDByName(client, clusterName) if err != nil { - return err + return nil, err } clusterResp, err := client.Management.Cluster.ByID(clusterID) if err != nil { - return err + return nil, err } logrus.Infof("Creating snapshot...") err = client.Management.Cluster.ActionBackupEtcd(clusterResp) if err != nil { - return err + return nil, err } + var snapshots []management.EtcdBackup err = kwait.PollUntilContextTimeout(context.TODO(), 5*time.Second, defaults.FiveMinuteTimeout, true, func(ctx context.Context) (done bool, err error) { - snapshotSteveObjList, err := client.Management.EtcdBackup.ListAll(&types.ListOpts{ + snapshotManagementObjList, err := client.Management.EtcdBackup.ListAll(&types.ListOpts{ Filters: map[string]interface{}{ "clusterId": clusterID, }, @@ -118,7 +61,9 @@ func CreateRKE1Snapshot(client *rancher.Client, clusterName string) error { return false, nil } - for _, snapshot := range snapshotSteveObjList.Data { + snapshots = []management.EtcdBackup{} + for _, snapshot := range snapshotManagementObjList.Data { + snapshotObj, err := client.Management.EtcdBackup.ByID(snapshot.ID) if err != nil { return false, nil @@ -127,23 +72,36 @@ func CreateRKE1Snapshot(client *rancher.Client, clusterName string) error { if snapshotObj.State != active { return false, nil } + + snapshotTime, err := time.Parse(time.RFC3339, snapshot.Created) + if err != nil { + return false, err + } + + // time.Parse doesn't include nanoseconds, but time.Now() does. Rounding up by 1 Second. + snapshotTime = snapshotTime.Add(time.Second) + + if snapshotTime.Compare(updateTimestamp) > -1 { + snapshots = append(snapshots, snapshot) + } + } + + if len(snapshots) == 0 { + return false, nil } - logrus.Infof("All snapshots in the cluster are in an active state!") return true, nil }) - if err != nil { - return err - } - return nil + return snapshots, err } -// CreateRKE2K3SSnapshot is a helper function to create a snapshot on an RKE2 or k3s cluster. Returns error if any. -func CreateRKE2K3SSnapshot(client *rancher.Client, clusterName string) error { +// CreateRKE2K3SSnapshot is a helper function to create a snapshot on an RKE2 or k3s cluster. +// returns the list of snapshots and an error, if any. +func CreateRKE2K3SSnapshot(client *rancher.Client, clusterName string) ([]rancherv1.SteveAPIObject, error) { clusterObject, clusterSteveObject, err := clusters.GetProvisioningClusterByName(client, clusterName, fleetNamespace) if err != nil { - return err + return nil, err } if clusterObject.Spec.RKEConfig != nil { @@ -165,41 +123,57 @@ func CreateRKE2K3SSnapshot(client *rancher.Client, clusterName string) error { } logrus.Infof("Creating snapshot...") - _, err = client.Steve.SteveType(clusters.ProvisioningSteveResourceType).Update(clusterSteveObject, clusterObject) + updatedCluster, err := client.Steve.SteveType(clusters.ProvisioningSteveResourceType).Update(clusterSteveObject, clusterObject) if err != nil { - return err + return nil, err + } + + updateTimestamp := time.Now() + err = clusters.WaitOnClusterAfterSnapshot(client, updatedCluster.ID) + if err != nil { + return nil, err } + var snapshots []rancherv1.SteveAPIObject + err = kwait.PollUntilContextTimeout(context.TODO(), 5*time.Second, defaults.FiveMinuteTimeout, true, func(ctx context.Context) (done bool, err error) { - snapshotSteveObjList, err := client.Steve.SteveType("rke.cattle.io.etcdsnapshot").List(nil) + query, err := url.ParseQuery(fmt.Sprintf("labelSelector=%s=%s", SnapshotClusterNameLabel, clusterName)) if err != nil { return false, nil } - _, clusterSteveObject, err := clusters.GetProvisioningClusterByName(client, clusterName, fleetNamespace) + snapshotSteveObjList, err := client.Steve.SteveType(SnapshotSteveResourceType).List(query) if err != nil { return false, nil } + if len(snapshotSteveObjList.Data) == 0 { + return false, nil + } + + snapshots = []rancherv1.SteveAPIObject{} for _, snapshot := range snapshotSteveObjList.Data { - snapshotObj, err := client.Steve.SteveType("rke.cattle.io.etcdsnapshot").ByID(snapshot.ID) + _, err = client.Steve.SteveType(SnapshotSteveResourceType).ByID(snapshot.ID) if err != nil { return false, nil } - if snapshotObj.ObjectMeta.State.Name == active && clusterSteveObject.ObjectMeta.State.Name == active { - logrus.Infof("All snapshots in the cluster are in an active state!") - return true, nil + // snapshot time doesn't include nanoseconds, but time.Now() does. Rounding up by 1 Second. + if snapshot.CreationTimestamp.Time.Add(time.Duration(time.Second)).Compare(updateTimestamp) > -1 { + snapshots = append(snapshots, snapshot) } } - return false, nil + if len(snapshots) == 0 { + return false, nil + } + + return true, nil }) - if err != nil { - return err - } - return nil + // not registering cleanup func; users do not delete snapshots through rancher + + return snapshots, err } // RestoreRKE1Snapshot is a helper function to restore a snapshot on an RKE1 cluster. Returns error if any. @@ -258,62 +232,50 @@ func RestoreRKE1Snapshot(client *rancher.Client, clusterName string, snapshotRes // RestoreRKE2K3SSnapshot is a helper function to restore a snapshot on an RKE2 or k3s cluster. Returns error if any. func RestoreRKE2K3SSnapshot(client *rancher.Client, snapshotRestore *rkev1.ETCDSnapshotRestore, clusterName string) error { - clusterObject, existingSteveAPIObject, err := clusters.GetProvisioningClusterByName(client, clusterName, fleetNamespace) + _, existingSteveAPIObject, err := clusters.GetProvisioningClusterByName(client, clusterName, fleetNamespace) if err != nil { return err } + steveWithUpdates := existingSteveAPIObject - clusterObject.Spec.RKEConfig.ETCDSnapshotRestore = snapshotRestore - - logrus.Infof("Restoring snapshot: %v", snapshotRestore.Name) - updatedCluster, err := client.Steve.SteveType(ProvisioningSteveResouceType).Update(existingSteveAPIObject, clusterObject) + clusterSpec := &apisV1.ClusterSpec{} + err = rancherv1.ConvertToK8sType(steveWithUpdates.Spec, clusterSpec) if err != nil { return err } - err = kwait.PollUntilContextTimeout(context.TODO(), 500*time.Millisecond, defaults.OneMinuteTimeout, true, func(ctx context.Context) (done bool, err error) { - clusterResp, err := client.Steve.SteveType(ProvisioningSteveResouceType).ByID(updatedCluster.ID) - if err != nil { - return false, err - } - - clusterStatus := &apisV1.ClusterStatus{} - err = rancherv1.ConvertToK8sType(clusterResp.Status, clusterStatus) - if err != nil { - return false, err - } + clusterSpec.RKEConfig.ETCDSnapshotRestore = snapshotRestore - if clusterResp.ObjectMeta.State.Name != active { - return true, nil - } + steveWithUpdates.Spec = clusterSpec - return false, nil - }) + logrus.Infof("Restoring snapshot: %v", snapshotRestore.Name) + updatedCluster, err := client.Steve.SteveType(ProvisioningSteveResouceType).Update(existingSteveAPIObject, steveWithUpdates) if err != nil { return err } - err = kwait.PollUntilContextTimeout(context.TODO(), 500*time.Millisecond, defaults.FifteenMinuteTimeout, true, func(ctx context.Context) (done bool, err error) { + err = kwait.PollUntilContextTimeout(context.TODO(), 1*time.Second, defaults.TwoMinuteTimeout, true, func(ctx context.Context) (done bool, err error) { clusterResp, err := client.Steve.SteveType(ProvisioningSteveResouceType).ByID(updatedCluster.ID) if err != nil { return false, err } + if clusterResp.State.Error { + return false, errors.New(clusterResp.State.Message) + } + clusterStatus := &apisV1.ClusterStatus{} err = rancherv1.ConvertToK8sType(clusterResp.Status, clusterStatus) if err != nil { return false, err } - if clusterResp.ObjectMeta.State.Name == active { + if clusterResp.State.Message == "waiting for etcd restore" { return true, nil } return false, nil }) - if err != nil { - return err - } - return nil + return err }