Skip to content

Commit

Permalink
Merge rancher/shepherd branch 'release/v2.8' commit:3a3ced82b56295ef0…
Browse files Browse the repository at this point in the history
…150dd423944994c92a50f85 into cnrancher release/v2.8-ent
  • Loading branch information
jianghang8421 committed Oct 25, 2024
2 parents eb1218f + 3a3ced8 commit 66d6e14
Show file tree
Hide file tree
Showing 5 changed files with 291 additions and 136 deletions.
8 changes: 8 additions & 0 deletions clients/corral/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ type Packages struct {
HasCustomRepo string `json:"hasCustomRepo" yaml:"hasCustomRepo"`
}

// Args is a struct that contains arguments to a corral create command, and any updates to the config
// that should be applied before creating the corral.
type Args struct {
	Name        string            // name of the corral to create
	PackageName string            // corral package the corral is created from
	Updates     map[string]string // config key/value pairs applied via UpdateCorralConfig before creation
}

// PackagesConfig is a function that reads in the corral package object from the config file
func PackagesConfig() *Packages {
var corralPackages Packages
Expand Down
142 changes: 135 additions & 7 deletions clients/corral/corral.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@ import (
"bytes"
"fmt"
"io"
"os"
"os/exec"
"regexp"
"strings"
"sync"
"time"

"github.com/pkg/errors"
"github.com/rancher/shepherd/pkg/session"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/wait"
)

const (
Expand Down Expand Up @@ -70,29 +74,153 @@ func SetCustomRepo(repo string) error {

// CreateCorral creates a corral taking the corral name, the package path, and a debug set so if someone wants to view the
// corral create log
func CreateCorral(ts *session.Session, corralName, packageName string, debug bool, cleanup bool) ([]byte, error) {
func CreateCorral(ts *session.Session, corralName, packageName string, debug, cleanup bool) ([]byte, error) {
command, err := startCorral(ts, corralName, packageName, debug, cleanup)
if err != nil {
return nil, err
}

return runAndWaitOnCommand(command)
}

// runAndWaitOnCommand blocks until an already-started command finishes and
// returns whatever was captured on its stdout. Any error is wrapped together
// with the captured output so failures carry the command's own messages.
func runAndWaitOnCommand(command *exec.Cmd) ([]byte, error) {
	err := command.Wait()

	var msg []byte
	// Stdout is only readable here when it was installed as a *bytes.Buffer
	// (startCorral does this). A comma-ok assertion avoids a panic if some
	// other io.Writer — or nil — was set instead.
	if buf, ok := command.Stdout.(*bytes.Buffer); ok {
		msg = buf.Bytes()
	}

	if msg != nil {
		logrus.Infof("Stdout: %s", string(msg))
	}

	return msg, errors.Wrap(err, "Debug: "+string(msg))
}

// startCorral registers session cleanup for the corral, builds the
// `corral create` command with the requested debug/cleanup flags, starts it
// asynchronously with combined stdout/stderr captured into a buffer, and waits
// for the corral's config file to appear before handing the running command
// back to the caller (who must Wait on it, e.g. via runAndWaitOnCommand).
//
// NOTE(review): the merge left the pre-merge synchronous
// `exec.Command(...).CombinedOutput()` lines in place alongside the new async
// flow, which both redeclared msg/err and would have run corral twice; the
// stale lines are removed here.
func startCorral(ts *session.Session, corralName, packageName string, debug, cleanup bool) (*exec.Cmd, error) {
	ts.RegisterCleanupFunc(func() error {
		return DeleteCorral(corralName)
	})

	args := []string{"create"}

	if !cleanup {
		args = append(args, skipCleanupFlag)
	}
	if debug {
		args = append(args, debugFlag)
	}

	args = append(args, corralName, packageName)
	logrus.Infof("Creating corral with the following parameters: %v", args)

	cmdToRun := exec.Command("corral", args...)

	// create a buffer for stdout/stderr so we can read from it later. commands initiate this to nil by default.
	var b bytes.Buffer
	cmdToRun.Stdout = &b
	cmdToRun.Stderr = &b

	if err := cmdToRun.Start(); err != nil {
		return nil, err
	}

	// this ensures corral is completely initiated. Otherwise, race conditions occur.
	if err := waitForCorralConfig(corralName); err != nil {
		return nil, err
	}

	return cmdToRun, nil
}

// waitForCorralConfig polls with exponential backoff until the corral's
// on-disk config file (~/.corral/corrals/<name>/corral.yaml) exists and is
// non-empty, which signals that `corral create` has fully initialized the
// corral's local state.
//
// NOTE(review): the merge left a stale `return nil, errors.Wrap(...)` line
// (wrong arity for this function) next to the new `return err`; the stale
// line is removed here. The previous os.Stat is folded into os.ReadFile,
// which already fails when the file does not exist.
func waitForCorralConfig(corralName string) error {
	backoff := wait.Backoff{
		Duration: 1 * time.Second,
		Factor:   1.1,
		Jitter:   0.1,
		Steps:    10,
	}

	homeDir, err := os.UserHomeDir()
	if err != nil {
		return err
	}

	corralOSPath := homeDir + "/.corral/corrals/" + corralName + "/corral.yaml"

	return wait.ExponentialBackoff(backoff, func() (finished bool, err error) {
		fileContents, err := os.ReadFile(corralOSPath)
		if err != nil {
			// file not created yet (or transiently unreadable): keep polling
			return false, nil
		}

		// an empty file means corral is still writing it out
		return len(fileContents) > 0, nil
	})
}

// CreateMultipleCorrals creates corrals taking the corral name, the package path, and a debug set so if someone wants to view the
// corral create log. Using this function implies calling WaitOnCorralWithCombinedOutput to get the output once finished.
//
// Fixes applied in review:
//   - removed the stale pre-merge `return msg, nil` line left in by the merge
//     (`msg` is undefined in this function);
//   - msgs/errStrings are now mutex-guarded — they were appended to from the
//     spawned goroutines with no synchronization (data race);
//   - on an UpdateCorralConfig failure we now stop before starting the corral,
//     matching the stated intent of "avoid running if we're already in an
//     error state" (the old inner break still fell through to startCorral);
//   - the combined error is built with errors.New instead of
//     fmt.Errorf(longString), so '%' characters in collected messages are not
//     interpreted as format verbs.
func CreateMultipleCorrals(ts *session.Session, commands []Args, debug, cleanup bool) ([][]byte, error) {
	var waitGroup sync.WaitGroup

	// mu guards msgs and errStrings, which the goroutines below append to concurrently.
	var mu sync.Mutex
	var msgs [][]byte
	var errStrings []string

corralLoop:
	for _, currentCommand := range commands {
		// break out of any error that comes up before we run the waitGroup, to avoid running if we're already in an error state.
		for key, value := range currentCommand.Updates {
			logrus.Info(key, ": ", value)
			err := UpdateCorralConfig(key, value)
			if err != nil {
				errStrings = append(errStrings, fmt.Sprint(err.Error(), "Unable to update corral: "+currentCommand.Name+" for "+key+": "+value))
				break corralLoop
			}
		}

		cmdToRun, err := startCorral(ts, currentCommand.Name, currentCommand.PackageName, debug, cleanup)
		if err != nil {
			errStrings = append(errStrings, err.Error())
			break
		}

		waitGroup.Add(1)

		go func(cmd *exec.Cmd) {
			defer waitGroup.Done()

			msg, err := runAndWaitOnCommand(cmd)

			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				errStrings = append(errStrings, err.Error())
			}
			msgs = append(msgs, msg)
		}(cmdToRun)
	}

	waitGroup.Wait()

	var formattedError error
	if len(errStrings) > 0 {
		formattedError = errors.New(strings.Join(errStrings, "\n"))
	}

	logrus.Info("done with registration")
	return msgs, formattedError
}

// DeleteCorral deletes a corral based on the corral name
Expand Down
68 changes: 59 additions & 9 deletions extensions/clusters/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package clusters
import (
"context"
"fmt"
"strings"
"slices"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -400,17 +400,22 @@ func WaitClusterToBeInUpgrade(client *rancher.Client, clusterID string) (err err
if err != nil {
return
}

checkFuncWaitToBeInUpgrade := func(event watch.Event) (bool, error) {
acceptableErrorMessages := []string{
"Cluster health check failed: Failed to communicate with API server during namespace check",
"the object has been modified",
}
clusterUnstructured := event.Object.(*unstructured.Unstructured)
summarizedCluster := summary.Summarize(clusterUnstructured)

clusterInfo = logClusterInfoWithChanges(clusterID, clusterInfo, summarizedCluster)

if summarizedCluster.Transitioning && !summarizedCluster.Error && (summarizedCluster.State == clusterStateUpdating || summarizedCluster.State == clusterStateUpgrading) {
return true, nil
} else if summarizedCluster.Error && isClusterInaccessible(summarizedCluster.Message) {
} else if summarizedCluster.Error && isClusterInaccessible(summarizedCluster.Message, acceptableErrorMessages) {
return false, nil
} else if summarizedCluster.Error && !isClusterInaccessible(summarizedCluster.Message) {
} else if summarizedCluster.Error && !isClusterInaccessible(summarizedCluster.Message, acceptableErrorMessages) {
return false, errors.Wrap(err, clusterErrorStateMessage)
}

Expand Down Expand Up @@ -440,16 +445,20 @@ func WaitClusterUntilUpgrade(client *rancher.Client, clusterID string) (err erro
return
}
checkFuncWaitUpgrade := func(event watch.Event) (bool, error) {
acceptableErrorMessages := []string{
"Cluster health check failed: Failed to communicate with API server during namespace check",
"the object has been modified",
}
clusterUnstructured := event.Object.(*unstructured.Unstructured)
summarizedCluster := summary.Summarize(clusterUnstructured)

clusterInfo = logClusterInfoWithChanges(clusterID, clusterInfo, summarizedCluster)

if summarizedCluster.IsReady() {
return true, nil
} else if summarizedCluster.Error && isClusterInaccessible(summarizedCluster.Message) {
} else if summarizedCluster.Error && isClusterInaccessible(summarizedCluster.Message, acceptableErrorMessages) {
return false, nil
} else if summarizedCluster.Error && !isClusterInaccessible(summarizedCluster.Message) {
} else if summarizedCluster.Error && !isClusterInaccessible(summarizedCluster.Message, acceptableErrorMessages) {
return false, errors.Wrap(err, clusterErrorStateMessage)

}
Expand Down Expand Up @@ -483,12 +492,53 @@ func WaitClusterToBeUpgraded(client *rancher.Client, clusterID string) (err erro
return
}

func isClusterInaccessible(messages []string) (isInaccessible bool) {
clusterCPErrorMessage := "Cluster health check failed: Failed to communicate with API server during namespace check" // For GKE
clusterModifiedErrorMessage := "the object has been modified" // For provisioning node driver K3s and RKE2
// WaitOnClusterAfterSnapshot waits for a cluster to finish taking a snapshot
// and return to an active state. If the cluster has not yet entered a
// transitioning state, it first waits (up to one minute) for the snapshot to
// begin, then polls (up to fifteen minutes) for the cluster to become active.
func WaitOnClusterAfterSnapshot(client *rancher.Client, clusterID string) error {
	clusterResp, err := client.Steve.SteveType(ProvisioningSteveResourceType).ByID(clusterID)
	if err != nil {
		return err
	}

	// Already transitioning (or state unknown) — skip straight to waiting for active.
	if clusterResp.State != nil && !clusterResp.State.Transitioning {
		waitForTransitioning := func(ctx context.Context) (bool, error) {
			current, pollErr := client.Steve.SteveType(ProvisioningSteveResourceType).ByID(clusterID)
			if pollErr != nil {
				return false, pollErr
			}

			// note, this intentionally ignores cluster.State.Error, as that can sometimes appear during an upgrade during snapshots.
			if current.State == nil {
				return false, nil
			}
			return current.State.Transitioning, nil
		}

		if err := kwait.PollUntilContextTimeout(context.TODO(), defaults.FiveHundredMillisecondTimeout, defaults.OneMinuteTimeout, true, waitForTransitioning); err != nil {
			return err
		}
	}

	waitForActive := func(ctx context.Context) (bool, error) {
		current, pollErr := client.Steve.SteveType(ProvisioningSteveResourceType).ByID(clusterID)
		if pollErr != nil {
			return false, pollErr
		}

		// note, this intentionally ignores cluster.State.Error, as that can sometimes appear during an upgrade during snapshots.
		if current.State == nil {
			return false, nil
		}
		return current.State.Name == active, nil
	}

	return kwait.PollUntilContextTimeout(context.TODO(), 1*time.Second, defaults.FifteenMinuteTimeout, true, waitForActive)
}

func isClusterInaccessible(messages, acceptableErrors []string) (isInaccessible bool) {
for _, message := range messages {
if strings.Contains(message, clusterCPErrorMessage) || strings.Contains(message, clusterModifiedErrorMessage) {
if slices.Contains(acceptableErrors, message) {
isInaccessible = true
break
}
Expand Down
9 changes: 8 additions & 1 deletion extensions/clusters/eks/eks_cluster_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type NodeGroupConfig struct {
Subnets []string `json:"subnets" yaml:"subnets"`
Tags map[string]string `json:"tags" yaml:"tags"`
UserData *string `json:"userData,omitempty" yaml:"userData,omitempty"`
Version *string `json:"version,omitempty" yaml:"version,omitempty"`
}

// LaunchTemplateConfig is the configuration need for a node group launch template
Expand All @@ -64,6 +65,12 @@ func nodeGroupsConstructor(nodeGroupsConfig *[]NodeGroupConfig, kubernetesVersio
Version: nodeGroupConfig.LaunchTemplateConfig.Version,
}
}
var version *string
if nodeGroupConfig.Version != nil {
version = nodeGroupConfig.Version
} else {
version = &kubernetesVersion
}
nodeGroup := management.NodeGroup{
DesiredSize: nodeGroupConfig.DesiredSize,
DiskSize: nodeGroupConfig.DiskSize,
Expand All @@ -83,7 +90,7 @@ func nodeGroupsConstructor(nodeGroupsConfig *[]NodeGroupConfig, kubernetesVersio
Subnets: &nodeGroupConfig.Subnets,
Tags: &nodeGroupConfig.Tags,
UserData: nodeGroupConfig.UserData,
Version: &kubernetesVersion,
Version: version,
}
nodeGroups = append(nodeGroups, nodeGroup)
}
Expand Down
Loading

0 comments on commit 66d6e14

Please sign in to comment.