diff --git a/clients/corral/config.go b/clients/corral/config.go
index 7ade971e..bab1d121 100644
--- a/clients/corral/config.go
+++ b/clients/corral/config.go
@@ -25,6 +25,14 @@ type Packages struct {
 	HasCustomRepo string `json:"hasCustomRepo" yaml:"hasCustomRepo"`
 }
 
+// Args is a struct that contains arguments to a corral create command, and any updates to the config
+// that should be applied before creating the corral
+type Args struct {
+	Name        string
+	PackageName string
+	Updates     map[string]string
+}
+
 // PackagesConfig is a function that reads in the corral package object from the config file
 func PackagesConfig() *Packages {
 	var corralPackages Packages
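The new `Args` type groups everything one `corral create` invocation needs (see `CreateMultipleCorrals` below). A minimal sketch of populating it; the package paths and config keys here are hypothetical placeholders, not values from this diff:

```go
package main

import "github.com/rancher/shepherd/clients/corral"

func exampleArgs() []corral.Args {
	return []corral.Args{
		{
			Name:        "registry-corral",
			PackageName: "dist/aws-registry", // hypothetical corral package path
			// Updates are applied via UpdateCorralConfig before the corral is created
			Updates: map[string]string{"registry_fqdn": "registry.example.com"},
		},
		{
			Name:        "proxy-corral",
			PackageName: "dist/aws-proxy", // hypothetical corral package path
		},
	}
}
```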
diff --git a/clients/corral/corral.go b/clients/corral/corral.go
index cc02aec6..448f079f 100644
--- a/clients/corral/corral.go
+++ b/clients/corral/corral.go
@@ -4,13 +4,17 @@ import (
 	"bytes"
 	"fmt"
 	"io"
+	"os"
 	"os/exec"
 	"regexp"
 	"strings"
+	"sync"
+	"time"
 
 	"github.com/pkg/errors"
 	"github.com/rancher/shepherd/pkg/session"
 	"github.com/sirupsen/logrus"
+	"k8s.io/apimachinery/pkg/util/wait"
 )
 
 const (
@@ -70,29 +74,153 @@ func SetCustomRepo(repo string) error {
 
 // CreateCorral creates a corral taking the corral name, the package path, and a debug set so if someone wants to view the
 // corral create log
-func CreateCorral(ts *session.Session, corralName, packageName string, debug bool, cleanup bool) ([]byte, error) {
+func CreateCorral(ts *session.Session, corralName, packageName string, debug, cleanup bool) ([]byte, error) {
+	command, err := startCorral(ts, corralName, packageName, debug, cleanup)
+	if err != nil {
+		return nil, err
+	}
+
+	return runAndWaitOnCommand(command)
+}
+
+func runAndWaitOnCommand(command *exec.Cmd) ([]byte, error) {
+	err := command.Wait()
+
+	var msg []byte
+	if command.Stdout != nil {
+		msg = command.Stdout.(*bytes.Buffer).Bytes()
+	}
+
+	if msg != nil {
+		logrus.Infof("Stdout: %s", string(msg))
+	}
+
+	return msg, errors.Wrap(err, "Debug: "+string(msg))
+}
+
+func startCorral(ts *session.Session, corralName, packageName string, debug, cleanup bool) (*exec.Cmd, error) {
 	ts.RegisterCleanupFunc(func() error {
 		return DeleteCorral(corralName)
 	})
 
 	args := []string{"create"}
+
 	if !cleanup {
 		args = append(args, skipCleanupFlag)
 	}
 	if debug {
 		args = append(args, debugFlag)
 	}
+
 	args = append(args, corralName, packageName)
 
 	logrus.Infof("Creating corral with the following parameters: %v", args)
-	// complicated, but running the command in a way that allows us to
-	// capture the output and error(s) and print it to the console
-	msg, err := exec.Command("corral", args...).CombinedOutput()
-	logrus.Infof("Corral create output: %s", string(msg))
+
+	cmdToRun := exec.Command("corral", args...)
+
+	// create a buffer for stdout/stderr so we can read from it later; exec.Cmd initializes these to nil by default.
+	var b bytes.Buffer
+	cmdToRun.Stdout = &b
+	cmdToRun.Stderr = &b
+
+	err := cmdToRun.Start()
+	if err != nil {
+		return nil, err
+	}
+
+	// wait for corral's config to exist on disk, ensuring corral is completely initialized. Otherwise, race conditions occur.
+	err = waitForCorralConfig(corralName)
+	if err != nil {
+		return nil, err
+	}
+
+	return cmdToRun, err
+}
+
+func waitForCorralConfig(corralName string) error {
+	backoff := wait.Backoff{
+		Duration: 1 * time.Second,
+		Factor:   1.1,
+		Jitter:   0.1,
+		Steps:    10,
+	}
+
+	homeDir, err := os.UserHomeDir()
 	if err != nil {
-		return nil, errors.Wrap(err, "Unable to create corral: "+string(msg))
+		return err
+	}
+
+	corralOSPath := homeDir + "/.corral/corrals/" + corralName + "/corral.yaml"
+
+	return wait.ExponentialBackoff(backoff, func() (finished bool, err error) {
+		_, err = os.Stat(corralOSPath)
+		if err != nil {
+			return false, nil
+		}
+
+		fileContents, err := os.ReadFile(corralOSPath)
+		if err != nil {
+			return false, nil
+		}
+
+		if len(fileContents) == 0 {
+			return false, nil
+		}
+
+		return true, err
+	})
+}
+
+// CreateMultipleCorrals concurrently creates corrals from the given list of Args, applying each command's config
+// updates before starting it. It waits for every corral create command to finish, then returns the combined
+// output of each command and any accumulated errors.
+func CreateMultipleCorrals(ts *session.Session, commands []Args, debug, cleanup bool) ([][]byte, error) {
+	var waitGroup sync.WaitGroup
+	var mutex sync.Mutex
+
+	var msgs [][]byte
+	var errStrings []string
+
+	for _, currentCommand := range commands {
+		// stop before starting this corral if any of its config updates fail, to avoid running while already in an error state.
+		updateFailed := false
+		for key, value := range currentCommand.Updates {
+			logrus.Info(key, ": ", value)
+			err := UpdateCorralConfig(key, value)
+			if err != nil {
+				errStrings = append(errStrings, fmt.Sprint(err.Error(), "Unable to update corral: "+currentCommand.Name+" for "+key+": "+value))
+				updateFailed = true
+				break
+			}
+		}
+
+		if updateFailed {
+			break
+		}
+
+		cmdToRun, err := startCorral(ts, currentCommand.Name, currentCommand.PackageName, debug, cleanup)
+		if err != nil {
+			errStrings = append(errStrings, err.Error())
+			break
+		}
+
+		waitGroup.Add(1)
+
+		go func() {
+			defer waitGroup.Done()
+
+			msg, err := runAndWaitOnCommand(cmdToRun)
+
+			// every goroutine appends to the same slices, so guard them with a mutex.
+			mutex.Lock()
+			defer mutex.Unlock()
+
+			if err != nil {
+				errStrings = append(errStrings, err.Error())
+			}
+
+			msgs = append(msgs, msg)
+		}()
 	}
 
-	return msg, nil
+	waitGroup.Wait()
+
+	var formattedError error
+	if len(errStrings) > 0 {
+		formattedError = errors.New(strings.Join(errStrings, "\n"))
+	}
+
+	logrus.Info("done creating corrals")
+	return msgs, formattedError
 }
 
 // DeleteCorral deletes a corral based on the corral name
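A sketch of driving the new helpers end to end, assuming shepherd's `session.NewSession` constructs a test session whose `Cleanup` tears down registered resources; the corral package names are placeholders:

```go
package main

import (
	"github.com/rancher/shepherd/clients/corral"
	"github.com/rancher/shepherd/pkg/session"
	"github.com/sirupsen/logrus"
)

func createCorrals() {
	ts := session.NewSession()
	defer ts.Cleanup()

	// single corral: blocks until `corral create` exits, returning its combined stdout/stderr
	output, err := corral.CreateCorral(ts, "single-corral", "dist/aws-base", true, true)
	if err != nil {
		logrus.Fatalf("create failed: %s", output)
	}

	// multiple corrals: config updates are applied serially, then the create
	// commands run concurrently and each command's output is collected
	outputs, err := corral.CreateMultipleCorrals(ts, []corral.Args{
		{Name: "corral-a", PackageName: "dist/aws-a"},
		{Name: "corral-b", PackageName: "dist/aws-b"},
	}, true, true)
	if err != nil {
		logrus.Fatal(err)
	}

	for _, out := range outputs {
		logrus.Infof("corral output: %s", out)
	}
}
```

Note that `startCorral` registers `DeleteCorral` on the session before the command even starts, so a corral whose create fails midway is still cleaned up when the session is.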
diff --git a/extensions/clusters/clusters.go b/extensions/clusters/clusters.go
index 2996b2ac..86cca917 100644
--- a/extensions/clusters/clusters.go
+++ b/extensions/clusters/clusters.go
@@ -3,7 +3,7 @@ package clusters
 import (
 	"context"
 	"fmt"
-	"strings"
+	"slices"
 	"time"
 
 	"github.com/pkg/errors"
@@ -400,7 +400,12 @@ func WaitClusterToBeInUpgrade(client *rancher.Client, clusterID string) (err err
 	if err != nil {
 		return
 	}
+
 	checkFuncWaitToBeInUpgrade := func(event watch.Event) (bool, error) {
+		acceptableErrorMessages := []string{
+			"Cluster health check failed: Failed to communicate with API server during namespace check", // For GKE
+			"the object has been modified", // For provisioning node driver K3s and RKE2
+		}
 		clusterUnstructured := event.Object.(*unstructured.Unstructured)
 		summarizedCluster := summary.Summarize(clusterUnstructured)
 
@@ -408,9 +413,9 @@ func WaitClusterToBeInUpgrade(client *rancher.Client, clusterID string) (err err
 		if summarizedCluster.Transitioning && !summarizedCluster.Error && (summarizedCluster.State == clusterStateUpdating || summarizedCluster.State == clusterStateUpgrading) {
 			return true, nil
-		} else if summarizedCluster.Error && isClusterInaccessible(summarizedCluster.Message) {
+		} else if summarizedCluster.Error && isClusterInaccessible(summarizedCluster.Message, acceptableErrorMessages) {
 			return false, nil
-		} else if summarizedCluster.Error && !isClusterInaccessible(summarizedCluster.Message) {
+		} else if summarizedCluster.Error && !isClusterInaccessible(summarizedCluster.Message, acceptableErrorMessages) {
 			return false, errors.Wrap(err, clusterErrorStateMessage)
 		}
 
@@ -440,6 +445,10 @@ func WaitClusterUntilUpgrade(client *rancher.Client, clusterID string) (err erro
 		return
 	}
 	checkFuncWaitUpgrade := func(event watch.Event) (bool, error) {
+		acceptableErrorMessages := []string{
+			"Cluster health check failed: Failed to communicate with API server during namespace check", // For GKE
+			"the object has been modified", // For provisioning node driver K3s and RKE2
+		}
 		clusterUnstructured := event.Object.(*unstructured.Unstructured)
 		summarizedCluster := summary.Summarize(clusterUnstructured)
 
@@ -447,9 +456,9 @@ func WaitClusterUntilUpgrade(client *rancher.Client, clusterID string) (err erro
 		if summarizedCluster.IsReady() {
 			return true, nil
-		} else if summarizedCluster.Error && isClusterInaccessible(summarizedCluster.Message) {
+		} else if summarizedCluster.Error && isClusterInaccessible(summarizedCluster.Message, acceptableErrorMessages) {
 			return false, nil
-		} else if summarizedCluster.Error && !isClusterInaccessible(summarizedCluster.Message) {
+		} else if summarizedCluster.Error && !isClusterInaccessible(summarizedCluster.Message, acceptableErrorMessages) {
 			return false, errors.Wrap(err, clusterErrorStateMessage)
 		}
 
@@ -483,12 +492,53 @@ func WaitClusterToBeUpgraded(client *rancher.Client, clusterID string) (err erro
 	return
 }
 
-func isClusterInaccessible(messages []string) (isInaccessible bool) {
-	clusterCPErrorMessage := "Cluster health check failed: Failed to communicate with API server during namespace check" // For GKE
-	clusterModifiedErrorMessage := "the object has been modified" // For provisioning node driver K3s and RKE2
+// WaitOnClusterAfterSnapshot waits for a cluster to finish taking a snapshot and return to an active state.
+func WaitOnClusterAfterSnapshot(client *rancher.Client, clusterID string) error {
+	cluster, err := client.Steve.SteveType(ProvisioningSteveResourceType).ByID(clusterID)
+	if err != nil {
+		return err
+	}
+
+	isTransitioning := cluster.State == nil || cluster.State.Transitioning
+
+	if !isTransitioning {
+		err = kwait.PollUntilContextTimeout(context.TODO(), defaults.FiveHundredMillisecondTimeout, defaults.OneMinuteTimeout, true, func(ctx context.Context) (bool, error) {
+			cluster, err := client.Steve.SteveType(ProvisioningSteveResourceType).ByID(clusterID)
+			if err != nil {
+				return false, err
+			}
+
+			// note: this intentionally ignores cluster.State.Error, which can appear transiently when a snapshot is taken during an upgrade.
+			if cluster.State == nil {
+				return false, nil
+			}
+
+			return cluster.State.Transitioning, nil
+		})
+		if err != nil {
+			return err
+		}
+	}
+
+	err = kwait.PollUntilContextTimeout(context.TODO(), 1*time.Second, defaults.FifteenMinuteTimeout, true, func(ctx context.Context) (bool, error) {
+		cluster, err := client.Steve.SteveType(ProvisioningSteveResourceType).ByID(clusterID)
+		if err != nil {
+			return false, err
+		}
+
+		if cluster.State == nil {
+			return false, nil
+		}
+
+		// note: this intentionally ignores cluster.State.Error, which can appear transiently when a snapshot is taken during an upgrade.
+		return cluster.State.Name == active, nil
+	})
+
+	return err
+}
+
+func isClusterInaccessible(messages, acceptableErrors []string) (isInaccessible bool) {
 	for _, message := range messages {
-		if strings.Contains(message, clusterCPErrorMessage) || strings.Contains(message, clusterModifiedErrorMessage) {
+		if slices.Contains(acceptableErrors, message) {
 			isInaccessible = true
 			break
 		}
 	}
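The new `WaitOnClusterAfterSnapshot` is a two-phase wait: if the cluster has not yet left `active`, it first polls (up to one minute) for the transition to begin, then polls (up to fifteen minutes) for the cluster to settle back to `active`. A sketch of calling it directly, assuming an authenticated `rancher.Client`; the cluster ID is a placeholder:

```go
package main

import (
	"github.com/rancher/shepherd/clients/rancher"
	"github.com/rancher/shepherd/extensions/clusters"
)

func waitAfterSnapshot(client *rancher.Client) error {
	// steve IDs for provisioning clusters take the form namespace/name; this one is hypothetical
	clusterID := "fleet-default/my-cluster"

	return clusters.WaitOnClusterAfterSnapshot(client, clusterID)
}
```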
diff --git a/extensions/clusters/eks/eks_cluster_config.go b/extensions/clusters/eks/eks_cluster_config.go
index 31df892d..53e3e5c5 100644
--- a/extensions/clusters/eks/eks_cluster_config.go
+++ b/extensions/clusters/eks/eks_cluster_config.go
@@ -46,6 +46,7 @@ type NodeGroupConfig struct {
 	Subnets  []string          `json:"subnets" yaml:"subnets"`
 	Tags     map[string]string `json:"tags" yaml:"tags"`
 	UserData *string           `json:"userData,omitempty" yaml:"userData,omitempty"`
+	Version  *string           `json:"version,omitempty" yaml:"version,omitempty"`
 }
 
 // LaunchTemplateConfig is the configuration need for a node group launch template
@@ -64,6 +65,12 @@ func nodeGroupsConstructor(nodeGroupsConfig *[]NodeGroupConfig, kubernetesVersio
 			Version: nodeGroupConfig.LaunchTemplateConfig.Version,
 		}
 	}
+	var version *string
+	if nodeGroupConfig.Version != nil {
+		version = nodeGroupConfig.Version
+	} else {
+		version = &kubernetesVersion
+	}
 	nodeGroup := management.NodeGroup{
 		DesiredSize: nodeGroupConfig.DesiredSize,
 		DiskSize:    nodeGroupConfig.DiskSize,
@@ -83,7 +90,7 @@ func nodeGroupsConstructor(nodeGroupsConfig *[]NodeGroupConfig, kubernetesVersio
 		Subnets:  &nodeGroupConfig.Subnets,
 		Tags:     &nodeGroupConfig.Tags,
 		UserData: nodeGroupConfig.UserData,
-		Version:  &kubernetesVersion,
+		Version:  version,
 	}
 	nodeGroups = append(nodeGroups, nodeGroup)
 }
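With the new `Version` field, a node group can be pinned to a kubernetes version different from the cluster's; when the field is nil, `nodeGroupsConstructor` falls back to the cluster-level version exactly as before. A minimal sketch; the version value is a placeholder:

```go
package main

import "github.com/rancher/shepherd/extensions/clusters/eks"

func pinnedNodeGroup() eks.NodeGroupConfig {
	nodeGroupVersion := "1.27" // hypothetical node group kubernetes version
	return eks.NodeGroupConfig{
		Version: &nodeGroupVersion, // omit to default to the cluster's kubernetes version
	}
}
```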
diff --git a/extensions/etcdsnapshot/etcdsnapshot.go b/extensions/etcdsnapshot/etcdsnapshot.go
index 7a03612f..3c4148aa 100644
--- a/extensions/etcdsnapshot/etcdsnapshot.go
+++ b/extensions/etcdsnapshot/etcdsnapshot.go
@@ -2,8 +2,9 @@ package etcdsnapshot
 
 import (
 	"context"
-	"sort"
-	"strings"
+	"errors"
+	"fmt"
+	"net/url"
 	"time"
 
 	"github.com/rancher/norman/types"
@@ -14,102 +15,44 @@ import (
 	rancherv1 "github.com/rancher/shepherd/clients/rancher/v1"
 	"github.com/rancher/shepherd/extensions/clusters"
 	"github.com/rancher/shepherd/extensions/defaults"
-	"github.com/rancher/shepherd/extensions/defaults/stevetypes"
 	"github.com/sirupsen/logrus"
 	kwait "k8s.io/apimachinery/pkg/util/wait"
 )
 
 const (
 	ProvisioningSteveResouceType = "provisioning.cattle.io.cluster"
+	SnapshotSteveResourceType    = "rke.cattle.io.etcdsnapshot"
+	SnapshotClusterNameLabel     = "rke.cattle.io/cluster-name"
 	fleetNamespace               = "fleet-default"
 	localClusterName             = "local"
 	active                       = "active"
 	readyStatus                  = "Resource is ready"
 )
 
-// GetRKE1Snapshots is a helper function to get the existing snapshots for a downstream RKE1 cluster.
-func GetRKE1Snapshots(client *rancher.Client, clusterName string) ([]management.EtcdBackup, error) {
-	clusterID, err := clusters.GetClusterIDByName(client, clusterName)
-	if err != nil {
-		return nil, err
-	}
-
-	snapshotSteveObjList, err := client.Management.EtcdBackup.ListAll(&types.ListOpts{
-		Filters: map[string]interface{}{
-			"clusterId": clusterID,
-		},
-	})
-	if err != nil {
-		return nil, err
-	}
-
-	snapshots := []management.EtcdBackup{}
-
-	for _, snapshot := range snapshotSteveObjList.Data {
-		if strings.Contains(snapshot.Name, clusterID) {
-			snapshots = append(snapshots, snapshot)
-		}
-	}
-
-	sort.Slice(snapshots, func(i, j int) bool {
-		return snapshots[i].Created > snapshots[j].Created
-	})
-
-	return snapshots, nil
-}
-
-// GetRKE2K3SSnapshots is a helper function to get the existing snapshots for a downstream RKE2/K3S cluster.
-func GetRKE2K3SSnapshots(client *rancher.Client, clusterName string) ([]rancherv1.SteveAPIObject, error) {
-	localclusterID, err := clusters.GetClusterIDByName(client, localClusterName)
-	if err != nil {
-		return nil, err
-	}
-
-	steveclient, err := client.Steve.ProxyDownstream(localclusterID)
-	if err != nil {
-		return nil, err
-	}
-
-	snapshotSteveObjList, err := steveclient.SteveType(stevetypes.EtcdSnapshot).List(nil)
-	if err != nil {
-		return nil, err
-	}
-
-	snapshots := []rancherv1.SteveAPIObject{}
-
-	for _, snapshot := range snapshotSteveObjList.Data {
-		if strings.Contains(snapshot.ObjectMeta.Name, clusterName) {
-			snapshots = append(snapshots, snapshot)
-		}
-	}
-
-	sort.Slice(snapshots, func(i, j int) bool {
-		return snapshots[i].ObjectMeta.CreationTimestamp.Before(&snapshots[j].ObjectMeta.CreationTimestamp)
-	})
+// CreateRKE1Snapshot is a helper function to create a snapshot on an RKE1 cluster.
+// Returns the list of new snapshots and an error, if any.
+func CreateRKE1Snapshot(client *rancher.Client, clusterName string) ([]management.EtcdBackup, error) {
+	updateTimestamp := time.Now().UTC()
 
-	return snapshots, nil
-}
-
-// CreateRKE1Snapshot is a helper function to create a snapshot on an RKE1 cluster. Returns error if any.
-func CreateRKE1Snapshot(client *rancher.Client, clusterName string) error {
 	clusterID, err := clusters.GetClusterIDByName(client, clusterName)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	clusterResp, err := client.Management.Cluster.ByID(clusterID)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	logrus.Infof("Creating snapshot...")
 	err = client.Management.Cluster.ActionBackupEtcd(clusterResp)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
+	var snapshots []management.EtcdBackup
 	err = kwait.PollUntilContextTimeout(context.TODO(), 5*time.Second, defaults.FiveMinuteTimeout, true, func(ctx context.Context) (done bool, err error) {
-		snapshotSteveObjList, err := client.Management.EtcdBackup.ListAll(&types.ListOpts{
+		snapshotManagementObjList, err := client.Management.EtcdBackup.ListAll(&types.ListOpts{
 			Filters: map[string]interface{}{
 				"clusterId": clusterID,
 			},
@@ -118,7 +61,9 @@ func CreateRKE1Snapshot(client *rancher.Client, clusterName string) error {
 			return false, nil
 		}
 
-		for _, snapshot := range snapshotSteveObjList.Data {
+		snapshots = []management.EtcdBackup{}
+		for _, snapshot := range snapshotManagementObjList.Data {
 			snapshotObj, err := client.Management.EtcdBackup.ByID(snapshot.ID)
 			if err != nil {
 				return false, nil
@@ -127,23 +72,36 @@ func CreateRKE1Snapshot(client *rancher.Client, clusterName string) error {
 			if snapshotObj.State != active {
 				return false, nil
 			}
+
+			snapshotTime, err := time.Parse(time.RFC3339, snapshot.Created)
+			if err != nil {
+				return false, err
+			}
+
+			// the Created timestamp has only second precision, while time.Now() keeps nanoseconds; round up by one second before comparing.
+			snapshotTime = snapshotTime.Add(time.Second)
+
+			if snapshotTime.Compare(updateTimestamp) > -1 {
+				snapshots = append(snapshots, snapshot)
+			}
+		}
+
+		if len(snapshots) == 0 {
+			return false, nil
 		}
 
-		logrus.Infof("All snapshots in the cluster are in an active state!")
 		return true, nil
 	})
-	if err != nil {
-		return err
-	}
 
-	return nil
+	return snapshots, err
 }
 
-// CreateRKE2K3SSnapshot is a helper function to create a snapshot on an RKE2 or k3s cluster. Returns error if any.
-func CreateRKE2K3SSnapshot(client *rancher.Client, clusterName string) error {
+// CreateRKE2K3SSnapshot is a helper function to create a snapshot on an RKE2 or k3s cluster.
+// Returns the list of new snapshots and an error, if any.
+func CreateRKE2K3SSnapshot(client *rancher.Client, clusterName string) ([]rancherv1.SteveAPIObject, error) {
 	clusterObject, clusterSteveObject, err := clusters.GetProvisioningClusterByName(client, clusterName, fleetNamespace)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	if clusterObject.Spec.RKEConfig != nil {
@@ -165,41 +123,57 @@ func CreateRKE2K3SSnapshot(client *rancher.Client, clusterName string) error {
 	}
 
 	logrus.Infof("Creating snapshot...")
-	_, err = client.Steve.SteveType(clusters.ProvisioningSteveResourceType).Update(clusterSteveObject, clusterObject)
+	updatedCluster, err := client.Steve.SteveType(clusters.ProvisioningSteveResourceType).Update(clusterSteveObject, clusterObject)
 	if err != nil {
-		return err
+		return nil, err
+	}
+
+	updateTimestamp := time.Now()
+
+	err = clusters.WaitOnClusterAfterSnapshot(client, updatedCluster.ID)
+	if err != nil {
+		return nil, err
 	}
 
+	var snapshots []rancherv1.SteveAPIObject
 	err = kwait.PollUntilContextTimeout(context.TODO(), 5*time.Second, defaults.FiveMinuteTimeout, true, func(ctx context.Context) (done bool, err error) {
-		snapshotSteveObjList, err := client.Steve.SteveType("rke.cattle.io.etcdsnapshot").List(nil)
+		query, err := url.ParseQuery(fmt.Sprintf("labelSelector=%s=%s", SnapshotClusterNameLabel, clusterName))
 		if err != nil {
 			return false, nil
 		}
 
-		_, clusterSteveObject, err := clusters.GetProvisioningClusterByName(client, clusterName, fleetNamespace)
+		snapshotSteveObjList, err := client.Steve.SteveType(SnapshotSteveResourceType).List(query)
 		if err != nil {
 			return false, nil
 		}
 
+		if len(snapshotSteveObjList.Data) == 0 {
+			return false, nil
+		}
+
+		snapshots = []rancherv1.SteveAPIObject{}
 		for _, snapshot := range snapshotSteveObjList.Data {
-			snapshotObj, err := client.Steve.SteveType("rke.cattle.io.etcdsnapshot").ByID(snapshot.ID)
+			_, err = client.Steve.SteveType(SnapshotSteveResourceType).ByID(snapshot.ID)
 			if err != nil {
 				return false, nil
 			}
 
-			if snapshotObj.ObjectMeta.State.Name == active && clusterSteveObject.ObjectMeta.State.Name == active {
-				logrus.Infof("All snapshots in the cluster are in an active state!")
-				return true, nil
+			// the snapshot timestamp has only second precision, while time.Now() keeps nanoseconds; round up by one second before comparing.
+			if snapshot.ObjectMeta.CreationTimestamp.Time.Add(time.Second).Compare(updateTimestamp) > -1 {
+				snapshots = append(snapshots, snapshot)
 			}
 		}
 
-		return false, nil
+		if len(snapshots) == 0 {
+			return false, nil
+		}
+
+		return true, nil
 	})
-	if err != nil {
-		return err
-	}
 
-	return nil
+	// not registering a cleanup func; users do not delete snapshots through rancher
+	return snapshots, err
 }
 
@@ -258,62 +232,50 @@ func RestoreRKE1Snapshot(client *rancher.Client, clusterName string, snapshotRes
 
 // RestoreRKE2K3SSnapshot is a helper function to restore a snapshot on an RKE2 or k3s cluster. Returns error if any.
 func RestoreRKE2K3SSnapshot(client *rancher.Client, snapshotRestore *rkev1.ETCDSnapshotRestore, clusterName string) error {
-	clusterObject, existingSteveAPIObject, err := clusters.GetProvisioningClusterByName(client, clusterName, fleetNamespace)
+	_, existingSteveAPIObject, err := clusters.GetProvisioningClusterByName(client, clusterName, fleetNamespace)
 	if err != nil {
 		return err
 	}
 
+	steveWithUpdates := existingSteveAPIObject
-	clusterObject.Spec.RKEConfig.ETCDSnapshotRestore = snapshotRestore
-
-	logrus.Infof("Restoring snapshot: %v", snapshotRestore.Name)
-	updatedCluster, err := client.Steve.SteveType(ProvisioningSteveResouceType).Update(existingSteveAPIObject, clusterObject)
+	clusterSpec := &apisV1.ClusterSpec{}
+	err = rancherv1.ConvertToK8sType(steveWithUpdates.Spec, clusterSpec)
 	if err != nil {
 		return err
 	}
 
-	err = kwait.PollUntilContextTimeout(context.TODO(), 500*time.Millisecond, defaults.OneMinuteTimeout, true, func(ctx context.Context) (done bool, err error) {
-		clusterResp, err := client.Steve.SteveType(ProvisioningSteveResouceType).ByID(updatedCluster.ID)
-		if err != nil {
-			return false, err
-		}
-
-		clusterStatus := &apisV1.ClusterStatus{}
-		err = rancherv1.ConvertToK8sType(clusterResp.Status, clusterStatus)
-		if err != nil {
-			return false, err
-		}
+	clusterSpec.RKEConfig.ETCDSnapshotRestore = snapshotRestore
 
-		if clusterResp.ObjectMeta.State.Name != active {
-			return true, nil
-		}
+	steveWithUpdates.Spec = clusterSpec
 
-		return false, nil
-	})
+	logrus.Infof("Restoring snapshot: %v", snapshotRestore.Name)
+	updatedCluster, err := client.Steve.SteveType(ProvisioningSteveResouceType).Update(existingSteveAPIObject, steveWithUpdates)
 	if err != nil {
 		return err
 	}
 
-	err = kwait.PollUntilContextTimeout(context.TODO(), 500*time.Millisecond, defaults.FifteenMinuteTimeout, true, func(ctx context.Context) (done bool, err error) {
+	err = kwait.PollUntilContextTimeout(context.TODO(), 1*time.Second, defaults.TwoMinuteTimeout, true, func(ctx context.Context) (done bool, err error) {
 		clusterResp, err := client.Steve.SteveType(ProvisioningSteveResouceType).ByID(updatedCluster.ID)
 		if err != nil {
 			return false, err
 		}
 
+		if clusterResp.State.Error {
+			return false, errors.New(clusterResp.State.Message)
+		}
+
 		clusterStatus := &apisV1.ClusterStatus{}
 		err = rancherv1.ConvertToK8sType(clusterResp.Status, clusterStatus)
 		if err != nil {
 			return false, err
 		}
 
-		if clusterResp.ObjectMeta.State.Name == active {
+		if clusterResp.State.Message == "waiting for etcd restore" {
 			return true, nil
 		}
 
 		return false, nil
 	})
-	if err != nil {
-		return err
-	}
 
-	return nil
+	return err
 }
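A sketch of the snapshot-then-restore flow these helpers now support for RKE2/K3s, assuming an authenticated `rancher.Client` and the `rke.cattle.io/v1` API types the restore helper takes; the cluster name and the `ETCDSnapshotRestore` field values are assumptions, not taken from this diff:

```go
package main

import (
	rkev1 "github.com/rancher/rancher/pkg/apis/rke.cattle.io/v1"
	"github.com/rancher/shepherd/clients/rancher"
	"github.com/rancher/shepherd/extensions/etcdsnapshot"
)

func snapshotAndRestore(client *rancher.Client) error {
	clusterName := "my-cluster" // hypothetical downstream cluster

	// blocks until the cluster settles after the snapshot, then returns only
	// the snapshots taken at or after the moment the update was issued
	snapshots, err := etcdsnapshot.CreateRKE2K3SSnapshot(client, clusterName)
	if err != nil {
		return err
	}

	// restore from the first of the new snapshots; the generation value is assumed
	// to need incrementing for rancher to act on repeated restores of the same snapshot
	restore := &rkev1.ETCDSnapshotRestore{
		Name:       snapshots[0].ObjectMeta.Name,
		Generation: 1,
	}

	return etcdsnapshot.RestoreRKE2K3SSnapshot(client, restore, clusterName)
}
```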